drawbridge/drawbridge/drawbridge.py
2023-07-02 14:00:34 -07:00

64 lines
1.9 KiB
Python

import asyncio
import atexit
from typing import Callable
from typing import Optional
import fnfqueue
from .net_queue import NetQueue
from .utils.logger import logger
class DrawBridge:
def __init__(self):
self.net_queues = []
atexit.register(self._delete_rules)
def add_queue(
self,
queue: int,
callback: Callable,
src_ip: Optional[str] = None,
dst_ip: Optional[str] = None,
src_port: Optional[int] = None,
dst_port: Optional[int] = None,
protocol: Optional[str] = "",
override: bool = False,
):
try:
new_queue = NetQueue(queue, callback, src_ip, dst_ip, src_port, dst_port, protocol, override)
new_queue.write_rule()
except Exception as e:
logger.error(f"Failed to initialize NetQueue: {e}")
raise e
self.net_queues.append(new_queue)
def run(self):
asyncio.run(self.raise_bridges())
async def _listen(self, listener, callback: Callable) -> None:
for packet in listener:
original = packet.payload
if asyncio.iscoroutinefunction(callback):
packet.payload = await callback(packet.payload)
else:
packet.payload = callback(packet.payload)
if packet.payload != original:
packet.mangle()
def _delete_rules(self):
for queue in self.net_queues:
try:
queue.delete_rule()
except Exception as e:
logger.error(f"Failed to delete rule: {e}")
async def raise_bridges(self):
tasks = []
for queue in self.net_queues:
connection = fnfqueue.Connection()
listener = connection.bind(queue.queue)
listener.set_mode(65535, fnfqueue.COPY_PACKET)
task = asyncio.create_task(self._listen(connection, queue.callback))
tasks.append(task)
await asyncio.gather(*tasks)