mirror of
https://github.com/wavestone-cdt/EDRSandblast.git
synced 2026-06-08 16:37:12 +00:00
4bff81986b
Co-authored-by: Thomas Diot <thomas.diot@wavestone.com>
269 lines
12 KiB
Python
269 lines
12 KiB
Python
import argparse
|
|
import csv
|
|
import os
|
|
|
|
from requests import get
|
|
from gzip import decompress
|
|
from json import loads, dumps
|
|
from subprocess import run
|
|
|
|
import win32api
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
import threading
|
|
# Serializes appends to the offsets CSV, which is written from worker threads.
CSVLock = threading.Lock()

# Values compared against the 'machineType' field of Winbindex entries
# (332 == 0x14C for x86, 34404 == 0x8664 for x64).
machineType = {"x86": 332, "x64": 34404}

# Per-mode list of image version names that already have a row in the CSV.
knownImageVersions = {"ntoskrnl": [], "wdigest": []}

# File extension of the PE handled in each mode.
extensions_by_mode = {"ntoskrnl": "exe", "wdigest": "dll"}
|
|
|
|
def downloadSpecificFile(entry, pe_basename, pe_ext, knownPEVersions, output_folder):
    """Download one PE build described by a Winbindex entry.

    Args:
        entry: One record of the Winbindex JSON. Its 'fileInfo' dictionary
            must provide 'timestamp', 'virtualSize', 'version' and an x64
            'machineType'; otherwise the entry is skipped.
        pe_basename: PE name without extension (e.g. "ntoskrnl").
        pe_ext: PE extension without the dot (e.g. "exe").
        knownPEVersions: Container of output file names
            ("<basename>_<build>-<revision>.<ext>") that must not be
            downloaded again.
        output_folder: Folder the downloaded file is written to.

    Returns:
        "SKIP" when the entry is unusable, already known or already on disk,
        "OK" when the file was downloaded, "KO" when the download failed.
    """
    pe_name = f'{pe_basename}.{pe_ext}'

    file_info = entry.get('fileInfo')
    if not file_info:
        return "SKIP"
    # 'version' is required too: without it no output file name can be built.
    if any(key not in file_info for key in ('timestamp', 'virtualSize', 'version')):
        return "SKIP"
    if file_info.get('machineType') != machineType["x64"]:
        return "SKIP"

    timestamp = file_info['timestamp']
    virtual_size = file_info['virtualSize']
    # Symbol-server identifier: 8 upper-case hex digits of the timestamp
    # followed by the lower-case hex virtual size.
    file_id = f'{timestamp:08X}{virtual_size:x}'
    url = 'https://msdl.microsoft.com/download/symbols/' + pe_name + '/' + file_id + '/' + pe_name
    version = file_info['version'].split(' ')[0]

    # Output file format: <PE>_build-revision.<exe | dll>
    output_version = '-'.join(version.split('.')[-2:])
    output_file = f'{pe_basename}_{output_version}.{pe_ext}'

    # If the PE version is already known, skip download.
    if output_file in knownPEVersions:
        print(f'[*] Skipping download of known {pe_name} version: {output_file}')
        return "SKIP"

    output_file_path = os.path.join(output_folder, output_file)
    if os.path.isfile(output_file_path):
        print(f"[*] Skipping {output_file_path} which already exists")
        return "SKIP"

    print(f'[*] Downloading {pe_name} version {version}... ')
    try:
        # The timeout bounds each connect/read wait so a stalled server cannot
        # block a worker thread forever.
        response = get(url, timeout=60)
        # Without this check an HTTP error page (404, 500, ...) would be
        # saved to disk as if it were the PE file.
        response.raise_for_status()
        with open(output_file_path, 'wb') as f:
            f.write(response.content)
    except Exception:
        # Worker-thread boundary: report the failure and let the other
        # downloads continue.
        print(f'[!] ERROR : Could not download {pe_name} version {version} (URL: {url}).')
        return "KO"

    print(f'[+] Finished download of {pe_name} version {version} (file: {output_file})!')
    return "OK"
|
|
|
|
def dowloadPEFileFromMS(pe_basename, pe_ext, knownPEVersions, output_folder):
    """Download every listed build of a PE file into output_folder.

    The list of builds is fetched from winbindex.m417z.com as a gzipped JSON
    object, and one downloadSpecificFile() task per entry is run on a thread
    pool. A progress counter is printed as tasks complete.

    Args:
        pe_basename: PE name without extension (e.g. "ntoskrnl").
        pe_ext: PE extension without the dot (e.g. "exe").
        knownPEVersions: Output file names that must not be downloaded again.
        output_folder: Folder the downloaded files are written to.

    Raises:
        Whatever the index download, decompression or JSON parsing raises;
        failures of individual download tasks are reported and skipped.
    """
    pe_name = f'{pe_basename}.{pe_ext}'

    print (f'[*] Downloading {pe_name} files!')

    index_response = get(f'https://winbindex.m417z.com/data/by_filename_compressed/{pe_name}.json.gz')
    # Fail with a clear HTTP error instead of an obscure gzip error when the
    # index cannot be fetched.
    index_response.raise_for_status()
    pe_list = loads(decompress(index_response.content))

    with ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(downloadSpecificFile, entry, pe_basename, pe_ext, knownPEVersions, output_folder)
            for entry in pe_list.values()
        ]
        total = len(futures)
        for done, future in enumerate(futures, start=1):
            try:
                future.result()
            except Exception as error:
                # One malformed entry must not abort the remaining downloads.
                print(f'[!] ERROR : a download task failed: {error}')
            print(f"{done}/{total}", end="\r")
|
|
|
|
def get_symbol_offset(symbols_info, symbol_name):
    """Return the offset of a symbol found in a list of symbol lines.

    Each line is split on whitespace: the first token is parsed as a
    hexadecimal offset and the last token is taken as the symbol name.

    Args:
        symbols_info: Iterable of text lines describing symbols.
        symbol_name: Name to look for.

    Returns:
        The offset of the first matching line as an int, or 0 when no line
        matches.
    """
    for line in symbols_info:
        # Splitting on any whitespace keeps address and name parsing
        # consistent, even with leading blanks or tab separators.
        tokens = line.split()
        # sometimes, a "_" is prepended to the symbol name, hence endswith()
        if tokens and tokens[-1].endswith(symbol_name):
            return int(tokens[0], 16)
    return 0
|
|
|
|
def get_field_offset(symbols_info, field_name):
    """Return the offset of a structure field found in a list of symbol lines.

    The first line containing field_name is used; its offset is the
    hexadecimal number that follows the last "+" of that line.

    Args:
        symbols_info: Iterable of text lines describing symbols.
        field_name: Text identifying the field (type and name).

    Returns:
        The field offset as an int, or 0 when no line contains field_name.

    Raises:
        ValueError: If the matching line does not contain "offset" or the
            text after the last "+" is not a hexadecimal number.
    """
    for line in symbols_info:
        if field_name not in line:
            continue
        # Explicit check rather than assert, which is stripped under -O.
        if "offset" not in line:
            raise ValueError(f'no offset found for field "{field_name}" in line: {line!r}')
        return int(line.split("+")[-1], 16)
    return 0
|
|
|
|
def get_file_version(path):
    """Return the file version of `path` as a tuple of four integers.

    The values are the high and low words of 'FileVersionMS' followed by the
    high and low words of 'FileVersionLS', as reported by
    win32api.GetFileVersionInfo for the root ('\\') block.
    """
    fixed_info = win32api.GetFileVersionInfo(path, '\\')
    most_significant = fixed_info['FileVersionMS']
    least_significant = fixed_info['FileVersionLS']

    parts = []
    for dword in (most_significant, least_significant):
        parts.append(win32api.HIWORD(dword))
        parts.append(win32api.LOWORD(dword))
    return tuple(parts)
|
|
|
|
def _run_r2(command, input_file, *extra_args):
    """Run one radare2 command on input_file and return the CompletedProcess."""
    # NOTE(review): shell=True is kept from the original invocation; it is
    # presumably needed to resolve an r2 wrapper script on Windows - confirm
    # before removing.
    return run(["r2", "-c", command, "-qq", *extra_args, input_file], shell=True, capture_output=True)


def _detect_image_type(input_file):
    """Return "ntoskrnl" or "wdigest" from the "iE" listing, or None if neither name appears."""
    for line in _run_r2("iE", input_file).stdout.decode().splitlines():
        if "ntoskrnl.exe" in line:
            return "ntoskrnl"
        if "wdigest.dll" in line:
            return "wdigest"
    return None


def _symbols_to_extract(image_type):
    """Return the ordered (name, lookup function) pairs written to the CSV for image_type."""
    if image_type == "ntoskrnl":
        return [("PspCreateProcessNotifyRoutine", get_symbol_offset),
                ("PspCreateThreadNotifyRoutine", get_symbol_offset),
                ("PspLoadImageNotifyRoutine", get_symbol_offset),
                ('_PS_PROTECTION Protection', get_field_offset),
                ("EtwThreatIntProvRegHandle", get_symbol_offset),
                ('_ETW_GUID_ENTRY* GuidEntry', get_field_offset),
                ('_TRACE_ENABLE_INFO ProviderEnableInfo', get_field_offset)]
    return [("g_fParameter_UseLogonCredential", get_symbol_offset),
            ("g_IsCredGuardEnabled", get_symbol_offset)]


def _extract_offsets_from_file(input_file, output_file, mode):
    """Extract the offsets of a single image and append them as one CSV row."""
    try:
        # check image type (ntoskrnl, wdigest, etc.)
        image_type = _detect_image_type(input_file)
        if image_type is None:
            print(f"[*] File {input_file} unrecognized")
            return

        # todo : remove this and make a unique function
        if mode != image_type:
            print(f"[*] Skipping {input_file} since we are in {mode} mode")
            return

        # A bare file name is turned into an explicit relative path before
        # its version resource is read.
        if os.path.sep not in input_file:
            input_file = "." + os.path.sep + input_file
        full_version = get_file_version(input_file)

        # Checks if the image version is already present in the CSV
        extension = extensions_by_mode[image_type]
        image_version = f'{image_type}_{full_version[2]}-{full_version[3]}.{extension}'
        if image_version in knownImageVersions[image_type]:
            print(f'[*] Skipping known {image_type} version {image_version} (file: {input_file})')
            return

        print(f'[*] Processing {image_type} version {image_version} (file: {input_file})')
        # download the PDB if needed (the command output itself is not used)
        _run_r2("idpd", input_file)
        # dump all symbols
        dump = _run_r2("idpi", input_file, '-B', '0')
        all_symbols_info = [line.strip() for line in dump.stdout.decode().splitlines()]

        offsets = [lookup(all_symbols_info, name) for name, lookup in _symbols_to_extract(image_type)]
        row = ",".join(f"{value:x}" for value in offsets)

        # The lock covers both the CSV append and the bookkeeping list, since
        # files of a folder are processed by several threads.
        with CSVLock:
            with open(output_file, 'a') as output:
                output.write(f'{image_version},{row}\n')
            knownImageVersions[image_type].append(image_version)

        print(f'[+] Finished processing of {image_type} {input_file}!')

    except Exception as e:
        # Worker boundary: one broken image must not stop the whole folder.
        print(f'[!] ERROR : Could not process file {input_file}.')
        print(f'[!] Error message: {e}')
        print('[!] If error is of the like of "\'NoneType\' object has no attribute \'group\'", kernel callbacks may not be supported by this version.')


def _extract_offsets_from_folder(input_file, output_file, mode):
    """Run extractOffsets() on every entry of a folder using a thread pool."""
    print(f'[*] Processing folder: {input_file}')
    paths = [os.path.join(input_file, name) for name in os.listdir(input_file)]
    with ThreadPoolExecutor() as extractorPool:
        results = extractorPool.map(lambda path: extractOffsets(path, output_file, mode), paths)
        for done, _ in enumerate(results, start=1):
            print(f"{done}/{len(paths)}", end="\r")
    print(f'[+] Finished processing of folder {input_file}!')


def extractOffsets(input_file, output_file, mode):
    """Extract symbol and field offsets from an image, or from each entry of a folder.

    For a file: the image type is detected with radare2, the file is skipped
    if it does not match `mode` or if its version is already listed in
    knownImageVersions, and otherwise one row "<version>,<hex offsets...>" is
    appended to output_file. For a folder: every entry is processed the same
    way on a thread pool.

    Args:
        input_file: Path of an image file or of a folder.
        output_file: Path of the CSV file rows are appended to.
        mode: "ntoskrnl" or "wdigest".

    Returns:
        None. Errors while processing a file are printed, not raised.
    """
    if os.path.isfile(input_file):
        _extract_offsets_from_file(input_file, output_file, mode)
    elif os.path.isdir(input_file):
        _extract_offsets_from_folder(input_file, output_file, mode)
    else:
        print(f'[!] ERROR : The specified input {input_file} is neither a file nor a directory.')
|
|
|
|
|
|
|
|
def loadOffsetsFromCSV(loadedVersions, CSVPath):
    """Append the first column of every data row of a CSV file to loadedVersions.

    The first row is treated as a header and ignored. Empty rows are skipped,
    and an empty file leaves loadedVersions unchanged.

    Args:
        loadedVersions: List extended in place with the version names.
        CSVPath: Path of the CSV file to read.
    """
    print(f'[*] Loading the known known PE versions from "{CSVPath}".')

    # newline='' is the mode the csv module documents for file objects.
    with open(CSVPath, "r", newline='') as csvFile:
        csvReader = csv.reader(csvFile, delimiter=',')
        # Skip the header; the default avoids StopIteration on an empty file.
        next(csvReader, None)
        # csv.reader yields [] for blank lines, which have no first column.
        loadedVersions.extend(peLine[0] for peLine in csvReader if peLine)
|
|
|
|
|
|
if __name__ == '__main__':
    # Command-line entry point: validate the mode, check the radare2 version,
    # prepare the CSV file, optionally download the images, then extract the
    # offsets.
    # Local imports: only this entry point needs them.
    import re
    import sys

    parser = argparse.ArgumentParser()

    parser.add_argument('mode', help='ntoskrnl or wdigest. Mode to download and extract offsets for either ntoskrnl or wdigest')
    parser.add_argument('-i', '--input', dest='input', required=True,
                        help='Single file or directory containing ntoskrnl.exe / wdigest.dll to extract offsets from. If in dowload mode, the PE downloaded from MS symbols servers will be placed in this folder.')
    parser.add_argument('-o', '--output', dest='output',
                        help='CSV file to write offsets to. If the specified file already exists, only new ntoskrnl versions will be downloaded / analyzed. Defaults to NtoskrnlOffsets.csv / WdigestOffsets.csv in the current folder.')
    parser.add_argument('-d', '--dowload', dest='dowload', action='store_true',
                        help='Flag to download the PE from Microsoft servers using list of versions from winbindex.m417z.com.')

    args = parser.parse_args()
    mode = args.mode
    if mode not in knownImageVersions:
        print(f'[!] ERROR : unsupported mode "{args.mode}", supported mode are: "ntoskrnl" and "wdigest"')
        sys.exit(1)

    # check R2 version
    r2_version_output = run(["r2", "-V"], shell=True, capture_output=True).stdout.decode()
    # A regex tolerates version suffixes and an empty output (radare2 not
    # found) instead of crashing with IndexError / ValueError.
    version_match = re.search(r"(\d+)\.(\d+)\.(\d+)", r2_version_output)
    if version_match is None:
        print('[!] ERROR : could not determine the radare2 version from "r2 -V"; is radare2 installed and in the PATH?')
        sys.exit(1)
    ma, me, mi = map(int, version_match.groups())
    if (ma, me, mi) < (5, 4, 3):
        print("WARNING : This script has been tested with radare2 5.4.3 (works) and 4.3.1 (does NOT work)")
        print(f"You have version {ma}.{me}.{mi}, if is does not work correctly, meaning most of the offsets are not found (i.e. 0), check radare2's 'idpi' command output and modify get_symbol_offset() & get_field_offset() to parse symbols correctly")
        input("Press enter to continue")

    # If the output file exists, load the already analyzed image versions.
    # Otherwise, write CSV headers to the new file.
    if not args.output:
        args.output = mode.capitalize() + 'Offsets.csv'
    if os.path.isfile(args.output):
        loadOffsetsFromCSV(knownImageVersions[mode], args.output)
        print(f'[+] Loaded {len(knownImageVersions[mode])} known {mode} versions from "{args.output}"')
    else:
        # One header per supported mode; a mode without a header raises
        # KeyError here instead of silently producing a headerless file.
        csv_headers = {
            "ntoskrnl": 'ntoskrnlVersion,PspCreateProcessNotifyRoutineOffset,PspCreateThreadNotifyRoutineOffset,PspLoadImageNotifyRoutineOffset,_PS_PROTECTIONOffset,EtwThreatIntProvRegHandleOffset,EtwRegEntry_GuidEntryOffset,EtwGuidEntry_ProviderEnableInfoOffset\n',
            "wdigest": 'wdigestVersion,g_fParameter_UseLogonCredentialOffset,g_IsCredGuardEnabledOffset\n',
        }
        header = csv_headers[mode]
        with open(args.output, 'w') as csv_file:
            csv_file.write(header)

    # In download mode, an updated list of image versions published will be retrieved from https://winbindex.m417z.com.
    # The symbols for each version will be dowloaded from the Microsoft symbols servers.
    # Only new versions will be downloaded if the specified output file already contains offsets.
    if args.dowload:
        if not os.path.isdir(args.input):
            print('[!] ERROR : in download mode, -i / --input option must specify a folder')
            sys.exit(1)
        extension = extensions_by_mode[mode]
        dowloadPEFileFromMS(mode, extension, knownImageVersions[mode], args.input)

    # Extract the offsets from the specified file or the folders containing image files.
    extractOffsets(args.input, args.output, mode)
|