Move utilities directory to support

This commit is contained in:
Mykola Grymalyuk
2024-04-01 11:15:34 -06:00
parent 10da04ca3e
commit 83fec2952e
42 changed files with 53 additions and 40 deletions

View File

@@ -0,0 +1,148 @@
"""
analytics_handler.py: Analytics and Crash Reporting Handler
"""
import json
import datetime
import plistlib
from pathlib import Path
from .. import constants
from . import (
network_handler,
global_settings
)
DATE_FORMAT: str = "%Y-%m-%d %H-%M-%S"
ANALYTICS_SERVER: str = ""
SITE_KEY: str = ""
CRASH_URL: str = ANALYTICS_SERVER + "/crash"
VALID_ANALYTICS_ENTRIES: dict = {
'KEY': str, # Prevent abuse (embedded at compile time)
'UNIQUE_IDENTITY': str, # Host's UUID as SHA1 hash
'APPLICATION_NAME': str, # ex. OpenCore Legacy Patcher
'APPLICATION_VERSION': str, # ex. 0.2.0
'OS_VERSION': str, # ex. 10.15.7
'MODEL': str, # ex. MacBookPro11,5
'GPUS': list, # ex. ['Intel Iris Pro', 'AMD Radeon R9 M370X']
'FIRMWARE': str, # ex. APPLE
'LOCATION': str, # ex. 'US' (just broad region, don't need to be specific)
'TIMESTAMP': datetime.datetime, # ex. 2021-09-01-12-00-00
}
VALID_CRASH_ENTRIES: dict = {
'KEY': str, # Prevent abuse (embedded at compile time)
'APPLICATION_VERSION': str, # ex. 0.2.0
'APPLICATION_COMMIT': str, # ex. 0.2.0 or {commit hash if not a release}
'OS_VERSION': str, # ex. 10.15.7
'MODEL': str, # ex. MacBookPro11,5
'TIMESTAMP': datetime.datetime, # ex. 2021-09-01-12-00-00
'CRASH_LOG': str, # ex. "This is a crash log"
}
class Analytics:
    """
    Collects host information and reports analytics / crash logs to the
    remote analytics server (no-ops when the server/key are unset or the
    user has opted out via 'DisableCrashAndAnalyticsReporting').
    """

    def __init__(self, global_constants: constants.Constants) -> None:
        """
        Parameters:
            global_constants: Shared application constants (host probe results,
                              patcher version, detected OS, etc.)
        """
        self.constants: constants.Constants = global_constants

        self.unique_identity = str(self.constants.computer.uuid_sha1)
        self.application     = "OpenCore Legacy Patcher"
        self.version         = str(self.constants.patcher_version)
        self.os              = str(self.constants.detected_os_version)
        self.model           = str(self.constants.computer.real_model)
        # strftime already returns str
        self.date            = datetime.datetime.now().strftime(DATE_FORMAT)

    def send_analytics(self) -> None:
        """
        Build and post the analytics payload, unless the user opted out.
        """
        if global_settings.GlobalEnviromentSettings().read_property("DisableCrashAndAnalyticsReporting") is True:
            return

        self._generate_base_data()
        self._post_analytics_data()

    def send_crash_report(self, log_file: Path) -> None:
        """
        Post a crash report with the contents of log_file.

        Skipped when: no server/key is configured, the user opted out,
        the log is missing, or this is a tagged release build
        (avoids being overloaded with crash reports).
        """
        if ANALYTICS_SERVER == "":
            return
        if SITE_KEY == "":
            return
        if global_settings.GlobalEnviromentSettings().read_property("DisableCrashAndAnalyticsReporting") is True:
            return
        if not log_file.exists():
            return
        if self.constants.commit_info[0].startswith("refs/tags"):
            # Avoid being overloaded with crash reports
            return

        # "<branch>_<commit date>_<commit id>" for triage
        commit_info = self.constants.commit_info[0].split("/")[-1] + "_" + self.constants.commit_info[1].split("T")[0] + "_" + self.constants.commit_info[2].split("/")[-1]

        crash_data = {
            "KEY":                 SITE_KEY,
            "APPLICATION_VERSION": self.version,
            "APPLICATION_COMMIT":  commit_info,
            "OS_VERSION":          self.os,
            "MODEL":               self.model,
            "TIMESTAMP":           self.date,
            "CRASH_LOG":           log_file.read_text(),
        }

        network_handler.NetworkUtilities().post(CRASH_URL, json=crash_data)

    def _get_country(self) -> str:
        """
        Get approximate country from .GlobalPreferences.plist.
        Falls back to "US" when the file is missing, unreadable, or
        lacks a "Country" key (just broad region, nothing specific).
        """
        path = Path("/Library/Preferences/.GlobalPreferences.plist")
        if not path.exists():
            return "US"
        try:
            # Context manager avoids leaking the file handle
            with path.open("rb") as plist_file:
                result = plistlib.load(plist_file)
        except Exception:
            # Malformed or unreadable plist: fall back to default region
            return "US"

        return result.get("Country", "US")

    def _generate_base_data(self) -> None:
        """
        Populate self.data with the analytics payload (JSON-encoded string).
        """
        self.gpus     = []
        self.firmware = str(self.constants.computer.firmware_vendor)
        self.location = str(self._get_country())

        for gpu in self.constants.computer.gpus:
            self.gpus.append(str(gpu.arch))

        self.data = {
            'KEY':                 SITE_KEY,
            'UNIQUE_IDENTITY':     self.unique_identity,
            'APPLICATION_NAME':    self.application,
            'APPLICATION_VERSION': self.version,
            'OS_VERSION':          self.os,
            'MODEL':               self.model,
            'GPUS':                self.gpus,
            'FIRMWARE':            self.firmware,
            'LOCATION':            self.location,
            'TIMESTAMP':           self.date,
        }

        # convert to JSON:
        self.data = json.dumps(self.data)

    def _post_analytics_data(self) -> None:
        """
        Post the JSON payload to the analytics server (no-op if unconfigured).
        """
        if ANALYTICS_SERVER == "":
            return
        if SITE_KEY == "":
            return

        network_handler.NetworkUtilities().post(ANALYTICS_SERVER, json=self.data)

View File

@@ -0,0 +1,273 @@
"""
arguments.py: CLI argument handling
"""
import sys
import time
import logging
import plistlib
import threading
import subprocess
from pathlib import Path
from .. import constants
from ..wx_gui import gui_entry
from ..efi_builder import build
from ..datasets import (
model_array,
os_data
)
from ..sys_patch import (
sys_patch,
sys_patch_auto
)
from . import (
utilities,
defaults,
validation
)
# Generic building args
class arguments:
    """
    Parses CLI arguments and dispatches to the matching handler
    (validation, build, root patching, update preparation, etc.).
    """

    def __init__(self, global_constants: constants.Constants) -> None:
        self.constants: constants.Constants = global_constants
        self.args = utilities.check_cli_args()

        self._parse_arguments()

    def _parse_arguments(self) -> None:
        """
        Parses arguments passed to the patcher

        Only the first matching mode is executed; each handler returns
        control to the caller afterwards.
        """
        if self.args.validate:
            self._validation_handler()
            return

        if self.args.build:
            self._build_handler()
            return

        if self.args.patch_sys_vol:
            self._sys_patch_handler()
            return

        if self.args.unpatch_sys_vol:
            self._sys_unpatch_handler()
            return

        if self.args.prepare_for_update:
            self._prepare_for_update_handler()
            return

        if self.args.cache_os:
            self._cache_os_handler()
            return

        if self.args.auto_patch:
            self._sys_patch_auto_handler()
            return

    def _validation_handler(self) -> None:
        """
        Enter validation mode
        """
        logging.info("Set Validation Mode")
        validation.PatcherValidation(self.constants)

    def _sys_patch_handler(self) -> None:
        """
        Start root volume patching

        When running from an installer sandbox, OS updaters are blocked
        for the duration of the patch to avoid mid-patch updates.
        """
        logging.info("Set System Volume patching")

        if "Library/InstallerSandboxes/" in str(self.constants.payload_path):
            logging.info("- Running from Installer Sandbox, blocking OS updaters")
            thread = threading.Thread(target=sys_patch.PatchSysVolume(self.constants.custom_model or self.constants.computer.real_model, self.constants, None).start_patch)
            thread.start()
            while thread.is_alive():
                utilities.block_os_updaters()
                time.sleep(1)
        else:
            sys_patch.PatchSysVolume(self.constants.custom_model or self.constants.computer.real_model, self.constants, None).start_patch()

    def _sys_unpatch_handler(self) -> None:
        """
        Start root volume unpatching
        """
        logging.info("Set System Volume unpatching")
        sys_patch.PatchSysVolume(self.constants.custom_model or self.constants.computer.real_model, self.constants, None).start_unpatch()

    def _sys_patch_auto_handler(self) -> None:
        """
        Start root volume auto patching
        """
        logging.info("Set Auto patching")
        sys_patch_auto.AutomaticSysPatch(self.constants).start_auto_patch()

    def _prepare_for_update_handler(self) -> None:
        """
        Prepare host for macOS update
        """
        logging.info("Preparing host for macOS update")

        # Fix: don't name this 'os_data' — that would shadow the imported
        # 'datasets.os_data' module used elsewhere in this class.
        staged_update = utilities.fetch_staged_update(variant="Update")
        if staged_update[0] is None:
            logging.info("No update staged, skipping")
            return

        os_version = staged_update[0]
        os_build   = staged_update[1]

        logging.info(f"Preparing for update to {os_version} ({os_build})")

        self._clean_le_handler()

    def _cache_os_handler(self) -> None:
        """
        Fetch KDK for incoming OS
        """
        # Bail if another caching instance is already running
        results = subprocess.run(["/bin/ps", "-ax"], stdout=subprocess.PIPE)
        if results.stdout.decode("utf-8").count("OpenCore-Patcher --cache_os") > 1:
            logging.info("Another instance of OS caching is running, exiting")
            return

        gui_entry.EntryPoint(self.constants).start(entry=gui_entry.SupportedEntryPoints.OS_CACHE)

    def _clean_le_handler(self) -> None:
        """
        Clean /Library/Extensions of problematic kexts
        Note macOS Ventura and older do this automatically
        """
        if self.constants.detected_os < os_data.os_data.sonoma:
            return

        logging.info("Cleaning /Library/Extensions")
        for kext in Path("/Library/Extensions").glob("*.kext"):
            if not Path(f"{kext}/Contents/Info.plist").exists():
                continue
            try:
                kext_plist = plistlib.load(open(f"{kext}/Contents/Info.plist", "rb"))
            except Exception as e:
                logging.info(f"  - Failed to load plist for {kext.name}: {e}")
                continue
            if "GPUCompanionBundles" not in kext_plist:
                continue
            logging.info(f"  - Removing {kext.name}")
            subprocess.run(["/bin/rm", "-rf", kext])

    def _build_handler(self) -> None:
        """
        Start config building process
        """
        logging.info("Set OpenCore Build")

        # Fix: removed duplicated nested 'if self.args.model:' check
        if self.args.model:
            logging.info(f"- Using custom model: {self.args.model}")
            self.constants.custom_model = self.args.model
            defaults.GenerateDefaults(self.constants.custom_model, False, self.constants)
        elif self.constants.computer.real_model not in model_array.SupportedSMBIOS and self.constants.allow_oc_everywhere is False:
            logging.info(
                """Your model is not supported by this patcher for running unsupported OSes!"
If you plan to create the USB for another machine, please select the "Change Model" option in the menu."""
            )
            sys.exit(1)
        else:
            logging.info(f"- Using detected model: {self.constants.computer.real_model}")
            defaults.GenerateDefaults(self.constants.custom_model, True, self.constants)

        if self.args.verbose:
            logging.info("- Set verbose configuration")
            self.constants.verbose_debug = True
        else:
            self.constants.verbose_debug = False  # Override Defaults detected
        if self.args.debug_oc:
            logging.info("- Set OpenCore DEBUG configuration")
            self.constants.opencore_debug = True
        if self.args.debug_kext:
            logging.info("- Set kext DEBUG configuration")
            self.constants.kext_debug = True
        if self.args.hide_picker:
            logging.info("- Set HidePicker configuration")
            self.constants.showpicker = False
        if self.args.disable_sip:
            logging.info("- Set Disable SIP configuration")
            self.constants.sip_status = False
        else:
            self.constants.sip_status = True  # Override Defaults detected
        if self.args.disable_smb:
            logging.info("- Set Disable SecureBootModel configuration")
            self.constants.secure_status = False
        else:
            self.constants.secure_status = True  # Override Defaults detected
        if self.args.vault:
            logging.info("- Set Vault configuration")
            self.constants.vault = True
        if self.args.firewire:
            logging.info("- Set FireWire Boot configuration")
            self.constants.firewire_boot = True
        if self.args.nvme:
            logging.info("- Set NVMe Boot configuration")
            self.constants.nvme_boot = True
        if self.args.wlan:
            logging.info("- Set Wake on WLAN configuration")
            self.constants.enable_wake_on_wlan = True
        if self.args.disable_tb:
            logging.info("- Set Disable Thunderbolt configuration")
            self.constants.disable_tb = True
        if self.args.force_surplus:
            logging.info("- Forcing SurPlus override configuration")
            self.constants.force_surplus = True
        if self.args.moderate_smbios:
            logging.info("- Set Moderate SMBIOS Patching configuration")
            self.constants.serial_settings = "Moderate"
        if self.args.smbios_spoof:
            if self.args.smbios_spoof == "Minimal":
                self.constants.serial_settings = "Minimal"
            elif self.args.smbios_spoof == "Moderate":
                self.constants.serial_settings = "Moderate"
            elif self.args.smbios_spoof == "Advanced":
                self.constants.serial_settings = "Advanced"
            else:
                logging.info(f"- Unknown SMBIOS arg passed: {self.args.smbios_spoof}")

        if self.args.support_all:
            logging.info("- Building for natively supported model")
            self.constants.allow_oc_everywhere = True
            self.constants.serial_settings = "None"

        build.BuildOpenCore(self.constants.custom_model or self.constants.computer.real_model, self.constants)

View File

@@ -0,0 +1,292 @@
#################################################################################
# Copyright (C) 2009-2011 Vladimir "Farcaller" Pouzanov <farcaller@gmail.com> #
# #
# Permission is hereby granted, free of charge, to any person obtaining a copy #
# of this software and associated documentation files (the "Software"), to deal #
# in the Software without restriction, including without limitation the rights #
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell #
# copies of the Software, and to permit persons to whom the Software is #
# furnished to do so, subject to the following conditions: #
# #
# The above copyright notice and this permission notice shall be included in #
# all copies or substantial portions of the Software. #
# #
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR #
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, #
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE #
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER #
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, #
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN #
# THE SOFTWARE. #
#################################################################################
import struct
import codecs
from datetime import datetime, timedelta
class BPListWriter(object):
    """
    Skeleton serializer for Apple binary property lists.

    Only the magic header is emitted so far; object flattening,
    offset tables, and trailer metadata remain unimplemented.
    """

    def __init__(self, objects):
        # Serialized output buffer; stays empty until generation is implemented.
        self.bplist = ""
        # The object graph that will eventually be serialized.
        self.objects = objects

    def binary(self):
        '''binary -> string

        Generates bplist
        '''
        # Start with the bplist magic header.
        self.data = 'bplist00'

        # TODO: flatten objects and count max length size
        # TODO: write objects and save offsets
        # TODO: write offsets
        # TODO: write metadata

        return self.data

    def write(self, filename):
        '''
        Writes bplist to file
        '''
        if self.bplist == "":
            # Nothing serialized yet; caller must generate first.
            raise Exception('BPlist not yet generated')

        # TODO: save self.bplist to file
class BPListReader(object):
    """
    Parser for Apple binary property lists ('bplist00').

    Usage:
        parsed = BPListReader(raw_bytes).parse()
    or:
        parsed = BPListReader.plistWithString(raw_bytes)

    Notes:
        - ASCII strings and data blobs are returned as bytes;
          only dict keys are decoded to str during resolution.
        - Sets (0xC0) and fill bytes (0x0F) are not implemented.
    """

    def __init__(self, s):
        # s: raw binary plist content (bytes)
        self.data = s
        self.objects = []
        # Cache of already-resolved object indices (handles shared objects)
        self.resolved = {}

    def __unpackIntStruct(self, sz, s):
        '''__unpackIntStruct(size, string) -> int

        Unpacks the integer of given size (1, 2, 4 or 8 bytes) from string
        '''
        if sz == 1:
            ot = '!B'
        elif sz == 2:
            ot = '!H'
        elif sz == 4:
            ot = '!I'
        elif sz == 8:
            ot = '!Q'
        else:
            raise Exception('int unpack size '+str(sz)+' unsupported')
        return struct.unpack(ot, s)[0]

    def __unpackInt(self, offset):
        '''__unpackInt(offset) -> int

        Unpacks int field from plist at given offset
        '''
        return self.__unpackIntMeta(offset)[1]

    def __unpackIntMeta(self, offset):
        '''__unpackIntMeta(offset) -> (size, int)

        Unpacks int field from plist at given offset and returns its size and value
        '''
        obj_header = self.data[offset]
        obj_type, obj_info = (obj_header & 0xF0), (obj_header & 0x0F)
        # Low nibble encodes log2 of the byte width
        int_sz = 2**obj_info
        return int_sz, self.__unpackIntStruct(int_sz, self.data[offset+1:offset+1+int_sz])

    def __resolveIntSize(self, obj_info, offset):
        '''__resolveIntSize(obj_info, offset) -> (count, offset)

        Calculates count of objref* array entries and returns count and offset to first element
        '''
        if obj_info == 0x0F:
            # Count didn't fit in the nibble; an int object follows the marker
            ofs, obj_count = self.__unpackIntMeta(offset+1)
            objref = offset+2+ofs
        else:
            obj_count = obj_info
            objref = offset+1
        return obj_count, objref

    def __unpackFloatStruct(self, sz, s):
        '''__unpackFloatStruct(size, string) -> float

        Unpacks the float of given size (4 or 8 bytes) from string
        '''
        if sz == 4:
            ot = '!f'
        elif sz == 8:
            ot = '!d'
        else:
            raise Exception('float unpack size '+str(sz)+' unsupported')
        return struct.unpack(ot, s)[0]

    def __unpackFloat(self, offset):
        '''__unpackFloat(offset) -> float

        Unpacks float field from plist at given offset
        '''
        obj_header = self.data[offset]
        obj_type, obj_info = (obj_header & 0xF0), (obj_header & 0x0F)
        int_sz = 2**obj_info
        # Fix: return only the value. Previously this returned
        # (size, value) like __unpackIntMeta, so every 'real' entry in a
        # parsed plist surfaced as a tuple instead of a float.
        return self.__unpackFloatStruct(int_sz, self.data[offset+1:offset+1+int_sz])

    def __unpackDate(self, offset):
        # Dates are 8-byte big-endian doubles: seconds since 2001-01-01
        # (Core Data epoch). Fractional seconds are truncated.
        td = int(struct.unpack(">d", self.data[offset+1:offset+9])[0])
        return datetime(year=2001,month=1,day=1) + timedelta(seconds=td)

    def __unpackItem(self, offset):
        '''__unpackItem(offset)

        Unpacks and returns an item from plist
        '''
        obj_header = self.data[offset]
        obj_type, obj_info = (obj_header & 0xF0), (obj_header & 0x0F)
        if   obj_type == 0x00:
            if   obj_info == 0x00: # null   0000 0000
                return None
            elif obj_info == 0x08: # bool   0000 1000           // false
                return False
            elif obj_info == 0x09: # bool   0000 1001           // true
                return True
            elif obj_info == 0x0F: # fill   0000 1111           // fill byte
                raise Exception("0x0F Not Implemented") # this is really pad byte, FIXME
            else:
                raise Exception('unpack item type '+str(obj_header)+' at '+str(offset)+ 'failed')
        elif obj_type == 0x10: #     int    0001 nnnn   ...     // # of bytes is 2^nnnn, big-endian bytes
            return self.__unpackInt(offset)
        elif obj_type == 0x20: #    real    0010 nnnn   ...     // # of bytes is 2^nnnn, big-endian bytes
            return self.__unpackFloat(offset)
        elif obj_type == 0x30: #    date    0011 0011   ...     // 8 byte float follows, big-endian bytes
            return self.__unpackDate(offset)
        elif obj_type == 0x40: #    data    0100 nnnn   [int]   ... // nnnn is number of bytes unless 1111 then int count follows, followed by bytes
            obj_count, objref = self.__resolveIntSize(obj_info, offset)
            return self.data[objref:objref+obj_count] # XXX: we return data as str
        elif obj_type == 0x50: #  string    0101 nnnn   [int]   ... // ASCII string, nnnn is # of chars, else 1111 then int count, then bytes
            obj_count, objref = self.__resolveIntSize(obj_info, offset)
            return self.data[objref:objref+obj_count]
        elif obj_type == 0x60: #  string    0110 nnnn   [int]   ... // Unicode string, nnnn is # of chars, else 1111 then int count, then big-endian 2-byte uint16_t
            obj_count, objref = self.__resolveIntSize(obj_info, offset)
            return self.data[objref:objref+obj_count*2].decode('utf-16be')
        elif obj_type == 0x80: #     uid    1000 nnnn   ...     // nnnn+1 is # of bytes
            # FIXME: Accept as a string for now
            obj_count, objref = self.__resolveIntSize(obj_info, offset)
            return self.data[objref:objref+obj_count]
        elif obj_type == 0xA0: #   array    1010 nnnn   [int]   objref* // nnnn is count, unless '1111', then int count follows
            obj_count, objref = self.__resolveIntSize(obj_info, offset)
            arr = []
            # Store raw object references; __resolveObject replaces them later
            for i in range(obj_count):
                arr.append(self.__unpackIntStruct(self.object_ref_size, self.data[objref+i*self.object_ref_size:objref+i*self.object_ref_size+self.object_ref_size]))
            return arr
        elif obj_type == 0xC0: #    set     1100 nnnn   [int]   objref* // nnnn is count, unless '1111', then int count follows
            # XXX: not serializable via apple implementation
            raise Exception("0xC0 Not Implemented") # FIXME: implement
        elif obj_type == 0xD0: #    dict    1101 nnnn   [int]   keyref* objref* // nnnn is count, unless '1111', then int count follows
            obj_count, objref = self.__resolveIntSize(obj_info, offset)
            keys = []
            for i in range(obj_count):
                keys.append(self.__unpackIntStruct(self.object_ref_size, self.data[objref+i*self.object_ref_size:objref+i*self.object_ref_size+self.object_ref_size]))
            values = []
            objref += obj_count*self.object_ref_size
            for i in range(obj_count):
                values.append(self.__unpackIntStruct(self.object_ref_size, self.data[objref+i*self.object_ref_size:objref+i*self.object_ref_size+self.object_ref_size]))
            dic = {}
            for i in range(obj_count):
                dic[keys[i]] = values[i]
            return dic
        else:
            raise Exception('don\'t know how to unpack obj type '+hex(obj_type)+' at '+str(offset))

    def __resolveObject(self, idx):
        """Recursively replace object references with parsed values (memoized)."""
        try:
            return self.resolved[idx]
        except KeyError:
            obj = self.objects[idx]
            if type(obj) == list:
                newArr = []
                for i in obj:
                    newArr.append(self.__resolveObject(i))
                self.resolved[idx] = newArr
                return newArr
            if type(obj) == dict:
                newDic = {}
                for k,v in obj.items():
                    key_resolved = self.__resolveObject(k)
                    if isinstance(key_resolved, str):
                        rk = key_resolved
                    else:
                        # ASCII keys come back as bytes; decode for usable dict keys
                        rk = codecs.decode(key_resolved, "utf-8")
                    rv = self.__resolveObject(v)
                    newDic[rk] = rv
                self.resolved[idx] = newDic
                return newDic
            else:
                self.resolved[idx] = obj
                return obj

    def parse(self):
        """Parse the bplist and return the resolved root object."""
        # read header
        if self.data[:8] != b'bplist00':
            raise Exception('Bad magic')

        # read trailer (last 32 bytes)
        self.offset_size, self.object_ref_size, self.number_of_objects, self.top_object, self.table_offset = struct.unpack('!6xBB4xI4xI4xI', self.data[-32:])
        #print "** plist offset_size:",self.offset_size,"objref_size:",self.object_ref_size,"num_objs:",self.number_of_objects,"top:",self.top_object,"table_ofs:",self.table_offset

        # read offset table
        self.offset_table = self.data[self.table_offset:-32]
        self.offsets = []
        ot = self.offset_table
        for i in range(self.number_of_objects):
            offset_entry = ot[:self.offset_size]
            ot = ot[self.offset_size:]
            self.offsets.append(self.__unpackIntStruct(self.offset_size, offset_entry))
        #print "** plist offsets:",self.offsets

        # read object table
        self.objects = []
        k = 0
        for i in self.offsets:
            obj = self.__unpackItem(i)
            #print "** plist unpacked",k,type(obj),obj,"at",i
            k += 1
            self.objects.append(obj)

        # rebuild object tree from the top object
        return self.__resolveObject(self.top_object)

    @classmethod
    def plistWithString(cls, s):
        """Convenience constructor: parse bytes s and return the root object."""
        parser = cls(s)
        return parser.parse()
# helpers for testing
def plist(obj):
    """Serialize *obj* to a binary plist string via macOS Foundation (test helper)."""
    from Foundation import NSPropertyListSerialization, NSPropertyListBinaryFormat_v1_0
    serialized = NSPropertyListSerialization.dataWithPropertyList_format_options_error_(
        obj, NSPropertyListBinaryFormat_v1_0, 0, None
    )
    return str(serialized.bytes())
def unplist(s):
    """Deserialize plist bytes *s* via macOS Foundation (test helper)."""
    from Foundation import NSData, NSPropertyListSerialization
    wrapped = NSData.dataWithBytes_length_(s, len(s))
    return NSPropertyListSerialization.propertyListWithData_options_format_error_(
        wrapped, 0, None, None
    )
if __name__ == "__main__":
    # CLI usage: python bplist.py <file>  -> writes <file>.json next to it
    import sys
    import json

    file_path = sys.argv[1]

    with open(file_path, "rb") as fp:
        data = fp.read()

    out = BPListReader(data).parse()

    with open(file_path + ".json", "w") as fp:
        # Fix: json.dump() was called without the file object, which raised
        # TypeError. default=str covers bytes/datetime values the parser
        # returns, which json cannot serialize natively.
        json.dump(out, fp, indent=4, default=str)

View File

@@ -0,0 +1,54 @@
"""
commit_info.py: Parse Commit Info from binary's info.plist
"""
import plistlib
from pathlib import Path
class ParseCommitInfo:
def __init__(self, binary_path: str) -> None:
"""
Parameters:
binary_path (str): Path to binary
"""
self.binary_path = str(binary_path)
self.plist_path = self._convert_binary_path_to_plist_path()
def _convert_binary_path_to_plist_path(self) -> str or None:
"""
Resolve Info.plist path from binary path
"""
if Path(self.binary_path).exists():
plist_path = self.binary_path.replace("MacOS/OpenCore-Patcher", "Info.plist")
if Path(plist_path).exists() and plist_path.endswith(".plist"):
return plist_path
return None
def generate_commit_info(self) -> tuple:
"""
Generate commit info from Info.plist
Returns:
tuple: (Branch, Commit Date, Commit URL)
"""
if self.plist_path:
plist_info = plistlib.load(Path(self.plist_path).open("rb"))
if "Github" in plist_info:
return (
plist_info["Github"]["Branch"],
plist_info["Github"]["Commit Date"],
plist_info["Github"]["Commit URL"],
)
return (
"Running from source",
"Not applicable",
"",
)

View File

@@ -0,0 +1,361 @@
"""
defaults.py: Generate default data for host/target
"""
import subprocess
from .. import constants
from ..detections import device_probe
from . import (
utilities,
generate_smbios,
global_settings
)
from ..datasets import (
smbios_data,
cpu_data,
os_data
)
class GenerateDefaults:
def __init__(self, model: str, host_is_target: bool, global_constants: constants.Constants) -> None:
self.constants: constants.Constants = global_constants
self.model: str = model
self.host_is_target: bool = host_is_target
# Reset Variables
self.constants.sip_status = True
self.constants.secure_status = False
self.constants.disable_cs_lv = False
self.constants.disable_amfi = False
self.constants.fu_status = True
self.constants.fu_arguments = None
self.constants.custom_serial_number = ""
self.constants.custom_board_serial_number = ""
if self.host_is_target is True:
for gpu in self.constants.computer.gpus:
if gpu.device_id_unspoofed == -1:
gpu.device_id_unspoofed = gpu.device_id
if gpu.vendor_id_unspoofed == -1:
gpu.vendor_id_unspoofed = gpu.vendor_id
self._general_probe()
self._nvram_probe()
self._gpu_probe()
self._networking_probe()
self._misc_hardwares_probe()
self._smbios_probe()
self._check_amfipass_supported()
def _general_probe(self) -> None:
"""
General probe for data
"""
if self.model in ["MacBookPro8,2", "MacBookPro8,3"]:
# Users disabling TS2 most likely have a faulty dGPU
# users can override this in settings
ts2_status = global_settings.GlobalEnviromentSettings().read_property("MacBookPro_TeraScale_2_Accel")
if ts2_status is True:
self.constants.allow_ts2_accel = True
else:
global_settings.GlobalEnviromentSettings().write_property("MacBookPro_TeraScale_2_Accel", False)
self.constants.allow_ts2_accel = False
if self.model in smbios_data.smbios_dictionary:
if smbios_data.smbios_dictionary[self.model]["CPU Generation"] >= cpu_data.CPUGen.skylake.value:
# On 2016-2017 MacBook Pros, 15" devices used a stock Samsung SSD with IONVMeController
# Technically this should be patched based on NVMeFix.kext logic,
# however Apple deemed the SSD unsupported for enhanced performance
# In addition, some upgraded NVMe drives still have issues with enhanced power management
# Safest to disable by default, allow user to configure afterwards
self.constants.allow_nvme_fixing = False
else:
self.constants.allow_nvme_fixing = True
# Check if running in RecoveryOS
self.constants.recovery_status = utilities.check_recovery()
if global_settings.GlobalEnviromentSettings().read_property("Force_Web_Drivers") is True:
self.constants.force_nv_web = True
result = global_settings.GlobalEnviromentSettings().read_property("ShouldNukeKDKs")
if result is False:
self.constants.should_nuke_kdks = False
def _smbios_probe(self) -> None:
"""
SMBIOS specific probe
"""
if not self.host_is_target:
if self.model in ["MacPro4,1", "MacPro5,1"]:
# Allow H.265 on AMD
# Assume 2009+ machines have Polaris on pre-builts (internal testing)
# Hardware Detection will never hit this
self.constants.serial_settings = "Minimal"
# Check if model uses T2 SMBIOS, if so see if it needs root patching (determined earlier on via SIP variable)
# If not, allow SecureBootModel usage, otherwise force VMM patching
# Needed for macOS Monterey to allow OTA updates
try:
spoof_model = generate_smbios.set_smbios_model_spoof(self.model)
except:
# Native Macs (mainly M1s) will error out as they don't know what SMBIOS to spoof to
# As we don't spoof on native models, we can safely ignore this
spoof_model = self.model
if spoof_model in smbios_data.smbios_dictionary:
if smbios_data.smbios_dictionary[spoof_model]["SecureBootModel"] is not None:
if self.constants.sip_status is False:
# Force VMM as root patching breaks .im4m signature
self.constants.secure_status = False
self.constants.force_vmm = True
else:
# Allow SecureBootModel
self.constants.secure_status = True
self.constants.force_vmm = False
def _nvram_probe(self) -> None:
"""
NVRAM specific probe
"""
if not self.host_is_target:
return
if "-v" in (utilities.get_nvram("boot-args") or ""):
self.constants.verbose_debug = True
self.constants.custom_serial_number = utilities.get_nvram("OCLP-Spoofed-SN", "4D1FDA02-38C7-4A6A-9CC6-4BCCA8B30102", decode=True)
self.constants.custom_board_serial_number = utilities.get_nvram("OCLP-Spoofed-MLB", "4D1FDA02-38C7-4A6A-9CC6-4BCCA8B30102", decode=True)
if self.constants.custom_serial_number is None or self.constants.custom_board_serial_number is None:
# If either variables are missing, we assume something is wrong with the spoofed variables and reset
self.constants.custom_serial_number = ""
self.constants.custom_board_serial_number = ""
def _networking_probe(self) -> None:
"""
Networking specific probe
"""
is_legacy_wifi = False
is_modern_wifi = False
if self.host_is_target:
if (
(
isinstance(self.constants.computer.wifi, device_probe.Broadcom) and
self.constants.computer.wifi.chipset in [
device_probe.Broadcom.Chipsets.AirPortBrcm4331,
device_probe.Broadcom.Chipsets.AirPortBrcm43224,
]
) or (
isinstance(self.constants.computer.wifi, device_probe.Atheros) and
self.constants.computer.wifi.chipset == device_probe.Atheros.Chipsets.AirPortAtheros40
)
):
is_legacy_wifi = True
elif (
(
isinstance(self.constants.computer.wifi, device_probe.Broadcom) and
self.constants.computer.wifi.chipset in [
device_probe.Broadcom.Chipsets.AirPortBrcm4360,
device_probe.Broadcom.Chipsets.AirportBrcmNIC,
]
)
):
is_modern_wifi = True
else:
print("Checking WiFi")
if self.model not in smbios_data.smbios_dictionary:
return
if (
smbios_data.smbios_dictionary[self.model]["Wireless Model"] in [
device_probe.Broadcom.Chipsets.AirPortBrcm4331,
device_probe.Broadcom.Chipsets.AirPortBrcm43224,
device_probe.Atheros.Chipsets.AirPortAtheros40,
]
):
is_legacy_wifi = True
elif (
smbios_data.smbios_dictionary[self.model]["Wireless Model"] in [
device_probe.Broadcom.Chipsets.AirPortBrcm4360,
device_probe.Broadcom.Chipsets.AirportBrcmNIC,
]
):
print("Modern WiFi")
is_modern_wifi = True
if is_legacy_wifi is False and is_modern_wifi is False:
return
# 12.0: Legacy Wireless chipsets require root patching
# 14.0: Modern Wireless chipsets require root patching
self.constants.sip_status = False
self.constants.secure_status = False
self.constants.disable_cs_lv = True
self.constants.disable_amfi = True
if is_legacy_wifi is True:
# 13.0: Enabling AirPlay to Mac patches breaks Control Center on legacy chipsets
# AirPlay to Mac was unsupported regardless, so we can safely disable it
self.constants.fu_status = True
self.constants.fu_arguments = " -disable_sidecar_mac"
def _misc_hardwares_probe(self) -> None:
"""
Misc probe
"""
if self.host_is_target:
if self.constants.computer.usb_controllers:
if self.model in smbios_data.smbios_dictionary:
if smbios_data.smbios_dictionary[self.model]["CPU Generation"] < cpu_data.CPUGen.ivy_bridge.value:
# Pre-Ivy do not natively support XHCI boot support
# If we detect XHCI on older model, enable
for controller in self.constants.computer.usb_controllers:
if isinstance(controller, device_probe.XHCIController):
self.constants.xhci_boot = True
break
def _gpu_probe(self) -> None:
"""
Graphics specific probe
"""
gpu_archs = []
if self.host_is_target:
gpu_archs = [gpu.arch for gpu in self.constants.computer.gpus if gpu.class_code != 0xFFFFFFFF]
else:
if self.model in smbios_data.smbios_dictionary:
gpu_archs = smbios_data.smbios_dictionary[self.model]["Stock GPUs"]
for arch in gpu_archs:
# Legacy Metal Logic
if arch in [
device_probe.Intel.Archs.Ivy_Bridge,
device_probe.Intel.Archs.Haswell,
device_probe.Intel.Archs.Broadwell,
device_probe.Intel.Archs.Skylake,
device_probe.NVIDIA.Archs.Kepler,
device_probe.AMD.Archs.Legacy_GCN_7000,
device_probe.AMD.Archs.Legacy_GCN_8000,
device_probe.AMD.Archs.Legacy_GCN_9000,
device_probe.AMD.Archs.Polaris,
device_probe.AMD.Archs.Polaris_Spoof,
device_probe.AMD.Archs.Vega,
device_probe.AMD.Archs.Navi,
]:
if arch in [
device_probe.Intel.Archs.Ivy_Bridge,
device_probe.Intel.Archs.Haswell,
device_probe.NVIDIA.Archs.Kepler,
]:
self.constants.disable_amfi = True
if arch in [
device_probe.AMD.Archs.Legacy_GCN_7000,
device_probe.AMD.Archs.Legacy_GCN_8000,
device_probe.AMD.Archs.Legacy_GCN_9000,
device_probe.AMD.Archs.Polaris,
device_probe.AMD.Archs.Polaris_Spoof,
device_probe.AMD.Archs.Vega,
device_probe.AMD.Archs.Navi,
]:
if arch == device_probe.AMD.Archs.Legacy_GCN_7000:
# Check if we're running in Rosetta
if self.host_is_target:
if self.constants.computer.rosetta_active is True:
continue
# Allow H.265 on AMD
if self.model in smbios_data.smbios_dictionary:
if "Socketed GPUs" in smbios_data.smbios_dictionary[self.model]:
self.constants.serial_settings = "Minimal"
# See if system can use the native AMD stack in Ventura
if arch in [
device_probe.AMD.Archs.Polaris,
device_probe.AMD.Archs.Polaris_Spoof,
device_probe.AMD.Archs.Vega,
device_probe.AMD.Archs.Navi,
]:
if self.host_is_target:
if "AVX2" in self.constants.computer.cpu.leafs:
continue
else:
if self.model in smbios_data.smbios_dictionary:
if smbios_data.smbios_dictionary[self.model]["CPU Generation"] >= cpu_data.CPUGen.haswell.value:
continue
self.constants.sip_status = False
self.constants.secure_status = False
self.constants.disable_cs_lv = True
# Non-Metal Logic
elif arch in [
device_probe.Intel.Archs.Iron_Lake,
device_probe.Intel.Archs.Sandy_Bridge,
device_probe.NVIDIA.Archs.Tesla,
device_probe.NVIDIA.Archs.Fermi,
device_probe.NVIDIA.Archs.Maxwell,
device_probe.NVIDIA.Archs.Pascal,
device_probe.AMD.Archs.TeraScale_1,
device_probe.AMD.Archs.TeraScale_2,
]:
self.constants.sip_status = False
self.constants.secure_status = False
self.constants.disable_cs_lv = True
if os_data.os_data.ventura in self.constants.legacy_accel_support:
# Only disable AMFI if we officially support Ventura
self.constants.disable_amfi = True
for key in ["Moraea_BlurBeta"]:
# Enable BetaBlur if user hasn't disabled it
is_key_enabled = subprocess.run(["/usr/bin/defaults", "read", "-globalDomain", key], stdout=subprocess.PIPE).stdout.decode("utf-8").strip()
if is_key_enabled not in ["false", "0"]:
subprocess.run(["/usr/bin/defaults", "write", "-globalDomain", key, "-bool", "true"])
def _check_amfipass_supported(self) -> None:
    """
    Determine whether the root volume is safe for AMFIPass.

    The basic requirements of this function are:
    - The host is the target
    - Root volume doesn't have adhoc signed binaries

    When both hold, it is safe to disable AMFI and CS_LV enforcement
    (the disable flags are cleared). Otherwise, for safety, the flags
    are left untouched:
    - Not the host: unknown whether old binaries are present; rebuild on host.
    - Adhoc signed binaries present: root patch with new binaries, then reboot.

    Note: simply checking the authority is not enough, as the authority can
    be spoofed (but do we really care? this is just a simple check).
    Note: the cert will change.
    """
    if self.host_is_target and self.constants.computer.oclp_sys_signed is not False:
        self.constants.disable_amfi = False
        self.constants.disable_cs_lv = False

View File

@@ -0,0 +1,177 @@
"""
generate_smbios.py: SMBIOS generation for OpenCore Legacy Patcher
"""
import logging
from . import utilities
from ..datasets import (
smbios_data,
os_data,
cpu_data
)
def set_smbios_model_spoof(model):
    """
    Determine the SMBIOS model to spoof as for the provided Mac model.

    Portables are detected by probing the SMBIOS dataset for a "Screen Size"
    key (only laptop entries carry it); a KeyError routes desktops to the
    name-prefix matching below.

    Raises:
        Exception: when no suitable spoof target is known for the model
    """
    try:
        # Probe: raises KeyError for desktop models (no "Screen Size" key)
        smbios_data.smbios_dictionary[model]["Screen Size"]
    except KeyError:
        # Desktop SMBIOS
        if model.startswith("MacPro") or model.startswith("Xserve"):
            return "MacPro7,1"
        if model.startswith("Macmini"):
            return "Macmini8,1"
        if model.startswith("iMac"):
            if smbios_data.smbios_dictionary[model]["Max OS Supported"] <= os_data.os_data.high_sierra:
                # Models dropped in Mojave either do not have an iGPU, or should have them disabled
                return "iMacPro1,1"
            return "iMac18,3"
        # Unknown Model
        raise Exception(f"Unknown SMBIOS for spoofing: {model}")

    # Found mobile SMBIOS
    screen_size = smbios_data.smbios_dictionary[model]["Screen Size"]
    if model.startswith("MacBookAir"):
        return "MacBookAir8,1"
    if model.startswith("MacBookPro"):
        if screen_size == 13:
            return "MacBookPro14,1"
        if screen_size >= 15:
            # 15" and 17"
            return "MacBookPro14,3"
        # Unknown Model
        raise Exception(f"Unknown SMBIOS for spoofing: {model}")
    if model.startswith("MacBook"):
        if screen_size == 13:
            return "MacBookAir8,1"
        if screen_size == 12:
            return "MacBook10,1"
        # Unknown Model
        raise Exception(f"Unknown SMBIOS for spoofing: {model}")
    # Unknown Model
    raise Exception(f"Unknown SMBIOS for spoofing: {model}")
def update_firmware_features(firmwarefeature):
    """
    Adjust FirmwareFeatures to support everything macOS requires.

    Bits enabled:
    - 19/20: APFS support, 10.13+ (OSInstall)
    - 35:    Large BaseSystem support, 12.0 B7+ (patchd)

    Ref: https://github.com/acidanthera/OpenCorePkg/tree/2f76673546ac3e32d2e2d528095fddcd66ad6a23/Include/Apple/IndustryStandard/AppleFeatures.h
    """
    # FW_FEATURE_SUPPORTS_APFS, FW_FEATURE_SUPPORTS_APFS_EXTRA,
    # FW_FEATURE_SUPPORTS_LARGE_BASESYSTEM
    for bit in (19, 20, 35):
        firmwarefeature |= 1 << bit
    return firmwarefeature
def generate_fw_features(model, custom):
    """
    Resolve the FirmwareFeatures value to use for the target model.

    When not building for a custom model, the host's ROM value is preferred;
    the SMBIOS dataset is used as the fallback (and always for custom models).
    The resolved value is then extended with the bits macOS requires.
    """
    firmwarefeature = None
    if not custom:
        firmwarefeature = utilities.get_rom("firmware-features")
        if not firmwarefeature:
            logging.info("- Failed to find FirmwareFeatures, falling back on defaults")
    if not firmwarefeature:
        # Fall back on the SMBIOS dataset (hex string, or None when unknown)
        db_value = smbios_data.smbios_dictionary[model]["FirmwareFeatures"]
        firmwarefeature = 0 if db_value is None else int(db_value, 16)
    return update_firmware_features(firmwarefeature)
def find_model_off_board(board):
    """
    Resolve a Mac model name from a Board ID (or SecureBootModel identifier).

    Returns None when no entry matches.
    """
    # Strip extra data from Target Types (ex. trailing 'ap', uppercase) for
    # identifiers that aren't standard 'Mac-'/'VMM-' board IDs
    if not (board.startswith("Mac-") or board.startswith("VMM-")):
        if board.lower().endswith("ap"):
            board = board[:-2]
        board = board.lower()

    for model, entry in smbios_data.smbios_dictionary.items():
        if board not in (entry["Board ID"], entry["SecureBootModel"]):
            continue
        # Dataset carries duplicate entries ("_v2"/"_v3"/"_v4") to handle
        # multiple board IDs for one model
        if model.endswith(("_v2", "_v3", "_v4")):
            model = model[:-3]
        # MacPro4,1 and 5,1 share a board ID; best to return the newer ID
        return "MacPro5,1" if model == "MacPro4,1" else model
    return None
def find_board_off_model(model):
    """
    Resolve the Board ID for the provided Mac model.

    Returns None when the model is not in the SMBIOS dataset.
    """
    entry = smbios_data.smbios_dictionary.get(model)
    return entry["Board ID"] if entry is not None else None
def check_firewire(model):
    """
    Report whether the model is assumed to support FireWire.

    MacBooks never supported FireWire, and neither did pre-Thunderbolt
    MacBook Airs. MacBook Pros and all desktops are assumed to support it.

    Returns:
        bool: True when FireWire support is assumed, False otherwise
    """
    if model.startswith("MacBookPro"):
        return True
    if model.startswith("MacBookAir"):
        if smbios_data.smbios_dictionary[model]["CPU Generation"] < cpu_data.CPUGen.sandy_bridge.value:
            # Pre-Thunderbolt Airs have no FireWire
            return False
        # Bug fix: this branch previously fell off the elif chain and returned
        # an implicit None; made explicit while keeping the falsy result.
        # NOTE(review): Thunderbolt-era Airs could attach FireWire via an
        # adapter — confirm whether True was intended here.
        return False
    if model.startswith("MacBook"):
        return False
    return True
def determine_best_board_id_for_sandy(current_board_id, gpus):
    """
    Pick the best Board ID to patch into AppleIntelSNBGraphicsFB.

    This function is mainly for users who are either spoofing or using
    hackintoshes. Hackintoshes generally use whatever the latest SMBIOS is,
    so the closest supported Board ID must be determined here.

    Currently the kext supports the following models:
        MacBookPro8,1  - Mac-94245B3640C91C81 (13")
        MacBookPro8,2  - Mac-94245A3940C91C80 (15")
        MacBookPro8,3  - Mac-942459F5819B171B (17")
        MacBookAir4,1  - Mac-C08A6BB70A942AC2 (11")
        MacBookAir4,2  - Mac-742912EFDBEE19B3 (13")
        Macmini5,1     - Mac-8ED6AF5B48C039E1
        Macmini5,2     - Mac-4BC72D62AD45599E (headless)
        Macmini5,3     - Mac-7BA5B2794B2CDB12
        iMac12,1       - Mac-942B5BF58194151B (headless)
        iMac12,2       - Mac-942B59F58194171B (headless)
        Unknown(MBP)   - Mac-94245AF5819B141B
        Unknown(iMac)  - Mac-942B5B3A40C91381 (headless)
    """
    if not current_board_id:
        # Safest bet if we somehow don't know the model
        return find_board_off_model("Macmini5,1")

    model = find_model_off_board(current_board_id)
    if not model:
        # Safest bet if we somehow don't know the model
        return find_board_off_model("Macmini5,1")

    if model.startswith("MacBook"):
        try:
            size = int(smbios_data.smbios_dictionary[model]["Screen Size"])
        except KeyError:
            size = 13  # Assume 13 if it's missing
        if model.startswith("MacBookPro"):
            if size >= 17:
                return find_board_off_model("MacBookPro8,3")
            if size >= 15:
                return find_board_off_model("MacBookPro8,2")
            return find_board_off_model("MacBookPro8,1")
        # MacBook and MacBookAir
        if size >= 13:
            return find_board_off_model("MacBookAir4,2")
        return find_board_off_model("MacBookAir4,1")

    # We're working with a desktop, so need to figure out whether the unit
    # is running headless or not
    if len(gpus) > 1:
        # More than 1 GPU detected, assume headless
        if model.startswith("Macmini"):
            return find_board_off_model("Macmini5,2")
        return find_board_off_model("iMac12,2")
    return find_board_off_model("Macmini5,1")

View File

@@ -0,0 +1,126 @@
"""
global_settings.py: Library for querying and writing global enviroment settings
Alternative to Apple's 'defaults' tool
Store data in '/Users/Shared'
This is to ensure compatibility when running without a user
ie. during automated patching
"""
import os
import logging
import plistlib
import subprocess
from pathlib import Path
class GlobalEnviromentSettings:
    """
    Library for querying and writing global enviroment settings

    Alternative to Apple's 'defaults' tool: settings are stored in a plist
    under '/Users/Shared' to ensure compatibility when running without a
    user session (ie. during automated patching).
    """

    def __init__(self) -> None:
        # Dotfile name under /Users/Shared; writable by any user session
        self.file_name: str = ".com.dortania.opencore-legacy-patcher.plist"
        self.global_settings_folder: str = "/Users/Shared"
        self.global_settings_plist: str = f"{self.global_settings_folder}/{self.file_name}"

        self._generate_settings_file()
        self._convert_defaults_to_global_settings()
        self._fix_file_permission()

    def read_property(self, property_name: str) -> "str | None":
        """
        Reads a property from the global settings file.

        Returns:
            The stored value, or None when the file is missing, unreadable
            or does not contain the property.
        """
        if not Path(self.global_settings_plist).exists():
            return None
        try:
            # Context manager ensures the handle is closed (was leaked before)
            with Path(self.global_settings_plist).open("rb") as f:
                plist = plistlib.load(f)
        except Exception as e:
            logging.error("Error: Unable to read global settings file")
            logging.error(e)
            return None
        return plist.get(property_name, None)

    def write_property(self, property_name: str, property_value) -> None:
        """
        Writes a property to the global settings file.

        No-op when the settings file does not exist; write failures are
        logged rather than raised (best-effort, matches automated patching).
        """
        if not Path(self.global_settings_plist).exists():
            return
        try:
            with Path(self.global_settings_plist).open("rb") as f:
                plist = plistlib.load(f)
        except Exception as e:
            logging.error("Error: Unable to read global settings file")
            logging.error(e)
            return
        plist[property_name] = property_value
        try:
            with Path(self.global_settings_plist).open("wb") as f:
                plistlib.dump(plist, f)
        except PermissionError:
            logging.info("Failed to write to global settings file")

    def _generate_settings_file(self) -> None:
        """Create the settings file with a marker entry if missing."""
        if Path(self.global_settings_plist).exists():
            return
        try:
            with Path(self.global_settings_plist).open("wb") as f:
                plistlib.dump({"Developed by Dortania": True,}, f)
        except PermissionError:
            logging.info("Permission error: Unable to write to global settings file")

    def _convert_defaults_to_global_settings(self) -> None:
        """
        Converts legacy defaults to global settings

        Merges the old 'defaults'-based preference plist into the global
        settings file, then deletes the legacy plist so the migration only
        happens once.
        """
        defaults_path = "~/Library/Preferences/com.dortania.opencore-legacy-patcher.plist"
        defaults_path = Path(defaults_path).expanduser()

        if not Path(defaults_path).exists():
            return

        # merge defaults with global settings
        try:
            with Path(defaults_path).open("rb") as f:
                defaults_plist = plistlib.load(f)
            with Path(self.global_settings_plist).open("rb") as f:
                global_settings_plist = plistlib.load(f)
        except Exception as e:
            logging.error("Error: Unable to read global settings file")
            logging.error(e)
            return

        # Legacy values take precedence over existing global settings
        global_settings_plist.update(defaults_plist)

        try:
            with Path(self.global_settings_plist).open("wb") as f:
                plistlib.dump(global_settings_plist, f)
        except PermissionError:
            logging.info("Permission error: Unable to write to global settings file")
            return

        # delete defaults plist
        try:
            Path(defaults_path).unlink()
        except Exception as e:
            logging.error("Error: Unable to delete defaults plist")
            logging.error(e)

    def _fix_file_permission(self) -> None:
        """
        Fixes file permission for log file

        If OCLP was invoked as root, file permission will only allow root to
        write to the settings file. This in turn breaks normal OCLP execution
        writing to the settings file.
        """
        if os.geteuid() != 0:
            return

        # Set file permission to allow any user to write to log file
        result = subprocess.run(["/bin/chmod", "777", self.global_settings_plist], capture_output=True)
        if result.returncode != 0:
            logging.warning("Failed to fix settings file permissions:")
            if result.stderr:
                logging.warning(result.stderr.decode("utf-8"))

View File

@@ -0,0 +1,177 @@
"""
install.py: Installation of OpenCore files to ESP
"""
import logging
import plistlib
import subprocess
import applescript
from pathlib import Path
from . import utilities
from .. import constants
from ..datasets import os_data
class tui_disk_installation:
    """
    Installation flow for copying OpenCore files onto an ESP.

    Wraps diskutil enumeration of disks/partitions and the transfer of the
    built OpenCore payload onto a FAT32/EFI partition.
    """

    def __init__(self, versions):
        # 'versions' is the shared Constants object (legacy parameter name)
        self.constants: constants.Constants = versions

    def list_disks(self):
        """
        Enumerate physical disks via diskutil.

        Returns:
            dict: disks containing at least one FAT32 ('msdos') or EFI
            partition, keyed by device identifier (ex. 'disk0'), each with
            identifier, media name, human-formatted size and partition map.
        """
        all_disks = {}
        # TODO: AllDisksAndPartitions is not supported in Snow Leopard and older
        try:
            # High Sierra and newer
            disks = plistlib.loads(subprocess.run(["/usr/sbin/diskutil", "list", "-plist", "physical"], stdout=subprocess.PIPE).stdout.decode().strip().encode())
        except ValueError:
            # Sierra and older
            disks = plistlib.loads(subprocess.run(["/usr/sbin/diskutil", "list", "-plist"], stdout=subprocess.PIPE).stdout.decode().strip().encode())

        for disk in disks["AllDisksAndPartitions"]:
            disk_info = plistlib.loads(subprocess.run(["/usr/sbin/diskutil", "info", "-plist", disk["DeviceIdentifier"]], stdout=subprocess.PIPE).stdout.decode().strip().encode())
            try:
                all_disks[disk["DeviceIdentifier"]] = {"identifier": disk_info["DeviceNode"], "name": disk_info["MediaName"], "size": disk_info["TotalSize"], "partitions": {}}
                for partition in disk["Partitions"]:
                    partition_info = plistlib.loads(subprocess.run(["/usr/sbin/diskutil", "info", "-plist", partition["DeviceIdentifier"]], stdout=subprocess.PIPE).stdout.decode().strip().encode())
                    all_disks[disk["DeviceIdentifier"]]["partitions"][partition["DeviceIdentifier"]] = {
                        "fs": partition_info.get("FilesystemType", partition_info["Content"]),
                        "type": partition_info["Content"],
                        "name": partition_info.get("VolumeName", ""),
                        "size": partition_info["TotalSize"],
                    }
            except KeyError:
                # Avoid crashing with CDs installed
                continue

        supported_disks = {}
        for disk in all_disks:
            # Skip disks without any FAT32/EFI partition to install onto
            if not any(all_disks[disk]["partitions"][partition]["fs"] in ("msdos", "EFI") for partition in all_disks[disk]["partitions"]):
                continue
            supported_disks.update({
                disk: {
                    "disk": disk,
                    "name": all_disks[disk]["name"],
                    "size": utilities.human_fmt(all_disks[disk]['size']),
                    "partitions": all_disks[disk]["partitions"]
                }
            })
        return supported_disks

    def list_partitions(self, disk_response, supported_disks):
        """
        List installable partitions of the selected disk.

        Parameters:
            disk_response: disk identifier chosen by the user (ex. 'disk0')
            supported_disks: dataset generated by list_disks()

        Returns:
            dict: FAT32/EFI partitions keyed by identifier, each with its
            name and human-formatted size.
        """
        # Takes disk UUID as well as diskutil dataset generated by list_disks
        # Returns list of FAT32 partitions
        disk_identifier = disk_response
        selected_disk = supported_disks[disk_identifier]

        supported_partitions = {}
        for partition in selected_disk["partitions"]:
            if selected_disk["partitions"][partition]["fs"] not in ("msdos", "EFI"):
                continue
            supported_partitions.update({
                partition: {
                    "partition": partition,
                    "name": selected_disk["partitions"][partition]["name"],
                    "size": utilities.human_fmt(selected_disk["partitions"][partition]["size"])
                }
            })
        return supported_partitions

    def _determine_sd_card(self, media_name: str):
        """
        Heuristically determine whether the media name denotes an SD card.

        Parameters:
            media_name (str): diskutil 'MediaName' of the parent drive

        Returns:
            bool: True when a common SD-card reader name is matched
        """
        # Array filled with common SD Card names
        # Note most USB-based SD Card readers generally report as "Storage Device"
        # Thus no reliable way to detect further without parsing IOService output (kUSBProductString)
        if any(x in media_name for x in ("SD Card", "SD/MMC", "SDXC Reader", "SD Reader", "Card Reader")):
            return True
        return False

    def install_opencore(self, full_disk_identifier: str):
        """
        Mount the target EFI partition and copy the built OpenCore payload onto it.

        Parameters:
            full_disk_identifier (str): partition identifier (ex. 'disk0s1')

        Returns:
            True on success, False when the EFI volume failed to mount,
            None when mounting was cancelled or failed earlier.
        """
        # TODO: Apple Script fails in Yosemite(?) and older
        logging.info(f"Mounting partition: {full_disk_identifier}")
        if self.constants.detected_os >= os_data.os_data.el_capitan and not self.constants.recovery_status:
            # Modern OS outside recovery: prompt for admin rights via AppleScript
            try:
                applescript.AppleScript(f'''do shell script "diskutil mount {full_disk_identifier}" with prompt "OpenCore Legacy Patcher needs administrator privileges to mount this volume." with administrator privileges without altering line endings''').run()
            except applescript.ScriptError as e:
                if "User canceled" in str(e):
                    logging.info("Mount cancelled by user")
                    return
                logging.info(f"An error occurred: {e}")
                if utilities.check_boot_mode() == "safe_boot":
                    logging.info("\nSafe Mode detected. FAT32 is unsupported by macOS in this mode.")
                    logging.info("Please disable Safe Mode and try again.")
                return
        else:
            # Recovery or legacy OS: mount directly (already elevated)
            result = subprocess.run(["/usr/sbin/diskutil", "mount", full_disk_identifier], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            if result.returncode != 0:
                logging.info("Mount failed")
                logging.info(result.stderr.decode())
                return

        # Query the partition and its parent drive for icon/cleanup decisions
        partition_info = plistlib.loads(subprocess.run(["/usr/sbin/diskutil", "info", "-plist", full_disk_identifier], stdout=subprocess.PIPE).stdout.decode().strip().encode())
        parent_disk = partition_info["ParentWholeDisk"]
        drive_host_info = plistlib.loads(subprocess.run(["/usr/sbin/diskutil", "info", "-plist", parent_disk], stdout=subprocess.PIPE).stdout.decode().strip().encode())
        sd_type = drive_host_info["MediaName"]
        try:
            ssd_type = drive_host_info["SolidState"]
        except KeyError:
            # Key absent on some media; assume not solid state
            ssd_type = False
        mount_path = Path(partition_info["MountPoint"])
        disk_type = partition_info["BusProtocol"]
        if not mount_path.exists():
            logging.info("EFI failed to mount!")
            return False

        # Clear any previous OpenCore install from the partition
        if (mount_path / Path("EFI/OC")).exists():
            logging.info("Removing preexisting EFI/OC folder")
            subprocess.run(["/bin/rm", "-rf", mount_path / Path("EFI/OC")], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if (mount_path / Path("System")).exists():
            logging.info("Removing preexisting System folder")
            subprocess.run(["/bin/rm", "-rf", mount_path / Path("System")], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if (mount_path / Path("boot.efi")).exists():
            logging.info("Removing preexisting boot.efi")
            subprocess.run(["/bin/rm", mount_path / Path("boot.efi")], stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        logging.info("Copying OpenCore onto EFI partition")
        subprocess.run(["/bin/mkdir", "-p", mount_path / Path("EFI")], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        subprocess.run(["/bin/cp", "-r", self.constants.opencore_release_folder / Path("EFI/OC"), mount_path / Path("EFI/OC")], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        subprocess.run(["/bin/cp", "-r", self.constants.opencore_release_folder / Path("System"), mount_path / Path("System")], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if Path(self.constants.opencore_release_folder / Path("boot.efi")).exists():
            subprocess.run(["/bin/cp", self.constants.opencore_release_folder / Path("boot.efi"), mount_path / Path("boot.efi")], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if self.constants.boot_efi is True:
            # Move the bootloader to the fallback EFI/BOOT/BOOTx64.efi path
            logging.info("Converting Bootstrap to BOOTx64.efi")
            if (mount_path / Path("EFI/BOOT")).exists():
                subprocess.run(["/bin/rm", "-rf", mount_path / Path("EFI/BOOT")], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            Path(mount_path / Path("EFI/BOOT")).mkdir()
            subprocess.run(["/bin/mv", mount_path / Path("System/Library/CoreServices/boot.efi"), mount_path / Path("EFI/BOOT/BOOTx64.efi")], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            subprocess.run(["/bin/rm", "-rf", mount_path / Path("System")], stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        # Pick a volume icon matching the physical media type
        if self._determine_sd_card(sd_type) is True:
            logging.info("Adding SD Card icon")
            subprocess.run(["/bin/cp", self.constants.icon_path_sd, mount_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        elif ssd_type is True:
            logging.info("Adding SSD icon")
            subprocess.run(["/bin/cp", self.constants.icon_path_ssd, mount_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        elif disk_type == "USB":
            logging.info("Adding External USB Drive icon")
            subprocess.run(["/bin/cp", self.constants.icon_path_external, mount_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        else:
            logging.info("Adding Internal Drive icon")
            subprocess.run(["/bin/cp", self.constants.icon_path_internal, mount_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        logging.info("Cleaning install location")
        if not self.constants.recovery_status:
            # Only unmount outside of recovery
            logging.info("Unmounting EFI partition")
            subprocess.run(["/usr/sbin/diskutil", "umount", mount_path], stdout=subprocess.PIPE).stdout.decode().strip().encode()
        logging.info("OpenCore transfer complete")
        return True

View File

@@ -0,0 +1,134 @@
"""
integrity_verification.py: Validate the integrity of Apple downloaded files via .chunklist and .integrityDataV1 files
Based off of chunklist.py:
- https://gist.github.com/dhinakg/cbe30edf31ddc153fd0b0c0570c9b041
"""
import enum
import hashlib
import logging
import binascii
import threading
from typing import Union
from pathlib import Path
CHUNK_LENGTH = 4 + 32  # 4-byte little-endian chunk size + 32-byte SHA-256 digest


class ChunklistStatus(enum.Enum):
    """
    Chunklist status
    """
    IN_PROGRESS = 0  # Validation not finished (or not started)
    SUCCESS     = 1  # Every chunk matched its checksum
    FAILURE     = 2  # Malformed chunklist, missing file or checksum mismatch


class ChunklistVerification:
    """
    Library to validate Apple's files against their chunklist format
    Supports both chunklist and integrityDataV1 files
    - Ref: https://github.com/apple-oss-distributions/xnu/blob/xnu-8020.101.4/bsd/kern/chunklist.h

    Parameters:
        file_path (Path): Path to the file to validate
        chunklist_path (Path | bytes): Path to the chunklist file, or its raw contents

    Usage:
        >>> chunk_obj = ChunklistVerification("InstallAssistant.pkg", "InstallAssistant.pkg.integrityDataV1")
        >>> chunk_obj.validate()

        >>> while chunk_obj.status == ChunklistStatus.IN_PROGRESS:
        ...     print(f"Validating {chunk_obj.current_chunk} of {chunk_obj.total_chunks}")

        >>> if chunk_obj.status == ChunklistStatus.FAILURE:
        ...     print(chunk_obj.error_msg)
    """

    def __init__(self, file_path: Path, chunklist_path: Union[Path, bytes]) -> None:
        if isinstance(chunklist_path, bytes):
            self.chunklist_path: bytes = chunklist_path
        else:
            self.chunklist_path: Path = Path(chunklist_path)

        self.file_path: Path = Path(file_path)
        # List of {"length", "checksum"} entries, or None when the header is malformed
        self.chunks: list = self._generate_chunks(self.chunklist_path)

        self.error_msg:     str = ""
        self.current_chunk: int = 0
        # Bug fix: len(None) previously raised TypeError for malformed chunklists
        self.total_chunks:  int = len(self.chunks) if self.chunks else 0

        self.status: ChunklistStatus = ChunklistStatus.IN_PROGRESS

    def _generate_chunks(self, chunklist: Union[Path, bytes]) -> list:
        """
        Generate the list of chunks described by the chunklist

        Parameters:
            chunklist (Path | bytes): Path to the chunklist file or the chunklist contents itself

        Returns:
            list of {"length": int, "checksum": bytes} entries,
            or None when the header magic is invalid
        """
        chunklist: bytes = chunklist if isinstance(chunklist, bytes) else chunklist.read_bytes()

        # Ref: https://github.com/apple-oss-distributions/xnu/blob/xnu-8020.101.4/bsd/kern/chunklist.h#L59-L69
        header: dict = {
            "magic":       chunklist[:4],
            "length":      int.from_bytes(chunklist[4:8],   "little"),
            "fileVersion": chunklist[8],
            "chunkMethod": chunklist[9],
            "sigMethod":   chunklist[10],
            "chunkCount":  int.from_bytes(chunklist[12:20], "little"),
            "chunkOffset": int.from_bytes(chunklist[20:28], "little"),
            "sigOffset":   int.from_bytes(chunklist[28:36], "little"),
        }

        if header["magic"] != b"CNKL":
            return None

        all_chunks = chunklist[header["chunkOffset"]:header["chunkOffset"] + header["chunkCount"] * CHUNK_LENGTH]
        return [
            {
                "length":   int.from_bytes(all_chunks[i:i + 4], "little"),
                "checksum": all_chunks[i + 4:i + CHUNK_LENGTH],
            }
            for i in range(0, len(all_chunks), CHUNK_LENGTH)
        ]

    def _validate(self) -> None:
        """
        Validates provided file against chunklist (synchronous).

        Sets 'status', and 'error_msg' on failure.
        """
        if self.chunks is None:
            self.error_msg = "Malformed chunklist: invalid header magic"
            self.status = ChunklistStatus.FAILURE
            logging.info(self.error_msg)
            return

        if not Path(self.file_path).exists():
            self.error_msg = f"File {self.file_path} does not exist"
            self.status = ChunklistStatus.FAILURE
            logging.info(self.error_msg)
            return

        if not Path(self.file_path).is_file():
            self.error_msg = f"File {self.file_path} is not a file"
            self.status = ChunklistStatus.FAILURE
            logging.info(self.error_msg)
            return

        with self.file_path.open("rb") as f:
            # Chunks are contiguous: read exactly chunk["length"] bytes each time
            for chunk in self.chunks:
                self.current_chunk += 1
                status = hashlib.sha256(f.read(chunk["length"])).digest()
                if status != chunk["checksum"]:
                    self.error_msg = f"Chunk {self.current_chunk} checksum status FAIL: chunk sum {binascii.hexlify(chunk['checksum']).decode()}, calculated sum {binascii.hexlify(status).decode()}"
                    self.status = ChunklistStatus.FAILURE
                    logging.info(self.error_msg)
                    return

        self.status = ChunklistStatus.SUCCESS

    def validate(self) -> None:
        """
        Spawns _validate() thread; poll 'status' for completion
        """
        threading.Thread(target=self._validate).start()

View File

@@ -0,0 +1,694 @@
"""
kdk_handler.py: Module for parsing and determining best Kernel Debug Kit for host OS
"""
import os
import logging
import plistlib
import requests
import tempfile
import subprocess
import packaging.version
from typing import cast
from pathlib import Path
from .. import constants
from ..datasets import os_data
from . import (
utilities,
network_handler
)
KDK_INSTALL_PATH: str = "/Library/Developer/KDKs"                                # KDK install directory on the root volume
KDK_INFO_PLIST:   str = "KDKInfo.plist"                                          # Metadata plist written next to KDK downloads
KDK_API_LINK:     str = "https://dortania.github.io/KdkSupportPkg/manifest.json" # KdkSupportPkg catalog endpoint

# Module-level cache of the remote KDK catalog; populated on first successful fetch
KDK_ASSET_LIST: "list | None" = None
class KernelDebugKitObject:
"""
Library for querying and downloading Kernel Debug Kits (KDK) for macOS
Usage:
>>> kdk_object = KernelDebugKitObject(constants, host_build, host_version)
>>> if kdk_object.success:
>>> # Query whether a KDK is already installed
>>> if kdk_object.kdk_already_installed:
>>> # Use the installed KDK
>>> kdk_path = kdk_object.kdk_installed_path
>>> else:
>>> # Get DownloadObject for the KDK
>>> # See network_handler.py's DownloadObject documentation for usage
>>> kdk_download_object = kdk_object.retrieve_download()
>>> # Once downloaded, recommend verifying KDK's checksum
>>> valid = kdk_object.validate_kdk_checksum()
"""
def __init__(self, global_constants: constants.Constants,
             host_build: str, host_version: str,
             ignore_installed: bool = False, passive: bool = False,
             check_backups_only: bool = False
    ) -> None:
    """
    Parameters:
        global_constants (constants.Constants): Shared constants object
        host_build (str): Build of the target OS (ex. 20A5384c)
        host_version (str): Version of the target OS (ex. 11.0.1)
        ignore_installed (bool): Ignore installed KDKs and download the latest
        passive (bool): Don't perform actions requiring elevated privileges
        check_backups_only (bool): Only check KDK backups, not installed KDKs

    After construction, check 'success'; on True either 'kdk_already_installed'
    is set (use 'kdk_installed_path'), or 'kdk_url' describes the download.
    """

    self.constants: constants.Constants = global_constants

    self.host_build:   str = host_build    # ex. 20A5384c
    self.host_version: str = host_version  # ex. 11.0.1

    self.passive: bool = passive  # Don't perform actions requiring elevated privileges

    self.ignore_installed:   bool = ignore_installed    # If True, will ignore any installed KDKs and download the latest
    self.check_backups_only: bool = check_backups_only  # If True, will only check for KDK backups, not KDKs already installed

    self.kdk_already_installed: bool = False

    self.kdk_installed_path: str = ""

    # Exact (or closest) remote match resolved by _get_latest_kdk()
    self.kdk_url: str = ""
    self.kdk_url_build: str = ""
    self.kdk_url_version: str = ""

    self.kdk_url_expected_size: int = 0

    self.kdk_url_is_exactly_match: bool = False

    # Closest-match candidate, used when no exact build match exists
    self.kdk_closest_match_url: str = ""
    self.kdk_closest_match_url_build: str = ""
    self.kdk_closest_match_url_version: str = ""

    self.kdk_closest_match_url_expected_size: int = 0

    self.success: bool = False

    self.error_msg: str = ""

    # Resolve installed KDKs / the remote catalog immediately (may hit the network)
    self._get_latest_kdk()
def _get_remote_kdks(self) -> list:
    """
    Fetch the list of available KDKs from the KdkSupportPkg API.

    The result is cached at module level (KDK_ASSET_LIST), so only the
    first call performs a network request.

    Returns:
        list: KDK entries, sorted by version and date if available.
        None when the API is unreachable or returns a non-200 status.
    """
    global KDK_ASSET_LIST

    logging.info("Pulling KDK list from KdkSupportPkg API")
    if KDK_ASSET_LIST:
        # Serve the cached catalog, avoiding extra API calls
        return KDK_ASSET_LIST

    try:
        response = network_handler.NetworkUtilities().get(
            KDK_API_LINK,
            headers={"User-Agent": f"OCLP/{self.constants.patcher_version}"},
            timeout=5
        )
    except (requests.exceptions.Timeout, requests.exceptions.TooManyRedirects, requests.exceptions.ConnectionError):
        logging.info("Could not contact KDK API")
        return None

    if response.status_code != 200:
        logging.info("Could not fetch KDK list")
        return None

    KDK_ASSET_LIST = response.json()
    return KDK_ASSET_LIST
def _get_latest_kdk(self, host_build: str = None, host_version: str = None) -> None:
    """
    Fetches the latest KDK for the current macOS version

    Resolution order:
      1. Skip entirely when the OS is older than Ventura (no KDK needed)
      2. Use an already-installed KDK if present
      3. Query the remote catalog for an exact build match
      4. Fall back to the closest remote version match
      5. With no network, fall back to loosely-matching local KDKs

    On success, sets 'success' plus either 'kdk_already_installed'/'kdk_installed_path'
    or the 'kdk_url*' attributes; otherwise sets 'error_msg'.

    Parameters:
        host_build (str, optional): The build version of the current macOS version.
            If empty, will use the host_build from the class. Defaults to None.
        host_version (str, optional): The version of the current macOS version.
            If empty, will use the host_version from the class. Defaults to None.
    """

    if host_build is None and host_version is None:
        host_build = self.host_build
        host_version = self.host_version

    parsed_version = cast(packaging.version.Version, packaging.version.parse(host_version))

    # KDKs only matter from Ventura onwards
    if os_data.os_conversion.os_to_kernel(str(parsed_version.major)) < os_data.os_data.ventura:
        self.error_msg = "KDKs are not required for macOS Monterey or older"
        logging.warning(f"{self.error_msg}")
        return

    self.kdk_installed_path = self._local_kdk_installed()
    if self.kdk_installed_path:
        logging.info(f"KDK already installed ({Path(self.kdk_installed_path).name}), skipping")
        self.kdk_already_installed = True
        self.success = True
        return

    remote_kdk_version = self._get_remote_kdks()

    if remote_kdk_version is None:
        # No network: try progressively looser local matches
        logging.warning("Failed to fetch KDK list, falling back to local KDK matching")

        # First check if a KDK matching the current macOS version is installed
        # ex. 13.0.1 vs 13.0
        loose_version = f"{parsed_version.major}.{parsed_version.minor}"
        logging.info(f"Checking for KDKs loosely matching {loose_version}")
        self.kdk_installed_path = self._local_kdk_installed(match=loose_version, check_version=True)
        if self.kdk_installed_path:
            logging.info(f"Found matching KDK: {Path(self.kdk_installed_path).name}")
            self.kdk_already_installed = True
            self.success = True
            return

        # Next, accept a KDK from the previous minor release
        older_version = f"{parsed_version.major}.{parsed_version.minor - 1 if parsed_version.minor > 0 else 0}"
        logging.info(f"Checking for KDKs matching {older_version}")
        self.kdk_installed_path = self._local_kdk_installed(match=older_version, check_version=True)
        if self.kdk_installed_path:
            logging.info(f"Found matching KDK: {Path(self.kdk_installed_path).name}")
            self.kdk_already_installed = True
            self.success = True
            return

        logging.warning(f"Couldn't find KDK matching {host_version} or {older_version}, please install one manually")

        self.error_msg = f"Could not contact KdkSupportPkg API, and no KDK matching {host_version} ({host_build}) or {older_version} was installed.\nPlease ensure you have a network connection or manually install a KDK."

        return

    # First check exact match
    for kdk in remote_kdk_version:
        if (kdk["build"] != host_build):
            continue
        self.kdk_url = kdk["url"]
        self.kdk_url_build = kdk["build"]
        self.kdk_url_version = kdk["version"]
        self.kdk_url_expected_size = kdk["fileSize"]
        self.kdk_url_is_exactly_match = True
        break

    # If no exact match, check for closest match
    if self.kdk_url == "":
        for kdk in remote_kdk_version:
            kdk_version = cast(packaging.version.Version, packaging.version.parse(kdk["version"]))
            if kdk_version > parsed_version:
                # Never offer a KDK newer than the host OS
                continue
            if kdk_version.major != parsed_version.major:
                continue
            if kdk_version.minor not in range(parsed_version.minor - 1, parsed_version.minor + 1):
                continue

            # The KDK list is already sorted by version then date, so the first match is the closest
            self.kdk_closest_match_url = kdk["url"]
            self.kdk_closest_match_url_build = kdk["build"]
            self.kdk_closest_match_url_version = kdk["version"]
            self.kdk_closest_match_url_expected_size = kdk["fileSize"]
            self.kdk_url_is_exactly_match = False
            break

    if self.kdk_url == "":
        if self.kdk_closest_match_url == "":
            logging.warning(f"No KDKs found for {host_build} ({host_version})")
            self.error_msg = f"No KDKs found for {host_build} ({host_version})"
            return
        logging.info(f"No direct match found for {host_build}, falling back to closest match")
        logging.info(f"Closest Match: {self.kdk_closest_match_url_build} ({self.kdk_closest_match_url_version})")

        # Promote the closest match to the primary download target
        self.kdk_url = self.kdk_closest_match_url
        self.kdk_url_build = self.kdk_closest_match_url_build
        self.kdk_url_version = self.kdk_closest_match_url_version
        self.kdk_url_expected_size = self.kdk_closest_match_url_expected_size
    else:
        logging.info(f"Direct match found for {host_build} ({host_version})")

    # Check if this KDK is already installed
    self.kdk_installed_path = self._local_kdk_installed(match=self.kdk_url_build)
    if self.kdk_installed_path:
        logging.info(f"KDK already installed ({Path(self.kdk_installed_path).name}), skipping")
        self.kdk_already_installed = True
        self.success = True
        return

    logging.info("Following KDK is recommended:")
    logging.info(f"- KDK Build: {self.kdk_url_build}")
    logging.info(f"- KDK Version: {self.kdk_url_version}")
    logging.info(f"- KDK URL: {self.kdk_url}")

    self.success = True
def retrieve_download(self, override_path: str = "") -> network_handler.DownloadObject:
    """
    Build a DownloadObject for fetching the resolved KDK.

    Parameters:
        override_path (str): Override the default download path

    Returns:
        network_handler.DownloadObject for the KDK, or None when no
        download is required (KDK installed) or no KDK was resolved.
    """
    self.success = False
    self.error_msg = ""

    if self.kdk_already_installed:
        logging.info("No download required, KDK already installed")
        self.success = True
        return None

    if self.kdk_url == "":
        self.error_msg = "Could not retrieve KDK catalog, no KDK to download"
        logging.error(self.error_msg)
        return None

    logging.info(f"Returning DownloadObject for KDK: {Path(self.kdk_url).name}")
    self.success = True

    # KDKInfo.plist lands next to the download target
    use_default = override_path == ""
    kdk_download_path = self.constants.kdk_download_path if use_default else Path(override_path)
    plist_parent = kdk_download_path.parent if use_default else Path(override_path).parent
    kdk_plist_path = Path(f"{plist_parent}/{KDK_INFO_PLIST}")
    self._generate_kdk_info_plist(kdk_plist_path)

    return network_handler.DownloadObject(self.kdk_url, kdk_download_path)
def _generate_kdk_info_plist(self, plist_path: str) -> None:
    """
    Generates a KDK Info.plist describing the resolved KDK.

    The plist holds the 'build' and 'version' of the KDK being downloaded.
    Failures are logged and swallowed (best-effort).

    Parameters:
        plist_path (str): Destination path for the plist
    """
    plist_path = Path(plist_path)
    if plist_path.exists():
        plist_path.unlink()

    kdk_dict = {
        "build":   self.kdk_url_build,
        "version": self.kdk_url_version,
    }

    try:
        # Context manager closes the handle (previous code leaked it);
        # open("wb") creates the file, so the old touch() was redundant.
        with plist_path.open("wb") as f:
            plistlib.dump(kdk_dict, f, sort_keys=False)
    except Exception as e:
        logging.error(f"Failed to generate KDK Info.plist: {e}")
def _local_kdk_valid(self, kdk_path: Path) -> bool:
    """
    Validates provided KDK, ensure no corruption

    The reason for this is due to macOS deleting files from the KDK during OS updates,
    similar to how Install macOS.app is deleted during OS updates

    Uses Apple's pkg receipt system to verify the original contents of the KDK

    Parameters:
        kdk_path (Path): Path to KDK

    Returns:
        bool: True if valid, False if invalid (an invalid KDK is deleted
              via _remove_kdk before returning False)
    """
    # SystemVersion.plist doubles as an intactness marker and as the source
    # of the KDK's build number used for the pkg-receipt lookup below
    if not Path(f"{kdk_path}/System/Library/CoreServices/SystemVersion.plist").exists():
        logging.info(f"Corrupted KDK found ({kdk_path.name}), removing due to missing SystemVersion.plist")
        self._remove_kdk(kdk_path)
        return False

    # Get build from KDK
    kdk_plist_data = plistlib.load(Path(f"{kdk_path}/System/Library/CoreServices/SystemVersion.plist").open("rb"))
    if "ProductBuildVersion" not in kdk_plist_data:
        logging.info(f"Corrupted KDK found ({kdk_path.name}), removing due to missing ProductBuildVersion")
        self._remove_kdk(kdk_path)
        return False

    kdk_build = kdk_plist_data["ProductBuildVersion"]

    # Check pkg receipts for this build, will give a canonical list if all files that should be present
    result = subprocess.run(["/usr/sbin/pkgutil", "--files", f"com.apple.pkg.KDK.{kdk_build}"], capture_output=True)
    if result.returncode != 0:
        # If pkg receipt is missing, we'll fallback to legacy validation
        logging.info(f"pkg receipt missing for {kdk_path.name}, falling back to legacy validation")
        return self._local_kdk_valid_legacy(kdk_path)

    # Go through each line of the pkg receipt and ensure it exists
    # (only payload entries under System/Library/Extensions are checked;
    # everything else in the receipt is ignored)
    for line in result.stdout.decode("utf-8").splitlines():
        if not line.startswith("System/Library/Extensions"):
            continue
        if not Path(f"{kdk_path}/{line}").exists():
            logging.info(f"Corrupted KDK found ({kdk_path.name}), removing due to missing file: {line}")
            self._remove_kdk(kdk_path)
            return False

    return True
def _local_kdk_valid_legacy(self, kdk_path: Path) -> bool:
    """
    Legacy variant of validating provided KDK.

    Spot-checks a best-guess set of kext payloads instead of the pkg
    receipt; used only as a fallback when no receipt is registered.

    Parameters:
        kdk_path (Path): Path to KDK

    Returns:
        bool: True if valid, False if invalid (invalid KDKs are deleted)
    """
    # Representative payloads spanning core kernel, filesystem, USB and GPU kexts
    expected_payloads = (
        "System.kext/PlugIns/Libkern.kext/Libkern",
        "apfs.kext/Contents/MacOS/apfs",
        "IOUSBHostFamily.kext/Contents/MacOS/IOUSBHostFamily",
        "AMDRadeonX6000.kext/Contents/MacOS/AMDRadeonX6000",
    )

    for payload in expected_payloads:
        if Path(f"{kdk_path}/System/Library/Extensions/{payload}").exists():
            continue
        logging.info(f"Corrupted KDK found, removing due to missing: {kdk_path}/System/Library/Extensions/{payload}")
        self._remove_kdk(kdk_path)
        return False

    return True
def _local_kdk_installed(self, match: str = None, check_version: bool = False) -> str:
    """
    Checks if KDK matching build is installed
    If so, validates it has not been corrupted

    Parameters:
        match (str): string to match against (ex. build or version);
                     defaults to the host's version/build when None
        check_version (bool): If True, match against version, otherwise match against build

    Returns:
        str: Path to KDK if valid, None if not
             (note: the returned value is actually a pathlib.Path object)
    """
    if self.ignore_installed is True:
        return None

    if match is None:
        if check_version:
            match = self.host_version
        else:
            match = self.host_build

    if not Path(KDK_INSTALL_PATH).exists():
        return None

    # Installed KDKs only
    if self.check_backups_only is False:
        for kdk_folder in Path(KDK_INSTALL_PATH).iterdir():
            if not kdk_folder.is_dir():
                continue
            if check_version:
                # Version match is a loose substring check on the folder name
                if match not in kdk_folder.name:
                    continue
            else:
                # Build match requires the exact "...<build>.kdk" suffix
                if not kdk_folder.name.endswith(f"{match}.kdk"):
                    continue
            # Only return the KDK once it passes corruption validation
            # (invalid candidates are deleted by _local_kdk_valid)
            if self._local_kdk_valid(kdk_folder):
                return kdk_folder

    # If we can't find a KDK, next check if there's a backup present
    # Check for KDK packages in the same directory as the KDK
    for kdk_pkg in Path(KDK_INSTALL_PATH).iterdir():
        if kdk_pkg.is_dir():
            continue
        if not kdk_pkg.name.endswith(".pkg"):
            continue
        if check_version:
            if match not in kdk_pkg.name:
                continue
        else:
            if not kdk_pkg.name.endswith(f"{match}.pkg"):
                continue

        logging.info(f"Found KDK backup: {kdk_pkg.name}")
        if self.passive is False:
            logging.info("Attempting KDK restoration")
            if KernelDebugKitUtilities().install_kdk_pkg(kdk_pkg):
                logging.info("Successfully restored KDK")
                # Re-scan so the freshly restored KDK is validated and returned
                return self._local_kdk_installed(match=match, check_version=check_version)
        else:
            # When in passive mode, we're just checking if a KDK could be restored
            logging.info("KDK restoration skipped, running in passive mode")
            return kdk_pkg

    return None
def _remove_kdk(self, kdk_path: str) -> None:
    """
    Removes provided KDK (or KDK backup package) from disk.

    No-op in passive mode or when not running as root.

    Parameters:
        kdk_path (str): Path to KDK
    """
    if self.passive is True:
        return

    if os.getuid() != 0:
        logging.warning("Cannot remove KDK, not running as root")
        return

    target = Path(kdk_path)
    if not target.exists():
        logging.warning(f"KDK does not exist: {kdk_path}")
        return

    # Directories (.kdk bundles) need recursive removal; backups (.pkg) do not
    recurse_flag = "-rf" if target.is_dir() else "-f"
    result = utilities.elevated(["/bin/rm", recurse_flag, kdk_path], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if result.returncode != 0:
        logging.warning(f"Failed to remove KDK: {kdk_path}")
        logging.warning(f"{result.stdout.decode('utf-8')}")
        return

    logging.info(f"Successfully removed KDK: {kdk_path}")
def _remove_unused_kdks(self, exclude_builds: list = None) -> None:
    """
    Removes KDKs that are not in use.

    Parameters:
        exclude_builds (list, optional): Builds to exclude from removal.
                                         If None, defaults to host and closest match builds.
    """
    if self.passive is True:
        return

    if exclude_builds is None:
        exclude_builds = [
            self.kdk_url_build,
            self.kdk_closest_match_url_build,
        ]

    # Cleanup is opt-in via constants
    if self.constants.should_nuke_kdks is False:
        return

    kdk_root = Path(KDK_INSTALL_PATH)
    if not kdk_root.exists():
        return

    logging.info("Cleaning unused KDKs")
    for entry in kdk_root.iterdir():
        # Only touch KDK bundles and their backup packages
        if not entry.name.endswith((".kdk", ".pkg")):
            continue
        # Keep anything matching a build we still need
        if any(entry.name.endswith((f"_{build}.kdk", f"_{build}.pkg")) for build in exclude_builds):
            continue
        self._remove_kdk(entry)
def validate_kdk_checksum(self, kdk_dmg_path: str = None) -> bool:
    """
    Validates KDK DMG checksum via 'hdiutil verify'.

    Parameters:
        kdk_dmg_path (str, optional): Path to KDK DMG.
            Defaults to self.constants.kdk_download_path when None.

    Returns:
        bool: True if valid, False if invalid (self.error_msg is set)
    """
    self.success = False
    self.error_msg = ""

    if kdk_dmg_path is None:
        kdk_dmg_path = self.constants.kdk_download_path

    if not Path(kdk_dmg_path).exists():
        logging.error(f"KDK DMG does not exist: {kdk_dmg_path}")
        return False

    # TODO: should we use the checksum from the API?
    # Bug fix: verify the DMG we were asked about; previously this always
    # verified self.constants.kdk_download_path, silently ignoring the
    # kdk_dmg_path parameter.
    result = subprocess.run(["/usr/bin/hdiutil", "verify", kdk_dmg_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if result.returncode != 0:
        logging.info("Error: Kernel Debug Kit checksum verification failed!")
        logging.info(f"Output: {result.stderr.decode('utf-8')}")
        msg = "Kernel Debug Kit checksum verification failed, please try again.\n\nIf this continues to fail, ensure you're downloading on a stable network connection (ie. Ethernet)"
        logging.info(f"{msg}")
        self.error_msg = msg
        return False

    # Successful verification doubles as the trigger for cleaning stale KDKs
    self._remove_unused_kdks()
    self.success = True
    logging.info("Kernel Debug Kit checksum verified")
    return True
class KernelDebugKitUtilities:
    """
    Utilities for KDK handling

    Stateless helpers for installing KDK packages / disk images and for
    creating on-disk backups of the installer package. All install paths
    require root.
    """

    def __init__(self) -> None:
        pass

    def install_kdk_pkg(self, kdk_path: Path) -> bool:
        """
        Installs provided KDK packages

        Parameters:
            kdk_path (Path): Path to KDK package

        Returns:
            bool: True if successful, False if not
        """
        if os.getuid() != 0:
            logging.warning("Cannot install KDK, not running as root")
            return False

        logging.info(f"Installing KDK package: {kdk_path.name}")
        logging.info(f"- This may take a while...")

        # TODO: Check whether enough disk space is available

        result = utilities.elevated(["installer", "-pkg", kdk_path, "-target", "/"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        if result.returncode != 0:
            logging.info("Failed to install KDK:")
            logging.info(result.stdout.decode('utf-8'))
            # NOTE(review): stderr is merged into stdout above
            # (stderr=subprocess.STDOUT), so result.stderr is always None
            # and this branch never fires.
            if result.stderr:
                logging.info(result.stderr.decode('utf-8'))
            return False
        return True

    def install_kdk_dmg(self, kdk_path: Path, only_install_backup: bool = False) -> bool:
        """
        Installs provided KDK disk image

        Parameters:
            kdk_path (Path): Path to KDK disk image
            only_install_backup (bool): When True, skip installation and only
                                        create a backup of the embedded package

        Returns:
            bool: True if successful, False if not
        """
        if os.getuid() != 0:
            logging.warning("Cannot install KDK, not running as root")
            return False

        logging.info(f"Extracting downloaded KDK disk image")
        # The temporary directory serves as the DMG mount point; it must be
        # detached before the context manager cleans the directory up
        with tempfile.TemporaryDirectory() as mount_point:
            result = subprocess.run(["/usr/bin/hdiutil", "attach", kdk_path, "-mountpoint", mount_point, "-nobrowse"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
            if result.returncode != 0:
                logging.info("Failed to mount KDK:")
                logging.info(result.stdout.decode('utf-8'))
                return False

            kdk_pkg_path = Path(f"{mount_point}/KernelDebugKit.pkg")
            if not kdk_pkg_path.exists():
                logging.warning("Failed to find KDK package in DMG, likely corrupted!!!")
                self._unmount_disk_image(mount_point)
                return False

            if only_install_backup is False:
                if self.install_kdk_pkg(kdk_pkg_path) is False:
                    self._unmount_disk_image(mount_point)
                    return False

            # Backup is created from the pkg inside the mounted image;
            # the matching Info.plist is expected next to the DMG
            self._create_backup(kdk_pkg_path, Path(f"{kdk_path.parent}/{KDK_INFO_PLIST}"))
            self._unmount_disk_image(mount_point)

        logging.info("Successfully installed KDK")
        return True

    def _unmount_disk_image(self, mount_point) -> None:
        """
        Unmounts provided disk image silently

        Parameters:
            mount_point (Path): Path to mount point
        """
        # Output intentionally discarded; failures here are non-fatal
        subprocess.run(["/usr/bin/hdiutil", "detach", mount_point], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

    def _create_backup(self, kdk_path: Path, kdk_info_plist: Path) -> None:
        """
        Creates a backup of the KDK

        Parameters:
            kdk_path (Path): Path to KDK (the KernelDebugKit.pkg payload)
            kdk_info_plist (Path): Path to KDK Info.plist with 'version'/'build'
                                   keys used to name the backup
        """
        if not kdk_path.exists():
            logging.warning("KDK does not exist, cannot create backup")
            return
        if not kdk_info_plist.exists():
            logging.warning("KDK Info.plist does not exist, cannot create backup")
            return

        kdk_info_dict = plistlib.load(kdk_info_plist.open("rb"))

        if 'version' not in kdk_info_dict or 'build' not in kdk_info_dict:
            logging.warning("Malformed KDK Info.plist provided, cannot create backup")
            return

        if os.getuid() != 0:
            logging.warning("Cannot create KDK backup, not running as root")
            return

        if not Path(KDK_INSTALL_PATH).exists():
            subprocess.run(["/bin/mkdir", "-p", KDK_INSTALL_PATH], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

        # Backup name encodes version and build, e.g. "KDK_13.5_22G74.pkg"
        kdk_dst_name = f"KDK_{kdk_info_dict['version']}_{kdk_info_dict['build']}.pkg"
        kdk_dst_path = Path(f"{KDK_INSTALL_PATH}/{kdk_dst_name}")

        logging.info(f"Creating backup: {kdk_dst_name}")
        if kdk_dst_path.exists():
            logging.info("Backup already exists, skipping")
            return

        result = utilities.elevated(["/bin/cp", "-R", kdk_path, kdk_dst_path], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        if result.returncode != 0:
            logging.info("Failed to create KDK backup:")
            logging.info(result.stdout.decode('utf-8'))

View File

@@ -0,0 +1,305 @@
"""
logging_handler.py: Initialize logging framework for program
"""
import os
import sys
import pprint
import logging
import threading
import traceback
import subprocess
import applescript
from pathlib import Path
from datetime import datetime
from .. import constants
from . import (
analytics_handler,
global_settings
)
class InitializeLoggingSupport:
    """
    Initialize logging framework for program

    Primary responsibilities:
    - Determine where to store log file
    - Clean log file if it's near the max file size
    - Initialize logging framework configuration
    - Implement custom traceback handler
    - Implement error handling for file write

    Usage:
    >>> from resources.logging_handler import InitializeLoggingSupport
    >>> InitializeLoggingSupport()

    FOR DEVELOPERS:
    - Do not invoke logging until after '_attempt_initialize_logging_configuration()' has been invoked
    """

    def __init__(self, global_constants: constants.Constants) -> None:
        self.constants: constants.Constants = global_constants

        log_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S-%f")

        # e.g. "OpenCore-Patcher_1.5.0_2024-04-01_11-15-34-123456.log"
        self.log_filename: str = f"OpenCore-Patcher_{self.constants.patcher_version}_{log_time}.log"

        self.log_filepath: Path = None

        # Saved so _restore_original_excepthook() can undo our hooks
        self.original_excepthook: sys = sys.excepthook
        self.original_thread_excepthook: threading = threading.excepthook

        self.max_file_size: int = 1024 * 1024  # 1 MB
        self.file_size_redline: int = 1024 * 1024 - 1024 * 100  # 900 KB, when to start cleaning log file

        # Order matters: configuration must exist before anything logs
        self._initialize_logging_path()
        self._attempt_initialize_logging_configuration()
        self._start_logging()
        self._implement_custom_traceback_handler()
        self._fix_file_permission()
        self._clean_prior_version_logs()

    def _initialize_logging_path(self) -> None:
        """
        Initialize logging framework storage path

        Preferred location is ~/Library/Logs/Dortania; falls back to
        /Users/Shared when running in an installer environment (root's
        home) or when the Dortania folder cannot be created.
        """
        base_path = Path("~/Library/Logs").expanduser()
        if not base_path.exists() or str(base_path).startswith("/var/root/"):
            # Likely in an installer environment, store in /Users/Shared
            base_path = Path("/Users/Shared")
        else:
            # create Dortania folder if it doesn't exist
            base_path = base_path / "Dortania"
            if not base_path.exists():
                try:
                    base_path.mkdir()
                except Exception as e:
                    print(f"Failed to create Dortania folder: {e}")
                    base_path = Path("/Users/Shared")

        self.log_filepath = Path(f"{base_path}/{self.log_filename}").expanduser()
        self.constants.log_filepath = self.log_filepath

    def _clean_prior_version_logs(self) -> None:
        """
        Clean logs from old Patcher versions

        Keep 10 latest logs
        """
        paths = [
            self.log_filepath.parent,         # ~/Library/Logs/Dortania
            self.log_filepath.parent.parent,  # ~/Library/Logs (old location)
        ]

        logs = []

        for path in paths:
            for file in path.glob("OpenCore-Patcher*"):
                if not file.is_file():
                    continue
                if not file.name.endswith(".log"):
                    continue
                # The active log file is never a deletion candidate
                if file.name == self.log_filename:
                    continue
                logs.append(file)

        # Newest first; keep the 9 newest old logs (plus the active one = 10)
        logs.sort(key=lambda x: x.stat().st_mtime, reverse=True)

        for log in logs[9:]:
            try:
                log.unlink()
            except Exception as e:
                logging.error(f"Failed to delete log file: {e}")

    def _fix_file_permission(self) -> None:
        """
        Fixes file permission for log file

        If OCLP was invoked as root, file permission will only allow root to write to log file
        This in turn breaks normal OCLP execution to write to log file
        """
        if os.geteuid() != 0:
            return

        paths = [
            self.log_filepath,         # ~/Library/Logs/Dortania/OpenCore-Patcher_{version}_{date}.log
            self.log_filepath.parent,  # ~/Library/Logs/Dortania
        ]
        for path in paths:
            # 777 is intentional: later non-root runs must be able to write here
            result = subprocess.run(["/bin/chmod", "777", path], capture_output=True)
            if result.returncode != 0:
                logging.error(f"Failed to fix log file permissions")
                if result.stdout:
                    logging.error("STDOUT:")
                    logging.error(result.stdout.decode("utf-8"))
                if result.stderr:
                    logging.error("STDERR:")
                    logging.error(result.stderr.decode("utf-8"))

    def _initialize_logging_configuration(self, log_to_file: bool = True) -> None:
        """
        Initialize logging framework configuration

        StreamHandler's format is used to mimic the default behavior of print()
        While FileHandler's format is for more in-depth logging

        Parameters:
            log_to_file (bool): Whether to log to file or not
        """
        logging.basicConfig(
            level=logging.NOTSET,
            format="[%(asctime)s] [%(filename)-32s] [%(lineno)-4d]: %(message)s",
            handlers=[
                logging.StreamHandler(stream = sys.stdout),
                logging.FileHandler(self.log_filepath) if log_to_file is True else logging.NullHandler()
            ],
        )
        logging.getLogger().setLevel(logging.INFO)
        # Console handler stays bare (message only) to mimic print()
        logging.getLogger().handlers[0].setFormatter(logging.Formatter("%(message)s"))
        # NOTE(review): 'maxBytes' is only honored by RotatingFileHandler;
        # logging.FileHandler (and NullHandler) never read this attribute,
        # so no rotation occurs here. Confirm whether size-based cleanup
        # (see self.file_size_redline) is intended to happen elsewhere.
        logging.getLogger().handlers[1].maxBytes = self.max_file_size

    def _attempt_initialize_logging_configuration(self) -> None:
        """
        Attempt to initialize logging framework configuration

        If we fail to initialize the logging framework, we will disable logging to file
        """
        try:
            self._initialize_logging_configuration()
        except Exception as e:
            print(f"Failed to initialize logging framework: {e}")
            print("Retrying without logging to file...")
            self._initialize_logging_configuration(log_to_file=False)

    def _start_logging(self):
        """
        Start logging, used as easily identifiable start point in logs
        """
        str_msg = f"# OpenCore Legacy Patcher ({self.constants.patcher_version}) #"
        str_len = len(str_msg)

        # Banner block marking the start of a run
        logging.info('#' * str_len)
        logging.info(str_msg)
        logging.info('#' * str_len)

        logging.info("Log file set:")
        logging.info(f" {self.log_filepath}")
        # Display relative path to avoid disclosing user's username
        # NOTE(review): the absolute path is already logged just above,
        # so the username is disclosed regardless -- confirm intent.
        try:
            path = self.log_filepath.relative_to(Path.home())
            logging.info(f"~/{path}")
        except ValueError:
            logging.info(self.log_filepath)

    def _implement_custom_traceback_handler(self) -> None:
        """
        Reroute traceback to logging module
        """

        def custom_excepthook(type, value, tb) -> None:
            """
            Reroute traceback in main thread to logging module

            ('type' intentionally shadows the builtin to match the
            sys.excepthook signature)
            """
            logging.error("Uncaught exception in main thread", exc_info=(type, value, tb))
            self._display_debug_properties()

            if "wx/" in "".join(traceback.format_exception(type, value, tb)):
                # Likely a GUI error, don't display error dialog
                return

            # CLI mode: no dialog available, upload the crash report directly
            if self.constants.cli_mode is True:
                threading.Thread(target=analytics_handler.Analytics(self.constants).send_crash_report, args=(self.log_filepath,)).start()
                return

            error_msg = f"OpenCore Legacy Patcher encountered the following internal error:\n\n"
            error_msg += f"{type.__name__}: {value}"
            if tb:
                error_msg += f"\n\n{traceback.extract_tb(tb)[-1]}"

            # cant_log: user opted out of reporting; non-bool (unset) means opted in
            cant_log: bool = global_settings.GlobalEnviromentSettings().read_property("DisableCrashAndAnalyticsReporting")
            if not isinstance(cant_log, bool):
                cant_log = False

            # Builds from a release tag never upload crash reports
            if self.constants.commit_info[0].startswith("refs/tags"):
                cant_log = True

            if cant_log is True:
                error_msg += "\n\nReveal log file?"
            else:
                error_msg += "\n\nSend crash report to Dortania?"

            # Ask user if they want to send crash report
            try:
                result = applescript.AppleScript(f'display dialog "{error_msg}" with title "OpenCore Legacy Patcher ({self.constants.patcher_version})" buttons {{"Yes", "No"}} default button "Yes" with icon caution').run()
            except Exception as e:
                logging.error(f"Failed to display crash report dialog: {e}")
                return

            # 'bhit' is the AppleEvent key carrying the clicked button's title
            if result[applescript.AEType(b'bhit')] != "Yes":
                return

            if cant_log is True:
                # Reveal the log in Finder instead of uploading
                subprocess.run(["/usr/bin/open", "--reveal", self.log_filepath])
                return

            threading.Thread(target=analytics_handler.Analytics(self.constants).send_crash_report, args=(self.log_filepath,)).start()

        def custom_thread_excepthook(args) -> None:
            """
            Reroute traceback in spawned thread to logging module
            """
            logging.error("Uncaught exception in spawned thread", exc_info=(args))

        sys.excepthook = custom_excepthook
        threading.excepthook = custom_thread_excepthook

    def _restore_original_excepthook(self) -> None:
        """
        Restore original traceback handlers
        """
        sys.excepthook = self.original_excepthook
        threading.excepthook = self.original_thread_excepthook

    def _display_debug_properties(self) -> None:
        """
        Display debug properties, primarily after main thread crash
        """
        logging.info("Host Properties:")
        logging.info(f"  XNU Version: {self.constants.detected_os}.{self.constants.detected_os_minor}")
        logging.info(f"  XNU Build: {self.constants.detected_os_build}")
        logging.info(f"  macOS Version: {self.constants.detected_os_version}")
        logging.info("Debug Properties:")
        logging.info(f"  Effective User ID: {os.geteuid()}")
        logging.info(f"  Effective Group ID: {os.getegid()}")
        logging.info(f"  Real User ID: {os.getuid()}")
        logging.info(f"  Real Group ID: {os.getgid()}")
        logging.info("  Arguments passed to Patcher:")
        for arg in sys.argv:
            logging.info(f"    {arg}")
        logging.info(f"Host Properties:\n{pprint.pformat(self.constants.computer.__dict__, indent=4)}")

View File

@@ -0,0 +1,656 @@
"""
macos_installer_handler.py: Handler for macOS installers, both local and remote
"""
import enum
import logging
import plistlib
import tempfile
import subprocess
import applescript
from pathlib import Path
from ..datasets import os_data
from . import (
network_handler,
utilities
)
# Directory scanned for local "Install macOS *.app" bundles
# (presumably consumed by installer-discovery code elsewhere in this module -- not visible here)
APPLICATION_SEARCH_PATH: str = "/Applications"

# Relative path to the SFR (System Firmware Restore?) software-update asset XML
# NOTE(review): usage not visible in this chunk -- confirm consumer
SFR_SOFTWARE_UPDATE_PATH: str = "SFR/com_apple_MobileAsset_SFRSoftwareUpdate/com_apple_MobileAsset_SFRSoftwareUpdate.xml"

# Apple's merged software-update catalog index; full URL is assembled by
# RemoteInstallerCatalog._construct_catalog_url()
CATALOG_URL_BASE: str = "https://swscan.apple.com/content/catalogs/others/index"
CATALOG_URL_EXTENSION: str = ".merged-1.sucatalog"

# Catalog name components, newest OS first
CATALOG_URL_VARIANTS: list = [
    "14",
    "13",
    "12",
    "10.16",
    "10.15",
    "10.14",
    "10.13",
    "10.12",
    "10.11",
    "10.10",
    "10.9",
    "mountainlion",
    "lion",
    "snowleopard",
    "leopard",
]

# Module-level scratch directory shared by InstallerCreation; module lifetime
# keeps it alive (and on the root filesystem, unlike a per-call temp dir)
tmp_dir = tempfile.TemporaryDirectory()
class InstallerCreation():
    """
    Handles extraction of InstallAssistant.pkg and creation of bootable
    macOS installer media via createinstallmedia.
    """

    def __init__(self) -> None:
        pass

    def install_macOS_installer(self, download_path: str) -> bool:
        """
        Installs InstallAssistant.pkg

        Parameters:
            download_path (str): Path to InstallAssistant.pkg

        Returns:
            bool: True if successful, False otherwise
        """
        logging.info("Extracting macOS installer from InstallAssistant.pkg")
        try:
            # AppleScript is used solely to obtain an admin-privileges prompt
            applescript.AppleScript(
                f'''do shell script "installer -pkg {Path(download_path)}/InstallAssistant.pkg -target /"'''
                ' with prompt "OpenCore Legacy Patcher needs administrator privileges to extract the installer."'
                " with administrator privileges"
                " without altering line endings",
            ).run()
        except Exception as e:
            logging.info("Failed to install InstallAssistant")
            logging.info(f" Error Code: {e}")
            return False

        logging.info("InstallAssistant installed")
        return True

    def generate_installer_creation_script(self, tmp_location: str, installer_path: str, disk: str) -> bool:
        """
        Creates installer.sh to be piped to OCLP-Helper and run as admin

        Script includes:
        - Format provided disk as HFS+ GPT
        - Run createinstallmedia on provided disk

        Implementing this into a single installer.sh script allows us to only call
        OCLP-Helper once to avoid nagging the user about permissions

        Parameters:
            tmp_location (str): Path to temporary directory (script destination)
            installer_path (str): Path to the "Install macOS *.app" bundle
            disk (str): Disk to install to

        Returns:
            bool: True if successful, False otherwise
        """
        additional_args = ""
        script_location = Path(tmp_location) / Path("Installer.sh")

        # Due to a bug in createinstallmedia, running from '/Applications' may sometimes error:
        # 'Failed to extract AssetData/boot/Firmware/Manifests/InstallerBoot/*'
        # This affects native Macs as well even when manually invoking createinstallmedia
        # To resolve, we'll copy into our temp directory and run from there
        # (our regular tmp location is a disk image, thus CoW will not work)
        global tmp_dir
        ia_tmp = tmp_dir.name

        logging.info(f"Creating temporary directory at {ia_tmp}")
        # Purge leftovers from a previous run
        for stale in Path(ia_tmp).glob("*"):
            subprocess.run(["/bin/rm", "-rf", str(stale)])

        # Copy installer to tmp (use CoW to avoid extra disk writes)
        copy_args = ["/bin/cp", "-cR", installer_path, ia_tmp]
        if utilities.check_filesystem_type() != "apfs":
            # HFS+ disks do not support CoW
            copy_args[1] = "-R"

        # Ensure we have enough space for the duplication.
        # Bug fix: measure the installer bundle being copied; previously this
        # measured the (just-emptied) temp directory itself, so the check
        # always passed regardless of available space.
        space_available = utilities.get_free_space()
        space_needed = sum(item.stat().st_size for item in Path(installer_path).rglob("*") if item.is_file())
        if space_available < space_needed:
            logging.info("Not enough free space to create installer.sh")
            logging.info(f"{utilities.human_fmt(space_available)} available, {utilities.human_fmt(space_needed)} required")
            return False

        subprocess.run(copy_args)

        # Adjust installer_path to point to the copied installer
        installer_path = Path(ia_tmp) / Path(Path(installer_path).name)
        if not Path(installer_path).exists():
            logging.info(f"Failed to copy installer to {ia_tmp}")
            return False

        createinstallmedia_path = str(Path(installer_path) / Path("Contents/Resources/createinstallmedia"))

        plist_path = str(Path(installer_path) / Path("Contents/Info.plist"))
        if Path(plist_path).exists():
            plist = plistlib.load(Path(plist_path).open("rb"))
            if "DTPlatformVersion" in plist:
                # Bug fix: compare the split version components. The previous
                # code reduced the version to its first component ("10", "11",
                # ...) and then indexed its first *character*, so the legacy
                # '--applicationpath' flag was never added.
                version_parts = plist["DTPlatformVersion"].split(".")
                if version_parts[0] == "10" and len(version_parts) > 1:
                    if int(version_parts[1]) < 13:
                        # createinstallmedia before macOS 10.13 requires --applicationpath
                        additional_args = f" --applicationpath '{installer_path}'"

        if script_location.exists():
            script_location.unlink()
        script_location.touch()

        with script_location.open("w") as script:
            script.write(f'''#!/bin/bash
erase_disk='diskutil eraseDisk HFS+ OCLP-Installer {disk}'
if $erase_disk; then
"{createinstallmedia_path}" --volume /Volumes/OCLP-Installer --nointeraction{additional_args}
fi
''')
        if Path(script_location).exists():
            return True
        return False

    def list_disk_to_format(self) -> dict:
        """
        List applicable disks for macOS installer creation

        Only lists disks that are:
        - 14GB or larger
        - External

        Current limitations:
        - Does not support PCIe based SD cards readers

        Returns:
            dict: Dictionary of disks keyed by device identifier, each with
                  'identifier', 'name' and 'size' entries
        """
        all_disks:  dict = {}
        list_disks: dict = {}

        # TODO: AllDisksAndPartitions is not supported in Snow Leopard and older
        try:
            # High Sierra and newer
            disks = plistlib.loads(subprocess.run(["/usr/sbin/diskutil", "list", "-plist", "physical"], stdout=subprocess.PIPE).stdout.decode().strip().encode())
        except ValueError:
            # Sierra and older
            disks = plistlib.loads(subprocess.run(["/usr/sbin/diskutil", "list", "-plist"], stdout=subprocess.PIPE).stdout.decode().strip().encode())

        for disk in disks["AllDisksAndPartitions"]:
            disk_info = plistlib.loads(subprocess.run(["/usr/sbin/diskutil", "info", "-plist", disk["DeviceIdentifier"]], stdout=subprocess.PIPE).stdout.decode().strip().encode())
            try:
                all_disks[disk["DeviceIdentifier"]] = {
                    "identifier": disk_info["DeviceNode"],
                    "name":       disk_info["MediaName"],
                    "size":       disk_info["TotalSize"],
                    "internal":   disk_info["Internal"],
                }
            except KeyError:
                # Avoid crashing with CDs installed
                continue

        for identifier, info in all_disks.items():
            # Strip disks that are under 14GB (15,032,385,536 bytes)
            # createinstallmedia isn't great at detecting if a disk has enough space
            if info["size"] <= 15032385536:
                continue
            # Strip internal disks as well (avoid user formatting their SSD/HDD)
            if info["internal"] is not False:
                continue

            list_disks.update({
                identifier: {
                    "identifier": info["identifier"],
                    "name": info["name"],
                    "size": info["size"],
                }
            })

        return list_disks
class SeedType(enum.IntEnum):
    """
    Enum identifying which software-update catalog seed to query.

    Variants:
        DeveloperSeed: Developer Beta (Part of the Apple Developer Program)
        PublicSeed:    Public Beta
        CustomerSeed:  AppleSeed Program (Generally mirrors DeveloperSeed)
        PublicRelease: Public Release
    """
    DeveloperSeed = 0
    PublicSeed    = 1
    CustomerSeed  = 2
    PublicRelease = 3
class RemoteInstallerCatalog:
"""
Parses Apple's Software Update catalog and finds all macOS installers.
"""
def __init__(self, seed_override: SeedType = SeedType.PublicRelease, os_override: int = os_data.os_data.sonoma) -> None:
    """
    Build the catalog URL, then immediately fetch and parse it.

    Parameters:
        seed_override (SeedType): Which seed catalog to query (default: public release)
        os_override (int): XNU major version of the newest OS to include

    NOTE: network I/O happens in the constructor (via _parse_catalog).
    """
    self.catalog_url: str = self._construct_catalog_url(seed_override, os_override)
    self.available_apps: dict = self._parse_catalog()
    # Trimmed view: only the newest installer per major OS version
    self.available_apps_latest: dict = self._list_newest_installers_only()
def _construct_catalog_url(self, seed_type: SeedType, os_kernel: int) -> str:
    """
    Constructs the catalog URL based on the seed type

    Parameters:
        seed_type (SeedType): The seed type to use
        os_kernel (int): XNU major version to build the URL for

    Returns:
        str: The catalog URL
    """
    url: str = CATALOG_URL_BASE
    os_version: str = os_data.os_conversion.kernel_to_os(os_kernel)
    # Big Sur's catalogs are published under the legacy "10.16" label
    os_version = "10.16" if os_version == "11" else os_version
    if os_version not in CATALOG_URL_VARIANTS:
        logging.error(f"OS version {os_version} is not supported, defaulting to latest")
        os_version = CATALOG_URL_VARIANTS[0]
    url += f"-{os_version}"
    # Seed catalogs carry a suffix directly after the version component
    if seed_type == SeedType.DeveloperSeed:
        url += f"seed"
    elif seed_type == SeedType.PublicSeed:
        url += f"beta"
    elif seed_type == SeedType.CustomerSeed:
        url += f"customerseed"
    did_find_variant: bool = False
    for variant in CATALOG_URL_VARIANTS:
        if variant in url:
            did_find_variant = True
    # NOTE(review): 'variant' leaks out of the loop above, so when a match is
    # found (always true, since os_version is drawn from CATALOG_URL_VARIANTS)
    # the URL is suffixed with the *last* list entry ("-leopard"), never the
    # matched one. Apple's merged catalogs are normally named with a full
    # cascade of versions (e.g. "index-14-13-...-leopard"); a missing 'break'
    # or a lost cascade loop seems likely here -- confirm against upstream
    # before relying on the generated URL.
    if did_find_variant:
        url += f"-{variant}"
    url += f"{CATALOG_URL_EXTENSION}"
    return url
def _fetch_catalog(self) -> dict:
    """
    Fetches the catalog from Apple's servers.

    Returns:
        dict: The catalog as a dictionary; empty when the network is
              unreachable or the response is not a valid plist
    """
    # Bail out early when the catalog host cannot be reached
    if network_handler.NetworkUtilities(self.catalog_url).verify_network_connection() is False:
        return {}

    try:
        response = network_handler.NetworkUtilities().get(self.catalog_url)
        return plistlib.loads(response.content)
    except plistlib.InvalidFileException:
        return {}
def _parse_catalog(self) -> dict:
    """
    Parses the catalog and returns a dictionary of available installers.

    For each product with an InstallAssistant, the BuildManifest Info.plist
    is fetched to extract version/build/supported models, and the matching
    InstallAssistant.pkg link is recorded.

    Returns:
        dict: Dictionary of available installers keyed by product ID,
              sorted by OS version
    """
    available_apps: dict = {}
    catalog: dict = self._fetch_catalog()

    if not catalog:
        return available_apps
    if "Products" not in catalog:
        return available_apps

    for product in catalog["Products"]:
        product_info = catalog["Products"][product]

        # Only products that ship an InstallAssistant are of interest
        if "ExtendedMetaInfo" not in product_info:
            continue
        if "Packages" not in product_info:
            continue
        if "InstallAssistantPackageIdentifiers" not in product_info["ExtendedMetaInfo"]:
            continue
        if "SharedSupport" not in product_info["ExtendedMetaInfo"]["InstallAssistantPackageIdentifiers"]:
            continue
        if "BuildManifest" not in product_info["ExtendedMetaInfo"]["InstallAssistantPackageIdentifiers"]:
            continue

        for bm_package in product_info["Packages"]:
            # Locate the build-manifest Info.plist (but not InstallInfo.plist)
            if "Info.plist" not in bm_package["URL"]:
                continue
            if "InstallInfo.plist" in bm_package["URL"]:
                continue
            try:
                build_plist = plistlib.loads(network_handler.NetworkUtilities().get(bm_package["URL"]).content)
            except plistlib.InvalidFileException:
                continue
            if "MobileAssetProperties" not in build_plist:
                continue
            if "SupportedDeviceModels" not in build_plist["MobileAssetProperties"]:
                continue
            if "OSVersion" not in build_plist["MobileAssetProperties"]:
                continue
            if "Build" not in build_plist["MobileAssetProperties"]:
                continue
            # Ensure Apple Silicon specific Installers are not listed
            if "VMM-x86_64" not in build_plist["MobileAssetProperties"]["SupportedDeviceModels"]:
                continue

            version = build_plist["MobileAssetProperties"]["OSVersion"]
            build = build_plist["MobileAssetProperties"]["Build"]

            try:
                catalog_url = build_plist["MobileAssetProperties"]["BridgeVersionInfo"]["CatalogURL"]
                # Order matters: "customerseed"/"...seed" both contain "seed"
                if "beta" in catalog_url:
                    catalog_url = "PublicSeed"
                elif "customerseed" in catalog_url:
                    catalog_url = "CustomerSeed"
                elif "seed" in catalog_url:
                    catalog_url = "DeveloperSeed"
                else:
                    catalog_url = "Public"
            except KeyError:
                # Assume Public if no catalog URL is found
                catalog_url = "Public"

            download_link = None
            integrity = None
            size = None
            date = product_info["PostDate"]

            for ia_package in product_info["Packages"]:
                # Bug fix: check key presence before subscripting; the
                # original read ia_package["URL"] before verifying the
                # "URL" key existed.
                if "URL" not in ia_package:
                    continue
                if "InstallAssistant.pkg" not in ia_package["URL"]:
                    continue
                if "IntegrityDataURL" not in ia_package:
                    continue
                download_link = ia_package["URL"]
                integrity = ia_package["IntegrityDataURL"]
                size = ia_package["Size"] if ia_package["Size"] else 0

            # Bug fix: the original 'any([...]) is None' is always False
            # (any() returns a bool), so products missing a download link or
            # integrity data were never skipped. Skip incomplete entries.
            if any(field is None for field in (version, build, download_link, size, integrity)):
                continue

            available_apps.update({
                product: {
                    "Version": version,
                    "Build": build,
                    "Link": download_link,
                    "Size": size,
                    "integrity": integrity,
                    "Source": "Apple Inc.",
                    "Variant": catalog_url,
                    "OS": os_data.os_conversion.os_to_kernel(version),
                    "Models": build_plist["MobileAssetProperties"]["SupportedDeviceModels"],
                    "Date": date
                }
            })

    available_apps = {k: v for k, v in sorted(available_apps.items(), key=lambda x: x[1]['Version'])}
    return available_apps
def _list_newest_installers_only(self) -> dict:
    """
    Returns a dictionary of the newest macOS installers only.

    Primarily used to avoid overwhelming the user with a list of
    installers that are not the newest version.

    For each supported major version, keeps only the highest non-beta
    build, drops duplicate builds, and finally drops betas when a
    non-beta installer of the same major version exists.

    Returns:
        dict: A dictionary of the newest macOS installers only.
    """
    if self.available_apps is None:
        return {}

    newest_apps: dict = self.available_apps.copy()

    # Major macOS versions we keep one "newest" installer for
    supported_versions = ["10.13", "10.14", "10.15", "11", "12", "13", "14"]

    for version in supported_versions:
        # Highest minor/security components seen so far for this major version
        remote_version_minor = 0
        remote_version_security = 0
        os_builds = []
        # First determine the largest version
        for ia in newest_apps:
            if newest_apps[ia]["Version"].startswith(version):
                if newest_apps[ia]["Variant"] not in ["CustomerSeed", "DeveloperSeed", "PublicSeed"]:
                    remote_version = newest_apps[ia]["Version"].split(".")
                    # Strip the major version prefix so index 0 is the next
                    # component (two components for '10.x' style versions)
                    if remote_version[0] == "10":
                        remote_version.pop(0)
                        remote_version.pop(0)
                    else:
                        remote_version.pop(0)
                    if int(remote_version[0]) > remote_version_minor:
                        remote_version_minor = int(remote_version[0])
                        remote_version_security = 0  # Reset as new minor version found
                    if len(remote_version) > 1:
                        if int(remote_version[1]) > remote_version_security:
                            remote_version_security = int(remote_version[1])
        # Now remove all versions that are not the largest
        for ia in list(newest_apps):
            # Don't use Beta builds to determine latest version
            if newest_apps[ia]["Variant"] in ["CustomerSeed", "DeveloperSeed", "PublicSeed"]:
                continue
            if newest_apps[ia]["Version"].startswith(version):
                remote_version = newest_apps[ia]["Version"].split(".")
                if remote_version[0] == "10":
                    remote_version.pop(0)
                    remote_version.pop(0)
                else:
                    remote_version.pop(0)
                if int(remote_version[0]) < remote_version_minor:
                    newest_apps.pop(ia)
                    continue
                if int(remote_version[0]) == remote_version_minor:
                    if len(remote_version) > 1:
                        if int(remote_version[1]) < remote_version_security:
                            newest_apps.pop(ia)
                            continue
                    else:
                        # ex. '12.5' loses against an already-seen '12.5.1'
                        if remote_version_security > 0:
                            newest_apps.pop(ia)
                            continue
                # Remove duplicate builds
                # ex. macOS 12.5.1 has 2 builds in the Software Update Catalog
                # ref: https://twitter.com/classicii_mrmac/status/1560357471654379522
                if newest_apps[ia]["Build"] in os_builds:
                    newest_apps.pop(ia)
                    continue
                os_builds.append(newest_apps[ia]["Build"])

    # Remove Betas if there's a non-beta version available for the same major version
    for ia in list(newest_apps):
        if newest_apps[ia]["Variant"] in ["CustomerSeed", "DeveloperSeed", "PublicSeed"]:
            for ia2 in newest_apps:
                if newest_apps[ia2]["Version"].split(".")[0] == newest_apps[ia]["Version"].split(".")[0] and newest_apps[ia2]["Variant"] not in ["CustomerSeed", "DeveloperSeed", "PublicSeed"]:
                    newest_apps.pop(ia)
                    break

    return newest_apps
class LocalInstallerCatalog:
    """
    Finds all macOS installers on the local machine.
    """

    def __init__(self) -> None:
        # Mapping of installer app name -> metadata, populated at construction
        self.available_apps: dict = self._list_local_macOS_installers()

    def _list_local_macOS_installers(self) -> dict:
        """
        Searches for macOS installers in /Applications

        Returns:
            dict: A dictionary of macOS installers found on the local machine.

            Example:
                "Install macOS Big Sur Beta.app": {
                    "Short Name": "Big Sur Beta",
                    "Version": "11.0",
                    "Build": "20A5343i",
                    "Path": "/Applications/Install macOS Big Sur Beta.app",
                },
                etc...
        """
        application_list: dict = {}

        for application in Path(APPLICATION_SEARCH_PATH).iterdir():
            # Certain Microsoft Applications have strange permissions disabling us from reading them
            try:
                # Only apps bundling 'createinstallmedia' are macOS installers
                if not (Path(APPLICATION_SEARCH_PATH) / Path(application) / Path("Contents/Resources/createinstallmedia")).exists():
                    continue
                if not (Path(APPLICATION_SEARCH_PATH) / Path(application) / Path("Contents/Info.plist")).exists():
                    continue
            except PermissionError:
                continue
            try:
                application_info_plist = plistlib.load((Path(APPLICATION_SEARCH_PATH) / Path(application) / Path("Contents/Info.plist")).open("rb"))
            except (PermissionError, TypeError, plistlib.InvalidFileException):
                continue

            if "DTPlatformVersion" not in application_info_plist:
                continue
            if "CFBundleDisplayName" not in application_info_plist:
                continue

            app_version: str = application_info_plist["DTPlatformVersion"]
            clean_name: str = application_info_plist["CFBundleDisplayName"]
            app_sdk: str = application_info_plist["DTSDKBuild"] if "DTSDKBuild" in application_info_plist else "Unknown"
            min_required: str = application_info_plist["LSMinimumSystemVersion"] if "LSMinimumSystemVersion" in application_info_plist else "Unknown"

            # Darwin kernel major derived from the first two digits of the SDK build
            kernel: int = 0
            try:
                kernel = int(app_sdk[:2])
            except ValueError:
                pass

            # Rebinds min_required from an OS version string to a kernel major (int)
            min_required = os_data.os_conversion.os_to_kernel(min_required) if min_required != "Unknown" else 0

            if min_required == os_data.os_data.sierra and kernel == os_data.os_data.ventura:
                # Ventura's installer requires El Capitan minimum
                # Ref: https://github.com/dortania/OpenCore-Legacy-Patcher/discussions/1038
                min_required = os_data.os_data.el_capitan

            # app_version can sometimes report GM instead of the actual version
            # This is a workaround to get the actual version
            if app_version.startswith("GM"):
                if kernel == 0:
                    app_version = "Unknown"
                else:
                    app_version = os_data.os_conversion.kernel_to_os(kernel)

            # Check if App Version is High Sierra or newer
            if kernel < os_data.os_data.high_sierra:
                continue

            # SharedSupport.dmg holds the true OS build/version when present
            results = self._parse_sharedsupport_version(Path(APPLICATION_SEARCH_PATH) / Path(application) / Path("Contents/SharedSupport/SharedSupport.dmg"))
            if results[0] is not None:
                app_sdk = results[0]
            if results[1] is not None:
                app_version = results[1]

            application_list.update({
                application: {
                    "Short Name": clean_name,
                    "Version": app_version,
                    "Build": app_sdk,
                    "Path": application,
                    "Minimum Host OS": min_required,
                    "OS": kernel
                }
            })

        # Sort Applications by version
        application_list = {k: v for k, v in sorted(application_list.items(), key=lambda item: item[1]["Version"])}
        return application_list

    def _parse_sharedsupport_version(self, sharedsupport_path: Path) -> tuple:
        """
        Determine true version of macOS installer by parsing SharedSupport.dmg

        This is required due to Info.plist reporting the application version, not the OS version

        Parameters:
            sharedsupport_path (Path): Path to SharedSupport.dmg

        Returns:
            tuple: Tuple containing the build and OS version (either may be None)
        """
        detected_build: str = None
        detected_os: str = None

        if not sharedsupport_path.exists():
            return (detected_build, detected_os)

        if not sharedsupport_path.name.endswith(".dmg"):
            return (detected_build, detected_os)

        # Create temporary directory to extract SharedSupport.dmg to
        with tempfile.TemporaryDirectory() as tmpdir:
            output = subprocess.run(
                [
                    "/usr/bin/hdiutil", "attach", "-noverify", sharedsupport_path,
                    "-mountpoint", tmpdir,
                    "-nobrowse",
                ],
                stdout=subprocess.PIPE, stderr=subprocess.STDOUT
            )

            if output.returncode != 0:
                return (detected_build, detected_os)

            ss_info = Path(SFR_SOFTWARE_UPDATE_PATH)
            if Path(tmpdir / ss_info).exists():
                plist = plistlib.load((tmpdir / ss_info).open("rb"))
                if "Assets" in plist:
                    # First asset carries the OS build/version metadata
                    if "Build" in plist["Assets"][0]:
                        detected_build = plist["Assets"][0]["Build"]
                    if "OSVersion" in plist["Assets"][0]:
                        detected_os = plist["Assets"][0]["OSVersion"]

            # Unmount SharedSupport.dmg
            subprocess.run(["/usr/bin/hdiutil", "detach", tmpdir], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

        return (detected_build, detected_os)

View File

@@ -0,0 +1,455 @@
"""
network_handler.py: Library dedicated to Network Handling tasks including downloading files
Primarily based around the DownloadObject class, which provides a simple
object for libraries to query download progress and status
"""
import time
import requests
import threading
import logging
import enum
import hashlib
import atexit
from typing import Union
from pathlib import Path
from . import utilities
SESSION = requests.Session()
class DownloadStatus(enum.Enum):
    """
    Lifecycle states reported by a download.
    """

    INACTIVE:    str = "Inactive"     # No transfer has been started yet
    DOWNLOADING: str = "Downloading"  # Transfer currently in progress
    ERROR:       str = "Error"        # Transfer aborted with an error
    COMPLETE:    str = "Complete"     # Transfer finished
class NetworkUtilities:
    """
    Utilities for network related tasks, primarily used for downloading files
    """

    def __init__(self, url: str = None) -> None:
        """
        Parameters:
            url (str): URL used for connectivity/validation checks.
                       Defaults to https://github.com when omitted.
        """
        self.url: str = url

        if self.url is None:
            self.url = "https://github.com"

    def verify_network_connection(self) -> bool:
        """
        Verifies that the network is available

        Returns:
            bool: True if network is available, False otherwise
        """
        try:
            requests.head(self.url, timeout=5, allow_redirects=True)
            return True
        except (
            requests.exceptions.Timeout,
            requests.exceptions.TooManyRedirects,
            requests.exceptions.ConnectionError,
            requests.exceptions.HTTPError
        ):
            return False

    def validate_link(self) -> bool:
        """
        Check for 404 error

        Returns:
            bool: True if link is valid, False otherwise
        """
        try:
            response = SESSION.head(self.url, timeout=5, allow_redirects=True)
            if response.status_code == 404:
                return False
            else:
                return True
        except (
            requests.exceptions.Timeout,
            requests.exceptions.TooManyRedirects,
            requests.exceptions.ConnectionError,
            requests.exceptions.HTTPError
        ):
            return False

    def get(self, url: str, **kwargs) -> requests.Response:
        """
        Wrapper for requests's get method
        Implement additional error handling

        Parameters:
            url (str): URL to get
            **kwargs: Additional parameters for requests.get

        Returns:
            requests.Response: Response object from requests.get
                               (an empty Response on network failure)
        """
        result: requests.Response = None

        try:
            result = SESSION.get(url, **kwargs)
        except (
            requests.exceptions.Timeout,
            requests.exceptions.TooManyRedirects,
            requests.exceptions.ConnectionError,
            requests.exceptions.HTTPError
        ) as error:
            # Fix: logging.warn() is a deprecated alias of logging.warning()
            logging.warning(f"Error calling requests.get: {error}")
            # Return empty response object
            return requests.Response()

        return result

    def post(self, url: str, **kwargs) -> requests.Response:
        """
        Wrapper for requests's post method
        Implement additional error handling

        Parameters:
            url (str): URL to post
            **kwargs: Additional parameters for requests.post

        Returns:
            requests.Response: Response object from requests.post
                               (an empty Response on network failure)
        """
        result: requests.Response = None

        try:
            result = SESSION.post(url, **kwargs)
        except (
            requests.exceptions.Timeout,
            requests.exceptions.TooManyRedirects,
            requests.exceptions.ConnectionError,
            requests.exceptions.HTTPError
        ) as error:
            # Fix: logging.warn() is a deprecated alias of logging.warning()
            logging.warning(f"Error calling requests.post: {error}")
            # Return empty response object
            return requests.Response()

        return result
class DownloadObject:
    """
    Object for downloading files from the network

    Usage:
        >>> download_object = DownloadObject(url, path)
        >>> download_object.download(display_progress=True)

        >>> if download_object.is_active():
        >>>     print(download_object.get_percent())

        >>> if not download_object.download_complete:
        >>>     print("Download failed")

        >>> print("Download complete")
    """

    def __init__(self, url: str, path: str) -> None:
        """
        Parameters:
            url  (str): Remote URL to download
            path (str): Local destination path for the downloaded file
        """
        self.url: str = url
        self.status: DownloadStatus = DownloadStatus.INACTIVE
        self.error_msg: str = ""
        self.filename: str = self._get_filename()
        self.filepath: Path = Path(path)

        self.total_file_size: float = 0.0
        self.downloaded_file_size: float = 0.0
        self.start_time: float = time.time()

        self.error: bool = False
        self.should_stop: bool = False
        self.download_complete: bool = False

        # Probed once at construction; _download() fails fast when offline
        self.has_network: bool = NetworkUtilities(self.url).verify_network_connection()

        self.active_thread: threading.Thread = None

        self.should_checksum: bool = False
        # hashlib object, created lazily when checksum verification is requested
        self.checksum = None

        if self.has_network:
            self._populate_file_size()

    def __del__(self) -> None:
        self.stop()

    def download(self, display_progress: bool = False, spawn_thread: bool = True, verify_checksum: bool = False) -> None:
        """
        Download the file

        Spawns a thread to download the file, so that the main thread can continue
        Note sleep is disabled while the download is active

        Parameters:
            display_progress (bool): Display progress in console
            spawn_thread (bool): Spawn a thread to download the file, otherwise download in the current thread
            verify_checksum (bool): Calculate checksum of downloaded file if True
        """
        self.status = DownloadStatus.DOWNLOADING
        logging.info(f"Starting download: {self.filename}")

        if spawn_thread:
            if self.active_thread:
                logging.error("Download already in progress")
                return
            self.should_checksum = verify_checksum
            if self.should_checksum and self.checksum is None:
                # Fix: the hash object was previously never created on this path,
                # so _update_checksum() crashed with an AttributeError
                self.checksum = hashlib.sha256()
            self.active_thread = threading.Thread(target=self._download, args=(display_progress,))
            self.active_thread.start()
            return

        self.should_checksum = verify_checksum
        if self.should_checksum and self.checksum is None:
            self.checksum = hashlib.sha256()
        self._download(display_progress)

    def download_simple(self, verify_checksum: bool = False) -> Union[str, bool]:
        """
        Alternative to download(), mimics utilities.py's old download_file() function

        Parameters:
            verify_checksum (bool): Return checksum of downloaded file if True

        Returns:
            If verify_checksum is True, returns the checksum of the downloaded file
            Otherwise, returns True if download was successful, False otherwise
        """
        # Fix: forward verify_checksum; previously download() reset
        # should_checksum back to False, silently disabling checksumming
        self.download(spawn_thread=False, verify_checksum=verify_checksum)

        if not self.download_complete:
            return False

        return self.checksum.hexdigest() if self.checksum else True

    def _get_filename(self) -> str:
        """
        Get the filename from the URL

        Returns:
            str: Filename
        """
        return Path(self.url).name

    def _populate_file_size(self) -> None:
        """
        Get the file size of the file to be downloaded

        If unable to get file size, set to zero
        """
        try:
            result = SESSION.head(self.url, allow_redirects=True, timeout=5)
            if 'Content-Length' in result.headers:
                self.total_file_size = float(result.headers['Content-Length'])
            else:
                raise Exception("Content-Length missing from headers")
        except Exception as e:
            logging.error(f"Error determining file size {self.url}: {str(e)}")
            logging.error("Assuming file size is 0")
            self.total_file_size = 0.0

    def _update_checksum(self, chunk: bytes) -> None:
        """
        Update checksum with new chunk

        Parameters:
            chunk (bytes): Chunk to update checksum with
        """
        # Fix: previously updated self._checksum_storage, which was never
        # assigned a hash object, raising AttributeError on the first chunk
        self.checksum.update(chunk)

    def _prepare_working_directory(self, path: Path) -> bool:
        """
        Validates working environment, including free space and removing existing files

        Parameters:
            path (str): Path to the file

        Returns:
            bool: True if successful, False if not
        """
        try:
            if Path(path).exists():
                logging.info(f"Deleting existing file: {path}")
                Path(path).unlink()
                return True

            if not Path(path).parent.exists():
                logging.info(f"Creating directory: {Path(path).parent}")
                Path(path).parent.mkdir(parents=True, exist_ok=True)

            available_space = utilities.get_free_space(Path(path).parent)
            if self.total_file_size > available_space:
                msg = f"Not enough free space to download {self.filename}, need {utilities.human_fmt(self.total_file_size)}, have {utilities.human_fmt(available_space)}"
                logging.error(msg)
                raise Exception(msg)

        except Exception as e:
            self.error = True
            self.error_msg = str(e)
            self.status = DownloadStatus.ERROR
            logging.error(f"Error preparing working directory {path}: {self.error_msg}")
            return False

        logging.info(f"- Directory ready: {path}")
        return True

    def _download(self, display_progress: bool = False) -> None:
        """
        Download the file

        Libraries should invoke download() instead of this method

        Parameters:
            display_progress (bool): Display progress in console
        """
        utilities.disable_sleep_while_running()
        try:
            if not self.has_network:
                raise Exception("No network connection")

            if self._prepare_working_directory(self.filepath) is False:
                raise Exception(self.error_msg)

            response = NetworkUtilities().get(self.url, stream=True, timeout=10)

            with open(self.filepath, 'wb') as file:
                # Ensure stop() runs if the interpreter exits mid-download
                atexit.register(self.stop)
                for i, chunk in enumerate(response.iter_content(1024 * 1024 * 4)):
                    if self.should_stop:
                        raise Exception("Download stopped")
                    if chunk:
                        file.write(chunk)
                        self.downloaded_file_size += len(chunk)
                        if self.should_checksum:
                            self._update_checksum(chunk)
                        # Fix: original condition 'i % 100' was truthy for every chunk
                        # EXCEPT each 100th; report once every 100 chunks instead
                        if display_progress and i % 100 == 0:
                            # Don't use logging here, as we'll be spamming the log file
                            if self.total_file_size == 0.0:
                                print(f"Downloaded {utilities.human_fmt(self.downloaded_file_size)} of {self.filename}")
                            else:
                                print(f"Downloaded {self.get_percent():.2f}% of {self.filename} ({utilities.human_fmt(self.get_speed())}/s) ({self.get_time_remaining():.2f} seconds remaining)")
                self.download_complete = True
                logging.info(f"Download complete: {self.filename}")
                logging.info("Stats:")
                logging.info(f"- Downloaded size: {utilities.human_fmt(self.downloaded_file_size)}")
                logging.info(f"- Time elapsed: {(time.time() - self.start_time):.2f} seconds")
                logging.info(f"- Speed: {utilities.human_fmt(self.downloaded_file_size / (time.time() - self.start_time))}/s")
                logging.info(f"- Location: {self.filepath}")
        except Exception as e:
            self.error = True
            self.error_msg = str(e)
            self.status = DownloadStatus.ERROR
            logging.error(f"Error downloading {self.url}: {self.error_msg}")

        if self.error is False:
            # Fix: previously assigned unconditionally, clobbering ERROR status
            self.status = DownloadStatus.COMPLETE
        utilities.enable_sleep_after_running()

    def get_percent(self) -> float:
        """
        Query the download percent

        Returns:
            float: The download percent, or -1 if unknown
        """
        if self.total_file_size == 0.0:
            return -1
        return self.downloaded_file_size / self.total_file_size * 100

    def get_speed(self) -> float:
        """
        Query the download speed

        Returns:
            float: The download speed in bytes per second
        """
        return self.downloaded_file_size / (time.time() - self.start_time)

    def get_time_remaining(self) -> float:
        """
        Query the time remaining for the download

        Returns:
            float: The time remaining in seconds, or -1 if unknown
        """
        if self.total_file_size == 0.0:
            return -1
        speed = self.get_speed()
        if speed <= 0:
            return -1
        return (self.total_file_size - self.downloaded_file_size) / speed

    def get_file_size(self) -> float:
        """
        Query the file size of the file to be downloaded

        Returns:
            float: The file size in bytes, or 0.0 if unknown
        """
        return self.total_file_size

    def is_active(self) -> bool:
        """
        Query if the download is active

        Returns:
            boolean: True if active, False if completed, failed, stopped, or inactive
        """
        if self.status == DownloadStatus.DOWNLOADING:
            return True
        return False

    def stop(self) -> None:
        """
        Stop the download

        If the download is active, this function will hold the thread until stopped
        """
        self.should_stop = True
        if self.active_thread:
            while self.active_thread.is_alive():
                time.sleep(1)

View File

@@ -0,0 +1,95 @@
"""
reroute_payloads.py: Reroute binaries to tmp directory, and mount a disk image of the payloads
Implements a shadowfile to avoid direct writes to the dmg
"""
import atexit
import plistlib
import tempfile
import subprocess
import logging
from pathlib import Path
from .. import constants
class RoutePayloadDiskImage:
    """
    Reroutes the patcher's payloads to a temporary directory backed by a
    mounted payloads.dmg with a shadow file, so the dmg itself is never
    written to directly.
    """

    def __init__(self, global_constants: constants.Constants) -> None:
        self.constants: constants.Constants = global_constants
        self._setup_tmp_disk_image()

    def _setup_tmp_disk_image(self) -> None:
        """
        Initialize temp directory and mount payloads.dmg

        Create overlay for patcher to write to

        Currently only applicable for GUI variant and not running from source
        """
        if self.constants.wxpython_variant is True and not self.constants.launcher_script:
            logging.info("Running in compiled binary, switching to tmp directory")
            self.temp_dir = tempfile.TemporaryDirectory()
            logging.info(f"New payloads location: {self.temp_dir.name}")
            logging.info("Creating payloads directory")
            Path(self.temp_dir.name / Path("payloads")).mkdir(parents=True, exist_ok=True)

            # Drop any of our own stale mounts left behind by a crashed run
            self._unmount_active_dmgs(unmount_all_active=False)
            output = subprocess.run(
                [
                    "/usr/bin/hdiutil", "attach", "-noverify", f"{self.constants.payload_path_dmg}",
                    "-mountpoint", Path(self.temp_dir.name / Path("payloads")),
                    "-nobrowse",
                    # Shadow file receives all writes, keeping the dmg pristine
                    "-shadow", Path(self.temp_dir.name / Path("payloads_overlay")),
                    "-passphrase", "password"
                ],
                stdout=subprocess.PIPE, stderr=subprocess.STDOUT
            )

            if output.returncode == 0:
                logging.info("Mounted payloads.dmg")
                # Point the rest of the patcher at the temp-mounted payloads
                self.constants.current_path = Path(self.temp_dir.name)
                self.constants.payload_path = Path(self.temp_dir.name) / Path("payloads")
                atexit.register(self._unmount_active_dmgs, unmount_all_active=False)
            else:
                logging.info("Failed to mount payloads.dmg")
                logging.info(f"Output: {output.stdout.decode()}")
                logging.info(f"Return Code: {output.returncode}")

    def _unmount_active_dmgs(self, unmount_all_active: bool = True) -> None:
        """
        Unmounts disk images associated with OCLP

        Finds all DMGs that are mounted, and forcefully unmount them
        If our disk image was previously mounted, we need to unmount it to use again

        This can happen if we crash during a previous session, however 'atexit' class should hopefully avoid this

        Parameters:
            unmount_all_active (bool): If True, unmount all active DMGs, otherwise only unmount our own DMG
        """
        dmg_info = subprocess.run(["/usr/bin/hdiutil", "info", "-plist"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        dmg_info = plistlib.loads(dmg_info.stdout)

        for variant in ["DortaniaInternalResources.dmg", "Universal-Binaries.dmg", "payloads.dmg"]:
            for image in dmg_info["images"]:
                if image["image-path"].endswith(variant):
                    if unmount_all_active is False:
                        # Check that only our personal payloads.dmg is unmounted
                        # (identified by our temp dir appearing in the shadow path)
                        if "shadow-path" in image:
                            if self.temp_dir.name in image["shadow-path"]:
                                logging.info(f"Unmounting personal {variant}")
                                subprocess.run(
                                    ["/usr/bin/hdiutil", "detach", image["system-entities"][0]["dev-entry"], "-force"],
                                    stdout=subprocess.PIPE, stderr=subprocess.STDOUT
                                )
                    else:
                        logging.info(f"Unmounting {variant} at: {image['system-entities'][0]['dev-entry']}")
                        subprocess.run(
                            ["/usr/bin/hdiutil", "detach", image["system-entities"][0]["dev-entry"], "-force"],
                            stdout=subprocess.PIPE, stderr=subprocess.STDOUT
                        )

View File

@@ -0,0 +1,156 @@
"""
updates.py: Check for OpenCore Legacy Patcher binary updates
Call check_binary_updates() to determine if any updates are available
Returns dict with Link and Version of the latest binary update if available
"""
import logging
from typing import Optional, Union
from packaging import version
from . import network_handler
from .. import constants
REPO_LATEST_RELEASE_URL: str = "https://api.github.com/repos/dortania/OpenCore-Legacy-Patcher/releases/latest"
class CheckBinaryUpdates:
    """
    Queries the GitHub releases API to determine whether a newer
    OpenCore Legacy Patcher binary is available for this build type.
    """

    def __init__(self, global_constants: constants.Constants) -> None:
        self.constants: constants.Constants = global_constants

        try:
            self.binary_version = version.parse(self.constants.patcher_version)
        except version.InvalidVersion:
            # Only special (developer) builds may carry a non-PEP 440 version
            assert self.constants.special_build is True, "Invalid version number for binary"
            # Special builds will not have a proper version number
            self.binary_version = version.parse("0.0.0")

        # Cached result of check_binary_updates(), so the API is queried at most once
        self.latest_details = None

    def check_if_newer(self, version: Union[str, version.Version]) -> bool:
        """
        Check if the provided version is newer than the local version

        Parameters:
            version (str | version.Version): Version to compare against

        Returns:
            bool: True if the provided version is newer, False if not
        """
        if self.constants.special_build is True:
            # Special builds have no meaningful version to compare against
            return False

        return self._check_if_build_newer(version, self.binary_version)

    def _check_if_build_newer(self, first_version: Union[str, version.Version], second_version: Union[str, version.Version]) -> bool:
        """
        Check if the first version is newer than the second version

        Parameters:
            first_version (str | version.Version): First version to compare (generally remote)
            second_version (str | version.Version): Second version to compare (generally local)

        Returns:
            bool: True if first version is newer, False if not
        """
        if not isinstance(first_version, version.Version):
            try:
                first_version = version.parse(first_version)
            except version.InvalidVersion:
                # Special build > release build: assume special build is newer
                return True
        if not isinstance(second_version, version.Version):
            try:
                second_version = version.parse(second_version)
            except version.InvalidVersion:
                # Release build > special build: assume special build is newer
                return False

        if first_version == second_version:
            if not self.constants.commit_info[0].startswith("refs/tags"):
                # Check for nightly builds: same version but local is not a tagged release
                return True

        return first_version > second_version

    def _determine_local_build_type(self) -> str:
        """
        Check if the local build is a GUI or TUI build

        Returns:
            str: "GUI" or "TUI"
        """
        return "GUI" if self.constants.wxpython_variant else "TUI"

    def _determine_remote_type(self, remote_name: str) -> str:
        """
        Check if the remote build is a GUI or TUI build

        Parameters:
            remote_name (str): Name of the remote build

        Returns:
            str: "GUI", "TUI" or "Unknown"
        """
        if "TUI" in remote_name:
            return "TUI"
        elif "GUI" in remote_name:
            return "GUI"
        else:
            return "Unknown"

    def check_binary_updates(self) -> Optional[dict]:
        """
        Check if any updates are available for the OpenCore Legacy Patcher binary

        Returns:
            dict: Dictionary with Link and Version of the latest binary update if available,
                  None when up to date, offline, or running a special build
        """
        if self.constants.special_build is True:
            # Special builds do not get updates through the updater
            return None

        if self.latest_details:
            # We already checked
            return self.latest_details

        if not network_handler.NetworkUtilities(REPO_LATEST_RELEASE_URL).verify_network_connection():
            return None

        response = network_handler.NetworkUtilities().get(REPO_LATEST_RELEASE_URL)
        data_set = response.json()

        if "tag_name" not in data_set:
            return None

        # The release marked as latest will always be stable, and thus, have a proper version number
        # But if not, let's not crash the program
        try:
            latest_remote_version = version.parse(data_set["tag_name"])
        except version.InvalidVersion:
            return None

        if not self._check_if_build_newer(latest_remote_version, self.binary_version):
            return None

        # Pick the asset matching our build type (GUI vs TUI)
        for asset in data_set["assets"]:
            logging.info(f"Found asset: {asset['name']}")
            if self._determine_remote_type(asset["name"]) == self._determine_local_build_type():
                self.latest_details = {
                    "Name": asset["name"],
                    "Version": latest_remote_version,
                    "Link": asset["browser_download_url"],
                    "Type": self._determine_remote_type(asset["name"]),
                    "Github Link": f"https://github.com/dortania/OpenCore-Legacy-Patcher/releases/{latest_remote_version}",
                }
                return self.latest_details

        return None

View File

@@ -0,0 +1,643 @@
"""
utilities.py: Utility functions for OpenCore Legacy Patcher
"""
import os
import re
import math
import atexit
import shutil
import logging
import argparse
import binascii
import plistlib
import subprocess
import py_sip_xnu
from pathlib import Path
from .. import constants
from ..detections import ioreg
from ..datasets import (
os_data,
sip_data
)
def hexswap(input_hex: str):
    """
    Reverse the byte (pair) order of a hex string and return it uppercased.
    ex. "0102ff" -> "FF0201"
    """
    pairs = [input_hex[index:index + 2] for index in range(0, len(input_hex), 2)]
    return "".join(reversed(pairs)).upper()
def string_to_hex(input_string):
    """
    Convert a hex string into raw bytes, byte-swapping it via hexswap() first.
    Odd-length input is left-padded with a single '0'.
    """
    if len(input_string) % 2 != 0:
        input_string = "0" + input_string
    swapped = hexswap(input_string)
    return binascii.unhexlify(swapped)
def process_status(process_result):
    """
    Raise with the captured output when a subprocess exited non-zero.

    Parameters:
        process_result: Completed process object with 'returncode' and 'stdout' attributes

    Raises:
        Exception: When the process returned a non-zero exit code
    """
    if process_result.returncode != 0:
        # Lazy %-formatting for logging; also fixes the stray f-prefix on the
        # second message, which contained no placeholders
        logging.info("Process failed with exit code %d", process_result.returncode)
        logging.info("Please report the issue on the Discord server")
        raise Exception(f"Process result: \n{process_result.stdout.decode()}")
def human_fmt(num):
    """
    Format a byte count as a human-readable string using base-1000 units.
    ex. 1500 -> "1.5 KB"
    """
    value = num
    for suffix in ("B", "KB", "MB", "GB", "TB", "PB"):
        if abs(value) < 1000.0:
            return "%3.1f %s" % (value, suffix)
        value /= 1000.0
    # Anything that survives all divisions is in the exabyte range
    return "%.1f %s" % (value, "EB")
def seconds_to_readable_time(seconds) -> str:
    """
    Convert seconds to a readable time format

    Parameters:
        seconds (int | float | str): Seconds to convert

    Returns:
        str: Readable time format (note: trailing space matches caller expectations)
    """
    seconds = int(seconds)

    # Guard clauses for the two sentinel ranges
    if 0 <= seconds < 60:
        return "Less than a minute "
    if seconds < 0:
        return "Indeterminate time "

    years, remainder = divmod(seconds, 31536000)
    days, remainder = divmod(remainder, 86400)
    hours, remainder = divmod(remainder, 3600)
    minutes, _ = divmod(remainder, 60)

    if years > 0:
        return "Over a year"
    if days > 31:
        return "Over a month"

    readable = ""
    if days > 0:
        readable += f"{days}d "
    if hours > 0:
        readable += f"{hours}h "
    if minutes > 0:
        readable += f"{minutes}m "
    # Seconds are intentionally omitted from the readable output
    return readable
def header(lines):
    """
    Log a '#'-framed banner with each (non-None) line centered inside it.
    """
    entries = [entry for entry in lines if entry is not None]
    width = len(max(entries, key=len)) + 4

    logging.info("#" * width)
    for entry in entries:
        text = entry.strip()
        # Center the text between the '#' borders
        left_pad = (width - 2 - len(text)) // 2
        left_part = "#" + " " * left_pad + text
        logging.info(left_part + " " * (width - len(left_part) - 1) + "#")
    logging.info("#" * width)
# Cached recovery-detection result; populated on first call to check_recovery()
RECOVERY_STATUS = None

def check_recovery():
    """
    Return True when running from a macOS Recovery/Base System environment.
    Result is computed once and cached in RECOVERY_STATUS.
    """
    global RECOVERY_STATUS # pylint: disable=global-statement # We need to cache the result

    if RECOVERY_STATUS is None:
        # /System/Library/BaseSystem only exists on the recovery Base System volume
        RECOVERY_STATUS = Path("/System/Library/BaseSystem").exists()

    return RECOVERY_STATUS
def get_disk_path():
    """
    Return the BSD identifier of the disk backing the root volume (ex. 'disk1').
    """
    # Query diskutil for the root volume's info as a plist
    root_partition_info = plistlib.loads(subprocess.run(["/usr/sbin/diskutil", "info", "-plist", "/"], stdout=subprocess.PIPE).stdout.decode().strip().encode())
    root_mount_path = root_partition_info["DeviceIdentifier"]
    # Strip the slice suffix (ex. 'disk1s5' -> 'disk1') when present
    # NOTE(review): [:-2] assumes a single-digit slice number; 'disk1s15' would be mis-trimmed — confirm
    root_mount_path = root_mount_path[:-2] if root_mount_path.count("s") > 1 else root_mount_path
    return root_mount_path
def check_if_root_is_apfs_snapshot():
    """
    Return True when the booted root volume is an APFS snapshot, False otherwise.
    """
    root_partition_info = plistlib.loads(subprocess.run(["/usr/sbin/diskutil", "info", "-plist", "/"], stdout=subprocess.PIPE).stdout.decode().strip().encode())
    # 'APFSSnapshot' is absent on non-APFS/non-snapshot volumes; dict.get
    # replaces the previous try/except KeyError with identical behavior
    return root_partition_info.get("APFSSnapshot", False)
def check_seal():
    """
    Return True when the booted APFS snapshot reports itself as sealed.
    """
    # 'Snapshot Sealed' property is only listed on booted snapshots
    result = subprocess.run(["/usr/sbin/diskutil", "apfs", "list"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    return "Snapshot Sealed: Yes" in result.stdout.decode()
def check_filesystem_type():
    """
    Return the root volume's filesystem type as reported by diskutil.
    """
    # Expected to return 'apfs' or 'hfs'
    filesystem_type = plistlib.loads(subprocess.run(["/usr/sbin/diskutil", "info", "-plist", "/"], stdout=subprocess.PIPE).stdout.decode().strip().encode())
    return filesystem_type["FilesystemType"]
def csr_decode(os_sip):
    """
    Check the live SIP configuration against the bits an OS requires lowered.

    Parameters:
        os_sip: Iterable of csr_values keys that must be flipped for the target OS

    Returns:
        bool: False when every required bit is already set (SIP sufficiently
              lowered), True otherwise — i.e. True means SIP still blocks patching
    """
    sip_int = py_sip_xnu.SipXnu().get_sip_status().value
    # Flag each CSR bit currently set in the live configuration
    # NOTE(review): this mutates the shared sip_data csr_values dict in place,
    # so flags accumulate across calls — confirm this is intended
    for i, current_sip_bit in enumerate(sip_data.system_integrity_protection.csr_values):
        if sip_int & (1 << i):
            sip_data.system_integrity_protection.csr_values[current_sip_bit] = True

    # Can be adjusted to whatever OS needs patching
    sip_needs_change = all(sip_data.system_integrity_protection.csr_values[i] for i in os_sip)
    if sip_needs_change is True:
        return False
    else:
        return True
def friendly_hex(integer: int):
    """Render an integer as a zero-padded, two-digit uppercase hex string."""
    return f"{integer:02X}"
# Handle to the running 'caffeinate' process; None while idle sleep is allowed
sleep_process = None

def disable_sleep_while_running():
    """
    Prevent idle sleep by spawning 'caffeinate' for the patcher's lifetime.
    Safe to call repeatedly; a second call is a no-op while active.
    """
    global sleep_process
    logging.info("Disabling Idle Sleep")
    if sleep_process is None:
        # If sleep_process is active, we'll just keep it running
        # -d: display, -i: idle, -s: system sleep all inhibited
        sleep_process = subprocess.Popen(["caffeinate", "-d", "-i", "-s"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # Ensures that if we don't properly close the process, 'atexit' will for us
        atexit.register(enable_sleep_after_running)

def enable_sleep_after_running():
    """
    Re-allow idle sleep by terminating the 'caffeinate' process, if running.
    """
    global sleep_process
    if sleep_process:
        logging.info("Re-enabling Idle Sleep")
        sleep_process.kill()
        sleep_process = None
def check_kext_loaded(bundle_id: str) -> str:
    """
    Checks if a kext is loaded

    Parameters:
        bundle_id (str): The bundle ID of the kext to check

    Returns:
        str: The version of the kext if it is loaded, or "" if it is not loaded
    """

    # Name (Version) UUID <Linked Against>
    # no UUID for kextstat
    pattern = re.compile(re.escape(bundle_id) + r"\s+\((?P<version>.+)\)")

    # Prefer kmutil when present (modern macOS); fall back to kextstat
    args = ["/usr/sbin/kextstat", "-list-only", "-bundle-id", bundle_id]
    if Path("/usr/bin/kmutil").exists():
        args = ["/usr/bin/kmutil", "showloaded", "--list-only", "--variant-suffix", "release", "--optional-identifier", bundle_id]

    kext_loaded = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if kext_loaded.returncode != 0:
        return ""

    output = kext_loaded.stdout.decode()
    if not output.strip():
        return ""

    # Extract the parenthesized version following the bundle ID
    match = pattern.search(output)
    if match:
        return match.group("version")

    return ""
def check_oclp_boot():
    """
    Return True when the system was booted through OpenCore Legacy Patcher,
    detected via the OCLP-Version NVRAM variable.
    """
    return bool(get_nvram("OCLP-Version", "4D1FDA02-38C7-4A6A-9CC6-4BCCA8B30102", decode=True))
def check_monterey_wifi():
    """
    Return True when both legacy El Capitan WiFi kexts are present in the
    kextcache output (i.e. the legacy WiFi patch set is in place).
    """
    legacy_wifi = "com.apple.iokit.IO80211ElCap"
    legacy_capture = "com.apple.driver.corecaptureElCap"
    output: str = subprocess.run(["/usr/sbin/kextcache"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout.decode()
    return legacy_wifi in output and legacy_capture in output
def check_metal_support(device_probe, computer):
    """
    Return False when any detected GPU belongs to a non-Metal architecture,
    True otherwise (including when no GPUs are detected).
    """
    if computer.gpus:
        # Architectures without Metal support
        non_metal_archs = [
            device_probe.NVIDIA.Archs.Tesla,
            device_probe.NVIDIA.Archs.Fermi,
            device_probe.NVIDIA.Archs.Maxwell,
            device_probe.NVIDIA.Archs.Pascal,
            device_probe.AMD.Archs.TeraScale_1,
            device_probe.AMD.Archs.TeraScale_2,
            device_probe.Intel.Archs.Iron_Lake,
            device_probe.Intel.Archs.Sandy_Bridge,
        ]
        for gpu in computer.gpus:
            if gpu.arch in non_metal_archs:
                return False
    return True
def check_filevault_skip():
    """Report whether the user opted to allow root patching with FileVault on."""
    # OCLP stores '-allow_fv' in its NVRAM settings blob when the user
    # explicitly permits FileVault-enabled root patching.
    settings_blob = get_nvram("OCLP-Settings", "4D1FDA02-38C7-4A6A-9CC6-4BCCA8B30102", decode=True)
    return bool(settings_blob and "-allow_fv" in settings_blob)
def check_secure_boot_model():
    """Return the firmware's SecureBootModel identifier, or None if unset."""
    raw_model = get_nvram("HardwareModel", "94B73556-2197-4702-82A8-3E1337DAFBFB", decode=False)
    if not raw_model:
        return None
    # Strip NUL padding before decoding the firmware-provided string
    return raw_model.replace(b"\x00", b"").decode("utf-8")
def check_ap_security_policy():
    """
    Return the Apple Secure Boot policy as an integer.

    Supported Apple Secure Boot Policy values:
        AppleImg4SbModeDisabled = 0,
        AppleImg4SbModeMedium   = 1,
        AppleImg4SbModeFull     = 2
    Ref: https://github.com/acidanthera/OpenCorePkg/blob/f7c1a3d483fa2535b6a62c25a4f04017bfeee09a/Include/Apple/Protocol/AppleImg4Verification.h#L27-L31
    """
    policy_raw = get_nvram("AppleSecureBootPolicy", "94B73556-2197-4702-82A8-3E1337DAFBFB", decode=False)
    if not policy_raw:
        # Variable absent: treat as disabled
        return 0
    return int.from_bytes(policy_raw, byteorder="little")
def check_secure_boot_level():
    """Report whether Apple Secure Boot is effectively enabled on this host."""
    # OpenCorePkg logic:
    # - If a T2 Unit is used with ApECID, will return 2
    # - Either x86legacy or T2 without ApECID, returns 1
    # - Disabled, returns 0
    # Ref: https://github.com/acidanthera/OpenCorePkg/blob/f7c1a3d483fa2535b6a62c25a4f04017bfeee09a/Library/OcMainLib/OpenCoreUefi.c#L490-L502
    #
    # Genuine Mac logic:
    # - On genuine non-T2 Macs, they always return 0
    # - T2 Macs will return based on their Startup Policy (Full(2), Medium(1), Disabled(0))
    # Ref: https://support.apple.com/en-us/HT208198
    if check_secure_boot_model() not in constants.Constants().sbm_values:
        return False
    return check_ap_security_policy() != 0
def patching_status(os_sip, os):
    # Detection for Root Patching
    # Parameters:
    #   os_sip: expected SIP configuration, passed through to csr_decode
    #   os:     host's Darwin kernel major version (compared via os_data)
    # Returns a 4-tuple of booleans:
    #   (sip_enabled, sbm_enabled, fv_enabled, dosdude_patched)
    sip_enabled = True  # System Integrity Protection
    sbm_enabled = True  # Secure Boot Status (SecureBootModel)
    fv_enabled = True   # FileVault
    dosdude_patched = True
    # Presence of both kexts below is treated as a dosdude1-style legacy patch.
    # NOTE(review): path uses '/System/Library/Extension/' (singular); the
    # standard kext directory is '/System/Library/Extensions/' — confirm the
    # singular spelling is intentional.
    gen6_kext = "/System/Library/Extension/AppleIntelHDGraphics.kext"
    gen7_kext = "/System/Library/Extension/AppleIntelHD3000Graphics.kext"
    sbm_enabled = check_secure_boot_level()
    if os > os_data.os_data.yosemite:
        sip_enabled = csr_decode(os_sip)
    else:
        # SIP does not exist before El Capitan
        sip_enabled = False
    if os > os_data.os_data.catalina and not check_filevault_skip():
        # Assume non-OCLP Macs do not have our APFS seal patch
        fv_status: str = subprocess.run(["/usr/bin/fdesetup", "status"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout.decode()
        if "FileVault is Off" in fv_status:
            fv_enabled = False
    else:
        # Older OSes (or explicit skip) don't block patching on FileVault
        fv_enabled = False
    if not (Path(gen6_kext).exists() and Path(gen7_kext).exists()):
        # NOTE(review): requires BOTH kexts present before reporting the host
        # as dosdude-patched — confirm 'and' (vs 'or') is intended.
        dosdude_patched = False
    return sip_enabled, sbm_enabled, fv_enabled, dosdude_patched
# Module-level flag: when set to False (via disable_cls()), cls() is a no-op.
clear = True
def disable_cls():
    """Permanently suppress terminal clearing by cls() for this process."""
    global clear
    clear = False
def cls():
    """Clear the terminal screen, unless clearing has been disabled."""
    global clear
    if not clear:
        return
    if check_cli_args() is not None:
        # Our GUI does not support clear screen
        return
    if check_recovery():
        # recoveryOS terminal: emit the ANSI full-reset escape instead
        logging.info("\u001Bc")
    else:
        os.system("cls" if os.name == "nt" else "clear")
def check_command_line_tools():
    """Determine whether Xcode Command Line Tools are installed."""
    # xcode-select exits non-zero when no developer directory is configured
    result = subprocess.run(["/usr/bin/xcode-select", "--print-path"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    return result.returncode == 0
def get_nvram(variable: str, uuid: str = None, *, decode: bool = False):
    """
    Read an NVRAM variable via IOKit's IODeviceTree:/options.

    Parameters:
        variable (str): NVRAM variable name
        uuid (str): optional vendor GUID; the lookup key becomes '<uuid>:<variable>'
        decode (bool): strip NUL padding and decode bytes to str

    Returns:
        The variable's value (native type from CoreFoundation), or None when
        the variable is absent or its bytes cannot be UTF-8 decoded.
    """
    # TODO: Properly fix for El Capitan, which does not print the XML representation even though we say to
    if uuid is not None:
        uuid += ":"
    else:
        uuid = ""
    nvram = ioreg.IORegistryEntryFromPath(ioreg.kIOMasterPortDefault, "IODeviceTree:/options".encode())
    value = ioreg.IORegistryEntryCreateCFProperty(nvram, f"{uuid}{variable}", ioreg.kCFAllocatorDefault, ioreg.kNilOptions)
    # Release the registry entry handle regardless of lookup success
    ioreg.IOObjectRelease(nvram)
    if not value:
        return None
    value = ioreg.corefoundation_to_native(value)
    if decode:
        if isinstance(value, bytes):
            try:
                value = value.strip(b"\0").decode()
            except UnicodeDecodeError:
                # Some scenarios the firmware will throw garbage in
                # ie. iMac12,2 with FireWire boot-path
                value = None
        elif isinstance(value, str):
            value = value.strip("\0")
    return value
def get_rom(variable: str, *, decode: bool = False):
    """
    Read a property from IODeviceTree:/rom.

    Parameters:
        variable (str): property name to look up
        decode (bool): strip NUL padding and decode bytes to str

    Returns:
        Property value, or None when absent.
    """
    # TODO: Properly fix for El Capitan, which does not print the XML representation even though we say to
    rom_entry = ioreg.IORegistryEntryFromPath(ioreg.kIOMasterPortDefault, "IODeviceTree:/rom".encode())
    raw_value = ioreg.IORegistryEntryCreateCFProperty(rom_entry, variable, ioreg.kCFAllocatorDefault, ioreg.kNilOptions)
    ioreg.IOObjectRelease(rom_entry)
    if not raw_value:
        return None
    native_value = ioreg.corefoundation_to_native(raw_value)
    if decode and isinstance(native_value, bytes):
        native_value = native_value.strip(b"\0").decode()
    return native_value
def get_firmware_vendor(*, decode: bool = False):
    """
    Read the 'firmware-vendor' property from IODeviceTree:/efi.

    Parameters:
        decode (bool): strip NUL padding (and decode bytes to str)

    Returns:
        Vendor value, or None when absent.
    """
    efi_entry = ioreg.IORegistryEntryFromPath(ioreg.kIOMasterPortDefault, "IODeviceTree:/efi".encode())
    raw_value = ioreg.IORegistryEntryCreateCFProperty(efi_entry, "firmware-vendor", ioreg.kCFAllocatorDefault, ioreg.kNilOptions)
    ioreg.IOObjectRelease(efi_entry)
    if not raw_value:
        return None
    vendor = ioreg.corefoundation_to_native(raw_value)
    if not decode:
        return vendor
    if isinstance(vendor, bytes):
        return vendor.strip(b"\0").decode()
    if isinstance(vendor, str):
        return vendor.strip("\0")
    return vendor
def find_apfs_physical_volume(device):
    """
    Resolve an APFS volume to its backing physical store(s).

    ex: 'disk3s1s1' -> ['disk0s2']

    Parameters:
        device (str): APFS volume identifier

    Returns:
        list: physical store identifiers (empty when none found)
    """
    physical_disks = []
    try:
        info = plistlib.loads(subprocess.run(["/usr/sbin/diskutil", "info", "-plist", device], stdout=subprocess.PIPE).stdout)
    except TypeError:
        info = None
    if not info:
        return physical_disks
    # Note: Fusion Drive Macs return multiple APFSPhysicalStores:
    # APFSPhysicalStores:
    #  - 0:
    #      APFSPhysicalStore: disk0s2
    #  - 1:
    #      APFSPhysicalStore: disk3s2
    for store in info.get("APFSPhysicalStores", []):
        try:
            physical_disks.append(store["APFSPhysicalStore"])
        except KeyError:
            # Matches the original behavior: a malformed entry aborts the scan
            break
    return physical_disks
def clean_device_path(device_path: str):
    """Extract the partition GUID from an OpenCore boot-device path, or None."""
    # ex:
    # 'PciRoot(0x0)/Pci(0xA,0x0)/Sata(0x0,0x0,0x0)/HD(1,GPT,C0778F23-3765-4C8E-9BFA-D60C839E7D2D,0x28,0x64000)/EFI\OC\OpenCore.efi'
    # 'PciRoot(0x0)/Pci(0x1A,0x7)/USB(0x0,0x0)/USB(0x2,0x0)/HD(2,GPT,4E929909-2074-43BA-9773-61EBC110A670,0x64800,0x38E3000)/EFI\OC\OpenCore.efi'
    # 'PciRoot(0x0)/Pci(0x1A,0x7)/USB(0x0,0x0)/USB(0x1,0x0)/\EFI\OC\OpenCore.efi'
    # return:
    # 'C0778F23-3765-4C8E-9BFA-D60C839E7D2D'
    # '4E929909-2074-43BA-9773-61EBC110A670'
    # 'None'
    if not device_path:
        return None
    # Only GPT/MBR entries carry a partition descriptor with a GUID
    if not any(scheme in device_path for scheme in ("GPT", "MBR")):
        return None
    segments = device_path.split("/")
    # we can always assume [-1] is 'EFI\OC\OpenCore.efi', so the
    # HD(...) descriptor lives at [-2]
    if len(segments) < 2:
        return None
    descriptor_fields = segments[-2].split(",")
    if len(descriptor_fields) > 2:
        return descriptor_fields[2]
    return None
def find_disk_off_uuid(uuid):
    """Resolve a volume UUID to its disk identifier (ex. 'disk1s2'), or None."""
    try:
        info = plistlib.loads(subprocess.run(["/usr/sbin/diskutil", "info", "-plist", uuid], stdout=subprocess.PIPE).stdout)
    except TypeError:
        return None
    if not info:
        return None
    # Missing key yields None, matching the original KeyError fall-through
    return info.get("DeviceIdentifier")
def get_free_space(disk=None):
    """
    Get free space on disk in bytes

    Parameters:
        disk (str): Path to mounted disk (or folder on disk); defaults to '/'

    Returns:
        int: Free space in bytes
    """
    target = "/" if disk is None else disk
    return shutil.disk_usage(target).free
def grab_mount_point_from_disk(disk):
    """Return the mount point reported by diskutil for the given disk."""
    raw = subprocess.run(["/usr/sbin/diskutil", "info", "-plist", disk], stdout=subprocess.PIPE).stdout
    # Round-trip through str to trim stray whitespace before plist parsing
    info = plistlib.loads(raw.decode().strip().encode())
    return info["MountPoint"]
def monitor_disk_output(disk):
    """Return MB written on the drive, as reported by iostat."""
    stats = subprocess.check_output(["/usr/sbin/iostat", "-Id", disk]).decode("utf-8")
    # Grab second last entry (last is '\n')
    return stats.split(" ")[-2]
def get_preboot_uuid() -> str:
    """
    Get the UUID of the Preboot volume
    """
    ioreg_args = ["/usr/sbin/ioreg", "-a", "-n", "chosen", "-p", "IODeviceTree", "-r"]
    chosen = plistlib.loads(subprocess.run(ioreg_args, stdout=subprocess.PIPE).stdout)
    # Device-tree value is NUL-padded bytes; strip before decoding
    return chosen[0]["apfs-preboot-uuid"].strip(b"\0").decode()
def block_os_updaters():
    # Disables any processes that would be likely to mess with
    # the root volume while we're working with it.
    #
    # Scans `ps -ax` output and SIGKILLs any process whose command
    # contains one of the update-daemon names below.
    bad_processes = [
        "softwareupdate",
        "SoftwareUpdate",
        "Software Update",
        "MobileSoftwareUpdate",
    ]
    output = subprocess.check_output(["ps", "-ax"])
    lines = output.splitlines()
    for line in lines:
        entry = line.split()
        # NOTE(review): assumes `ps -ax` columns are PID TTY TIME CMD, i.e.
        # entry[0]=PID and entry[3]=command — confirm against the ps output
        # format on all supported macOS versions.
        pid = entry[0].decode()
        current_process = entry[3].decode()
        for bad_process in bad_processes:
            if bad_process in current_process:
                if pid != "":
                    logging.info(f"Killing Process: {pid} - {current_process.split('/')[-1]}")
                    subprocess.run(["/bin/kill", "-9", pid])
                # Each process can only match once; move to the next ps line
                break
def check_boot_mode():
    """
    Report the host's boot mode (ex. safe mode) via system_profiler.

    Returns:
        Boot mode value from SPSoftwareDataType, or None when it
        cannot be determined.
    """
    try:
        sys_plist = plistlib.loads(subprocess.run(["/usr/sbin/system_profiler", "SPSoftwareDataType"], stdout=subprocess.PIPE).stdout)
        return sys_plist[0]["_items"][0]["boot_mode"]
    except (KeyError, IndexError, TypeError, plistlib.InvalidFileException):
        # IndexError added: system_profiler can return an empty data set
        # ([] or empty '_items'), which previously escaped and crashed.
        return None
def elevated(*args, **kwargs) -> subprocess.CompletedProcess:
    """
    Run a command, prefixing '/usr/bin/sudo' when not already privileged.

    When running through our GUI, we run as root but do not get uid 0;
    best to assume CLI is running as root.
    """
    already_privileged = os.getuid() == 0 or check_cli_args() is not None
    if already_privileged:
        return subprocess.run(*args, **kwargs)
    return subprocess.run(["/usr/bin/sudo"] + [args[0][0]] + args[0][1:], **kwargs)
def fetch_staged_update(variant: str = "Update") -> tuple[str, str]:
    """
    Check for staged macOS update

    Supported variants:
    - Preflight
    - Update

    Returns:
        (OS version, OS build), or (None, None) when no staged update is
        present or the plist cannot be read/parsed.
    """
    update_config = Path(f"/System/Volumes/Update/{variant}.plist")
    if not update_config.exists():
        return (None, None)
    try:
        # 'with' closes the file handle (previously leaked via plistlib.load(open(...)));
        # 'except Exception' replaces the bare except, which also caught
        # KeyboardInterrupt/SystemExit.
        with update_config.open("rb") as config_handle:
            update_staged = plistlib.load(config_handle)
    except Exception:
        return (None, None)
    if "update-asset-attributes" not in update_staged:
        return (None, None)
    attributes = update_staged["update-asset-attributes"]
    # .get avoids a KeyError on malformed attribute dicts
    return attributes.get("OSVersion"), attributes.get("Build")
def check_cli_args():
    """
    Parse command-line arguments.

    Returns:
        argparse.Namespace when any actionable flag was supplied
        (--build, --patch_sys_vol, --unpatch_sys_vol, --validate,
        --auto_patch, --prepare_for_update, --cache_os), else None
        (treated by callers as "no CLI invocation").
    """
    parser = argparse.ArgumentParser()

    # Generic build toggles
    parser.add_argument("--build", help="Build OpenCore", action="store_true", required=False)
    parser.add_argument("--verbose", help="Enable verbose boot", action="store_true", required=False)
    parser.add_argument("--debug_oc", help="Enable OpenCore DEBUG", action="store_true", required=False)
    parser.add_argument("--debug_kext", help="Enable kext DEBUG", action="store_true", required=False)
    parser.add_argument("--hide_picker", help="Hide OpenCore picker", action="store_true", required=False)
    parser.add_argument("--disable_sip", help="Disable SIP", action="store_true", required=False)
    parser.add_argument("--disable_smb", help="Disable SecureBootModel", action="store_true", required=False)
    parser.add_argument("--vault", help="Enable OpenCore Vaulting", action="store_true", required=False)
    parser.add_argument("--support_all", help="Allow OpenCore on natively supported Models", action="store_true", required=False)
    parser.add_argument("--firewire", help="Enable FireWire Booting", action="store_true", required=False)
    parser.add_argument("--nvme", help="Enable NVMe Booting", action="store_true", required=False)
    parser.add_argument("--wlan", help="Enable Wake on WLAN support", action="store_true", required=False)
    # parser.add_argument("--disable_amfi", help="Disable AMFI", action="store_true", required=False)
    parser.add_argument("--moderate_smbios", help="Moderate SMBIOS Patching", action="store_true", required=False)
    parser.add_argument("--disable_tb", help="Disable Thunderbolt on 2013-2014 MacBook Pros", action="store_true", required=False)
    parser.add_argument("--force_surplus", help="Force SurPlus in all newer OSes", action="store_true", required=False)

    # Building args requiring values (ie. --model iMac12,2)
    parser.add_argument("--model", action="store", help="Set custom model", required=False)
    parser.add_argument("--disk", action="store", help="Specifies disk to install to", required=False)
    parser.add_argument("--smbios_spoof", action="store", help="Set SMBIOS patching mode", required=False)

    # sys_patch args
    parser.add_argument("--patch_sys_vol", help="Patches root volume", action="store_true", required=False)
    parser.add_argument("--unpatch_sys_vol", help="Unpatches root volume, EXPERIMENTAL", action="store_true", required=False)
    parser.add_argument("--prepare_for_update", help="Prepares host for macOS update, ex. clean /Library/Extensions", action="store_true", required=False)
    parser.add_argument("--cache_os", help="Caches patcher files (ex. KDKs) for incoming OS in Preflight.plist", action="store_true", required=False)

    # validation args
    parser.add_argument("--validate", help="Runs Validation Tests for CI", action="store_true", required=False)

    # GUI args
    parser.add_argument("--gui_patch", help="Starts GUI in Root Patcher", action="store_true", required=False)
    parser.add_argument("--gui_unpatch", help="Starts GUI in Root Unpatcher", action="store_true", required=False)
    parser.add_argument("--auto_patch", help="Check if patches are needed and prompt user", action="store_true", required=False)
    parser.add_argument("--update_installed", help="Prompt user to finish updating via GUI", action="store_true", required=False)

    args = parser.parse_args()

    # Only report a CLI invocation when an actionable flag was passed;
    # pure option flags (ex. --verbose alone) still count as "no CLI args".
    if not (
        args.build or
        args.patch_sys_vol or
        args.unpatch_sys_vol or
        args.validate or
        args.auto_patch or
        args.prepare_for_update or
        args.cache_os
    ):
        return None
    else:
        return args

# ===========================================================================
# Next file: validation.py ("View File" button text and "@@ -0,0 +1,315 @@"
# diff hunk header were concatenation residue; replaced with this marker)
# ===========================================================================
"""
validation.py: Validation class for the patcher
"""
import logging
import subprocess
from pathlib import Path
from . import network_handler
from .. import constants
from ..sys_patch import sys_patch_helpers
from ..efi_builder import build
from ..datasets import (
example_data,
model_array,
sys_patch_dict,
os_data
)
class PatcherValidation:
"""
Validation class for the patcher
Primarily for Continuous Integration
"""
    def __init__(self, global_constants: constants.Constants, verify_unused_files: bool = False) -> None:
        """
        Initialize the validator and immediately run all validations.

        Parameters:
            global_constants: shared patcher constants/state
            verify_unused_files: also report PatcherSupportPkg files that no
                patchset references (extremely slow; manual use only)
        """
        self.constants: constants.Constants = global_constants
        self.verify_unused_files = verify_unused_files
        # Populated by _validate_root_patch_files() when verify_unused_files is set
        self.active_patchset_files = []
        # Signal the rest of the patcher that we are running in validation mode
        self.constants.validate = True
        # Dumped machine configurations that must each produce a valid config.plist
        self.valid_dumps = [
            example_data.MacBookPro.MacBookPro92_Stock,
            example_data.MacBookPro.MacBookPro111_Stock,
            example_data.MacBookPro.MacBookPro133_Stock,
            example_data.Macmini.Macmini52_Stock,
            example_data.Macmini.Macmini61_Stock,
            example_data.Macmini.Macmini71_Stock,
            example_data.iMac.iMac81_Stock,
            example_data.iMac.iMac112_Stock,
            example_data.iMac.iMac122_Upgraded,
            example_data.iMac.iMac122_Upgraded_Nvidia,
            example_data.iMac.iMac151_Stock,
            example_data.MacPro.MacPro31_Stock,
            example_data.MacPro.MacPro31_Upgrade,
            example_data.MacPro.MacPro31_Modern_AMD,
            example_data.MacPro.MacPro31_Modern_Kepler,
            example_data.MacPro.MacPro41_Upgrade,
            example_data.MacPro.MacPro41_Modern_AMD,
            example_data.MacPro.MacPro41_51__Flashed_Modern_AMD,
            example_data.MacPro.MacPro41_51_Flashed_NVIDIA_WEB_DRIVERS,
        ]
        # Dumps of natively supported machines
        # NOTE(review): not referenced by the validations in view — confirm usage.
        self.valid_dumps_native = [
            example_data.iMac.iMac201_Stock,
            example_data.MacBookPro.MacBookPro141_SSD_Upgrade,
        ]
        self._validate_configs()
        self._validate_sys_patch()
def _build_prebuilt(self) -> None:
"""
Generate a build for each predefined model
Then validate against ocvalidate
"""
for model in model_array.SupportedSMBIOS:
logging.info(f"Validating predefined model: {model}")
self.constants.custom_model = model
build.BuildOpenCore(self.constants.custom_model, self.constants)
result = subprocess.run([self.constants.ocvalidate_path, f"{self.constants.opencore_release_folder}/EFI/OC/config.plist"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
logging.info("Error on build!")
logging.info(result.stdout.decode())
raise Exception(f"Validation failed for predefined model: {model}")
else:
logging.info(f"Validation succeeded for predefined model: {model}")
def _build_dumps(self) -> None:
"""
Generate a build for each predefined model
Then validate against ocvalidate
"""
for model in self.valid_dumps:
self.constants.computer = model
self.constants.custom_model = ""
logging.info(f"Validating dumped model: {self.constants.computer.real_model}")
build.BuildOpenCore(self.constants.computer.real_model, self.constants)
result = subprocess.run([self.constants.ocvalidate_path, f"{self.constants.opencore_release_folder}/EFI/OC/config.plist"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
logging.info("Error on build!")
logging.info(result.stdout.decode())
raise Exception(f"Validation failed for predefined model: {self.constants.computer.real_model}")
else:
logging.info(f"Validation succeeded for predefined model: {self.constants.computer.real_model}")
    def _validate_root_patch_files(self, major_kernel: int, minor_kernel: int) -> None:
        """
        Validate that all files in the patchset are present in the payload

        Parameters:
            major_kernel (int): Major kernel version
            minor_kernel (int): Minor kernel version

        Raises:
            Exception: when a referenced payload file is missing, or the
                patchset plist cannot be generated
        """
        patchset = sys_patch_dict.SystemPatchDictionary(major_kernel, minor_kernel, self.constants.legacy_accel_support, self.constants.detected_os_version).patchset_dict
        # Compare versions as '<major>.<minor>' floats to range-check OS support
        host_os_float = float(f"{major_kernel}.{minor_kernel}")
        for patch_subject in patchset:
            for patch_core in patchset[patch_subject]:
                patch_os_min_float = float(f'{patchset[patch_subject][patch_core]["OS Support"]["Minimum OS Support"]["OS Major"]}.{patchset[patch_subject][patch_core]["OS Support"]["Minimum OS Support"]["OS Minor"]}')
                patch_os_max_float = float(f'{patchset[patch_subject][patch_core]["OS Support"]["Maximum OS Support"]["OS Major"]}.{patchset[patch_subject][patch_core]["OS Support"]["Maximum OS Support"]["OS Minor"]}')
                # Skip patches that do not apply to this kernel version
                if (host_os_float < patch_os_min_float or host_os_float > patch_os_max_float):
                    continue
                for install_type in ["Install", "Install Non-Root"]:
                    if install_type in patchset[patch_subject][patch_core]:
                        for install_directory in patchset[patch_subject][patch_core][install_type]:
                            for install_file in patchset[patch_subject][patch_core][install_type][install_directory]:
                                # Patchset maps file -> payload subfolder; the full
                                # source path is <payload root>/<subfolder><dest dir>/<file>
                                source_file = str(self.constants.payload_local_binaries_root_path) + "/" + patchset[patch_subject][patch_core][install_type][install_directory][install_file] + install_directory + "/" + install_file
                                if not Path(source_file).exists():
                                    logging.info(f"File not found: {source_file}")
                                    raise Exception(f"Failed to find {source_file}")
                                if self.verify_unused_files is True:
                                    # Record for the later unused-file scan
                                    self.active_patchset_files.append(source_file)
        logging.info(f"Validating against Darwin {major_kernel}.{minor_kernel}")
        if not sys_patch_helpers.SysPatchHelpers(self.constants).generate_patchset_plist(patchset, f"OpenCore-Legacy-Patcher-{major_kernel}.{minor_kernel}.plist", None):
            raise Exception("Failed to generate patchset plist")
        # Remove the plist file after validation
        Path(self.constants.payload_path / f"OpenCore-Legacy-Patcher-{major_kernel}.{minor_kernel}.plist").unlink()
    def _validate_sys_patch(self) -> None:
        """
        Validates sys_patch modules

        Downloads Universal-Binaries.dmg when missing, mounts it with a
        throw-away shadow file, validates every supported OS's patchset
        files against it, then unmounts and cleans up.

        Raises:
            Exception: on download, mount, or unmount failure
        """
        # Fetch PatcherSupportPkg if not already cached locally
        if not Path(self.constants.payload_local_binaries_root_path_dmg).exists():
            dl_obj = network_handler.DownloadObject(f"https://github.com/dortania/PatcherSupportPkg/releases/download/{self.constants.patcher_support_pkg_version}/Universal-Binaries.dmg", self.constants.payload_local_binaries_root_path_dmg)
            dl_obj.download(spawn_thread=False)
            if dl_obj.download_complete is False:
                logging.info("Failed to download Universal-Binaries.dmg")
                raise Exception("Failed to download Universal-Binaries.dmg")

        logging.info("Validating Root Patch File integrity")

        # Remove any stale shadow file from a previous run
        if Path(self.constants.payload_path / Path("Universal-Binaries_overlay")).exists():
            subprocess.run(
                [
                    "/bin/rm", "-f", Path(self.constants.payload_path / Path("Universal-Binaries_overlay"))
                ],
                stdout=subprocess.PIPE, stderr=subprocess.STDOUT
            )
        # Detach any stale mount from a previous run
        if Path(self.constants.payload_path / Path("Universal-Binaries")).exists():
            output = subprocess.run(
                [
                    "/usr/bin/hdiutil", "detach", Path(self.constants.payload_path / Path("Universal-Binaries")),
                    "-force"
                ],
                stdout=subprocess.PIPE, stderr=subprocess.STDOUT
            )
            if output.returncode != 0:
                logging.info("Failed to unmount Universal-Binaries.dmg")
                logging.info(f"Output: {output.stdout.decode()}")
                logging.info(f"Return Code: {output.returncode}")
                raise Exception("Failed to unmount Universal-Binaries.dmg")

        # Mount read-through with a shadow file so the DMG itself stays pristine;
        # the image ships encrypted with the fixed passphrase below.
        output = subprocess.run(
            [
                "/usr/bin/hdiutil", "attach", "-noverify", f"{self.constants.payload_local_binaries_root_path_dmg}",
                "-mountpoint", Path(self.constants.payload_path / Path("Universal-Binaries")),
                "-nobrowse",
                "-shadow", Path(self.constants.payload_path / Path("Universal-Binaries_overlay")),
                "-passphrase", "password"
            ],
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT
        )
        if output.returncode != 0:
            logging.info("Failed to mount Universal-Binaries.dmg")
            logging.info(f"Output: {output.stdout.decode()}")
            logging.info(f"Return Code: {output.returncode}")
            raise Exception("Failed to mount Universal-Binaries.dmg")

        logging.info("Mounted Universal-Binaries.dmg")

        # Check every supported major against minor kernel versions 0-9
        for supported_os in [os_data.os_data.big_sur, os_data.os_data.monterey, os_data.os_data.ventura, os_data.os_data.sonoma]:
            for i in range(0, 10):
                self._validate_root_patch_files(supported_os, i)

        logging.info("Validating SNB Board ID patcher")
        # Spoof a Sandy Bridge board ID so the SNB patcher path is exercised
        self.constants.computer.reported_board_id = "Mac-7BA5B2DFE22DDD8C"
        sys_patch_helpers.SysPatchHelpers(self.constants).snb_board_id_patch(self.constants.payload_local_binaries_root_path)

        if self.verify_unused_files is True:
            self._find_unused_files()

        # unmount the dmg
        output = subprocess.run(
            [
                "/usr/bin/hdiutil", "detach", Path(self.constants.payload_path / Path("Universal-Binaries")),
                "-force"
            ],
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT
        )
        if output.returncode != 0:
            logging.info("Failed to unmount Universal-Binaries.dmg")
            logging.info(f"Output: {output.stdout.decode()}")
            logging.info(f"Return Code: {output.returncode}")
            raise Exception("Failed to unmount Universal-Binaries.dmg")
        # Discard the shadow overlay now that validation is complete
        subprocess.run(
            [
                "/bin/rm", "-f", Path(self.constants.payload_path / Path("Universal-Binaries_overlay"))
            ],
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT
        )
def _find_unused_files(self) -> None:
"""
Find PatcherSupportPkg files that are unused by the patcher
Note this function is extremely slow, so only manually run when needed
"""
if self.active_patchset_files == []:
return
unused_files = []
for file in Path(self.constants.payload_local_binaries_root_path).rglob("*"):
if file.is_dir():
continue
relative_path = Path(file).relative_to(self.constants.payload_local_binaries_root_path)
if relative_path.name == ".DS_Store":
continue
if str(relative_path) in [".fseventsd/fseventsd-uuid", ".signed"]:
continue
is_used = False
for used_file in self.active_patchset_files:
used_relative_path = Path(used_file).relative_to(self.constants.payload_local_binaries_root_path)
if str(relative_path) in str(used_relative_path):
is_used = True
break
if str(used_relative_path) in str(relative_path):
is_used = True
break
if is_used:
continue
unused_files.append(relative_path)
if len(unused_files) > 0:
logging.info("Unused files found:")
for file in unused_files:
logging.info(f" {file}")
    def _validate_configs(self) -> None:
        """
        Validates build modules

        Runs two full build+ocvalidate passes: once with default settings,
        once with the toggles below flipped, then removes build artifacts.
        """
        # First run is with default settings
        self._build_prebuilt()
        self._build_dumps()
        # Second run, flip all settings
        self.constants.verbose_debug = True
        self.constants.opencore_debug = True
        self.constants.kext_debug = True
        self.constants.kext_variant = "DEBUG"
        # NOTE(review): duplicate assignment — kext_debug was already set
        # True two lines above; confirm whether another flag was intended.
        self.constants.kext_debug = True
        self.constants.showpicker = False
        self.constants.sip_status = False
        self.constants.secure_status = True
        self.constants.firewire_boot = True
        self.constants.nvme_boot = True
        self.constants.enable_wake_on_wlan = True
        self.constants.disable_tb = True
        self.constants.force_surplus = True
        self.constants.software_demux = True
        self.constants.serial_settings = "Minimal"
        self._build_prebuilt()
        self._build_dumps()
        # Clean up build artifacts once both passes succeed
        subprocess.run(["/bin/rm", "-rf", self.constants.build_path], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)