#!/usr/bin/env python3
"""binhop: an aiohttp service that runs binwalk on uploaded firmware images.

POST /api/upload_file accepts a multipart upload, streams it to a temp file
while hashing it, runs a recursive binwalk signature scan + extraction, and
returns a JSON report (hashes, carved offsets, extracted-file listing).
"""

import argparse
import asyncio
import functools
import hashlib
import os
import tempfile
import time
from typing import Dict, List, Tuple, Union

import binwalk
from aiohttp import web
from aiohttp_compress import compress_middleware


async def scan_file(filename: str, base_dir: str) -> List:
    """Run a recursive binwalk signature scan + extraction on *filename*.

    Extraction output is written under *base_dir*.  Re-raises
    binwalk.ModuleException (after logging it) so the caller can translate
    the failure into an HTTP error instead of crashing on a None result.
    """
    loop = asyncio.get_running_loop()
    scan_call = functools.partial(
        binwalk.scan,
        filename,
        signature=True,
        quiet=True,
        extract=True,
        matryoshka=True,
        remove_after_execute=False,
        directory=base_dir,
    )
    try:
        # binwalk.scan is blocking; run it in a worker thread so the event
        # loop stays responsive during large scans.
        return await loop.run_in_executor(None, scan_call)
    except binwalk.ModuleException as e:
        # Previously this swallowed the error and returned None, which
        # surfaced as an opaque TypeError at the call site (`scan[0]`).
        print("Critical failure: ", e)
        raise


async def build_listing(path: str) -> Tuple[int, int, Dict]:
    """Recursively describe the directory tree rooted at *path*.

    Returns (file_count, dir_count, tree) where *tree* maps each entry name
    to {"s": <size in bytes>} for files, or to a nested tree for directories.
    """
    result: Dict = {}
    num_files, num_dirs = 0, 0
    for item in os.listdir(path):
        item_path = os.path.join(path, item)
        if os.path.isdir(item_path):
            files, dirs, result[item] = await build_listing(item_path)
            num_files += files
            num_dirs += 1 + dirs
        else:
            result[item] = {"s": os.path.getsize(item_path)}
            num_files += 1
    return num_files, num_dirs, result


async def upload_file(request: web.Request) -> web.Response:
    """Handle a multipart firmware upload: hash it, binwalk it, report JSON."""
    reader = await request.multipart()
    field = await reader.next()
    # Validate explicitly: `assert` is stripped under -O, and reader.next()
    # may return None on a malformed body.
    if field is None or field.name != "file":
        raise web.HTTPBadRequest(reason="expected a multipart field named 'file'")

    start_time = time.time()
    filename = field.filename
    file_size = 0
    sha1_hash = hashlib.sha1()  # content fingerprints, not security
    md5_hash = hashlib.md5()

    # Stream the upload to disk chunk-by-chunk while hashing, so arbitrarily
    # large files never have to fit in memory.
    temp_file = tempfile.NamedTemporaryFile(mode="wb", delete=False)
    try:
        while True:
            chunk = await field.read_chunk()
            if not chunk:
                break
            temp_file.write(chunk)
            file_size += len(chunk)
            sha1_hash.update(chunk)
            md5_hash.update(chunk)
        # Close before scanning: flushes buffered data and lets binwalk
        # reopen the file by name on every platform.
        temp_file.close()

        with tempfile.TemporaryDirectory() as working_dir:
            scan = await scan_file(temp_file.name, working_dir)
            sigs = scan[0]
            extractor = sigs.extractor.output

            carved, summary = [], []
            for sig in sigs.results:
                tmp_path = sig.file.path
                summary.append("%s 0x%.8X %s" % (sig.file.path, sig.offset, sig.description))
                if tmp_path in extractor:
                    if sig.offset in extractor[tmp_path].carved:
                        carved_path = extractor[tmp_path].carved[sig.offset]
                        end_offset = sig.offset + os.path.getsize(carved_path)
                        summary.append(
                            "Carved data from offsets 0x%X-0x%X to %s"
                            % (sig.offset, end_offset, carved_path)
                        )
                        carved.append(
                            {"start": sig.offset, "end": end_offset, "d": sig.description}
                        )
                    if sig.offset in extractor[tmp_path].extracted:
                        # Hoist the repeated extracted-record lookup.
                        extracted = extractor[tmp_path].extracted[sig.offset]
                        extracted_files = [x for x in extracted.files if os.path.isfile(x)]
                        extracted_dirs = [x for x in extracted.files if os.path.isdir(x)]
                        summary.append(
                            "Extracted %d files and %d directories from offset 0x%X to using '%s'"
                            % (
                                len(extracted_files),
                                len(extracted_dirs),
                                sig.offset,
                                extracted.command,
                            )
                        )

            num_files, num_dirs, listing = await build_listing(working_dir)
    finally:
        # Remove the uploaded temp file even when the write loop or the scan
        # raises; double-close is a harmless no-op.
        temp_file.close()
        os.unlink(temp_file.name)

    response_data = {
        "meta": {
            "name": filename,
            "sizeb": file_size,
            "sha1": sha1_hash.hexdigest(),
            "md5": md5_hash.hexdigest(),
            "sig_quant": len(sigs.magic.signatures),
            "files": num_files,
            "dirs": num_dirs,
        },
        "offsets": carved,
        "ls": listing,
        "ql": summary,
    }

    processing_time = time.time() - start_time
    minutes, seconds = divmod(processing_time, 60)
    milliseconds = processing_time - int(processing_time)
    response_data["meta"]["duration"] = (
        f"{int(minutes):02d}:{int(seconds):02d}.{int(milliseconds * 1000):03d}"
    )
    return web.json_response(response_data)


async def serve_index(request: web.Request) -> web.FileResponse:
    """Serve the single-page front end."""
    return web.FileResponse("index.html")


async def serve_static(request: web.Request) -> Union[web.FileResponse, web.HTTPNotFound]:
    """Serve files under ./static, refusing path-traversal attempts."""
    path = request.path[1:]
    if not path.startswith("static/"):
        return web.HTTPNotFound()
    # Collapse any '..' segments and confirm the result is still inside the
    # static directory; the raw request path previously reached the
    # filesystem unchecked.
    normalized = os.path.normpath(path)
    if not normalized.startswith("static" + os.sep):
        return web.HTTPNotFound()
    return web.FileResponse(normalized)


async def main() -> None:
    """Parse CLI arguments, start the aiohttp server, and run forever."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--port", "-p", type=int, default=8080, help="Port to serve on")
    args = parser.parse_args()

    app = web.Application()
    app.middlewares.append(compress_middleware)
    app.add_routes(
        [
            web.get("/", serve_index),
            web.post("/api/upload_file", upload_file),
            web.get("/static/{tail:.*}", serve_static),
        ]
    )

    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, "0.0.0.0", args.port)
    await site.start()
    print(f"binhop is running at http://localhost:{args.port}")
    # Block forever; aiohttp serves requests in the background on this loop.
    await asyncio.Event().wait()


if __name__ == "__main__":
    # asyncio.run() creates and manages its own event loop; the previous
    # new_event_loop()/set_event_loop() calls were dead weight.
    asyncio.run(main())