From 96e96464f267ae9116555a0a24b477051865b2dd Mon Sep 17 00:00:00 2001 From: Mykola Grymalyuk Date: Fri, 7 Apr 2023 12:57:05 -0600 Subject: [PATCH] sys_patch.py: Add docstrings and typing suggestions --- resources/sys_patch/sys_patch.py | 323 ++++++++++++++++++++++++------- 1 file changed, 256 insertions(+), 67 deletions(-) diff --git a/resources/sys_patch/sys_patch.py b/resources/sys_patch/sys_patch.py index 1d0c5764d..163ebdbba 100644 --- a/resources/sys_patch/sys_patch.py +++ b/resources/sys_patch/sys_patch.py @@ -46,7 +46,7 @@ from data import os_data class PatchSysVolume: - def __init__(self, model: str, global_constants: constants.Constants, hardware_details: list = None): + def __init__(self, model: str, global_constants: constants.Constants, hardware_details: list = None) -> None: self.model = model self.constants: constants.Constants = global_constants self.computer = self.constants.computer @@ -67,19 +67,21 @@ class PatchSysVolume: self.skip_root_kmutil_requirement = self.hardware_details["Settings: Supports Auxiliary Cache"] - def __del__(self): - # Ensures that each time we're patching, we're using a clean repository + def __del__(self) -> None: + """ + Ensures that each time we're patching, we're using a clean PatcherSupportPkg folder + """ + if Path(self.constants.payload_local_binaries_root_path).exists(): shutil.rmtree(self.constants.payload_local_binaries_root_path) - def _init_pathing(self, custom_root_mount_path: Path = None, custom_data_mount_path: Path = None): + def _init_pathing(self, custom_root_mount_path: Path = None, custom_data_mount_path: Path = None) -> None: """ Initializes the pathing for root volume patching Parameters: custom_root_mount_path (Path): Custom path to mount the root volume custom_data_mount_path (Path): Custom path to mount the data volume - """ if custom_root_mount_path and custom_data_mount_path: self.mount_location = custom_root_mount_path @@ -96,7 +98,18 @@ class PatchSysVolume: self.mount_application_support = f"{self.mount_location_data}/Library/Application Support" - def _mount_root_vol(self): + def _mount_root_vol(self) -> bool: + """ + Attempts to mount the booted APFS volume as a writable volume + at /System/Volumes/Update/mnt1 + + Manual invocation: + 'sudo mount -o nobrowse -t apfs /dev/diskXsY /System/Volumes/Update/mnt1' + + Returns: + bool: True if successful, False if not + """ + # Returns boolean if Root Volume is available self.root_mount_path = utilities.get_disk_path() if self.root_mount_path.startswith("disk"): @@ -122,7 +135,16 @@ class PatchSysVolume: return False - def _merge_kdk_with_root(self, save_hid_cs=False): + def _merge_kdk_with_root(self, save_hid_cs=False) -> None: + """ + Merge Kernel Debug Kit (KDK) with the root volume + If no KDK is present, will call kdk_handler to download and install it + + Parameters: + save_hid_cs (bool): If True, will save the HID CS file before merging KDK + Required for USB 1.1 downgrades on Ventura and newer + """ + if self.skip_root_kmutil_requirement is True: return if self.constants.detected_os < os_data.os_data.ventura: @@ -221,25 +243,43 @@ class PatchSysVolume: def _unpatch_root_vol(self): - if self.constants.detected_os > os_data.os_data.catalina and self.root_supports_snapshot is True: - logging.info("- Reverting to last signed APFS snapshot") - result = utilities.elevated(["bless", "--mount", self.mount_location, "--bootefi", "--last-sealed-snapshot"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - if result.returncode != 0: - logging.info("- Unable to revert root volume patches") - logging.info("Reason for unpatch Failure:") - logging.info(result.stdout.decode()) - logging.info("- Failed to revert snapshot via Apple's 'bless' command") - else: - self._clean_skylight_plugins() - self._delete_nonmetal_enforcement() - self._clean_auxiliary_kc() - self.constants.root_patcher_succeeded = True - logging.info("- Unpatching complete") - logging.info("\nPlease reboot the machine for patches to take effect") + """ + Reverts APFS snapshot and cleans up any changes made to the root and data volume + """ + + if self.constants.detected_os <= os_data.os_data.big_sur or self.root_supports_snapshot is False: + logging.info("- OS version does not support snapshotting, skipping revert") + + logging.info("- Reverting to last signed APFS snapshot") + result = utilities.elevated(["bless", "--mount", self.mount_location, "--bootefi", "--last-sealed-snapshot"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + if result.returncode != 0: + logging.info("- Unable to revert root volume patches") + logging.info("Reason for unpatch Failure:") + logging.info(result.stdout.decode()) + logging.info("- Failed to revert snapshot via Apple's 'bless' command") + else: + self._clean_skylight_plugins() + self._delete_nonmetal_enforcement() + self._clean_auxiliary_kc() + self.constants.root_patcher_succeeded = True + logging.info("- Unpatching complete") + logging.info("\nPlease reboot the machine for patches to take effect") + + + def _rebuild_root_volume(self) -> bool: + """ + Rebuilds the Root Volume: + - Rebuilds the Kernel Collection + - Updates the Preboot Kernel Cache + - Rebuilds the dyld Shared Cache + - Creates a new APFS Snapshot + + Returns: + bool: True if successful, False if not + """ - def _rebuild_snapshot(self): if self._rebuild_kernel_collection() is True: - self.update_preboot_kernel_cache() + self._update_preboot_kernel_cache() self._rebuild_dyld_shared_cache() if self._create_new_apfs_snapshot() is True: logging.info("- Patching complete") @@ -250,7 +290,20 @@ class PatchSysVolume: if self.constants.gui_mode is False: input("\nPress [ENTER] to continue") - def _rebuild_kernel_collection(self): + + def _rebuild_kernel_collection(self) -> bool: + """ + Rebuilds the Kernel Collection + + Supports following KC generation: + - Boot/SysKC (11.0+) + - AuxKC (11.0+) + - PrelinkedKernel (10.15-) + + Returns: + bool: True if successful, False if not + """ + logging.info("- Rebuilding Kernel Cache (This may take some time)") if self.constants.detected_os > os_data.os_data.catalina: # Base Arguments @@ -345,7 +398,15 @@ class PatchSysVolume: logging.info("- Successfully built new kernel cache") return True - def _create_new_apfs_snapshot(self): + + def _create_new_apfs_snapshot(self) -> bool: + """ + Creates a new APFS snapshot of the root volume + + Returns: + bool: True if snapshot was created, False if not + """ + if self.root_supports_snapshot is True: logging.info("- Creating new APFS snapshot") bless = utilities.elevated( @@ -365,22 +426,44 @@ class PatchSysVolume: self._unmount_drive() return True - def _unmount_drive(self): + + def _unmount_drive(self) -> None: + """ + Unmount root volume + """ + logging.info("- Unmounting Root Volume (Don't worry if this fails)") utilities.elevated(["diskutil", "unmount", self.root_mount_path], stdout=subprocess.PIPE).stdout.decode().strip().encode() - def _rebuild_dyld_shared_cache(self): + + def _rebuild_dyld_shared_cache(self) -> None: + """ + Rebuild the dyld shared cache + Only required on Mojave and older + """ + if self.constants.detected_os > os_data.os_data.catalina: return logging.info("- Rebuilding dyld shared cache") utilities.process_status(utilities.elevated(["update_dyld_shared_cache", "-root", f"{self.mount_location}/"])) - def update_preboot_kernel_cache(self): + + def _update_preboot_kernel_cache(self) -> None: + """ + Update the preboot kernel cache + Only required on Catalina + """ + if self.constants.detected_os == os_data.os_data.catalina: logging.info("- Rebuilding preboot kernel cache") utilities.process_status(utilities.elevated(["kcditto"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)) - def _clean_skylight_plugins(self): + + def _clean_skylight_plugins(self) -> None: + """ + Clean non-Metal's SkylightPlugins folder + """ + if (Path(self.mount_application_support) / Path("SkyLightPlugins/")).exists(): logging.info("- Found SkylightPlugins folder, removing old plugins") utilities.process_status(utilities.elevated(["rm", "-Rf", f"{self.mount_application_support}/SkyLightPlugins"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)) @@ -389,18 +472,31 @@ class PatchSysVolume: logging.info("- Creating SkylightPlugins folder") utilities.process_status(utilities.elevated(["mkdir", "-p", f"{self.mount_application_support}/SkyLightPlugins/"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)) - def _delete_nonmetal_enforcement(self): + + def _delete_nonmetal_enforcement(self) -> None: + """ + Remove defaults related to forced OpenGL rendering + Primarily for development purposes + """ + for arg in ["useMetal", "useIOP"]: result = subprocess.run(["defaults", "read", "/Library/Preferences/com.apple.CoreDisplay", arg], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL).stdout.decode("utf-8").strip() if result in ["0", "false", "1", "true"]: logging.info(f"- Removing non-Metal Enforcement Preference: {arg}") utilities.elevated(["defaults", "delete", "/Library/Preferences/com.apple.CoreDisplay", arg]) - def _clean_auxiliary_kc(self): - # When reverting root volume patches, the AuxKC will still retain the UUID - # it was built against. Thus when Boot/SysKC are reverted, Aux will break - # To resolve this, delete all installed kexts in /L*/E* and rebuild the AuxKC - # We can verify our binaries based off the OpenCore-Legacy-Patcher.plist file + + def _clean_auxiliary_kc(self) -> None: + """ + Clean the Auxiliary Kernel Collection + + Logic: + When reverting root volume patches, the AuxKC will still retain the UUID + it was built against. Thus when Boot/SysKC are reverted, Aux will break + To resolve this, delete all installed kexts in /L*/E* and rebuild the AuxKC + We can verify our binaries based off the OpenCore-Legacy-Patcher.plist file + """ + if self.constants.detected_os < os_data.os_data.big_sur: return @@ -441,7 +537,15 @@ class PatchSysVolume: # ex. Symlinks pointing to symlinks pointing to dead files pass - def _write_patchset(self, patchset): + + def _write_patchset(self, patchset: dict) -> None: + """ + Write patchset information to Root Volume + + Parameters: + patchset (dict): Patchset information (generated by GenerateRootPatchSets) + """ + destination_path = f"{self.mount_location}/System/Library/CoreServices" file_name = "OpenCore-Legacy-Patcher.plist" destination_path_file = f"{destination_path}/{file_name}" @@ -451,16 +555,31 @@ class PatchSysVolume: utilities.process_status(utilities.elevated(["rm", destination_path_file], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)) utilities.process_status(utilities.elevated(["cp", f"{self.constants.payload_path}/{file_name}", destination_path], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)) - def _add_auxkc_support(self, install_file, source_folder_path, install_patch_directory, destination_folder_path): - # In macOS Ventura, KDKs are required to build new Boot and System KCs - # However for some patch sets, we're able to use the Auxiliary KCs with '/Library/Extensions' - # kernelmanagerd determines which kext is installed by their 'OSBundleRequired' entry - # If a kext is labeled as 'OSBundleRequired: Root' or 'OSBundleRequired: Safe Boot', - # kernelmanagerd will require the kext to be installed in the Boot/SysKC + def _add_auxkc_support(self, install_file: str, source_folder_path: str, install_patch_directory: str, destination_folder_path: str) -> str: + """ + Patch provided Kext to support Auxiliary Kernel Collection - # Additionally, kexts starting with 'com.apple.' are not natively allowed to be installed - # in the AuxKC. So we need to explicitly set our 'OSBundleRequired' to 'Auxiliary' + Logic: + In macOS Ventura, KDKs are required to build new Boot and System KCs + However for some patch sets, we're able to use the Auxiliary KCs with '/Library/Extensions' + + kernelmanagerd determines which kext is installed by their 'OSBundleRequired' entry + If a kext is labeled as 'OSBundleRequired: Root' or 'OSBundleRequired: Safe Boot', + kernelmanagerd will require the kext to be installed in the Boot/SysKC + + Additionally, kexts starting with 'com.apple.' are not natively allowed to be installed + in the AuxKC. So we need to explicitly set our 'OSBundleRequired' to 'Auxiliary' + + Parameters: + install_file (str): Kext file name + source_folder_path (str): Source folder path + install_patch_directory (str): Patch directory + destination_folder_path (str): Destination folder path + + Returns: + str: Updated destination folder path + """ if self.skip_root_kmutil_requirement is False: return destination_folder_path @@ -491,14 +610,24 @@ class PatchSysVolume: return updated_install_location - def _check_kexts_needs_authentication(self, kext_name): - # Verify whether the user needs to authenticate in System Preferences - # Specifically under 'private/var/db/KernelManagement/AuxKC/CurrentAuxKC/com.apple.kcgen.instructions.plist' - # ["kextsToBuild"][i]: - # ["bundlePathMainOS"] = /Library/Extensions/Test.kext - # ["cdHash"] = Bundle's CDHash (random on ad-hoc signed, static on dev signed) - # ["teamID"] = Team ID (blank on ad-hoc signed) - # To grab the CDHash of a kext, run 'codesign -dvvv ' + + def _check_kexts_needs_authentication(self, kext_name: str): + """ + Verify whether the user needs to authenticate in System Preferences + Sets 'needs_to_open_preferences' to True if the kext is not in the AuxKC + + Logic: + Under 'private/var/db/KernelManagement/AuxKC/CurrentAuxKC/com.apple.kcgen.instructions.plist' + ["kextsToBuild"][i]: + ["bundlePathMainOS"] = /Library/Extensions/Test.kext + ["cdHash"] = Bundle's CDHash (random on ad-hoc signed, static on dev signed) + ["teamID"] = Team ID (blank on ad-hoc signed) + To grab the CDHash of a kext, run 'codesign -dvvv ' + + Parameters: + kext_name (str): Name of the kext to check + """ + try: aux_cache_path = Path(self.mount_location_data) / Path("/private/var/db/KernelExtensionManagement/AuxKC/CurrentAuxKC/com.apple.kcgen.instructions.plist") if aux_cache_path.exists(): @@ -513,7 +642,12 @@ class PatchSysVolume: logging.info(f" - {kext_name} requires authentication in System Preferences") self.constants.needs_to_open_preferences = True # Notify in GUI to open System Preferences + def _patch_root_vol(self): + """ + Patch root volume + """ + logging.info(f"- Running patches for {self.model}") if self.patch_set_dictionary != {}: self._execute_patchset(self.patch_set_dictionary) @@ -523,9 +657,17 @@ class PatchSysVolume: if self.constants.wxpython_variant is True and self.constants.detected_os >= os_data.os_data.big_sur: sys_patch_auto.AutomaticSysPatch(self.constants).install_auto_patcher_launch_agent() - self._rebuild_snapshot() + self._rebuild_root_volume() + + + def _execute_patchset(self, required_patches: dict): + """ + Executes provided patchset + + Parameters: + required_patches (dict): Patchset to execute (generated by sys_patch_generate.GenerateRootPatchSets) + """ - def _execute_patchset(self, required_patches): source_files_path = str(self.constants.payload_local_binaries_root_path) self._preflight_checks(required_patches, source_files_path) for patch in required_patches: @@ -579,9 +721,19 @@ class PatchSysVolume: sys_patch_helpers.SysPatchHelpers(self.constants).disable_window_server_caching() if any(x in required_patches for x in ["Intel Ivy Bridge", "Intel Haswell"]): sys_patch_helpers.SysPatchHelpers(self.constants).remove_news_widgets() + self._write_patchset(required_patches) - def _preflight_checks(self, required_patches, source_files_path): + + def _preflight_checks(self, required_patches: dict, source_files_path: Path) -> None: + """ + Runs preflight checks before patching + + Parameters: + required_patches (dict): Patchset dictionary (from sys_patch_generate.GenerateRootPatchSets) + source_files_path (Path): Path to the source files (PatcherSupportPkg) + """ + logging.info("- Running Preflight Checks before patching") # Make sure old SkyLight plugins aren't being used @@ -606,16 +758,25 @@ class PatchSysVolume: raise Exception(f"Failed to find {source_file}") # Ensure KDK is properly installed - should_save_cs = False - if "Legacy USB 1.1" in required_patches: - should_save_cs = True - self._merge_kdk_with_root(save_hid_cs=should_save_cs) + self._merge_kdk_with_root(save_hid_cs=True if "Legacy USB 1.1" in required_patches else False) logging.info("- Finished Preflight, starting patching") - def _install_new_file(self, source_folder, destination_folder, file_name): - # .frameworks are merged - # .kexts and .apps are deleted and replaced + + def _install_new_file(self, source_folder: Path, destination_folder: Path, file_name: str) -> None: + """ + Installs a new file to the destination folder + + File handling logic: + - .frameworks are merged with the destination folder + - Other files are deleted and replaced (ex. .kexts, .apps) + + Parameters: + source_folder (Path): Path to the source folder + destination_folder (Path): Path to the destination folder + file_name (str): Name of the file to install + """ + file_name_str = str(file_name) if not Path(destination_folder).exists(): @@ -646,7 +807,16 @@ class PatchSysVolume: utilities.process_status(utilities.elevated(["cp", f"{source_folder}/{file_name}", destination_folder], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)) self._fix_permissions(destination_folder + "/" + file_name) - def _remove_file(self, destination_folder, file_name): + + def _remove_file(self, destination_folder: Path, file_name: str) -> None: + """ + Removes a file from the destination folder + + Parameters: + destination_folder (Path): Path to the destination folder + file_name (str): Name of the file to remove + """ + if Path(destination_folder + "/" + file_name).exists(): logging.info(f" - Removing: {file_name}") if Path(destination_folder + "/" + file_name).is_dir(): @@ -655,7 +825,11 @@ class PatchSysVolume: utilities.process_status(utilities.elevated(["rm", f"{destination_folder}/{file_name}"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)) - def _fix_permissions(self, destination_file): + def _fix_permissions(self, destination_file: Path) -> None: + """ + Fix file permissions for a given file or directory + """ + chmod_args = ["chmod", "-Rf", "755", destination_file] chown_args = ["chown", "-Rf", "root:wheel", destination_file] if not Path(destination_file).is_dir(): @@ -666,7 +840,14 @@ class PatchSysVolume: utilities.process_status(utilities.elevated(chown_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)) - def _check_files(self): + def _check_files(self) -> bool: + """ + Check if all files are present (primarily PatcherSupportPkg resources) + + Returns: + bool: True if all files are present, False otherwise + """ + if Path(self.constants.payload_local_binaries_root_path).exists(): logging.info("- Local PatcherSupportPkg resources available, continuing...") return True @@ -683,6 +864,10 @@ class PatchSysVolume: # Entry Function def start_patch(self): + """ + Entry function for the patching process + """ + logging.info("- Starting Patch Process") logging.info(f"- Determining Required Patch set for Darwin {self.constants.detected_os}") self.patch_set_dictionary = sys_patch_generate.GenerateRootPatchSets(self.computer.real_model, self.constants, self.hardware_details).patchset @@ -716,7 +901,11 @@ class PatchSysVolume: else: logging.info("- Returning to main menu") - def start_unpatch(self): + def start_unpatch(self) -> None: + """ + Entry function for unpatching the root volume + """ + logging.info("- Starting Unpatch Process") if sys_patch_detect.DetectRootPatch(self.computer.real_model, self.constants).verify_patch_allowed(print_errors=True) is True: if self._mount_root_vol() is True: