mirror of
https://github.com/wavestone-cdt/EDRSandblast.git
synced 2026-06-08 16:37:12 +00:00
Initial commit for public version
Co-authored-by: Thomas Diot <thomas.diot@wavestone.com>
This commit is contained in:
@@ -0,0 +1,268 @@
|
||||
import argparse
|
||||
import csv
|
||||
import os
|
||||
|
||||
from requests import get
|
||||
from gzip import decompress
|
||||
from json import loads, dumps
|
||||
from subprocess import run
|
||||
|
||||
import win32api
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
import threading
|
||||
CSVLock = threading.Lock()
|
||||
|
||||
machineType = dict(x86=332, x64=34404)
|
||||
knownImageVersions = dict(ntoskrnl=list(), wdigest=list())
|
||||
extensions_by_mode = dict(ntoskrnl="exe", wdigest="dll")
|
||||
|
||||
def downloadSpecificFile(entry, pe_basename, pe_ext, knownPEVersions, output_folder):
|
||||
pe_name = f'{pe_basename}.{pe_ext}'
|
||||
|
||||
if 'fileInfo' not in entry:
|
||||
# print(f'[!] Entry {pe_hash} has no fileInfo, skipping it.')
|
||||
return "SKIP"
|
||||
if 'timestamp' not in entry['fileInfo']:
|
||||
# print(f'[!] Entry {pe_hash} has no timestamp, skipping it.')
|
||||
return "SKIP"
|
||||
timestamp = entry['fileInfo']['timestamp']
|
||||
if 'virtualSize' not in entry['fileInfo']:
|
||||
# print(f'[!] Entry {pe_hash} has no virtualSize, skipping it.')
|
||||
return "SKIP"
|
||||
if "machineType" not in entry["fileInfo"] or entry["fileInfo"]["machineType"] != machineType["x64"]:
|
||||
return "SKIP"
|
||||
virtual_size = entry['fileInfo']['virtualSize']
|
||||
file_id = hex(timestamp).replace('0x','').zfill(8).upper() + hex(virtual_size).replace('0x','')
|
||||
url = 'https://msdl.microsoft.com/download/symbols/' + pe_name + '/' + file_id + '/' + pe_name
|
||||
version = entry['fileInfo']['version'].split(' ')[0]
|
||||
|
||||
# Output file format: <PE>_build-revision.<exe | dll>
|
||||
output_version = '-'.join(version.split('.')[-2:])
|
||||
output_file = f'{pe_basename}_{output_version}.{pe_ext}'
|
||||
|
||||
# If the PE version is already known, skip download.
|
||||
if output_file in knownPEVersions:
|
||||
print(f'[*] Skipping download of known {pe_name} version: {output_file}')
|
||||
return "SKIP"
|
||||
|
||||
output_file_path = os.path.join(output_folder, output_file)
|
||||
if os.path.isfile(output_file_path):
|
||||
print(f"[*] Skipping {output_file_path} which already exists")
|
||||
return "SKIP"
|
||||
|
||||
print(f'[*] Downloading {pe_name} version {version}... ')
|
||||
try:
|
||||
peContent = get(url)
|
||||
with open(output_file_path, 'wb') as f:
|
||||
f.write(peContent.content)
|
||||
print(f'[+] Finished download of {pe_name} version {version} (file: {output_file})!')
|
||||
return "OK"
|
||||
except Exception:
|
||||
print(f'[!] ERROR : Could not download {pe_name} version {version} (URL: {url}).')
|
||||
return "KO"
|
||||
|
||||
def dowloadPEFileFromMS(pe_basename, pe_ext, knownPEVersions, output_folder):
|
||||
pe_name = f'{pe_basename}.{pe_ext}'
|
||||
|
||||
print (f'[*] Downloading {pe_name} files!')
|
||||
|
||||
pe_json_gz = get(f'https://winbindex.m417z.com/data/by_filename_compressed/{pe_name}.json.gz').content
|
||||
pe_json = decompress(pe_json_gz)
|
||||
pe_list = loads(pe_json)
|
||||
|
||||
futures = dict()
|
||||
with ThreadPoolExecutor() as executor:
|
||||
for pe_hash in pe_list:
|
||||
entry = pe_list[pe_hash]
|
||||
futures[pe_hash] = executor.submit(downloadSpecificFile,entry, pe_basename, pe_ext, knownPEVersions, output_folder)
|
||||
for (i,f) in enumerate(futures):
|
||||
res = futures[f].result()
|
||||
print(f"{i+1}/{len(futures)}", end="\r")
|
||||
|
||||
def get_symbol_offset(symbols_info, symbol_name):
|
||||
for line in symbols_info:
|
||||
# sometimes, a "_" is prepended to the symbol name ...
|
||||
if line.strip().split(" ")[-1].endswith(symbol_name):
|
||||
return int(line.split(" ")[0], 16)
|
||||
else:
|
||||
return 0
|
||||
|
||||
def get_field_offset(symbols_info, field_name):
|
||||
for line in symbols_info:
|
||||
if field_name in line:
|
||||
assert "offset" in line
|
||||
symbol_offset = int(line.split("+")[-1], 16)
|
||||
return symbol_offset
|
||||
else:
|
||||
return 0
|
||||
|
||||
def get_file_version(path):
|
||||
info = win32api.GetFileVersionInfo(path, '\\')
|
||||
ms = info['FileVersionMS']
|
||||
ls = info['FileVersionLS']
|
||||
return (win32api.HIWORD(ms), win32api.LOWORD(ms),
|
||||
win32api.HIWORD(ls), win32api.LOWORD(ls))
|
||||
|
||||
def extractOffsets(input_file, output_file, mode):
|
||||
if os.path.isfile(input_file):
|
||||
try:
|
||||
# check image type (ntoskrnl, wdigest, etc.)
|
||||
r = run(["r2", "-c", "iE", "-qq", input_file], shell=True, capture_output=True)
|
||||
for line in r.stdout.decode().splitlines():
|
||||
if "ntoskrnl.exe" in line:
|
||||
imageType = "ntoskrnl"
|
||||
break
|
||||
elif "wdigest.dll" in line:
|
||||
imageType = "wdigest"
|
||||
break
|
||||
else:
|
||||
print(f"[*] File {input_file} unrecognized")
|
||||
return
|
||||
|
||||
#todo : remove this and make a unique function
|
||||
if mode != imageType:
|
||||
print(f"[*] Skipping {input_file} since we are in {mode} mode")
|
||||
return
|
||||
# dump version number
|
||||
"""
|
||||
r = run(["r2", "-c", "iV", "-qq", input_file], shell=True, capture_output=True)
|
||||
for line in r.stdout.decode().splitlines():
|
||||
line = line.strip()
|
||||
if line.startswith("FileVersion:"):
|
||||
full_version = [int(frag) for frag in line.split(" ")[-1].split(".")]
|
||||
break
|
||||
else:
|
||||
assert(False)
|
||||
"""
|
||||
if os.path.sep not in input_file:
|
||||
input_file = "." + os.path.sep + input_file
|
||||
full_version = get_file_version(input_file)
|
||||
|
||||
# Checks if the image version is already present in the CSV
|
||||
extension = extensions_by_mode[imageType]
|
||||
imageVersion = f'{imageType}_{full_version[2]}-{full_version[3]}.{extension}'
|
||||
|
||||
if imageVersion in knownImageVersions[imageType]:
|
||||
print(f'[*] Skipping known {imageType} version {imageVersion} (file: {input_file})')
|
||||
return
|
||||
|
||||
|
||||
print(f'[*] Processing {imageType} version {imageVersion} (file: {input_file})')
|
||||
# download the PDB if needed
|
||||
r = run(["r2", "-c", "idpd", "-qq", input_file], shell=True, capture_output=True)
|
||||
# dump all symbols
|
||||
r = run(["r2", "-c", "idpi", "-qq", '-B', '0', input_file], shell=True, capture_output=True)
|
||||
all_symbols_info = [line.strip() for line in r.stdout.decode().splitlines()]
|
||||
|
||||
if imageType == "ntoskrnl":
|
||||
symbols = [("PspCreateProcessNotifyRoutine",get_symbol_offset),
|
||||
("PspCreateThreadNotifyRoutine",get_symbol_offset),
|
||||
("PspLoadImageNotifyRoutine", get_symbol_offset),
|
||||
('_PS_PROTECTION Protection', get_field_offset),
|
||||
("EtwThreatIntProvRegHandle", get_symbol_offset),
|
||||
('_ETW_GUID_ENTRY* GuidEntry', get_field_offset),
|
||||
('_TRACE_ENABLE_INFO ProviderEnableInfo', get_field_offset)]
|
||||
elif imageType == "wdigest":
|
||||
symbols = [
|
||||
("g_fParameter_UseLogonCredential",get_symbol_offset),
|
||||
("g_IsCredGuardEnabled",get_symbol_offset)
|
||||
]
|
||||
|
||||
|
||||
symbols_values = list()
|
||||
for symbol_name, get_offset in symbols:
|
||||
symbol_value = get_offset(all_symbols_info, symbol_name)
|
||||
symbols_values.append(symbol_value)
|
||||
#print(f"[+] {symbol_name} = {hex(symbol_value)}")
|
||||
|
||||
with CSVLock:
|
||||
with open(output_file, 'a') as output:
|
||||
output.write(f'{imageVersion},{",".join(hex(val).replace("0x","") for val in symbols_values)}\n')
|
||||
|
||||
#print("wrote into CSV !")
|
||||
|
||||
knownImageVersions[imageType].append(imageVersion)
|
||||
|
||||
print(f'[+] Finished processing of {imageType} {input_file}!')
|
||||
|
||||
except Exception as e:
|
||||
print(f'[!] ERROR : Could not process file {input_file}.')
|
||||
print(f'[!] Error message: {e}')
|
||||
print(f'[!] If error is of the like of "\'NoneType\' object has no attribute \'group\'", kernel callbacks may not be supported by this version.')
|
||||
|
||||
elif os.path.isdir(input_file):
|
||||
print(f'[*] Processing folder: {input_file}')
|
||||
with ThreadPoolExecutor() as extractorPool:
|
||||
args = [(os.path.join(input_file, file), output_file, mode) for file in os.listdir(input_file)]
|
||||
for (i,res) in enumerate(extractorPool.map(extractOffsets, *zip(*args))):
|
||||
print(f"{i+1}/{len(args)}", end="\r")
|
||||
print(f'[+] Finished processing of folder {input_file}!')
|
||||
|
||||
else:
|
||||
print(f'[!] ERROR : The specified input {input_file} is neither a file nor a directory.')
|
||||
|
||||
|
||||
|
||||
def loadOffsetsFromCSV(loadedVersions, CSVPath):
|
||||
print(f'[*] Loading the known known PE versions from "{CSVPath}".')
|
||||
|
||||
with open(CSVPath, "r") as csvFile:
|
||||
csvReader = csv.reader(csvFile, delimiter=',')
|
||||
next(csvReader)
|
||||
for peLine in csvReader:
|
||||
loadedVersions.append(peLine[0])
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = argparse.ArgumentParser()
|
||||
|
||||
parser.add_argument('mode', help='ntoskrnl or wdigest. Mode to download and extract offsets for either ntoskrnl or wdigest')
|
||||
parser.add_argument('-i', '--input', dest='input', required=True,
|
||||
help='Single file or directory containing ntoskrnl.exe / wdigest.dll to extract offsets from. If in dowload mode, the PE downloaded from MS symbols servers will be placed in this folder.')
|
||||
parser.add_argument('-o', '--output', dest='output',
|
||||
help='CSV file to write offsets to. If the specified file already exists, only new ntoskrnl versions will be downloaded / analyzed. Defaults to NtoskrnlOffsets.csv / WdigestOffsets.csv in the current folder.')
|
||||
parser.add_argument('-d', '--dowload', dest='dowload', action='store_true',
|
||||
help='Flag to download the PE from Microsoft servers using list of versions from winbindex.m417z.com.')
|
||||
|
||||
args = parser.parse_args()
|
||||
mode = args.mode
|
||||
if mode not in knownImageVersions:
|
||||
print(f'[!] ERROR : unsupported mode "{args.mode}", supported mode are: "ntoskrnl" and "wdigest"')
|
||||
exit(1)
|
||||
|
||||
# check R2 version
|
||||
output = run(["r2", "-V"], shell=True, capture_output=True).stdout.decode()
|
||||
ma,me,mi = map(int, output.splitlines()[0].split(" ")[0].split("."))
|
||||
if (ma, me, mi) < (5,4,3):
|
||||
print("WARNING : This script has been tested with radare2 5.4.3 (works) and 4.3.1 (does NOT work)")
|
||||
print(f"You have version {ma}.{me}.{mi}, if is does not work correctly, meaning most of the offsets are not found (i.e. 0), check radare2's 'idpi' command output and modify get_symbol_offset() & get_field_offset() to parse symbols correctly")
|
||||
input("Press enter to continue")
|
||||
|
||||
|
||||
# If the output file exists, load the already analyzed image versions.
|
||||
# Otherwise, write CSV headers to the new file.
|
||||
if not args.output:
|
||||
args.output = mode.capitalize() + 'Offsets.csv'
|
||||
if os.path.isfile(args.output):
|
||||
loadOffsetsFromCSV(knownImageVersions[mode], args.output)
|
||||
print(f'[+] Loaded {len(knownImageVersions[mode])} known {mode} versions from "{args.output}"')
|
||||
else:
|
||||
with open(args.output, 'w') as output:
|
||||
if mode == "ntoskrnl":
|
||||
output.write('ntoskrnlVersion,PspCreateProcessNotifyRoutineOffset,PspCreateThreadNotifyRoutineOffset,PspLoadImageNotifyRoutineOffset,_PS_PROTECTIONOffset,EtwThreatIntProvRegHandleOffset,EtwRegEntry_GuidEntryOffset,EtwGuidEntry_ProviderEnableInfoOffset\n')
|
||||
elif mode == "wdigest":
|
||||
output.write('wdigestVersion,g_fParameter_UseLogonCredentialOffset,g_IsCredGuardEnabledOffset\n')
|
||||
else:
|
||||
assert False
|
||||
# In download mode, an updated list of image versions published will be retrieved from https://winbindex.m417z.com.
|
||||
# The symbols for each version will be dowloaded from the Microsoft symbols servers.
|
||||
# Only new versions will be downloaded if the specified output file already contains offsets.
|
||||
if (args.dowload):
|
||||
if not os.path.isdir(args.input):
|
||||
print('[!] ERROR : in download mode, -i / --input option must specify a folder')
|
||||
exit(1)
|
||||
extension = extensions_by_mode[mode]
|
||||
dowloadPEFileFromMS(mode, extension, knownImageVersions[mode], args.input)
|
||||
|
||||
# Extract the offsets from the specified file or the folders containing image files.
|
||||
extractOffsets(args.input, args.output, mode)
|
||||
Reference in New Issue
Block a user