sys_patch.py: Add docstrings and typing suggestions

This commit is contained in:
Mykola Grymalyuk
2023-04-07 12:57:05 -06:00
parent fcd0c7cd21
commit 96e96464f2

View File

@@ -46,7 +46,7 @@ from data import os_data
class PatchSysVolume:
def __init__(self, model: str, global_constants: constants.Constants, hardware_details: list = None):
def __init__(self, model: str, global_constants: constants.Constants, hardware_details: list = None) -> None:
self.model = model
self.constants: constants.Constants = global_constants
self.computer = self.constants.computer
@@ -67,19 +67,21 @@ class PatchSysVolume:
self.skip_root_kmutil_requirement = self.hardware_details["Settings: Supports Auxiliary Cache"]
def __del__(self):
# Ensures that each time we're patching, we're using a clean repository
def __del__(self) -> None:
"""
Ensures that each time we're patching, we're using a clean PatcherSupportPkg folder
"""
if Path(self.constants.payload_local_binaries_root_path).exists():
shutil.rmtree(self.constants.payload_local_binaries_root_path)
def _init_pathing(self, custom_root_mount_path: Path = None, custom_data_mount_path: Path = None):
def _init_pathing(self, custom_root_mount_path: Path = None, custom_data_mount_path: Path = None) -> None:
"""
Initializes the pathing for root volume patching
Parameters:
custom_root_mount_path (Path): Custom path to mount the root volume
custom_data_mount_path (Path): Custom path to mount the data volume
"""
if custom_root_mount_path and custom_data_mount_path:
self.mount_location = custom_root_mount_path
@@ -96,7 +98,18 @@ class PatchSysVolume:
self.mount_application_support = f"{self.mount_location_data}/Library/Application Support"
def _mount_root_vol(self):
def _mount_root_vol(self) -> bool:
"""
Attempts to mount the booted APFS volume as a writable volume
at /System/Volumes/Update/mnt1
Manual invocation:
'sudo mount -o nobrowse -t apfs /dev/diskXsY /System/Volumes/Update/mnt1'
Returns:
bool: True if successful, False if not
"""
# Returns boolean if Root Volume is available
self.root_mount_path = utilities.get_disk_path()
if self.root_mount_path.startswith("disk"):
@@ -122,7 +135,16 @@ class PatchSysVolume:
return False
def _merge_kdk_with_root(self, save_hid_cs=False):
def _merge_kdk_with_root(self, save_hid_cs=False) -> None:
"""
Merge Kernel Debug Kit (KDK) with the root volume
If no KDK is present, will call kdk_handler to download and install it
Parameters:
save_hid_cs (bool): If True, will save the HID CS file before merging KDK
Required for USB 1.1 downgrades on Ventura and newer
"""
if self.skip_root_kmutil_requirement is True:
return
if self.constants.detected_os < os_data.os_data.ventura:
@@ -221,25 +243,43 @@ class PatchSysVolume:
def _unpatch_root_vol(self):
if self.constants.detected_os > os_data.os_data.catalina and self.root_supports_snapshot is True:
logging.info("- Reverting to last signed APFS snapshot")
result = utilities.elevated(["bless", "--mount", self.mount_location, "--bootefi", "--last-sealed-snapshot"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
logging.info("- Unable to revert root volume patches")
logging.info("Reason for unpatch Failure:")
logging.info(result.stdout.decode())
logging.info("- Failed to revert snapshot via Apple's 'bless' command")
else:
self._clean_skylight_plugins()
self._delete_nonmetal_enforcement()
self._clean_auxiliary_kc()
self.constants.root_patcher_succeeded = True
logging.info("- Unpatching complete")
logging.info("\nPlease reboot the machine for patches to take effect")
"""
Reverts APFS snapshot and cleans up any changes made to the root and data volume
"""
if self.constants.detected_os <= os_data.os_data.big_sur or self.root_supports_snapshot is False:
logging.info("- OS version does not support snapshotting, skipping revert")
logging.info("- Reverting to last signed APFS snapshot")
result = utilities.elevated(["bless", "--mount", self.mount_location, "--bootefi", "--last-sealed-snapshot"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
logging.info("- Unable to revert root volume patches")
logging.info("Reason for unpatch Failure:")
logging.info(result.stdout.decode())
logging.info("- Failed to revert snapshot via Apple's 'bless' command")
else:
self._clean_skylight_plugins()
self._delete_nonmetal_enforcement()
self._clean_auxiliary_kc()
self.constants.root_patcher_succeeded = True
logging.info("- Unpatching complete")
logging.info("\nPlease reboot the machine for patches to take effect")
def _rebuild_root_volume(self) -> bool:
"""
Rebuilds the Root Volume:
- Rebuilds the Kernel Collection
- Updates the Preboot Kernel Cache
- Rebuilds the dyld Shared Cache
- Creates a new APFS Snapshot
Returns:
bool: True if successful, False if not
"""
def _rebuild_snapshot(self):
if self._rebuild_kernel_collection() is True:
self.update_preboot_kernel_cache()
self._update_preboot_kernel_cache()
self._rebuild_dyld_shared_cache()
if self._create_new_apfs_snapshot() is True:
logging.info("- Patching complete")
@@ -250,7 +290,20 @@ class PatchSysVolume:
if self.constants.gui_mode is False:
input("\nPress [ENTER] to continue")
def _rebuild_kernel_collection(self):
def _rebuild_kernel_collection(self) -> bool:
"""
Rebuilds the Kernel Collection
Supports following KC generation:
- Boot/SysKC (11.0+)
- AuxKC (11.0+)
- PrelinkedKernel (10.15-)
Returns:
bool: True if successful, False if not
"""
logging.info("- Rebuilding Kernel Cache (This may take some time)")
if self.constants.detected_os > os_data.os_data.catalina:
# Base Arguments
@@ -345,7 +398,15 @@ class PatchSysVolume:
logging.info("- Successfully built new kernel cache")
return True
def _create_new_apfs_snapshot(self):
def _create_new_apfs_snapshot(self) -> bool:
"""
Creates a new APFS snapshot of the root volume
Returns:
bool: True if snapshot was created, False if not
"""
if self.root_supports_snapshot is True:
logging.info("- Creating new APFS snapshot")
bless = utilities.elevated(
@@ -365,22 +426,44 @@ class PatchSysVolume:
self._unmount_drive()
return True
def _unmount_drive(self):
def _unmount_drive(self) -> None:
"""
Unmount root volume
"""
logging.info("- Unmounting Root Volume (Don't worry if this fails)")
utilities.elevated(["diskutil", "unmount", self.root_mount_path], stdout=subprocess.PIPE).stdout.decode().strip().encode()
def _rebuild_dyld_shared_cache(self):
def _rebuild_dyld_shared_cache(self) -> None:
"""
Rebuild the dyld shared cache
Only required on Mojave and older
"""
if self.constants.detected_os > os_data.os_data.catalina:
return
logging.info("- Rebuilding dyld shared cache")
utilities.process_status(utilities.elevated(["update_dyld_shared_cache", "-root", f"{self.mount_location}/"]))
def update_preboot_kernel_cache(self):
def _update_preboot_kernel_cache(self) -> None:
"""
Update the preboot kernel cache
Only required on Catalina
"""
if self.constants.detected_os == os_data.os_data.catalina:
logging.info("- Rebuilding preboot kernel cache")
utilities.process_status(utilities.elevated(["kcditto"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT))
def _clean_skylight_plugins(self):
def _clean_skylight_plugins(self) -> None:
"""
Clean non-Metal's SkylightPlugins folder
"""
if (Path(self.mount_application_support) / Path("SkyLightPlugins/")).exists():
logging.info("- Found SkylightPlugins folder, removing old plugins")
utilities.process_status(utilities.elevated(["rm", "-Rf", f"{self.mount_application_support}/SkyLightPlugins"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT))
@@ -389,18 +472,31 @@ class PatchSysVolume:
logging.info("- Creating SkylightPlugins folder")
utilities.process_status(utilities.elevated(["mkdir", "-p", f"{self.mount_application_support}/SkyLightPlugins/"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT))
def _delete_nonmetal_enforcement(self):
def _delete_nonmetal_enforcement(self) -> None:
"""
Remove defaults related to forced OpenGL rendering
Primarily for development purposes
"""
for arg in ["useMetal", "useIOP"]:
result = subprocess.run(["defaults", "read", "/Library/Preferences/com.apple.CoreDisplay", arg], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL).stdout.decode("utf-8").strip()
if result in ["0", "false", "1", "true"]:
logging.info(f"- Removing non-Metal Enforcement Preference: {arg}")
utilities.elevated(["defaults", "delete", "/Library/Preferences/com.apple.CoreDisplay", arg])
def _clean_auxiliary_kc(self):
# When reverting root volume patches, the AuxKC will still retain the UUID
# it was built against. Thus when Boot/SysKC are reverted, Aux will break
# To resolve this, delete all installed kexts in /L*/E* and rebuild the AuxKC
# We can verify our binaries based off the OpenCore-Legacy-Patcher.plist file
def _clean_auxiliary_kc(self) -> None:
"""
Clean the Auxiliary Kernel Collection
Logic:
When reverting root volume patches, the AuxKC will still retain the UUID
it was built against. Thus when Boot/SysKC are reverted, Aux will break
To resolve this, delete all installed kexts in /L*/E* and rebuild the AuxKC
We can verify our binaries based off the OpenCore-Legacy-Patcher.plist file
"""
if self.constants.detected_os < os_data.os_data.big_sur:
return
@@ -441,7 +537,15 @@ class PatchSysVolume:
# ex. Symlinks pointing to symlinks pointing to dead files
pass
def _write_patchset(self, patchset):
def _write_patchset(self, patchset: dict) -> None:
"""
Write patchset information to Root Volume
Parameters:
patchset (dict): Patchset information (generated by GenerateRootPatchSets)
"""
destination_path = f"{self.mount_location}/System/Library/CoreServices"
file_name = "OpenCore-Legacy-Patcher.plist"
destination_path_file = f"{destination_path}/{file_name}"
@@ -451,16 +555,31 @@ class PatchSysVolume:
utilities.process_status(utilities.elevated(["rm", destination_path_file], stdout=subprocess.PIPE, stderr=subprocess.STDOUT))
utilities.process_status(utilities.elevated(["cp", f"{self.constants.payload_path}/{file_name}", destination_path], stdout=subprocess.PIPE, stderr=subprocess.STDOUT))
def _add_auxkc_support(self, install_file, source_folder_path, install_patch_directory, destination_folder_path):
# In macOS Ventura, KDKs are required to build new Boot and System KCs
# However for some patch sets, we're able to use the Auxiliary KCs with '/Library/Extensions'
# kernelmanagerd determines which kext is installed by their 'OSBundleRequired' entry
# If a kext is labeled as 'OSBundleRequired: Root' or 'OSBundleRequired: Safe Boot',
# kernelmanagerd will require the kext to be installed in the Boot/SysKC
def _add_auxkc_support(self, install_file: str, source_folder_path: str, install_patch_directory: str, destination_folder_path: str) -> str:
"""
Patch provided Kext to support Auxiliary Kernel Collection
# Additionally, kexts starting with 'com.apple.' are not natively allowed to be installed
# in the AuxKC. So we need to explicitly set our 'OSBundleRequired' to 'Auxiliary'
Logic:
In macOS Ventura, KDKs are required to build new Boot and System KCs
However for some patch sets, we're able to use the Auxiliary KCs with '/Library/Extensions'
kernelmanagerd determines which kext is installed by their 'OSBundleRequired' entry
If a kext is labeled as 'OSBundleRequired: Root' or 'OSBundleRequired: Safe Boot',
kernelmanagerd will require the kext to be installed in the Boot/SysKC
Additionally, kexts starting with 'com.apple.' are not natively allowed to be installed
in the AuxKC. So we need to explicitly set our 'OSBundleRequired' to 'Auxiliary'
Parameters:
install_file (str): Kext file name
source_folder_path (str): Source folder path
install_patch_directory (str): Patch directory
destination_folder_path (str): Destination folder path
Returns:
str: Updated destination folder path
"""
if self.skip_root_kmutil_requirement is False:
return destination_folder_path
@@ -491,14 +610,24 @@ class PatchSysVolume:
return updated_install_location
def _check_kexts_needs_authentication(self, kext_name):
# Verify whether the user needs to authenticate in System Preferences
# Specifically under 'private/var/db/KernelManagement/AuxKC/CurrentAuxKC/com.apple.kcgen.instructions.plist'
# ["kextsToBuild"][i]:
# ["bundlePathMainOS"] = /Library/Extensions/Test.kext
# ["cdHash"] = Bundle's CDHash (random on ad-hoc signed, static on dev signed)
# ["teamID"] = Team ID (blank on ad-hoc signed)
# To grab the CDHash of a kext, run 'codesign -dvvv <kext_path>'
def _check_kexts_needs_authentication(self, kext_name: str):
"""
Verify whether the user needs to authenticate in System Preferences
Sets 'needs_to_open_preferences' to True if the kext is not in the AuxKC
Logic:
Under 'private/var/db/KernelManagement/AuxKC/CurrentAuxKC/com.apple.kcgen.instructions.plist'
["kextsToBuild"][i]:
["bundlePathMainOS"] = /Library/Extensions/Test.kext
["cdHash"] = Bundle's CDHash (random on ad-hoc signed, static on dev signed)
["teamID"] = Team ID (blank on ad-hoc signed)
To grab the CDHash of a kext, run 'codesign -dvvv <kext_path>'
Parameters:
kext_name (str): Name of the kext to check
"""
try:
aux_cache_path = Path(self.mount_location_data) / Path("/private/var/db/KernelExtensionManagement/AuxKC/CurrentAuxKC/com.apple.kcgen.instructions.plist")
if aux_cache_path.exists():
@@ -513,7 +642,12 @@ class PatchSysVolume:
logging.info(f" - {kext_name} requires authentication in System Preferences")
self.constants.needs_to_open_preferences = True # Notify in GUI to open System Preferences
def _patch_root_vol(self):
"""
Patch root volume
"""
logging.info(f"- Running patches for {self.model}")
if self.patch_set_dictionary != {}:
self._execute_patchset(self.patch_set_dictionary)
@@ -523,9 +657,17 @@ class PatchSysVolume:
if self.constants.wxpython_variant is True and self.constants.detected_os >= os_data.os_data.big_sur:
sys_patch_auto.AutomaticSysPatch(self.constants).install_auto_patcher_launch_agent()
self._rebuild_snapshot()
self._rebuild_root_volume()
def _execute_patchset(self, required_patches: dict):
"""
Executes provided patchset
Parameters:
required_patches (dict): Patchset to execute (generated by sys_patch_generate.GenerateRootPatchSets)
"""
def _execute_patchset(self, required_patches):
source_files_path = str(self.constants.payload_local_binaries_root_path)
self._preflight_checks(required_patches, source_files_path)
for patch in required_patches:
@@ -579,9 +721,19 @@ class PatchSysVolume:
sys_patch_helpers.SysPatchHelpers(self.constants).disable_window_server_caching()
if any(x in required_patches for x in ["Intel Ivy Bridge", "Intel Haswell"]):
sys_patch_helpers.SysPatchHelpers(self.constants).remove_news_widgets()
self._write_patchset(required_patches)
def _preflight_checks(self, required_patches, source_files_path):
def _preflight_checks(self, required_patches: dict, source_files_path: Path) -> None:
"""
Runs preflight checks before patching
Parameters:
required_patches (dict): Patchset dictionary (from sys_patch_generate.GenerateRootPatchSets)
source_files_path (Path): Path to the source files (PatcherSupportPkg)
"""
logging.info("- Running Preflight Checks before patching")
# Make sure old SkyLight plugins aren't being used
@@ -606,16 +758,25 @@ class PatchSysVolume:
raise Exception(f"Failed to find {source_file}")
# Ensure KDK is properly installed
should_save_cs = False
if "Legacy USB 1.1" in required_patches:
should_save_cs = True
self._merge_kdk_with_root(save_hid_cs=should_save_cs)
self._merge_kdk_with_root(save_hid_cs=True if "Legacy USB 1.1" in required_patches else False)
logging.info("- Finished Preflight, starting patching")
def _install_new_file(self, source_folder, destination_folder, file_name):
# .frameworks are merged
# .kexts and .apps are deleted and replaced
def _install_new_file(self, source_folder: Path, destination_folder: Path, file_name: str) -> None:
"""
Installs a new file to the destination folder
File handling logic:
- .frameworks are merged with the destination folder
- Other files are deleted and replaced (ex. .kexts, .apps)
Parameters:
source_folder (Path): Path to the source folder
destination_folder (Path): Path to the destination folder
file_name (str): Name of the file to install
"""
file_name_str = str(file_name)
if not Path(destination_folder).exists():
@@ -646,7 +807,16 @@ class PatchSysVolume:
utilities.process_status(utilities.elevated(["cp", f"{source_folder}/{file_name}", destination_folder], stdout=subprocess.PIPE, stderr=subprocess.STDOUT))
self._fix_permissions(destination_folder + "/" + file_name)
def _remove_file(self, destination_folder, file_name):
def _remove_file(self, destination_folder: Path, file_name: str) -> None:
"""
Removes a file from the destination folder
Parameters:
destination_folder (Path): Path to the destination folder
file_name (str): Name of the file to remove
"""
if Path(destination_folder + "/" + file_name).exists():
logging.info(f" - Removing: {file_name}")
if Path(destination_folder + "/" + file_name).is_dir():
@@ -655,7 +825,11 @@ class PatchSysVolume:
utilities.process_status(utilities.elevated(["rm", f"{destination_folder}/{file_name}"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT))
def _fix_permissions(self, destination_file):
def _fix_permissions(self, destination_file: Path) -> None:
"""
Fix file permissions for a given file or directory
"""
chmod_args = ["chmod", "-Rf", "755", destination_file]
chown_args = ["chown", "-Rf", "root:wheel", destination_file]
if not Path(destination_file).is_dir():
@@ -666,7 +840,14 @@ class PatchSysVolume:
utilities.process_status(utilities.elevated(chown_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT))
def _check_files(self):
def _check_files(self) -> bool:
"""
Check if all files are present (primarily PatcherSupportPkg resources)
Returns:
bool: True if all files are present, False otherwise
"""
if Path(self.constants.payload_local_binaries_root_path).exists():
logging.info("- Local PatcherSupportPkg resources available, continuing...")
return True
@@ -683,6 +864,10 @@ class PatchSysVolume:
# Entry Function
def start_patch(self):
"""
Entry function for the patching process
"""
logging.info("- Starting Patch Process")
logging.info(f"- Determining Required Patch set for Darwin {self.constants.detected_os}")
self.patch_set_dictionary = sys_patch_generate.GenerateRootPatchSets(self.computer.real_model, self.constants, self.hardware_details).patchset
@@ -716,7 +901,11 @@ class PatchSysVolume:
else:
logging.info("- Returning to main menu")
def start_unpatch(self):
def start_unpatch(self) -> None:
"""
Entry function for unpatching the root volume
"""
logging.info("- Starting Unpatch Process")
if sys_patch_detect.DetectRootPatch(self.computer.real_model, self.constants).verify_patch_allowed(print_errors=True) is True:
if self._mount_root_vol() is True: