Merge pull request #339 from dortania/ioreg-rewrite

Completely revamp IOReg probing
This commit is contained in:
Mykola Grymalyuk
2021-06-27 10:32:58 -06:00
committed by GitHub
7 changed files with 340 additions and 151 deletions

View File

@@ -14,7 +14,7 @@ jobs:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
- name: Install Python Dependencies - name: Install Python Dependencies
run: pip3 install --upgrade pyinstaller requests run: pip3 install --upgrade pyinstaller requests pyobjc
- run: pyinstaller OpenCore-Patcher.spec - run: pyinstaller OpenCore-Patcher.spec
- run: ./after_pyinstaller.sh - run: ./after_pyinstaller.sh

View File

@@ -13,7 +13,7 @@ jobs:
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
- name: Install Python Dependencies - name: Install Python Dependencies
run: pip3 install --upgrade pyinstaller requests run: pip3 install --upgrade pyinstaller requests pyobjc
- run: pyinstaller OCLP-CLI.spec - run: pyinstaller OCLP-CLI.spec
- run: cd dist; cp OCLP-CLI ../; cd .. - run: cd dist; cp OCLP-CLI ../; cd ..

View File

@@ -224,7 +224,7 @@ class BuildOpenCore:
self.config["#Revision"][f"Hardware-NVMe-{i}"] = f"{Utilities.friendly_hex(controller.vendor_id)}:{Utilities.friendly_hex(controller.device_id)}" self.config["#Revision"][f"Hardware-NVMe-{i}"] = f"{Utilities.friendly_hex(controller.vendor_id)}:{Utilities.friendly_hex(controller.device_id)}"
# Disable Bit 0 (L0s), enable Bit 1 (L1) # Disable Bit 0 (L0s), enable Bit 1 (L1)
nvme_aspm = (controller.aspm & (~3)) | 2 nvme_aspm = (controller.aspm & (~0b11)) | 0b10
if controller.pci_path: if controller.pci_path:
print(f"- Found NVMe ({i}) at {controller.pci_path}") print(f"- Found NVMe ({i}) at {controller.pci_path}")
@@ -263,7 +263,9 @@ class BuildOpenCore:
arpt_path = "PciRoot(0x0)/Pci(0x1C,0x1)/Pci(0x0,0x0)" arpt_path = "PciRoot(0x0)/Pci(0x1C,0x1)/Pci(0x0,0x0)"
print(f"- Using known DevicePath {arpt_path}") print(f"- Using known DevicePath {arpt_path}")
print(f"- Applying fake ID for WiFi, setting Country Code: {self.computer.wifi.country_code}") print(f"- Applying fake ID for WiFi, setting Country Code: {self.computer.wifi.country_code}")
self.config["DeviceProperties"]["Add"][arpt_path] = {"device-id": binascii.unhexlify("ba430000"), "compatible": "pci14e4,43ba", "brcmfx-country": self.computer.wifi.country_code} self.config["DeviceProperties"]["Add"][arpt_path] = {"device-id": binascii.unhexlify("ba430000"), "compatible": "pci14e4,43ba"}
if not self.constants.custom_model and self.computer.wifi and self.computer.wifi.country_code:
self.config["DeviceProperties"]["Add"][arpt_path] += {"brcmfx-country": self.computer.wifi.country_code}
# WiFi patches # WiFi patches
# TODO: -a is not supported in Lion and older, need to add proper fix # TODO: -a is not supported in Lion and older, need to add proper fix
@@ -279,10 +281,10 @@ class BuildOpenCore:
elif not self.constants.custom_model and self.computer.wifi: elif not self.constants.custom_model and self.computer.wifi:
if isinstance(self.computer.wifi, device_probe.Broadcom): if isinstance(self.computer.wifi, device_probe.Broadcom):
# This works around OCLP spoofing the Wifi card and therefore unable to actually detect the correct device # This works around OCLP spoofing the Wifi card and therefore unable to actually detect the correct device
if self.computer.wifi.chipset == device_probe.Broadcom.Chipsets.AirportBrcmNIC: if self.computer.wifi.chipset == device_probe.Broadcom.Chipsets.AirportBrcmNIC and self.computer.wifi.country_code:
self.enable_kext("AirportBrcmFixup.kext", self.constants.airportbcrmfixup_version, self.constants.airportbcrmfixup_path) self.enable_kext("AirportBrcmFixup.kext", self.constants.airportbcrmfixup_version, self.constants.airportbcrmfixup_path)
print(f"- Setting Wireless Card's Country Code: {self.computer.wifi.country_code}") print(f"- Setting Wireless Card's Country Code: {self.computer.wifi.country_code}")
if not self.constants.custom_model and self.computer.wifi and self.computer.wifi.pci_path: if self.computer.wifi.pci_path:
arpt_path = self.computer.wifi.pci_path arpt_path = self.computer.wifi.pci_path
print(f"- Found ARPT device at {arpt_path}") print(f"- Found ARPT device at {arpt_path}")
self.config["DeviceProperties"]["Add"][arpt_path] = {"brcmfx-country": self.computer.wifi.country_code} self.config["DeviceProperties"]["Add"][arpt_path] = {"brcmfx-country": self.computer.wifi.country_code}
@@ -315,8 +317,8 @@ class BuildOpenCore:
self.get_kext_by_bundle_path("IO80211HighSierra.kext/Contents/PlugIns/AirPortAtheros40.kext")["Enabled"] = True self.get_kext_by_bundle_path("IO80211HighSierra.kext/Contents/PlugIns/AirPortAtheros40.kext")["Enabled"] = True
else: else:
self.enable_kext("AirportBrcmFixup.kext", self.constants.airportbcrmfixup_version, self.constants.airportbcrmfixup_path) self.enable_kext("AirportBrcmFixup.kext", self.constants.airportbcrmfixup_version, self.constants.airportbcrmfixup_path)
print(f"- Setting Wireless Card's Country Code: {self.computer.wifi.country_code}") # print(f"- Setting Wireless Card's Country Code: {self.computer.wifi.country_code}")
self.config["NVRAM"]["Add"]["7C436110-AB2A-4BBB-A880-FE41995C9F82"]["boot-args"] += f" brcmfx-country={self.computer.wifi.country_code}" # self.config["NVRAM"]["Add"]["7C436110-AB2A-4BBB-A880-FE41995C9F82"]["boot-args"] += f" brcmfx-country={self.computer.wifi.country_code}"
# CPUFriend # CPUFriend
pp_map_path = Path(self.constants.platform_plugin_plist_path) / Path(f"{self.model}/Info.plist") pp_map_path = Path(self.constants.platform_plugin_plist_path) / Path(f"{self.model}/Info.plist")

View File

@@ -1,16 +1,16 @@
# Copyright (C) 2020-2021, Dhinak G # Copyright (C) 2020-2021, Dhinak G
from __future__ import print_function from __future__ import print_function
import os import hashlib
import math import math
from pathlib import Path import os
import plistlib import plistlib
import subprocess import subprocess
import requests from pathlib import Path
import hashlib
import requests import requests
from Resources import Constants from Resources import Constants, ioreg
def hexswap(input_hex: str): def hexswap(input_hex: str):
@@ -121,12 +121,19 @@ def get_nvram(variable: str, uuid: str = None, *, decode: bool = False):
uuid += ":" uuid += ":"
else: else:
uuid = "" uuid = ""
result = subprocess.run(f"nvram -x {uuid}{variable}".split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.strip()
try: nvram = ioreg.IORegistryEntryFromPath(ioreg.kIOMasterPortDefault, "IODeviceTree:/options".encode())
value = plistlib.loads(result)[f"{uuid}{variable}"]
except plistlib.InvalidFileException: value = ioreg.IORegistryEntryCreateCFProperty(nvram, f"{uuid}{variable}", ioreg.kCFAllocatorDefault, ioreg.kNilOptions)
ioreg.IOObjectRelease(nvram)
if not value:
return None return None
if decode:
value = ioreg.corefoundation_to_native(value)
if decode and isinstance(value, bytes):
value = value.strip(b"\0").decode() value = value.strip(b"\0").decode()
return value return value

View File

@@ -1,10 +1,14 @@
# Hardware probing
# Copyright (C) 2020-2021, Dhinak G, Mykola Grymalyuk
from __future__ import annotations
import binascii import binascii
import enum import enum
import itertools import itertools
import plistlib
import subprocess import subprocess
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Any, ClassVar, Optional, Type from typing import Any, ClassVar, Optional, Type, Union
from Resources import PCIIDArray, Utilities, ioreg from Resources import PCIIDArray, Utilities, ioreg
@@ -34,15 +38,16 @@ class PCIDevice:
# return state # return state
@classmethod @classmethod
def from_ioregistry(cls, entry: ioreg.IORegistryEntry, anti_spoof=False): def from_ioregistry(cls, entry: ioreg.io_registry_entry_t, anti_spoof=False):
if anti_spoof and "IOName" in entry.properties: properties: dict = ioreg.corefoundation_to_native(ioreg.IORegistryEntryCreateCFProperties(entry, None, ioreg.kCFAllocatorDefault, ioreg.kNilOptions)[1]) # type: ignore
vendor_id, device_id = (int(i, 16) for i in entry.properties["IOName"][3:].split(",")) if anti_spoof and "IOName" in properties:
vendor_id, device_id = (int(i, 16) for i in properties["IOName"][3:].split(","))
else: else:
vendor_id, device_id = [int.from_bytes(entry.properties[i][:4], byteorder="little") for i in ["vendor-id", "device-id"]] vendor_id, device_id = [int.from_bytes(properties[i][:4], byteorder="little") for i in ["vendor-id", "device-id"]]
device = cls(vendor_id, device_id, int.from_bytes(entry.properties["class-code"][:6], byteorder="little"), name=entry.name) device = cls(vendor_id, device_id, int.from_bytes(properties["class-code"][:6], byteorder="little"), name=ioreg.io_name_t_to_str(ioreg.IORegistryEntryGetName(entry, None)[1]))
if "model" in entry.properties: if "model" in properties:
device.model = entry.properties["model"].strip(b"\0").decode() device.model = properties["model"].strip(b"\0").decode()
device.populate_pci_path(entry) device.populate_pci_path(entry)
return device return device
@@ -67,23 +72,27 @@ class PCIDevice:
# # Eventually # # Eventually
# raise NotImplementedError # raise NotImplementedError
def populate_pci_path(self, entry: ioreg.IORegistryEntry): def populate_pci_path(self, original_entry: ioreg.io_registry_entry_t):
# Based off gfxutil logic, seems to work. # Based off gfxutil logic, seems to work.
paths = [] paths = []
entry = original_entry
while entry: while entry:
if entry.entry_class == "IOPCIDevice": if ioreg.IOObjectConformsTo(entry, "IOPCIDevice".encode()):
location = [hex(int(i, 16)) for i in entry.location.split(",") + ["0"]] location = [hex(int(i, 16)) for i in ioreg.io_name_t_to_str(ioreg.IORegistryEntryGetLocationInPlane(entry, "IOService".encode(), None)[1]).split(",") + ["0"]]
paths.append(f"Pci({location[0]},{location[1]})") paths.append(f"Pci({location[0]},{location[1]})")
elif entry.entry_class == "IOACPIPlatformDevice": elif ioreg.IOObjectConformsTo(entry, "IOACPIPlatformDevice".encode()):
paths.append(f"PciRoot({hex(int(entry.properties.get('_UID', 0)))})") paths.append(f"PciRoot({hex(int(ioreg.IORegistryEntryCreateCFProperty(entry, '_UID', ioreg.kCFAllocatorDefault, ioreg.kNilOptions) or 0))})") # type: ignore
break break
elif entry.entry_class in ["IOPCI2PCIBridge", "IOPCIBridge", "AppleACPIPCI"]: elif ioreg.IOObjectConformsTo(entry, "IOPCIBridge".encode()):
pass pass
else: else:
# There's something in between that's not PCI! Abort # There's something in between that's not PCI! Abort
paths = [] paths = []
break break
entry = entry.parent parent = ioreg.IORegistryEntryGetParentEntry(entry, "IOService".encode(), None)[1]
if entry != original_entry:
ioreg.IOObjectRelease(entry)
entry = parent
self.pci_path = "/".join(reversed(paths)) self.pci_path = "/".join(reversed(paths))
@@ -105,10 +114,25 @@ class WirelessCard(PCIDevice):
chipset: enum.Enum = field(init=False) chipset: enum.Enum = field(init=False)
def __post_init__(self): def __post_init__(self):
system_profiler = plistlib.loads(subprocess.run("system_profiler -xml SPAirPortDataType".split(), stdout=subprocess.PIPE).stdout)
self.country_code = system_profiler[0]["_items"][0]["spairport_airport_interfaces"][0]["spairport_wireless_country_code"]
self.detect_chipset() self.detect_chipset()
@classmethod
def from_ioregistry(cls, entry: ioreg.io_registry_entry_t, anti_spoof=True):
device = super().from_ioregistry(entry, anti_spoof=anti_spoof)
matching_dict = {
"IOParentMatch": ioreg.corefoundation_to_native(ioreg.IORegistryEntryIDMatching(ioreg.IORegistryEntryGetRegistryEntryID(entry, None)[1])),
"IOProviderClass": "IO80211Interface",
}
interface = next(ioreg.ioiterator_to_list(ioreg.IOServiceGetMatchingServices(ioreg.kIOMasterPortDefault, matching_dict, None)[1]), None)
if interface:
device.country_code = ioreg.IORegistryEntryCreateCFProperty(interface, "IO80211CountryCode", ioreg.kCFAllocatorDefault, ioreg.kNilOptions) # type: ignore # If not present, will be None anyways
else:
device.country_code = None # type: ignore
return device
def detect_chipset(self): def detect_chipset(self):
raise NotImplementedError raise NotImplementedError
@@ -118,7 +142,7 @@ class NVMeController(PCIDevice):
CLASS_CODE: ClassVar[int] = 0x010802 CLASS_CODE: ClassVar[int] = 0x010802
aspm: Optional[int] = None aspm: Optional[int] = None
parent_aspm: Optional[int] = None # parent_aspm: Optional[int] = None
@dataclass @dataclass
@@ -267,12 +291,11 @@ class Computer:
wifi: Optional[WirelessCard] = None wifi: Optional[WirelessCard] = None
cpu: Optional[CPU] = None cpu: Optional[CPU] = None
oclp_version: Optional[str] = None oclp_version: Optional[str] = None
ioregistry: Optional[ioreg.IOReg] = None opencore_version: Optional[str] = None
@staticmethod @staticmethod
def probe(): def probe():
computer = Computer() computer = Computer()
computer.ioregistry = ioreg.IOReg()
computer.gpu_probe() computer.gpu_probe()
computer.dgpu_probe() computer.dgpu_probe()
computer.igpu_probe() computer.igpu_probe()
@@ -284,15 +307,20 @@ class Computer:
def gpu_probe(self): def gpu_probe(self):
# Chain together two iterators: one for class code 00000300, the other for class code 00800300 # Chain together two iterators: one for class code 00000300, the other for class code 00800300
devices = itertools.chain(self.ioregistry.find(property=("class-code", binascii.a2b_hex("00000300"))), self.ioregistry.find(property=("class-code", binascii.a2b_hex("00800300")))) devices = ioreg.ioiterator_to_list(
ioreg.IOServiceGetMatchingServices(
ioreg.kIOMasterPortDefault, {"IOProviderClass": "IOPCIDevice", "IOPropertyMatch": [{"class-code": binascii.a2b_hex("00000300")}, {"class-code": binascii.a2b_hex("00800300")}]}, None
)[1]
)
for device in devices: for device in devices:
vendor: Type[GPU] = PCIDevice.from_ioregistry(device).vendor_detect(inherits=GPU) # type: ignore vendor: Type[GPU] = PCIDevice.from_ioregistry(device).vendor_detect(inherits=GPU) # type: ignore
if vendor: if vendor:
self.gpus.append(vendor.from_ioregistry(device)) # type: ignore self.gpus.append(vendor.from_ioregistry(device)) # type: ignore
ioreg.IOObjectRelease(device)
def dgpu_probe(self): def dgpu_probe(self):
device = next(self.ioregistry.find(name="GFX0"), None) device = next(ioreg.ioiterator_to_list(ioreg.IOServiceGetMatchingServices(ioreg.kIOMasterPortDefault, ioreg.IOServiceNameMatching("GFX0".encode()), None)[1]), None)
if not device: if not device:
# No devices # No devices
return return
@@ -300,9 +328,10 @@ class Computer:
vendor: Type[GPU] = PCIDevice.from_ioregistry(device).vendor_detect(inherits=GPU) # type: ignore vendor: Type[GPU] = PCIDevice.from_ioregistry(device).vendor_detect(inherits=GPU) # type: ignore
if vendor: if vendor:
self.dgpu = vendor.from_ioregistry(device) # type: ignore self.dgpu = vendor.from_ioregistry(device) # type: ignore
ioreg.IOObjectRelease(device)
def igpu_probe(self): def igpu_probe(self):
device = next(self.ioregistry.find(name="IGPU"), None) device = next(ioreg.ioiterator_to_list(ioreg.IOServiceGetMatchingServices(ioreg.kIOMasterPortDefault, ioreg.IOServiceNameMatching("IGPU".encode()), None)[1]), None)
if not device: if not device:
# No devices # No devices
return return
@@ -310,63 +339,63 @@ class Computer:
vendor: Type[GPU] = PCIDevice.from_ioregistry(device).vendor_detect(inherits=GPU) # type: ignore vendor: Type[GPU] = PCIDevice.from_ioregistry(device).vendor_detect(inherits=GPU) # type: ignore
if vendor: if vendor:
self.igpu = vendor.from_ioregistry(device) # type: ignore self.igpu = vendor.from_ioregistry(device) # type: ignore
ioreg.IOObjectRelease(device)
def wifi_probe(self): def wifi_probe(self):
# result = subprocess.run("ioreg -r -c IOPCIDevice -a -d2".split(), stdout=subprocess.PIPE).stdout.strip() # result = subprocess.run("ioreg -r -c IOPCIDevice -a -d2".split(), stdout=subprocess.PIPE).stdout.strip()
devices = self.ioregistry.find(property=("class-code", binascii.a2b_hex(Utilities.hexswap(hex(WirelessCard.CLASS_CODE)[2:].zfill(8))))) devices = ioreg.ioiterator_to_list(
# if not result: ioreg.IOServiceGetMatchingServices(
# # No devices ioreg.kIOMasterPortDefault,
# print("A") {"IOProviderClass": "IOPCIDevice", "IOPropertyMatch": {"class-code": binascii.a2b_hex(Utilities.hexswap(hex(WirelessCard.CLASS_CODE)[2:].zfill(8)))}},
# return None,
)[1]
# devices = plistlib.loads(result) )
# devices = [i for i in devices if i["class-code"] == binascii.a2b_hex("00800200")]
# if not devices:
# # No devices
# print("B")
# return
for device in devices: for device in devices:
vendor: Type[WirelessCard] = PCIDevice.from_ioregistry(device, anti_spoof=True).vendor_detect(inherits=WirelessCard) # type: ignore vendor: Type[WirelessCard] = PCIDevice.from_ioregistry(device, anti_spoof=True).vendor_detect(inherits=WirelessCard) # type: ignore
if vendor: if vendor:
self.wifi = vendor.from_ioregistry(device, anti_spoof=True) # type: ignore self.wifi = vendor.from_ioregistry(device, anti_spoof=True) # type: ignore
break break
ioreg.IOObjectRelease(device)
def storage_probe(self): def storage_probe(self):
sata_controllers = self.ioregistry.find(entry_class="IOPCIDevice", property=("class-code", binascii.a2b_hex(Utilities.hexswap(hex(SATAController.CLASS_CODE)[2:].zfill(8))))) sata_controllers = ioreg.ioiterator_to_list(
nvme_controllers = itertools.chain.from_iterable( ioreg.IOServiceGetMatchingServices(
[ ioreg.kIOMasterPortDefault,
# self.ioregistry.find(entry_class="IOPCIDevice", property=("class-code", binascii.a2b_hex(Utilities.hexswap(hex(NVMeController.CLASS_CODE)[2:].zfill(8))))), {"IOProviderClass": "IOPCIDevice", "IOPropertyMatch": [{"class-code": binascii.a2b_hex(Utilities.hexswap(hex(SATAController.CLASS_CODE)[2:].zfill(8)))}]},
self.ioregistry.find(entry_class="IOPCIDevice", children={"entry_class": "IONVMeController"}), None,
] )[1]
)
nvme_controllers = ioreg.ioiterator_to_list(
ioreg.IOServiceGetMatchingServices(
ioreg.kIOMasterPortDefault, {"IOProviderClass": "IONVMeController", "IOParentMatch": {"IOProviderClass": "IOPCIDevice"}, "IOPropertyMatch": {"IOClass": "IONVMeController"}}, None
)[1]
) )
for device in sata_controllers: for device in sata_controllers:
self.storage.append(SATAController.from_ioregistry(device)) self.storage.append(SATAController.from_ioregistry(device))
ioreg.IOObjectRelease(device)
for device in nvme_controllers: for device in nvme_controllers:
aspm = device.properties.get("pci-aspm-default", 0) parent = ioreg.IORegistryEntryGetParentEntry(device, "IOService".encode(), None)[1]
ioreg.IOObjectRelease(device)
aspm: Union[int, bytes] = ioreg.IORegistryEntryCreateCFProperty(parent, "pci-aspm-default", ioreg.kCFAllocatorDefault, ioreg.kNilOptions) or 0 # type: ignore
if isinstance(aspm, bytes): if isinstance(aspm, bytes):
aspm = int.from_bytes(aspm, byteorder="little") aspm = int.from_bytes(aspm, byteorder="little")
if device.parent.parent.entry_class == "IOPCIDevice": controller = NVMeController.from_ioregistry(parent)
parent_aspm = device.parent.parent.properties.get("pci-aspm-default", 0)
if isinstance(parent_aspm, bytes):
parent_aspm = int.from_bytes(parent_aspm, byteorder="little")
else:
parent_aspm = None
controller = NVMeController.from_ioregistry(device)
controller.aspm = aspm controller.aspm = aspm
controller.parent_aspm = parent_aspm
if controller.vendor_id != 0x106B: if controller.vendor_id != 0x106B:
self.storage.append(controller) self.storage.append(controller)
ioreg.IOObjectRelease(parent)
def smbios_probe(self): def smbios_probe(self):
# Reported model # Reported model
entry = next(self.ioregistry.find(name="Root")).children[0] entry = next(ioreg.ioiterator_to_list(ioreg.IOServiceGetMatchingServices(ioreg.kIOMasterPortDefault, ioreg.IOServiceMatching("IOPlatformExpertDevice".encode()), None)[1]))
self.reported_model = entry.properties["model"].strip(b"\0").decode() self.reported_model = ioreg.corefoundation_to_native(ioreg.IORegistryEntryCreateCFProperty(entry, "model", ioreg.kCFAllocatorDefault, ioreg.kNilOptions)).strip(b"\0").decode() # type: ignore
self.reported_board_id = entry.properties.get("board-id", entry.properties.get("target-type", b"")).strip(b"\0").decode() self.reported_board_id = ioreg.corefoundation_to_native(ioreg.IORegistryEntryCreateCFProperty(entry, "model", ioreg.kCFAllocatorDefault, ioreg.kNilOptions)).strip(b"\0").decode() # type: ignore
ioreg.IOObjectRelease(entry)
# Real model # Real model
# TODO: We previously had logic for OC users using iMacPro1,1 with incorrect ExposeSensitiveData. Add logic? # TODO: We previously had logic for OC users using iMacPro1,1 with incorrect ExposeSensitiveData. Add logic?
@@ -375,6 +404,7 @@ class Computer:
# OCLP version # OCLP version
self.oclp_version = Utilities.get_nvram("OCLP-Version", "4D1FDA02-38C7-4A6A-9CC6-4BCCA8B30102", decode=True) self.oclp_version = Utilities.get_nvram("OCLP-Version", "4D1FDA02-38C7-4A6A-9CC6-4BCCA8B30102", decode=True)
self.opencore_version = Utilities.get_nvram("opencore-version", "4D1FDA02-38C7-4A6A-9CC6-4BCCA8B30102", decode=True)
def cpu_probe(self): def cpu_probe(self):
self.cpu = CPU( self.cpu = CPU(

View File

@@ -1,93 +1,242 @@
# Handle misc CLI menu options
# Copyright (C) 2020-2021, Dhinak G
from __future__ import annotations from __future__ import annotations
import plistlib from typing import NewType, Union
import subprocess
import tempfile
from dataclasses import dataclass
from pathlib import Path
from typing import Generator
import objc
from CoreFoundation import CFRelease, kCFAllocatorDefault # type: ignore # pylint: disable=no-name-in-module
from Foundation import NSBundle # type: ignore # pylint: disable=no-name-in-module
from PyObjCTools import Conversion
@dataclass IOKit_bundle = NSBundle.bundleWithIdentifier_("com.apple.framework.IOKit")
class IORegistryEntry:
name: str
entry_class: str
properties: dict
location: str
children: list[IORegistryEntry]
parent: IORegistryEntry
# pylint: disable=invalid-name
io_name_t_ref_out = b"[128c]" # io_name_t is char[128]
const_io_name_t_ref_in = b"r*"
CFStringRef = b"^{__CFString=}"
CFDictionaryRef = b"^{__CFDictionary=}"
CFAllocatorRef = b"^{__CFAllocator=}"
# pylint: enable=invalid-name
class IOReg: # https://developer.apple.com/library/archive/documentation/Cocoa/Conceptual/ObjCRuntimeGuide/Articles/ocrtTypeEncodings.html
def __init__(self): functions = [
try: ("IORegistryEntryCreateCFProperties", b"IIo^@" + CFAllocatorRef + b"I"),
self.ioreg = plistlib.loads(subprocess.run("ioreg -a -l".split(), stdout=subprocess.PIPE).stdout.strip()) ("IOServiceMatching", CFDictionaryRef + b"r*"),
except Exception: ("IOServiceGetMatchingServices", b"II" + CFDictionaryRef + b"o^I"),
fd, file_path = tempfile.mkstemp(suffix=".plist") ("IOIteratorNext", b"II"),
with open(fd, "wb") as file_obj: ("IORegistryEntryGetParentEntry", b"IIr*o^I"),
file_obj.write(subprocess.run("ioreg -a -l".split(), stdout=subprocess.PIPE).stdout.strip()) ("IOObjectRelease", b"II"),
("IORegistryEntryGetName", b"IIo" + io_name_t_ref_out),
subprocess.run("plutil -convert binary1".split() + [file_path]) ("IOObjectGetClass", b"IIo" + io_name_t_ref_out),
self.ioreg = plistlib.load(Path(file_path).open("rb")) ("IOObjectCopyClass", CFStringRef + b"I"),
self.tree = self.recurse(self.ioreg, None) ("IOObjectCopySuperclassForClass", CFStringRef + CFStringRef),
("IORegistryEntryGetChildIterator", b"IIr*o^I"),
def recurse(self, entry, parent): ("IORegistryCreateIterator", b"IIr*Io^I"),
converted = IORegistryEntry( ("IORegistryEntryCreateIterator", b"IIr*Io^I"),
entry["IORegistryEntryName"], ("IORegistryIteratorEnterEntry", b"II"),
entry["IOObjectClass"], ("IORegistryIteratorExitEntry", b"II"),
{ ("IORegistryEntryCreateCFProperty", b"@I" + CFStringRef + CFAllocatorRef + b"I"),
i: v ("IORegistryEntryGetPath", b"IIr*oI"),
for i, v in entry.items() ("IORegistryEntryCopyPath", CFStringRef + b"Ir*"),
if i ("IOObjectConformsTo", b"II" + const_io_name_t_ref_in),
not in [ ("IORegistryEntryGetLocationInPlane", b"II" + const_io_name_t_ref_in + b"o" + io_name_t_ref_out),
"IOServiceBusyState", ("IOServiceNameMatching", CFDictionaryRef + b"r*"),
"IOServiceBusyTime", ("IORegistryEntryGetRegistryEntryID", b"IIo^Q"),
"IOServiceState", ("IORegistryEntryIDMatching", CFDictionaryRef + b"Q"),
"IORegistryEntryLocation", ("IORegistryEntryFromPath", b"II" + const_io_name_t_ref_in),
"IORegistryEntryName",
"IORegistryEntryID",
"IOObjectClass",
"IORegistryEntryChildren",
"IOObjectRetainCount",
] ]
},
entry.get("IORegistryEntryLocation"),
[],
parent,
)
for i in entry.get("IORegistryEntryChildren", []): variables = [("kIOMasterPortDefault", b"I")]
converted.children.append(self.recurse(i, converted))
return converted # pylint: disable=invalid-name
pointer = type(None)
def parse_conditions(self, entry: IORegistryEntry, **kwargs): kern_return_t = NewType("kern_return_t", int)
conditions = [] boolean_t = int
if "parent" in kwargs:
conditions.append(self.parse_conditions(entry.parent, **kwargs["parent"]))
if "children" in kwargs:
conditions.append(any(self.parse_conditions(i, **kwargs["children"]) for i in entry.children))
if "name" in kwargs:
conditions.append(kwargs["name"] == entry.name)
if "entry_class" in kwargs:
conditions.append(kwargs["entry_class"] == entry.entry_class)
if "key" in kwargs:
conditions.append(kwargs["key"] in entry.properties)
if "property" in kwargs:
conditions.append(kwargs["property"][0] in entry.properties and entry.properties[kwargs["property"][0]] == kwargs["property"][1])
return all(conditions) io_object_t = NewType("io_object_t", object)
io_name_t = bytes
io_string_t = bytes
def find(self, root: IORegistryEntry = None, **kwargs) -> Generator[IORegistryEntry, None, None]: # io_registry_entry_t = NewType("io_registry_entry_t", io_object_t)
if not root: io_registry_entry_t = io_object_t
root = self.tree io_iterator_t = NewType("io_iterator_t", io_object_t)
if not kwargs: CFTypeRef = Union[int, float, bytes, dict, list]
return
if self.parse_conditions(root, **kwargs): IOOptionBits = int
yield root mach_port_t = int
CFAllocatorType = type(kCFAllocatorDefault)
for i in root.children: NULL = 0
for j in self.find(i, **kwargs):
yield j kIOMasterPortDefault: mach_port_t
kNilOptions: IOOptionBits = NULL
# IOKitLib.h
kIORegistryIterateRecursively = 1
kIORegistryIterateParents = 2
# pylint: enable=invalid-name
# kern_return_t IORegistryEntryCreateCFProperties(io_registry_entry_t entry, CFMutableDictionaryRef * properties, CFAllocatorRef allocator, IOOptionBits options);
def IORegistryEntryCreateCFProperties(entry: io_registry_entry_t, properties: pointer, allocator: CFAllocatorType, options: IOOptionBits) -> tuple[kern_return_t, dict]: # pylint: disable=invalid-name
raise NotImplementedError
# CFMutableDictionaryRef IOServiceMatching(const char * name);
def IOServiceMatching(name: bytes) -> dict: # pylint: disable=invalid-name
raise NotImplementedError
# kern_return_t IOServiceGetMatchingServices(mach_port_t masterPort, CFDictionaryRef matching CF_RELEASES_ARGUMENT, io_iterator_t * existing);
def IOServiceGetMatchingServices(masterPort: mach_port_t, matching: dict, existing: pointer) -> tuple[kern_return_t, io_iterator_t]: # pylint: disable=invalid-name
raise NotImplementedError
# io_object_t IOIteratorNext(io_iterator_t iterator);
def IOIteratorNext(iterator: io_iterator_t) -> io_object_t: # pylint: disable=invalid-name
raise NotImplementedError
# kern_return_t IORegistryEntryGetParentEntry(io_registry_entry_t entry, const io_name_t plane, io_registry_entry_t * parent);
def IORegistryEntryGetParentEntry(entry: io_registry_entry_t, plane: io_name_t, parent: pointer) -> tuple[kern_return_t, io_registry_entry_t]: # pylint: disable=invalid-name
raise NotImplementedError
# kern_return_t IOObjectRelease(io_object_t object);
def IOObjectRelease(object: io_object_t) -> kern_return_t: # pylint: disable=invalid-name
raise NotImplementedError
# kern_return_t IORegistryEntryGetName(io_registry_entry_t entry, io_name_t name);
def IORegistryEntryGetName(entry: io_registry_entry_t, name: pointer) -> tuple[kern_return_t, bytes]: # pylint: disable=invalid-name
raise NotImplementedError
# kern_return_t IOObjectGetClass(io_object_t object, io_name_t className);
def IOObjectGetClass(object: io_object_t, className: pointer) -> tuple[kern_return_t, bytes]: # pylint: disable=invalid-name
raise NotImplementedError
# CFStringRef IOObjectCopyClass(io_object_t object);
def IOObjectCopyClass(object: io_object_t) -> str: # pylint: disable=invalid-name
raise NotImplementedError
# CFStringRef IOObjectCopySuperclassForClass(CFStringRef classname)
def IOObjectCopySuperclassForClass(classname: str) -> str: # pylint: disable=invalid-name
raise NotImplementedError
# kern_return_t IORegistryEntryGetChildIterator(io_registry_entry_t entry, const io_name_t plane, io_iterator_t * iterator);
def IORegistryEntryGetChildIterator(entry: io_registry_entry_t, plane: io_name_t, iterator: pointer) -> tuple[kern_return_t, io_iterator_t]: # pylint: disable=invalid-name
raise NotImplementedError
# kern_return_t IORegistryCreateIterator(mach_port_t masterPort, const io_name_t plane, IOOptionBits options, io_iterator_t * iterator)
def IORegistryCreateIterator(masterPort: mach_port_t, plane: io_name_t, options: IOOptionBits, iterator: pointer) -> tuple[kern_return_t, io_iterator_t]: # pylint: disable=invalid-name
raise NotImplementedError
# kern_return_t IORegistryEntryCreateIterator(io_registry_entry_t entry, const io_name_t plane, IOOptionBits options, io_iterator_t * iterator)
def IORegistryEntryCreateIterator(entry: io_registry_entry_t, plane: io_name_t, options: IOOptionBits, iterator: pointer) -> tuple[kern_return_t, io_iterator_t]: # pylint: disable=invalid-name
raise NotImplementedError
# kern_return_t IORegistryIteratorEnterEntry(io_iterator_t iterator)
def IORegistryIteratorEnterEntry(iterator: io_iterator_t) -> kern_return_t: # pylint: disable=invalid-name
raise NotImplementedError
# kern_return_t IORegistryIteratorExitEntry(io_iterator_t iterator)
def IORegistryIteratorExitEntry(iterator: io_iterator_t) -> kern_return_t: # pylint: disable=invalid-name
raise NotImplementedError
# CFTypeRef IORegistryEntryCreateCFProperty(io_registry_entry_t entry, CFStringRef key, CFAllocatorRef allocator, IOOptionBits options);
def IORegistryEntryCreateCFProperty(entry: io_registry_entry_t, key: str, allocator: CFAllocatorType, options: IOOptionBits) -> CFTypeRef: # pylint: disable=invalid-name
raise NotImplementedError
# kern_return_t IORegistryEntryGetPath(io_registry_entry_t entry, const io_name_t plane, io_string_t path);
def IORegistryEntryGetPath(entry: io_registry_entry_t, plane: io_name_t, path: pointer) -> tuple[kern_return_t, io_string_t]: # pylint: disable=invalid-name
raise NotImplementedError
# CFStringRef IORegistryEntryCopyPath(io_registry_entry_t entry, const io_name_t plane)
def IORegistryEntryCopyPath(entry: io_registry_entry_t, plane: bytes) -> str: # pylint: disable=invalid-name
raise NotImplementedError
# boolean_t IOObjectConformsTo(io_object_t object, const io_name_t className)
def IOObjectConformsTo(object: io_object_t, className: bytes) -> boolean_t: # pylint: disable=invalid-name
raise NotImplementedError
# kern_return_t IORegistryEntryGetLocationInPlane(io_registry_entry_t entry, const io_name_t plane, io_name_t location)
def IORegistryEntryGetLocationInPlane(entry: io_registry_entry_t, plane: io_name_t, location: pointer) -> tuple[kern_return_t, bytes]: # pylint: disable=invalid-name
raise NotImplementedError
# CFMutableDictionaryRef IOServiceNameMatching(const char * name);
def IOServiceNameMatching(name: bytes) -> dict: # pylint: disable=invalid-name
raise NotImplementedError
# kern_return_t IORegistryEntryGetRegistryEntryID(io_registry_entry_t entry, uint64_t * entryID)
def IORegistryEntryGetRegistryEntryID(entry: io_registry_entry_t, entryID: pointer) -> tuple[kern_return_t, int]: # pylint: disable=invalid-name
raise NotImplementedError
# CFMutableDictionaryRef IORegistryEntryIDMatching(uint64_t entryID);
def IORegistryEntryIDMatching(entryID: int) -> dict: # pylint: disable=invalid-name
raise NotImplementedError
# io_registry_entry_t IORegistryEntryFromPath(mach_port_t mainPort, const io_string_t path)
def IORegistryEntryFromPath(mainPort: mach_port_t, path: io_string_t) -> io_registry_entry_t: # pylint: disable=invalid-name
raise NotImplementedError
objc.loadBundleFunctions(IOKit_bundle, globals(), functions) # type: ignore # pylint: disable=no-member
objc.loadBundleVariables(IOKit_bundle, globals(), variables) # type: ignore # pylint: disable=no-member
def ioiterator_to_list(iterator: io_iterator_t):
# items = []
item = IOIteratorNext(iterator) # noqa: F821
while item:
# items.append(next)
yield item
item = IOIteratorNext(iterator) # noqa: F821
IOObjectRelease(iterator) # noqa: F821
# return items
def corefoundation_to_native(collection):
native = Conversion.pythonCollectionFromPropertyList(collection)
CFRelease(collection)
return native
def native_to_corefoundation(native):
return Conversion.propertyListFromPythonCollection(native)
def io_name_t_to_str(name):
return name.partition(b"\0")[0].decode()
def get_class_inheritance(io_object):
classes = []
cls = IOObjectCopyClass(io_object)
while cls:
# yield cls
classes.append(cls)
CFRelease(cls)
cls = IOObjectCopySuperclassForClass(cls)
return classes

View File

@@ -1 +1,2 @@
requests requests
pyobjc