Modularize sys_patch_mount.py

This commit is contained in:
Mykola Grymalyuk
2024-08-12 16:38:05 -06:00
parent 35b365c8ca
commit c4cda81df6
4 changed files with 136 additions and 76 deletions
@@ -0,0 +1,16 @@
"""
mount: Library for mounting and unmounting the root volume and interacting with APFS snapshots.
Usage:
>>> from mount import RootVolumeMount
>>> RootVolumeMount(xnu_major).mount()
'/System/Volumes/Update/mnt1'
>>> RootVolumeMount(xnu_major).unmount()
>>> RootVolumeMount(xnu_major).create_snapshot()
>>> RootVolumeMount(xnu_major).revert_snapshot()
"""
from .mount import RootVolumeMount
from .snapshot import APFSSnapshot
@@ -1,62 +1,28 @@
""" """
sys_patch_mount.py: Handling macOS root volume mounting and unmounting, mount.py: Handling macOS root volume mounting and unmounting
as well as APFS snapshots for Big Sur and newer
""" """
import logging import logging
import plistlib import plistlib
import platform
import subprocess import subprocess
from pathlib import Path from pathlib import Path
from ..datasets import os_data from .snapshot import APFSSnapshot
from ..support import subprocess_wrapper
from ...datasets import os_data
from ...support import subprocess_wrapper
class SysPatchMount: class RootVolumeMount:
def __init__(self, xnu_major: int, rosetta_status: bool) -> None: def __init__(self, xnu_major: int) -> None:
self.xnu_major = xnu_major self.xnu_major = xnu_major
self.rosetta_status = rosetta_status
self.root_volume_identifier = self._fetch_root_volume_identifier() self.root_volume_identifier = self._fetch_root_volume_identifier()
self.mount_path = None self.mount_path = None
def mount(self) -> str:
"""
Mount the root volume.
Returns the path to the root volume.
If none, failed to mount.
"""
result = self._mount_root_volume()
if result is None:
logging.error("Failed to mount root volume")
return None
if not Path(result).exists():
logging.error(f"Attempted to mount root volume, but failed: {result}")
return None
self.mount_path = result
return result
def unmount(self, ignore_errors: bool = True) -> bool:
"""
Unmount the root volume.
Returns True if successful, False otherwise.
Note for Big Sur and newer, a snapshot is created before unmounting.
And that unmounting is not critical to the process.
"""
return self._unmount_root_volume(ignore_errors=ignore_errors)
def _fetch_root_volume_identifier(self) -> str: def _fetch_root_volume_identifier(self) -> str:
""" """
Resolve path to disk identifier Resolve path to disk identifier
@@ -136,43 +102,48 @@ class SysPatchMount:
return True return True
def mount(self) -> str:
"""
Mount the root volume.
Returns the path to the root volume.
If none, failed to mount.
"""
result = self._mount_root_volume()
if result is None:
logging.error("Failed to mount root volume")
return None
if not Path(result).exists():
logging.error(f"Attempted to mount root volume, but failed: {result}")
return None
self.mount_path = result
return result
def unmount(self, ignore_errors: bool = True) -> bool:
"""
Unmount the root volume.
Returns True if successful, False otherwise.
Note for Big Sur and newer, a snapshot is created before unmounting.
And that unmounting is not critical to the process.
"""
return self._unmount_root_volume(ignore_errors=ignore_errors)
def create_snapshot(self) -> bool: def create_snapshot(self) -> bool:
""" """
Create APFS snapshot of the root volume. Create APFS snapshot of the root volume.
""" """
if self.xnu_major < os_data.os_data.big_sur.value: return APFSSnapshot(self.xnu_major, self.mount_path).create_snapshot()
return True
args = ["/usr/sbin/bless"]
if platform.machine() == "arm64" or self.rosetta_status is True:
args += ["--mount", self.mount_path, "--create-snapshot"]
else:
args += ["--folder", f"{self.mount_path}/System/Library/CoreServices", "--bootefi", "--create-snapshot"]
result = subprocess_wrapper.run_as_root(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
logging.error("Failed to create APFS snapshot")
subprocess_wrapper.log(result)
if "Can't use last-sealed-snapshot or create-snapshot on non system volume" in result.stdout.decode():
logging.info("- This is an APFS bug with Monterey and newer! Perform a clean installation to ensure your APFS volume is built correctly")
return False
return True
def revert_snapshot(self) -> bool: def revert_snapshot(self) -> bool:
""" """
Revert APFS snapshot of the root volume. Revert APFS snapshot of the root volume.
""" """
if self.xnu_major < os_data.os_data.big_sur.value: return APFSSnapshot(self.xnu_major, self.mount_path).revert_snapshot()
return True
result = subprocess_wrapper.run_as_root(["/usr/sbin/bless", "--mount", self.mount_path, "--bootefi", "--last-sealed-snapshot"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
logging.error("Failed to revert APFS snapshot")
subprocess_wrapper.log(result)
return False
return True
@@ -0,0 +1,69 @@
"""
snapshot.py: Handling APFS snapshots
"""
import logging
import platform
import subprocess
from ...datasets import os_data
from ...support import subprocess_wrapper
class APFSSnapshot:
def __init__(self, xnu_major: int, mount_path: str):
self.xnu_major = xnu_major
self.mount_path = mount_path
def _rosetta_status(self) -> bool:
"""
Check if currently running inside of Rosetta
"""
result = subprocess_wrapper.run(["/usr/sbin/sysctl", "-n", "sysctl.proc_translated"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
return False
return True if result.stdout.decode().strip() == "1" else False
def create_snapshot(self) -> bool:
"""
Create APFS snapshot of the root volume.
"""
if self.xnu_major < os_data.os_data.big_sur.value:
return True
args = ["/usr/sbin/bless"]
if platform.machine() == "arm64" or self._rosetta_status() is True:
args += ["--mount", self.mount_path, "--create-snapshot"]
else:
args += ["--folder", f"{self.mount_path}/System/Library/CoreServices", "--bootefi", "--create-snapshot"]
result = subprocess_wrapper.run_as_root(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
logging.error("Failed to create APFS snapshot")
subprocess_wrapper.log(result)
if "Can't use last-sealed-snapshot or create-snapshot on non system volume" in result.stdout.decode():
logging.info("- This is an APFS bug with Monterey and newer! Perform a clean installation to ensure your APFS volume is built correctly")
return False
return True
def revert_snapshot(self) -> bool:
"""
Revert APFS snapshot of the root volume.
"""
if self.xnu_major < os_data.os_data.big_sur.value:
return True
result = subprocess_wrapper.run_as_root(["/usr/sbin/bless", "--mount", self.mount_path, "--bootefi", "--last-sealed-snapshot"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
logging.error("Failed to revert APFS snapshot")
subprocess_wrapper.log(result)
return False
return True
@@ -41,7 +41,11 @@ import subprocess
import applescript import applescript
from pathlib import Path from pathlib import Path
from datetime import datetime
from .mount import (
RootVolumeMount,
APFSSnapshot
)
from .. import constants from .. import constants
@@ -57,7 +61,6 @@ from . import (
sys_patch_detect, sys_patch_detect,
sys_patch_helpers, sys_patch_helpers,
sys_patch_generate, sys_patch_generate,
sys_patch_mount,
kernelcache kernelcache
) )
from .auto_patcher import InstallAutomaticPatchingServices from .auto_patcher import InstallAutomaticPatchingServices
@@ -84,7 +87,7 @@ class PatchSysVolume:
self.skip_root_kmutil_requirement = self.hardware_details["Settings: Supports Auxiliary Cache"] self.skip_root_kmutil_requirement = self.hardware_details["Settings: Supports Auxiliary Cache"]
self.mount_obj = sys_patch_mount.SysPatchMount(self.constants.detected_os, self.computer.rosetta_active) self.mount_obj = RootVolumeMount(self.constants.detected_os)
def _init_pathing(self, custom_root_mount_path: Path = None, custom_data_mount_path: Path = None) -> None: def _init_pathing(self, custom_root_mount_path: Path = None, custom_data_mount_path: Path = None) -> None:
@@ -266,7 +269,8 @@ class PatchSysVolume:
""" """
Reverts APFS snapshot and cleans up any changes made to the root and data volume Reverts APFS snapshot and cleans up any changes made to the root and data volume
""" """
if self.mount_obj.revert_snapshot() is False:
if APFSSnapshot(self.constants.detected_os, self.mount_location).revert_snapshot() is False:
return return
self._clean_skylight_plugins() self._clean_skylight_plugins()
@@ -337,7 +341,7 @@ class PatchSysVolume:
Returns: Returns:
bool: True if snapshot was created, False if not bool: True if snapshot was created, False if not
""" """
return self.mount_obj.create_snapshot() return APFSSnapshot(self.constants.detected_os, self.mount_location).create_snapshot()
def _rebuild_dyld_shared_cache(self) -> None: def _rebuild_dyld_shared_cache(self) -> None: