Files
marc 4df8e844e5 Sigma -> SentinelOne PowerQuery pipeline
End-to-end workflow that turns SigmaHQ rules into SDL Scheduled
custom-detection rules:

1. Use SIEM-toolkit's coverage map to find where detection is thin:
   a MITRE ATT&CK heatmap across all detection-library rules, plus
   each rule's firing status (active vs. never fired).
2. Pick Sigma rules (https://github.com/SigmaHQ/sigma) that target
   those tactics.
3. Convert the Sigma rules to PowerQuery with
   pysigma-backend-sentinelone-pq.
4. Smoke-test against your tenant's /api/powerQuery, deploy via
   /web/api/v2.1/cloud-detection/rules as Scheduled PQ rules in Draft.
5. Re-running on a different tenant is just re-pointing the
   credentials -- the converted .pq bodies travel as-is.

Files:
  README_sigma_pipeline.md       full workflow doc
  recommend_sigma_imports.py     coverage-map reader -> rule shortlist
  probe_wel_schema.py            WEL parser field discovery
  convert_test_deploy_sigma.py   pick + convert + 3 variants + deploy
  fixup_rules_6_7.py             OriginalFileName pre-processor
  run_sigma_on_tenant.py         redeploy already-converted bodies
  verify_rule_exists_via_put.py  PUT-existence test (RBAC workaround)
  verify_deployed_sigma_rules.py RBAC visibility diagnostic
  tenant_config.example.json     credentials template (gitignored real one)

Each converted rule emits three PowerQuery variants:
  <stem>.pq          faithful (S1 DV schema)
  <stem>.relaxed.pq  drops endpoint.os + event.type clauses
  <stem>.wel.pq      rewritten onto microsoft_windows_eventlog-latest

All scripts read credentials from tenant_config.json (or the
SIEM_TOOLKIT_CONFIG env var), discover the target site_id at runtime,
and persist deployed rule IDs to deployed_rule_ids.json so the verify
scripts work without hardcoded IDs.
2026-05-28 12:29:37 +02:00

296 lines
12 KiB
Python

#!/usr/bin/env python3
"""
run_sigma_on_tenant.py
Re-runs the same 10 Sigma->PowerQuery rules against ANY tenant by
re-pointing the credentials. The 10 converted .pq bodies in
SIGMA_OUT_DIR (default /tmp/sigma_converted_v4) are tenant-agnostic --
they only depend on the SDL DV schema, not on the specific tenant URL.

Pipeline:
  Step 0 -- discover sites via /sites?limit=10 (token introspection)
  Step 1 -- probe tenant telemetry: last 24 h volume on the EDR/DV
            fields the converted rules query
            (event.type, endpoint.os, tgt.process.cmdline, ...)
  Step 2 -- smoke-test each of the 10 faithful .pq bodies against the
            tenant's /api/powerQuery
  Step 3 -- deploy each as an SDL Scheduled rule via the Mgmt API
            POST /web/api/v2.1/cloud-detection/rules
  Step 4 -- verify the deployed rules via PUT-existence test

Reads tenant credentials from tenant_config.json next to this script.
Override with the SIEM_TOOLKIT_CONFIG env var. Override the artefact
location with SIGMA_OUT_DIR. Override the target site with SITE_ID.
"""
from __future__ import annotations
import json
import os
import pathlib
import time
import urllib.error
import urllib.request
from typing import Any
# Directory containing this script; the default config file and the
# deployed_rule_ids.json output both live next to it.
HERE = pathlib.Path(__file__).resolve().parent

# Config location: SIEM_TOOLKIT_CONFIG wins, else tenant_config.json in HERE.
_CFG_PATH = os.environ.get("SIEM_TOOLKIT_CONFIG",
                           str(HERE / "tenant_config.json"))

# Read the config through a context manager so the handle is closed
# deterministically.  Binary mode lets json detect UTF-8/16/32 itself
# instead of depending on the platform's locale encoding.
with open(_CFG_PATH, "rb") as _cfg_file:
    CFG = json.load(_cfg_file)

# Directory holding the already-converted .pq bodies.
ART = pathlib.Path(os.environ.get(
    "SIGMA_OUT_DIR", "/tmp/sigma_converted_v4"))

# Base URLs are normalised to have no trailing slash because the request
# helpers append paths that start with "/".
SDL_BASE = CFG["SDL_XDR_URL"].rstrip("/")
SDL_KEY = CFG["SDL_LOG_READ_KEY"]
S1_CONS = CFG["S1_CONSOLE_URL"].rstrip("/")
# Trailing dots are stripped from the console token.
# NOTE(review): presumably guards against a copy/paste artefact -- confirm.
S1_TOK = CFG["S1_CONSOLE_API_TOKEN"].rstrip(".")
# Rules to run, in execution order.
# Each entry is (tactic label, technique label, converted .pq file name).
RULES: list[tuple[str, str, str]] = [
    # Lateral Movement
    (
        "Lateral Movement",
        "T1021.006 WinRM (evil-winrm)",
        "proc_creation_win_hktl_evil_winrm.pq",
    ),
    # Collection
    (
        "Collection",
        "T1113 Screen Capture (Psr.exe)",
        "proc_creation_win_psr_capture_screenshots.pq",
    ),
    (
        "Collection",
        "T1115 Clipboard (Get-Clipboard)",
        "proc_creation_win_powershell_get_clipboard.pq",
    ),
    # Exfiltration
    (
        "Exfiltration",
        "T1560.001 RAR (.dmp files)",
        "proc_creation_win_winrar_exfil_dmp_files.pq",
    ),
    (
        "Exfiltration",
        "T1567.002 rclone",
        "proc_creation_win_pua_rclone_execution.pq",
    ),
    # Reconnaissance
    (
        "Reconnaissance",
        "T1016 netsh portproxy",
        "proc_creation_win_netsh_port_forwarding.pq",
    ),
    # Discovery
    (
        "Discovery",
        "T1087/T1033 whoami /priv",
        "proc_creation_win_whoami_priv_discovery.pq",
    ),
    (
        "Discovery",
        "T1087/T1482 SharpHound",
        "proc_creation_win_hktl_bloodhound_sharphound.pq",
    ),
    # Credential Access
    (
        "Credential Access",
        "T1003.001 Mimikatz cmd-line",
        "proc_creation_win_hktl_mimikatz_command_line.pq",
    ),
    (
        "Credential Access",
        "T1003.001 ProcDump LSASS",
        "proc_creation_win_sysinternals_procdump_lsass.pq",
    ),
]
# ----------------------------------------------------- helpers --------------
def pq(query: str, hours: int = 24) -> tuple[int, str, int]:
    """Run one PowerQuery against the SDL endpoint and count result rows.

    Args:
        query: PowerQuery text, sent unchanged.
        hours: Width of the lookback window, which ends at the current time.

    Returns:
        ``(200, "ok", row_count)`` when the request succeeds, where
        ``row_count`` is the length of the response's ``values`` list.
        On an HTTP error, ``(status, first 250 chars of the body, 0)``.

    Only ``urllib.error.HTTPError`` is handled here; any other failure
    propagates to the caller.
    """
    now_ms = int(time.time() * 1000)
    window_ms = hours * 3600 * 1000
    payload = {
        "token": SDL_KEY,
        "query": query,
        "startTime": str(now_ms - window_ms),
        "endTime": str(now_ms),
    }
    request = urllib.request.Request(
        f"{SDL_BASE}/api/powerQuery",
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(request, timeout=60) as response:
            reply = json.loads(response.read())
            values = reply.get("values") or []
            return 200, "ok", len(values)
    except urllib.error.HTTPError as err:
        error_text = err.read().decode()
        return err.code, error_text[:250], 0
def pq_count(query: str) -> int:
    """Return the number of events matching *query* over the last 24 hours.

    The query is wrapped in ``| group n=count() | limit 1`` and sent to the
    SDL PowerQuery endpoint exactly once; the count is read from the first
    cell of the first result row.

    Args:
        query: PowerQuery filter expression to count matches for.

    Returns:
        The count as an int, or 0 when the endpoint answers with an HTTP
        error, returns no rows, or returns a first cell that is missing or
        not convertible to int.
    """
    wrapped = f"{query} | group n=count() | limit 1"
    end = int(time.time() * 1000)
    start = end - 24 * 3600 * 1000
    req = urllib.request.Request(
        f"{SDL_BASE}/api/powerQuery",
        data=json.dumps({"token": SDL_KEY, "query": wrapped,
                         "startTime": str(start),
                         "endTime": str(end)}).encode(),
        headers={"Content-Type": "application/json"}, method="POST")
    try:
        # One round trip only: the response carries both the success signal
        # and the value, so no separate pre-check request is needed.
        with urllib.request.urlopen(req, timeout=60) as resp:
            d = json.loads(resp.read())
    except urllib.error.HTTPError:
        return 0
    rows = d.get("values") or []
    if not rows:
        return 0
    try:
        first = rows[0]
        return int(first[0]) if first and first[0] is not None else 0
    except (TypeError, ValueError, IndexError, KeyError):
        # Unexpected row shape or a non-numeric cell: report "no data".
        return 0
def mgmt_get(path: str) -> tuple[int, dict]:
    """Issue an authenticated GET against the management console.

    Args:
        path: URL path (and query string) appended to the console base URL.

    Returns:
        ``(status, parsed_json)`` on success.  On an HTTP error the status
        code is paired with the parsed JSON error body, or with
        ``{"_body": "(non-json)"}`` when that body cannot be parsed.
    """
    request = urllib.request.Request(
        f"{S1_CONS}{path}",
        headers={
            "Authorization": f"ApiToken {S1_TOK}",
            "Accept": "application/json",
        },
    )
    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            return response.status, json.loads(response.read())
    except urllib.error.HTTPError as err:
        status = err.code
        try:
            error_doc = json.loads(err.read())
        except Exception:
            # The error body is not JSON; return a placeholder instead.
            return status, {"_body": "(non-json)"}
        return status, error_doc
def deploy_rule(site_id: str, name: str, desc: str,
                body: str) -> tuple[int, str]:
    """POST one scheduled PowerQuery rule, in Draft status, to the console.

    Args:
        site_id: Site the rule is scoped to (sent as ``filter.siteIds``).
        name: Rule name.
        desc: Rule description.
        body: PowerQuery text used as the scheduled query.

    Returns:
        ``(200, rule_id)`` on success; ``rule_id`` is ``"?"`` when the
        response carries no id.  On an HTTP error,
        ``(status, first 300 chars of the body)``.
    """
    schedule = {
        "query": body,
        "lookbackWindowMinutes": 30,
        "runIntervalMinutes": 5,
        "threshold": {"value": 0, "operator": "Greater"},
    }
    rule = {
        "name": name,
        "description": desc,
        "severity": "Medium",
        "expirationMode": "Permanent",
        "queryType": "scheduled",
        "queryLang": "2.0",
        "status": "Draft",
        "treatAsThreat": "UNDEFINED",
        "networkQuarantine": False,
        "coolOffSettings": {"renotifyMinutes": 60},
        "scheduledParams": schedule,
    }
    envelope = {"data": rule, "filter": {"siteIds": [site_id]}}
    request = urllib.request.Request(
        f"{S1_CONS}/web/api/v2.1/cloud-detection/rules",
        data=json.dumps(envelope).encode(),
        headers={
            "Authorization": f"ApiToken {S1_TOK}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        },
        method="POST",
    )
    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            reply = json.loads(response.read())
            created = reply.get("data") or {}
            # Success is always reported as 200, whatever 2xx came back.
            return 200, str(created.get("id") or "?")
    except urllib.error.HTTPError as err:
        return err.code, err.read().decode()[:300]
def put_rule(site_id: str, rule_id: str, name: str,
             body: str) -> tuple[int, str]:
    """PUT a full rule definition to an existing rule id.

    The caller uses the resulting status code as an existence check.  The
    request body carries *name* and a ``verify-by-PUT for ...``
    description.

    Args:
        site_id: Site the rule is scoped to (sent as ``filter.siteIds``).
        rule_id: Id of the rule to address in the URL.
        name: Rule name placed in the PUT body.
        body: PowerQuery text used as the scheduled query.

    Returns:
        ``(status, "ok")`` on success, otherwise
        ``(status, first 200 chars of the error body)``.
    """
    schedule = {
        "query": body,
        "lookbackWindowMinutes": 30,
        "runIntervalMinutes": 5,
        "threshold": {"value": 0, "operator": "Greater"},
    }
    rule = {
        "name": name,
        "description": f"verify-by-PUT for {name}",
        "severity": "Medium",
        "expirationMode": "Permanent",
        "queryType": "scheduled",
        "queryLang": "2.0",
        "status": "Draft",
        "treatAsThreat": "UNDEFINED",
        "networkQuarantine": False,
        "coolOffSettings": {"renotifyMinutes": 60},
        "scheduledParams": schedule,
    }
    envelope = {"data": rule, "filter": {"siteIds": [site_id]}}
    request = urllib.request.Request(
        f"{S1_CONS}/web/api/v2.1/cloud-detection/rules/{rule_id}",
        data=json.dumps(envelope).encode(),
        headers={
            "Authorization": f"ApiToken {S1_TOK}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        },
        method="PUT",
    )
    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            return response.status, "ok"
    except urllib.error.HTTPError as err:
        return err.code, err.read().decode()[:200]
# ----------------------------------------------------- main -----------------
def main() -> int:
    """Run the discover / probe / smoke-test / deploy / verify pipeline.

    Steps:
        0. List the sites visible to the console token and pick the target
           site (the ``SITE_ID`` environment variable, if set, overrides
           the default of "first site returned").
        1. Print 24-hour event counts for a few fields the rules query.
        2. Smoke-test every rule body in ``RULES`` against PowerQuery.
        3. Deploy each rule whose smoke test returned HTTP 200.
        4. PUT each deployed rule id back as an existence check.

    Deployed rule ids are written to ``deployed_rule_ids.json`` next to
    this script.

    Returns:
        1 when site discovery fails or no site is visible to the token,
        otherwise 0.
    """
    total = len(RULES)
    print(f"\n{'='*78}\n Sigma -> PowerQuery -> SDL on US tenant\n"
          f"{'='*78}")
    print(f" Mgmt API : {S1_CONS}")
    print(f" SDL : {SDL_BASE}")
    print(f" Artefact : {ART}\n")

    # --- 0. discover sites -------------------------------------------------
    print("--- Step 0: discover sites + token identity ---------------------")
    code, d = mgmt_get("/web/api/v2.1/sites?limit=10")
    if code != 200:
        print(f" HTTP {code} {str(d)[:300]}")
        return 1
    sites = (d.get("data") or {}).get("sites") or []
    print(f" Sites visible to token: {len(sites)}")
    for s in sites[:5]:
        print(f" id={s.get('id')} name={s.get('name')} "
              f"state={s.get('state')}")
    if not sites:
        print(" FATAL: no sites visible -- token has no scope here")
        return 1
    # Honour the SITE_ID override promised by the module docstring; fall
    # back to the first discovered site when it is unset or blank.
    site_override = os.environ.get("SITE_ID", "").strip()
    if site_override:
        site_id = site_override
        site_name = next(
            (s.get("name") for s in sites
             if str(s.get("id")) == site_override),
            "SITE_ID override; not among discovered sites")
    else:
        site_id = sites[0]["id"]
        site_name = sites[0].get("name")
    print(f" --> deploying into site_id={site_id} "
          f"({site_name})\n")

    # --- 1. tenant schema probe --------------------------------------------
    print("--- Step 1: probe US tenant telemetry (last 24 h) --------------")
    # label shown in the report -> PowerQuery filter that is counted
    probes = {
        "event.type='Process Creation'":
            "event.type='Process Creation'",
        "endpoint.os='windows'":
            "endpoint.os='windows'",
        "tgt.process.cmdline non-empty":
            "tgt.process.cmdline!=''",
        "src.process.image.path non-empty":
            "src.process.image.path!=''",
    }
    for label, q in probes.items():
        n = pq_count(q)
        print(f" {label:<45}{n}")
    print()

    # --- 2. smoke-test the rules -------------------------------------------
    print("--- Step 2: smoke-test 10 faithful PQ bodies -------------------")
    # (index, tactic, tech, file name, HTTP code or None, rows or None);
    # code/rows are None when the .pq file is missing from ART.
    test_results: list[tuple[int, str, str, str, int | None, int | None]] = []
    for i, (tactic, tech, fname) in enumerate(RULES, 1):
        pq_path = ART / fname
        if not pq_path.exists():
            print(f" [{i:>2}] {tactic:<18}{tech:<32} MISSING {fname}")
            test_results.append((i, tactic, tech, fname, None, None))
            continue
        body = pq_path.read_text()
        code, msg, rows = pq(body)
        print(f" [{i:>2}] {tactic:<18}{tech:<32} HTTP {code} rows={rows}")
        if code != 200:
            print(f" err: {msg[:160]}")
        test_results.append((i, tactic, tech, fname, code, rows))
    print()

    # --- 3. deploy ---------------------------------------------------------
    print("--- Step 3: deploy each valid PQ as SDL Scheduled rule ---------")
    deployed: list[tuple[int, str, str, str, str]] = []  # i, tactic, tech, fname, id
    for (i, tactic, tech, fname, code, rows) in test_results:
        if code != 200:
            # Covers both missing files (code is None) and failed queries.
            print(f" [{i:>2}] SKIP (smoke-test failed)")
            continue
        body = (ART / fname).read_text()
        name = f"[Sigma->PQ USEA1] {tactic} / {tech} ({pathlib.Path(fname).stem})"[:128]
        # Record where the body was really read from, so the description
        # stays correct when SIGMA_OUT_DIR points somewhere else.
        desc = (f"Auto-converted Sigma rule. "
                f"Source: {ART / fname}. "
                f"Faithful S1 DV schema.")
        dc, dmsg = deploy_rule(site_id, name, desc, body)
        verdict = (f"id={dmsg}" if dc == 200 else f"FAIL HTTP {dc} "
                   f"{dmsg[:160]}")
        print(f" [{i:>2}] DEPLOY HTTP {dc} {verdict}")
        if dc == 200:
            deployed.append((i, tactic, tech, fname, dmsg))
    print()

    # --- 4. PUT verification -----------------------------------------------
    exists = 0
    gone = 0
    if deployed:
        print("--- Step 4: PUT-existence verification --------------------")
        for (i, tactic, tech, fname, rid) in deployed:
            body = (ART / fname).read_text()
            # NOTE(review): this name differs from the one used at deploy
            # time, so a successful PUT presumably renames the rule --
            # confirm against the API behaviour.
            name = f"[Sigma->PQ USEA1 verify] {tactic} / {tech}"[:128]
            pc, pmsg = put_rule(site_id, rid, name, body)
            verdict = ("EXISTS" if pc in (200, 204)
                       else "NOT FOUND" if pc == 404
                       else f"HTTP {pc} {pmsg[:80]}")
            print(f" [{i:>2}] id={rid} PUT HTTP {pc} {verdict}")
            if pc in (200, 204):
                exists += 1
            elif pc == 404:
                gone += 1

    # --- summary -----------------------------------------------------------
    print(f"\n{'='*78}\n SUMMARY\n{'='*78}")
    valid = sum(1 for (_, _, _, _, c, _) in test_results if c == 200)
    print(f" Smoke-test passed : {valid}/{total}")
    print(f" Rules deployed : {len(deployed)}/{total}")
    if deployed:
        # Persist the ids so follow-up scripts need no hardcoded values.
        ids_file = HERE / "deployed_rule_ids.json"
        ids_file.write_text(json.dumps(
            {"tenant": S1_CONS, "site_id": site_id,
             "rules": [{"rule_id": rid, "pq_file": fname,
                        "tactic": tactic, "tech": tech}
                       for (_, tactic, tech, fname, rid) in deployed]},
            indent=2))
        print(f" Deployed IDs : {ids_file}")
        print(f" PUT-verified exists : {exists}/{len(deployed)} "
              f"(not found: {gone})")
    print(f"\n Console: {S1_CONS}/#/cloud-detection/rules\n")
    return 0
# Script entry point: run the pipeline and use its return value as the
# process exit status.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)