Use full path and arguments for subprocess

This commit is contained in:
Mykola Grymalyuk
2023-12-30 13:49:59 -07:00
parent b46e55d3f6
commit 0dfcf03c0c
23 changed files with 152 additions and 154 deletions
+11 -11
View File
@@ -125,7 +125,7 @@ def check_if_root_is_apfs_snapshot():
def check_seal():
# 'Snapshot Sealed' property is only listed on booted snapshots
sealed = subprocess.run(["diskutil", "apfs", "list"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
sealed = subprocess.run(["/usr/sbin/diskutil", "apfs", "list"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if "Snapshot Sealed: Yes" in sealed.stdout.decode():
return True
else:
@@ -133,7 +133,7 @@ def check_seal():
def check_filesystem_type():
# Expected to return 'apfs' or 'hfs'
filesystem_type = plistlib.loads(subprocess.run(["diskutil", "info", "-plist", "/"], stdout=subprocess.PIPE).stdout.decode().strip().encode())
filesystem_type = plistlib.loads(subprocess.run(["/usr/sbin/diskutil", "info", "-plist", "/"], stdout=subprocess.PIPE).stdout.decode().strip().encode())
return filesystem_type["FilesystemType"]
@@ -187,10 +187,10 @@ def check_kext_loaded(bundle_id: str) -> str:
# no UUID for kextstat
pattern = re.compile(re.escape(bundle_id) + r"\s+\((?P<version>.+)\)")
args = ["kextstat", "-l", "-b", bundle_id]
args = ["/usr/sbin/kextstat", "-list-only", "-bundle-id", bundle_id]
if Path("/usr/bin/kmutil").exists():
args = ["kmutil", "showloaded", "--list-only", "--variant-suffix", "release", "--optional-identifier", bundle_id]
args = ["/usr/bin/kmutil", "showloaded", "--list-only", "--variant-suffix", "release", "--optional-identifier", bundle_id]
kext_loaded = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if kext_loaded.returncode != 0:
@@ -420,7 +420,7 @@ def find_apfs_physical_volume(device):
disk_list = None
physical_disks = []
try:
disk_list = plistlib.loads(subprocess.run(["diskutil", "info", "-plist", device], stdout=subprocess.PIPE).stdout)
disk_list = plistlib.loads(subprocess.run(["/usr/sbin/diskutil", "info", "-plist", device], stdout=subprocess.PIPE).stdout)
except TypeError:
pass
@@ -465,7 +465,7 @@ def find_disk_off_uuid(uuid):
# Find disk by UUID
disk_list = None
try:
disk_list = plistlib.loads(subprocess.run(["diskutil", "info", "-plist", uuid], stdout=subprocess.PIPE).stdout)
disk_list = plistlib.loads(subprocess.run(["/usr/sbin/diskutil", "info", "-plist", uuid], stdout=subprocess.PIPE).stdout)
except TypeError:
pass
if disk_list:
@@ -497,7 +497,7 @@ def grab_mount_point_from_disk(disk):
def monitor_disk_output(disk):
# Returns MB written on drive
output = subprocess.check_output(["iostat", "-Id", disk])
output = subprocess.check_output(["/usr/sbin/iostat", "-Id", disk])
output = output.decode("utf-8")
# Grab second last entry (last is \n)
output = output.split(" ")
@@ -509,7 +509,7 @@ def get_preboot_uuid() -> str:
"""
Get the UUID of the Preboot volume
"""
args = ["ioreg", "-a", "-n", "chosen", "-p", "IODeviceTree", "-r"]
args = ["/usr/sbin/ioreg", "-a", "-n", "chosen", "-p", "IODeviceTree", "-r"]
output = plistlib.loads(subprocess.run(args, stdout=subprocess.PIPE).stdout)
return output[0]["apfs-preboot-uuid"].strip(b"\0").decode()
@@ -533,13 +533,13 @@ def block_os_updaters():
if bad_process in current_process:
if pid != "":
logging.info(f"Killing Process: {pid} - {current_process.split('/')[-1]}")
subprocess.run(["kill", "-9", pid])
subprocess.run(["/bin/kill", "-9", pid])
break
def check_boot_mode():
# Check whether we're in Safe Mode or not
try:
sys_plist = plistlib.loads(subprocess.run(["system_profiler", "SPSoftwareDataType"], stdout=subprocess.PIPE).stdout)
sys_plist = plistlib.loads(subprocess.run(["/usr/sbin/system_profiler", "SPSoftwareDataType"], stdout=subprocess.PIPE).stdout)
return sys_plist[0]["_items"][0]["boot_mode"]
except (KeyError, TypeError, plistlib.InvalidFileException):
return None
@@ -550,7 +550,7 @@ def elevated(*args, **kwargs) -> subprocess.CompletedProcess:
if os.getuid() == 0 or check_cli_args() is not None:
return subprocess.run(*args, **kwargs)
else:
return subprocess.run(["sudo"] + [args[0][0]] + args[0][1:], **kwargs)
return subprocess.run(["/usr/bin/sudo"] + [args[0][0]] + args[0][1:], **kwargs)
def fetch_staged_update(variant: str = "Update") -> (str, str):