Further modularize sys_patch

This commit is contained in:
Mykola Grymalyuk
2024-08-13 13:07:58 -06:00
parent c4cda81df6
commit 53dd5d3477
13 changed files with 548 additions and 363 deletions

View File

@@ -12,7 +12,7 @@ import markdown2
import subprocess import subprocess
import webbrowser import webbrowser
from .. import sys_patch_detect from ..detections import DetectRootPatch
from ... import constants from ... import constants
@@ -142,7 +142,7 @@ Please check the Github page for more information about this release."""
if utilities.check_seal() is True: if utilities.check_seal() is True:
logging.info("- Detected Snapshot seal intact, detecting patches") logging.info("- Detected Snapshot seal intact, detecting patches")
patches = sys_patch_detect.DetectRootPatch(self.constants.computer.real_model, self.constants).detect_patch_set() patches = DetectRootPatch(self.constants.computer.real_model, self.constants).detect_patch_set()
if not any(not patch.startswith("Settings") and not patch.startswith("Validation") and patches[patch] is True for patch in patches): if not any(not patch.startswith("Settings") and not patch.startswith("Validation") and patches[patch] is True for patch in patches):
patches = {} patches = {}
if patches: if patches:

View File

@@ -0,0 +1,5 @@
"""
detections: Detect and generate patch sets for the host
"""
from .detect import DetectRootPatch
from .generate import GenerateRootPatchSets

View File

@@ -1,5 +1,5 @@
""" """
sys_patch_detect.py: Hardware Detection Logic for Root Patching detect.py: Hardware Detection Logic for Root Patching
""" """
import logging import logging
@@ -9,18 +9,18 @@ import packaging.version
from pathlib import Path from pathlib import Path
from .. import constants from ... import constants
from ..detections import ( from ...detections import (
amfi_detect, amfi_detect,
device_probe device_probe
) )
from ..support import ( from ...support import (
kdk_handler, kdk_handler,
network_handler, network_handler,
utilities utilities
) )
from ..datasets import ( from ...datasets import (
cpu_data, cpu_data,
model_array, model_array,
os_data, os_data,

View File

@@ -1,14 +1,14 @@
""" """
sys_patch_generate.py: Class for generating patch sets for the current host generate.py: Class for generating patch sets for the current host
""" """
import logging import logging
from .. import constants from ... import constants
from ..datasets import sys_patch_dict from ...datasets import sys_patch_dict
from ..support import utilities from ...support import utilities
from ..detections import device_probe from ...detections import device_probe
class GenerateRootPatchSets: class GenerateRootPatchSets:

View File

@@ -38,7 +38,6 @@ This is because Apple removed on-disk binaries (ref: https://github.com/dortania
import logging import logging
import plistlib import plistlib
import subprocess import subprocess
import applescript
from pathlib import Path from pathlib import Path
@@ -46,6 +45,12 @@ from .mount import (
RootVolumeMount, RootVolumeMount,
APFSSnapshot APFSSnapshot
) )
from .utilities import (
install_new_file,
remove_file,
PatcherSupportPkgMount,
KernelDebugKitMerge
)
from .. import constants from .. import constants
@@ -54,16 +59,14 @@ from ..volume import generate_copy_arguments
from ..support import ( from ..support import (
utilities, utilities,
kdk_handler,
subprocess_wrapper subprocess_wrapper
) )
from . import ( from . import (
sys_patch_detect,
sys_patch_helpers, sys_patch_helpers,
sys_patch_generate,
kernelcache kernelcache
) )
from .auto_patcher import InstallAutomaticPatchingServices from .auto_patcher import InstallAutomaticPatchingServices
from .detections import DetectRootPatch, GenerateRootPatchSets
class PatchSysVolume: class PatchSysVolume:
@@ -81,33 +84,24 @@ class PatchSysVolume:
# GUI will detect hardware patches before starting PatchSysVolume() # GUI will detect hardware patches before starting PatchSysVolume()
# However the TUI will not, so allow for data to be passed in manually avoiding multiple calls # However the TUI will not, so allow for data to be passed in manually avoiding multiple calls
if hardware_details is None: if hardware_details is None:
hardware_details = sys_patch_detect.DetectRootPatch(self.computer.real_model, self.constants).detect_patch_set() hardware_details = DetectRootPatch(self.computer.real_model, self.constants).detect_patch_set()
self.hardware_details = hardware_details self.hardware_details = hardware_details
self._init_pathing(custom_root_mount_path=None, custom_data_mount_path=None) self._init_pathing()
self.skip_root_kmutil_requirement = self.hardware_details["Settings: Supports Auxiliary Cache"] self.skip_root_kmutil_requirement = self.hardware_details["Settings: Supports Auxiliary Cache"]
self.mount_obj = RootVolumeMount(self.constants.detected_os) self.mount_obj = RootVolumeMount(self.constants.detected_os)
def _init_pathing(self, custom_root_mount_path: Path = None, custom_data_mount_path: Path = None) -> None: def _init_pathing(self) -> None:
""" """
Initializes the pathing for root volume patching Initializes the pathing for root volume patching
Parameters:
custom_root_mount_path (Path): Custom path to mount the root volume
custom_data_mount_path (Path): Custom path to mount the data volume
""" """
if custom_root_mount_path and custom_data_mount_path: self.mount_location_data = ""
self.mount_location = custom_root_mount_path if self.root_supports_snapshot is True:
self.data_mount_location = custom_data_mount_path
elif self.root_supports_snapshot is True:
# Big Sur and newer use APFS snapshots
self.mount_location = "/System/Volumes/Update/mnt1" self.mount_location = "/System/Volumes/Update/mnt1"
self.mount_location_data = ""
else: else:
self.mount_location = "" self.mount_location = ""
self.mount_location_data = ""
self.mount_extensions = f"{self.mount_location}/System/Library/Extensions" self.mount_extensions = f"{self.mount_location}/System/Library/Extensions"
self.mount_application_support = f"{self.mount_location_data}/Library/Application Support" self.mount_application_support = f"{self.mount_location_data}/Library/Application Support"
@@ -167,102 +161,11 @@ class PatchSysVolume:
save_hid_cs (bool): If True, will save the HID CS file before merging KDK save_hid_cs (bool): If True, will save the HID CS file before merging KDK
Required for USB 1.1 downgrades on Ventura and newer Required for USB 1.1 downgrades on Ventura and newer
""" """
self.kdk_path = KernelDebugKitMerge(
if self.skip_root_kmutil_requirement is True: self.constants,
return self.mount_location,
if self.constants.detected_os < os_data.os_data.ventura: self.skip_root_kmutil_requirement
return ).merge(save_hid_cs)
if self.constants.kdk_download_path.exists():
if kdk_handler.KernelDebugKitUtilities().install_kdk_dmg(self.constants.kdk_download_path) is False:
logging.info("Failed to install KDK")
raise Exception("Failed to install KDK")
kdk_obj = kdk_handler.KernelDebugKitObject(self.constants, self.constants.detected_os_build, self.constants.detected_os_version)
if kdk_obj.success is False:
logging.info(f"Unable to get KDK info: {kdk_obj.error_msg}")
raise Exception(f"Unable to get KDK info: {kdk_obj.error_msg}")
if kdk_obj.kdk_already_installed is False:
kdk_download_obj = kdk_obj.retrieve_download()
if not kdk_download_obj:
logging.info(f"Could not retrieve KDK: {kdk_obj.error_msg}")
# Hold thread until download is complete
kdk_download_obj.download(spawn_thread=False)
if kdk_download_obj.download_complete is False:
error_msg = kdk_download_obj.error_msg
logging.info(f"Could not download KDK: {error_msg}")
raise Exception(f"Could not download KDK: {error_msg}")
if kdk_obj.validate_kdk_checksum() is False:
logging.info(f"KDK checksum validation failed: {kdk_obj.error_msg}")
raise Exception(f"KDK checksum validation failed: {kdk_obj.error_msg}")
kdk_handler.KernelDebugKitUtilities().install_kdk_dmg(self.constants.kdk_download_path)
# re-init kdk_obj to get the new kdk_installed_path
kdk_obj = kdk_handler.KernelDebugKitObject(self.constants, self.constants.detected_os_build, self.constants.detected_os_version)
if kdk_obj.success is False:
logging.info(f"Unable to get KDK info: {kdk_obj.error_msg}")
raise Exception(f"Unable to get KDK info: {kdk_obj.error_msg}")
if kdk_obj.kdk_already_installed is False:
# We shouldn't get here, but just in case
logging.warning(f"KDK was not installed, but should have been: {kdk_obj.error_msg}")
raise Exception(f"KDK was not installed, but should have been: {kdk_obj.error_msg}")
kdk_path = Path(kdk_obj.kdk_installed_path) if kdk_obj.kdk_installed_path != "" else None
oclp_plist = Path("/System/Library/CoreServices/OpenCore-Legacy-Patcher.plist")
if (Path(self.mount_location) / Path("System/Library/Extensions/System.kext/PlugIns/Libkern.kext/Libkern")).exists() and oclp_plist.exists():
# KDK was already merged, check if the KDK used is the same as the one we're using
# If not, we'll rsync over with the new KDK
try:
oclp_plist_data = plistlib.load(open(oclp_plist, "rb"))
if "Kernel Debug Kit Used" in oclp_plist_data:
if oclp_plist_data["Kernel Debug Kit Used"] == str(kdk_path):
logging.info("- Matching KDK determined to already be merged, skipping")
return
except:
pass
if kdk_path is None:
logging.info(f"- Unable to find Kernel Debug Kit")
raise Exception("Unable to find Kernel Debug Kit")
self.kdk_path = kdk_path
logging.info(f"- Found KDK at: {kdk_path}")
# Due to some IOHIDFamily oddities, we need to ensure their CodeSignature is retained
cs_path = Path(self.mount_location) / Path("System/Library/Extensions/IOHIDFamily.kext/Contents/PlugIns/IOHIDEventDriver.kext/Contents/_CodeSignature")
if save_hid_cs is True and cs_path.exists():
logging.info("- Backing up IOHIDEventDriver CodeSignature")
# Note it's a folder, not a file
subprocess_wrapper.run_as_root(generate_copy_arguments(cs_path, f"{self.constants.payload_path}/IOHIDEventDriver_CodeSignature.bak"), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
logging.info(f"- Merging KDK with Root Volume: {kdk_path.name}")
subprocess_wrapper.run_as_root(
# Only merge '/System/Library/Extensions'
# 'Kernels' and 'KernelSupport' is wasted space for root patching (we don't care above dev kernels)
["/usr/bin/rsync", "-r", "-i", "-a", f"{kdk_path}/System/Library/Extensions/", f"{self.mount_location}/System/Library/Extensions"],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
# During reversing, we found that kmutil uses this path to determine whether the KDK was successfully merged
# Best to verify now before we cause any damage
if not (Path(self.mount_location) / Path("System/Library/Extensions/System.kext/PlugIns/Libkern.kext/Libkern")).exists():
logging.info("- Failed to merge KDK with Root Volume")
raise Exception("Failed to merge KDK with Root Volume")
logging.info("- Successfully merged KDK with Root Volume")
# Restore IOHIDEventDriver CodeSignature
if save_hid_cs is True and Path(f"{self.constants.payload_path}/IOHIDEventDriver_CodeSignature.bak").exists():
logging.info("- Restoring IOHIDEventDriver CodeSignature")
if not cs_path.exists():
logging.info(" - CodeSignature folder missing, creating")
subprocess_wrapper.run_as_root(["/bin/mkdir", "-p", cs_path], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
subprocess_wrapper.run_as_root(generate_copy_arguments(f"{self.constants.payload_path}/IOHIDEventDriver_CodeSignature.bak", cs_path), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
subprocess_wrapper.run_as_root(["/bin/rm", "-rf", f"{self.constants.payload_path}/IOHIDEventDriver_CodeSignature.bak"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
def _unpatch_root_vol(self): def _unpatch_root_vol(self):
@@ -298,41 +201,48 @@ class PatchSysVolume:
Returns: Returns:
bool: True if successful, False if not bool: True if successful, False if not
""" """
if self._rebuild_kernel_cache() is False:
return False
if self._rebuild_kernel_collection() is True: self._update_preboot_kernel_cache()
self._update_preboot_kernel_cache() self._rebuild_dyld_shared_cache()
self._rebuild_dyld_shared_cache()
if self._create_new_apfs_snapshot() is True: if self._create_new_apfs_snapshot() is False:
self._unmount_root_vol() return False
logging.info("- Patching complete")
logging.info("\nPlease reboot the machine for patches to take effect") self._unmount_root_vol()
if self.needs_kmutil_exemptions is True:
logging.info("Note: Apple will require you to open System Preferences -> Security to allow the new kernel extensions to be loaded") logging.info("- Patching complete")
self.constants.root_patcher_succeeded = True logging.info("\nPlease reboot the machine for patches to take effect")
return True
return False if self.needs_kmutil_exemptions is True:
logging.info("Note: Apple will require you to open System Preferences -> Security to allow the new kernel extensions to be loaded")
self.constants.root_patcher_succeeded = True
return True
def _rebuild_kernel_collection(self) -> bool: def _rebuild_kernel_cache(self) -> bool:
""" """
Rebuilds the Kernel Collection Rebuilds the Kernel Cache
Supports following KC generation:
- Boot/SysKC (11.0+)
- AuxKC (11.0+)
- PrelinkedKernel (10.15-)
Returns:
bool: True if successful, False if not
""" """
return kernelcache.RebuildKernelCache( result = kernelcache.RebuildKernelCache(
os_version=self.constants.detected_os, os_version=self.constants.detected_os,
mount_location=self.mount_location, mount_location=self.mount_location,
auxiliary_cache=self.needs_kmutil_exemptions, auxiliary_cache=self.needs_kmutil_exemptions,
auxiliary_cache_only=self.skip_root_kmutil_requirement auxiliary_cache_only=self.skip_root_kmutil_requirement
).rebuild() ).rebuild()
if result is False:
return False
if self.skip_root_kmutil_requirement is False:
sys_patch_helpers.SysPatchHelpers(self.constants).install_rsr_repair_binary()
return True
def _create_new_apfs_snapshot(self) -> bool: def _create_new_apfs_snapshot(self) -> bool:
""" """
@@ -421,7 +331,7 @@ class PatchSysVolume:
if self.patch_set_dictionary != {}: if self.patch_set_dictionary != {}:
self._execute_patchset(self.patch_set_dictionary) self._execute_patchset(self.patch_set_dictionary)
else: else:
self._execute_patchset(sys_patch_generate.GenerateRootPatchSets(self.computer.real_model, self.constants, self.hardware_details).patchset) self._execute_patchset(GenerateRootPatchSets(self.computer.real_model, self.constants, self.hardware_details).patchset)
if self.constants.wxpython_variant is True and self.constants.detected_os >= os_data.os_data.big_sur: if self.constants.wxpython_variant is True and self.constants.detected_os >= os_data.os_data.big_sur:
needs_daemon = False needs_daemon = False
@@ -459,7 +369,7 @@ class PatchSysVolume:
destination_folder_path = str(self.mount_location) + remove_patch_directory destination_folder_path = str(self.mount_location) + remove_patch_directory
else: else:
destination_folder_path = str(self.mount_location_data) + remove_patch_directory destination_folder_path = str(self.mount_location_data) + remove_patch_directory
self._remove_file(destination_folder_path, remove_patch_file) remove_file(destination_folder_path, remove_patch_file)
for method_install in ["Install", "Install Non-Root"]: for method_install in ["Install", "Install Non-Root"]:
@@ -494,7 +404,7 @@ class PatchSysVolume:
destination_folder_path = updated_destination_folder_path destination_folder_path = updated_destination_folder_path
self._install_new_file(source_folder_path, destination_folder_path, install_file) install_new_file(source_folder_path, destination_folder_path, install_file)
if "Processes" in required_patches[patch]: if "Processes" in required_patches[patch]:
for process in required_patches[patch]["Processes"]: for process in required_patches[patch]["Processes"]:
@@ -506,6 +416,7 @@ class PatchSysVolume:
else: else:
logging.info(f"- Running Process:\n{process}") logging.info(f"- Running Process:\n{process}")
subprocess_wrapper.run_and_verify(process, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True) subprocess_wrapper.run_and_verify(process, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
if any(x in required_patches for x in ["AMD Legacy GCN", "AMD Legacy Polaris", "AMD Legacy Vega"]): if any(x in required_patches for x in ["AMD Legacy GCN", "AMD Legacy Polaris", "AMD Legacy Vega"]):
sys_patch_helpers.SysPatchHelpers(self.constants).disable_window_server_caching() sys_patch_helpers.SysPatchHelpers(self.constants).disable_window_server_caching()
if "Metal 3802 Common Extended" in required_patches: if "Metal 3802 Common Extended" in required_patches:
@@ -543,12 +454,13 @@ class PatchSysVolume:
for patch in required_patches: for patch in required_patches:
# Check if all files are present # Check if all files are present
for method_type in ["Install", "Install Non-Root"]: for method_type in ["Install", "Install Non-Root"]:
if method_type in required_patches[patch]: if method_type not in required_patches[patch]:
for install_patch_directory in required_patches[patch][method_type]: continue
for install_file in required_patches[patch][method_type][install_patch_directory]: for install_patch_directory in required_patches[patch][method_type]:
source_file = source_files_path + "/" + required_patches[patch][method_type][install_patch_directory][install_file] + install_patch_directory + "/" + install_file for install_file in required_patches[patch][method_type][install_patch_directory]:
if not Path(source_file).exists(): source_file = source_files_path + "/" + required_patches[patch][method_type][install_patch_directory][install_file] + install_patch_directory + "/" + install_file
raise Exception(f"Failed to find {source_file}") if not Path(source_file).exists():
raise Exception(f"Failed to find {source_file}")
# Ensure KDK is properly installed # Ensure KDK is properly installed
self._merge_kdk_with_root(save_hid_cs=True if "Legacy USB 1.1" in required_patches else False) self._merge_kdk_with_root(save_hid_cs=True if "Legacy USB 1.1" in required_patches else False)
@@ -556,188 +468,6 @@ class PatchSysVolume:
logging.info("- Finished Preflight, starting patching") logging.info("- Finished Preflight, starting patching")
def _install_new_file(self, source_folder: Path, destination_folder: Path, file_name: str) -> None:
"""
Installs a new file to the destination folder
File handling logic:
- .frameworks are merged with the destination folder
- Other files are deleted and replaced (ex. .kexts, .apps)
Parameters:
source_folder (Path): Path to the source folder
destination_folder (Path): Path to the destination folder
file_name (str): Name of the file to install
"""
file_name_str = str(file_name)
if not Path(destination_folder).exists():
logging.info(f" - Skipping {file_name}, cannot locate {source_folder}")
return
if file_name_str.endswith(".framework"):
# merge with rsync
logging.info(f" - Installing: {file_name}")
subprocess_wrapper.run_as_root(["/usr/bin/rsync", "-r", "-i", "-a", f"{source_folder}/{file_name}", f"{destination_folder}/"], stdout=subprocess.PIPE)
self._fix_permissions(destination_folder + "/" + file_name)
elif Path(source_folder + "/" + file_name_str).is_dir():
# Applicable for .kext, .app, .plugin, .bundle, all of which are directories
if Path(destination_folder + "/" + file_name).exists():
logging.info(f" - Found existing {file_name}, overwriting...")
subprocess_wrapper.run_as_root_and_verify(["/bin/rm", "-R", f"{destination_folder}/{file_name}"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
else:
logging.info(f" - Installing: {file_name}")
subprocess_wrapper.run_as_root_and_verify(generate_copy_arguments(f"{source_folder}/{file_name}", destination_folder), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
self._fix_permissions(destination_folder + "/" + file_name)
else:
# Assume it's an individual file, replace as normal
if Path(destination_folder + "/" + file_name).exists():
logging.info(f" - Found existing {file_name}, overwriting...")
subprocess_wrapper.run_as_root_and_verify(["/bin/rm", f"{destination_folder}/{file_name}"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
else:
logging.info(f" - Installing: {file_name}")
subprocess_wrapper.run_as_root_and_verify(generate_copy_arguments(f"{source_folder}/{file_name}", destination_folder), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
self._fix_permissions(destination_folder + "/" + file_name)
def _remove_file(self, destination_folder: Path, file_name: str) -> None:
"""
Removes a file from the destination folder
Parameters:
destination_folder (Path): Path to the destination folder
file_name (str): Name of the file to remove
"""
if Path(destination_folder + "/" + file_name).exists():
logging.info(f" - Removing: {file_name}")
if Path(destination_folder + "/" + file_name).is_dir():
subprocess_wrapper.run_as_root_and_verify(["/bin/rm", "-R", f"{destination_folder}/{file_name}"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
else:
subprocess_wrapper.run_as_root_and_verify(["/bin/rm", f"{destination_folder}/{file_name}"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
def _fix_permissions(self, destination_file: Path) -> None:
"""
Fix file permissions for a given file or directory
"""
chmod_args = ["/bin/chmod", "-Rf", "755", destination_file]
chown_args = ["/usr/sbin/chown", "-Rf", "root:wheel", destination_file]
if not Path(destination_file).is_dir():
# Strip recursive arguments
chmod_args.pop(1)
chown_args.pop(1)
subprocess_wrapper.run_as_root_and_verify(chmod_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
subprocess_wrapper.run_as_root_and_verify(chown_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
def _check_files(self) -> bool:
"""
Check if all files are present (primarily PatcherSupportPkg resources)
Returns:
bool: True if all files are present, False otherwise
"""
if Path(self.constants.payload_local_binaries_root_path).exists():
logging.info("- Local PatcherSupportPkg resources available, continuing...")
return True
if Path(self.constants.payload_local_binaries_root_path_dmg).exists():
logging.info("- Local PatcherSupportPkg resources available, mounting...")
output = subprocess.run(
[
"/usr/bin/hdiutil", "attach", "-noverify", f"{self.constants.payload_local_binaries_root_path_dmg}",
"-mountpoint", Path(self.constants.payload_path / Path("Universal-Binaries")),
"-nobrowse",
"-shadow", Path(self.constants.payload_path / Path("Universal-Binaries_overlay")),
"-passphrase", "password"
],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
if output.returncode != 0:
logging.info("- Failed to mount Universal-Binaries.dmg")
subprocess_wrapper.log(output)
return False
logging.info("- Mounted Universal-Binaries.dmg")
if self.constants.cli_mode is False and Path(self.constants.overlay_psp_path_dmg).exists() and Path("~/.dortania_developer").expanduser().exists():
icon_path = str(self.constants.app_icon_path).replace("/", ":")[1:]
msg = "Welcome to the DortaniaInternal Program, please provided the decryption key to access internal resources. Press cancel to skip."
password = Path("~/.dortania_developer_key").expanduser().read_text().strip() if Path("~/.dortania_developer_key").expanduser().exists() else ""
for i in range(3):
try:
if password == "":
password = applescript.AppleScript(
f"""
set theResult to display dialog "{msg}" default answer "" with hidden answer with title "OpenCore Legacy Patcher" with icon file "{icon_path}"
return the text returned of theResult
"""
).run()
result = subprocess.run(
[
"/usr/bin/hdiutil", "attach", "-noverify", f"{self.constants.overlay_psp_path_dmg}",
"-mountpoint", Path(self.constants.payload_path / Path("DortaniaInternal")),
"-nobrowse",
"-passphrase", password
],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
if result.returncode == 0:
logging.info("- Mounted DortaniaInternal resources")
result = subprocess.run(
[
"/usr/bin/ditto", f"{self.constants.payload_path / Path('DortaniaInternal')}", f"{self.constants.payload_path / Path('Universal-Binaries')}"
],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
if result.returncode == 0:
return True
logging.info("- Failed to merge DortaniaInternal resources")
subprocess_wrapper.log(result)
return False
logging.info("- Failed to mount DortaniaInternal resources")
subprocess_wrapper.log(result)
if "Authentication error" not in result.stdout.decode():
try:
# Display that the disk image might be corrupted
applescript.AppleScript(
f"""
display dialog "Failed to mount DortaniaInternal resources, please file an internal radar:\n\n{result.stdout.decode()}" with title "OpenCore Legacy Patcher" with icon file "{icon_path}"
"""
).run()
return False
except Exception as e:
pass
break
msg = f"Decryption failed, please try again. {2 - i} attempts remaining. "
password = ""
if i == 2:
applescript.AppleScript(
f"""
display dialog "Failed to mount DortaniaInternal resources, too many incorrect passwords. If this continues with the correct decryption key, please file an internal radar." with title "OpenCore Legacy Patcher" with icon file "{icon_path}"
"""
).run()
return False
except Exception as e:
break
return True
logging.info("- PatcherSupportPkg resources missing, Patcher likely corrupted!!!")
return False
# Entry Function # Entry Function
def start_patch(self): def start_patch(self):
""" """
@@ -746,27 +476,33 @@ class PatchSysVolume:
logging.info("- Starting Patch Process") logging.info("- Starting Patch Process")
logging.info(f"- Determining Required Patch set for Darwin {self.constants.detected_os}") logging.info(f"- Determining Required Patch set for Darwin {self.constants.detected_os}")
self.patch_set_dictionary = sys_patch_generate.GenerateRootPatchSets(self.computer.real_model, self.constants, self.hardware_details).patchset self.patch_set_dictionary = GenerateRootPatchSets(self.computer.real_model, self.constants, self.hardware_details).patchset
if self.patch_set_dictionary == {}: if self.patch_set_dictionary == {}:
logging.info("- No Root Patches required for your machine!") logging.info("- No Root Patches required for your machine!")
return return
logging.info("- Verifying whether Root Patching possible") logging.info("- Verifying whether Root Patching possible")
if sys_patch_detect.DetectRootPatch(self.computer.real_model, self.constants).verify_patch_allowed(print_errors=not self.constants.wxpython_variant) is False: if DetectRootPatch(self.computer.real_model, self.constants).verify_patch_allowed(print_errors=not self.constants.wxpython_variant) is False:
logging.error("- Cannot continue with patching!!!") logging.error("- Cannot continue with patching!!!")
return return
logging.info("- Patcher is capable of patching") logging.info("- Patcher is capable of patching")
if self._check_files(): if PatcherSupportPkgMount(self.constants).mount() is False:
if self._mount_root_vol() is True: logging.error("- Critical resources missing, cannot continue with patching!!!")
if self._run_sanity_checks(): return
self._patch_root_vol()
else: if self._mount_root_vol() is False:
self._unmount_root_vol() logging.error("- Failed to mount root volume, cannot continue with patching!!!")
logging.info("- Please ensure that you do not have any updates pending") return
else:
logging.info("- Recommend rebooting the machine and trying to patch again") if self._run_sanity_checks() is False:
self._unmount_root_vol()
logging.error("- Failed sanity checks, cannot continue with patching!!!")
logging.error("- Please ensure that you do not have any updates pending")
return
self._patch_root_vol()
def start_unpatch(self) -> None: def start_unpatch(self) -> None:
@@ -775,11 +511,12 @@ class PatchSysVolume:
""" """
logging.info("- Starting Unpatch Process") logging.info("- Starting Unpatch Process")
if sys_patch_detect.DetectRootPatch(self.computer.real_model, self.constants).verify_patch_allowed(print_errors=True) is False: if DetectRootPatch(self.computer.real_model, self.constants).verify_patch_allowed(print_errors=True) is False:
logging.error("- Cannot continue with unpatching!!!") logging.error("- Cannot continue with unpatching!!!")
return return
if self._mount_root_vol() is True: if self._mount_root_vol() is False:
self._unpatch_root_vol() logging.error("- Failed to mount root volume, cannot continue with unpatching!!!")
else: return
logging.info("- Recommend rebooting the machine and trying to patch again")
self._unpatch_root_vol()

View File

@@ -82,7 +82,7 @@ class SysPatchHelpers:
Generate patchset file for user reference Generate patchset file for user reference
Parameters: Parameters:
patchset (dict): Dictionary of patchset, see sys_patch_detect.py and sys_patch_dict.py patchset (dict): Dictionary of patchset, see detect.py and sys_patch_dict.py
file_name (str): Name of the file to write to file_name (str): Name of the file to write to
kdk_used (Path): Path to the KDK used, if any kdk_used (Path): Path to the KDK used, if any

View File

@@ -0,0 +1,6 @@
"""
utilities: General utility functions for root volume patching
"""
from .files import install_new_file, remove_file, fix_permissions
from .dmg_mount import PatcherSupportPkgMount
from .kdk_merge import KernelDebugKitMerge

View File

@@ -0,0 +1,181 @@
"""
dmg_mount.py: PatcherSupportPkg DMG Mounting. Handles Universal-Binaries and DortaniaInternalResources DMGs.
"""
import logging
import subprocess
import applescript
from pathlib import Path
from ... import constants
from ...support import subprocess_wrapper
class PatcherSupportPkgMount:
def __init__(self, global_constants: constants.Constants) -> None:
self.constants: constants.Constants = global_constants
self.icon_path = str(self.constants.app_icon_path).replace("/", ":")[1:]
def _mount_universal_binaries_dmg(self) -> bool:
"""
Mount PatcherSupportPkg's Universal-Binaries.dmg
"""
if not Path(self.constants.payload_local_binaries_root_path_dmg).exists():
logging.info("- PatcherSupportPkg resources missing, Patcher likely corrupted!!!")
return False
output = subprocess.run(
[
"/usr/bin/hdiutil", "attach", "-noverify", f"{self.constants.payload_local_binaries_root_path_dmg}",
"-mountpoint", Path(self.constants.payload_path / Path("Universal-Binaries")),
"-nobrowse",
"-shadow", Path(self.constants.payload_path / Path("Universal-Binaries_overlay")),
"-passphrase", "password"
],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
if output.returncode != 0:
logging.info("- Failed to mount Universal-Binaries.dmg")
subprocess_wrapper.log(output)
return False
logging.info("- Mounted Universal-Binaries.dmg")
return True
def _mount_dortania_internal_resources_dmg(self) -> bool:
"""
Mount PatcherSupportPkg's DortaniaInternalResources.dmg (if available)
"""
if not Path(self.constants.overlay_psp_path_dmg).exists():
return True
if not Path("~/.dortania_developer").expanduser().exists():
return True
if self.constants.cli_mode is True:
return True
logging.info("- Found DortaniaInternal resources, mounting...")
for i in range(3):
key = self._request_decryption_key(i)
output = subprocess.run(
[
"/usr/bin/hdiutil", "attach", "-noverify", f"{self.constants.overlay_psp_path_dmg}",
"-mountpoint", Path(self.constants.payload_path / Path("DortaniaInternal")),
"-nobrowse",
"-passphrase", key
],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
if output.returncode != 0:
logging.info("- Failed to mount DortaniaInternal resources")
subprocess_wrapper.log(output)
if "Authentication error" not in output.stdout.decode():
self._display_authentication_error()
if i == 2:
self._display_too_many_attempts()
return False
logging.info("- Mounted DortaniaInternal resources")
return self._merge_dortania_internal_resources()
def _merge_dortania_internal_resources(self) -> bool:
"""
Merge DortaniaInternal resources with Universal-Binaries
"""
result = subprocess.run(
[
"/usr/bin/ditto", f"{self.constants.payload_path / Path('DortaniaInternal')}", f"{self.constants.payload_path / Path('Universal-Binaries')}"
],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
if result.returncode != 0:
logging.info("- Failed to merge DortaniaInternal resources")
subprocess_wrapper.log(result)
return False
return True
def _request_decryption_key(self, attempt: int) -> str:
"""
Fetch the decryption key for DortaniaInternalResources.dmg
"""
# Only return on first attempt
if attempt == 0:
if Path("~/.dortania_developer_key").expanduser().exists():
return Path("~/.dortania_developer_key").expanduser().read_text().strip()
password = ""
msg = "Welcome to the DortaniaInternal Program, please provided the decryption key to access internal resources. Press cancel to skip."
if attempt > 0:
msg = f"Decryption failed, please try again. {2 - attempt} attempts remaining. "
try:
password = applescript.AppleScript(
f"""
set theResult to display dialog "{msg}" default answer "" with hidden answer with title "OpenCore Legacy Patcher" with icon file "{self.icon_path}"
return the text returned of theResult
"""
).run()
except Exception as e:
pass
return password
def _display_authentication_error(self) -> None:
"""
Display authentication error dialog
"""
try:
applescript.AppleScript(
f"""
display dialog "Failed to mount DortaniaInternal resources, please file an internal radar." with title "OpenCore Legacy Patcher" with icon file "{self.icon_path}"
"""
).run()
except Exception as e:
pass
def _display_too_many_attempts(self) -> None:
"""
Display too many attempts dialog
"""
try:
applescript.AppleScript(
f"""
display dialog "Failed to mount DortaniaInternal resources, too many incorrect passwords. If this continues with the correct decryption key, please file an internal radar." with title "OpenCore Legacy Patcher" with icon file "{self.icon_path}"
"""
).run()
except Exception as e:
pass
def mount(self) -> bool:
"""
Mount PatcherSupportPkg resources
Returns:
bool: True if all resources are mounted, False otherwise
"""
# If already mounted, skip
if Path(self.constants.payload_local_binaries_root_path).exists():
logging.info("- Local PatcherSupportPkg resources available, continuing...")
return True
if self._mount_universal_binaries_dmg() is False:
return False
if self._mount_dortania_internal_resources_dmg() is False:
return False
return True

View File

@@ -0,0 +1,88 @@
"""
utilities.py: Supporting functions for file handling during root volume patching
"""
import logging
import subprocess
from pathlib import Path
from ...volume import generate_copy_arguments
from ...support import subprocess_wrapper
def install_new_file(source_folder: Path, destination_folder: Path, file_name: str) -> None:
"""
Installs a new file to the destination folder
File handling logic:
- .frameworks are merged with the destination folder
- Other files are deleted and replaced (ex. .kexts, .apps)
Parameters:
source_folder (Path): Path to the source folder
destination_folder (Path): Path to the destination folder
file_name (str): Name of the file to install
"""
file_name_str = str(file_name)
if not Path(destination_folder).exists():
logging.info(f" - Skipping {file_name}, cannot locate {source_folder}")
return
if file_name_str.endswith(".framework"):
# merge with rsync
logging.info(f" - Installing: {file_name}")
subprocess_wrapper.run_as_root(["/usr/bin/rsync", "-r", "-i", "-a", f"{source_folder}/{file_name}", f"{destination_folder}/"], stdout=subprocess.PIPE)
fix_permissions(destination_folder + "/" + file_name)
elif Path(source_folder + "/" + file_name_str).is_dir():
# Applicable for .kext, .app, .plugin, .bundle, all of which are directories
if Path(destination_folder + "/" + file_name).exists():
logging.info(f" - Found existing {file_name}, overwriting...")
subprocess_wrapper.run_as_root_and_verify(["/bin/rm", "-R", f"{destination_folder}/{file_name}"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
else:
logging.info(f" - Installing: {file_name}")
subprocess_wrapper.run_as_root_and_verify(generate_copy_arguments(f"{source_folder}/{file_name}", destination_folder), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
fix_permissions(destination_folder + "/" + file_name)
else:
# Assume it's an individual file, replace as normal
if Path(destination_folder + "/" + file_name).exists():
logging.info(f" - Found existing {file_name}, overwriting...")
subprocess_wrapper.run_as_root_and_verify(["/bin/rm", f"{destination_folder}/{file_name}"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
else:
logging.info(f" - Installing: {file_name}")
subprocess_wrapper.run_as_root_and_verify(generate_copy_arguments(f"{source_folder}/{file_name}", destination_folder), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
fix_permissions(destination_folder + "/" + file_name)
def remove_file(destination_folder: Path, file_name: str) -> None:
"""
Removes a file from the destination folder
Parameters:
destination_folder (Path): Path to the destination folder
file_name (str): Name of the file to remove
"""
if Path(destination_folder + "/" + file_name).exists():
logging.info(f" - Removing: {file_name}")
if Path(destination_folder + "/" + file_name).is_dir():
subprocess_wrapper.run_as_root_and_verify(["/bin/rm", "-R", f"{destination_folder}/{file_name}"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
else:
subprocess_wrapper.run_as_root_and_verify(["/bin/rm", f"{destination_folder}/{file_name}"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
def fix_permissions(destination_file: Path) -> None:
"""
Fix file permissions for a given file or directory
"""
chmod_args = ["/bin/chmod", "-Rf", "755", destination_file]
chown_args = ["/usr/sbin/chown", "-Rf", "root:wheel", destination_file]
if not Path(destination_file).is_dir():
# Strip recursive arguments
chmod_args.pop(1)
chown_args.pop(1)
subprocess_wrapper.run_as_root_and_verify(chmod_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
subprocess_wrapper.run_as_root_and_verify(chown_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

View File

@@ -0,0 +1,167 @@
import logging
import subprocess
import plistlib
from pathlib import Path
from ... import constants
from ...datasets import os_data
from ...support import subprocess_wrapper, kdk_handler
from ...volume import generate_copy_arguments
class KernelDebugKitMerge:
def __init__(self, global_constants: constants.Constants, mount_location: str, skip_root_kmutil_requirement: bool) -> None:
self.constants: constants.Constants = global_constants
self.mount_location = mount_location
self.skip_root_kmutil_requirement = skip_root_kmutil_requirement
def _matching_kdk_already_merged(self, kdk_path: str) -> bool:
"""
Check whether the KDK is already merged with the root volume
"""
oclp_plist = Path("/System/Library/CoreServices/OpenCore-Legacy-Patcher.plist")
if not oclp_plist.exists():
return False
if not (Path(self.mount_location) / Path("System/Library/Extensions/System.kext/PlugIns/Libkern.kext/Libkern")).exists():
return False
try:
oclp_plist_data = plistlib.load(open(oclp_plist, "rb"))
if "Kernel Debug Kit Used" not in oclp_plist_data:
return False
if oclp_plist_data["Kernel Debug Kit Used"] == str(kdk_path):
logging.info("- Matching KDK determined to already be merged, skipping")
return True
except:
pass
return False
def _backup_hid_cs(self) -> None:
"""
Due to some IOHIDFamily oddities, we need to ensure their CodeSignature is retained
"""
cs_path = Path(self.mount_location) / Path("System/Library/Extensions/IOHIDFamily.kext/Contents/PlugIns/IOHIDEventDriver.kext/Contents/_CodeSignature")
if not cs_path.exists():
return
logging.info("- Backing up IOHIDEventDriver CodeSignature")
subprocess_wrapper.run_as_root(generate_copy_arguments(cs_path, f"{self.constants.payload_path}/IOHIDEventDriver_CodeSignature.bak"), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
def _restore_hid_cs(self) -> None:
"""
Restore IOHIDEventDriver CodeSignature
"""
if not Path(f"{self.constants.payload_path}/IOHIDEventDriver_CodeSignature.bak").exists():
return
logging.info("- Restoring IOHIDEventDriver CodeSignature")
cs_path = Path(self.mount_location) / Path("System/Library/Extensions/IOHIDFamily.kext/Contents/PlugIns/IOHIDEventDriver.kext/Contents/_CodeSignature")
if not cs_path.exists():
logging.info(" - CodeSignature folder missing, creating")
subprocess_wrapper.run_as_root(["/bin/mkdir", "-p", cs_path], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
subprocess_wrapper.run_as_root(generate_copy_arguments(f"{self.constants.payload_path}/IOHIDEventDriver_CodeSignature.bak", cs_path), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
subprocess_wrapper.run_as_root(["/bin/rm", "-rf", f"{self.constants.payload_path}/IOHIDEventDriver_CodeSignature.bak"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
def _merge_kdk(self, kdk_path: str) -> None:
"""
Merge Kernel Debug Kit (KDK) with the root volume
"""
logging.info(f"- Merging KDK with Root Volume: {Path(kdk_path).name}")
subprocess_wrapper.run_as_root(
# Only merge '/System/Library/Extensions'
# 'Kernels' and 'KernelSupport' is wasted space for root patching (we don't care above dev kernels)
["/usr/bin/rsync", "-r", "-i", "-a", f"{kdk_path}/System/Library/Extensions/", f"{self.mount_location}/System/Library/Extensions"],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
if not (Path(self.mount_location) / Path("System/Library/Extensions/System.kext/PlugIns/Libkern.kext/Libkern")).exists():
logging.info("- Failed to merge KDK with Root Volume")
raise Exception("Failed to merge KDK with Root Volume")
logging.info("- Successfully merged KDK with Root Volume")
def merge(self, save_hid_cs: bool = False) -> str:
"""
Merge the Kernel Debug Kit (KDK) with the root volume
Returns KDK used
"""
if self.skip_root_kmutil_requirement is True:
return None
if self.constants.detected_os < os_data.os_data.ventura:
return None
# If a KDK was pre-downloaded, install it
if self.constants.kdk_download_path.exists():
if kdk_handler.KernelDebugKitUtilities().install_kdk_dmg(self.constants.kdk_download_path) is False:
logging.info("Failed to install KDK")
raise Exception("Failed to install KDK")
# Next, grab KDK information (ie. what's the latest KDK for this OS)
kdk_obj = kdk_handler.KernelDebugKitObject(self.constants, self.constants.detected_os_build, self.constants.detected_os_version)
if kdk_obj.success is False:
logging.info(f"Unable to get KDK info: {kdk_obj.error_msg}")
raise Exception(f"Unable to get KDK info: {kdk_obj.error_msg}")
# If no KDK is installed, download and install it
if kdk_obj.kdk_already_installed is False:
kdk_download_obj = kdk_obj.retrieve_download()
if not kdk_download_obj:
logging.info(f"Could not retrieve KDK: {kdk_obj.error_msg}")
raise Exception(f"Could not retrieve KDK: {kdk_obj.error_msg}")
# Hold thread until download is complete
kdk_download_obj.download(spawn_thread=False)
if kdk_download_obj.download_complete is False:
error_msg = kdk_download_obj.error_msg
logging.info(f"Could not download KDK: {error_msg}")
raise Exception(f"Could not download KDK: {error_msg}")
if kdk_obj.validate_kdk_checksum() is False:
logging.info(f"KDK checksum validation failed: {kdk_obj.error_msg}")
raise Exception(f"KDK checksum validation failed: {kdk_obj.error_msg}")
kdk_handler.KernelDebugKitUtilities().install_kdk_dmg(self.constants.kdk_download_path)
# re-init kdk_obj to get the new kdk_installed_path
kdk_obj = kdk_handler.KernelDebugKitObject(self.constants, self.constants.detected_os_build, self.constants.detected_os_version)
if kdk_obj.success is False:
logging.info(f"Unable to get KDK info: {kdk_obj.error_msg}")
raise Exception(f"Unable to get KDK info: {kdk_obj.error_msg}")
if kdk_obj.kdk_already_installed is False:
# We shouldn't get here, but just in case
logging.warning(f"KDK was not installed, but should have been: {kdk_obj.error_msg}")
raise Exception(f"KDK was not installed, but should have been: {kdk_obj.error_msg}")
kdk_path = Path(kdk_obj.kdk_installed_path) if kdk_obj.kdk_installed_path != "" else None
if kdk_path is None:
logging.info(f"- Unable to find Kernel Debug Kit")
raise Exception("Unable to find Kernel Debug Kit")
logging.info(f"- Found KDK at: {kdk_path}")
if self._matching_kdk_already_merged(kdk_path):
return kdk_path
if save_hid_cs is True:
self._backup_hid_cs()
self._merge_kdk(kdk_path)
if save_hid_cs is True:
self._restore_hid_cs()
return kdk_path

View File

@@ -12,7 +12,7 @@ from Cocoa import NSApp, NSApplication
from .. import constants from .. import constants
from ..sys_patch import sys_patch_detect from ..sys_patch.detections import DetectRootPatch
from ..wx_gui import ( from ..wx_gui import (
gui_cache_os_update, gui_cache_os_update,
@@ -64,7 +64,7 @@ class EntryPoint:
if "--gui_patch" in sys.argv or "--gui_unpatch" in sys.argv or start_patching is True : if "--gui_patch" in sys.argv or "--gui_unpatch" in sys.argv or start_patching is True :
entry = gui_sys_patch_start.SysPatchStartFrame entry = gui_sys_patch_start.SysPatchStartFrame
patches = sys_patch_detect.DetectRootPatch(self.constants.computer.real_model, self.constants).detect_patch_set() patches = DetectRootPatch(self.constants.computer.real_model, self.constants).detect_patch_set()
logging.info(f"Entry point set: {entry.__name__}") logging.info(f"Entry point set: {entry.__name__}")

View File

@@ -11,7 +11,7 @@ from pathlib import Path
from .. import constants from .. import constants
from ..sys_patch import sys_patch_detect from ..sys_patch.detections import DetectRootPatch
from ..wx_gui import ( from ..wx_gui import (
gui_main_menu, gui_main_menu,
@@ -86,7 +86,7 @@ class SysPatchDisplayFrame(wx.Frame):
patches: dict = {} patches: dict = {}
def _fetch_patches(self) -> None: def _fetch_patches(self) -> None:
nonlocal patches nonlocal patches
patches = sys_patch_detect.DetectRootPatch(self.constants.computer.real_model, self.constants).detect_patch_set() patches = DetectRootPatch(self.constants.computer.real_model, self.constants).detect_patch_set()
thread = threading.Thread(target=_fetch_patches, args=(self,)) thread = threading.Thread(target=_fetch_patches, args=(self,))
thread.start() thread.start()

View File

@@ -20,7 +20,6 @@ from ..support import kdk_handler
from ..sys_patch import ( from ..sys_patch import (
sys_patch, sys_patch,
sys_patch_detect
) )
from ..wx_gui import ( from ..wx_gui import (
gui_main_menu, gui_main_menu,
@@ -28,6 +27,8 @@ from ..wx_gui import (
gui_download, gui_download,
) )
from ..sys_patch.detections import DetectRootPatch
class SysPatchStartFrame(wx.Frame): class SysPatchStartFrame(wx.Frame):
@@ -50,7 +51,7 @@ class SysPatchStartFrame(wx.Frame):
self.Centre() self.Centre()
if self.patches == {}: if self.patches == {}:
self.patches = sys_patch_detect.DetectRootPatch(self.constants.computer.real_model, self.constants).detect_patch_set() self.patches = DetectRootPatch(self.constants.computer.real_model, self.constants).detect_patch_set()
def _kdk_download(self, frame: wx.Frame = None) -> bool: def _kdk_download(self, frame: wx.Frame = None) -> bool: