MVP for testing

This commit is contained in:
Darryl Nixon 2023-07-16 09:30:36 -07:00
parent 3396d2d69b
commit 9bc6f8d9e1
11 changed files with 459 additions and 3 deletions

View file

@ -0,0 +1,78 @@
import ctypes
from collections.abc import Generator
from pathlib import Path
class ext2_filsys(ctypes.Structure):
pass
class ext2_inode_scan(ctypes.Structure):
pass
class ext2_inode_large(ctypes.Structure):
_fields_ = [
("i_mode", ctypes.c_uint16),
("i_uid", ctypes.c_uint16),
("i_size", ctypes.c_uint32),
("i_atime", ctypes.c_uint32),
("i_ctime", ctypes.c_uint32),
("i_mtime", ctypes.c_uint32),
("i_dtime", ctypes.c_uint32),
("i_gid", ctypes.c_uint16),
("i_links_count", ctypes.c_uint16),
("i_blocks", ctypes.c_uint32),
("i_flags", ctypes.c_uint32),
("i_osd1", ctypes.c_uint32 * 3),
("i_block", ctypes.c_uint32 * 15),
("i_generation", ctypes.c_uint32),
("i_file_acl", ctypes.c_uint32),
("i_dir_acl", ctypes.c_uint32),
("i_faddr", ctypes.c_uint32),
("i_osd2", ctypes.c_uint8 * 12),
]
class ext2_inode_large_p(ctypes.POINTER(ext2_inode_large)):
pass
class EXT23Handler:
def __init__(self, fs: str) -> None:
self.fs = "ext2/ext3"
self.libext2fs = ctypes.CDLL("libext2fs.so.2")
self.libext2fs.ext2fs_open.restype = ctypes.c_int
self.libext2fs.ext2fs_open.argtypes = [
ctypes.c_char_p,
ctypes.c_int,
ctypes.c_int,
ctypes.c_uint32,
ctypes.POINTER(ext2_filsys),
]
self.libext2fs.ext2fs_close.argtypes = [ext2_filsys]
self.libext2fs.ext2fs_get_next_inode.argtypes = [ext2_inode_scan, ext2_inode_large_p]
self.libext2fs.ext2fs_get_next_inode.restype = ctypes.c_int
async def get_hardlinks(self, path: Path) -> Generator:
path = path.resolve().absolute()
inode = path.stat().st_ino
fs = ext2_filsys()
ret = self.libext2fs.ext2fs_open(path.encode(), 0, 0, 0, ctypes.byref(fs))
if ret != 0:
return []
scan = ext2_inode_scan()
ret = self.libext2fs.ext2fs_open_inode_scan(fs, ctypes.byref(scan))
if ret != 0:
self.libext2fs.ext2fs_close(fs)
return []
inode_large = ext2_inode_large()
while self.libext2fs.ext2fs_get_next_inode(scan, ctypes.byref(inode_large)) == 0:
if inode_large.i_links_count > 1 and inode_large.i_file_acl == inode:
yield Path(fs.fs_mount_point) / scan.name.decode()
self.libext2fs.ext2fs_close_inode_scan(scan)
self.libext2fs.ext2fs_close(fs)