* feat(README.md): add detailed project information and installation instructions

* feat(cli.py): add command line arguments and async main function
* feat(influx.py): add InfluxDB class for handling InfluxDB operations
* feat(logs.py): add logger configuration
* feat(scan.py): add functions for running rustscan and parsing its output
* feat(validation.py): add function for validating CIDR or IPv4 address
* fix(pyproject.toml): update dependencies
* fix(README.md): change 'Examples' to 'Example'
* fix(bronzeburner.png): update image file
* fix(cli.py): update main function
* fix(scan.py): update run_rustscan function to use asyncio
*
This commit is contained in:
Darryl Nixon 2023-09-06 18:02:12 -07:00
parent c49335a360
commit e3072150d9
8 changed files with 270 additions and 9 deletions

View file

@ -7,17 +7,67 @@
use it to monitor your enterprise's ports over time<br/>
[About](#about) •
[Installation](#installation) •
[Examples](#examples) •
[Example](#example) •
[Contributing](#contributing) •
[License](#license)
</div>
## About
*bronzeburner* words
## Installation
TBD
This project was authored for use with Pypy3.10 for performance reasons, but will likely run fine with any full Python implementation.
Unfortunately, this means several useful libraries are yet incompatible (e.g., uvloop).
## Examples
### Requirements & Recommendations
- [RustScan](https://github.com/RustScan/RustScan) (required, in $PATH)
- [InfluxDB](https://github.com/influxdata/influxdb) (required)
- [Grafana](https://github.com/grafana/grafana) (optional, recommended)
- Docker (recommended)
### Instructions
These instructions assume you're running a Linux or macOS system. If you aren't, the instructions can easily be adapted.
If you don't already use [pyenv](https://github.com/pyenv/pyenv), look into using it to manage your Python versions. Use it to install
Pypy3.10 or install it manually. For macOS users, Pypy3.10 can be installed with `brew install pypy3.10`.
Clone this repository with `git clone ...`. Browse to the newly created project directory with `cd bronzeburner`. Create a new virtual
Python environment with `pypy3.10 -m venv venv` and activate it with `source venv/bin/activate`. Install bronzeburner and its dependencies
with `pip install .`.
Install Docker if you don't already use it. Create a persistent directory to store your data (i.e., `/opt/influxdb`). To run an InfluxDB instance,
run `docker run -v /opt/influxdb:/var/lib/influxdb2 -p 8086:8086 influxdb:2.7.1-alpine`. Browse to [http://127.0.0.1:8086/](http://127.0.0.1:8086/) and
set up your instance. Create a new API key with write access to your new org's new bucket and note it down.
You're ready to run bronzeburner.
```bash
bronzeburner -h
usage: bronzeburner [-h] -a ADDRESS -u URL -o ORG -b BUCKET -t TOKEN
A humble network scanner
options:
-h, --help show this help message and exit
-a ADDRESS, --address ADDRESS
IP address or CIDR range to scan
-u URL, --url URL InfluxDB server URL
-o ORG, --org ORG InfluxDB organization
-b BUCKET, --bucket BUCKET
InfluxDB bucket
-t TOKEN, --token TOKEN
InfluxDB token
```
Decide on a target. bronzeburner accepts IPv4 addresses and CIDR ranges as address targets but can be extended to include additional options. See the
example execution below.
## Example
TBD

Binary file not shown.

Before

Width:  |  Height:  |  Size: 34 KiB

After

Width:  |  Height:  |  Size: 17 KiB

View file

@ -1,16 +1,58 @@
import argparse
import asyncio
from .logs import logger
from .scan import run_rustscan
from .validation import validate_cidr_or_ipv4
from bronzeburner.influx import InfluxDB
def main():
pass
async def main(args: argparse.Namespace) -> None:
"""
Invoked after parsing args. Checks InfluxDB state and initiates network scan.
Args:
args (argparse.Namespace): All the arguments passed to the command line
Returns:
int: Exit code from invoking rustscan
Raises:
SystemExit: When there is an error connecting to InfluxDB
"""
if not await args.db.check_settings():
logger.error("There was an error connecting to InfluxDB. Check your settings!")
exit(1)
return await run_rustscan(args)
def run():
def run() -> None:
"""
This function creates and configures an ArgumentParser object for command line input,
instantiates the InfluxDB class with the provided arguments, and asynchronously runs the main function.
Command Line Arguments:
-a, --address: IP address or CIDR range to scan. Required.
-u, --url: InfluxDB server URL. Required.
-o, --org: InfluxDB organization. Required.
-b, --bucket: InfluxDB bucket. Required.
-t, --token: InfluxDB token. Required.
Raises:
argparse.ArgumentError: If any of the required arguments are not provided
"""
parser = argparse.ArgumentParser(description="A humble network scanner")
parser.add_argument(
"-a", "--address", help="IP address or CIDR range to scan", type=validate_cidr_or_ipv4, required=True
)
parser.add_argument("-u", "--url", help="InfluxDB server URL", type=str, required=True)
parser.add_argument("-o", "--org", help="InfluxDB organization", required=True)
parser.add_argument("-b", "--bucket", help="InfluxDB bucket", required=True)
parser.add_argument("-t", "--token", help="InfluxDB token", required=True)
args = parser.parse_args()
main(args)
args.db = InfluxDB(args.url, args.org, args.bucket, args.token)
asyncio.run(main(args))
if __name__ == "__main__":

36
bronzeburner/influx.py Normal file
View file

@ -0,0 +1,36 @@
from collections import namedtuple
import aiohttp
from .logs import logger
class InfluxDB:
def __init__(self, url: str, org: str, bucket: str, token: str) -> "InfluxDB":
self.url = url
self.org = org
self.bucket = bucket
self.headers = {"Authorization": f"Token {token}", "Content-Type": "text/plain"}
async def check_settings(self) -> bool:
async with aiohttp.ClientSession() as session:
async with session.get(f"{self.url}/api/v2/orgs", headers=self.headers) as response:
return response.status == 200
async def insert(self, host: str, ports: namedtuple) -> None:
payload = "\n".join(
f'port_scan,host={host},port={p.port} state="{p.state}",protocol="{p.protocol}",service="{p.service}"'
for p in ports
)
async with aiohttp.ClientSession() as session:
async with session.post(
f"{self.url}/api/v2/write?org={self.org}&bucket={self.bucket}&precision=s",
headers=self.headers,
data=payload,
) as response:
if response.status != 204:
logger.error(f"Failed write to InfluxDB (HTTP {response.status}): {await response.text()}")
return False
logger.info(f"Received HTTP {response.status} from InfluxDB")
return True

34
bronzeburner/logs.py Normal file
View file

@ -0,0 +1,34 @@
from loguru import logger
logger.add(
"app.log",
format="<level><light-blue>{time:YYYY-MM-DD HH:mm:ss} | {message}</light-blue></level>",
level="INFO",
rotation="1 day",
retention="30 days",
)
logger.add(
"errors.log",
format="<level><yellow> {time:YYYY-MM-DD HH:mm:ss} | {message}</yellow></level>",
level="WARNING",
rotation="1 day",
retention="30 days",
)
logger.add(
"error.log",
format="<level><red>⛔️ {time:YYYY-MM-DD HH:mm:ss} | {message}</red></level>",
level="ERROR",
rotation="1 day",
retention="30 days",
)
logger.add(
"critical.log",
format="<level><magenta>🚨 {time:YYYY-MM-DD HH:mm:ss} | {message}</magenta></level>",
level="CRITICAL",
rotation="1 day",
retention="30 days",
)

76
bronzeburner/scan.py Normal file
View file

@ -0,0 +1,76 @@
import argparse
import asyncio
import re
from collections import namedtuple
from .influx import InfluxDB
from .logs import logger
PORTS_LINE_RE = re.compile(r"Host: (\d+\.\d+\.\d+\.\d+).*Ports: (.+)$")
PORT_ENTRY_RE = re.compile(r"(\d+)/([^/]*)/([^/]*)/([^/]*)/([^/]*)/([^/]*)/([^/]*)/")
PortEntry = namedtuple("PortEntry", "port state protocol owner service rpc_info version")
async def parse_output_line(db: InfluxDB, line: str) -> None:
"""
This function parses a line of output from RustScan and stores port information in an InfluxDB database.
Args:
db (InfluxDB): The InfluxDB instance where the data will be stored.
line (str): A single line of output from RustScan.
Returns:
None
"""
if match := PORTS_LINE_RE.match(line):
host_ip, port_findings = match.groups()
port_entries = PORT_ENTRY_RE.findall(port_findings)
parsed_ports = [PortEntry(*p) for p in port_entries]
logger.info(f"Found {len(port_entries)} ports for {host_ip}: {','.join(str(p.port) for p in parsed_ports)}")
if await db.insert(host_ip, parsed_ports):
logger.info(f"Successfully wrote {len(parsed_ports)} ports to InfluxDB")
else:
logger.error(f"Failed to write {len(parsed_ports)} ports to InfluxDB")
async def run_rustscan(args: argparse.Namespace) -> int:
"""
Runs rustscan with specified arguments and processes its output line by line using parse_output_line().
Args:
args (argparse.Namespace): Parsed command line arguments containing the address to scan
Returns:
int: The exit code of the rustscan process
Raises:
May raise various exceptions depending on the rustscan output and parsing process
"""
rustscan_args = [
"rustscan",
"-t",
"500",
"-b",
"1500",
"--ulimit",
"5500",
"-a",
str(args.address),
"--",
"-oG",
"-",
]
logger.info(f"Invoking rustscan: {' '.join(rustscan_args)}")
process = await asyncio.create_subprocess_exec(*rustscan_args, stdout=asyncio.subprocess.PIPE)
async for line in process.stdout:
await parse_output_line(args.db, line.decode().strip())
returncode = await process.wait()
if returncode != 0:
logger.critical(f"rustscan exited with code {returncode}")
return returncode

View file

@ -0,0 +1,23 @@
import argparse
from ipaddress import AddressValueError
from ipaddress import IPv4Network
def validate_cidr_or_ipv4(value: str) -> IPv4Network:
"""
Validates whether a given value is a valid CIDR or IPv4 address
by attempting return a newly constructed IPv4Network object.
Args:
value (str): The string value to be validated as CIDR or IPv4 address
Returns:
IPv4Network: An IPv4Network instance constructed with the input value
Raises:
argparse.ArgumentTypeError: If the input value is not a valid CIDR or IPv4 address
"""
try:
return IPv4Network(value)
except AddressValueError as err:
raise argparse.ArgumentTypeError(str(err))

View file

@ -9,7 +9,7 @@ authors = [{ name = "Darryl Nixon", email = "git@nixon.mozmail.com" }]
description = "A humble network scanner"
requires-python = ">=3.9"
license = { text = "AGPL 3.0" }
dependencies = ["sh>=2.0.6", "rocketry>=2.5.1"]
dependencies = ["aiohttp>=3.8.5", "loguru>=0.7.1"]
[project.scripts]
bronzeburner = "bronzeburner.cli:run"