From 23f2291bfa2a9ff31f8a513e3afe837671758280 Mon Sep 17 00:00:00 2001 From: Mykola Grymalyuk Date: Sun, 2 Apr 2023 11:39:55 -0600 Subject: [PATCH] integrity_verification.py: rework into OOP --- CHANGELOG.md | 3 + resources/gui/gui_main.py | 50 +++++----- resources/integrity_verification.py | 145 ++++++++++++++++++++-------- 3 files changed, 133 insertions(+), 65 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ee3372b93..7f9610262 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,9 @@ - Add extra error handling for network errors - Handles `RemoteDisconnected('Remote end closed connection without response')` exceptions - Move root volume patch set generation to dedicated sys_patch_generate.py module + - Refactored integrity_verification.py: + - Implemented Object-Oriented design + - Reduced disk I/O and main thread monopolization - Increment Binaries: - PatcherSupportPkg 0.9.2 - release diff --git a/resources/gui/gui_main.py b/resources/gui/gui_main.py index ac4361da7..1d31afdab 100644 --- a/resources/gui/gui_main.py +++ b/resources/gui/gui_main.py @@ -1950,38 +1950,36 @@ class wx_python_gui: wx.App.Get().Yield() integrity_path = Path(Path(self.constants.payload_path) / Path(apple_integrity_file_link.split("/")[-1])) - if network_handler.DownloadObject(apple_integrity_file_link, integrity_path).download_simple(verify_checksum=False): + chunklist_stream = network_handler.NetworkUtilities().get(apple_integrity_file_link).content + if chunklist_stream: # If we're unable to download the integrity file immediately after downloading the IA, there's a legitimate issue # on Apple's end. # Fail gracefully and just head to installing the IA. utilities.disable_sleep_while_running() - apple_integrity_file = str(integrity_path) - chunks = integrity_verification.generate_chunklist_dict(str(apple_integrity_file)) - if chunks: - max_progress = len(chunks) - self.progress_bar.SetValue(0) - self.progress_bar.SetRange(max_progress) + chunk_obj = integrity_verification.ChunklistVerification(self.constants.payload_path / Path("InstallAssistant.pkg"), chunklist_stream) + if chunk_obj.chunks: + self.progress_bar.SetValue(chunk_obj.current_chunk) + self.progress_bar.SetRange(chunk_obj.total_chunks) wx.App.Get().Yield() - # See integrity_verification.py for more information on the integrity verification process - with Path(self.constants.payload_path / Path("InstallAssistant.pkg")).open("rb") as f: - for chunk in chunks: - status = hashlib.sha256(f.read(chunk["length"])).digest() - if status != chunk["checksum"]: - logging.info(f"Chunk {chunks.index(chunk) + 1} checksum status FAIL: chunk sum {binascii.hexlify(chunk['checksum']).decode()}, calculated sum {binascii.hexlify(status).decode()}") - self.popup = wx.MessageDialog( - self.frame, - f"We've found that Chunk {chunks.index(chunk) + 1} of {len(chunks)} has failed the integrity check.\n\nThis generally happens when downloading on unstable connections such as WiFi or cellular.\n\nPlease try redownloading again on a stable connection (ie. Ethernet)", - "Corrupted Installer!", - style = wx.OK | wx.ICON_EXCLAMATION - ) - self.popup.ShowModal() - self.main_menu() - break - else: - self.progress_bar.SetValue(self.progress_bar.GetValue() + 1) - self.verifying_chunk_label.SetLabel(f"Verifying Chunk {self.progress_bar.GetValue()} of {max_progress}") - wx.App.Get().Yield() + chunk_obj.validate() + + while chunk_obj.status == integrity_verification.ChunklistStatus.IN_PROGRESS: + self.progress_bar.SetValue(chunk_obj.current_chunk) + self.verifying_chunk_label.SetLabel(f"Verifying Chunk {chunk_obj.current_chunk} of {chunk_obj.total_chunks}") + wx.App.Get().Yield() + + if chunk_obj.status == integrity_verification.ChunklistStatus.FAILURE: + self.popup = wx.MessageDialog( + self.frame, + f"We've found that Chunk {chunk_obj.current_chunk} of {chunk_obj.total_chunks} has failed the integrity check.\n\nThis generally happens when downloading on unstable connections such as WiFi or cellular.\n\nPlease try redownloading again on a stable connection (ie. Ethernet)", + "Corrupted Installer!", + style = wx.OK | wx.ICON_EXCLAMATION + ) + self.popup.ShowModal() + self.main_menu() + + logging.info("Integrity check passed!") else: logging.info("Invalid integrity file provided") else: diff --git a/resources/integrity_verification.py b/resources/integrity_verification.py index de3e44317..2ea0d4d4a 100644 --- a/resources/integrity_verification.py +++ b/resources/integrity_verification.py @@ -1,52 +1,119 @@ # Validate the integrity of Apple downloaded files via .chunklist and .integrityDataV1 files # Based off of chunklist.py: # - https://gist.github.com/dhinakg/cbe30edf31ddc153fd0b0c0570c9b041 -# Copyright (C) 2021-2022, Dhinak G, Mykola Grymalyuk +# Copyright (C) 2021-2023, Dhinak G, Mykola Grymalyuk -import binascii +import enum import hashlib import logging +import binascii +import threading + from pathlib import Path CHUNK_LENGTH = 4 + 32 -def generate_chunklist_dict(chunklist): - chunklist = Path(chunklist).read_bytes() if isinstance(chunklist, str) else chunklist - - # Ref: https://github.com/apple-oss-distributions/xnu/blob/xnu-8020.101.4/bsd/kern/chunklist.h#L59-L69 - header = { - "magic": chunklist[:4], - "length": int.from_bytes(chunklist[4:8], "little"), - "fileVersion": chunklist[8], - "chunkMethod": chunklist[9], - "sigMethod": chunklist[10], - "chunkCount": int.from_bytes(chunklist[12:20], "little"), - "chunkOffset": int.from_bytes(chunklist[20:28], "little"), - "sigOffset": int.from_bytes(chunklist[28:36], "little") - } - - if header["magic"] != b"CNKL": - return None - - all_chunks = chunklist[header["chunkOffset"]:header["chunkOffset"]+header["chunkCount"]*CHUNK_LENGTH] - chunks = [{"length": int.from_bytes(all_chunks[i:i+4], "little"), "checksum": all_chunks[i+4:i+CHUNK_LENGTH]} for i in range(0, len(all_chunks), CHUNK_LENGTH)] - - return chunks +class ChunklistStatus(enum.Enum): + """ + Chunklist status + """ + IN_PROGRESS = 0 + SUCCESS = 1 + FAILURE = 2 -def chunk(file_path, chunklist, verbose): - chunks = generate_chunklist_dict(chunklist) - if chunks is None: - return False - with Path(file_path).open("rb") as f: - for chunk in chunks: - status = hashlib.sha256(f.read(chunk["length"])).digest() - if not status == chunk["checksum"]: - logging.info( - f"Chunk {chunks.index(chunk) + 1} checksum status FAIL: chunk sum {binascii.hexlify(chunk['checksum']).decode()}, calculated sum {binascii.hexlify(status).decode()}") - return False - elif verbose: - logging.info( - f"Chunk {chunks.index(chunk) + 1} checksum status success") - return True \ No newline at end of file +class ChunklistVerification: + """ + Library to validate Apple's files against their chunklist format + Supports both chunklist and integrityDataV1 files + - Ref: https://github.com/apple-oss-distributions/xnu/blob/xnu-8020.101.4/bsd/kern/chunklist.h + + Parameters: + file_path (Path): Path to the file to validate + chunklist_path (Path): Path to the chunklist file + + Usage: + >>> chunk_obj = ChunklistVerification("InstallAssistant.pkg", "InstallAssistant.pkg.integrityDataV1") + >>> chunk_obj.validate() + >>> while chunk_obj.status == ChunklistStatus.IN_PROGRESS: + ... print(f"Validating {chunk_obj.current_chunk} of {chunk_obj.total_chunks}") + + >>> if chunk_obj.status == ChunklistStatus.FAILURE: + ... print(chunk_obj.error_msg) + """ + + def __init__(self, file_path: Path, chunklist_path: Path | bytes) -> None: + if isinstance(chunklist_path, bytes): + self.chunklist_path: bytes = chunklist_path + else: + self.chunklist_path: Path = Path(chunklist_path) + self.file_path: Path = Path(file_path) + + self.chunks: dict = self._generate_chunks(self.chunklist_path) + + self.error_msg: str = "" + self.current_chunk: int = 0 + self.total_chunks: int = len(self.chunks) + + self.status: ChunklistStatus = ChunklistStatus.IN_PROGRESS + + + def _generate_chunks(self, chunklist: Path | bytes) -> dict: + """ + Generate a dictionary of the chunklist header and chunks + + Parameters: + chunklist (Path | bytes): Path to the chunklist file or the chunklist file itself + """ + + chunklist: bytes = chunklist if isinstance(chunklist, bytes) else chunklist.read_bytes() + + # Ref: https://github.com/apple-oss-distributions/xnu/blob/xnu-8020.101.4/bsd/kern/chunklist.h#L59-L69 + header: dict = { + "magic": chunklist[:4], + "length": int.from_bytes(chunklist[4:8], "little"), + "fileVersion": chunklist[8], + "chunkMethod": chunklist[9], + "sigMethod": chunklist[10], + "chunkCount": int.from_bytes(chunklist[12:20], "little"), + "chunkOffset": int.from_bytes(chunklist[20:28], "little"), + "sigOffset": int.from_bytes(chunklist[28:36], "little") + } + + if header["magic"] != b"CNKL": + return None + + all_chunks = chunklist[header["chunkOffset"]:header["chunkOffset"]+header["chunkCount"]*CHUNK_LENGTH] + chunks = [{"length": int.from_bytes(all_chunks[i:i+4], "little"), "checksum": all_chunks[i+4:i+CHUNK_LENGTH]} for i in range(0, len(all_chunks), CHUNK_LENGTH)] + + return chunks + + + def _validate(self) -> None: + """ + Validates provided file against chunklist + """ + + if self.chunks is None: + self.status = ChunklistStatus.FAILURE + return + + with self.file_path.open("rb") as f: + for chunk in self.chunks: + self.current_chunk += 1 + status = hashlib.sha256(f.read(chunk["length"])).digest() + if status != chunk["checksum"]: + self.error_msg = f"Chunk {self.current_chunk} checksum status FAIL: chunk sum {binascii.hexlify(chunk['checksum']).decode()}, calculated sum {binascii.hexlify(status).decode()}" + self.status = ChunklistStatus.FAILURE + logging.info(self.error_msg) + return + + self.status = ChunklistStatus.SUCCESS + + + def validate(self) -> None: + """ + Spawns _validate() thread + """ + threading.Thread(target=self._validate).start()