Merge pull request #1146 from dortania/kernel-management

Modularize System Volume Patching System
This commit is contained in:
Mykola Grymalyuk
2024-08-14 09:18:38 -06:00
committed by GitHub
27 changed files with 1328 additions and 797 deletions

View File

@@ -17,15 +17,14 @@ from .. import constants
from ..wx_gui import gui_entry
from ..efi_builder import build
from ..sys_patch import sys_patch
from ..sys_patch.auto_patcher import StartAutomaticPatching
from ..datasets import (
model_array,
os_data
)
from ..sys_patch import (
sys_patch,
sys_patch_auto
)
from . import (
utilities,
defaults,
@@ -118,7 +117,7 @@ class arguments:
"""
logging.info("Set Auto patching")
sys_patch_auto.AutomaticSysPatch(self.constants).start_auto_patch()
StartAutomaticPatching(self.constants).start_auto_patch()
def _prepare_for_update_handler(self) -> None:

View File

@@ -0,0 +1,17 @@
"""
auto_patcher: Automatic system volume patching after updates, etc.
Usage:
>>> # Installing launch services
>>> from auto_patcher import InstallAutomaticPatchingServices
>>> InstallAutomaticPatchingServices(self.constants).install_auto_patcher_launch_agent()
>>> # When patching the system volume (ex. launch service)
>>> from auto_patcher import StartAutomaticPatching
>>> StartAutomaticPatching(self.constants).start_auto_patch()
"""
from .install import InstallAutomaticPatchingServices
from .start import StartAutomaticPatching

View File

@@ -0,0 +1,116 @@
"""
install.py: Install the auto patcher launch services
"""
import hashlib
import logging
import plistlib
import subprocess
from pathlib import Path
from ... import constants
from ...volume import generate_copy_arguments
from ...support import (
utilities,
subprocess_wrapper
)
class InstallAutomaticPatchingServices:
"""
Install the auto patcher launch services
"""
def __init__(self, global_constants: constants.Constants):
self.constants: constants.Constants = global_constants
def install_auto_patcher_launch_agent(self, kdk_caching_needed: bool = False):
"""
Install patcher launch services
See start_auto_patch() comments for more info
"""
if self.constants.launcher_script is not None:
logging.info("- Skipping Auto Patcher Launch Agent, not supported when running from source")
return
services = {
self.constants.auto_patch_launch_agent_path: "/Library/LaunchAgents/com.dortania.opencore-legacy-patcher.auto-patch.plist",
self.constants.update_launch_daemon_path: "/Library/LaunchDaemons/com.dortania.opencore-legacy-patcher.macos-update.plist",
**({ self.constants.rsr_monitor_launch_daemon_path: "/Library/LaunchDaemons/com.dortania.opencore-legacy-patcher.rsr-monitor.plist" } if self._create_rsr_monitor_daemon() else {}),
**({ self.constants.kdk_launch_daemon_path: "/Library/LaunchDaemons/com.dortania.opencore-legacy-patcher.os-caching.plist" } if kdk_caching_needed is True else {} ),
}
for service in services:
name = Path(service).name
logging.info(f"- Installing {name}")
if Path(services[service]).exists():
if hashlib.sha256(open(service, "rb").read()).hexdigest() == hashlib.sha256(open(services[service], "rb").read()).hexdigest():
logging.info(f" - {name} checksums match, skipping")
continue
logging.info(f" - Existing service found, removing")
subprocess_wrapper.run_as_root_and_verify(["/bin/rm", services[service]], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
# Create parent directories
if not Path(services[service]).parent.exists():
logging.info(f" - Creating {Path(services[service]).parent} directory")
subprocess_wrapper.run_as_root_and_verify(["/bin/mkdir", "-p", Path(services[service]).parent], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
subprocess_wrapper.run_as_root_and_verify(generate_copy_arguments(service, services[service]), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
# Set the permissions on the service
subprocess_wrapper.run_as_root_and_verify(["/bin/chmod", "644", services[service]], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
subprocess_wrapper.run_as_root_and_verify(["/usr/sbin/chown", "root:wheel", services[service]], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
def _create_rsr_monitor_daemon(self) -> bool:
# Get kext list in /Library/Extensions that have the 'GPUCompanionBundles' property
# This is used to determine if we need to run the RSRMonitor
logging.info("- Checking if RSRMonitor is needed")
cryptex_path = f"/System/Volumes/Preboot/{utilities.get_preboot_uuid()}/cryptex1/current/OS.dmg"
if not Path(cryptex_path).exists():
logging.info("- No OS.dmg, skipping RSRMonitor")
return False
kexts = []
for kext in Path("/Library/Extensions").glob("*.kext"):
if not Path(f"{kext}/Contents/Info.plist").exists():
continue
try:
kext_plist = plistlib.load(open(f"{kext}/Contents/Info.plist", "rb"))
except Exception as e:
logging.info(f" - Failed to load plist for {kext.name}: {e}")
continue
if "GPUCompanionBundles" not in kext_plist:
continue
logging.info(f" - Found kext with GPUCompanionBundles: {kext.name}")
kexts.append(kext.name)
# If we have no kexts, we don't need to run the RSRMonitor
if not kexts:
logging.info("- No kexts found with GPUCompanionBundles, skipping RSRMonitor")
return False
# Load the RSRMonitor plist
rsr_monitor_plist = plistlib.load(open(self.constants.rsr_monitor_launch_daemon_path, "rb"))
arguments = ["/bin/rm", "-Rfv"]
arguments += [f"/Library/Extensions/{kext}" for kext in kexts]
# Add the arguments to the RSRMonitor plist
rsr_monitor_plist["ProgramArguments"] = arguments
# Next add monitoring for '/System/Volumes/Preboot/{UUID}/cryptex1/OS.dmg'
logging.info(f" - Adding monitor: {cryptex_path}")
rsr_monitor_plist["WatchPaths"] = [
cryptex_path,
]
# Write the RSRMonitor plist
plistlib.dump(rsr_monitor_plist, Path(self.constants.rsr_monitor_launch_daemon_path).open("wb"))
return True

View File

@@ -1,11 +1,10 @@
"""
sys_patch_auto.py: Library of functions for launch services, including automatic patching
start.py: Start automatic patching of host
"""
import wx
import wx.html2
import hashlib
import logging
import plistlib
import requests
@@ -13,31 +12,27 @@ import markdown2
import subprocess
import webbrowser
from pathlib import Path
from ..detections import DetectRootPatch
from . import sys_patch_detect
from ... import constants
from .. import constants
from ...datasets import css_data
from ..datasets import css_data
from ..volume import generate_copy_arguments
from ..wx_gui import (
from ...wx_gui import (
gui_entry,
gui_support
)
from ..support import (
from ...support import (
utilities,
updates,
global_settings,
network_handler,
subprocess_wrapper
)
class AutomaticSysPatch:
class StartAutomaticPatching:
"""
Library of functions for launch agent, including automatic patching
Start automatic patching of host
"""
def __init__(self, global_constants: constants.Constants):
@@ -147,7 +142,7 @@ Please check the Github page for more information about this release."""
if utilities.check_seal() is True:
logging.info("- Detected Snapshot seal intact, detecting patches")
patches = sys_patch_detect.DetectRootPatch(self.constants.computer.real_model, self.constants).detect_patch_set()
patches = DetectRootPatch(self.constants.computer.real_model, self.constants).detect_patch_set()
if not any(not patch.startswith("Settings") and not patch.startswith("Validation") and patches[patch] is True for patch in patches):
patches = {}
if patches:
@@ -317,92 +312,4 @@ Please check the Github page for more information about this release."""
gui_entry.EntryPoint(self.constants).start(entry=gui_entry.SupportedEntryPoints.BUILD_OC)
except KeyError:
logging.info("- Unable to determine if boot disk is removable, skipping prompt")
def install_auto_patcher_launch_agent(self, kdk_caching_needed: bool = False):
"""
Install patcher launch services
See start_auto_patch() comments for more info
"""
if self.constants.launcher_script is not None:
logging.info("- Skipping Auto Patcher Launch Agent, not supported when running from source")
return
services = {
self.constants.auto_patch_launch_agent_path: "/Library/LaunchAgents/com.dortania.opencore-legacy-patcher.auto-patch.plist",
self.constants.update_launch_daemon_path: "/Library/LaunchDaemons/com.dortania.opencore-legacy-patcher.macos-update.plist",
**({ self.constants.rsr_monitor_launch_daemon_path: "/Library/LaunchDaemons/com.dortania.opencore-legacy-patcher.rsr-monitor.plist" } if self._create_rsr_monitor_daemon() else {}),
**({ self.constants.kdk_launch_daemon_path: "/Library/LaunchDaemons/com.dortania.opencore-legacy-patcher.os-caching.plist" } if kdk_caching_needed is True else {} ),
}
for service in services:
name = Path(service).name
logging.info(f"- Installing {name}")
if Path(services[service]).exists():
if hashlib.sha256(open(service, "rb").read()).hexdigest() == hashlib.sha256(open(services[service], "rb").read()).hexdigest():
logging.info(f" - {name} checksums match, skipping")
continue
logging.info(f" - Existing service found, removing")
subprocess_wrapper.run_as_root_and_verify(["/bin/rm", services[service]], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
# Create parent directories
if not Path(services[service]).parent.exists():
logging.info(f" - Creating {Path(services[service]).parent} directory")
subprocess_wrapper.run_as_root_and_verify(["/bin/mkdir", "-p", Path(services[service]).parent], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
subprocess_wrapper.run_as_root_and_verify(generate_copy_arguments(service, services[service]), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
# Set the permissions on the service
subprocess_wrapper.run_as_root_and_verify(["/bin/chmod", "644", services[service]], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
subprocess_wrapper.run_as_root_and_verify(["/usr/sbin/chown", "root:wheel", services[service]], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
def _create_rsr_monitor_daemon(self) -> bool:
# Get kext list in /Library/Extensions that have the 'GPUCompanionBundles' property
# This is used to determine if we need to run the RSRMonitor
logging.info("- Checking if RSRMonitor is needed")
cryptex_path = f"/System/Volumes/Preboot/{utilities.get_preboot_uuid()}/cryptex1/current/OS.dmg"
if not Path(cryptex_path).exists():
logging.info("- No OS.dmg, skipping RSRMonitor")
return False
kexts = []
for kext in Path("/Library/Extensions").glob("*.kext"):
if not Path(f"{kext}/Contents/Info.plist").exists():
continue
try:
kext_plist = plistlib.load(open(f"{kext}/Contents/Info.plist", "rb"))
except Exception as e:
logging.info(f" - Failed to load plist for {kext.name}: {e}")
continue
if "GPUCompanionBundles" not in kext_plist:
continue
logging.info(f" - Found kext with GPUCompanionBundles: {kext.name}")
kexts.append(kext.name)
# If we have no kexts, we don't need to run the RSRMonitor
if not kexts:
logging.info("- No kexts found with GPUCompanionBundles, skipping RSRMonitor")
return False
# Load the RSRMonitor plist
rsr_monitor_plist = plistlib.load(open(self.constants.rsr_monitor_launch_daemon_path, "rb"))
arguments = ["/bin/rm", "-Rfv"]
arguments += [f"/Library/Extensions/{kext}" for kext in kexts]
# Add the arguments to the RSRMonitor plist
rsr_monitor_plist["ProgramArguments"] = arguments
# Next add monitoring for '/System/Volumes/Preboot/{UUID}/cryptex1/OS.dmg'
logging.info(f" - Adding monitor: {cryptex_path}")
rsr_monitor_plist["WatchPaths"] = [
cryptex_path,
]
# Write the RSRMonitor plist
plistlib.dump(rsr_monitor_plist, Path(self.constants.rsr_monitor_launch_daemon_path).open("wb"))
return True
logging.info("- Unable to determine if boot disk is removable, skipping prompt")

View File

@@ -0,0 +1,5 @@
"""
detections: Detect and generate patch sets for the host
"""
from .detect import DetectRootPatch
from .generate import GenerateRootPatchSets

View File

@@ -1,5 +1,5 @@
"""
sys_patch_detect.py: Hardware Detection Logic for Root Patching
detect.py: Hardware Detection Logic for Root Patching
"""
import logging
@@ -9,18 +9,18 @@ import packaging.version
from pathlib import Path
from .. import constants
from ... import constants
from ..detections import (
from ...detections import (
amfi_detect,
device_probe
)
from ..support import (
from ...support import (
kdk_handler,
network_handler,
utilities
)
from ..datasets import (
from ...datasets import (
cpu_data,
model_array,
os_data,

View File

@@ -1,14 +1,14 @@
"""
sys_patch_generate.py: Class for generating patch sets for the current host
generate.py: Class for generating patch sets for the current host
"""
import logging
from .. import constants
from ... import constants
from ..datasets import sys_patch_dict
from ..support import utilities
from ..detections import device_probe
from ...datasets import sys_patch_dict
from ...support import utilities
from ...detections import device_probe
class GenerateRootPatchSets:

View File

@@ -0,0 +1,11 @@
"""
kernelcache: Library for rebuilding macOS kernelcache files.
Usage:
>>> from kernelcache import RebuildKernelCache
>>> RebuildKernelCache(os_version, mount_location, auxiliary_cache, auxiliary_cache_only).rebuild()
"""
from .rebuild import RebuildKernelCache
from .kernel_collection.support import KernelCacheSupport

View File

@@ -0,0 +1,8 @@
"""
cache.py: Base class for kernel cache management
"""
class BaseKernelCache:
def rebuild(self) -> None:
raise NotImplementedError("To be implemented in subclass")

View File

@@ -0,0 +1,72 @@
"""
auxiliary.py: Auxiliary Kernel Collection management
"""
import logging
import subprocess
from ..base.cache import BaseKernelCache
from ....support import subprocess_wrapper
class AuxiliaryKernelCollection(BaseKernelCache):
def __init__(self, mount_location: str) -> None:
self.mount_location = mount_location
def _kmutil_arguments(self) -> list[str]:
args = ["/usr/bin/kmutil", "create", "--allow-missing-kdk"]
args.append("--new")
args.append("aux")
args.append("--boot-path")
args.append(f"{self.mount_location}/System/Library/KernelCollections/BootKernelExtensions.kc")
args.append("--system-path")
args.append(f"{self.mount_location}/System/Library/KernelCollections/SystemKernelExtensions.kc")
return args
def _force_auxiliary_usage(self) -> bool:
"""
Force the auxiliary kernel collection to be used.
This is required as Apple doesn't offer a public way
to rebuild the auxiliary kernel collection. Instead deleting
necessary files and directories will force the newly built
collection to be used.
"""
print("- Forcing Auxiliary Kernel Collection usage")
result = subprocess_wrapper.run_as_root(["/usr/bin/killall", "syspolicyd", "kernelmanagerd"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
logging.info("- Unable to kill syspolicyd and kernelmanagerd")
subprocess_wrapper.log(result)
return False
for file in ["KextPolicy", "KextPolicy-shm", "KextPolicy-wal"]:
result = subprocess_wrapper.run_as_root(["/bin/rm", f"/private/var/db/SystemPolicyConfiguration/{file}"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
logging.info(f"- Unable to remove {file}")
subprocess_wrapper.log(result)
return False
return True
def rebuild(self) -> None:
logging.info("- Building new Auxiliary Kernel Collection")
result = subprocess_wrapper.run_as_root(self._kmutil_arguments(), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
logging.info("- Unable to build Auxiliary Kernel Collection")
subprocess_wrapper.log(result)
return False
if self._force_auxiliary_usage() is False:
return False
return True

View File

@@ -0,0 +1,62 @@
"""
boot_system.py: Boot and System Kernel Collection management
"""
import logging
import subprocess
from ..base.cache import BaseKernelCache
from ....support import subprocess_wrapper
from ....datasets import os_data
class BootSystemKernelCollections(BaseKernelCache):
def __init__(self, mount_location: str, detected_os: int, auxiliary_kc: bool) -> None:
self.mount_location = mount_location
self.detected_os = detected_os
self.auxiliary_kc = auxiliary_kc
def _kmutil_arguments(self) -> list[str]:
"""
Generate kmutil arguments for creating or updating
the boot, system and auxiliary kernel collections
"""
args = ["/usr/bin/kmutil"]
if self.detected_os >= os_data.os_data.ventura:
args.append("create")
args.append("--allow-missing-kdk")
else:
args.append("install")
args.append("--volume-root")
args.append(self.mount_location)
args.append("--update-all")
args.append("--variant-suffix")
args.append("release")
if self.auxiliary_kc is True:
# Following arguments are supposed to skip kext consent
# prompts when creating auxiliary KCs with SIP disabled
args.append("--no-authentication")
args.append("--no-authorization")
return args
def rebuild(self) -> bool:
logging.info(f"- Rebuilding {'Boot and System' if self.auxiliary_kc is False else 'Boot, System and Auxiliary'} Kernel Collections")
if self.auxiliary_kc is True:
logging.info(" (You will get a prompt by System Preferences, ignore for now)")
result = subprocess_wrapper.run_as_root(self._kmutil_arguments(), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
subprocess_wrapper.log(result)
return False
return True

View File

@@ -0,0 +1,162 @@
"""
support.py: Kernel Cache support functions
"""
import logging
import plistlib
from pathlib import Path
from datetime import datetime
from ....datasets import os_data
from ....support import subprocess_wrapper
class KernelCacheSupport:
def __init__(self, mount_location_data: str, detected_os: int, skip_root_kmutil_requirement: bool) -> None:
self.mount_location_data = mount_location_data
self.detected_os = detected_os
self.skip_root_kmutil_requirement = skip_root_kmutil_requirement
def check_kexts_needs_authentication(self, kext_name: str) -> bool:
"""
Verify whether the user needs to authenticate in System Preferences
Sets 'needs_to_open_preferences' to True if the kext is not in the AuxKC
Logic:
Under 'private/var/db/KernelManagement/AuxKC/CurrentAuxKC/com.apple.kcgen.instructions.plist'
["kextsToBuild"][i]:
["bundlePathMainOS"] = /Library/Extensions/Test.kext
["cdHash"] = Bundle's CDHash (random on ad-hoc signed, static on dev signed)
["teamID"] = Team ID (blank on ad-hoc signed)
To grab the CDHash of a kext, run 'codesign -dvvv <kext_path>'
"""
try:
aux_cache_path = Path(self.mount_location_data) / Path("/private/var/db/KernelExtensionManagement/AuxKC/CurrentAuxKC/com.apple.kcgen.instructions.plist")
if aux_cache_path.exists():
aux_cache_data = plistlib.load((aux_cache_path).open("rb"))
for kext in aux_cache_data["kextsToBuild"]:
if "bundlePathMainOS" in aux_cache_data["kextsToBuild"][kext]:
if aux_cache_data["kextsToBuild"][kext]["bundlePathMainOS"] == f"/Library/Extensions/{kext_name}":
return False
except PermissionError:
pass
logging.info(f" - {kext_name} requires authentication in System Preferences")
return True
def add_auxkc_support(self, install_file: str, source_folder_path: str, install_patch_directory: str, destination_folder_path: str) -> str:
"""
Patch provided Kext to support Auxiliary Kernel Collection
Logic:
In macOS Ventura, KDKs are required to build new Boot and System KCs
However for some patch sets, we're able to use the Auxiliary KCs with '/Library/Extensions'
kernelmanagerd determines which kext is installed by their 'OSBundleRequired' entry
If a kext is labeled as 'OSBundleRequired: Root' or 'OSBundleRequired: Safe Boot',
kernelmanagerd will require the kext to be installed in the Boot/SysKC
Additionally, kexts starting with 'com.apple.' are not natively allowed to be installed
in the AuxKC. So we need to explicitly set our 'OSBundleRequired' to 'Auxiliary'
Parameters:
install_file (str): Kext file name
source_folder_path (str): Source folder path
install_patch_directory (str): Patch directory
destination_folder_path (str): Destination folder path
Returns:
str: Updated destination folder path
"""
if self.skip_root_kmutil_requirement is False:
return destination_folder_path
if not install_file.endswith(".kext"):
return destination_folder_path
if install_patch_directory != "/System/Library/Extensions":
return destination_folder_path
if self.detected_os < os_data.os_data.ventura:
return destination_folder_path
updated_install_location = str(self.mount_location_data) + "/Library/Extensions"
logging.info(f" - Adding AuxKC support to {install_file}")
plist_path = Path(Path(source_folder_path) / Path(install_file) / Path("Contents/Info.plist"))
plist_data = plistlib.load((plist_path).open("rb"))
# Check if we need to update the 'OSBundleRequired' entry
if not plist_data["CFBundleIdentifier"].startswith("com.apple."):
return updated_install_location
if "OSBundleRequired" in plist_data:
if plist_data["OSBundleRequired"] == "Auxiliary":
return updated_install_location
plist_data["OSBundleRequired"] = "Auxiliary"
plistlib.dump(plist_data, plist_path.open("wb"))
return updated_install_location
def clean_auxiliary_kc(self) -> None:
"""
Clean the Auxiliary Kernel Collection
Logic:
When reverting root volume patches, the AuxKC will still retain the UUID
it was built against. Thus when Boot/SysKC are reverted, Aux will break
To resolve this, delete all installed kexts in /L*/E* and rebuild the AuxKC
We can verify our binaries based off the OpenCore-Legacy-Patcher.plist file
"""
if self.detected_os < os_data.os_data.big_sur:
return
logging.info("- Cleaning Auxiliary Kernel Collection")
oclp_path = "/System/Library/CoreServices/OpenCore-Legacy-Patcher.plist"
if Path(oclp_path).exists():
oclp_plist_data = plistlib.load(Path(oclp_path).open("rb"))
for key in oclp_plist_data:
if isinstance(oclp_plist_data[key], (bool, int)):
continue
for install_type in ["Install", "Install Non-Root"]:
if install_type not in oclp_plist_data[key]:
continue
for location in oclp_plist_data[key][install_type]:
if not location.endswith("Extensions"):
continue
for file in oclp_plist_data[key][install_type][location]:
if not file.endswith(".kext"):
continue
if not Path(f"/Library/Extensions/{file}").exists():
continue
logging.info(f" - Removing {file}")
subprocess_wrapper.run_as_root(["/bin/rm", "-Rf", f"/Library/Extensions/{file}"])
# Handle situations where users migrated from older OSes with a lot of garbage in /L*/E*
# ex. Nvidia Web Drivers, NetUSB, dosdude1's patches, etc.
# Move if file's age is older than October 2021 (year before Ventura)
if self.detected_os < os_data.os_data.ventura:
return
relocation_path = "/Library/Relocated Extensions"
if not Path(relocation_path).exists():
subprocess_wrapper.run_as_root(["/bin/mkdir", relocation_path])
for file in Path("/Library/Extensions").glob("*.kext"):
try:
if datetime.fromtimestamp(file.stat().st_mtime) < datetime(2021, 10, 1):
logging.info(f" - Relocating {file.name} kext to {relocation_path}")
if Path(relocation_path) / Path(file.name).exists():
subprocess_wrapper.run_as_root(["/bin/rm", "-Rf", relocation_path / Path(file.name)])
subprocess_wrapper.run_as_root(["/bin/mv", file, relocation_path])
except:
# Some users have the most cursed /L*/E* folders
# ex. Symlinks pointing to symlinks pointing to dead files
pass

View File

@@ -0,0 +1,32 @@
"""
mkext.py: MKext cache management
"""
import logging
import subprocess
from ..base.cache import BaseKernelCache
from ....support import subprocess_wrapper
class MKext(BaseKernelCache):
def __init__(self, mount_location: str) -> None:
self.mount_location = mount_location
def _mkext_arguments(self) -> list[str]:
args = ["/usr/bin/touch", f"{self.mount_location}/System/Library/Extensions"]
return args
def rebuild(self) -> None:
logging.info("- Rebuilding MKext cache")
result = subprocess_wrapper.run_as_root(self._mkext_arguments(), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
subprocess_wrapper.log(result)
return False
return True

View File

@@ -0,0 +1,48 @@
"""
prelinked.py: Prelinked Kernel cache management
"""
import logging
import subprocess
from pathlib import Path
from ..base.cache import BaseKernelCache
from ....support import subprocess_wrapper
class PrelinkedKernel(BaseKernelCache):
def __init__(self, mount_location: str) -> None:
self.mount_location = mount_location
def _kextcache_arguments(self) -> list[str]:
args = ["/usr/sbin/kextcache", "-invalidate", f"{self.mount_location}/"]
return args
def _update_preboot_kernel_cache(self) -> bool:
"""
Ensure Preboot volume's kernel cache is updated
"""
if not Path("/usr/sbin/kcditto").exists():
return
logging.info("- Syncing Kernel Cache to Preboot")
subprocess_wrapper.run_as_root_and_verify(["/usr/sbin/kcditto"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
def rebuild(self) -> None:
logging.info("- Rebuilding Prelinked Kernel")
result = subprocess_wrapper.run_as_root(self._kextcache_arguments(), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
# kextcache notes:
# - kextcache always returns 0, even if it fails
# - Check the output for 'KernelCache ID' to see if the cache was successfully rebuilt
if "KernelCache ID" not in result.stdout.decode():
subprocess_wrapper.log(result)
return False
self._update_preboot_kernel_cache()
return True

View File

@@ -0,0 +1,51 @@
"""
rebuild.py: Manage kernel cache rebuilding regardless of macOS version
"""
from .base.cache import BaseKernelCache
from ...datasets import os_data
class RebuildKernelCache:
"""
RebuildKernelCache: Rebuild the kernel cache
Parameters:
- os_version: macOS version
- mount_location: Path to the mounted volume
- auxiliary_cache: Whether to create auxiliary kernel cache (Big Sur and later)
- auxiliary_cache_only: Whether to only create auxiliary kernel cache (Ventura and later)
"""
def __init__(self, os_version: os_data.os_data, mount_location: str, auxiliary_cache: bool, auxiliary_cache_only: bool) -> None:
self.os_version = os_version
self.mount_location = mount_location
self.auxiliary_cache = auxiliary_cache
self.auxiliary_cache_only = auxiliary_cache_only
def _rebuild_method(self) -> BaseKernelCache:
"""
Determine the correct method to rebuild the kernel cache
"""
if self.os_version >= os_data.os_data.big_sur:
if self.os_version >= os_data.os_data.ventura:
if self.auxiliary_cache_only:
from .kernel_collection.auxiliary import AuxiliaryKernelCollection
return AuxiliaryKernelCollection(self.mount_location)
from .kernel_collection.boot_system import BootSystemKernelCollections
return BootSystemKernelCollections(self.mount_location, self.os_version, self.auxiliary_cache)
if os_data.os_data.catalina >= self.os_version >= os_data.os_data.lion:
from .prelinked.prelinked import PrelinkedKernel
return PrelinkedKernel(self.mount_location)
from .mkext.mkext import MKext
return MKext(self.mount_location)
def rebuild(self) -> bool:
"""
Rebuild the kernel cache
"""
return self._rebuild_method().rebuild()

View File

@@ -0,0 +1,16 @@
"""
mount: Library for mounting and unmounting the root volume and interacting with APFS snapshots.
Usage:
>>> from mount import RootVolumeMount
>>> RootVolumeMount(xnu_major).mount()
'/System/Volumes/Update/mnt1'
>>> RootVolumeMount(xnu_major).unmount()
>>> RootVolumeMount(xnu_major).create_snapshot()
>>> RootVolumeMount(xnu_major).revert_snapshot()
"""
from .mount import RootVolumeMount
from .snapshot import APFSSnapshot

View File

@@ -1,62 +1,28 @@
"""
sys_patch_mount.py: Handling macOS root volume mounting and unmounting,
as well as APFS snapshots for Big Sur and newer
mount.py: Handling macOS root volume mounting and unmounting
"""
import logging
import plistlib
import platform
import subprocess
from pathlib import Path
from ..datasets import os_data
from ..support import subprocess_wrapper
from .snapshot import APFSSnapshot
from ...datasets import os_data
from ...support import subprocess_wrapper
class SysPatchMount:
class RootVolumeMount:
def __init__(self, xnu_major: int, rosetta_status: bool) -> None:
def __init__(self, xnu_major: int) -> None:
self.xnu_major = xnu_major
self.rosetta_status = rosetta_status
self.root_volume_identifier = self._fetch_root_volume_identifier()
self.mount_path = None
def mount(self) -> str:
"""
Mount the root volume.
Returns the path to the root volume.
If none, failed to mount.
"""
result = self._mount_root_volume()
if result is None:
logging.error("Failed to mount root volume")
return None
if not Path(result).exists():
logging.error(f"Attempted to mount root volume, but failed: {result}")
return None
self.mount_path = result
return result
def unmount(self, ignore_errors: bool = True) -> bool:
"""
Unmount the root volume.
Returns True if successful, False otherwise.
Note for Big Sur and newer, a snapshot is created before unmounting.
And that unmounting is not critical to the process.
"""
return self._unmount_root_volume(ignore_errors=ignore_errors)
def _fetch_root_volume_identifier(self) -> str:
"""
Resolve path to disk identifier
@@ -136,43 +102,48 @@ class SysPatchMount:
return True
def mount(self) -> str:
"""
Mount the root volume.
Returns the path to the root volume.
If none, failed to mount.
"""
result = self._mount_root_volume()
if result is None:
logging.error("Failed to mount root volume")
return None
if not Path(result).exists():
logging.error(f"Attempted to mount root volume, but failed: {result}")
return None
self.mount_path = result
return result
def unmount(self, ignore_errors: bool = True) -> bool:
"""
Unmount the root volume.
Returns True if successful, False otherwise.
Note for Big Sur and newer, a snapshot is created before unmounting.
And that unmounting is not critical to the process.
"""
return self._unmount_root_volume(ignore_errors=ignore_errors)
def create_snapshot(self) -> bool:
"""
Create APFS snapshot of the root volume.
"""
if self.xnu_major < os_data.os_data.big_sur.value:
return True
args = ["/usr/sbin/bless"]
if platform.machine() == "arm64" or self.rosetta_status is True:
args += ["--mount", self.mount_path, "--create-snapshot"]
else:
args += ["--folder", f"{self.mount_path}/System/Library/CoreServices", "--bootefi", "--create-snapshot"]
result = subprocess_wrapper.run_as_root(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
logging.error("Failed to create APFS snapshot")
subprocess_wrapper.log(result)
if "Can't use last-sealed-snapshot or create-snapshot on non system volume" in result.stdout.decode():
logging.info("- This is an APFS bug with Monterey and newer! Perform a clean installation to ensure your APFS volume is built correctly")
return False
return True
return APFSSnapshot(self.xnu_major, self.mount_path).create_snapshot()
def revert_snapshot(self) -> bool:
"""
Revert APFS snapshot of the root volume.
"""
if self.xnu_major < os_data.os_data.big_sur.value:
return True
result = subprocess_wrapper.run_as_root(["/usr/sbin/bless", "--mount", self.mount_path, "--bootefi", "--last-sealed-snapshot"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
logging.error("Failed to revert APFS snapshot")
subprocess_wrapper.log(result)
return False
return True
return APFSSnapshot(self.xnu_major, self.mount_path).revert_snapshot()

View File

@@ -0,0 +1,69 @@
"""
snapshot.py: Handling APFS snapshots
"""
import logging
import platform
import subprocess
from ...datasets import os_data
from ...support import subprocess_wrapper
class APFSSnapshot:
def __init__(self, xnu_major: int, mount_path: str):
self.xnu_major = xnu_major
self.mount_path = mount_path
def _rosetta_status(self) -> bool:
"""
Check if currently running inside of Rosetta
"""
result = subprocess_wrapper.run(["/usr/sbin/sysctl", "-n", "sysctl.proc_translated"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
return False
return True if result.stdout.decode().strip() == "1" else False
def create_snapshot(self) -> bool:
"""
Create APFS snapshot of the root volume.
"""
if self.xnu_major < os_data.os_data.big_sur.value:
return True
args = ["/usr/sbin/bless"]
if platform.machine() == "arm64" or self._rosetta_status() is True:
args += ["--mount", self.mount_path, "--create-snapshot"]
else:
args += ["--folder", f"{self.mount_path}/System/Library/CoreServices", "--bootefi", "--create-snapshot"]
result = subprocess_wrapper.run_as_root(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
logging.error("Failed to create APFS snapshot")
subprocess_wrapper.log(result)
if "Can't use last-sealed-snapshot or create-snapshot on non system volume" in result.stdout.decode():
logging.info("- This is an APFS bug with Monterey and newer! Perform a clean installation to ensure your APFS volume is built correctly")
return False
return True
def revert_snapshot(self) -> bool:
"""
Revert APFS snapshot of the root volume.
"""
if self.xnu_major < os_data.os_data.big_sur.value:
return True
result = subprocess_wrapper.run_as_root(["/usr/sbin/bless", "--mount", self.mount_path, "--bootefi", "--last-sealed-snapshot"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
logging.error("Failed to revert APFS snapshot")
subprocess_wrapper.log(result)
return False
return True

View File

@@ -38,10 +38,19 @@ This is because Apple removed on-disk binaries (ref: https://github.com/dortania
import logging
import plistlib
import subprocess
import applescript
from pathlib import Path
from datetime import datetime
from .mount import (
RootVolumeMount,
APFSSnapshot
)
from .utilities import (
install_new_file,
remove_file,
PatcherSupportPkgMount,
KernelDebugKitMerge
)
from .. import constants
@@ -50,16 +59,14 @@ from ..volume import generate_copy_arguments
from ..support import (
utilities,
kdk_handler,
subprocess_wrapper
)
from . import (
sys_patch_detect,
sys_patch_auto,
sys_patch_helpers,
sys_patch_generate,
sys_patch_mount
kernelcache
)
from .auto_patcher import InstallAutomaticPatchingServices
from .detections import DetectRootPatch, GenerateRootPatchSets
class PatchSysVolume:
@@ -77,33 +84,24 @@ class PatchSysVolume:
# GUI will detect hardware patches before starting PatchSysVolume()
# However the TUI will not, so allow for data to be passed in manually avoiding multiple calls
if hardware_details is None:
hardware_details = sys_patch_detect.DetectRootPatch(self.computer.real_model, self.constants).detect_patch_set()
hardware_details = DetectRootPatch(self.computer.real_model, self.constants).detect_patch_set()
self.hardware_details = hardware_details
self._init_pathing(custom_root_mount_path=None, custom_data_mount_path=None)
self._init_pathing()
self.skip_root_kmutil_requirement = self.hardware_details["Settings: Supports Auxiliary Cache"]
self.mount_obj = sys_patch_mount.SysPatchMount(self.constants.detected_os, self.computer.rosetta_active)
self.mount_obj = RootVolumeMount(self.constants.detected_os)
def _init_pathing(self, custom_root_mount_path: Path = None, custom_data_mount_path: Path = None) -> None:
def _init_pathing(self) -> None:
"""
Initializes the pathing for root volume patching
Parameters:
custom_root_mount_path (Path): Custom path to mount the root volume
custom_data_mount_path (Path): Custom path to mount the data volume
"""
if custom_root_mount_path and custom_data_mount_path:
self.mount_location = custom_root_mount_path
self.data_mount_location = custom_data_mount_path
elif self.root_supports_snapshot is True:
# Big Sur and newer use APFS snapshots
self.mount_location_data = ""
if self.root_supports_snapshot is True:
self.mount_location = "/System/Volumes/Update/mnt1"
self.mount_location_data = ""
else:
self.mount_location = ""
self.mount_location_data = ""
self.mount_extensions = f"{self.mount_location}/System/Library/Extensions"
self.mount_application_support = f"{self.mount_location_data}/Library/Application Support"
@@ -163,114 +161,30 @@ class PatchSysVolume:
save_hid_cs (bool): If True, will save the HID CS file before merging KDK
Required for USB 1.1 downgrades on Ventura and newer
"""
if self.skip_root_kmutil_requirement is True:
return
if self.constants.detected_os < os_data.os_data.ventura:
return
if self.constants.kdk_download_path.exists():
if kdk_handler.KernelDebugKitUtilities().install_kdk_dmg(self.constants.kdk_download_path) is False:
logging.info("Failed to install KDK")
raise Exception("Failed to install KDK")
kdk_obj = kdk_handler.KernelDebugKitObject(self.constants, self.constants.detected_os_build, self.constants.detected_os_version)
if kdk_obj.success is False:
logging.info(f"Unable to get KDK info: {kdk_obj.error_msg}")
raise Exception(f"Unable to get KDK info: {kdk_obj.error_msg}")
if kdk_obj.kdk_already_installed is False:
kdk_download_obj = kdk_obj.retrieve_download()
if not kdk_download_obj:
logging.info(f"Could not retrieve KDK: {kdk_obj.error_msg}")
# Hold thread until download is complete
kdk_download_obj.download(spawn_thread=False)
if kdk_download_obj.download_complete is False:
error_msg = kdk_download_obj.error_msg
logging.info(f"Could not download KDK: {error_msg}")
raise Exception(f"Could not download KDK: {error_msg}")
if kdk_obj.validate_kdk_checksum() is False:
logging.info(f"KDK checksum validation failed: {kdk_obj.error_msg}")
raise Exception(f"KDK checksum validation failed: {kdk_obj.error_msg}")
kdk_handler.KernelDebugKitUtilities().install_kdk_dmg(self.constants.kdk_download_path)
# re-init kdk_obj to get the new kdk_installed_path
kdk_obj = kdk_handler.KernelDebugKitObject(self.constants, self.constants.detected_os_build, self.constants.detected_os_version)
if kdk_obj.success is False:
logging.info(f"Unable to get KDK info: {kdk_obj.error_msg}")
raise Exception(f"Unable to get KDK info: {kdk_obj.error_msg}")
if kdk_obj.kdk_already_installed is False:
# We shouldn't get here, but just in case
logging.warning(f"KDK was not installed, but should have been: {kdk_obj.error_msg}")
raise Exception(f"KDK was not installed, but should have been: {kdk_obj.error_msg}")
kdk_path = Path(kdk_obj.kdk_installed_path) if kdk_obj.kdk_installed_path != "" else None
oclp_plist = Path("/System/Library/CoreServices/OpenCore-Legacy-Patcher.plist")
if (Path(self.mount_location) / Path("System/Library/Extensions/System.kext/PlugIns/Libkern.kext/Libkern")).exists() and oclp_plist.exists():
# KDK was already merged, check if the KDK used is the same as the one we're using
# If not, we'll rsync over with the new KDK
try:
oclp_plist_data = plistlib.load(open(oclp_plist, "rb"))
if "Kernel Debug Kit Used" in oclp_plist_data:
if oclp_plist_data["Kernel Debug Kit Used"] == str(kdk_path):
logging.info("- Matching KDK determined to already be merged, skipping")
return
except:
pass
if kdk_path is None:
logging.info(f"- Unable to find Kernel Debug Kit")
raise Exception("Unable to find Kernel Debug Kit")
self.kdk_path = kdk_path
logging.info(f"- Found KDK at: {kdk_path}")
# Due to some IOHIDFamily oddities, we need to ensure their CodeSignature is retained
cs_path = Path(self.mount_location) / Path("System/Library/Extensions/IOHIDFamily.kext/Contents/PlugIns/IOHIDEventDriver.kext/Contents/_CodeSignature")
if save_hid_cs is True and cs_path.exists():
logging.info("- Backing up IOHIDEventDriver CodeSignature")
# Note it's a folder, not a file
subprocess_wrapper.run_as_root(generate_copy_arguments(cs_path, f"{self.constants.payload_path}/IOHIDEventDriver_CodeSignature.bak"), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
logging.info(f"- Merging KDK with Root Volume: {kdk_path.name}")
subprocess_wrapper.run_as_root(
# Only merge '/System/Library/Extensions'
# 'Kernels' and 'KernelSupport' is wasted space for root patching (we don't care above dev kernels)
["/usr/bin/rsync", "-r", "-i", "-a", f"{kdk_path}/System/Library/Extensions/", f"{self.mount_location}/System/Library/Extensions"],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
# During reversing, we found that kmutil uses this path to determine whether the KDK was successfully merged
# Best to verify now before we cause any damage
if not (Path(self.mount_location) / Path("System/Library/Extensions/System.kext/PlugIns/Libkern.kext/Libkern")).exists():
logging.info("- Failed to merge KDK with Root Volume")
raise Exception("Failed to merge KDK with Root Volume")
logging.info("- Successfully merged KDK with Root Volume")
# Restore IOHIDEventDriver CodeSignature
if save_hid_cs is True and Path(f"{self.constants.payload_path}/IOHIDEventDriver_CodeSignature.bak").exists():
logging.info("- Restoring IOHIDEventDriver CodeSignature")
if not cs_path.exists():
logging.info(" - CodeSignature folder missing, creating")
subprocess_wrapper.run_as_root(["/bin/mkdir", "-p", cs_path], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
subprocess_wrapper.run_as_root(generate_copy_arguments(f"{self.constants.payload_path}/IOHIDEventDriver_CodeSignature.bak", cs_path), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
subprocess_wrapper.run_as_root(["/bin/rm", "-rf", f"{self.constants.payload_path}/IOHIDEventDriver_CodeSignature.bak"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
self.kdk_path = KernelDebugKitMerge(
self.constants,
self.mount_location,
self.skip_root_kmutil_requirement
).merge(save_hid_cs)
def _unpatch_root_vol(self):
"""
Reverts APFS snapshot and cleans up any changes made to the root and data volume
"""
if self.mount_obj.revert_snapshot() is False:
if APFSSnapshot(self.constants.detected_os, self.mount_location).revert_snapshot() is False:
return
self._clean_skylight_plugins()
self._delete_nonmetal_enforcement()
self._clean_auxiliary_kc()
kernelcache.KernelCacheSupport(
mount_location_data=self.mount_location_data,
detected_os=self.constants.detected_os,
skip_root_kmutil_requirement=self.skip_root_kmutil_requirement
).clean_auxiliary_kc()
self.constants.root_patcher_succeeded = True
logging.info("- Unpatching complete")
logging.info("\nPlease reboot the machine for patches to take effect")
@@ -287,120 +201,46 @@ class PatchSysVolume:
Returns:
bool: True if successful, False if not
"""
if self._rebuild_kernel_collection() is True:
self._update_preboot_kernel_cache()
self._rebuild_dyld_shared_cache()
if self._create_new_apfs_snapshot() is True:
self._unmount_root_vol()
logging.info("- Patching complete")
logging.info("\nPlease reboot the machine for patches to take effect")
if self.needs_kmutil_exemptions is True:
logging.info("Note: Apple will require you to open System Preferences -> Security to allow the new kernel extensions to be loaded")
self.constants.root_patcher_succeeded = True
return True
return False
def _rebuild_kernel_collection(self) -> bool:
"""
Rebuilds the Kernel Collection
Supports following KC generation:
- Boot/SysKC (11.0+)
- AuxKC (11.0+)
- PrelinkedKernel (10.15-)
Returns:
bool: True if successful, False if not
"""
logging.info("- Rebuilding Kernel Cache (This may take some time)")
if self.constants.detected_os > os_data.os_data.catalina:
# Base Arguments
args = ["/usr/bin/kmutil", "install"]
if self.skip_root_kmutil_requirement is True:
# Only rebuild the Auxiliary Kernel Collection
args.append("--new")
args.append("aux")
args.append("--boot-path")
args.append(f"{self.mount_location}/System/Library/KernelCollections/BootKernelExtensions.kc")
args.append("--system-path")
args.append(f"{self.mount_location}/System/Library/KernelCollections/SystemKernelExtensions.kc")
else:
# Rebuild Boot, System and Auxiliary Kernel Collections
args.append("--volume-root")
args.append(self.mount_location)
# Build Boot, Sys and Aux KC
args.append("--update-all")
# If multiple kernels found, only build release KCs
args.append("--variant-suffix")
args.append("release")
if self.constants.detected_os >= os_data.os_data.ventura:
# With Ventura, we're required to provide a KDK in some form
# to rebuild the Kernel Cache
#
# However since we already merged the KDK onto root with 'ditto',
# We can add '--allow-missing-kdk' to skip parsing the KDK
#
# This allows us to only delete/overwrite kexts inside of
# /System/Library/Extensions and not the entire KDK
args.append("--allow-missing-kdk")
# 'install' and '--update-all' cannot be used together in Ventura.
# kmutil will request the usage of 'create' instead:
# Warning: kmutil install's usage of --update-all is deprecated.
# Use kmutil create --update-install instead'
args[1] = "create"
if self.needs_kmutil_exemptions is True:
# When installing to '/Library/Extensions', following args skip kext consent
# prompt in System Preferences when SIP's disabled
logging.info(" (You will get a prompt by System Preferences, ignore for now)")
args.append("--no-authentication")
args.append("--no-authorization")
else:
args = ["/usr/sbin/kextcache", "-i", f"{self.mount_location}/"]
result = subprocess_wrapper.run_as_root(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
# kextcache notes:
# - kextcache always returns 0, even if it fails
# - Check the output for 'KernelCache ID' to see if the cache was successfully rebuilt
# kmutil notes:
# - will return 71 on failure to build KCs
# - will return 31 on 'No binaries or codeless kexts were provided'
# - will return -10 if the volume is missing (ie. unmounted by another process)
if result.returncode != 0 or (self.constants.detected_os < os_data.os_data.catalina and "KernelCache ID" not in result.stdout.decode()):
logging.info("- Unable to build new kernel cache")
subprocess_wrapper.log(result)
logging.info("")
logging.info("\nPlease reboot the machine to avoid potential issues rerunning the patcher")
if self._rebuild_kernel_cache() is False:
return False
if self.skip_root_kmutil_requirement is True:
# Force rebuild the Auxiliary KC
result = subprocess_wrapper.run_as_root(["/usr/bin/killall", "syspolicyd", "kernelmanagerd"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
logging.info("- Unable to remove kernel extension policy files")
subprocess_wrapper.log(result)
logging.info("")
logging.info("\nPlease reboot the machine to avoid potential issues rerunning the patcher")
return False
self._update_preboot_kernel_cache()
self._rebuild_dyld_shared_cache()
for file in ["KextPolicy", "KextPolicy-shm", "KextPolicy-wal"]:
self._remove_file("/private/var/db/SystemPolicyConfiguration/", file)
else:
# Install RSRHelper utility to handle desynced KCs
if self._create_new_apfs_snapshot() is False:
return False
self._unmount_root_vol()
logging.info("- Patching complete")
logging.info("\nPlease reboot the machine for patches to take effect")
if self.needs_kmutil_exemptions is True:
logging.info("Note: Apple will require you to open System Preferences -> Security to allow the new kernel extensions to be loaded")
self.constants.root_patcher_succeeded = True
return True
def _rebuild_kernel_cache(self) -> bool:
"""
Rebuilds the Kernel Cache
"""
result = kernelcache.RebuildKernelCache(
os_version=self.constants.detected_os,
mount_location=self.mount_location,
auxiliary_cache=self.needs_kmutil_exemptions,
auxiliary_cache_only=self.skip_root_kmutil_requirement
).rebuild()
if result is False:
return False
if self.skip_root_kmutil_requirement is False:
sys_patch_helpers.SysPatchHelpers(self.constants).install_rsr_repair_binary()
logging.info("- Successfully built new kernel cache")
return True
@@ -411,7 +251,7 @@ class PatchSysVolume:
Returns:
bool: True if snapshot was created, False if not
"""
return self.mount_obj.create_snapshot()
return APFSSnapshot(self.constants.detected_os, self.mount_location).create_snapshot()
def _rebuild_dyld_shared_cache(self) -> None:
@@ -464,61 +304,6 @@ class PatchSysVolume:
subprocess_wrapper.run_as_root(["/usr/bin/defaults", "delete", "/Library/Preferences/com.apple.CoreDisplay", arg])
def _clean_auxiliary_kc(self) -> None:
"""
Clean the Auxiliary Kernel Collection
Logic:
When reverting root volume patches, the AuxKC will still retain the UUID
it was built against. Thus when Boot/SysKC are reverted, Aux will break
To resolve this, delete all installed kexts in /L*/E* and rebuild the AuxKC
We can verify our binaries based off the OpenCore-Legacy-Patcher.plist file
"""
if self.constants.detected_os < os_data.os_data.big_sur:
return
logging.info("- Cleaning Auxiliary Kernel Collection")
oclp_path = "/System/Library/CoreServices/OpenCore-Legacy-Patcher.plist"
if Path(oclp_path).exists():
oclp_plist_data = plistlib.load(Path(oclp_path).open("rb"))
for key in oclp_plist_data:
if isinstance(oclp_plist_data[key], (bool, int)):
continue
for install_type in ["Install", "Install Non-Root"]:
if install_type not in oclp_plist_data[key]:
continue
for location in oclp_plist_data[key][install_type]:
if not location.endswith("Extensions"):
continue
for file in oclp_plist_data[key][install_type][location]:
if not file.endswith(".kext"):
continue
self._remove_file("/Library/Extensions", file)
# Handle situations where users migrated from older OSes with a lot of garbage in /L*/E*
# ex. Nvidia Web Drivers, NetUSB, dosdude1's patches, etc.
# Move if file's age is older than October 2021 (year before Ventura)
if self.constants.detected_os < os_data.os_data.ventura:
return
relocation_path = "/Library/Relocated Extensions"
if not Path(relocation_path).exists():
subprocess_wrapper.run_as_root(["/bin/mkdir", relocation_path])
for file in Path("/Library/Extensions").glob("*.kext"):
try:
if datetime.fromtimestamp(file.stat().st_mtime) < datetime(2021, 10, 1):
logging.info(f" - Relocating {file.name} kext to {relocation_path}")
if Path(relocation_path) / Path(file.name).exists():
subprocess_wrapper.run_as_root(["/bin/rm", "-Rf", relocation_path / Path(file.name)])
subprocess_wrapper.run_as_root(["/bin/mv", file, relocation_path])
except:
# Some users have the most cursed /L*/E* folders
# ex. Symlinks pointing to symlinks pointing to dead files
pass
def _write_patchset(self, patchset: dict) -> None:
"""
Write patchset information to Root Volume
@@ -537,93 +322,6 @@ class PatchSysVolume:
subprocess_wrapper.run_as_root_and_verify(generate_copy_arguments(f"{self.constants.payload_path}/{file_name}", destination_path), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
def _add_auxkc_support(self, install_file: str, source_folder_path: str, install_patch_directory: str, destination_folder_path: str) -> str:
"""
Patch provided Kext to support Auxiliary Kernel Collection
Logic:
In macOS Ventura, KDKs are required to build new Boot and System KCs
However for some patch sets, we're able to use the Auxiliary KCs with '/Library/Extensions'
kernelmanagerd determines which kext is installed by their 'OSBundleRequired' entry
If a kext is labeled as 'OSBundleRequired: Root' or 'OSBundleRequired: Safe Boot',
kernelmanagerd will require the kext to be installed in the Boot/SysKC
Additionally, kexts starting with 'com.apple.' are not natively allowed to be installed
in the AuxKC. So we need to explicitly set our 'OSBundleRequired' to 'Auxiliary'
Parameters:
install_file (str): Kext file name
source_folder_path (str): Source folder path
install_patch_directory (str): Patch directory
destination_folder_path (str): Destination folder path
Returns:
str: Updated destination folder path
"""
if self.skip_root_kmutil_requirement is False:
return destination_folder_path
if not install_file.endswith(".kext"):
return destination_folder_path
if install_patch_directory != "/System/Library/Extensions":
return destination_folder_path
if self.constants.detected_os < os_data.os_data.ventura:
return destination_folder_path
updated_install_location = str(self.mount_location_data) + "/Library/Extensions"
logging.info(f" - Adding AuxKC support to {install_file}")
plist_path = Path(Path(source_folder_path) / Path(install_file) / Path("Contents/Info.plist"))
plist_data = plistlib.load((plist_path).open("rb"))
# Check if we need to update the 'OSBundleRequired' entry
if not plist_data["CFBundleIdentifier"].startswith("com.apple."):
return updated_install_location
if "OSBundleRequired" in plist_data:
if plist_data["OSBundleRequired"] == "Auxiliary":
return updated_install_location
plist_data["OSBundleRequired"] = "Auxiliary"
plistlib.dump(plist_data, plist_path.open("wb"))
self._check_kexts_needs_authentication(install_file)
return updated_install_location
def _check_kexts_needs_authentication(self, kext_name: str):
"""
Verify whether the user needs to authenticate in System Preferences
Sets 'needs_to_open_preferences' to True if the kext is not in the AuxKC
Logic:
Under 'private/var/db/KernelManagement/AuxKC/CurrentAuxKC/com.apple.kcgen.instructions.plist'
["kextsToBuild"][i]:
["bundlePathMainOS"] = /Library/Extensions/Test.kext
["cdHash"] = Bundle's CDHash (random on ad-hoc signed, static on dev signed)
["teamID"] = Team ID (blank on ad-hoc signed)
To grab the CDHash of a kext, run 'codesign -dvvv <kext_path>'
Parameters:
kext_name (str): Name of the kext to check
"""
try:
aux_cache_path = Path(self.mount_location_data) / Path("/private/var/db/KernelExtensionManagement/AuxKC/CurrentAuxKC/com.apple.kcgen.instructions.plist")
if aux_cache_path.exists():
aux_cache_data = plistlib.load((aux_cache_path).open("rb"))
for kext in aux_cache_data["kextsToBuild"]:
if "bundlePathMainOS" in aux_cache_data["kextsToBuild"][kext]:
if aux_cache_data["kextsToBuild"][kext]["bundlePathMainOS"] == f"/Library/Extensions/{kext_name}":
return
except PermissionError:
pass
logging.info(f" - {kext_name} requires authentication in System Preferences")
self.constants.needs_to_open_preferences = True # Notify in GUI to open System Preferences
def _patch_root_vol(self):
"""
Patch root volume
@@ -633,13 +331,13 @@ class PatchSysVolume:
if self.patch_set_dictionary != {}:
self._execute_patchset(self.patch_set_dictionary)
else:
self._execute_patchset(sys_patch_generate.GenerateRootPatchSets(self.computer.real_model, self.constants, self.hardware_details).patchset)
self._execute_patchset(GenerateRootPatchSets(self.computer.real_model, self.constants, self.hardware_details).patchset)
if self.constants.wxpython_variant is True and self.constants.detected_os >= os_data.os_data.big_sur:
needs_daemon = False
if self.constants.detected_os >= os_data.os_data.ventura and self.skip_root_kmutil_requirement is False:
needs_daemon = True
sys_patch_auto.AutomaticSysPatch(self.constants).install_auto_patcher_launch_agent(kdk_caching_needed=needs_daemon)
InstallAutomaticPatchingServices(self.constants).install_auto_patcher_launch_agent(kdk_caching_needed=needs_daemon)
self._rebuild_root_volume()
@@ -652,6 +350,12 @@ class PatchSysVolume:
required_patches (dict): Patchset to execute (generated by sys_patch_generate.GenerateRootPatchSets)
"""
kc_support_obj = kernelcache.KernelCacheSupport(
mount_location_data=self.mount_location_data,
detected_os=self.constants.detected_os,
skip_root_kmutil_requirement=self.skip_root_kmutil_requirement
)
source_files_path = str(self.constants.payload_local_binaries_root_path)
self._preflight_checks(required_patches, source_files_path)
for patch in required_patches:
@@ -665,35 +369,42 @@ class PatchSysVolume:
destination_folder_path = str(self.mount_location) + remove_patch_directory
else:
destination_folder_path = str(self.mount_location_data) + remove_patch_directory
self._remove_file(destination_folder_path, remove_patch_file)
remove_file(destination_folder_path, remove_patch_file)
for method_install in ["Install", "Install Non-Root"]:
if method_install in required_patches[patch]:
for install_patch_directory in list(required_patches[patch][method_install]):
logging.info(f"- Handling Installs in: {install_patch_directory}")
for install_file in list(required_patches[patch][method_install][install_patch_directory]):
source_folder_path = source_files_path + "/" + required_patches[patch][method_install][install_patch_directory][install_file] + install_patch_directory
if method_install == "Install":
destination_folder_path = str(self.mount_location) + install_patch_directory
else:
if install_patch_directory == "/Library/Extensions":
self.needs_kmutil_exemptions = True
self._check_kexts_needs_authentication(install_file)
destination_folder_path = str(self.mount_location_data) + install_patch_directory
if method_install not in required_patches[patch]:
continue
updated_destination_folder_path = self._add_auxkc_support(install_file, source_folder_path, install_patch_directory, destination_folder_path)
for install_patch_directory in list(required_patches[patch][method_install]):
logging.info(f"- Handling Installs in: {install_patch_directory}")
for install_file in list(required_patches[patch][method_install][install_patch_directory]):
source_folder_path = source_files_path + "/" + required_patches[patch][method_install][install_patch_directory][install_file] + install_patch_directory
if method_install == "Install":
destination_folder_path = str(self.mount_location) + install_patch_directory
else:
if install_patch_directory == "/Library/Extensions":
self.needs_kmutil_exemptions = True
if kc_support_obj.check_kexts_needs_authentication(install_file) is True:
self.constants.needs_to_open_preferences = True
if destination_folder_path != updated_destination_folder_path:
# Update required_patches to reflect the new destination folder path
if updated_destination_folder_path not in required_patches[patch][method_install]:
required_patches[patch][method_install].update({updated_destination_folder_path: {}})
required_patches[patch][method_install][updated_destination_folder_path].update({install_file: required_patches[patch][method_install][install_patch_directory][install_file]})
required_patches[patch][method_install][install_patch_directory].pop(install_file)
destination_folder_path = str(self.mount_location_data) + install_patch_directory
destination_folder_path = updated_destination_folder_path
updated_destination_folder_path = kc_support_obj.add_auxkc_support(install_file, source_folder_path, install_patch_directory, destination_folder_path)
self._install_new_file(source_folder_path, destination_folder_path, install_file)
if kc_support_obj.check_kexts_needs_authentication(install_file) is True:
self.constants.needs_to_open_preferences = True
if destination_folder_path != updated_destination_folder_path:
# Update required_patches to reflect the new destination folder path
if updated_destination_folder_path not in required_patches[patch][method_install]:
required_patches[patch][method_install].update({updated_destination_folder_path: {}})
required_patches[patch][method_install][updated_destination_folder_path].update({install_file: required_patches[patch][method_install][install_patch_directory][install_file]})
required_patches[patch][method_install][install_patch_directory].pop(install_file)
destination_folder_path = updated_destination_folder_path
install_new_file(source_folder_path, destination_folder_path, install_file)
if "Processes" in required_patches[patch]:
for process in required_patches[patch]["Processes"]:
@@ -705,6 +416,7 @@ class PatchSysVolume:
else:
logging.info(f"- Running Process:\n{process}")
subprocess_wrapper.run_and_verify(process, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
if any(x in required_patches for x in ["AMD Legacy GCN", "AMD Legacy Polaris", "AMD Legacy Vega"]):
sys_patch_helpers.SysPatchHelpers(self.constants).disable_window_server_caching()
if "Metal 3802 Common Extended" in required_patches:
@@ -729,7 +441,11 @@ class PatchSysVolume:
# Make sure non-Metal Enforcement preferences are not present
self._delete_nonmetal_enforcement()
# Make sure we clean old kexts in /L*/E* that are not in the patchset
self._clean_auxiliary_kc()
kernelcache.KernelCacheSupport(
mount_location_data=self.mount_location_data,
detected_os=self.constants.detected_os,
skip_root_kmutil_requirement=self.skip_root_kmutil_requirement
).clean_auxiliary_kc()
# Make sure SNB kexts are compatible with the host
if "Intel Sandy Bridge" in required_patches:
@@ -738,12 +454,13 @@ class PatchSysVolume:
for patch in required_patches:
# Check if all files are present
for method_type in ["Install", "Install Non-Root"]:
if method_type in required_patches[patch]:
for install_patch_directory in required_patches[patch][method_type]:
for install_file in required_patches[patch][method_type][install_patch_directory]:
source_file = source_files_path + "/" + required_patches[patch][method_type][install_patch_directory][install_file] + install_patch_directory + "/" + install_file
if not Path(source_file).exists():
raise Exception(f"Failed to find {source_file}")
if method_type not in required_patches[patch]:
continue
for install_patch_directory in required_patches[patch][method_type]:
for install_file in required_patches[patch][method_type][install_patch_directory]:
source_file = source_files_path + "/" + required_patches[patch][method_type][install_patch_directory][install_file] + install_patch_directory + "/" + install_file
if not Path(source_file).exists():
raise Exception(f"Failed to find {source_file}")
# Ensure KDK is properly installed
self._merge_kdk_with_root(save_hid_cs=True if "Legacy USB 1.1" in required_patches else False)
@@ -751,188 +468,6 @@ class PatchSysVolume:
logging.info("- Finished Preflight, starting patching")
def _install_new_file(self, source_folder: Path, destination_folder: Path, file_name: str) -> None:
"""
Installs a new file to the destination folder
File handling logic:
- .frameworks are merged with the destination folder
- Other files are deleted and replaced (ex. .kexts, .apps)
Parameters:
source_folder (Path): Path to the source folder
destination_folder (Path): Path to the destination folder
file_name (str): Name of the file to install
"""
file_name_str = str(file_name)
if not Path(destination_folder).exists():
logging.info(f" - Skipping {file_name}, cannot locate {source_folder}")
return
if file_name_str.endswith(".framework"):
# merge with rsync
logging.info(f" - Installing: {file_name}")
subprocess_wrapper.run_as_root(["/usr/bin/rsync", "-r", "-i", "-a", f"{source_folder}/{file_name}", f"{destination_folder}/"], stdout=subprocess.PIPE)
self._fix_permissions(destination_folder + "/" + file_name)
elif Path(source_folder + "/" + file_name_str).is_dir():
# Applicable for .kext, .app, .plugin, .bundle, all of which are directories
if Path(destination_folder + "/" + file_name).exists():
logging.info(f" - Found existing {file_name}, overwriting...")
subprocess_wrapper.run_as_root_and_verify(["/bin/rm", "-R", f"{destination_folder}/{file_name}"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
else:
logging.info(f" - Installing: {file_name}")
subprocess_wrapper.run_as_root_and_verify(generate_copy_arguments(f"{source_folder}/{file_name}", destination_folder), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
self._fix_permissions(destination_folder + "/" + file_name)
else:
# Assume it's an individual file, replace as normal
if Path(destination_folder + "/" + file_name).exists():
logging.info(f" - Found existing {file_name}, overwriting...")
subprocess_wrapper.run_as_root_and_verify(["/bin/rm", f"{destination_folder}/{file_name}"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
else:
logging.info(f" - Installing: {file_name}")
subprocess_wrapper.run_as_root_and_verify(generate_copy_arguments(f"{source_folder}/{file_name}", destination_folder), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
self._fix_permissions(destination_folder + "/" + file_name)
def _remove_file(self, destination_folder: Path, file_name: str) -> None:
"""
Removes a file from the destination folder
Parameters:
destination_folder (Path): Path to the destination folder
file_name (str): Name of the file to remove
"""
if Path(destination_folder + "/" + file_name).exists():
logging.info(f" - Removing: {file_name}")
if Path(destination_folder + "/" + file_name).is_dir():
subprocess_wrapper.run_as_root_and_verify(["/bin/rm", "-R", f"{destination_folder}/{file_name}"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
else:
subprocess_wrapper.run_as_root_and_verify(["/bin/rm", f"{destination_folder}/{file_name}"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
def _fix_permissions(self, destination_file: Path) -> None:
"""
Fix file permissions for a given file or directory
"""
chmod_args = ["/bin/chmod", "-Rf", "755", destination_file]
chown_args = ["/usr/sbin/chown", "-Rf", "root:wheel", destination_file]
if not Path(destination_file).is_dir():
# Strip recursive arguments
chmod_args.pop(1)
chown_args.pop(1)
subprocess_wrapper.run_as_root_and_verify(chmod_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
subprocess_wrapper.run_as_root_and_verify(chown_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
def _check_files(self) -> bool:
"""
Check if all files are present (primarily PatcherSupportPkg resources)
Returns:
bool: True if all files are present, False otherwise
"""
if Path(self.constants.payload_local_binaries_root_path).exists():
logging.info("- Local PatcherSupportPkg resources available, continuing...")
return True
if Path(self.constants.payload_local_binaries_root_path_dmg).exists():
logging.info("- Local PatcherSupportPkg resources available, mounting...")
output = subprocess.run(
[
"/usr/bin/hdiutil", "attach", "-noverify", f"{self.constants.payload_local_binaries_root_path_dmg}",
"-mountpoint", Path(self.constants.payload_path / Path("Universal-Binaries")),
"-nobrowse",
"-shadow", Path(self.constants.payload_path / Path("Universal-Binaries_overlay")),
"-passphrase", "password"
],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
if output.returncode != 0:
logging.info("- Failed to mount Universal-Binaries.dmg")
subprocess_wrapper.log(output)
return False
logging.info("- Mounted Universal-Binaries.dmg")
if self.constants.cli_mode is False and Path(self.constants.overlay_psp_path_dmg).exists() and Path("~/.dortania_developer").expanduser().exists():
icon_path = str(self.constants.app_icon_path).replace("/", ":")[1:]
msg = "Welcome to the DortaniaInternal Program, please provided the decryption key to access internal resources. Press cancel to skip."
password = Path("~/.dortania_developer_key").expanduser().read_text().strip() if Path("~/.dortania_developer_key").expanduser().exists() else ""
for i in range(3):
try:
if password == "":
password = applescript.AppleScript(
f"""
set theResult to display dialog "{msg}" default answer "" with hidden answer with title "OpenCore Legacy Patcher" with icon file "{icon_path}"
return the text returned of theResult
"""
).run()
result = subprocess.run(
[
"/usr/bin/hdiutil", "attach", "-noverify", f"{self.constants.overlay_psp_path_dmg}",
"-mountpoint", Path(self.constants.payload_path / Path("DortaniaInternal")),
"-nobrowse",
"-passphrase", password
],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
if result.returncode == 0:
logging.info("- Mounted DortaniaInternal resources")
result = subprocess.run(
[
"/usr/bin/ditto", f"{self.constants.payload_path / Path('DortaniaInternal')}", f"{self.constants.payload_path / Path('Universal-Binaries')}"
],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
if result.returncode == 0:
return True
logging.info("- Failed to merge DortaniaInternal resources")
subprocess_wrapper.log(result)
return False
logging.info("- Failed to mount DortaniaInternal resources")
subprocess_wrapper.log(result)
if "Authentication error" not in result.stdout.decode():
try:
# Display that the disk image might be corrupted
applescript.AppleScript(
f"""
display dialog "Failed to mount DortaniaInternal resources, please file an internal radar:\n\n{result.stdout.decode()}" with title "OpenCore Legacy Patcher" with icon file "{icon_path}"
"""
).run()
return False
except Exception as e:
pass
break
msg = f"Decryption failed, please try again. {2 - i} attempts remaining. "
password = ""
if i == 2:
applescript.AppleScript(
f"""
display dialog "Failed to mount DortaniaInternal resources, too many incorrect passwords. If this continues with the correct decryption key, please file an internal radar." with title "OpenCore Legacy Patcher" with icon file "{icon_path}"
"""
).run()
return False
except Exception as e:
break
return True
logging.info("- PatcherSupportPkg resources missing, Patcher likely corrupted!!!")
return False
# Entry Function
def start_patch(self):
"""
@@ -941,27 +476,33 @@ class PatchSysVolume:
logging.info("- Starting Patch Process")
logging.info(f"- Determining Required Patch set for Darwin {self.constants.detected_os}")
self.patch_set_dictionary = sys_patch_generate.GenerateRootPatchSets(self.computer.real_model, self.constants, self.hardware_details).patchset
self.patch_set_dictionary = GenerateRootPatchSets(self.computer.real_model, self.constants, self.hardware_details).patchset
if self.patch_set_dictionary == {}:
logging.info("- No Root Patches required for your machine!")
return
logging.info("- Verifying whether Root Patching possible")
if sys_patch_detect.DetectRootPatch(self.computer.real_model, self.constants).verify_patch_allowed(print_errors=not self.constants.wxpython_variant) is False:
if DetectRootPatch(self.computer.real_model, self.constants).verify_patch_allowed(print_errors=not self.constants.wxpython_variant) is False:
logging.error("- Cannot continue with patching!!!")
return
logging.info("- Patcher is capable of patching")
if self._check_files():
if self._mount_root_vol() is True:
if self._run_sanity_checks():
self._patch_root_vol()
else:
self._unmount_root_vol()
logging.info("- Please ensure that you do not have any updates pending")
else:
logging.info("- Recommend rebooting the machine and trying to patch again")
if PatcherSupportPkgMount(self.constants).mount() is False:
logging.error("- Critical resources missing, cannot continue with patching!!!")
return
if self._mount_root_vol() is False:
logging.error("- Failed to mount root volume, cannot continue with patching!!!")
return
if self._run_sanity_checks() is False:
self._unmount_root_vol()
logging.error("- Failed sanity checks, cannot continue with patching!!!")
logging.error("- Please ensure that you do not have any updates pending")
return
self._patch_root_vol()
def start_unpatch(self) -> None:
@@ -970,11 +511,12 @@ class PatchSysVolume:
"""
logging.info("- Starting Unpatch Process")
if sys_patch_detect.DetectRootPatch(self.computer.real_model, self.constants).verify_patch_allowed(print_errors=True) is False:
if DetectRootPatch(self.computer.real_model, self.constants).verify_patch_allowed(print_errors=True) is False:
logging.error("- Cannot continue with unpatching!!!")
return
if self._mount_root_vol() is True:
self._unpatch_root_vol()
else:
logging.info("- Recommend rebooting the machine and trying to patch again")
if self._mount_root_vol() is False:
logging.error("- Failed to mount root volume, cannot continue with unpatching!!!")
return
self._unpatch_root_vol()

View File

@@ -82,7 +82,7 @@ class SysPatchHelpers:
Generate patchset file for user reference
Parameters:
patchset (dict): Dictionary of patchset, see sys_patch_detect.py and sys_patch_dict.py
patchset (dict): Dictionary of patchset, see detect.py and sys_patch_dict.py
file_name (str): Name of the file to write to
kdk_used (Path): Path to the KDK used, if any

View File

@@ -0,0 +1,6 @@
"""
utilities: General utility functions for root volume patching
"""
from .files import install_new_file, remove_file, fix_permissions
from .dmg_mount import PatcherSupportPkgMount
from .kdk_merge import KernelDebugKitMerge

View File

@@ -0,0 +1,181 @@
"""
dmg_mount.py: PatcherSupportPkg DMG Mounting. Handles Universal-Binaries and DortaniaInternalResources DMGs.
"""
import logging
import subprocess
import applescript
from pathlib import Path
from ... import constants
from ...support import subprocess_wrapper
class PatcherSupportPkgMount:
def __init__(self, global_constants: constants.Constants) -> None:
self.constants: constants.Constants = global_constants
self.icon_path = str(self.constants.app_icon_path).replace("/", ":")[1:]
def _mount_universal_binaries_dmg(self) -> bool:
"""
Mount PatcherSupportPkg's Universal-Binaries.dmg
"""
if not Path(self.constants.payload_local_binaries_root_path_dmg).exists():
logging.info("- PatcherSupportPkg resources missing, Patcher likely corrupted!!!")
return False
output = subprocess.run(
[
"/usr/bin/hdiutil", "attach", "-noverify", f"{self.constants.payload_local_binaries_root_path_dmg}",
"-mountpoint", Path(self.constants.payload_path / Path("Universal-Binaries")),
"-nobrowse",
"-shadow", Path(self.constants.payload_path / Path("Universal-Binaries_overlay")),
"-passphrase", "password"
],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
if output.returncode != 0:
logging.info("- Failed to mount Universal-Binaries.dmg")
subprocess_wrapper.log(output)
return False
logging.info("- Mounted Universal-Binaries.dmg")
return True
def _mount_dortania_internal_resources_dmg(self) -> bool:
"""
Mount PatcherSupportPkg's DortaniaInternalResources.dmg (if available)
"""
if not Path(self.constants.overlay_psp_path_dmg).exists():
return True
if not Path("~/.dortania_developer").expanduser().exists():
return True
if self.constants.cli_mode is True:
return True
logging.info("- Found DortaniaInternal resources, mounting...")
for i in range(3):
key = self._request_decryption_key(i)
output = subprocess.run(
[
"/usr/bin/hdiutil", "attach", "-noverify", f"{self.constants.overlay_psp_path_dmg}",
"-mountpoint", Path(self.constants.payload_path / Path("DortaniaInternal")),
"-nobrowse",
"-passphrase", key
],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
if output.returncode != 0:
logging.info("- Failed to mount DortaniaInternal resources")
subprocess_wrapper.log(output)
if "Authentication error" not in output.stdout.decode():
self._display_authentication_error()
if i == 2:
self._display_too_many_attempts()
return False
logging.info("- Mounted DortaniaInternal resources")
return self._merge_dortania_internal_resources()
def _merge_dortania_internal_resources(self) -> bool:
"""
Merge DortaniaInternal resources with Universal-Binaries
"""
result = subprocess.run(
[
"/usr/bin/ditto", f"{self.constants.payload_path / Path('DortaniaInternal')}", f"{self.constants.payload_path / Path('Universal-Binaries')}"
],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
if result.returncode != 0:
logging.info("- Failed to merge DortaniaInternal resources")
subprocess_wrapper.log(result)
return False
return True
def _request_decryption_key(self, attempt: int) -> str:
"""
Fetch the decryption key for DortaniaInternalResources.dmg
"""
# Only return on first attempt
if attempt == 0:
if Path("~/.dortania_developer_key").expanduser().exists():
return Path("~/.dortania_developer_key").expanduser().read_text().strip()
password = ""
msg = "Welcome to the DortaniaInternal Program, please provided the decryption key to access internal resources. Press cancel to skip."
if attempt > 0:
msg = f"Decryption failed, please try again. {2 - attempt} attempts remaining. "
try:
password = applescript.AppleScript(
f"""
set theResult to display dialog "{msg}" default answer "" with hidden answer with title "OpenCore Legacy Patcher" with icon file "{self.icon_path}"
return the text returned of theResult
"""
).run()
except Exception as e:
pass
return password
def _display_authentication_error(self) -> None:
"""
Display authentication error dialog
"""
try:
applescript.AppleScript(
f"""
display dialog "Failed to mount DortaniaInternal resources, please file an internal radar." with title "OpenCore Legacy Patcher" with icon file "{self.icon_path}"
"""
).run()
except Exception as e:
pass
def _display_too_many_attempts(self) -> None:
"""
Display too many attempts dialog
"""
try:
applescript.AppleScript(
f"""
display dialog "Failed to mount DortaniaInternal resources, too many incorrect passwords. If this continues with the correct decryption key, please file an internal radar." with title "OpenCore Legacy Patcher" with icon file "{self.icon_path}"
"""
).run()
except Exception as e:
pass
def mount(self) -> bool:
"""
Mount PatcherSupportPkg resources
Returns:
bool: True if all resources are mounted, False otherwise
"""
# If already mounted, skip
if Path(self.constants.payload_local_binaries_root_path).exists():
logging.info("- Local PatcherSupportPkg resources available, continuing...")
return True
if self._mount_universal_binaries_dmg() is False:
return False
if self._mount_dortania_internal_resources_dmg() is False:
return False
return True

View File

@@ -0,0 +1,88 @@
"""
utilities.py: Supporting functions for file handling during root volume patching
"""
import logging
import subprocess
from pathlib import Path
from ...volume import generate_copy_arguments
from ...support import subprocess_wrapper
def install_new_file(source_folder: Path, destination_folder: Path, file_name: str) -> None:
"""
Installs a new file to the destination folder
File handling logic:
- .frameworks are merged with the destination folder
- Other files are deleted and replaced (ex. .kexts, .apps)
Parameters:
source_folder (Path): Path to the source folder
destination_folder (Path): Path to the destination folder
file_name (str): Name of the file to install
"""
file_name_str = str(file_name)
if not Path(destination_folder).exists():
logging.info(f" - Skipping {file_name}, cannot locate {source_folder}")
return
if file_name_str.endswith(".framework"):
# merge with rsync
logging.info(f" - Installing: {file_name}")
subprocess_wrapper.run_as_root(["/usr/bin/rsync", "-r", "-i", "-a", f"{source_folder}/{file_name}", f"{destination_folder}/"], stdout=subprocess.PIPE)
fix_permissions(destination_folder + "/" + file_name)
elif Path(source_folder + "/" + file_name_str).is_dir():
# Applicable for .kext, .app, .plugin, .bundle, all of which are directories
if Path(destination_folder + "/" + file_name).exists():
logging.info(f" - Found existing {file_name}, overwriting...")
subprocess_wrapper.run_as_root_and_verify(["/bin/rm", "-R", f"{destination_folder}/{file_name}"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
else:
logging.info(f" - Installing: {file_name}")
subprocess_wrapper.run_as_root_and_verify(generate_copy_arguments(f"{source_folder}/{file_name}", destination_folder), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
fix_permissions(destination_folder + "/" + file_name)
else:
# Assume it's an individual file, replace as normal
if Path(destination_folder + "/" + file_name).exists():
logging.info(f" - Found existing {file_name}, overwriting...")
subprocess_wrapper.run_as_root_and_verify(["/bin/rm", f"{destination_folder}/{file_name}"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
else:
logging.info(f" - Installing: {file_name}")
subprocess_wrapper.run_as_root_and_verify(generate_copy_arguments(f"{source_folder}/{file_name}", destination_folder), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
fix_permissions(destination_folder + "/" + file_name)
def remove_file(destination_folder: Path, file_name: str) -> None:
"""
Removes a file from the destination folder
Parameters:
destination_folder (Path): Path to the destination folder
file_name (str): Name of the file to remove
"""
if Path(destination_folder + "/" + file_name).exists():
logging.info(f" - Removing: {file_name}")
if Path(destination_folder + "/" + file_name).is_dir():
subprocess_wrapper.run_as_root_and_verify(["/bin/rm", "-R", f"{destination_folder}/{file_name}"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
else:
subprocess_wrapper.run_as_root_and_verify(["/bin/rm", f"{destination_folder}/{file_name}"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
def fix_permissions(destination_file: Path) -> None:
"""
Fix file permissions for a given file or directory
"""
chmod_args = ["/bin/chmod", "-Rf", "755", destination_file]
chown_args = ["/usr/sbin/chown", "-Rf", "root:wheel", destination_file]
if not Path(destination_file).is_dir():
# Strip recursive arguments
chmod_args.pop(1)
chown_args.pop(1)
subprocess_wrapper.run_as_root_and_verify(chmod_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
subprocess_wrapper.run_as_root_and_verify(chown_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

View File

@@ -0,0 +1,167 @@
import logging
import subprocess
import plistlib
from pathlib import Path
from ... import constants
from ...datasets import os_data
from ...support import subprocess_wrapper, kdk_handler
from ...volume import generate_copy_arguments
class KernelDebugKitMerge:
def __init__(self, global_constants: constants.Constants, mount_location: str, skip_root_kmutil_requirement: bool) -> None:
self.constants: constants.Constants = global_constants
self.mount_location = mount_location
self.skip_root_kmutil_requirement = skip_root_kmutil_requirement
def _matching_kdk_already_merged(self, kdk_path: str) -> bool:
"""
Check whether the KDK is already merged with the root volume
"""
oclp_plist = Path("/System/Library/CoreServices/OpenCore-Legacy-Patcher.plist")
if not oclp_plist.exists():
return False
if not (Path(self.mount_location) / Path("System/Library/Extensions/System.kext/PlugIns/Libkern.kext/Libkern")).exists():
return False
try:
oclp_plist_data = plistlib.load(open(oclp_plist, "rb"))
if "Kernel Debug Kit Used" not in oclp_plist_data:
return False
if oclp_plist_data["Kernel Debug Kit Used"] == str(kdk_path):
logging.info("- Matching KDK determined to already be merged, skipping")
return True
except:
pass
return False
def _backup_hid_cs(self) -> None:
"""
Due to some IOHIDFamily oddities, we need to ensure their CodeSignature is retained
"""
cs_path = Path(self.mount_location) / Path("System/Library/Extensions/IOHIDFamily.kext/Contents/PlugIns/IOHIDEventDriver.kext/Contents/_CodeSignature")
if not cs_path.exists():
return
logging.info("- Backing up IOHIDEventDriver CodeSignature")
subprocess_wrapper.run_as_root(generate_copy_arguments(cs_path, f"{self.constants.payload_path}/IOHIDEventDriver_CodeSignature.bak"), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
def _restore_hid_cs(self) -> None:
"""
Restore IOHIDEventDriver CodeSignature
"""
if not Path(f"{self.constants.payload_path}/IOHIDEventDriver_CodeSignature.bak").exists():
return
logging.info("- Restoring IOHIDEventDriver CodeSignature")
cs_path = Path(self.mount_location) / Path("System/Library/Extensions/IOHIDFamily.kext/Contents/PlugIns/IOHIDEventDriver.kext/Contents/_CodeSignature")
if not cs_path.exists():
logging.info(" - CodeSignature folder missing, creating")
subprocess_wrapper.run_as_root(["/bin/mkdir", "-p", cs_path], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
subprocess_wrapper.run_as_root(generate_copy_arguments(f"{self.constants.payload_path}/IOHIDEventDriver_CodeSignature.bak", cs_path), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
subprocess_wrapper.run_as_root(["/bin/rm", "-rf", f"{self.constants.payload_path}/IOHIDEventDriver_CodeSignature.bak"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
def _merge_kdk(self, kdk_path: str) -> None:
"""
Merge Kernel Debug Kit (KDK) with the root volume
"""
logging.info(f"- Merging KDK with Root Volume: {Path(kdk_path).name}")
subprocess_wrapper.run_as_root(
# Only merge '/System/Library/Extensions'
# 'Kernels' and 'KernelSupport' is wasted space for root patching (we don't care above dev kernels)
["/usr/bin/rsync", "-r", "-i", "-a", f"{kdk_path}/System/Library/Extensions/", f"{self.mount_location}/System/Library/Extensions"],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
if not (Path(self.mount_location) / Path("System/Library/Extensions/System.kext/PlugIns/Libkern.kext/Libkern")).exists():
logging.info("- Failed to merge KDK with Root Volume")
raise Exception("Failed to merge KDK with Root Volume")
logging.info("- Successfully merged KDK with Root Volume")
def merge(self, save_hid_cs: bool = False) -> str:
"""
Merge the Kernel Debug Kit (KDK) with the root volume
Returns KDK used
"""
if self.skip_root_kmutil_requirement is True:
return None
if self.constants.detected_os < os_data.os_data.ventura:
return None
# If a KDK was pre-downloaded, install it
if self.constants.kdk_download_path.exists():
if kdk_handler.KernelDebugKitUtilities().install_kdk_dmg(self.constants.kdk_download_path) is False:
logging.info("Failed to install KDK")
raise Exception("Failed to install KDK")
# Next, grab KDK information (ie. what's the latest KDK for this OS)
kdk_obj = kdk_handler.KernelDebugKitObject(self.constants, self.constants.detected_os_build, self.constants.detected_os_version)
if kdk_obj.success is False:
logging.info(f"Unable to get KDK info: {kdk_obj.error_msg}")
raise Exception(f"Unable to get KDK info: {kdk_obj.error_msg}")
# If no KDK is installed, download and install it
if kdk_obj.kdk_already_installed is False:
kdk_download_obj = kdk_obj.retrieve_download()
if not kdk_download_obj:
logging.info(f"Could not retrieve KDK: {kdk_obj.error_msg}")
raise Exception(f"Could not retrieve KDK: {kdk_obj.error_msg}")
# Hold thread until download is complete
kdk_download_obj.download(spawn_thread=False)
if kdk_download_obj.download_complete is False:
error_msg = kdk_download_obj.error_msg
logging.info(f"Could not download KDK: {error_msg}")
raise Exception(f"Could not download KDK: {error_msg}")
if kdk_obj.validate_kdk_checksum() is False:
logging.info(f"KDK checksum validation failed: {kdk_obj.error_msg}")
raise Exception(f"KDK checksum validation failed: {kdk_obj.error_msg}")
kdk_handler.KernelDebugKitUtilities().install_kdk_dmg(self.constants.kdk_download_path)
# re-init kdk_obj to get the new kdk_installed_path
kdk_obj = kdk_handler.KernelDebugKitObject(self.constants, self.constants.detected_os_build, self.constants.detected_os_version)
if kdk_obj.success is False:
logging.info(f"Unable to get KDK info: {kdk_obj.error_msg}")
raise Exception(f"Unable to get KDK info: {kdk_obj.error_msg}")
if kdk_obj.kdk_already_installed is False:
# We shouldn't get here, but just in case
logging.warning(f"KDK was not installed, but should have been: {kdk_obj.error_msg}")
raise Exception(f"KDK was not installed, but should have been: {kdk_obj.error_msg}")
kdk_path = Path(kdk_obj.kdk_installed_path) if kdk_obj.kdk_installed_path != "" else None
if kdk_path is None:
logging.info(f"- Unable to find Kernel Debug Kit")
raise Exception("Unable to find Kernel Debug Kit")
logging.info(f"- Found KDK at: {kdk_path}")
if self._matching_kdk_already_merged(kdk_path):
return kdk_path
if save_hid_cs is True:
self._backup_hid_cs()
self._merge_kdk(kdk_path)
if save_hid_cs is True:
self._restore_hid_cs()
return kdk_path

View File

@@ -12,7 +12,7 @@ from Cocoa import NSApp, NSApplication
from .. import constants
from ..sys_patch import sys_patch_detect
from ..sys_patch.detections import DetectRootPatch
from ..wx_gui import (
gui_cache_os_update,
@@ -64,7 +64,7 @@ class EntryPoint:
if "--gui_patch" in sys.argv or "--gui_unpatch" in sys.argv or start_patching is True :
entry = gui_sys_patch_start.SysPatchStartFrame
patches = sys_patch_detect.DetectRootPatch(self.constants.computer.real_model, self.constants).detect_patch_set()
patches = DetectRootPatch(self.constants.computer.real_model, self.constants).detect_patch_set()
logging.info(f"Entry point set: {entry.__name__}")

View File

@@ -11,7 +11,7 @@ from pathlib import Path
from .. import constants
from ..sys_patch import sys_patch_detect
from ..sys_patch.detections import DetectRootPatch
from ..wx_gui import (
gui_main_menu,
@@ -86,7 +86,7 @@ class SysPatchDisplayFrame(wx.Frame):
patches: dict = {}
def _fetch_patches(self) -> None:
nonlocal patches
patches = sys_patch_detect.DetectRootPatch(self.constants.computer.real_model, self.constants).detect_patch_set()
patches = DetectRootPatch(self.constants.computer.real_model, self.constants).detect_patch_set()
thread = threading.Thread(target=_fetch_patches, args=(self,))
thread.start()

View File

@@ -20,7 +20,6 @@ from ..support import kdk_handler
from ..sys_patch import (
sys_patch,
sys_patch_detect
)
from ..wx_gui import (
gui_main_menu,
@@ -28,6 +27,8 @@ from ..wx_gui import (
gui_download,
)
from ..sys_patch.detections import DetectRootPatch
class SysPatchStartFrame(wx.Frame):
@@ -50,7 +51,7 @@ class SysPatchStartFrame(wx.Frame):
self.Centre()
if self.patches == {}:
self.patches = sys_patch_detect.DetectRootPatch(self.constants.computer.real_model, self.constants).detect_patch_set()
self.patches = DetectRootPatch(self.constants.computer.real_model, self.constants).detect_patch_set()
def _kdk_download(self, frame: wx.Frame = None) -> bool: