Implement getattrlist for improved CoW detection

This commit is contained in:
Mykola Grymalyuk
2024-08-01 11:16:00 -06:00
parent 57356bcceb
commit 90092a296d
11 changed files with 222 additions and 23 deletions

View File

@@ -0,0 +1,46 @@
"""
volume: Volume utilities for macOS
-------------------------------------------------------------------------------
Usage - Checking if Copy on Write is supported between source and destination:
>>> from volume import can_copy_on_write
>>> source = "/path/to/source"
>>> destination = "/path/to/destination"
>>> can_copy_on_write(source, destination)
True
-------------------------------------------------------------------------------
Usage - Generating copy arguments:
>>> from volume import generate_copy_arguments
>>> source = "/path/to/source"
>>> destination = "/path/to/destination"
>>> _command = generate_copy_arguments(source, destination)
>>> _command
['/bin/cp', '-c', '/path/to/source', '/path/to/destination']
-------------------------------------------------------------------------------
Usage - Querying volume properties:
>>> from volume import PathAttributes
>>> path = "/path/to/file"
>>> obj = PathAttributes(path)
>>> obj.mount_point()
"/"
>>> obj.supports_clonefile()
True
"""
from .properties import PathAttributes
from .copy import can_copy_on_write, generate_copy_arguments

View File

@@ -0,0 +1,35 @@
"""
copy.py: Generate performant '/bin/cp' arguments for macOS
"""
from pathlib import Path
from .properties import PathAttributes
def can_copy_on_write(source: str, destination: str) -> bool:
"""
Check if Copy on Write is supported between source and destination
"""
source_obj = PathAttributes(source)
return source_obj.mount_point() == PathAttributes(str(Path(destination).parent)).mount_point() and source_obj.supports_clonefile()
def generate_copy_arguments(source: str, destination: str) -> list:
"""
Generate performant '/bin/cp' arguments for macOS
"""
_command = ["/bin/cp", source, destination]
if not Path(source).exists():
raise FileNotFoundError(f"Source file not found: {source}")
if not Path(destination).parent.exists():
raise FileNotFoundError(f"Destination directory not found: {destination}")
# Check if Copy on Write is supported.
if can_copy_on_write(source, destination):
_command.insert(1, "-c")
if Path(source).is_dir():
_command.insert(1, "-R")
return _command

View File

@@ -0,0 +1,110 @@
"""
properties.py: Query volume properties for a given path using macOS's getattrlist.
"""
import ctypes
class attrreference_t(ctypes.Structure):
_fields_ = [
("attr_dataoffset", ctypes.c_int32),
("attr_length", ctypes.c_uint32)
]
class attrlist_t(ctypes.Structure):
_fields_ = [
("bitmapcount", ctypes.c_ushort),
("reserved", ctypes.c_uint16),
("commonattr", ctypes.c_uint),
("volattr", ctypes.c_uint),
("dirattr", ctypes.c_uint),
("fileattr", ctypes.c_uint),
("forkattr", ctypes.c_uint)
]
class volattrbuf(ctypes.Structure):
_fields_ = [
("length", ctypes.c_uint32),
("mountPoint", attrreference_t),
("volCapabilities", ctypes.c_uint64),
("mountPointSpace", ctypes.c_char * 1024),
]
class PathAttributes:
def __init__(self, path: str) -> None:
self._path = path
if not isinstance(self._path, str):
try:
self._path = str(self._path)
except:
raise ValueError(f"Invalid path: {path}")
_libc = ctypes.CDLL("/usr/lib/libc.dylib")
# Reference:
# https://developer.apple.com/library/archive/documentation/System/Conceptual/ManPages_iPhoneOS/man2/getattrlist.2.html
try:
self._getattrlist = _libc.getattrlist
except AttributeError:
return
self._getattrlist.argtypes = [
ctypes.c_char_p, # Path
ctypes.POINTER(attrlist_t), # Attribute list
ctypes.c_void_p, # Attribute buffer
ctypes.c_ulong, # Attribute buffer size
ctypes.c_ulong # Options
]
self._getattrlist.restype = ctypes.c_int
# Reference:
# https://github.com/apple-oss-distributions/xnu/blob/xnu-10063.121.3/bsd/sys/attr.h
ATTR_BIT_MAP_COUNT = 0x00000005
ATTR_VOL_MOUNTPOINT = 0x00001000
ATTR_VOL_CAPABILITIES = 0x00020000
attrList = attrlist_t()
attrList.bitmapcount = ATTR_BIT_MAP_COUNT
attrList.volattr = ATTR_VOL_MOUNTPOINT | ATTR_VOL_CAPABILITIES
volAttrBuf = volattrbuf()
if self._getattrlist(self._path.encode(), ctypes.byref(attrList), ctypes.byref(volAttrBuf), ctypes.sizeof(volAttrBuf), 0) != 0:
return
self._volAttrBuf = volAttrBuf
def supports_clonefile(self) -> bool:
"""
Verify if path provided supports Apple's clonefile function.
Equivalent to checking for Copy on Write support.
"""
VOL_CAP_INT_CLONE = 0x00010000
if not hasattr(self, "_volAttrBuf"):
return False
if self._volAttrBuf.volCapabilities & VOL_CAP_INT_CLONE:
return True
return False
def mount_point(self) -> str:
"""
Return mount point of path.
"""
if not hasattr(self, "_volAttrBuf"):
return ""
mount_point_ptr = ctypes.cast(
ctypes.addressof(self._volAttrBuf.mountPoint) + self._volAttrBuf.mountPoint.attr_dataoffset,
ctypes.POINTER(ctypes.c_char * self._volAttrBuf.mountPoint.attr_length)
)
return mount_point_ptr.contents.value.decode()