logging: Adjust file handling path

This commit is contained in:
Mykola Grymalyuk
2023-02-04 10:26:36 -07:00
parent eb1e29f95b
commit 976f14eeb3
6 changed files with 440 additions and 114 deletions

1
.gitignore vendored
View File

@@ -32,3 +32,4 @@ __pycache__/
/payloads.dmg
/payloads/OpenCore-Legacy-Patcher-*.plist
/payloads/KDK.dmg
*.log

View File

@@ -7,8 +7,17 @@
- Ensure `Moraea_BlurBeta` is set on non-Metal systems
- Implement proper Root Unpatching verification in GUI
- Removes arbitrary patch requirements used against unpatching (ex. network connection)
- Prioritize KdkSupportPkg repository for downloads
- Skips calls to Apple's now defunct Developer Portal API
- Backend Changes:
- Refactored kdk_handler.py
- Prioritizes KdkSupportPkg repository for downloads
- Skips calls to Apple's now defunct Developer Portal API
- Support local loose matching when no network connection is available
- Implemented logging framework usage for more reliable logging
- Logs are stored under `~/OpenCore-Patcher-vX.Y.Z.log`
- Implemented new network_handler.py module
- Allows for more reliable network calls and downloads
- Better supports network timeouts and disconnects
- Dramatically less noise in console during downloads
- Increment Binaries:
- PatcherSupportPkg 0.8.3 - release

View File

@@ -1290,9 +1290,55 @@ class wx_python_gui:
self.subheader.Centre(wx.HORIZONTAL)
self.developer_note.SetLabel("Starting shortly")
logging.getLogger().handlers[1].stream = menu_redirect.RedirectLabel(self.developer_note)
kdk_result, error_msg, detected_build = kdk_handler.kernel_debug_kit_handler(self.constants).download_kdk(self.constants.detected_os_version, self.constants.detected_os_build)
logging.getLogger().handlers[1].stream = self.stock_stream
kdk_result = False
kdk_obj = kdk_handler.KernelDebugKitObject(self.constants, self.constants.detected_os_build, self.constants.detected_os_version)
if kdk_obj.success is True:
kdk_download_obj = kdk_obj.retrieve_download()
if not kdk_download_obj:
kdk_result = True
else:
kdk_download_obj.download()
self.header.SetLabel(f"Downloading KDK Build: {kdk_obj.kdk_url_build}")
self.header.Centre(wx.HORIZONTAL)
self.progress_bar.SetValue(0)
# Set below developer note
self.progress_bar.SetPosition(
wx.Point(
self.developer_note.GetPosition().x,
self.developer_note.GetPosition().y + self.developer_note.GetSize().height + 10
)
)
self.progress_bar.Centre(wx.HORIZONTAL)
self.progress_bar.Show()
self.frame.SetSize(-1, self.progress_bar.GetPosition().y + self.progress_bar.GetSize().height + 60)
while kdk_download_obj.is_active():
self.subheader.SetLabel(f"{utilities.human_fmt(kdk_download_obj.downloaded_file_size)} downloaded of {utilities.human_fmt(kdk_download_obj.total_file_size)} ({kdk_download_obj.get_percent():.2f}%)")
self.subheader.Centre(wx.HORIZONTAL)
self.developer_note.SetLabel(
f"Average download speed: {utilities.human_fmt(kdk_download_obj.get_speed())}/s"
)
self.developer_note.Centre(wx.HORIZONTAL)
self.progress_bar.SetValue(kdk_download_obj.get_percent())
wx.GetApp().Yield()
time.sleep(0.1)
if kdk_download_obj.download_complete is False:
logging.error("Failed to download KDK")
logging.error(kdk_download_obj.error_msg)
error_msg = kdk_download_obj.error_msg
else:
kdk_result = kdk_obj.validate_kdk_checksum()
error_msg = kdk_obj.error_msg
else:
logging.error("Failed to download KDK")
logging.error(kdk_obj.error_msg)
error_msg = kdk_obj.error_msg
if kdk_result is False:
# Create popup window to inform user of error

View File

@@ -1,4 +1,5 @@
# Kernel Debug Kit downloader
# Module for parsing and determining best Kernel Debug Kit for host OS
# Copyright (C) 2022-2023, Dhinak G, Mykola Grymalyuk
import datetime
from pathlib import Path
@@ -8,24 +9,87 @@ import packaging.version
import requests
import subprocess
import os
import logging
from resources import utilities
from resources import utilities, network_handler
from resources.constants import Constants
KDK_INSTALL_PATH = "/Library/Developer/KDKs"
class kernel_debug_kit_handler:
def __init__(self, constants: Constants):
self.constants = constants
def get_available_kdks(self):
class KernelDebugKitObject:
"""
Library for querying and downloading Kernel Debug Kits (KDK) for macOS
Usage:
>>> kdk_object = KernelDebugKitObject(constants, host_build, host_version)
>>> if kdk_object.success:
>>> # Query whether a KDK is already installed
>>> if kdk_object.kdk_already_installed:
>>> # Use the installed KDK
>>> kdk_path = kdk_object.kdk_installed_path
>>> else:
>>> # Get DownloadObject for the KDK
>>> # See network_handler.py's DownloadObject documentation for usage
>>> kdk_download_object = kdk_object.retrieve_download()
>>> # Once downloaded, recommend verifying KDK's checksum
>>> valid = kdk_object.validate_kdk_checksum()
"""
def __init__(self, constants: Constants, host_build: str, host_version: str):
self.constants: Constants = constants
self.host_build: str = host_build # ex. 20A5384c
self.host_version: str = host_version # ex. 11.0.1
self.kdk_already_installed: bool = False
self.kdk_installed_path: str = ""
self.kdk_url: str = ""
self.kdk_url_build: str = ""
self.kdk_url_version: str = ""
self.kdk_url_is_exactly_match: bool = False
self.kdk_closest_match_url: str = ""
self.kdk_closest_match_url_build: str = ""
self.kdk_closest_match_url_version: str = ""
self.success: bool = False
self.error_msg: str = ""
self._get_latest_kdk()
def _get_available_kdks(self):
"""
Fetches a list of available KDKs from the KdkSupportPkg API
Returns:
list: A list of KDKs, sorted by version and date if available. Returns None if the API is unreachable
"""
KDK_API_LINK = "https://raw.githubusercontent.com/dortania/KdkSupportPkg/gh-pages/manifest.json"
logging.info("Fetching available KDKs")
logging.info("- Pulling KDK list from KdkSupportPkg API")
try:
results = utilities.SESSION.get(KDK_API_LINK, headers={"User-Agent": f"OCLP/{self.constants.patcher_version}"}, timeout=10)
results = network_handler.SESSION.get(
KDK_API_LINK,
headers={
"User-Agent": f"OCLP/{self.constants.patcher_version}"
},
timeout=10
)
except (requests.exceptions.Timeout, requests.exceptions.TooManyRedirects, requests.exceptions.ConnectionError):
logging.info("- Could not contact KDK API")
return None
@@ -36,112 +100,263 @@ class kernel_debug_kit_handler:
return sorted(results.json(), key=lambda x: (packaging.version.parse(x["version"]), datetime.datetime.fromisoformat(x["date"])), reverse=True)
def download_kdk(self, version: str, build: str):
detected_build = build
if self.is_kdk_installed(detected_build) is True:
logging.info("- KDK is already installed")
self.remove_unused_kdks(exclude_builds=[detected_build])
return True, "", detected_build
def _get_latest_kdk(self, host_build: str = None, host_version: str = None):
"""
Fetches the latest KDK for the current macOS version
download_link = None
closest_match_download_link = None
closest_version = ""
closest_build = ""
Args:
host_build (str, optional): The build version of the current macOS version.
If empty, will use the host_build from the class. Defaults to None.
host_version (str, optional): The version of the current macOS version.
If empty, will use the host_version from the class. Defaults to None.
"""
kdk_list = self.get_available_kdks()
if host_build is None and host_version is None:
host_build = self.host_build
host_version = self.host_version
parsed_version = cast(packaging.version.Version, packaging.version.parse(version))
logging.info(f"- Fetching latest KDK for {host_build} ({host_version})")
self.kdk_installed_path = self._local_kdk_installed_build()
if self.kdk_installed_path:
logging.info(f"- KDK already installed ({Path(self.kdk_installed_path).name}), skipping")
self.kdk_already_installed = True
self.success = True
return
if kdk_list:
for kdk in kdk_list:
kdk_version = cast(packaging.version.Version, packaging.version.parse(kdk["version"]))
if kdk["build"] == build:
download_link = kdk["url"]
elif not closest_match_download_link and kdk_version <= parsed_version and kdk_version.major == parsed_version.major and (kdk_version.minor in range(parsed_version.minor - 1, parsed_version.minor + 1)):
# The KDK list is already sorted by version then date, so the first match is the closest
closest_match_download_link = kdk["url"]
closest_version = kdk["version"]
closest_build = kdk["build"]
remote_kdk_version = self._get_available_kdks()
parsed_version = cast(packaging.version.Version, packaging.version.parse(host_version))
if remote_kdk_version is None:
logging.warning("- Failed to fetch KDK list, falling back to local KDK matching")
# First check if a KDK matching the current macOS version is installed
# ex. 13.0.1 vs 13.0
loose_version = f"{parsed_version.major}.{parsed_version.minor}"
logging.info(f"- Checking for KDKs loosely matching {loose_version}")
self.kdk_installed_path = self._local_kdk_installed_version(loose_version)
if self.kdk_installed_path:
logging.info(f"- Found matching KDK: {Path(self.kdk_installed_path).name}")
self.success = True
return
older_version = f"{parsed_version.major}.{parsed_version.minor - 1 if parsed_version.minor > 0 else 0}"
logging.info(f"- Checking for KDKs matching {older_version}")
self.kdk_installed_path = self._local_kdk_installed_version(older_version)
if self.kdk_installed_path:
logging.info(f"- Found matching KDK: {Path(self.kdk_installed_path).name}")
self.success = True
return
logging.warning(f"- Couldn't find KDK matching {host_version} or {older_version}, please install one manually")
self.error_msg = f"Could not contact KdkSupportPkg API, and no KDK matching {host_version} ({host_build}) or {older_version} was installed.\nPlease ensure you have a network connection or manually install a KDK."
return
for kdk in remote_kdk_version:
kdk_version = cast(packaging.version.Version, packaging.version.parse(kdk["version"]))
if (kdk["build"] == host_build):
self.kdk_url = kdk["url"]
self.kdk_url_build = kdk["build"]
self.kdk_url_version = kdk["version"]
self.kdk_url_is_exactly_match = True
break
if kdk_version <= parsed_version and kdk_version.major == parsed_version.major and (kdk_version.minor in range(parsed_version.minor - 1, parsed_version.minor + 1)):
# The KDK list is already sorted by version then date, so the first match is the closest
self.kdk_closest_match_url = kdk["url"]
self.kdk_closest_match_url_build = kdk["build"]
self.kdk_closest_match_url_version = kdk["version"]
self.kdk_url_is_exactly_match = False
break
if self.kdk_url == "":
if self.kdk_closest_match_url == "":
logging.warning(f"- No KDKs found for {host_build} ({host_version})")
self.error_msg = f"No KDKs found for {host_build} ({host_version})"
return
logging.info(f"- No direct match found for {host_build}, falling back to closest match")
logging.info(f"- Closest Match: {self.kdk_closest_match_url_build} ({self.kdk_closest_match_url_version})")
self.kdk_url = self.kdk_closest_match_url
self.kdk_url_build = self.kdk_closest_match_url_build
self.kdk_url_version = self.kdk_closest_match_url_version
else:
msg = "Could not fetch KDK list"
logging.info(f"- {msg}")
return False, msg, ""
logging.info(f"- Direct match found for {host_build} ({host_version})")
logging.info(f"- Checking for KDK matching macOS {version} build {build}")
# download_link is None if no matching KDK is found, so we'll fall back to the closest match
if not download_link:
logging.info("- Could not find KDK, finding closest match")
if self.is_kdk_installed(closest_build) is True:
logging.info(f"- Closest build ({closest_build}) already installed")
self.remove_unused_kdks(exclude_builds=[detected_build, closest_build])
return True, "", closest_build
# Check if this KDK is already installed
self.kdk_installed_path = self._local_kdk_installed_build(self.kdk_url_build)
if self.kdk_installed_path:
logging.info(f"- KDK already installed ({Path(self.kdk_installed_path).name}), skipping")
self.kdk_already_installed = True
self.success = True
return
if closest_match_download_link is None:
msg = "Could not find KDK for host, nor closest match"
logging.info(f"- {msg}")
return False, msg, ""
logging.info("- Following KDK is recommended:")
logging.info(f"- KDK Build: {self.kdk_url_build}")
logging.info(f"- KDK Version: {self.kdk_url_version}")
logging.info(f"- KDK URL: {self.kdk_url}")
logging.info(f"- Closest match: {closest_version} build {closest_build}")
download_link = closest_match_download_link
self.success = True
if utilities.verify_network_connection(download_link):
logging.info("- Downloading KDK")
else:
msg = "Could not contact download site"
logging.info(f"- {msg}")
return False, msg, ""
result = utilities.download_file(download_link, self.constants.kdk_download_path)
def retrieve_download(self):
"""
Returns a DownloadObject for the KDK
if result:
# TODO: should we use the checksum from the API?
result = subprocess.run(["hdiutil", "verify", self.constants.kdk_download_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.returncode != 0:
logging.info("Error: Kernel Debug Kit checksum verification failed!")
logging.info(f"Output: {result.stderr}")
msg = "Kernel Debug Kit checksum verification failed, please try again.\n\nIf this continues to fail, ensure you're downloading on a stable network connection (ie. Ethernet)"
logging.info(f"- {msg}")
return False, msg, ""
self.remove_unused_kdks(exclude_builds=[detected_build, closest_build])
return True, "", detected_build
msg = "Failed to download KDK"
logging.info(f"- {msg}")
return False, msg, ""
Returns:
DownloadObject: DownloadObject for the KDK, None if no download required
"""
def is_kdk_installed(self, build):
kexts_to_check = [
self.success = False
self.error_msg = ""
if self.kdk_already_installed:
logging.info("- No download required, KDK already installed")
self.success = True
return None
if self.kdk_url == "":
self.error_msg = "Could not retrieve KDK catalog, no KDK to download"
logging.error(self.error_msg)
return None
logging.info(f"- Returning DownloadObject for KDK: {Path(self.kdk_url).name}")
self.success = True
return network_handler.DownloadObject(self.kdk_url, self.constants.kdk_download_path)
def _local_kdk_valid(self, kdk_path: str):
"""
Validates provided KDK, ensure no corruption
The reason for this is due to macOS deleting files from the KDK during OS updates,
similar to how Install macOS.app is deleted during OS updates
Args:
kdk_path (str): Path to KDK
Returns:
bool: True if valid, False if invalid
"""
KEXT_CATALOG = [
"System.kext/PlugIns/Libkern.kext/Libkern",
"apfs.kext/Contents/MacOS/apfs",
"IOUSBHostFamily.kext/Contents/MacOS/IOUSBHostFamily",
"AMDRadeonX6000.kext/Contents/MacOS/AMDRadeonX6000",
]
if Path("/Library/Developer/KDKs").exists():
for file in Path("/Library/Developer/KDKs").iterdir():
if file.is_dir():
if file.name.endswith(f"{build}.kdk"):
for kext in kexts_to_check:
if not Path(f"{file}/System/Library/Extensions/{kext}").exists():
logging.info(f"- Corrupted KDK found, removing due to missing: {file}/System/Library/Extensions/{kext}")
utilities.elevated(["rm", "-rf", file], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
return False
return True
return False
kdk_path = Path(kdk_path)
for kext in KEXT_CATALOG:
if not Path(f"{kdk_path}/System/Library/Extensions/{kext}").exists():
logging.info(f"- Corrupted KDK found, removing due to missing: {kdk_path}/System/Library/Extensions/{kext}")
self._remove_kdk(kdk_path)
return False
return True
def _local_kdk_installed_build(self, build: str = None):
"""
Checks if KDK matching build is installed
If so, validates it has not been corrupted
Returns:
str: Path to KDK if valid, None if not
"""
if build is None:
build = self.host_build
if not Path(KDK_INSTALL_PATH).exists():
return None
for kdk_folder in Path(KDK_INSTALL_PATH).iterdir():
if not kdk_folder.is_dir():
continue
if not kdk_folder.name.endswith(f"{build}.kdk"):
continue
if self._local_kdk_valid(kdk_folder):
return kdk_folder
return None
def _local_kdk_installed_version(self, version: str = None):
"""
Checks if KDK matching version is installed
If so, validates it has not been corrupted
Returns:
str: Path to KDK if valid, None if not
"""
if version is None:
version = self.host_version
if not Path(KDK_INSTALL_PATH).exists():
return None
for kdk_folder in Path(KDK_INSTALL_PATH).iterdir():
if not kdk_folder.is_dir():
continue
if version not in kdk_folder.name:
continue
if self._local_kdk_valid(kdk_folder):
return kdk_folder
return None
def _remove_kdk(self, kdk_path: str):
"""
Removes provided KDK
Args:
kdk_path (str): Path to KDK
"""
if os.getuid() != 0:
logging.warning("- Cannot remove KDK, not running as root")
return
result = utilities.elevated(["rm", "-rf", kdk_path], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
logging.warning(f"- Failed to remove KDK: {kdk_path}")
logging.warning(f"- {result.stdout.decode('utf-8')}")
logging.info(f"- Successfully removed KDK: {kdk_path}")
def _remove_unused_kdks(self, exclude_builds: list = None):
"""
Removes KDKs that are not in use
Args:
exclude_builds (list, optional): Builds to exclude from removal.
If None, defaults to host and closest match builds.
"""
if exclude_builds is None:
exclude_builds = [
self.kdk_url_build,
self.kdk_closest_match_url_build,
]
def remove_unused_kdks(self, exclude_builds=[]):
if self.constants.should_nuke_kdks is False:
return
if not Path("/Library/Developer/KDKs").exists():
return
if exclude_builds == []:
if not Path(KDK_INSTALL_PATH).exists():
return
logging.info("- Cleaning unused KDKs")
for kdk_folder in Path("/Library/Developer/KDKs").iterdir():
for kdk_folder in Path(KDK_INSTALL_PATH).iterdir():
if kdk_folder.is_dir():
if kdk_folder.name.endswith(".kdk"):
should_remove = True
@@ -151,27 +366,40 @@ class kernel_debug_kit_handler:
break
if should_remove is False:
continue
logging.info(f" - Removing {kdk_folder.name}")
utilities.elevated(["rm", "-rf", kdk_folder], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
self._remove_kdk(kdk_folder)
def kdk_backup_site(self, build):
KDK_MIRROR_REPOSITORY = "https://api.github.com/repos/dortania/KdkSupportPkg/releases"
def validate_kdk_checksum(self, kdk_dmg_path: str = None):
"""
Validates KDK DMG checksum
# Check if tag exists
catalog = requests.get(KDK_MIRROR_REPOSITORY)
if catalog.status_code != 200:
logging.info(f"- Could not contact KDK mirror repository")
return None
Args:
kdk_dmg_path (str, optional): Path to KDK DMG. Defaults to None.
catalog = catalog.json()
Returns:
bool: True if valid, False if invalid
"""
for release in catalog:
if release["tag_name"] == build:
logging.info(f"- Found KDK mirror for build: {build}")
for asset in release["assets"]:
if asset["name"].endswith(".dmg"):
return asset["browser_download_url"]
self.success = False
self.error_msg = ""
logging.info(f"- Could not find KDK mirror for build {build}")
return None
if kdk_dmg_path is None:
kdk_dmg_path = self.constants.kdk_download_path
if not Path(kdk_dmg_path).exists():
logging.error(f"KDK DMG does not exist: {kdk_dmg_path}")
return False
# TODO: should we use the checksum from the API?
result = subprocess.run(["hdiutil", "verify", self.constants.kdk_download_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.returncode != 0:
logging.info("- Error: Kernel Debug Kit checksum verification failed!")
logging.info(f"- Output: {result.stderr.decode('utf-8')}")
msg = "Kernel Debug Kit checksum verification failed, please try again.\n\nIf this continues to fail, ensure you're downloading on a stable network connection (ie. Ethernet)"
logging.info(f"- {msg}")
self.error_msg = msg
self._remove_unused_kdks()
self.success = True

View File

@@ -29,12 +29,18 @@ class OpenCoreLegacyPatcher:
self.main_menu()
def initialize_logging(self):
LOG_FILENAME = f"OpenCore-Patcher-v{self.constants.patcher_version}.log"
LOG_FILEPATH = Path(f"~/Library/Logs/{LOG_FILENAME}").expanduser()
if not LOG_FILEPATH.parent.exists():
# Likely in an installer environment, store in /Users/Shared
LOG_FILEPATH = Path("/Users/Shared") / LOG_FILENAME
logging.basicConfig(
level=logging.NOTSET,
format="%(asctime)s - %(filename)s (%(lineno)d): %(message)s",
handlers=[
# TODO: Handle proper file storage
logging.FileHandler(f"OpenCore-Patcher-v{self.constants.patcher_version}.log"),
logging.FileHandler(LOG_FILEPATH),
logging.StreamHandler(),
],
)

View File

@@ -112,6 +112,42 @@ class PatchSysVolume:
logging.info(result.stdout.decode().strip())
return False
def invoke_kdk_handler(self):
# If we're invoked, there is no KDK installed (or something went wrong)
kdk_result = False
error_msg = ""
kdk_obj = kdk_handler.KernelDebugKitObject(self.constants, self.constants.detected_os_build, self.constants.detected_os_version)
if kdk_obj.success is False:
error_msg = kdk_obj.error_msg
return kdk_result, error_msg, None
kdk_download_obj = kdk_obj.retrieve_download()
# We didn't get a download object, something's wrong
if not kdk_download_obj:
if kdk_obj.kdk_already_installed is True:
error_msg = "KDK already installed, function should not have been invoked"
return kdk_result, error_msg, None
else:
error_msg = "Could not retrieve KDK"
return kdk_result, error_msg, None
# Hold thread until download is complete
kdk_download_obj.download(spawn_thread=False)
if kdk_download_obj.download_complete is False:
error_msg = kdk_download_obj.error_msg
return kdk_result, error_msg, None
kdk_result = kdk_obj.validate_kdk_checksum()
downloaded_kdk = self.constants.kdk_download_path
return kdk_result, error_msg, downloaded_kdk
def merge_kdk_with_root(self, save_hid_cs=False):
if self.skip_root_kmutil_requirement is True:
return
@@ -122,7 +158,7 @@ class PatchSysVolume:
kdk_path = sys_patch_helpers.sys_patch_helpers(self.constants).determine_kdk_present(match_closest=False)
if kdk_path is None:
if not self.constants.kdk_download_path.exists():
kdk_result, error_msg, downloaded_kdk = kdk_handler.kernel_debug_kit_handler(self.constants).download_kdk(self.constants.detected_os_version, self.constants.detected_os_build)
kdk_result, error_msg, downloaded_kdk = self.invoke_kdk_handler()
if kdk_result is False:
raise Exception(f"Unable to download KDK: {error_msg}")
sys_patch_helpers.sys_patch_helpers(self.constants).install_kdk()
@@ -348,7 +384,7 @@ class PatchSysVolume:
def delete_nonmetal_enforcement(self):
for arg in ["useMetal", "useIOP"]:
result = subprocess.run(["defaults", "read", "/Library/Preferences/com.apple.CoreDisplay", arg], stdout=subprocess.PIPE).stdout.decode("utf-8").strip()
result = subprocess.run(["defaults", "read", "/Library/Preferences/com.apple.CoreDisplay", arg], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL).stdout.decode("utf-8").strip()
if result in ["0", "false", "1", "true"]:
logging.info(f"- Removing non-Metal Enforcement Preference: {arg}")
utilities.elevated(["defaults", "delete", "/Library/Preferences/com.apple.CoreDisplay", arg])