Finish migration to network_handler.py usage

This commit is contained in:
Mykola Grymalyuk
2023-02-04 13:11:18 -07:00
parent 976f14eeb3
commit 6c294902c2
10 changed files with 96 additions and 219 deletions

View File

@@ -18,6 +18,7 @@
- Allows for more reliable network calls and downloads
- Better supports network timeouts and disconnects
- Dramatically less noise in console during downloads
- Remove unused sys_patch_downloader.py module
- Increment Binaries:
- PatcherSupportPkg 0.8.3 - release

View File

@@ -20,7 +20,7 @@ import py_sip_xnu
import logging
from resources import constants, defaults, install, installer, utilities, run, generate_smbios, updates, integrity_verification, global_settings, kdk_handler, network_handler
from resources.sys_patch import sys_patch_download, sys_patch_detect, sys_patch
from resources.sys_patch import sys_patch_detect, sys_patch
from resources.build import build
from data import model_array, os_data, smbios_data, sip_data, cpu_data
from resources.gui import menu_redirect, gui_help
@@ -1265,25 +1265,6 @@ class wx_python_gui:
self.progress_bar.Hide()
# Download resources
logging.getLogger().handlers[1].stream = menu_redirect.RedirectLabel(self.developer_note)
download_result, link = sys_patch_download.grab_patcher_support_pkg(self.constants).download_files()
logging.getLogger().handlers[1].stream = self.stock_stream
if download_result is None:
# Create popup window to inform user of error
self.popup = wx.MessageDialog(
self.frame_modal,
"A problem occurred trying to download PatcherSupportPkg binaries\n\nIf you continue to have this error, download an Offline build from Github\nThese builds don't require a network connection to root patch",
"Network Error",
wx.YES_NO | wx.ICON_ERROR
)
self.popup.SetYesNoLabels("View on Github", "Ignore")
answer = self.popup.ShowModal()
if answer == wx.ID_YES:
webbrowser.open(self.constants.repo_link_latest)
self.main_menu()
if self.patches["Settings: Kernel Debug Kit missing"] is True:
# Download KDK (if needed)
self.subheader.SetLabel("Downloading Kernel Debug Kit")
@@ -1798,7 +1779,7 @@ class wx_python_gui:
self.download_label.Centre(wx.HORIZONTAL)
self.download_label_2 = wx.StaticText(self.frame, label="")
self.download_label_2.SetFont(wx.Font(12, wx.DEFAULT, wx.NORMAL, wx.BOLD))
self.download_label_2.SetFont(wx.Font(12, wx.DEFAULT, wx.NORMAL, wx.NORMAL))
self.download_label_2.SetPosition(
wx.Point(
self.download_label.GetPosition().x,
@@ -1849,7 +1830,7 @@ class wx_python_gui:
# Download macOS install data
if ia_download.download_complete:
if ia_download.download_complete is True:
self.download_label.SetLabel(f"Finished Downloading {installer_name}")
self.download_label.Centre(wx.HORIZONTAL)
wx.App.Get().Yield()
@@ -1895,7 +1876,7 @@ class wx_python_gui:
self.return_to_main_menu.SetPosition(
wx.Point(
self.progress_bar.GetPosition().x,
self.progress_bar.GetPosition().y + self.progress_bar.GetSize().height + 40
self.progress_bar.GetPosition().y + self.progress_bar.GetSize().height + 10
)
)
self.return_to_main_menu.Bind(wx.EVT_BUTTON, self.main_menu)
@@ -1905,7 +1886,11 @@ class wx_python_gui:
wx.App.Get().Yield()
integrity_path = Path(Path(self.constants.payload_path) / Path(apple_integrity_file_link.split("/")[-1]))
if utilities.download_file(apple_integrity_file_link, integrity_path, verify_checksum=False):
integrity_download = network_handler.DownloadObject(apple_integrity_file_link, integrity_path)
integrity_download.download(spawn_thread=False)
if network_handler.DownloadObject(apple_integrity_file_link, integrity_path).download_simple(verify_checksum=False):
# If we're unable to download the integrity file immediately after downloading the IA, there's a legitimate issue
# on Apple's end.
# Fail gracefully and just head to installing the IA.
@@ -2325,14 +2310,10 @@ class wx_python_gui:
else:
path = self.constants.installer_pkg_path
autopkg_download = network_handler.DownloadObject(link, path)
autopkg_download.download()
autopkg_download.download(spawn_thread=False)
while autopkg_download.is_active():
time.sleep(0.1)
if autopkg_download.download_complete:
if autopkg_download.download_complete is True:
# Download thread will re-enable Idle Sleep after downloading
utilities.disable_sleep_while_running()
if str(path).endswith(".zip"):

View File

@@ -4,7 +4,7 @@ import plistlib
import subprocess
import tempfile
import logging
from resources import utilities, tui_helpers
from resources import utilities, tui_helpers, network_handler
def list_local_macOS_installers():
# Finds all applicable macOS installers
@@ -132,7 +132,10 @@ def create_installer(installer_path, volume_name):
def download_install_assistant(download_path, ia_link):
# Downloads InstallAssistant.pkg
if utilities.download_file(ia_link, (Path(download_path) / Path("InstallAssistant.pkg"))):
ia_download = network_handler.DownloadObject(ia_link, (Path(download_path) / Path("InstallAssistant.pkg")))
ia_download.download(display_progress=True, spawn_thread=False)
if ia_download.download_complete is True:
return True
return False
@@ -165,9 +168,9 @@ def list_downloadable_macOS_installers(download_path, catalog):
else:
link = "https://swscan.apple.com/content/catalogs/others/index-13-12-10.16-10.15-10.14-10.13-10.12-10.11-10.10-10.9-mountainlion-lion-snowleopard-leopard.merged-1.sucatalog"
if utilities.verify_network_connection(link) is True:
if network_handler.NetworkUtilities(link).verify_network_connection() is True:
try:
catalog_plist = plistlib.loads(utilities.SESSION.get(link).content)
catalog_plist = plistlib.loads(network_handler.SESSION.get(link).content)
except plistlib.InvalidFileException:
return available_apps
@@ -181,7 +184,7 @@ def list_downloadable_macOS_installers(download_path, catalog):
for bm_package in catalog_plist["Products"][item]["Packages"]:
if "Info.plist" in bm_package["URL"] and "InstallInfo.plist" not in bm_package["URL"]:
try:
build_plist = plistlib.loads(utilities.SESSION.get(bm_package["URL"]).content)
build_plist = plistlib.loads(network_handler.SESSION.get(bm_package["URL"]).content)
except plistlib.InvalidFileException:
continue
# Ensure Apple Silicon specific Installers are not listed

View File

@@ -8,6 +8,7 @@ import requests
import threading
import logging
import enum
import hashlib
from pathlib import Path
from resources import utilities
@@ -31,9 +32,12 @@ class NetworkUtilities:
Utilities for network related tasks, primarily used for downloading files
"""
def __init__(self, url: str):
def __init__(self, url: str = None):
self.url: str = url
if self.url is None:
self.url = "https://github.com"
def verify_network_connection(self):
"""
@@ -44,8 +48,7 @@ class NetworkUtilities:
"""
try:
response = requests.head(self.url, timeout=5, allow_redirects=True)
return True
return True if requests.head(self.url, timeout=5, allow_redirects=True) else False
except (
requests.exceptions.Timeout,
requests.exceptions.TooManyRedirects,
@@ -92,6 +95,11 @@ class DownloadObject:
self.active_thread: threading.Thread = None
self.should_checksum: bool = False
self.checksum = None
self._checksum_storage: hash = None
if self.has_network:
self._populate_file_size()
@@ -100,7 +108,7 @@ class DownloadObject:
self.stop()
def download(self, display_progress: bool = False, spawn_thread: bool = True):
def download(self, display_progress: bool = False, spawn_thread: bool = True, verify_checksum: bool = False):
"""
Download the file
@@ -110,6 +118,7 @@ class DownloadObject:
Parameters:
display_progress (bool): Display progress in console
spawn_thread (bool): Spawn a thread to download the file, otherwise download in the current thread
verify_checksum (bool): Calculate checksum of downloaded file if True
"""
self.status = DownloadStatus.DOWNLOADING
@@ -118,10 +127,36 @@ class DownloadObject:
if self.active_thread:
logging.error("Download already in progress")
return
self.should_checksum = verify_checksum
self.active_thread = threading.Thread(target=self._download, args=(display_progress,))
self.active_thread.start()
else:
self._download(display_progress)
return
self.should_checksum = verify_checksum
self._download(display_progress)
def download_simple(self, verify_checksum: bool = False):
    """
    Alternative to download(), mimics utilities.py's old download_file() function

    Downloads synchronously in the current thread (no spawned thread).

    Parameters:
        verify_checksum (bool): Return checksum of downloaded file if True

    Returns:
        If verify_checksum is True, returns the hex digest (str) of the
        downloaded file
        Otherwise, returns True if download was successful, False otherwise
    """

    if verify_checksum:
        self.should_checksum = True
        self.checksum = hashlib.sha256()
        # Fix: _update_checksum() feeds self._checksum_storage, which is
        # initialized to None and was never assigned here — the first chunk
        # would raise AttributeError, and if skipped the returned digest
        # would cover no data. Alias both names to the same hash object.
        self._checksum_storage = self.checksum

    # Fix: forward verify_checksum — download() unconditionally reassigns
    # self.should_checksum from its own parameter, which previously reset
    # the flag set above back to False, silently disabling checksumming.
    self.download(spawn_thread=False, verify_checksum=verify_checksum)

    if not self.download_complete:
        return False

    return self.checksum.hexdigest() if self.checksum else True
def _get_filename(self):
@@ -143,7 +178,7 @@ class DownloadObject:
"""
try:
result = requests.head(self.url, allow_redirects=True, timeout=5)
result = SESSION.head(self.url, allow_redirects=True, timeout=5)
if 'Content-Length' in result.headers:
self.total_file_size = float(result.headers['Content-Length'])
else:
@@ -154,9 +189,19 @@ class DownloadObject:
self.total_file_size = 0.0
def _update_checksum(self, chunk: bytes):
    """
    Update checksum with new chunk

    Parameters:
        chunk (bytes): Chunk to update checksum with
    """
    # NOTE(review): assumes self._checksum_storage already holds a hashlib
    # object — it is initialized to None in __init__, so callers must assign
    # it before a checksummed download starts; verify callers do so.
    self._checksum_storage.update(chunk)
def _prepare_working_directory(self, path: Path):
"""
Delete the file if it already exists
Validates working environment, including free space and removing existing files
Parameters:
path (str): Path to the file
@@ -170,9 +215,17 @@ class DownloadObject:
logging.info(f"Deleting existing file: {path}")
Path(path).unlink()
return True
if not Path(path).parent.exists():
logging.info(f"Creating directory: {Path(path).parent}")
Path(path).parent.mkdir(parents=True, exist_ok=True)
available_space = utilities.get_free_space()
if self.total_file_size > available_space:
msg = f"Not enough free space to download {self.filename}, need {utilities.human_fmt(self.total_file_size)}, have {utilities.human_fmt(available_space)}"
logging.error(msg)
raise Exception(msg)
except Exception as e:
self.error = True
self.error_msg = str(e)
@@ -211,6 +264,8 @@ class DownloadObject:
if chunk:
file.write(chunk)
self.downloaded_file_size += len(chunk)
if self.should_checksum:
self._update_checksum(chunk)
if display_progress and i % 100:
# Don't use logging here, as we'll be spamming the log file
if self.total_file_size == 0.0:

View File

@@ -40,7 +40,7 @@ from datetime import datetime
import logging
from resources import constants, utilities, kdk_handler
from resources.sys_patch import sys_patch_download, sys_patch_detect, sys_patch_auto, sys_patch_helpers
from resources.sys_patch import sys_patch_detect, sys_patch_auto, sys_patch_helpers
from data import os_data
@@ -661,42 +661,12 @@ class PatchSysVolume:
def check_files(self):
if Path(self.constants.payload_local_binaries_root_path).exists():
logging.info("- Found local Apple Binaries")
if self.constants.gui_mode is False:
patch_input = input("Would you like to redownload?(y/n): ")
if patch_input in {"y", "Y", "yes", "Yes"}:
shutil.rmtree(Path(self.constants.payload_local_binaries_root_path))
output = self.download_files()
else:
output = True
else:
output = self.download_files()
else:
output = self.download_files()
return output
logging.info("- Local PatcherSupportPkg resources available, continuing...")
return True
def download_files(self):
if self.constants.cli_mode is True:
download_result, link = sys_patch_download.grab_patcher_support_pkg(self.constants).download_files()
else:
download_result = True
link = sys_patch_download.grab_patcher_support_pkg(self.constants).generate_pkg_link()
logging.info("- PatcherSupportPkg resources missing, Patcher likely corrupted!!!")
return False
if download_result and self.constants.payload_local_binaries_root_path_zip.exists():
logging.info("- Unzipping binaries...")
utilities.process_status(subprocess.run(["ditto", "-V", "-x", "-k", "--sequesterRsrc", "--rsrc", self.constants.payload_local_binaries_root_path_zip, self.constants.payload_path], stdout=subprocess.PIPE, stderr=subprocess.STDOUT))
logging.info("- Binaries downloaded to:")
logging.info(self.constants.payload_path)
return self.constants.payload_local_binaries_root_path
else:
if self.constants.gui_mode is True:
logging.info("- Download failed, please verify the below link work:")
logging.info(link)
logging.info("\nIf you continue to have issues, try using the Offline builds")
logging.info("located on Github next to the other builds")
else:
input("\nPress enter to continue")
return None
# Entry Function
def start_patch(self):

View File

@@ -13,7 +13,7 @@ import plistlib
import subprocess
import webbrowser
import logging
from resources import utilities, updates, global_settings
from resources import utilities, updates, global_settings, network_handler
from resources.sys_patch import sys_patch_detect
from resources.gui import gui_main
@@ -55,7 +55,7 @@ class AutomaticSysPatch:
args_string = f"{self.constants.launcher_binary} {self.constants.launcher_script} --gui_patch"
warning_str = ""
if utilities.verify_network_connection("https://api.github.com/repos/dortania/OpenCore-Legacy-Patcher/releases/latest") is False:
if network_handler.NetworkUtilities("https://api.github.com/repos/dortania/OpenCore-Legacy-Patcher/releases/latest").verify_network_connection() is False:
warning_str = f"""\n\nWARNING: We're unable to verify whether there are any new releases of OpenCore Legacy Patcher on Github. Be aware that you may be using an outdated version for this OS. If you're unsure, verify on Github that OpenCore Legacy Patcher {self.constants.patcher_version} is the latest official release"""
args = [

View File

@@ -3,7 +3,7 @@
# Used when supplying data to sys_patch.py
# Copyright (C) 2020-2022, Dhinak G, Mykola Grymalyuk
from resources import constants, device_probe, utilities, amfi_detect
from resources import constants, device_probe, utilities, amfi_detect, network_handler
from resources.sys_patch import sys_patch_helpers
from data import model_array, os_data, sip_data, sys_patch_dict, smbios_data, cpu_data
@@ -402,7 +402,7 @@ class detect_root_patch:
return False
def detect_patch_set(self):
self.has_network = utilities.verify_network_connection()
self.has_network = network_handler.NetworkUtilities().verify_network_connection()
if self.check_uhci_ohci() is True:
self.legacy_uhci_ohci = True

View File

@@ -1,33 +0,0 @@
# Download PatcherSupportPkg for usage with Root Patching
# Copyright (C) 2020-2022, Dhinak G, Mykola Grymalyuk
from resources import utilities
from pathlib import Path
import shutil
import logging
class grab_patcher_support_pkg:
    """Fetches the PatcherSupportPkg archive used for root patching."""

    def __init__(self, constants):
        # Shared application state: URLs, version strings and payload paths.
        self.constants = constants

    def generate_pkg_link(self):
        """Build the download URL for the versioned Universal-Binaries.zip."""
        return (
            f"{self.constants.url_patcher_support_pkg}"
            f"{self.constants.patcher_support_pkg_version}"
            "/Universal-Binaries.zip"
        )

    def download_files(self):
        """Ensure Universal-Binaries.zip is available locally.

        Removes any stale extracted payload folder first, then either reuses
        an existing local archive or downloads a fresh one.

        Returns:
            tuple: (download_result, link) — download_result is True when the
            archive is present (pre-existing or freshly fetched, per the
            download helper's result); link is the source URL.
        """
        link = self.generate_pkg_link()

        if Path(self.constants.payload_local_binaries_root_path).exists():
            logging.info("- Removing old Root Patcher Payload folder")
            # Stale extracted binaries must not mix with a new archive.
            shutil.rmtree(self.constants.payload_local_binaries_root_path)

        if Path(self.constants.payload_local_binaries_root_path_zip).exists():
            logging.info("- Found local Universal-Binaries.zip, skipping download")
            return True, link

        logging.info("- No local version found, downloading...")
        result = utilities.download_file(link, self.constants.payload_local_binaries_root_path_zip)
        return result, link

View File

@@ -5,6 +5,8 @@
import requests
import logging
from resources import network_handler
class check_binary_updates:
def __init__(self, constants):
@@ -16,17 +18,6 @@ class check_binary_updates:
self.available_binaries = {}
def verify_network_connection(self, url):
    """
    Probe *url* with a HEAD request.

    Returns True only when the response is truthy (requests.Response is
    truthy for status codes below 400); returns False on timeouts, redirect
    loops, connection failures, HTTP errors, or an error-status response.
    """
    try:
        response = requests.head(url, timeout=5)
        if response:
            return True
    except (requests.exceptions.Timeout,
            requests.exceptions.TooManyRedirects,
            requests.exceptions.ConnectionError,
            requests.exceptions.HTTPError):
        return False
    # Reached when the request succeeded but returned an error status.
    return False
def check_if_build_newer(self, remote_version=None, local_version=None):
if remote_version is None:
@@ -64,7 +55,7 @@ class check_binary_updates:
def check_binary_updates(self):
# logging.info("- Checking for updates...")
if self.verify_network_connection(self.binary_url):
if network_handler.NetworkUtilities(self.binary_url).verify_network_connection():
# logging.info("- Network connection functional")
response = requests.get(self.binary_url)
data_set = response.json()

View File

@@ -1,6 +1,5 @@
# Copyright (C) 2020-2022, Dhinak G, Mykola Grymalyuk
# Copyright (C) 2020-2023, Dhinak G, Mykola Grymalyuk
import hashlib
import math
import os
import plistlib
@@ -9,7 +8,6 @@ from pathlib import Path
import os
import binascii
import argparse
import time
import atexit
import requests
import shutil
@@ -20,8 +18,6 @@ import logging
from resources import constants, ioreg
from data import sip_data, os_data
SESSION = requests.Session()
def hexswap(input_hex: str):
hex_pairs = [input_hex[i : i + 2] for i in range(0, len(input_hex), 2)]
@@ -361,93 +357,6 @@ def get_firmware_vendor(*, decode: bool = False):
value = value.strip("\0")
return value
def verify_network_connection(url=None):
    """Return True when a HEAD request to *url* succeeds, False on common
    network failures. Defaults to probing https://www.google.com.

    Note: any response (including error statuses) counts as success — only
    transport-level failures return False.
    """
    target = "https://www.google.com" if url is None else url
    try:
        SESSION.head(target, timeout=5, allow_redirects=True)
    except (
        requests.exceptions.Timeout,
        requests.exceptions.TooManyRedirects,
        requests.exceptions.ConnectionError,
        requests.exceptions.HTTPError,
    ):
        return False
    return True
def download_file(link, location, is_gui=None, verify_checksum=False):
    """
    Download *link* to *location*, drawing a console progress box.

    Parameters:
        link (str): URL to download
        location (Path): destination file path (existing file is replaced)
        is_gui: when None, progress is printed to the console each chunk;
            any other value suppresses console drawing
        verify_checksum (bool): when True, a SHA-256 of the downloaded bytes
            is computed and its hex digest returned

    Returns:
        hex digest (str) when verify_checksum is True, True on plain success,
        False when there is not enough free disk space, None when no network
        connection to *link* could be established
    """
    if verify_network_connection(link):
        # Keep the machine awake for the duration of the download;
        # re-enabled before returning on the success path.
        disable_sleep_while_running()
        base_name = Path(link).name
        if Path(location).exists():
            Path(location).unlink()
        head_response = SESSION.head(link, allow_redirects=True)
        try:
            # Handle cases where Content-Length has garbage or is missing
            total_file_size = int(head_response.headers['Content-Length'])
        except KeyError:
            total_file_size = 0
        if total_file_size > 1024:
            file_size_rounded = round(total_file_size / 1024 / 1024, 2)
            file_size_string = f" of {file_size_rounded}MB"
            # Check if we have enough space
            if total_file_size > get_free_space():
                logging.info(f"Not enough space to download {base_name} ({file_size_rounded}MB)")
                return False
        else:
            # Unknown/tiny size: no size suffix and no free-space check.
            file_size_string = ""
        response = SESSION.get(link, stream=True)
        # SU Catalog's link is quite long, strip to make it bearable
        if "sucatalog.gz" in base_name:
            base_name = "sucatalog.gz"
        header = f"# Downloading: {base_name} #"
        box_length = len(header)
        box_string = "#" * box_length
        dl = 0
        total_downloaded_string = ""
        # 'clear' is a module-level flag controlling whether the console is
        # wiped between progress redraws.
        global clear
        checksum = hashlib.sha256() if verify_checksum else None
        with location.open("wb") as file:
            count = 0
            start = time.perf_counter()
            # Stream in 4 MB chunks to bound memory use.
            for chunk in response.iter_content(1024 * 1024 * 4):
                dl += len(chunk)
                file.write(chunk)
                if checksum:
                    checksum.update(chunk)
                count += len(chunk)
                if is_gui is None:
                    if clear:
                        cls()
                    logging.info(box_string)
                    logging.info(header)
                    logging.info(box_string)
                    logging.info("")
                    if total_file_size > 1024:
                        total_downloaded_string = f" ({round(float(dl / total_file_size * 100), 2)}%)"
                    logging.info(f"{round(count / 1024 / 1024, 2)}MB Downloaded{file_size_string}{total_downloaded_string}\nAverage Download Speed: {round(dl//(time.perf_counter() - start) / 100000 / 8, 2)} MB/s")
        enable_sleep_after_running()
        return checksum.hexdigest() if checksum else True
    else:
        # No network path to the link: draw an error box and bail with None.
        cls()
        header = "# Could not establish Network Connection with provided link! #"
        box_length = len(header)
        box_string = "#" * box_length
        logging.info(box_string)
        logging.info(header)
        logging.info(box_string)
        if constants.Constants().url_patcher_support_pkg in link:
            # If we're downloading PatcherSupportPkg, present offline build
            logging.info("\nPlease grab the offline variant of OpenCore Legacy Patcher from Github:")
            logging.info(f"https://github.com/dortania/OpenCore-Legacy-Patcher/releases/download/{constants.Constants().patcher_version}/OpenCore-Patcher-TUI-Offline.app.zip")
        else:
            logging.info(link)
        return None
def dump_constants(constants):
    """Dump every attribute of *constants* to ~/Desktop/internal_data.txt for debugging."""
    dump_path = os.path.join(os.path.expanduser('~'), 'Desktop', 'internal_data.txt')
    payload = str(vars(constants))
    with open(dump_path, 'w') as f:
        f.write(payload)