mirror of
https://github.com/DarrylNixon/binhop
synced 2024-04-22 12:37:06 -07:00
163 lines
5.3 KiB
Python
163 lines
5.3 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import asyncio
|
|
import hashlib
|
|
import time
|
|
import tempfile
|
|
import binwalk
|
|
import os
|
|
from aiohttp_compress import compress_middleware
|
|
from aiohttp import web
|
|
from typing import List, Tuple, Dict, Union
|
|
import argparse
|
|
|
|
|
|
async def scan_file(filename: str, base_dir: str) -> List:
|
|
try:
|
|
scan = binwalk.scan(
|
|
filename,
|
|
signature=True,
|
|
quiet=True,
|
|
extract=True,
|
|
matryoshka=True,
|
|
remove_after_execute=False,
|
|
directory=base_dir,
|
|
)
|
|
return scan
|
|
except binwalk.ModuleException as e:
|
|
print("Critical failure: ", e)
|
|
|
|
|
|
async def build_listing(path: str) -> Tuple[int, int, Dict]:
|
|
result = {}
|
|
num_files, num_dirs = 0, 0
|
|
for item in os.listdir(path):
|
|
item_path = os.path.join(path, item)
|
|
if os.path.isdir(item_path):
|
|
files, dirs, result[item] = await build_listing(item_path)
|
|
num_files += files
|
|
num_dirs += 1 + dirs
|
|
else:
|
|
result[item] = {"s": os.path.getsize(item_path)}
|
|
num_files += 1
|
|
return num_files, num_dirs, result
|
|
|
|
|
|
async def upload_file(request: web.Request) -> web.json_response:
|
|
reader = await request.multipart()
|
|
field = await reader.next()
|
|
assert field.name == "file"
|
|
|
|
start_time = time.time()
|
|
filename = field.filename
|
|
file_size = 0
|
|
sha1_hash = hashlib.sha1()
|
|
md5_hash = hashlib.md5()
|
|
|
|
temp_file = tempfile.NamedTemporaryFile(mode="ab", delete=False)
|
|
|
|
while True:
|
|
chunk = await field.read_chunk()
|
|
if not chunk:
|
|
break
|
|
temp_file.write(chunk)
|
|
file_size += len(chunk)
|
|
sha1_hash.update(chunk)
|
|
md5_hash.update(chunk)
|
|
|
|
with tempfile.TemporaryDirectory() as working_dir:
|
|
try:
|
|
scan = await scan_file(temp_file.name, working_dir)
|
|
sigs = scan[0]
|
|
extractor = sigs.extractor.output
|
|
finally:
|
|
os.unlink(temp_file.name)
|
|
|
|
carved, summary = [], []
|
|
for sig in sigs.results:
|
|
tmp_path = sig.file.path
|
|
summary.append("%s 0x%.8X %s" % (sig.file.path, sig.offset, sig.description))
|
|
if tmp_path in extractor:
|
|
if sig.offset in extractor[tmp_path].carved:
|
|
end_offset = sig.offset + os.path.getsize(extractor[tmp_path].carved[sig.offset])
|
|
summary.append(
|
|
"Carved data from offsets 0x%X-0x%X to %s"
|
|
% (sig.offset, end_offset, extractor[tmp_path].carved[sig.offset])
|
|
)
|
|
carved.append({"start": sig.offset, "end": end_offset, "d": sig.description})
|
|
if sig.offset in extractor[tmp_path].extracted:
|
|
extracted_files = [x for x in extractor[tmp_path].extracted[sig.offset].files if os.path.isfile(x)]
|
|
extracted_dirs = [x for x in extractor[tmp_path].extracted[sig.offset].files if os.path.isdir(x)]
|
|
summary.append(
|
|
"Extracted %d files and %d directories from offset 0x%X to using '%s'"
|
|
% (
|
|
len(extracted_files),
|
|
len(extracted_dirs),
|
|
sig.offset,
|
|
sigs.extractor.output[tmp_path].extracted[sig.offset].command,
|
|
)
|
|
)
|
|
num_files, num_dirs, listing = await build_listing(working_dir)
|
|
|
|
response_data = {
|
|
"meta": {
|
|
"name": filename,
|
|
"sizeb": file_size,
|
|
"sha1": sha1_hash.hexdigest(),
|
|
"md5": md5_hash.hexdigest(),
|
|
"sig_quant": len(sigs.magic.signatures),
|
|
"files": num_files,
|
|
"dirs": num_dirs,
|
|
},
|
|
"offsets": carved,
|
|
"ls": listing,
|
|
"ql": summary,
|
|
}
|
|
|
|
processing_time = time.time() - start_time
|
|
minutes, seconds = divmod(processing_time, 60)
|
|
milliseconds = processing_time - int(processing_time)
|
|
response_data["meta"]["duration"] = f"{int(minutes):02d}:{int(seconds):02d}.{int(milliseconds * 1000):03d}"
|
|
return web.json_response(response_data)
|
|
|
|
|
|
async def serve_index(request: web.Request) -> web.FileResponse:
|
|
return web.FileResponse("index.html")
|
|
|
|
|
|
async def serve_static(request: web.Request) -> Union[web.FileResponse, web.HTTPNotFound]:
|
|
path = request.path[1:]
|
|
if not path.startswith("static/"):
|
|
return web.HTTPNotFound()
|
|
return web.FileResponse(path)
|
|
|
|
|
|
async def main() -> None:
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--port", "-p", type=int, default=8080, help="Port to serve on")
|
|
args = parser.parse_args()
|
|
|
|
app = web.Application()
|
|
app.middlewares.append(compress_middleware)
|
|
|
|
app.add_routes(
|
|
[
|
|
web.get("/", serve_index),
|
|
web.post("/api/upload_file", upload_file),
|
|
web.get("/static/{tail:.*}", serve_static),
|
|
]
|
|
)
|
|
|
|
runner = web.AppRunner(app)
|
|
await runner.setup()
|
|
site = web.TCPSite(runner, "0.0.0.0", args.port)
|
|
await site.start()
|
|
|
|
print(f"binhop is running at http://localhost:{args.port}")
|
|
await asyncio.Event().wait()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
asyncio.run(main())
|