2023-05-19 17:34:59 -07:00
#!/usr/bin/env python3
2023-05-19 14:22:34 -07:00
import asyncio
import hashlib
import time
2023-05-19 17:34:59 -07:00
import tempfile
import binwalk
import os
2023-05-19 14:22:34 -07:00
from aiohttp import web
2023-05-19 17:34:59 -07:00
async def scan_file(filename, base_dir):
    """Run a binwalk signature scan with recursive extraction on a file.

    Args:
        filename: Path of the file handed to ``binwalk.scan``.
        base_dir: Directory passed to binwalk as its ``directory`` option.

    Returns:
        Whatever ``binwalk.scan`` returns, or ``None`` when binwalk raises
        ``ModuleException`` (the error is printed, not re-raised).
    """
    # NOTE(review): binwalk.scan is a plain synchronous call, so this
    # coroutine does not yield to the event loop while the scan runs.
    scan_options = dict(
        signature=True,
        quiet=True,
        extract=True,
        matryoshka=True,
        remove_after_execute=False,
        directory=base_dir,
    )
    try:
        return binwalk.scan(filename, **scan_options)
    except binwalk.ModuleException as exc:
        print("Critical failure:", exc)
        return None
async def build_listing(path):
    """Recursively describe the directory tree rooted at *path*.

    Symbolic links are never followed. A link (to a file, to a directory,
    or dangling) is reported as a leaf carrying the size of the link
    itself. Following links would raise on dangling targets, could recurse
    through link cycles, and would let a crafted tree describe files that
    live outside *path*.

    Args:
        path: Directory to list.

    Returns:
        A dict keyed by entry name. Real sub-directories map to a nested
        listing of the same shape; every other entry maps to
        ``{"s": size_in_bytes}``.

    Raises:
        OSError: If *path* (or a sub-directory) cannot be listed.
    """
    result = {}
    # Materialise the entries first so the directory handle is not held
    # open across the recursive awaits below.
    with os.scandir(path) as scan:
        entries = list(scan)
    for entry in entries:
        if entry.is_dir(follow_symlinks=False):
            result[entry.name] = await build_listing(entry.path)
        else:
            size = entry.stat(follow_symlinks=False).st_size
            result[entry.name] = {"s": size}
        # Debug trace of each entry visited.
        print(entry.name)
    return result
2023-05-19 14:22:34 -07:00
def _format_duration(elapsed):
    """Render *elapsed* seconds as ``MM:SS.mmm``."""
    minutes, seconds = divmod(elapsed, 60)
    fraction = elapsed - int(elapsed)
    return f"{int(minutes):02d}:{int(seconds):02d}.{int(fraction * 1000):03d}"


def _collect_carved(sigs):
    """Return one ``{"start", "end", "d"}`` entry per carved signature hit.

    Must be called while binwalk's output directory still exists, because
    the end offset is derived from the size of the carved file on disk.
    """
    extractor = sigs.extractor.output
    carved = []
    for sig in sigs.results:
        tmp_path = sig.file.path
        if tmp_path not in extractor:
            continue
        output = extractor[tmp_path]

        if sig.offset in output.carved:
            carved_path = output.carved[sig.offset]
            end_offset = sig.offset + os.path.getsize(carved_path)
            print("Carved data from offsets 0x%X-0x%X to %s" % (sig.offset, end_offset, carved_path))
            carved.append({"start": sig.offset, "end": end_offset, "d": sig.description})

        if sig.offset in output.extracted:
            extraction = output.extracted[sig.offset]
            extracted_files = [x for x in extraction.files if os.path.isfile(x)]
            extracted_dirs = [x for x in extraction.files if os.path.isdir(x)]
            # Guard the [0] lookup: a log line must not fail the request
            # when an extractor produced no output paths.
            first_output = extraction.files[0] if extraction.files else ""
            print("Extracted %d files and %d directories from offset 0x%X to '%s' using '%s'" % (len(extracted_files), len(extracted_dirs), sig.offset, first_output, extraction.command))
            for i in extraction.files:
                print(f"File: {i}")
    return carved


async def upload_file(request):
    """Handle ``POST /api/upload_file``: store, hash and binwalk-scan an upload.

    The first multipart part must be named ``file``. Its content is streamed
    to a temporary file while its size, SHA-1 and MD5 are computed, then the
    file is scanned via ``scan_file`` into a temporary working directory.
    Both the temporary file and the working directory are removed before
    returning, whether or not the scan succeeds.

    Args:
        request: The incoming aiohttp request carrying a multipart body.

    Returns:
        A JSON response of the form
        ``{"meta": {"name", "sizeb", "sha1", "md5", "sig_quant", "duration"},
        "offsets": [{"start", "end", "d"}, ...]}``.

    Raises:
        web.HTTPBadRequest: If the first part is missing or not named ``file``.
        web.HTTPInternalServerError: If the scan produced no result.
    """
    reader = await request.multipart()
    field = await reader.next()
    # Validate explicitly: `assert` is stripped under -O and a missing part
    # would otherwise surface as an AttributeError on None.
    if field is None or field.name != 'file':
        raise web.HTTPBadRequest(text="expected a multipart field named 'file'")

    start_time = time.time()
    filename = field.filename
    file_size = 0
    sha1_hash = hashlib.sha1()
    md5_hash = hashlib.md5()

    temp_file = tempfile.NamedTemporaryFile(mode='ab', delete=False)
    try:
        # Close the file before scanning so every buffered byte is on disk
        # when binwalk opens it by name.
        with temp_file:
            while chunk := await field.read_chunk():
                temp_file.write(chunk)
                file_size += len(chunk)
                sha1_hash.update(chunk)
                md5_hash.update(chunk)

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as working_dir:
            scan = await scan_file(temp_file.name, working_dir)
            # scan_file returns None when binwalk raised ModuleException.
            if not scan:
                raise web.HTTPInternalServerError(text="binwalk scan failed")
            sigs = scan[0]
            # Collect while the extracted/carved files still exist on disk.
            carved = _collect_carved(sigs)
            sig_quant = len(sigs.magic.signatures)
            # listing = await build_listing(working_dir)
            # print(listing)
    finally:
        # delete=False above: the upload must be removed by hand on every path.
        os.unlink(temp_file.name)

    response_data = {
        'meta': {
            'name': filename,
            'sizeb': file_size,
            'sha1': sha1_hash.hexdigest(),
            'md5': md5_hash.hexdigest(),
            'sig_quant': sig_quant,
        },
        'offsets': carved,
    }
    response_data['meta']['duration'] = _format_duration(time.time() - start_time)
    return web.json_response(response_data)
async def serve_index(request):
    """Serve the ``index.html`` page.

    The path is relative, so it is resolved against the process's current
    working directory.

    Args:
        request: The incoming aiohttp request (unused).

    Returns:
        A ``web.FileResponse`` for ``index.html``.
    """
    index_page = web.FileResponse('index.html')
    return index_page
async def serve_static(request):
    """Serve a file from the ``static/`` directory.

    The request path is supplied by the client and may contain ``..``
    segments, so a prefix check alone is not enough: ``static/../secret``
    starts with ``static/`` yet points outside the directory. The path is
    therefore resolved (symlinks included) and must still lie under the
    resolved ``static`` directory.

    Args:
        request: The incoming aiohttp request; ``request.path`` selects the file.

    Returns:
        A ``web.FileResponse`` for the requested file.

    Raises:
        web.HTTPNotFound: If the path is outside ``static/``, cannot be
            resolved, or does not name an existing regular file.
    """
    path = request.path[1:]
    if not path.startswith('static/'):
        raise web.HTTPNotFound()

    static_root = os.path.realpath('static')
    try:
        target = os.path.realpath(path)
        inside_root = os.path.commonpath([static_root, target]) == static_root
    except ValueError:
        # Raised for e.g. embedded NUL bytes or paths on different drives.
        raise web.HTTPNotFound() from None

    if not inside_root or not os.path.isfile(target):
        raise web.HTTPNotFound()
    return web.FileResponse(target)
async def main():
    """Start the binhop HTTP server on ``localhost:8080`` and run until cancelled.

    Routes:
        ``GET /``                 -> ``serve_index``
        ``POST /api/upload_file`` -> ``upload_file``
        ``GET /static/{tail}``    -> ``serve_static``

    The runner is always cleaned up on the way out, so the listening socket
    is released when the task is cancelled or when binding the port fails.
    """
    app = web.Application()
    app.add_routes([
        web.get('/', serve_index),
        web.post('/api/upload_file', upload_file),
        web.get('/static/{tail:.*}', serve_static),
    ])

    runner = web.AppRunner(app)
    await runner.setup()
    try:
        site = web.TCPSite(runner, 'localhost', 8080)
        await site.start()
        print('binhop is running at http://localhost:8080')
        # This event is never set: it simply parks the coroutine until the
        # surrounding task is cancelled.
        await asyncio.Event().wait()
    finally:
        await runner.cleanup()
if __name__ == '__main__':
    # asyncio.run() creates, runs and closes its own event loop, so no loop
    # is created by hand here: such a loop would never be used or closed.
    asyncio.run(main())