Merge remote-tracking branch 'origin/main' into macos-next

# Conflicts:
#	CHANGELOG.md
#	opencore_legacy_patcher/constants.py
This commit is contained in:
Jazzzny
2025-11-30 23:01:38 -05:00
28 changed files with 1197 additions and 803 deletions

View File

@@ -108,4 +108,5 @@ By default, `CatalogProducts` will only return InstallAssistants. To get all pro
from .url import CatalogURL
from .constants import CatalogVersion, SeedType
from .products import CatalogProducts
from .products import CatalogProducts
from .products_appledb import AppleDBProducts

View File

@@ -214,9 +214,9 @@ class CatalogProducts:
# Remove all but the newest version
for version in supported_versions:
_newest_version = packaging.version.parse("0.0.0")
_latest_stable_version = packaging.version.parse("0.0.0")
# First, determine largest version
# First, determine largest stable version
for installer in products:
if installer["Version"] is None:
continue
@@ -225,26 +225,28 @@ class CatalogProducts:
if installer["Catalog"] in [SeedType.CustomerSeed, SeedType.DeveloperSeed, SeedType.PublicSeed]:
continue
try:
if packaging.version.parse(installer["Version"]) > _newest_version:
_newest_version = packaging.version.parse(installer["Version"])
if packaging.version.parse(installer["Version"]) > _latest_stable_version:
_latest_stable_version = packaging.version.parse(installer["Version"])
except packaging.version.InvalidVersion:
pass
# Next, remove all installers that are not the newest version
# Next, remove all installers that are older than the largest stable version
for installer in products:
if installer["Version"] is None:
continue
if not installer["Version"].startswith(version.value):
continue
try:
if packaging.version.parse(installer["Version"]) < _newest_version:
if packaging.version.parse(installer["Version"]) < _latest_stable_version:
if installer in products_copy:
products_copy.pop(products_copy.index(installer))
except packaging.version.InvalidVersion:
pass
# Remove beta versions if a public release is available
if _newest_version != packaging.version.parse("0.0.0"):
# If there is a largest stable version, remove all betas
# This is to ensure that we only keep the latest stable version where it is available
# but ensure we have a beta if it is the only version available (ie. macOS X.0 betas)
if _latest_stable_version != packaging.version.parse("0.0.0"):
if installer["Catalog"] in [SeedType.CustomerSeed, SeedType.DeveloperSeed, SeedType.PublicSeed]:
if installer in products_copy:
products_copy.pop(products_copy.index(installer))

View File

@@ -0,0 +1,182 @@
"""
products.py: Parse products from Software Update Catalog
"""
import datetime
import hashlib
import logging
import zoneinfo
import packaging.version
from functools import cached_property
from opencore_legacy_patcher import constants
from opencore_legacy_patcher.datasets.os_data import os_data
from ..support import network_handler
APPLEDB_API_URL = "https://api.appledb.dev/ios/macOS/main.json"
class AppleDBProducts:
"""
Fetch InstallAssistants from AppleDB
"""
def __init__(
self,
global_constants: constants.Constants,
max_install_assistant_version: os_data = os_data.sequoia,
) -> None:
self.constants: constants.Constants = global_constants
try:
self.data = (
network_handler.NetworkUtilities()
.get(APPLEDB_API_URL, headers={"User-Agent": f"OCLP/{self.constants.patcher_version}"})
.json()
)
except Exception as e:
self.data = []
logging.error(f"Failed to fetch AppleDB API response: {e}")
return
self.max_ia: os_data = max_install_assistant_version
def _build_installer_name(self, xnu_major: int, beta: bool) -> str:
"""
Builds the installer name based on the version and catalog
"""
try:
return f"macOS {os_data(xnu_major).name.replace('_', ' ').title()}{' Beta' if beta else ''}"
except ValueError:
return f"macOS{' Beta' if beta else ''}"
def _list_latest_installers_only(self, products: list) -> list:
"""
List only the latest installers per macOS version
macOS versions capped at n-3 (n being the latest macOS version)
"""
supported_versions = {
os_data(i): [v for v in products if v["InstallAssistant"]["XNUMajor"] == i] for i in range(self.max_ia - 3, self.max_ia + 1)
}
for versions in supported_versions.values():
versions.sort(key=lambda v: (not v["Beta"], packaging.version.parse(v["RawVersion"])), reverse=True)
return [next(iter(versions)) for versions in supported_versions.values() if versions]
@cached_property
def products(self) -> None:
"""
Returns a list of products from the sucatalog
"""
_products = []
for firmware in self.data:
if firmware.get("internal") or firmware.get("sdk") or firmware.get("rsr"):
continue
# AppleDB does not track whether an installer supports the VMM pseudo-identifier,
# so we will use MacPro7,1, which supports all macOS versions that we care about.
if "MacPro7,1" not in firmware["deviceMap"]:
continue
firmware["raw_version"] = firmware["version"].partition(" ")[0]
xnu_major = int(firmware["build"][:2])
beta = firmware.get("beta") or firmware.get("rc")
details = {
# Dates in AppleDB are in Cupertino time. There are no times, so pin to 10 AM
"PostDate": datetime.datetime.fromisoformat(firmware["released"]).replace(
# hour=10,
# minute=0,
# second=0,
tzinfo=zoneinfo.ZoneInfo("America/Los_Angeles"),
),
"Title": f"{self._build_installer_name(xnu_major, beta)}",
"Build": firmware["build"],
"RawVersion": firmware["raw_version"],
"Version": firmware["version"],
"Beta": beta,
"InstallAssistant": {"XNUMajor": xnu_major},
}
if xnu_major > self.max_ia:
continue
for source in firmware.get("sources", []):
if source["type"] != "installassistant":
continue
if "MacPro7,1" not in source["deviceMap"]:
continue
for link in source["links"]:
if not link["active"]:
continue
if not network_handler.NetworkUtilities(link["url"]).validate_link():
continue
details["InstallAssistant"] |= {
"URL": link["url"],
"Size": source.get("size", 0),
"Checksum": source.get("hashes"),
}
break
else:
continue
break
else:
# No applicable InstallAssistants, or no active sources
continue
_products.append(details)
_products = sorted(_products, key=lambda x: x["Beta"])
_deduplicated_products = []
_seen_builds = set()
# Prevent RCs that were the final release from showing up
for product in _products:
if product["Beta"] and product["Build"] in _seen_builds:
continue
_deduplicated_products.append(product)
_seen_builds.add(product["Build"])
_deduplicated_products = sorted(
_deduplicated_products, key=lambda x: (packaging.version.parse(x["RawVersion"]), x["Build"], not x["Beta"])
)
return _deduplicated_products
@cached_property
def latest_products(self) -> list:
"""
Returns a list of the latest products from the sucatalog
"""
return self._list_latest_installers_only(self.products)
def checksum_for_product(self, product: dict):
"""
Returns the checksum and algorithm for a given product
"""
HASH_TO_ALGO = {"md5": hashlib.md5, "sha1": hashlib.sha1, "sha2-256": hashlib.sha256, "sha2-512": hashlib.sha512}
if not product.get("InstallAssistant", {}).get("Checksum"):
return None, None
for algo, hash_func in HASH_TO_ALGO.items():
if algo in product["InstallAssistant"]["Checksum"]:
return product["InstallAssistant"]["Checksum"][algo], hash_func()
return None, None

View File

@@ -13,7 +13,7 @@ import enum
import hashlib
import atexit
from typing import Union
from typing import Optional, Union
from pathlib import Path
from . import utilities
@@ -65,17 +65,15 @@ class NetworkUtilities:
def validate_link(self) -> bool:
"""
Check for 404 error
Check for error
Returns:
bool: True if link is valid, False otherwise
"""
try:
response = SESSION.head(self.url, timeout=5, allow_redirects=True)
if response.status_code == 404:
return False
else:
return True
response.raise_for_status()
return True
except (
requests.exceptions.Timeout,
requests.exceptions.TooManyRedirects,
@@ -162,7 +160,7 @@ class DownloadObject:
"""
def __init__(self, url: str, path: str) -> None:
def __init__(self, url: str, path: str, checksum_algo: Optional["hashlib._Hash"] = None) -> None:
self.url: str = url
self.status: str = DownloadStatus.INACTIVE
self.error_msg: str = ""
@@ -181,10 +179,8 @@ class DownloadObject:
self.active_thread: threading.Thread = None
self.should_checksum: bool = False
self.checksum = None
self._checksum_storage: hash = None
self._checksum_storage: Optional[hashlib._Hash] = checksum_algo
if self.has_network:
self._populate_file_size()
@@ -194,7 +190,7 @@ class DownloadObject:
self.stop()
def download(self, display_progress: bool = False, spawn_thread: bool = True, verify_checksum: bool = False) -> None:
def download(self, display_progress: bool = False, spawn_thread: bool = True) -> None:
"""
Download the file
@@ -204,7 +200,7 @@ class DownloadObject:
Parameters:
display_progress (bool): Display progress in console
spawn_thread (bool): Spawn a thread to download the file, otherwise download in the current thread
verify_checksum (bool): Calculate checksum of downloaded file if True
verify_checksum (Optional[hashlib._Hash]): Checksum algorithm to use for verifying the download, optional
"""
self.status = DownloadStatus.DOWNLOADING
@@ -213,12 +209,10 @@ class DownloadObject:
if self.active_thread:
logging.error("Download already in progress")
return
self.should_checksum = verify_checksum
self.active_thread = threading.Thread(target=self._download, args=(display_progress,))
self.active_thread.start()
return
self.should_checksum = verify_checksum
self._download(display_progress)
@@ -235,15 +229,14 @@ class DownloadObject:
"""
if verify_checksum:
self.should_checksum = True
self.checksum = hashlib.sha256()
self._checksum_storage = hashlib.sha256()
self.download(spawn_thread=False)
if not self.download_complete:
return False
return self.checksum.hexdigest() if self.checksum else True
return self._checksum_storage.hexdigest() if self._checksum_storage else True
def _get_filename(self) -> str:
@@ -283,7 +276,8 @@ class DownloadObject:
Parameters:
chunk (bytes): Chunk to update checksum with
"""
self._checksum_storage.update(chunk)
if self._checksum_storage:
self._checksum_storage.update(chunk)
def _prepare_working_directory(self, path: Path) -> bool:
@@ -353,7 +347,7 @@ class DownloadObject:
if chunk:
file.write(chunk)
self.downloaded_file_size += len(chunk)
if self.should_checksum:
if self._checksum_storage:
self._update_checksum(chunk)
if display_progress and i % 100:
# Don't use logging here, as we'll be spamming the log file
@@ -368,6 +362,9 @@ class DownloadObject:
logging.info(f"- Time elapsed: {(time.time() - self.start_time):.2f} seconds")
logging.info(f"- Speed: {utilities.human_fmt(self.downloaded_file_size / (time.time() - self.start_time))}/s")
logging.info(f"- Location: {self.filepath}")
if self._checksum_storage:
self.checksum = self._checksum_storage.hexdigest()
logging.info(f"Checksum: {self.checksum}")
except Exception as e:
self.error = True
self.error_msg = str(e)

View File

@@ -46,6 +46,7 @@ class macOSInstallerDownloadFrame(wx.Frame):
self.title: str = title
self.parent: wx.Frame = parent
self.catalog_products = None
self.available_installers = None
self.available_installers_latest = None
@@ -135,15 +136,15 @@ class macOSInstallerDownloadFrame(wx.Frame):
# Grab installer catalog
def _fetch_installers():
logging.info(f"Fetching installer catalog: {sucatalog.SeedType.DeveloperSeed.name}")
logging.info(f"Fetching AppleDB products")
sucatalog_contents = sucatalog.CatalogURL(seed=sucatalog.SeedType.DeveloperSeed).url_contents
if sucatalog_contents is None:
logging.error("Failed to download Installer Catalog from Apple")
self.catalog_products = sucatalog.AppleDBProducts(self.constants)
if self.catalog_products.data is None:
logging.error("Failed to fetch installers from AppleDB")
return
self.available_installers = sucatalog.CatalogProducts(sucatalog_contents).products
self.available_installers_latest = sucatalog.CatalogProducts(sucatalog_contents).latest_products
self.available_installers = self.catalog_products.products
self.available_installers_latest = self.catalog_products.latest_products
thread = threading.Thread(target=_fetch_installers)
@@ -165,7 +166,7 @@ class macOSInstallerDownloadFrame(wx.Frame):
bundles = [wx.BitmapBundle.FromBitmaps(icon) for icon in self.icons]
self.frame_modal.Destroy()
self.frame_modal = wx.Dialog(self, title="Select macOS Installer", size=(505, 500))
self.frame_modal = wx.Dialog(self, title="Select macOS Installer", size=(550, 500))
# Title: Select macOS Installer
title_label = wx.StaticText(self.frame_modal, label="Select macOS Installer", pos=(-1,-1))
@@ -177,19 +178,19 @@ class macOSInstallerDownloadFrame(wx.Frame):
self.list = wx.ListCtrl(self.frame_modal, id, style=wx.LC_REPORT | wx.LC_SINGLE_SEL | wx.LC_NO_HEADER | wx.BORDER_SUNKEN)
self.list.SetSmallImages(bundles)
self.list.InsertColumn(0, "Title", width=175)
self.list.InsertColumn(1, "Version", width=50)
self.list.InsertColumn(0, "Title", width=190 if show_full else 150)
self.list.InsertColumn(1, "Version", width=80 if show_full else 50)
self.list.InsertColumn(2, "Build", width=75)
self.list.InsertColumn(3, "Size", width=75)
self.list.InsertColumn(4, "Release Date", width=100)
installers = self.available_installers_latest if show_full is False else self.available_installers
if show_full is False:
self.frame_modal.SetSize((490, 370))
self.frame_modal.SetSize((480, 370))
if installers:
locale.setlocale(locale.LC_TIME, '')
logging.info(f"Available installers on SUCatalog ({'All entries' if show_full else 'Latest only'}):")
logging.info(f"Available installers from AppleDB ({'All entries' if show_full else 'Latest only'}):")
for item in installers:
logging.info(f"- {item['Title']} ({item['Version']} - {item['Build']}):\n - Size: {utilities.human_fmt(item['InstallAssistant']['Size'])}\n - Link: {item['InstallAssistant']['URL']}\n")
index = self.list.InsertItem(self.list.GetItemCount(), f"{item['Title']}")
@@ -199,8 +200,8 @@ class macOSInstallerDownloadFrame(wx.Frame):
self.list.SetItem(index, 3, utilities.human_fmt(item['InstallAssistant']['Size']))
self.list.SetItem(index, 4, item['PostDate'].strftime("%x"))
else:
logging.error("No installers found on SUCatalog")
wx.MessageDialog(self.frame_modal, "Failed to download Installer Catalog from Apple", "Error", wx.OK | wx.ICON_ERROR).ShowModal()
logging.error("No installers found from AppleDB")
wx.MessageDialog(self.frame_modal, "Failed to fetch installers from AppleDB", "Error", wx.OK | wx.ICON_ERROR).ShowModal()
if show_full is False:
self.list.Select(-1)
@@ -319,7 +320,11 @@ class macOSInstallerDownloadFrame(wx.Frame):
self.frame_modal.Close()
download_obj = network_handler.DownloadObject(selected_installer['InstallAssistant']['URL'], self.constants.payload_path / "InstallAssistant.pkg")
expected_checksum, checksum_algo = self.catalog_products.checksum_for_product(selected_installer)
download_obj = network_handler.DownloadObject(
selected_installer["InstallAssistant"]["URL"], self.constants.payload_path / "InstallAssistant.pkg", checksum_algo=checksum_algo
)
gui_download.DownloadFrame(
self,
@@ -334,24 +339,31 @@ class macOSInstallerDownloadFrame(wx.Frame):
self.on_return_to_main_menu()
return
self._validate_installer(selected_installer['InstallAssistant']['IntegrityDataURL'])
self._validate_installer(expected_checksum, download_obj.checksum)
def _validate_installer(self, chunklist_link: str) -> None:
def _validate_installer(self, expected_checksum: str, calculated_checksum: str) -> None:
"""
Validate macOS installer
"""
if expected_checksum != calculated_checksum:
logging.error(f"Checksum validation failed: Expected {expected_checksum}, got {calculated_checksum}")
wx.MessageBox(f"Checksum validation failed!\n\nThis generally happens when downloading on unstable connections such as WiFi or cellular.\n\nPlease try redownloading again on a stable connection (ie. Ethernet)", "Corrupted Installer!", wx.OK | wx.ICON_ERROR)
self.on_return_to_main_menu()
return
self.SetSize((300, 200))
for child in self.GetChildren():
child.Destroy()
# Title: Validating macOS Installer
title_label = wx.StaticText(self, label="Validating macOS Installer", pos=(-1,5))
logging.info("macOS installer validated")
title_label = wx.StaticText(self, label="Extracting macOS Installer", pos=(-1,5))
title_label.SetFont(gui_support.font_factory(19, wx.FONTWEIGHT_BOLD))
title_label.Centre(wx.HORIZONTAL)
# Label: Validating chunk 0 of 0
chunk_label = wx.StaticText(self, label="Validating chunk 0 of 0", pos=(-1, title_label.GetPosition()[1] + title_label.GetSize()[1] + 5))
chunk_label = wx.StaticText(self, label="May take a few minutes...", pos=(-1, title_label.GetPosition()[1] + title_label.GetSize()[1] + 5))
chunk_label.SetFont(gui_support.font_factory(13, wx.FONTWEIGHT_NORMAL))
chunk_label.Centre(wx.HORIZONTAL)
@@ -363,39 +375,6 @@ class macOSInstallerDownloadFrame(wx.Frame):
self.SetSize((-1, progress_bar.GetPosition()[1] + progress_bar.GetSize()[1] + 40))
self.Show()
chunklist_stream = network_handler.NetworkUtilities().get(chunklist_link).content
if chunklist_stream:
logging.info("Validating macOS installer")
utilities.disable_sleep_while_running()
chunk_obj = integrity_verification.ChunklistVerification(self.constants.payload_path / Path("InstallAssistant.pkg"), chunklist_stream)
if chunk_obj.chunks:
progress_bar.SetValue(chunk_obj.current_chunk)
progress_bar.SetRange(chunk_obj.total_chunks)
wx.App.Get().Yield()
chunk_obj.validate()
while chunk_obj.status == integrity_verification.ChunklistStatus.IN_PROGRESS:
progress_bar.SetValue(chunk_obj.current_chunk)
chunk_label.SetLabel(f"Validating chunk {chunk_obj.current_chunk} of {chunk_obj.total_chunks}")
chunk_label.Centre(wx.HORIZONTAL)
wx.App.Get().Yield()
if chunk_obj.status == integrity_verification.ChunklistStatus.FAILURE:
logging.error(f"Chunklist validation failed: Hash mismatch on {chunk_obj.current_chunk}")
wx.MessageBox(f"Chunklist validation failed: Hash mismatch on {chunk_obj.current_chunk}\n\nThis generally happens when downloading on unstable connections such as WiFi or cellular.\n\nPlease try redownloading again on a stable connection (ie. Ethernet)", "Corrupted Installer!", wx.OK | wx.ICON_ERROR)
self.on_return_to_main_menu()
return
logging.info("macOS installer validated")
# Extract installer
title_label.SetLabel("Extracting macOS Installer")
title_label.Centre(wx.HORIZONTAL)
chunk_label.SetLabel("May take a few minutes...")
chunk_label.Centre(wx.HORIZONTAL)
progress_bar_animation = gui_support.GaugePulseCallback(self.constants, progress_bar)
progress_bar_animation.start_pulse()