integrity_verification.py: rework into OOP

This commit is contained in:
Mykola Grymalyuk
2023-04-02 11:39:55 -06:00
parent b0c05b914b
commit 23f2291bfa
3 changed files with 133 additions and 65 deletions

View File

@@ -16,6 +16,9 @@
- Add extra error handling for network errors - Add extra error handling for network errors
- Handles `RemoteDisconnected('Remote end closed connection without response')` exceptions - Handles `RemoteDisconnected('Remote end closed connection without response')` exceptions
- Move root volume patch set generation to dedicated sys_patch_generate.py module - Move root volume patch set generation to dedicated sys_patch_generate.py module
- Refactored integrity_verification.py:
- Implemented Object-Oriented design
- Reduced disk I/O and main thread monopolization
- Increment Binaries: - Increment Binaries:
- PatcherSupportPkg 0.9.2 - release - PatcherSupportPkg 0.9.2 - release

View File

@@ -1950,38 +1950,36 @@ class wx_python_gui:
wx.App.Get().Yield() wx.App.Get().Yield()
integrity_path = Path(Path(self.constants.payload_path) / Path(apple_integrity_file_link.split("/")[-1])) integrity_path = Path(Path(self.constants.payload_path) / Path(apple_integrity_file_link.split("/")[-1]))
if network_handler.DownloadObject(apple_integrity_file_link, integrity_path).download_simple(verify_checksum=False): chunklist_stream = network_handler.NetworkUtilities().get(apple_integrity_file_link).content
if chunklist_stream:
# If we're unable to download the integrity file immediately after downloading the IA, there's a legitimate issue # If we're unable to download the integrity file immediately after downloading the IA, there's a legitimate issue
# on Apple's end. # on Apple's end.
# Fail gracefully and just head to installing the IA. # Fail gracefully and just head to installing the IA.
utilities.disable_sleep_while_running() utilities.disable_sleep_while_running()
apple_integrity_file = str(integrity_path) chunk_obj = integrity_verification.ChunklistVerification(self.constants.payload_path / Path("InstallAssistant.pkg"), chunklist_stream)
chunks = integrity_verification.generate_chunklist_dict(str(apple_integrity_file)) if chunk_obj.chunks:
if chunks: self.progress_bar.SetValue(chunk_obj.current_chunk)
max_progress = len(chunks) self.progress_bar.SetRange(chunk_obj.total_chunks)
self.progress_bar.SetValue(0)
self.progress_bar.SetRange(max_progress)
wx.App.Get().Yield() wx.App.Get().Yield()
# See integrity_verification.py for more information on the integrity verification process chunk_obj.validate()
with Path(self.constants.payload_path / Path("InstallAssistant.pkg")).open("rb") as f:
for chunk in chunks: while chunk_obj.status == integrity_verification.ChunklistStatus.IN_PROGRESS:
status = hashlib.sha256(f.read(chunk["length"])).digest() self.progress_bar.SetValue(chunk_obj.current_chunk)
if status != chunk["checksum"]: self.verifying_chunk_label.SetLabel(f"Verifying Chunk {chunk_obj.current_chunk} of {chunk_obj.total_chunks}")
logging.info(f"Chunk {chunks.index(chunk) + 1} checksum status FAIL: chunk sum {binascii.hexlify(chunk['checksum']).decode()}, calculated sum {binascii.hexlify(status).decode()}") wx.App.Get().Yield()
self.popup = wx.MessageDialog(
self.frame, if chunk_obj.status == integrity_verification.ChunklistStatus.FAILURE:
f"We've found that Chunk {chunks.index(chunk) + 1} of {len(chunks)} has failed the integrity check.\n\nThis generally happens when downloading on unstable connections such as WiFi or cellular.\n\nPlease try redownloading again on a stable connection (ie. Ethernet)", self.popup = wx.MessageDialog(
"Corrupted Installer!", self.frame,
style = wx.OK | wx.ICON_EXCLAMATION f"We've found that Chunk {chunk_obj.current_chunk} of {chunk_obj.total_chunks} has failed the integrity check.\n\nThis generally happens when downloading on unstable connections such as WiFi or cellular.\n\nPlease try redownloading again on a stable connection (ie. Ethernet)",
) "Corrupted Installer!",
self.popup.ShowModal() style = wx.OK | wx.ICON_EXCLAMATION
self.main_menu() )
break self.popup.ShowModal()
else: self.main_menu()
self.progress_bar.SetValue(self.progress_bar.GetValue() + 1)
self.verifying_chunk_label.SetLabel(f"Verifying Chunk {self.progress_bar.GetValue()} of {max_progress}") logging.info("Integrity check passed!")
wx.App.Get().Yield()
else: else:
logging.info("Invalid integrity file provided") logging.info("Invalid integrity file provided")
else: else:

View File

@@ -1,52 +1,119 @@
# Validate the integrity of Apple downloaded files via .chunklist and .integrityDataV1 files # Validate the integrity of Apple downloaded files via .chunklist and .integrityDataV1 files
# Based off of chunklist.py: # Based off of chunklist.py:
# - https://gist.github.com/dhinakg/cbe30edf31ddc153fd0b0c0570c9b041 # - https://gist.github.com/dhinakg/cbe30edf31ddc153fd0b0c0570c9b041
# Copyright (C) 2021-2022, Dhinak G, Mykola Grymalyuk # Copyright (C) 2021-2023, Dhinak G, Mykola Grymalyuk
import binascii import enum
import hashlib import hashlib
import logging import logging
import binascii
import threading
from pathlib import Path from pathlib import Path
CHUNK_LENGTH = 4 + 32 CHUNK_LENGTH = 4 + 32
def generate_chunklist_dict(chunklist): class ChunklistStatus(enum.Enum):
chunklist = Path(chunklist).read_bytes() if isinstance(chunklist, str) else chunklist """
Chunklist status
# Ref: https://github.com/apple-oss-distributions/xnu/blob/xnu-8020.101.4/bsd/kern/chunklist.h#L59-L69 """
header = { IN_PROGRESS = 0
"magic": chunklist[:4], SUCCESS = 1
"length": int.from_bytes(chunklist[4:8], "little"), FAILURE = 2
"fileVersion": chunklist[8],
"chunkMethod": chunklist[9],
"sigMethod": chunklist[10],
"chunkCount": int.from_bytes(chunklist[12:20], "little"),
"chunkOffset": int.from_bytes(chunklist[20:28], "little"),
"sigOffset": int.from_bytes(chunklist[28:36], "little")
}
if header["magic"] != b"CNKL":
return None
all_chunks = chunklist[header["chunkOffset"]:header["chunkOffset"]+header["chunkCount"]*CHUNK_LENGTH]
chunks = [{"length": int.from_bytes(all_chunks[i:i+4], "little"), "checksum": all_chunks[i+4:i+CHUNK_LENGTH]} for i in range(0, len(all_chunks), CHUNK_LENGTH)]
return chunks
def chunk(file_path, chunklist, verbose): class ChunklistVerification:
chunks = generate_chunklist_dict(chunklist) """
if chunks is None: Library to validate Apple's files against their chunklist format
return False Supports both chunklist and integrityDataV1 files
with Path(file_path).open("rb") as f: - Ref: https://github.com/apple-oss-distributions/xnu/blob/xnu-8020.101.4/bsd/kern/chunklist.h
for chunk in chunks:
status = hashlib.sha256(f.read(chunk["length"])).digest() Parameters:
if not status == chunk["checksum"]: file_path (Path): Path to the file to validate
logging.info( chunklist_path (Path): Path to the chunklist file
f"Chunk {chunks.index(chunk) + 1} checksum status FAIL: chunk sum {binascii.hexlify(chunk['checksum']).decode()}, calculated sum {binascii.hexlify(status).decode()}")
return False Usage:
elif verbose: >>> chunk_obj = ChunklistVerification("InstallAssistant.pkg", "InstallAssistant.pkg.integrityDataV1")
logging.info( >>> chunk_obj.validate()
f"Chunk {chunks.index(chunk) + 1} checksum status success") >>> while chunk_obj.status == ChunklistStatus.IN_PROGRESS:
return True ... print(f"Validating {chunk_obj.current_chunk} of {chunk_obj.total_chunks}")
>>> if chunk_obj.status == ChunklistStatus.FAILURE:
... print(chunk_obj.error_msg)
"""
def __init__(self, file_path: Path, chunklist_path: Path | bytes) -> None:
if isinstance(chunklist_path, bytes):
self.chunklist_path: bytes = chunklist_path
else:
self.chunklist_path: Path = Path(chunklist_path)
self.file_path: Path = Path(file_path)
self.chunks: dict = self._generate_chunks(self.chunklist_path)
self.error_msg: str = ""
self.current_chunk: int = 0
self.total_chunks: int = len(self.chunks)
self.status: ChunklistStatus = ChunklistStatus.IN_PROGRESS
def _generate_chunks(self, chunklist: Path | bytes) -> dict:
"""
Generate a dictionary of the chunklist header and chunks
Parameters:
chunklist (Path | bytes): Path to the chunklist file or the chunklist file itself
"""
chunklist: bytes = chunklist if isinstance(chunklist, bytes) else chunklist.read_bytes()
# Ref: https://github.com/apple-oss-distributions/xnu/blob/xnu-8020.101.4/bsd/kern/chunklist.h#L59-L69
header: dict = {
"magic": chunklist[:4],
"length": int.from_bytes(chunklist[4:8], "little"),
"fileVersion": chunklist[8],
"chunkMethod": chunklist[9],
"sigMethod": chunklist[10],
"chunkCount": int.from_bytes(chunklist[12:20], "little"),
"chunkOffset": int.from_bytes(chunklist[20:28], "little"),
"sigOffset": int.from_bytes(chunklist[28:36], "little")
}
if header["magic"] != b"CNKL":
return None
all_chunks = chunklist[header["chunkOffset"]:header["chunkOffset"]+header["chunkCount"]*CHUNK_LENGTH]
chunks = [{"length": int.from_bytes(all_chunks[i:i+4], "little"), "checksum": all_chunks[i+4:i+CHUNK_LENGTH]} for i in range(0, len(all_chunks), CHUNK_LENGTH)]
return chunks
def _validate(self) -> None:
"""
Validates provided file against chunklist
"""
if self.chunks is None:
self.status = ChunklistStatus.FAILURE
return
with self.file_path.open("rb") as f:
for chunk in self.chunks:
self.current_chunk += 1
status = hashlib.sha256(f.read(chunk["length"])).digest()
if status != chunk["checksum"]:
self.error_msg = f"Chunk {self.current_chunk} checksum status FAIL: chunk sum {binascii.hexlify(chunk['checksum']).decode()}, calculated sum {binascii.hexlify(status).decode()}"
self.status = ChunklistStatus.FAILURE
logging.info(self.error_msg)
return
self.status = ChunklistStatus.SUCCESS
def validate(self) -> None:
"""
Spawns _validate() thread
"""
threading.Thread(target=self._validate).start()