binhop/binhop.py

159 lines
4.8 KiB
Python
Raw Normal View History

2023-05-19 17:34:59 -07:00
#!/usr/bin/env python3
2023-05-19 14:22:34 -07:00
import asyncio
import hashlib
import time
2023-05-19 17:34:59 -07:00
import tempfile
import binwalk
import os
from aiohttp_compress import compress_middleware
2023-05-19 14:22:34 -07:00
from aiohttp import web
2023-05-19 17:34:59 -07:00
async def scan_file(filename, base_dir):
try:
scan = binwalk.scan(
filename,
signature=True,
quiet=True,
extract=True,
matryoshka=True,
remove_after_execute=False,
directory=base_dir,
)
2023-05-19 17:34:59 -07:00
return scan
except binwalk.ModuleException as e:
print("Critical failure: ", e)
2023-05-19 17:34:59 -07:00
async def build_listing(path):
result = {}
num_files, num_dirs = 0, 0
2023-05-19 17:34:59 -07:00
for item in os.listdir(path):
item_path = os.path.join(path, item)
if os.path.isdir(item_path):
files, dirs, result[item] = await build_listing(item_path)
num_files += files
num_dirs += 1 + dirs
2023-05-19 17:34:59 -07:00
else:
result[item] = {"s": os.path.getsize(item_path)}
num_files += 1
return num_files, num_dirs, result
2023-05-19 17:34:59 -07:00
2023-05-19 14:22:34 -07:00
async def upload_file(request):
reader = await request.multipart()
field = await reader.next()
assert field.name == "file"
2023-05-19 14:22:34 -07:00
start_time = time.time()
filename = field.filename
file_size = 0
sha1_hash = hashlib.sha1()
md5_hash = hashlib.md5()
temp_file = tempfile.NamedTemporaryFile(mode="ab", delete=False)
2023-05-19 17:34:59 -07:00
2023-05-19 14:22:34 -07:00
while True:
chunk = await field.read_chunk()
if not chunk:
break
2023-05-19 17:34:59 -07:00
temp_file.write(chunk)
2023-05-19 14:22:34 -07:00
file_size += len(chunk)
sha1_hash.update(chunk)
md5_hash.update(chunk)
2023-05-19 17:34:59 -07:00
try:
working_dir = tempfile.TemporaryDirectory(ignore_cleanup_errors=True)
scan = await scan_file(temp_file.name, working_dir.name)
sigs = scan[0]
extractor = sigs.extractor.output
finally:
os.unlink(temp_file.name)
2023-05-19 14:22:34 -07:00
carved, summary = [], []
2023-05-19 17:34:59 -07:00
for sig in sigs.results:
tmp_path = sig.file.path
if tmp_path in extractor:
if sig.offset in extractor[tmp_path].carved:
end_offset = sig.offset + os.path.getsize(extractor[tmp_path].carved[sig.offset])
summary.append(
"Carved data from offsets 0x%X-0x%X to %s"
% (sig.offset, end_offset, extractor[tmp_path].carved[sig.offset])
)
2023-05-19 17:34:59 -07:00
carved.append({"start": sig.offset, "end": end_offset, "d": sig.description})
if sig.offset in extractor[tmp_path].extracted:
extracted_files = [x for x in extractor[tmp_path].extracted[sig.offset].files if os.path.isfile(x)]
extracted_dirs = [x for x in extractor[tmp_path].extracted[sig.offset].files if os.path.isdir(x)]
summary.append(
"Extracted %d files and %d directories from offset 0x%X to '%s' using '%s'"
% (
len(extracted_files),
len(extracted_dirs),
sig.offset,
extractor[tmp_path].extracted[sig.offset].files[0],
sigs.extractor.output[tmp_path].extracted[sig.offset].command,
)
)
num_files, num_dirs, listing = await build_listing(working_dir.name)
2023-05-19 17:34:59 -07:00
working_dir.cleanup()
2023-05-19 14:22:34 -07:00
response_data = {
"meta": {
"name": filename,
"sizeb": file_size,
"sha1": sha1_hash.hexdigest(),
"md5": md5_hash.hexdigest(),
"sig_quant": len(sigs.magic.signatures),
"files": num_files,
"dirs": num_dirs,
2023-05-19 17:34:59 -07:00
},
"offsets": carved,
"ls": listing,
"ql": summary,
2023-05-19 14:22:34 -07:00
}
processing_time = time.time() - start_time
minutes, seconds = divmod(processing_time, 60)
milliseconds = processing_time - int(processing_time)
response_data["meta"]["duration"] = f"{int(minutes):02d}:{int(seconds):02d}.{int(milliseconds * 1000):03d}"
2023-05-19 14:22:34 -07:00
return web.json_response(response_data)
async def serve_index(request):
return web.FileResponse("index.html")
2023-05-19 14:22:34 -07:00
async def serve_static(request):
path = request.path[1:]
if not path.startswith("static/"):
2023-05-19 14:22:34 -07:00
return web.HTTPNotFound()
return web.FileResponse(path)
async def main():
app = web.Application()
app.middlewares.append(compress_middleware)
2023-05-19 14:22:34 -07:00
app.add_routes(
[
web.get("/", serve_index),
web.post("/api/upload_file", upload_file),
web.get("/static/{tail:.*}", serve_static),
]
)
2023-05-19 14:22:34 -07:00
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, "localhost", 8080)
2023-05-19 14:22:34 -07:00
await site.start()
print("binhop is running at http://localhost:8080")
2023-05-19 14:22:34 -07:00
await asyncio.Event().wait()
if __name__ == "__main__":
2023-05-19 14:22:34 -07:00
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
asyncio.run(main())