"""binhop web server.

Serves ``index.html`` at ``/``, files below ``static/`` at ``/static/...``,
and exposes ``POST /api/upload_file``, which streams an uploaded file and
reports its name, size, SHA-1/MD5 digests and the time spent processing it.
"""

import asyncio
import hashlib
import math
import time
from pathlib import Path

from aiohttp import web

_HOST = 'localhost'
_PORT = 8080
_STATIC_ROOT = Path('static')
_SIZE_SUFFIXES = ('B', 'KB', 'MB', 'GB', 'TB')


def _human_readable_size(num_bytes: int) -> str:
    """Format a byte count with a binary (1024-based) unit suffix."""
    if num_bytes <= 0:
        # An empty upload has no logarithm; report it as zero bytes.
        return f'0.00 {_SIZE_SUFFIXES[0]}'
    # floor(log_1024(n)) computed with integer arithmetic so that exact
    # powers of 1024 never fall on the wrong side due to float rounding.
    exponent = (num_bytes.bit_length() - 1) // 10
    # Anything larger than the biggest known unit is expressed in that unit.
    exponent = min(exponent, len(_SIZE_SUFFIXES) - 1)
    return f'{num_bytes / (1024 ** exponent):.2f} {_SIZE_SUFFIXES[exponent]}'


def _format_duration(elapsed_seconds: float) -> str:
    """Format an elapsed time in seconds as ``MM:SS.mmm``."""
    total_ms = int(max(elapsed_seconds, 0.0) * 1000)
    minutes, remainder_ms = divmod(total_ms, 60_000)
    seconds, millis = divmod(remainder_ms, 1000)
    return f'{minutes:02d}:{seconds:02d}.{millis:03d}'


async def upload_file(request: web.Request) -> web.Response:
    """Hash an uploaded file and return its metadata as JSON.

    The first multipart field of the request must be named ``file``. Its
    content is consumed chunk by chunk, so the whole upload is never held
    in memory.

    Returns:
        A JSON response of the form ``{"meta": {...}}`` with the keys
        ``name``, ``sizeb``, ``sizeh``, ``sha1``, ``md5`` and ``duration``.

    Raises:
        web.HTTPBadRequest: If the request is not multipart or its first
            field is not named ``file``.
    """
    if not request.content_type.startswith('multipart/'):
        raise web.HTTPBadRequest(text='expected a multipart request')

    reader = await request.multipart()
    field = await reader.next()
    # Explicit validation instead of ``assert``: asserts vanish under -O and
    # would otherwise turn a malformed client request into a 500.
    if field is None or getattr(field, 'name', None) != 'file':
        raise web.HTTPBadRequest(text="first multipart field must be 'file'")

    # Monotonic clock: immune to wall-clock adjustments while measuring.
    started = time.monotonic()
    filename = field.filename
    file_size = 0
    sha1_hash = hashlib.sha1()
    md5_hash = hashlib.md5()

    while True:
        chunk = await field.read_chunk()
        if not chunk:
            break
        file_size += len(chunk)
        sha1_hash.update(chunk)
        md5_hash.update(chunk)

    response_data = {
        'meta': {
            'name': filename,
            'sizeb': file_size,
            'sizeh': _human_readable_size(file_size),
            'sha1': sha1_hash.hexdigest(),
            'md5': md5_hash.hexdigest(),
            'duration': _format_duration(time.monotonic() - started),
        }
    }
    return web.json_response(response_data)


async def serve_index(request: web.Request) -> web.FileResponse:
    """Serve ``index.html`` from the current working directory."""
    return web.FileResponse('index.html')


async def serve_static(request: web.Request) -> web.FileResponse:
    """Serve a file located below the ``static/`` directory.

    Raises:
        web.HTTPNotFound: If the path is outside ``static/``, cannot be
            resolved, or does not name an existing regular file.
    """
    requested = request.path[1:]
    if not requested.startswith('static/'):
        raise web.HTTPNotFound()

    try:
        root = _STATIC_ROOT.resolve()
        target = Path(requested).resolve()
    except (OSError, ValueError):
        # Unresolvable paths (e.g. embedded NUL bytes) are simply not found.
        raise web.HTTPNotFound() from None

    # Resolving collapses ``..`` segments and symlinks, so a request such as
    # ``static/../secret`` cannot escape the static directory.
    if root not in target.parents or not target.is_file():
        raise web.HTTPNotFound()
    return web.FileResponse(target)


async def main() -> None:
    """Start the HTTP server and run until the task is cancelled."""
    app = web.Application()
    app.add_routes([
        web.get('/', serve_index),
        web.post('/api/upload_file', upload_file),
        web.get('/static/{tail:.*}', serve_static),
    ])

    runner = web.AppRunner(app)
    await runner.setup()
    try:
        site = web.TCPSite(runner, _HOST, _PORT)
        await site.start()
        print(f'binhop is running at http://{_HOST}:{_PORT}')
        # Block forever; the server runs until the process is interrupted.
        await asyncio.Event().wait()
    finally:
        # Release the listening socket and run application cleanup hooks.
        await runner.cleanup()


if __name__ == '__main__':
    # asyncio.run() creates and closes its own event loop, so no loop needs
    # to be created or installed beforehand.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the server; exit without traceback.
        pass