mirror of
https://github.com/dortania/OpenCore-Legacy-Patcher.git
synced 2026-04-22 19:10:15 +10:00
Add support for KDK downloads
This commit is contained in:
@@ -14,6 +14,7 @@ import time
|
||||
import atexit
|
||||
import requests
|
||||
import shutil
|
||||
import urllib.parse
|
||||
|
||||
from resources import constants, ioreg, amfi_detect
|
||||
from data import sip_data, os_data
|
||||
@@ -376,7 +377,7 @@ def get_firmware_vendor(*, decode: bool = False):
|
||||
|
||||
def verify_network_connection(url):
|
||||
try:
|
||||
response = SESSION.head(url, timeout=5)
|
||||
response = SESSION.head(url, timeout=5, allow_redirects=True)
|
||||
return True
|
||||
except (requests.exceptions.Timeout, requests.exceptions.TooManyRedirects, requests.exceptions.ConnectionError, requests.exceptions.HTTPError):
|
||||
return False
|
||||
@@ -384,33 +385,31 @@ def verify_network_connection(url):
|
||||
def download_file(link, location, is_gui=None, verify_checksum=False):
|
||||
if verify_network_connection(link):
|
||||
disable_sleep_while_running()
|
||||
short_link = os.path.basename(link)
|
||||
base_name = Path(link).name
|
||||
|
||||
if Path(location).exists():
|
||||
Path(location).unlink()
|
||||
header = SESSION.head(link).headers
|
||||
try:
|
||||
# Try to get true file
|
||||
# ex. Github's release links provides a "fake" header
|
||||
# Thus need to resolve to the real link
|
||||
link = SESSION.head(link).headers["location"]
|
||||
header = SESSION.head(link).headers
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
head_response = SESSION.head(link, allow_redirects=True)
|
||||
try:
|
||||
# Handle cases where Content-Length has garbage or is missing
|
||||
total_file_size = int(SESSION.head(link).headers['Content-Length'])
|
||||
total_file_size = int(head_response.headers['Content-Length'])
|
||||
except KeyError:
|
||||
total_file_size = 0
|
||||
|
||||
if total_file_size > 1024:
|
||||
file_size_rounded = round(total_file_size / 1024 / 1024, 2)
|
||||
file_size_string = f" of {file_size_rounded}MB"
|
||||
else:
|
||||
file_size_string = ""
|
||||
|
||||
response = SESSION.get(link, stream=True)
|
||||
|
||||
# SU Catalog's link is quite long, strip to make it bearable
|
||||
if "sucatalog.gz" in short_link:
|
||||
short_link = "sucatalog.gz"
|
||||
header = f"# Downloading: {short_link} #"
|
||||
if "sucatalog.gz" in base_name:
|
||||
base_name = "sucatalog.gz"
|
||||
|
||||
header = f"# Downloading: {base_name} #"
|
||||
box_length = len(header)
|
||||
box_string = "#" * box_length
|
||||
dl = 0
|
||||
@@ -463,6 +462,30 @@ def download_file(link, location, is_gui=None, verify_checksum=False):
|
||||
print(link)
|
||||
return None
|
||||
|
||||
|
||||
def download_apple_developer_portal(link, location, is_gui=None, verify_checksum=False):
|
||||
TOKEN_URL_BASE = "https://developerservices2.apple.com/services/download?path="
|
||||
remote_path = urllib.parse.urlparse(link).path
|
||||
token_url = urllib.parse.urlunparse(urllib.parse.urlparse(TOKEN_URL_BASE)._replace(query=urllib.parse.urlencode({"path": remote_path})))
|
||||
|
||||
try:
|
||||
response = SESSION.get(token_url, timeout=5)
|
||||
except (requests.exceptions.Timeout, requests.exceptions.TooManyRedirects, requests.exceptions.ConnectionError):
|
||||
print(" - Could not contact Apple download servers")
|
||||
return None
|
||||
|
||||
try:
|
||||
response.raise_for_status()
|
||||
except requests.exceptions.HTTPError:
|
||||
if response.status_code == 400 and "The path specified is invalid" in response.text:
|
||||
print(" - File does not exist on Apple download servers")
|
||||
else:
|
||||
print(" - Could not request download authorization from Apple download servers")
|
||||
return None
|
||||
|
||||
return download_file(link, location, is_gui, verify_checksum)
|
||||
|
||||
|
||||
def dump_constants(constants):
|
||||
with open(os.path.join(os.path.expanduser('~'), 'Desktop', 'internal_data.txt'), 'w') as f:
|
||||
f.write(str(vars(constants)))
|
||||
@@ -550,7 +573,7 @@ def monitor_disk_output(disk):
|
||||
def validate_link(link):
|
||||
# Check if link is 404
|
||||
try:
|
||||
response = SESSION.head(link, timeout=5)
|
||||
response = SESSION.head(link, timeout=5, allow_redirects=True)
|
||||
if response.status_code == 404:
|
||||
return False
|
||||
else:
|
||||
|
||||
Reference in New Issue
Block a user