Files
marc 4df8e844e5 Sigma -> SentinelOne PowerQuery pipeline
End-to-end workflow that turns SigmaHQ rules into SDL Scheduled
custom-detection rules:

1. SIEM-toolkit provides the coverage map used to find what's thin:
   a MITRE ATT&CK heatmap across all detection-library rules, plus
   rule firing status (active vs. never fired).
2. Pick Sigma rules (https://github.com/SigmaHQ/sigma) that target
   those tactics.
3. Convert the Sigma rules to PowerQuery with
   pysigma-backend-sentinelone-pq.
4. Smoke-test against your tenant's /api/powerQuery, deploy via
   /web/api/v2.1/cloud-detection/rules as Scheduled PQ rules in Draft.
5. Re-running on a different tenant is just re-pointing the
   credentials -- the converted .pq bodies travel as-is.

Files:
  README_sigma_pipeline.md       full workflow doc
  recommend_sigma_imports.py     coverage-map reader -> rule shortlist
  probe_wel_schema.py            WEL parser field discovery
  convert_test_deploy_sigma.py   pick + convert + 3 variants + deploy
  fixup_rules_6_7.py             OriginalFileName pre-processor
  run_sigma_on_tenant.py         redeploy already-converted bodies
  verify_rule_exists_via_put.py  PUT-existence test (RBAC workaround)
  verify_deployed_sigma_rules.py RBAC visibility diagnostic
  tenant_config.example.json     credentials template (the real config is gitignored)

Each converted rule emits three PowerQuery variants:
  <stem>.pq          faithful (S1 DV schema)
  <stem>.relaxed.pq  drops endpoint.os + event.type clauses
  <stem>.wel.pq      rewritten onto microsoft_windows_eventlog-latest

All scripts read credentials from tenant_config.json (or the
SIEM_TOOLKIT_CONFIG env var), discover the target site_id at runtime,
and persist deployed rule IDs to deployed_rule_ids.json so the verify
scripts work without hardcoded IDs.
2026-05-28 12:29:37 +02:00

325 lines
14 KiB
Python

#!/usr/bin/env python3
"""
recommend_sigma_imports.py
Reads the local Threat Coverage state from the SIEM-toolkit-patched backend
(http://localhost:8001) and recommends concrete Sigma rules from
https://github.com/sigmahq/sigma to import.
Strategy
--------
Sigma rules only add value when:
1. The targeted log source is ACTIVELY ingested by your tenant.
2. The MITRE technique is currently weak (low rule_count) or missing.
The script therefore:
- Lists every active source the backend has detected (with event counts).
- Lists the rule and technique counts for each MITRE tactic.
- Maps each active source -> the Sigma folder(s) under sigmahq/sigma that
target that telemetry.
- Queries the Sigma repo's directory listing on GitHub to confirm the
folders exist and to count available rules.
- Prints a prioritised import list, plus the exact `git sparse-checkout`
commands you can copy/paste.
Usage
-----
python3 recommend_sigma_imports.py
python3 recommend_sigma_imports.py --backend http://localhost:8001
python3 recommend_sigma_imports.py --no-github
"""
from __future__ import annotations
import argparse
import json
import sys
import urllib.request
from typing import Any
# Root of the GitHub "contents" API for the SigmaHQ repo; a repo-relative
# folder path is appended to it to list that folder.
GITHUB_API = "https://api.github.com/repos/SigmaHQ/sigma/contents"

# Browser/clone URL of the same repository.
SIGMA_REPO = "https://github.com/SigmaHQ/sigma"

# Active SDL source name -> ordered list of (sigma_folder, rationale).
# Every folder path is relative to the sigmahq/sigma repository root.
# NOTE(review): nothing in this table validates that the folders exist in
# the current SigmaHQ layout -- confirm the paths against the live repo.
SOURCE_TO_SIGMA: dict[str, list[tuple[str, str]]] = {
    "Windows Event Logs": [
        (
            "rules/windows/builtin/security",
            "Direct match: rules keyed on EventID against Security channel.",
        ),
        (
            "rules/windows/builtin/system",
            "System channel: service install, driver load, time tampering.",
        ),
        (
            "rules/windows/builtin/application",
            "Application channel: MSI installs, app crashes used as TTPs.",
        ),
        (
            "rules/windows/process_creation",
            "Process creation (EID 4688 / Sysmon 1). Highest-value Windows folder.",
        ),
        (
            "rules/windows/powershell",
            "PowerShell Operational/Script-block (EID 4103/4104).",
        ),
        (
            "rules/windows/registry",
            "Sysmon registry events for persistence and config tampering.",
        ),
        (
            "rules/windows/network_connection",
            "Sysmon 3 / 5156 outbound connections from suspicious processes.",
        ),
        (
            "rules/windows/file",
            "Sysmon 11/15 file create + raw-access read (LSASS dump).",
        ),
        (
            "rules-emerging-threats/2024/Exploits",
            "Recent CVE detections, many Windows-targeted.",
        ),
    ],
    "Azure Platform": [
        (
            "rules/cloud/azure/activity_logs",
            "Azure Activity Log -- subscription/resource manager events.",
        ),
        (
            "rules/cloud/azure/microsoft365",
            "M365 Unified Audit Log.",
        ),
        (
            "rules/cloud/azure/signinlogs",
            "Azure AD / Entra ID sign-in logs.",
        ),
        (
            "rules/cloud/azure/auditlogs",
            "Entra ID directory audit (role assignments, app consent).",
        ),
    ],
    "Identity": [
        (
            "rules/cloud/azure/signinlogs",
            "Same Entra ID sign-in folder -- maps Identity source.",
        ),
        (
            "rules/cloud/azure/auditlogs",
            "Entra ID directory audit.",
        ),
        (
            "rules/category/authentication",
            "Cross-vendor authentication category.",
        ),
    ],
    "Mimecast": [
        (
            "rules/category/proxy",
            "Sigma generic proxy category covers email-gateway URL events.",
        ),
        (
            "rules-emerging-threats/2024/Malware",
            "Recent phishing / malware lure detections.",
        ),
    ],
    "Stormshield": [
        (
            "rules/network/firewall",
            "Vendor-neutral firewall log rules -- works on Stormshield once "
            "field-mapped via your existing stormshield parser.",
        ),
        (
            "rules/network/cisco",
            "Borrow Cisco ASA rules as templates -- many TTPs translate 1:1.",
        ),
    ],
    "Prompt Security": [
        # No first-party Sigma category exists for this source yet, so the
        # generic hunting folder is suggested instead.
        (
            "rules-threat-hunting/application",
            "Generic application hunting rules -- closest fit for LLM prompt-"
            "abuse signals until a vendor-specific Sigma category lands.",
        ),
    ],
}

# Tactics flagged as thin in the tactic table. The set is hand-tuned to the
# MITRE coverage observed on one tenant (Reconnaissance=11, Lateral=83,
# Collection=77, Exfiltration=91, Discovery=86); it is not recomputed from
# live data.
GAP_TACTICS = {
    "Reconnaissance",
    "Lateral Movement",
    "Collection",
    "Exfiltration",
    "Discovery",
}
def http_json(url: str, timeout: int = 30) -> Any:
    """Fetch `url` and return its body decoded as JSON.

    The request is sent with a ``User-Agent: siem-toolkit`` header and
    `timeout` (seconds) is handed to ``urllib.request.urlopen``. Nothing
    is caught here: errors from urllib or from JSON decoding propagate
    to the caller.
    """
    request = urllib.request.Request(url)
    request.add_header("User-Agent", "siem-toolkit")
    response = urllib.request.urlopen(request, timeout=timeout)
    with response:
        payload = response.read()
    return json.loads(payload)
def github_dir_count(path: str) -> tuple[int, str]:
    """Count the rule files that sit directly inside one Sigma repo folder.

    Only the top level of `path` is inspected; sub-directories are not
    descended into.

    Args:
        path: Folder path relative to the Sigma repository root.

    Returns:
        ``(rule_count, status)``. ``status`` is ``"OK"`` on success,
        ``"no-list"`` when the response is not a JSON list,
        ``"HTTP <code>"`` for an HTTP error response, or
        ``"err <ExceptionName>"`` for any other failure. ``rule_count`` is
        0 for every status other than ``"OK"``. This function never raises.
    """
    try:
        listing = http_json(f"{GITHUB_API}/{path}")
    except urllib.error.HTTPError as e:
        return 0, f"HTTP {e.code}"
    except Exception as e:  # best-effort probe: report the failure, never raise
        return 0, f"err {type(e).__name__}"

    if not isinstance(listing, list):
        return 0, "no-list"

    # Entries are matched on file extension only; a missing or non-string
    # "name" simply does not match instead of aborting the whole count.
    rule_files = sum(
        1
        for entry in listing
        if isinstance(entry, dict)
        and str(entry.get("name") or "").endswith((".yml", ".yaml"))
    )
    return rule_files, "OK"
def _count_rule_files(listing: Any) -> int:
    """Count the ``.yml``/``.yaml`` file entries in one contents listing."""
    if not isinstance(listing, list):
        return 0
    return sum(
        1
        for entry in listing
        if isinstance(entry, dict)
        and entry.get("type") == "file"
        and str(entry.get("name") or "").endswith((".yml", ".yaml"))
    )


def github_recursive_count(path: str) -> int:
    """Count rule files under `path` and in its immediate sub-directories.

    Files directly inside `path` are counted, then each direct
    sub-directory is listed once and its files are added. Deeper levels
    are not visited.

    The result is an estimate: GitHub documents an upper limit of 1,000
    entries per directory for the contents API, so very large folders may
    be under-counted.

    Args:
        path: Folder path relative to the Sigma repository root.

    Returns:
        The number of ``.yml``/``.yaml`` files found. If a request fails,
        a warning is written to stderr and whatever was counted before the
        failure is returned (0 when the first request fails).
    """
    base = f"{GITHUB_API}/{path}"
    total = 0
    try:
        listing = http_json(base)
        if not isinstance(listing, list):
            return 0
        total += _count_rule_files(listing)
        for entry in listing:
            if not isinstance(entry, dict) or entry.get("type") != "dir":
                continue
            sub_name = entry.get("name")
            if not sub_name:
                continue
            total += _count_rule_files(http_json(f"{base}/{sub_name}"))
    except Exception as e:
        # Stay best-effort (the count is only informational), but say so:
        # a silent 0 is indistinguishable from an empty or missing folder.
        print(
            f"[warn] GitHub lookup for {path} failed "
            f"({type(e).__name__}: {e}); rule count may be incomplete",
            file=sys.stderr,
        )
    return total
# Curated (sigma_rule_path, tactic, technique) triples printed as the
# "import these first" list.
# NOTE(review): the rule file names are not checked against the current
# SigmaHQ tree, and some tactic labels follow this tenant's gap buckets
# rather than the ATT&CK matrix (T1016 and T1087 are listed under
# Discovery there, T1560 under Collection) -- confirm before relying on it.
_MUST_HAVE_RULES: list[tuple[str, str, str]] = [
    # Lateral Movement -- weak tactic (83 rules)
    ("rules/windows/builtin/security/win_security_admin_rdp_login.yml",
     "Lateral Movement", "T1021.001 RDP"),
    ("rules/windows/builtin/security/"
     "win_security_susp_smb_share_object_access_lateral_movement.yml",
     "Lateral Movement", "T1021.002 SMB"),
    ("rules/windows/process_creation/"
     "proc_creation_win_winrm_lateral_movement.yml",
     "Lateral Movement", "T1021.006 WinRM"),
    # Collection -- weak tactic (77 rules)
    ("rules/windows/process_creation/"
     "proc_creation_win_susp_screenshot.yml",
     "Collection", "T1113 Screen Capture"),
    ("rules/windows/process_creation/"
     "proc_creation_win_powershell_clipboard.yml",
     "Collection", "T1115 Clipboard Data"),
    # Exfiltration -- weak tactic (91 rules)
    ("rules/windows/network_connection/"
     "net_connection_win_rclone.yml",
     "Exfiltration", "T1567.002 Exfil to Cloud Storage"),
    ("rules/windows/process_creation/"
     "proc_creation_win_rar_compress_data.yml",
     "Exfiltration", "T1560.001 Archive via Utility"),
    # Reconnaissance -- THINNEST tactic (11 rules)
    ("rules/windows/process_creation/"
     "proc_creation_win_susp_netsh_dump_config.yml",
     "Reconnaissance", "T1016 System Network Config Discovery"),
    ("rules/windows/process_creation/"
     "proc_creation_win_susp_adsisearcher.yml",
     "Reconnaissance", "T1087.002 Domain Account Discovery"),
    # Discovery
    ("rules/windows/process_creation/"
     "proc_creation_win_susp_bloodhound_sharphound.yml",
     "Discovery", "T1087/T1482 BloodHound/SharpHound"),
    # Credential Access (already 217 rules but always topical)
    ("rules/windows/process_creation/"
     "proc_creation_win_susp_mimikatz_command_line.yml",
     "Credential Access", "T1003.001 LSASS Memory"),
    ("rules/windows/process_creation/"
     "proc_creation_win_susp_lsass_dump.yml",
     "Credential Access", "T1003.001 LSASS Memory"),
    # Azure -- broad coverage gap
    ("rules/cloud/azure/signinlogs/"
     "azure_aad_sign_ins_from_noninteractive_devices.yml",
     "Initial Access", "T1078.004 Cloud Account abuse"),
    ("rules/cloud/azure/auditlogs/"
     "azure_aad_role_assigned.yml",
     "Privilege Escalation", "T1098 Account Manipulation"),
]


def _banner(title: str) -> str:
    """Return `title` framed above and below by a 78-character '=' rule."""
    rule = "=" * 78
    return f"{rule}\n {title}\n{rule}"


def _fatal_backend(err: Exception) -> int:
    """Print the backend-failure message and return the failing exit code."""
    print(f"\n[FATAL] cannot reach backend: {err}")
    return 1


def _print_health(health: dict[str, Any]) -> None:
    """Print the headline figures from the coverage-health response."""
    print("\n--- Current coverage health ---")
    print(f" health_score : {health['health_score']}")
    print(f" parser_pct : {health['parser_pct']}")
    print(f" mitre_pct : {health['mitre_pct']}")
    print(f" firing_pct : {health['firing_pct']} "
          f"(only {health['rules_fired']} of {health['rules_loaded']} "
          f"have fired -- importing rules without verifying they fire is "
          f"the #1 source of dashboard noise)")
    print(f" active_sources : {health['active_sources']}")
    print(f" tactics_covered : {health['tactics_covered']}/15")
    print(f" techniques cov. : {health['techniques_covered']}")


def _print_sources(cov_map: dict[str, Any]) -> list[dict[str, Any]]:
    """Print the log-source table and return the sources, busiest first."""
    print("\n--- Active log sources (ordered by event volume) ---")
    print(f" {'source':<24}{'events':>10} {'parser':<32} rule_count")
    sources = sorted(cov_map["sources"], key=lambda s: -s["event_count"])
    for s in sources:
        print(f" {s['source_name']:<24}{s['event_count']:>10} "
              f"{(s.get('parser') or '-'):<32}{s.get('rule_count', '-')}")
    return sources


def _print_tactics(mitre: dict[str, Any]) -> None:
    """Print per-tactic rule/technique counts, flagging GAP_TACTICS as thin."""
    print("\n--- MITRE tactic depth (rules / techniques per tactic) ---")
    print(f" {'tactic':<26}{'rules':>8}{'techs':>8} gap?")
    for t in mitre["tactics"]:
        gap = " <-- THIN" if t["tactic"] in GAP_TACTICS else ""
        print(f" {t['tactic']:<26}{t['rule_count']:>8}"
              f"{t['technique_count']:>8}{gap}")


def _print_folder_recommendations(sources: list[dict[str, Any]],
                                  no_github: bool) -> list[str]:
    """Print mapped Sigma folders per source; return them de-duplicated."""
    print("\n" + _banner("RECOMMENDED SIGMA FOLDERS TO IMPORT"))
    print(" Priority order = which active source has the most events.\n"
          " Only folders for sources that are ACTIVELY producing telemetry\n"
          " appear below -- rules for sources you don't ingest add zero\n"
          " detection value and pollute the rule library.\n")
    seen: set[str] = set()
    sparse_paths: list[str] = []
    for s in sources:
        name = s["source_name"]
        evt = s["event_count"]
        folders = SOURCE_TO_SIGMA.get(name, [])
        if not folders:
            print(f"--- {name} ({evt:,} events) -- no Sigma mapping curated")
            continue
        print(f"\n--- {name} ({evt:,} events) ---")
        for folder, why in folders:
            # A folder shared by several sources is listed once, under the
            # busiest source that maps to it.
            if folder in seen:
                continue
            seen.add(folder)
            sparse_paths.append(folder)
            count_str = ""
            if not no_github:
                n = github_recursive_count(folder)
                count_str = f" [~{n} rules]"
            print(f" * {folder}{count_str}")
            print(f" {why}")
    return sparse_paths


def _print_import_commands(backend: str, sparse_paths: list[str]) -> None:
    """Print the copy/paste sparse-checkout and upload shell snippet."""
    print("\n" + _banner("COPY/PASTE: import these folders only") + "\n")
    if not sparse_paths:
        # With no folders, "git sparse-checkout set" would get no arguments.
        print(" (no Sigma folders are mapped to the active sources --"
              " nothing to check out)\n")
        return
    print(" # 1. clone Sigma with sparse-checkout (no full 5GB history)")
    print(" git clone --filter=blob:none --no-checkout "
          f"{SIGMA_REPO}.git /tmp/sigma")
    print(" cd /tmp/sigma")
    print(" git sparse-checkout init --cone")
    print(" git sparse-checkout set \\")
    # No continuation after the last path: a trailing backslash would splice
    # the comment line below onto the git command, which only works in
    # shells that treat a mid-line '#' as a comment (interactive zsh does
    # not by default).
    print(" \\\n".join(f" {p}" for p in sparse_paths))
    print(" # end of folder list")
    print(" git checkout main")
    print()
    print(" # 2. push each .yml file into SIEM-toolkit-patched via the")
    print(" # backend's /api/coverage/upload-sigma endpoint (one POST")
    print(" # per file, multipart/form-data):")
    print(
        "\n"
        "find . -path './rules*' -name '*.yml' | while read f ; do\n"
        f'curl -sS -F "file=@$f" {backend}/api/coverage/upload-sigma \\\n'
        '-w "%{http_code} $f\\n" -o /dev/null\n'
        "done\n"
    )


def _print_must_have() -> None:
    """Print the curated high-priority rule table and its closing note."""
    print(_banner("HIGH-PRIORITY INDIVIDUAL RULES (curated)"))
    print(f" {'tactic':<22}{'technique':<35}rule")
    for path, tactic, tech in _MUST_HAVE_RULES:
        print(f" {tactic:<22}{tech:<35}{path}")
    # The count is derived so the sentence stays true when the list changes.
    print(f"\n These {len(_MUST_HAVE_RULES)} rules close the thinnest gaps"
          " surfaced by the")
    print(" Threat Coverage map above. Import them FIRST, then iterate")
    print(" through the bulk folders.\n")


def main() -> int:
    """Print Sigma import recommendations derived from the coverage backend.

    Reads ``--backend`` and ``--no-github`` from the command line, queries
    the backend's ``/api/coverage/health``, ``/api/coverage/map`` and
    ``/api/coverage/mitre`` endpoints, and prints the report to stdout.

    Returns:
        0 on success; 1 when any backend request fails (a ``[FATAL]`` line
        is printed instead of a traceback).
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--backend", default="http://localhost:8001",
                    help="SIEM-toolkit-patched backend URL")
    ap.add_argument("--no-github", action="store_true",
                    help="Skip GitHub API calls (offline / rate-limited).")
    args = ap.parse_args()
    # A trailing slash on --backend would otherwise produce "//api/..." URLs.
    backend = args.backend.rstrip("/")

    print("\n" + _banner("SIGMA IMPORT RECOMMENDATIONS"))
    print(f" Backend : {backend}")
    print(f" Sigma repo : {SIGMA_REPO}")
    print(f" GitHub lookups : {'disabled' if args.no_github else 'enabled'}")

    # Each backend call is guarded the same way so a failure part-way
    # through ends with a clear message and exit code 1.
    # 1) Coverage health
    try:
        health = http_json(f"{backend}/api/coverage/health")
    except Exception as e:  # CLI boundary: report, don't traceback
        return _fatal_backend(e)
    _print_health(health)

    # 2) Active sources
    try:
        cov_map = http_json(f"{backend}/api/coverage/map")
    except Exception as e:
        return _fatal_backend(e)
    sources = _print_sources(cov_map)

    # 3) MITRE tactic gaps
    try:
        mitre = http_json(f"{backend}/api/coverage/mitre")
    except Exception as e:
        return _fatal_backend(e)
    _print_tactics(mitre)

    # 4) Recommended Sigma folders, prioritised by active-source volume
    sparse_paths = _print_folder_recommendations(sources, args.no_github)

    # 5) Concrete import commands
    _print_import_commands(backend, sparse_paths)

    # 6) High-value individual rules (curated -- always worth importing)
    _print_must_have()
    return 0
# Script entry point: run main() and use its return value as the exit code.
if __name__ == "__main__":
    sys.exit(main())