subprocess_wrapper.py: Add unified error handling

Additionally adds backend support for Privileged Helper Tool
This commit is contained in:
Mykola Grymalyuk
2024-05-10 16:06:16 -06:00
parent 84e020f7ea
commit abb18a5ad2
17 changed files with 304 additions and 142 deletions

View File

@@ -11,6 +11,8 @@ import subprocess
from pathlib import Path
from . import subprocess_wrapper
from .. import constants
from ..wx_gui import gui_entry
@@ -172,7 +174,7 @@ class arguments:
if "GPUCompanionBundles" not in kext_plist:
continue
logging.info(f" - Removing {kext.name}")
subprocess.run(["/bin/rm", "-rf", kext])
subprocess_wrapper.run_as_root(["/bin/rm", "-rf", kext])
def _build_handler(self) -> None:

View File

@@ -10,7 +10,8 @@ ie. during automated patching
import os
import logging
import plistlib
import subprocess
from . import subprocess_wrapper
from pathlib import Path
@@ -115,12 +116,11 @@ class GlobalEnviromentSettings:
This in turn breaks normal OCLP execution to write to settings file
"""
if os.geteuid() != 0:
if os.geteuid() != 0 and subprocess_wrapper.supports_privileged_helper() is False:
return
# Set file permission to allow any user to write to log file
result = subprocess.run(["/bin/chmod", "777", self.global_settings_plist], capture_output=True)
result = subprocess_wrapper.run_as_root(["/bin/chmod", "777", self.global_settings_plist], capture_output=True)
if result.returncode != 0:
logging.warning("Failed to fix settings file permissions:")
if result.stderr:
logging.warning(result.stderr.decode("utf-8"))
subprocess_wrapper.log(result)

View File

@@ -9,7 +9,7 @@ import applescript
from pathlib import Path
from . import utilities
from . import utilities, subprocess_wrapper
from .. import constants
@@ -92,7 +92,7 @@ class tui_disk_installation:
def install_opencore(self, full_disk_identifier: str):
# TODO: Apple Script fails in Yosemite(?) and older
logging.info(f"Mounting partition: {full_disk_identifier}")
if self.constants.detected_os >= os_data.os_data.el_capitan and not self.constants.recovery_status:
if self.constants.detected_os >= os_data.os_data.el_capitan and not self.constants.recovery_status and subprocess_wrapper.supports_privileged_helper() is False:
try:
applescript.AppleScript(f'''do shell script "diskutil mount {full_disk_identifier}" with prompt "OpenCore Legacy Patcher needs administrator privileges to mount this volume." with administrator privileges without altering line endings''').run()
except applescript.ScriptError as e:
@@ -105,10 +105,10 @@ class tui_disk_installation:
logging.info("Please disable Safe Mode and try again.")
return
else:
result = subprocess.run(["/usr/sbin/diskutil", "mount", full_disk_identifier], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
result = subprocess_wrapper.run_as_root(["/usr/sbin/diskutil", "mount", full_disk_identifier], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.returncode != 0:
logging.info("Mount failed")
logging.info(result.stderr.decode())
subprocess_wrapper.log(result)
return
partition_info = plistlib.loads(subprocess.run(["/usr/sbin/diskutil", "info", "-plist", full_disk_identifier], stdout=subprocess.PIPE).stdout.decode().strip().encode())

View File

@@ -18,8 +18,8 @@ from .. import constants
from ..datasets import os_data
from . import (
utilities,
network_handler
network_handler,
subprocess_wrapper
)
KDK_INSTALL_PATH: str = "/Library/Developer/KDKs"
@@ -464,7 +464,7 @@ class KernelDebugKitObject:
if self.passive is True:
return
if os.getuid() != 0:
if os.getuid() != 0 and subprocess_wrapper.supports_privileged_helper() is False:
logging.warning("Cannot remove KDK, not running as root")
return
@@ -474,10 +474,10 @@ class KernelDebugKitObject:
rm_args = ["/bin/rm", "-rf" if Path(kdk_path).is_dir() else "-f", kdk_path]
result = utilities.elevated(rm_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
result = subprocess_wrapper.run_as_root(rm_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
logging.warning(f"Failed to remove KDK: {kdk_path}")
logging.warning(f"{result.stdout.decode('utf-8')}")
subprocess_wrapper.log(result)
return
logging.info(f"Successfully removed KDK: {kdk_path}")
@@ -545,7 +545,7 @@ class KernelDebugKitObject:
result = subprocess.run(["/usr/bin/hdiutil", "verify", self.constants.kdk_download_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.returncode != 0:
logging.info("Error: Kernel Debug Kit checksum verification failed!")
logging.info(f"Output: {result.stderr.decode('utf-8')}")
subprocess_wrapper.log(result)
msg = "Kernel Debug Kit checksum verification failed, please try again.\n\nIf this continues to fail, ensure you're downloading on a stable network connection (ie. Ethernet)"
logging.info(f"{msg}")
@@ -579,7 +579,7 @@ class KernelDebugKitUtilities:
bool: True if successful, False if not
"""
if os.getuid() != 0:
if os.getuid() != 0 and subprocess_wrapper.supports_privileged_helper() is False:
logging.warning("Cannot install KDK, not running as root")
return False
@@ -588,12 +588,10 @@ class KernelDebugKitUtilities:
# TODO: Check whether enough disk space is available
result = utilities.elevated(["/usr/sbin/installer", "-pkg", kdk_path, "-target", "/"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
result = subprocess_wrapper.run_as_root(["/usr/sbin/installer", "-pkg", kdk_path, "-target", "/"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
logging.info("Failed to install KDK:")
logging.info(result.stdout.decode('utf-8'))
if result.stderr:
logging.info(result.stderr.decode('utf-8'))
subprocess_wrapper.log(result)
return False
return True
@@ -609,20 +607,19 @@ class KernelDebugKitUtilities:
bool: True if successful, False if not
"""
if os.getuid() != 0:
if os.getuid() != 0 and subprocess_wrapper.supports_privileged_helper() is False:
logging.warning("Cannot install KDK, not running as root")
return False
logging.info(f"Extracting downloaded KDK disk image")
with tempfile.TemporaryDirectory() as mount_point:
result = subprocess.run(["/usr/bin/hdiutil", "attach", kdk_path, "-mountpoint", mount_point, "-nobrowse"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
result = subprocess_wrapper.run_as_root(["/usr/bin/hdiutil", "attach", kdk_path, "-mountpoint", mount_point, "-nobrowse"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
logging.info("Failed to mount KDK:")
logging.info(result.stdout.decode('utf-8'))
subprocess_wrapper.log(result)
return False
kdk_pkg_path = Path(f"{mount_point}/KernelDebugKit.pkg")
if not kdk_pkg_path.exists():
logging.warning("Failed to find KDK package in DMG, likely corrupted!!!")
self._unmount_disk_image(mount_point)
@@ -672,12 +669,12 @@ class KernelDebugKitUtilities:
logging.warning("Malformed KDK Info.plist provided, cannot create backup")
return
if os.getuid() != 0:
if os.getuid() != 0 and subprocess_wrapper.supports_privileged_helper() is False:
logging.warning("Cannot create KDK backup, not running as root")
return
if not Path(KDK_INSTALL_PATH).exists():
subprocess.run(["/bin/mkdir", "-p", KDK_INSTALL_PATH], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
subprocess_wrapper.run_as_root(["/bin/mkdir", "-p", KDK_INSTALL_PATH], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
kdk_dst_name = f"KDK_{kdk_info_dict['version']}_{kdk_info_dict['build']}.pkg"
kdk_dst_path = Path(f"{KDK_INSTALL_PATH}/{kdk_dst_name}")
@@ -687,7 +684,7 @@ class KernelDebugKitUtilities:
logging.info("Backup already exists, skipping")
return
result = utilities.elevated(["/bin/cp", "-R", kdk_path, kdk_dst_path], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
result = subprocess_wrapper.run_as_root(["/bin/cp", "-R", kdk_path, kdk_dst_path], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
logging.info("Failed to create KDK backup:")
logging.info(result.stdout.decode('utf-8'))
subprocess_wrapper.log(result)

View File

@@ -18,7 +18,8 @@ from .. import constants
from . import (
analytics_handler,
global_settings
global_settings,
subprocess_wrapper
)
@@ -130,7 +131,7 @@ class InitializeLoggingSupport:
This in turn breaks normal OCLP execution to write to log file
"""
if os.geteuid() != 0:
if os.geteuid() != 0 and subprocess_wrapper.supports_privileged_helper() is False:
return
paths = [
@@ -139,15 +140,10 @@ class InitializeLoggingSupport:
]
for path in paths:
result = subprocess.run(["/bin/chmod", "777", path], capture_output=True)
result = subprocess_wrapper.run_as_root(["/bin/chmod", "777", path], capture_output=True)
if result.returncode != 0:
logging.error(f"Failed to fix log file permissions")
if result.stdout:
logging.error("STDOUT:")
logging.error(result.stdout.decode("utf-8"))
if result.stderr:
logging.error("STDERR:")
logging.error(result.stderr.decode("utf-8"))
subprocess_wrapper.log(result)
def _initialize_logging_configuration(self, log_to_file: bool = True) -> None:

View File

@@ -12,6 +12,8 @@ import logging
from pathlib import Path
from . import subprocess_wrapper
from .. import constants
@@ -55,8 +57,7 @@ class RoutePayloadDiskImage:
atexit.register(self._unmount_active_dmgs, unmount_all_active=False)
else:
logging.info("Failed to mount payloads.dmg")
logging.info(f"Output: {output.stdout.decode()}")
logging.info(f"Return Code: {output.returncode}")
subprocess_wrapper.log(output)
def _unmount_active_dmgs(self, unmount_all_active: bool = True) -> None:

View File

@@ -0,0 +1,179 @@
"""
subprocess_wrapper.py: Wrapper for subprocess module to better handle errors and output
Additionally handles our Privileged Helper Tool
"""
import os
import enum
import logging
import subprocess
from pathlib import Path
from functools import cache
from . import utilities
OCLP_PRIVILEGED_HELPER = "/Library/PrivilegedHelperTools/com.dortania.opencore-legacy-patcher.privileged-helper"
class PrivilegedHelperErrorCodes(enum.IntEnum):
"""
Error codes for Privileged Helper Tool.
Reference:
payloads/Tools/PrivilegedHelperTool/main.m
"""
OCLP_PHT_ERROR_MISSING_ARGUMENTS = 160
OCLP_PHT_ERROR_SET_UID_MISSING = 161
OCLP_PHT_ERROR_SET_UID_FAILED = 162
OCLP_PHT_ERROR_SELF_PATH_MISSING = 163
OCLP_PHT_ERROR_PARENT_PATH_MISSING = 164
OCLP_PHT_ERROR_SIGNING_INFORMATION_MISSING = 165
OCLP_PHT_ERROR_INVALID_TEAM_ID = 166
OCLP_PHT_ERROR_INVALID_CERTIFICATES = 167
OCLP_PHT_ERROR_COMMAND_MISSING = 168
OCLP_PHT_ERROR_COMMAND_FAILED = 169
OCLP_PHT_ERROR_CATCH_ALL = 170
@cache
def supports_privileged_helper() -> bool:
"""
Check if Privileged Helper Tool is supported.
When privileged helper is officially shipped, this function should always return True.
Something would have gone very wrong if it doesn't exist past that point.
"""
return Path(OCLP_PRIVILEGED_HELPER).exists()
def run(*args, **kwargs) -> subprocess.CompletedProcess:
"""
Basic subprocess.run wrapper.
"""
return subprocess.run(*args, **kwargs)
def run_as_root(*args, **kwargs) -> subprocess.CompletedProcess:
"""
Run subprocess as root.
Note: Full path to first argument is required.
Helper tool does not resolve PATH.
"""
# Check if first argument exists
if not Path(args[0][0]).exists():
raise FileNotFoundError(f"File not found: {args[0][0]}")
if supports_privileged_helper() is False:
# Fall back to old logic
# This should be removed when we start shipping the helper tool officially
if os.getuid() == 0 or utilities.check_cli_args() is not None:
return subprocess.run(*args, **kwargs)
else:
return subprocess.run(["/usr/bin/sudo"] + [args[0][0]] + args[0][1:], **kwargs)
return subprocess.run([OCLP_PRIVILEGED_HELPER] + [args[0][0]] + args[0][1:], **kwargs)
def verify(process_result: subprocess.CompletedProcess) -> None:
"""
Verify process result and raise exception if failed.
"""
if process_result.returncode == 0:
return
log(process_result)
raise Exception(f"Process failed with exit code {process_result.returncode}")
def run_and_verify(*args, **kwargs) -> None:
"""
Run subprocess and verify result.
Asserts on failure.
"""
verify(run(*args, **kwargs))
def run_as_root_and_verify(*args, **kwargs) -> None:
"""
Run subprocess as root and verify result.
Asserts on failure.
"""
verify(run_as_root(*args, **kwargs))
def log(process: subprocess.CompletedProcess) -> None:
"""
Display subprocess error output in formatted string.
"""
for line in generate_log(process).split("\n"):
logging.error(line)
def generate_log(process: subprocess.CompletedProcess) -> str:
"""
Display subprocess error output in formatted string.
Note this function is still used for zero return code errors, since
some software don't ever return non-zero regardless of success.
Format:
Command: <command>
Return Code: <return code>
Standard Output:
<standard output line 1>
<standard output line 2>
...
Standard Error:
<standard error line 1>
<standard error line 2>
...
"""
output = "Subprocess failed.\n"
output += f" Command: {process.args}\n"
output += f" Return Code: {process.returncode}\n"
_returned_error = __resolve_privileged_helper_errors(process.returncode)
if _returned_error:
output += f" Likely Enum: {_returned_error}\n"
output += f" Standard Output:\n"
if process.stdout:
output += __format_output(process.stdout.decode("utf-8"))
else:
output += " None\n"
output += f" Standard Error:\n"
if process.stderr:
output += __format_output(process.stderr.decode("utf-8"))
else:
output += " None\n"
return output
def __resolve_privileged_helper_errors(return_code: int) -> str:
"""
Attempt to resolve Privileged Helper Tool error codes.
"""
if return_code not in [error_code.value for error_code in PrivilegedHelperErrorCodes]:
return None
return PrivilegedHelperErrorCodes(return_code).name
def __format_output(output: str) -> str:
"""
Format output.
"""
if not output:
# Shouldn't happen, but just in case
return " None\n"
_result = "\n".join([f" {line}" for line in output.split("\n") if line not in ["", "\n"]])
if not _result.endswith("\n"):
_result += "\n"
return _result

View File

@@ -41,13 +41,6 @@ def string_to_hex(input_string):
return input_string
def process_status(process_result):
if process_result.returncode != 0:
logging.info(f"Process failed with exit code {process_result.returncode}")
logging.info(f"Please report the issue on the Discord server")
raise Exception(f"Process result: \n{process_result.stdout.decode()}")
def human_fmt(num):
for unit in ["B", "KB", "MB", "GB", "TB", "PB"]:
if abs(num) < 1000.0:
@@ -553,14 +546,6 @@ def check_boot_mode():
except (KeyError, TypeError, plistlib.InvalidFileException):
return None
def elevated(*args, **kwargs) -> subprocess.CompletedProcess:
# When running through our GUI, we run as root, however we do not get uid 0
# Best to assume CLI is running as root
if os.getuid() == 0 or check_cli_args() is not None:
return subprocess.run(*args, **kwargs)
else:
return subprocess.run(["/usr/bin/sudo"] + [args[0][0]] + args[0][1:], **kwargs)
def fetch_staged_update(variant: str = "Update") -> tuple[str, str]:
"""

View File

@@ -13,6 +13,7 @@ from .. import constants
from ..sys_patch import sys_patch_helpers
from ..efi_builder import build
from ..support import subprocess_wrapper
from ..datasets import (
example_data,
@@ -83,7 +84,7 @@ class PatcherValidation:
result = subprocess.run([self.constants.ocvalidate_path, f"{self.constants.opencore_release_folder}/EFI/OC/config.plist"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
logging.info("Error on build!")
logging.info(result.stdout.decode())
subprocess_wrapper.log(result)
raise Exception(f"Validation failed for predefined model: {model}")
else:
logging.info(f"Validation succeeded for predefined model: {model}")
@@ -103,7 +104,7 @@ class PatcherValidation:
result = subprocess.run([self.constants.ocvalidate_path, f"{self.constants.opencore_release_folder}/EFI/OC/config.plist"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if result.returncode != 0:
logging.info("Error on build!")
logging.info(result.stdout.decode())
subprocess_wrapper.log(result)
raise Exception(f"Validation failed for predefined model: {self.constants.computer.real_model}")
else:
logging.info(f"Validation succeeded for predefined model: {self.constants.computer.real_model}")
@@ -178,8 +179,7 @@ class PatcherValidation:
if output.returncode != 0:
logging.info("Failed to unmount Universal-Binaries.dmg")
logging.info(f"Output: {output.stdout.decode()}")
logging.info(f"Return Code: {output.returncode}")
subprocess_wrapper.log(output)
raise Exception("Failed to unmount Universal-Binaries.dmg")
@@ -196,8 +196,7 @@ class PatcherValidation:
if output.returncode != 0:
logging.info("Failed to mount Universal-Binaries.dmg")
logging.info(f"Output: {output.stdout.decode()}")
logging.info(f"Return Code: {output.returncode}")
subprocess_wrapper.log(output)
raise Exception("Failed to mount Universal-Binaries.dmg")
@@ -226,8 +225,7 @@ class PatcherValidation:
if output.returncode != 0:
logging.info("Failed to unmount Universal-Binaries.dmg")
logging.info(f"Output: {output.stdout.decode()}")
logging.info(f"Return Code: {output.returncode}")
subprocess_wrapper.log(output)
raise Exception("Failed to unmount Universal-Binaries.dmg")