Completely revamp IOReg probing

This commit is contained in:
Dhinak G
2021-06-24 19:55:22 -04:00
parent 1d8f87fb78
commit 657fc97f4f
7 changed files with 340 additions and 151 deletions

View File

@@ -1,10 +1,14 @@
# Hardware probing
# Copyright (C) 2020-2021, Dhinak G, Mykola Grymalyuk
from __future__ import annotations
import binascii
import enum
import itertools
import plistlib
import subprocess
from dataclasses import dataclass, field
from typing import Any, ClassVar, Optional, Type
from typing import Any, ClassVar, Optional, Type, Union
from Resources import PCIIDArray, Utilities, ioreg
@@ -34,15 +38,16 @@ class PCIDevice:
# return state
@classmethod
def from_ioregistry(cls, entry: ioreg.IORegistryEntry, anti_spoof=False):
if anti_spoof and "IOName" in entry.properties:
vendor_id, device_id = (int(i, 16) for i in entry.properties["IOName"][3:].split(","))
def from_ioregistry(cls, entry: ioreg.io_registry_entry_t, anti_spoof=False):
properties: dict = ioreg.corefoundation_to_native(ioreg.IORegistryEntryCreateCFProperties(entry, None, ioreg.kCFAllocatorDefault, ioreg.kNilOptions)[1]) # type: ignore
if anti_spoof and "IOName" in properties:
vendor_id, device_id = (int(i, 16) for i in properties["IOName"][3:].split(","))
else:
vendor_id, device_id = [int.from_bytes(entry.properties[i][:4], byteorder="little") for i in ["vendor-id", "device-id"]]
vendor_id, device_id = [int.from_bytes(properties[i][:4], byteorder="little") for i in ["vendor-id", "device-id"]]
device = cls(vendor_id, device_id, int.from_bytes(entry.properties["class-code"][:6], byteorder="little"), name=entry.name)
if "model" in entry.properties:
device.model = entry.properties["model"].strip(b"\0").decode()
device = cls(vendor_id, device_id, int.from_bytes(properties["class-code"][:6], byteorder="little"), name=ioreg.io_name_t_to_str(ioreg.IORegistryEntryGetName(entry, None)[1]))
if "model" in properties:
device.model = properties["model"].strip(b"\0").decode()
device.populate_pci_path(entry)
return device
@@ -67,23 +72,27 @@ class PCIDevice:
# # Eventually
# raise NotImplementedError
def populate_pci_path(self, entry: ioreg.IORegistryEntry):
def populate_pci_path(self, original_entry: ioreg.io_registry_entry_t):
# Based off gfxutil logic, seems to work.
paths = []
entry = original_entry
while entry:
if entry.entry_class == "IOPCIDevice":
location = [hex(int(i, 16)) for i in entry.location.split(",") + ["0"]]
if ioreg.IOObjectConformsTo(entry, "IOPCIDevice".encode()):
location = [hex(int(i, 16)) for i in ioreg.io_name_t_to_str(ioreg.IORegistryEntryGetLocationInPlane(entry, "IOService".encode(), None)[1]).split(",") + ["0"]]
paths.append(f"Pci({location[0]},{location[1]})")
elif entry.entry_class == "IOACPIPlatformDevice":
paths.append(f"PciRoot({hex(int(entry.properties.get('_UID', 0)))})")
elif ioreg.IOObjectConformsTo(entry, "IOACPIPlatformDevice".encode()):
paths.append(f"PciRoot({hex(int(ioreg.IORegistryEntryCreateCFProperty(entry, '_UID', ioreg.kCFAllocatorDefault, ioreg.kNilOptions) or 0))})") # type: ignore
break
elif entry.entry_class in ["IOPCI2PCIBridge", "IOPCIBridge", "AppleACPIPCI"]:
elif ioreg.IOObjectConformsTo(entry, "IOPCIBridge".encode()):
pass
else:
# There's something in between that's not PCI! Abort
paths = []
break
entry = entry.parent
parent = ioreg.IORegistryEntryGetParentEntry(entry, "IOService".encode(), None)[1]
if entry != original_entry:
ioreg.IOObjectRelease(entry)
entry = parent
self.pci_path = "/".join(reversed(paths))
@@ -105,10 +114,25 @@ class WirelessCard(PCIDevice):
chipset: enum.Enum = field(init=False)
def __post_init__(self):
system_profiler = plistlib.loads(subprocess.run("system_profiler -xml SPAirPortDataType".split(), stdout=subprocess.PIPE).stdout)
self.country_code = system_profiler[0]["_items"][0]["spairport_airport_interfaces"][0]["spairport_wireless_country_code"]
self.detect_chipset()
@classmethod
def from_ioregistry(cls, entry: ioreg.io_registry_entry_t, anti_spoof=True):
device = super().from_ioregistry(entry, anti_spoof=anti_spoof)
matching_dict = {
"IOParentMatch": ioreg.corefoundation_to_native(ioreg.IORegistryEntryIDMatching(ioreg.IORegistryEntryGetRegistryEntryID(entry, None)[1])),
"IOProviderClass": "IO80211Interface",
}
interface = next(ioreg.ioiterator_to_list(ioreg.IOServiceGetMatchingServices(ioreg.kIOMasterPortDefault, matching_dict, None)[1]), None)
if interface:
device.country_code = ioreg.IORegistryEntryCreateCFProperty(interface, "IO80211CountryCode", ioreg.kCFAllocatorDefault, ioreg.kNilOptions) # type: ignore # If not present, will be None anyways
else:
device.country_code = None # type: ignore
return device
def detect_chipset(self):
raise NotImplementedError
@@ -118,7 +142,7 @@ class NVMeController(PCIDevice):
CLASS_CODE: ClassVar[int] = 0x010802
aspm: Optional[int] = None
parent_aspm: Optional[int] = None
# parent_aspm: Optional[int] = None
@dataclass
@@ -267,12 +291,11 @@ class Computer:
wifi: Optional[WirelessCard] = None
cpu: Optional[CPU] = None
oclp_version: Optional[str] = None
ioregistry: Optional[ioreg.IOReg] = None
opencore_version: Optional[str] = None
@staticmethod
def probe():
computer = Computer()
computer.ioregistry = ioreg.IOReg()
computer.gpu_probe()
computer.dgpu_probe()
computer.igpu_probe()
@@ -284,15 +307,20 @@ class Computer:
def gpu_probe(self):
# Chain together two iterators: one for class code 00000300, the other for class code 00800300
devices = itertools.chain(self.ioregistry.find(property=("class-code", binascii.a2b_hex("00000300"))), self.ioregistry.find(property=("class-code", binascii.a2b_hex("00800300"))))
devices = ioreg.ioiterator_to_list(
ioreg.IOServiceGetMatchingServices(
ioreg.kIOMasterPortDefault, {"IOProviderClass": "IOPCIDevice", "IOPropertyMatch": [{"class-code": binascii.a2b_hex("00000300")}, {"class-code": binascii.a2b_hex("00800300")}]}, None
)[1]
)
for device in devices:
vendor: Type[GPU] = PCIDevice.from_ioregistry(device).vendor_detect(inherits=GPU) # type: ignore
if vendor:
self.gpus.append(vendor.from_ioregistry(device)) # type: ignore
ioreg.IOObjectRelease(device)
def dgpu_probe(self):
device = next(self.ioregistry.find(name="GFX0"), None)
device = next(ioreg.ioiterator_to_list(ioreg.IOServiceGetMatchingServices(ioreg.kIOMasterPortDefault, ioreg.IOServiceNameMatching("GFX0".encode()), None)[1]), None)
if not device:
# No devices
return
@@ -300,9 +328,10 @@ class Computer:
vendor: Type[GPU] = PCIDevice.from_ioregistry(device).vendor_detect(inherits=GPU) # type: ignore
if vendor:
self.dgpu = vendor.from_ioregistry(device) # type: ignore
ioreg.IOObjectRelease(device)
def igpu_probe(self):
device = next(self.ioregistry.find(name="IGPU"), None)
device = next(ioreg.ioiterator_to_list(ioreg.IOServiceGetMatchingServices(ioreg.kIOMasterPortDefault, ioreg.IOServiceNameMatching("IGPU".encode()), None)[1]), None)
if not device:
# No devices
return
@@ -310,63 +339,63 @@ class Computer:
vendor: Type[GPU] = PCIDevice.from_ioregistry(device).vendor_detect(inherits=GPU) # type: ignore
if vendor:
self.igpu = vendor.from_ioregistry(device) # type: ignore
ioreg.IOObjectRelease(device)
def wifi_probe(self):
# result = subprocess.run("ioreg -r -c IOPCIDevice -a -d2".split(), stdout=subprocess.PIPE).stdout.strip()
devices = self.ioregistry.find(property=("class-code", binascii.a2b_hex(Utilities.hexswap(hex(WirelessCard.CLASS_CODE)[2:].zfill(8)))))
# if not result:
# # No devices
# print("A")
# return
# devices = plistlib.loads(result)
# devices = [i for i in devices if i["class-code"] == binascii.a2b_hex("00800200")]
# if not devices:
# # No devices
# print("B")
# return
devices = ioreg.ioiterator_to_list(
ioreg.IOServiceGetMatchingServices(
ioreg.kIOMasterPortDefault,
{"IOProviderClass": "IOPCIDevice", "IOPropertyMatch": {"class-code": binascii.a2b_hex(Utilities.hexswap(hex(WirelessCard.CLASS_CODE)[2:].zfill(8)))}},
None,
)[1]
)
for device in devices:
vendor: Type[WirelessCard] = PCIDevice.from_ioregistry(device, anti_spoof=True).vendor_detect(inherits=WirelessCard) # type: ignore
if vendor:
self.wifi = vendor.from_ioregistry(device, anti_spoof=True) # type: ignore
break
ioreg.IOObjectRelease(device)
def storage_probe(self):
sata_controllers = self.ioregistry.find(entry_class="IOPCIDevice", property=("class-code", binascii.a2b_hex(Utilities.hexswap(hex(SATAController.CLASS_CODE)[2:].zfill(8)))))
nvme_controllers = itertools.chain.from_iterable(
[
# self.ioregistry.find(entry_class="IOPCIDevice", property=("class-code", binascii.a2b_hex(Utilities.hexswap(hex(NVMeController.CLASS_CODE)[2:].zfill(8))))),
self.ioregistry.find(entry_class="IOPCIDevice", children={"entry_class": "IONVMeController"}),
]
sata_controllers = ioreg.ioiterator_to_list(
ioreg.IOServiceGetMatchingServices(
ioreg.kIOMasterPortDefault,
{"IOProviderClass": "IOPCIDevice", "IOPropertyMatch": [{"class-code": binascii.a2b_hex(Utilities.hexswap(hex(SATAController.CLASS_CODE)[2:].zfill(8)))}]},
None,
)[1]
)
nvme_controllers = ioreg.ioiterator_to_list(
ioreg.IOServiceGetMatchingServices(
ioreg.kIOMasterPortDefault, {"IOProviderClass": "IONVMeController", "IOParentMatch": {"IOProviderClass": "IOPCIDevice"}, "IOPropertyMatch": {"IOClass": "IONVMeController"}}, None
)[1]
)
for device in sata_controllers:
self.storage.append(SATAController.from_ioregistry(device))
ioreg.IOObjectRelease(device)
for device in nvme_controllers:
aspm = device.properties.get("pci-aspm-default", 0)
parent = ioreg.IORegistryEntryGetParentEntry(device, "IOService".encode(), None)[1]
ioreg.IOObjectRelease(device)
aspm: Union[int, bytes] = ioreg.IORegistryEntryCreateCFProperty(parent, "pci-aspm-default", ioreg.kCFAllocatorDefault, ioreg.kNilOptions) or 0 # type: ignore
if isinstance(aspm, bytes):
aspm = int.from_bytes(aspm, byteorder="little")
if device.parent.parent.entry_class == "IOPCIDevice":
parent_aspm = device.parent.parent.properties.get("pci-aspm-default", 0)
if isinstance(parent_aspm, bytes):
parent_aspm = int.from_bytes(parent_aspm, byteorder="little")
else:
parent_aspm = None
controller = NVMeController.from_ioregistry(device)
controller = NVMeController.from_ioregistry(parent)
controller.aspm = aspm
controller.parent_aspm = parent_aspm
if controller.vendor_id != 0x106B:
self.storage.append(controller)
ioreg.IOObjectRelease(parent)
def smbios_probe(self):
# Reported model
entry = next(self.ioregistry.find(name="Root")).children[0]
self.reported_model = entry.properties["model"].strip(b"\0").decode()
self.reported_board_id = entry.properties.get("board-id", entry.properties.get("target-type", b"")).strip(b"\0").decode()
entry = next(ioreg.ioiterator_to_list(ioreg.IOServiceGetMatchingServices(ioreg.kIOMasterPortDefault, ioreg.IOServiceMatching("IOPlatformExpertDevice".encode()), None)[1]))
self.reported_model = ioreg.corefoundation_to_native(ioreg.IORegistryEntryCreateCFProperty(entry, "model", ioreg.kCFAllocatorDefault, ioreg.kNilOptions)).strip(b"\0").decode() # type: ignore
self.reported_board_id = ioreg.corefoundation_to_native(ioreg.IORegistryEntryCreateCFProperty(entry, "model", ioreg.kCFAllocatorDefault, ioreg.kNilOptions)).strip(b"\0").decode() # type: ignore
ioreg.IOObjectRelease(entry)
# Real model
# TODO: We previously had logic for OC users using iMacPro1,1 with incorrect ExposeSensitiveData. Add logic?
@@ -375,6 +404,7 @@ class Computer:
# OCLP version
self.oclp_version = Utilities.get_nvram("OCLP-Version", "4D1FDA02-38C7-4A6A-9CC6-4BCCA8B30102", decode=True)
self.opencore_version = Utilities.get_nvram("opencore-version", "4D1FDA02-38C7-4A6A-9CC6-4BCCA8B30102", decode=True)
def cpu_probe(self):
self.cpu = CPU(