mirror of
https://github.com/wavestone-cdt/EDRSandblast.git
synced 2026-06-08 16:37:12 +00:00
ExtractOffsets.py: handle new offsets & duplicate PEs on MS servers
This commit is contained in:
+70
-51
@@ -11,6 +11,7 @@ import threading
|
||||
|
||||
from lightpdbparser import Pdb
|
||||
|
||||
from pefile import PE, DIRECTORY_ENTRY, PEFormatError
|
||||
|
||||
THREADS_LIMIT = None
|
||||
CSVLock = threading.Lock()
|
||||
@@ -19,12 +20,36 @@ machineType = dict(x86=332, x64=34404)
|
||||
knownImageVersions = dict(ntoskrnl=list(), wdigest=list(), ci=list())
|
||||
extensions_by_mode = dict(ntoskrnl="exe", wdigest="dll", ci="dll")
|
||||
|
||||
symbols = dict(
|
||||
ntoskrnl=[
|
||||
("PspCreateProcessNotifyRoutine", "symbol"),
|
||||
("PspCreateThreadNotifyRoutine", "symbol"),
|
||||
("PspLoadImageNotifyRoutine", "symbol"),
|
||||
("_EPROCESS", "Protection", "field"),
|
||||
("EtwThreatIntProvRegHandle", "symbol"),
|
||||
("_ETW_REG_ENTRY", "GuidEntry", "field"),
|
||||
("_ETW_GUID_ENTRY", "ProviderEnableInfo", "field"),
|
||||
("PsProcessType", "symbol"),
|
||||
("PsThreadType", "symbol"),
|
||||
("_OBJECT_TYPE", "CallbackList", "field"),
|
||||
("SeCiCallbacks", "symbol"),
|
||||
],
|
||||
wdigest=[
|
||||
("g_fParameter_UseLogonCredential", "symbol"),
|
||||
("g_IsCredGuardEnabled", "symbol"),
|
||||
],
|
||||
ci=[
|
||||
("g_CiOptions", "symbol"),
|
||||
("CiValidateImageHeader", "symbol"),
|
||||
],
|
||||
)
|
||||
|
||||
def find(key, value):
|
||||
for k, v in value.items():
|
||||
|
||||
def find(key: str, d: dict):
|
||||
for k, v in d.items():
|
||||
if k == key:
|
||||
return v
|
||||
elif isinstance(v, dict):
|
||||
if isinstance(v, dict):
|
||||
return find(key, v)
|
||||
return None
|
||||
|
||||
@@ -51,11 +76,11 @@ def downloadSpecificFile(entry, pe_basename, pe_ext, knownPEVersions, output_fol
|
||||
# printl('No machine Type', lock)
|
||||
return "SKIP"
|
||||
virtual_size = entry["fileInfo"]["virtualSize"]
|
||||
file_id = hex(timestamp).replace("0x", "").zfill(8).upper() + hex(virtual_size).replace("0x", "")
|
||||
file_id = hex(timestamp).replace("0x", "").zfill(8).upper() + hex(virtual_size).replace("0x", "").upper()
|
||||
url = "https://msdl.microsoft.com/download/symbols/" + pe_name + "/" + file_id + "/" + pe_name
|
||||
try:
|
||||
version = entry["fileInfo"]["version"].split(" ")[0]
|
||||
except:
|
||||
except KeyError:
|
||||
version = find("version", entry).split(" ")[0]
|
||||
if version and version.count(".") != 3:
|
||||
version = None
|
||||
@@ -81,6 +106,9 @@ def downloadSpecificFile(entry, pe_basename, pe_ext, knownPEVersions, output_fol
|
||||
# printl(f'[*] Downloading {pe_name} version {version}... ', lock)
|
||||
try:
|
||||
peContent = get(url)
|
||||
if len(peContent.content) == 0:
|
||||
printl(f"[*] Skipping {output_file_path} which is empty on MS server", lock)
|
||||
return "SKIP"
|
||||
with open(output_file_path, "wb") as f:
|
||||
f.write(peContent.content)
|
||||
printl(
|
||||
@@ -146,9 +174,6 @@ def get_field_offset(symbols_info, field_name):
|
||||
return 0
|
||||
|
||||
|
||||
from pefile import PE, DIRECTORY_ENTRY, PEFormatError
|
||||
|
||||
|
||||
def get_file_version(path):
|
||||
pe = PE(path, fast_load=True)
|
||||
pe.parse_data_directories(directories=[DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_RESOURCE"]])
|
||||
@@ -237,41 +262,48 @@ def extractOffsets(input_file, output_file, mode):
|
||||
|
||||
if imageVersion in knownImageVersions[imageType]:
|
||||
print(f"[*] Skipping known {imageType} version {imageVersion} (file: {input_file})")
|
||||
return
|
||||
try:
|
||||
"""
|
||||
Sometimes, PEs with different versions have the same exact code (only the version and the signature
|
||||
change). They even have the same PE "timestamp" (which is a build hash in reality) and same virtual
|
||||
size. So the download links on MS servers are the same. That's why winbindex sometimes shows the
|
||||
metadata of one version of a PE, and the downloaded version is different.
|
||||
Anyway, the offsets are identical between versions affected by these "collisions".
|
||||
"""
|
||||
input_file_basename = os.path.basename(input_file)
|
||||
if not input_file_basename.startswith(f"{imageType}_"):
|
||||
return
|
||||
if not input_file_basename.endswith(f".{extension}"):
|
||||
return
|
||||
if not input_file_basename.count("-") == 1:
|
||||
return
|
||||
version_2, version_3 = tuple(
|
||||
int(part)
|
||||
for part in input_file_basename[len(f"{imageType}_") : -len(f".{extension}")].split("-")
|
||||
)
|
||||
imageVersion = input_file_basename
|
||||
if imageVersion in knownImageVersions[imageType]:
|
||||
return
|
||||
print("\r", end="") # Not skipping after all
|
||||
except ValueError:
|
||||
return
|
||||
except KeyError:
|
||||
return
|
||||
|
||||
# print(f'[*] Processing {imageType} version {imageVersion} (file: {input_file})')
|
||||
# download the PDB if needed
|
||||
pdb_path, pdb_content = get_pdb(pe, input_file, verbose=True)
|
||||
# dump all symbols
|
||||
|
||||
pdb = Pdb(path=pdb_path, content=pdb_content)
|
||||
|
||||
if imageType == "ntoskrnl":
|
||||
symbols = [
|
||||
("PspCreateProcessNotifyRoutine", pdb.get_symbol_offset),
|
||||
("PspCreateThreadNotifyRoutine", pdb.get_symbol_offset),
|
||||
("PspLoadImageNotifyRoutine", pdb.get_symbol_offset),
|
||||
("_EPROCESS", "Protection", pdb.get_field_offset),
|
||||
("EtwThreatIntProvRegHandle", pdb.get_symbol_offset),
|
||||
("_ETW_REG_ENTRY", "GuidEntry", pdb.get_field_offset),
|
||||
("_ETW_GUID_ENTRY", "ProviderEnableInfo", pdb.get_field_offset),
|
||||
("PsProcessType", pdb.get_symbol_offset),
|
||||
("PsThreadType", pdb.get_symbol_offset),
|
||||
("_OBJECT_TYPE", "CallbackList", pdb.get_field_offset),
|
||||
]
|
||||
elif imageType == "wdigest":
|
||||
symbols = [
|
||||
("g_fParameter_UseLogonCredential", pdb.get_symbol_offset),
|
||||
("g_IsCredGuardEnabled", pdb.get_symbol_offset),
|
||||
]
|
||||
elif imageType == "ci":
|
||||
symbols = [
|
||||
("g_CiOptions", pdb.get_symbol_offset),
|
||||
]
|
||||
else:
|
||||
raise ValueError(f"Incorrect image type {imageType}")
|
||||
|
||||
symbols_values = list()
|
||||
for *symbol_name, get_offset in symbols:
|
||||
for *symbol_name, offset_type in symbols[imageType]:
|
||||
if offset_type == "symbol":
|
||||
get_offset = pdb.get_symbol_offset
|
||||
elif offset_type == "field":
|
||||
get_offset = pdb.get_field_offset
|
||||
else:
|
||||
raise ValueError
|
||||
symbol_value = get_offset(*symbol_name)
|
||||
if symbol_value is None:
|
||||
symbol_value = 0
|
||||
@@ -375,16 +407,7 @@ if __name__ == "__main__":
|
||||
print(f'[+] Loaded {len(knownImageVersions[mode])} known {mode} versions from "{args.output}"')
|
||||
else:
|
||||
with open(args.output, "w") as output:
|
||||
if mode == "ntoskrnl":
|
||||
output.write(
|
||||
"ntoskrnlVersion,PspCreateProcessNotifyRoutineOffset,PspCreateThreadNotifyRoutineOffset,PspLoadImageNotifyRoutineOffset,_PS_PROTECTIONOffset,EtwThreatIntProvRegHandleOffset,EtwRegEntry_GuidEntryOffset,EtwGuidEntry_ProviderEnableInfoOffset,PsProcessType,PsThreadType,CallbackList\n"
|
||||
)
|
||||
elif mode == "wdigest":
|
||||
output.write("wdigestVersion,g_fParameter_UseLogonCredentialOffset,g_IsCredGuardEnabledOffset\n")
|
||||
elif mode == "ci":
|
||||
output.write("g_CiOptionsOffset\n")
|
||||
else:
|
||||
assert False
|
||||
output.write(mode + "Version," + ",".join(elem[0] for elem in symbols[mode]) + "\n")
|
||||
|
||||
# In download mode, an updated list of image versions published will be retrieved from https://winbindex.m417z.com.
|
||||
# The symbols for each version will be downloaded from the Microsoft symbols servers.
|
||||
@@ -397,10 +420,6 @@ if __name__ == "__main__":
|
||||
downloadPEFileFromMS(mode, extension, knownImageVersions[mode], args.input)
|
||||
|
||||
# Extract the offsets from the specified file or the folders containing image files.
|
||||
import time
|
||||
|
||||
s = time.time()
|
||||
extractOffsets(args.input, args.output, mode)
|
||||
e = time.time()
|
||||
print(e - s)
|
||||
|
||||
sortOutputFile(args.output)
|
||||
|
||||
@@ -1 +1,2 @@
|
||||
requests
|
||||
pefile
|
||||
Reference in New Issue
Block a user