# melamine/melamine/classes.py
import asyncio
import hashlib
from collections.abc import Generator
from pathlib import Path
from secrets import token_bytes
from typing import Set
from typing import Union
import aiofiles
from aiopath import AsyncPath
from .fileops import find_mount
from .fileops import mount_to_fs_handler
from .logs import logger
class AsyncObject(object):
    """Base class whose construction is awaitable.

    ``__new__`` is made a coroutine so that ``await Subclass(...)``
    allocates the instance and then awaits its asynchronous ``__init__``
    before handing the fully-initialized object back to the caller.
    """

    async def __new__(cls, *args, **kwargs):
        obj = super().__new__(cls)
        await obj.__init__(*args, **kwargs)
        return obj

    async def __init__(self):
        # Default no-op initializer; subclasses override with async setup.
        pass
class ShredDir(AsyncObject):
    """Class for tracking each directory to be shredded, and its contents."""

    async def __init__(self, path: AsyncPath, recursive: bool) -> None:
        """Resolve *path*, enumerate its contents, and cache filesystem metadata.

        Args:
            path: Directory to shred.
            recursive: Descend into (non-symlink) subdirectories when True.
        """
        # https://github.com/alexdelorenzo/aiopath/issues/30 :(
        resolved = AsyncPath(Path(path).resolve())
        self.absolute_path = await resolved.absolute()
        self.mount_point = await find_mount(self.absolute_path)
        self.contents = await self._get_contents(recursive)
        self.mount_points = set(self.enumerate_mount_points())
        self.fs_handler = await mount_to_fs_handler(self.mount_point)
        # Aggregate size of every tracked child (files and subdirectories).
        self.byte_size = sum(item.byte_size for item in self.contents)
        stat = await path.stat()
        self.inode = stat.st_ino

    async def _get_contents(self, recursive: bool) -> Set:
        """Build the set of ShredDir/ShredFile children of this directory."""
        tasks = []
        async for subpath in self.absolute_path.glob("*"):
            if await subpath.is_dir():
                if recursive:
                    if await subpath.is_symlink():
                        logger.warning(f"Symlink subdirectory found: {subpath}, skipping")
                        continue
                    tasks.append(ShredDir(subpath, recursive))
                else:
                    logger.warning(f"Subdirectory found: {subpath}, skipping (see -r/--recursive))")
            # BUG FIX: ``is_file()`` is a coroutine and must be awaited; the
            # bare call was always truthy (and never awaited), so special
            # files (sockets, FIFOs, ...) were misclassified as regular files.
            elif await subpath.is_file():
                tasks.append(ShredFile(subpath))
        return set(await asyncio.gather(*tasks))

    def enumerate_mount_points(self) -> Generator:
        """Yield the mount point of every nested ShredDir, then our own."""
        for item in self.contents:
            if isinstance(item, ShredDir):
                yield from item.enumerate_mount_points()
        yield self.mount_point

    async def shred(self, hash: bool = False, dryrun: bool = False) -> bool:
        """Shred every tracked child concurrently.

        Args:
            hash: Forwarded to each child; compute a SHA-1 before wiping.
            dryrun: Forwarded to each child; log actions without writing.

        Returns:
            True only if every child reports success.
        """
        tasks = [item.shred(hash, dryrun) for item in self.contents]
        return all(await asyncio.gather(*tasks))

    def __hash__(self) -> int:
        # Identity is the resolved absolute path, matching ShredFile.
        return hash(self.absolute_path)

    async def delete_hardlinks_by_inode(self) -> None:
        """Delete hardlinks for this dir and all children via ``find -inum``.

        Raises:
            RuntimeError: If the ``find`` subprocess exits non-zero.
        """
        logger.info(f"Finding and deleting hardlinks inside {self.absolute_path.name}")
        for path in self.contents:
            await path.delete_hardlinks_by_inode()
        proc = await asyncio.create_subprocess_exec(
            "find", str(self.mount_point), "-inum", str(self.inode), "-delete"
        )
        await proc.communicate()
        if proc.returncode != 0:
            err = f"Unable to delete hardlinks for {self.absolute_path.name}"
            logger.error(err)
            raise RuntimeError(err)
        logger.info(f"Deleted hardlink for {self.absolute_path.name}")
class ShredFile(AsyncObject):
    """Class for tracking each file to be shredded."""

    async def __init__(self, path: AsyncPath) -> None:
        """Resolve *path* and cache size, inode, and filesystem metadata.

        Args:
            path: Regular file to shred.
        """
        # https://github.com/alexdelorenzo/aiopath/issues/30 :(
        resolved = AsyncPath(Path(path).resolve())
        self.absolute_path = await resolved.absolute()
        stat = await path.stat()
        self.byte_size = stat.st_size
        self.inode = stat.st_ino
        self.mount_point = await find_mount(self.absolute_path)
        self.fs_handler = await mount_to_fs_handler(self.mount_point)
        # Populated later by get_all_hardlinks(); None until then.
        self.hardlinks = None

    async def shred(self, hash: bool = False, dryrun: bool = False) -> Union[bool, bytes]:
        """Shred the file with a single file descriptor.

        Three overwrite passes (zeroes, ones, random) followed by unlink,
        plus unlinking any previously discovered hardlinks.

        Args:
            hash: Compute and store a SHA-1 of the contents (``self.sha1``)
                before wiping.
            dryrun: Log every step without modifying the filesystem.

        Returns:
            True on success, False if any step raised.
        """
        try:
            logger.info(f"Shredding: {self.absolute_path}")
            async with aiofiles.open(self.absolute_path, "rb+") as file:
                if hash:
                    sha1 = hashlib.sha1(usedforsecurity=False)
                    # BUG FIX: aiofiles has no ``iterate`` helper; read the
                    # file in fixed-size chunks instead.
                    while chunk := await file.read(65536):
                        sha1.update(chunk)
                    self.sha1 = sha1.digest()
                    logger.info(f"Got hash {sha1.hexdigest()}")

                # First pass: Overwrite with binary zeroes
                log_buf = f"[1/4] Writing zeroes ({self.absolute_path.name})"
                await file.seek(0)
                if not dryrun:
                    await file.write(b"\x00" * self.byte_size)
                else:
                    log_buf = "DRY RUN (no changes made) " + log_buf
                logger.info(log_buf)
                await file.flush()

                # Second pass: Overwrite with binary ones
                log_buf = f"[2/4] Writing ones ({self.absolute_path.name})"
                await file.seek(0)
                if not dryrun:
                    await file.write(b"\xff" * self.byte_size)
                else:
                    log_buf = "DRY RUN (no changes made) " + log_buf
                logger.info(log_buf)
                await file.flush()

                # Third pass: Overwrite with random data
                log_buf = f"[3/4] Writing randoms ({self.absolute_path.name})"
                await file.seek(0)
                if not dryrun:
                    # Only generate the random buffer when it will be used.
                    await file.write(token_bytes(self.byte_size))
                else:
                    log_buf = "DRY RUN (no changes made) " + log_buf
                logger.info(log_buf)
                await file.flush()

            # Remove the file
            log_buf = f"[4/4] Unlinking {self.absolute_path}"
            if not dryrun:
                await self.absolute_path.unlink()
            else:
                log_buf = "DRY RUN (no changes made) " + log_buf
            logger.info(log_buf)

            # Remove any hardlinks
            if self.hardlinks:
                log_buf = f"[5/4] Unlinking {len(self.hardlinks)} hardlinks"
                if not dryrun:
                    # BUG FIX: asyncio.wait() no longer accepts bare
                    # coroutines (removed in Python 3.11); gather with
                    # return_exceptions collects per-link failures instead.
                    results = await asyncio.gather(
                        *(link.unlink() for link in self.hardlinks),
                        return_exceptions=True,
                    )
                    for result in results:
                        if isinstance(result, Exception):
                            logger.warning(f"Unable to unlink hardlink: {result}")
                else:
                    log_buf = "DRY RUN (no changes made) " + log_buf
                logger.info(log_buf)
            return True
        except Exception as e:
            logger.error(f"File wipe failed: {e}")
            return False

    def __hash__(self) -> int:
        # Identity is the resolved absolute path, matching ShredDir.
        return hash(self.absolute_path)

    async def delete_hardlinks_by_inode(self) -> None:
        """Delete every hardlink to this file's inode via ``find -inum``.

        Raises:
            RuntimeError: If the ``find`` subprocess exits non-zero.
        """
        proc = await asyncio.create_subprocess_exec(
            "find", str(self.mount_point), "-inum", str(self.inode), "-delete"
        )
        await proc.communicate()
        if proc.returncode != 0:
            err = f"Unable to delete hardlinks for {self.absolute_path.name}"
            logger.error(err)
            raise RuntimeError(err)
        # BUG FIX: the message was a plain string missing the f-prefix, so
        # the placeholder was logged literally.
        logger.info(f"Deleted hardlink for {self.absolute_path.name}")
async def get_all_hardlinks(paths: Set[Union[ShredFile, ShredDir]]) -> Set[Union[ShredFile, ShredDir]]:
    """Populate ``hardlinks`` on every ShredFile in *paths*, recursively.

    Args:
        paths: Mixed set of ShredFile/ShredDir objects to scan.

    Returns:
        The same set, returned so callers can reassign it in place
        (BUG FIX: the annotation previously said ``-> None`` even though
        the function returns ``paths``).
    """
    for path in paths:
        if isinstance(path, ShredFile):
            logger.info(f"Getting hardlinks for {path.absolute_path}")
            hardlink_count = 0
            path.hardlinks = set()
            async for link in path.fs_handler.get_hardlinks(path):
                hardlink_count += 1
                path.hardlinks.add(link)
                logger.info(f"Found hardlink: {link}")
            logger.info(f"Found {hardlink_count} hardlinks for {path.absolute_path.name}")
        if isinstance(path, ShredDir):
            # Recurse into directory contents; reassignment is a no-op on
            # the same set object but mirrors the caller-side idiom.
            path.contents = await get_all_hardlinks(path.contents)
    return paths