mirror of
https://github.com/marcredhat/SIEM-toolkit-patched
synced 2026-06-08 12:33:51 +00:00
4df8e844e5
End-to-end workflow that turns SigmaHQ rules into SDL Scheduled custom-detection rules: 1. SIEM-toolkit provides the coverage map to find what's thin -- MITRE ATT&CK heatmap across all detection library rules, rule firing status (active vs never-fired). 2. Pick Sigma rules (https://github.com/SigmaHQ/sigma) that target those tactics. 3. Convert the Sigma rules to PowerQuery with pysigma-backend-sentinelone-pq. 4. Smoke-test against your tenant's /api/powerQuery, deploy via /web/api/v2.1/cloud-detection/rules as Scheduled PQ rules in Draft. 5. Re-running on a different tenant is just re-pointing the credentials -- the converted .pq bodies travel as-is. Files: README_sigma_pipeline.md full workflow doc recommend_sigma_imports.py coverage-map reader -> rule shortlist probe_wel_schema.py WEL parser field discovery convert_test_deploy_sigma.py pick + convert + 3 variants + deploy fixup_rules_6_7.py OriginalFileName pre-processor run_sigma_on_tenant.py redeploy already-converted bodies verify_rule_exists_via_put.py PUT-existence test (RBAC workaround) verify_deployed_sigma_rules.py RBAC visibility diagnostic tenant_config.example.json credentials template (gitignored real one) Each converted rule emits three PowerQuery variants: <stem>.pq faithful (S1 DV schema) <stem>.relaxed.pq drops endpoint.os + event.type clauses <stem>.wel.pq rewritten onto microsoft_windows_eventlog-latest All scripts read credentials from tenant_config.json (or the SIEM_TOOLKIT_CONFIG env var), discover the target site_id at runtime, and persist deployed rule IDs to deployed_rule_ids.json so the verify scripts work without hardcoded IDs.
325 lines
14 KiB
Python
325 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
recommend_sigma_imports.py
|
|
|
|
Reads the local Threat Coverage state from the SIEM-toolkit-patched backend
|
|
(http://localhost:8001) and recommends concrete Sigma rules from
|
|
https://github.com/sigmahq/sigma to import.
|
|
|
|
Strategy
|
|
--------
|
|
Sigma rules only add value when:
|
|
1. The targeted log source is ACTIVELY ingested by your tenant.
|
|
2. The MITRE technique is currently weak (low rule_count) or missing.
|
|
|
|
The script therefore:
|
|
- Lists every active source the backend has detected (with event counts).
|
|
- Lists every covered MITRE technique and per-tactic rule counts.
|
|
- Maps each active source -> the Sigma folder(s) under sigmahq/sigma that
|
|
target that telemetry.
|
|
- Queries the Sigma repo's directory listing on GitHub to confirm the
|
|
folders exist and to count available rules.
|
|
- Prints a prioritised import list, plus the exact `git sparse-checkout`
|
|
commands you can copy/paste.
|
|
|
|
Usage
|
|
-----
|
|
python3 recommend_sigma_imports.py
|
|
python3 recommend_sigma_imports.py --backend http://localhost:8001
|
|
"""
|
|
from __future__ import annotations
|
|
import argparse
|
|
import json
|
|
import sys
|
|
import urllib.request
|
|
from typing import Any
|
|
|
|
|
|
# Human-facing clone/browse URL of the upstream Sigma rule repository.
SIGMA_REPO = "https://github.com/SigmaHQ/sigma"

# GitHub REST "contents" endpoint of the same repository; a repo-relative
# folder path is appended to it to obtain that folder's directory listing.
GITHUB_API = "https://api.github.com/repos/SigmaHQ/sigma/contents"
|
|
|
|
# Curated lookup: active SDL source name -> ordered (sigma_folder, rationale)
# pairs.  Folder paths are relative to the root of the sigmahq/sigma
# repository; the order inside each list is the order in which the folders
# are recommended.
SOURCE_TO_SIGMA: dict[str, list[tuple[str, str]]] = {
    "Windows Event Logs": [
        (
            "rules/windows/builtin/security",
            "Direct match: rules keyed on EventID against Security channel.",
        ),
        (
            "rules/windows/builtin/system",
            "System channel: service install, driver load, time tampering.",
        ),
        (
            "rules/windows/builtin/application",
            "Application channel: MSI installs, app crashes used as TTPs.",
        ),
        (
            "rules/windows/process_creation",
            "Process creation (EID 4688 / Sysmon 1). Highest-value Windows folder.",
        ),
        (
            "rules/windows/powershell",
            "PowerShell Operational/Script-block (EID 4103/4104).",
        ),
        (
            "rules/windows/registry",
            "Sysmon registry events for persistence and config tampering.",
        ),
        (
            "rules/windows/network_connection",
            "Sysmon 3 / 5156 outbound connections from suspicious processes.",
        ),
        (
            "rules/windows/file",
            "Sysmon 11/15 file create + raw-access read (LSASS dump).",
        ),
        (
            "rules-emerging-threats/2024/Exploits",
            "Recent CVE detections, many Windows-targeted.",
        ),
    ],
    "Azure Platform": [
        (
            "rules/cloud/azure/activity_logs",
            "Azure Activity Log -- subscription/resource manager events.",
        ),
        (
            "rules/cloud/azure/microsoft365",
            "M365 Unified Audit Log.",
        ),
        (
            "rules/cloud/azure/signinlogs",
            "Azure AD / Entra ID sign-in logs.",
        ),
        (
            "rules/cloud/azure/auditlogs",
            "Entra ID directory audit (role assignments, app consent).",
        ),
    ],
    "Identity": [
        (
            "rules/cloud/azure/signinlogs",
            "Same Entra ID sign-in folder -- maps Identity source.",
        ),
        (
            "rules/cloud/azure/auditlogs",
            "Entra ID directory audit.",
        ),
        (
            "rules/category/authentication",
            "Cross-vendor authentication category.",
        ),
    ],
    "Mimecast": [
        (
            "rules/category/proxy",
            "Sigma generic proxy category covers email-gateway URL events.",
        ),
        (
            "rules-emerging-threats/2024/Malware",
            "Recent phishing / malware lure detections.",
        ),
    ],
    "Stormshield": [
        (
            "rules/network/firewall",
            "Vendor-neutral firewall log rules -- works on Stormshield once "
            "field-mapped via your existing stormshield parser.",
        ),
        (
            "rules/network/cisco",
            "Borrow Cisco ASA rules as templates -- many TTPs translate 1:1.",
        ),
    ],
    "Prompt Security": [
        # No first-party Sigma coverage yet; recommend hunting category.
        (
            "rules-threat-hunting/application",
            "Generic application hunting rules -- closest fit for LLM prompt-"
            "abuse signals until a vendor-specific Sigma category lands.",
        ),
    ],
}
|
|
|
|
# MITRE tactics flagged as "THIN" in the tactic-depth table.  This is a
# static, hand-tuned set -- it is NOT derived from the live rule counts.  It
# reflects the coverage observed on the tenant it was tuned for
# (Reconnaissance=11, Lateral=83, Collection=77, Exfiltration=91,
# Discovery=86).
GAP_TACTICS = {
    "Reconnaissance",
    "Lateral Movement",
    "Collection",
    "Exfiltration",
    "Discovery",
}
|
|
|
|
|
|
def http_json(url: str, timeout: int = 30) -> Any:
    """Fetch *url* and return its body decoded as JSON.

    The request carries a ``User-Agent: siem-toolkit`` header.  Errors from
    ``urllib`` (connection failures, HTTP error statuses, timeouts) and JSON
    decoding errors are not handled here; they propagate to the caller.

    Args:
        url: Address to fetch.
        timeout: Timeout in seconds handed to ``urlopen``.

    Returns:
        Whatever ``json.loads`` produces for the response body.
    """
    request = urllib.request.Request(url, headers={"User-Agent": "siem-toolkit"})
    response = urllib.request.urlopen(request, timeout=timeout)
    # The context manager closes the response once the body has been read.
    with response:
        payload = response.read()
    return json.loads(payload)
|
|
|
|
|
|
def github_dir_count(path: str) -> tuple[int, str]:
    """Count the rule files sitting directly in one Sigma repo folder.

    Only the top level of *path* is inspected; subdirectories are not
    descended into.

    Args:
        path: Folder path relative to the Sigma repository root.

    Returns:
        ``(rule_count, status)`` where *status* is a short label rather than
        a raw HTTP status:

        * ``"OK"``        -- listing fetched; *rule_count* is the number of
          ``*.yml`` / ``*.yaml`` entries in it.
        * ``"no-list"``   -- the response was not a JSON list.
        * ``"HTTP <code>"`` -- the request failed with that HTTP status.
        * ``"err <ExceptionName>"`` -- any other failure.

        *rule_count* is 0 for every status other than ``"OK"``.
    """
    url = f"{GITHUB_API}/{path}"
    try:
        data = http_json(url)
    except urllib.error.HTTPError as e:
        return 0, f"HTTP {e.code}"
    except Exception as e:
        # Best-effort lookup: report the failure kind instead of raising.
        return 0, f"err {type(e).__name__}"

    if not isinstance(data, list):
        return 0, "no-list"

    # ``or ""`` guards against an entry whose "name" is present but null.
    rule_files = sum(
        1
        for entry in data
        if isinstance(entry, dict)
        and str(entry.get("name") or "").endswith((".yml", ".yaml"))
    )
    return rule_files, "OK"
|
|
|
|
|
|
def github_recursive_count(path: str) -> int:
    """Count Sigma rule files under *path*, looking two levels deep.

    Files ending in ``.yml`` / ``.yaml`` are counted in *path* itself and in
    each of its immediate subdirectories; deeper levels are not visited.

    The count is best-effort: a listing that cannot be fetched or is not a
    JSON list contributes 0, and the remaining directories are still
    counted.  Callers should treat the result as approximate.

    NOTE: GitHub documents an upper limit of 1,000 entries per directory for
    the contents API, so very large folders may be under-counted.

    Args:
        path: Folder path relative to the Sigma repository root.

    Returns:
        Number of rule files found (0 if nothing could be listed).
    """
    from urllib.parse import quote

    def list_entries(repo_path: str) -> list[dict]:
        """Return the dict entries of one directory listing, [] on failure."""
        try:
            listing = http_json(f"{GITHUB_API}/{quote(repo_path)}")
        except Exception:
            # Deliberate best-effort: network / rate-limit / decode errors
            # must not abort the whole recommendation run.
            return []
        if not isinstance(listing, list):
            return []
        return [entry for entry in listing if isinstance(entry, dict)]

    def is_rule_file(entry: dict) -> bool:
        """True for a file entry whose name ends in .yml or .yaml."""
        name = str(entry.get("name") or "")
        return entry.get("type") == "file" and name.endswith((".yml", ".yaml"))

    total = 0
    for entry in list_entries(path):
        if is_rule_file(entry):
            total += 1
        elif entry.get("type") == "dir" and entry.get("name"):
            # One failing subdirectory only loses its own files.
            children = list_entries(f"{path}/{entry['name']}")
            total += sum(1 for child in children if is_rule_file(child))
    return total
|
|
|
|
|
|
# Horizontal rule used around every section banner.
_BANNER = "=" * 78

# Curated (rule_path, tactic, technique) triples printed in the final
# section.  Paths are relative to the Sigma repository root.
_MUST_HAVE_RULES: list[tuple[str, str, str]] = [
    # Lateral Movement -- weak tactic (83 rules)
    ("rules/windows/builtin/security/win_security_admin_rdp_login.yml",
     "Lateral Movement", "T1021.001 RDP"),
    ("rules/windows/builtin/security/"
     "win_security_susp_smb_share_object_access_lateral_movement.yml",
     "Lateral Movement", "T1021.002 SMB"),
    ("rules/windows/process_creation/"
     "proc_creation_win_winrm_lateral_movement.yml",
     "Lateral Movement", "T1021.006 WinRM"),
    # Collection -- weak tactic (77 rules)
    ("rules/windows/process_creation/"
     "proc_creation_win_susp_screenshot.yml",
     "Collection", "T1113 Screen Capture"),
    ("rules/windows/process_creation/"
     "proc_creation_win_powershell_clipboard.yml",
     "Collection", "T1115 Clipboard Data"),
    # Exfiltration -- weak tactic (91 rules)
    ("rules/windows/network_connection/"
     "net_connection_win_rclone.yml",
     "Exfiltration", "T1567.002 Exfil to Cloud Storage"),
    ("rules/windows/process_creation/"
     "proc_creation_win_rar_compress_data.yml",
     "Exfiltration", "T1560.001 Archive via Utility"),
    # Reconnaissance -- THINNEST tactic (11 rules)
    ("rules/windows/process_creation/"
     "proc_creation_win_susp_netsh_dump_config.yml",
     "Reconnaissance", "T1016 System Network Config Discovery"),
    ("rules/windows/process_creation/"
     "proc_creation_win_susp_adsisearcher.yml",
     "Reconnaissance", "T1087.002 Domain Account Discovery"),
    # Discovery
    ("rules/windows/process_creation/"
     "proc_creation_win_susp_bloodhound_sharphound.yml",
     "Discovery", "T1087/T1482 BloodHound/SharpHound"),
    # Credential Access (already 217 rules but always topical)
    ("rules/windows/process_creation/"
     "proc_creation_win_susp_mimikatz_command_line.yml",
     "Credential Access", "T1003.001 LSASS Memory"),
    ("rules/windows/process_creation/"
     "proc_creation_win_susp_lsass_dump.yml",
     "Credential Access", "T1003.001 LSASS Memory"),
    # Azure -- broad coverage gap
    ("rules/cloud/azure/signinlogs/"
     "azure_aad_sign_ins_from_noninteractive_devices.yml",
     "Initial Access", "T1078.004 Cloud Account abuse"),
    ("rules/cloud/azure/auditlogs/"
     "azure_aad_role_assigned.yml",
     "Privilege Escalation", "T1098 Account Manipulation"),
]


class _BackendError(RuntimeError):
    """Raised when a coverage endpoint of the backend cannot be fetched."""


def _backend_json(backend: str, endpoint: str) -> Any:
    """Fetch one backend endpoint, converting any failure to _BackendError."""
    try:
        return http_json(f"{backend}{endpoint}")
    except Exception as exc:
        raise _BackendError(str(exc)) from exc


def _print_health(health: dict) -> None:
    """Print the coverage-health summary (section 1)."""
    print("\n--- Current coverage health ---")
    print(f" health_score : {health['health_score']}")
    print(f" parser_pct : {health['parser_pct']}")
    print(f" mitre_pct : {health['mitre_pct']}")
    print(f" firing_pct : {health['firing_pct']} "
          f"(only {health['rules_fired']} of {health['rules_loaded']} "
          "have fired -- importing rules without verifying they fire is "
          "the #1 source of dashboard noise)")
    print(f" active_sources : {health['active_sources']}")
    print(f" tactics_covered : {health['tactics_covered']}/15")
    print(f" techniques cov. : {health['techniques_covered']}")


def _print_sources(cov_map: dict) -> list[dict]:
    """Print the active-source table (section 2); return sources by volume."""
    print("\n--- Active log sources (ordered by event volume) ---")
    print(f" {'source':<24}{'events':>10} {'parser':<32} rule_count")
    # reverse=True keeps the sort stable, so ties stay in backend order.
    sources = sorted(cov_map["sources"],
                     key=lambda src: src["event_count"], reverse=True)
    for src in sources:
        print(f" {src['source_name']:<24}{src['event_count']:>10} "
              f"{(src.get('parser') or '-'):<32}{src.get('rule_count', '-')}")
    return sources


def _print_tactics(mitre: dict) -> None:
    """Print per-tactic rule/technique counts (section 3)."""
    print("\n--- MITRE tactic depth (rules / techniques per tactic) ---")
    print(f" {'tactic':<26}{'rules':>8}{'techs':>8} gap?")
    for row in mitre["tactics"]:
        # The marker comes from the static GAP_TACTICS set, not rule_count.
        marker = " <-- THIN" if row["tactic"] in GAP_TACTICS else ""
        print(f" {row['tactic']:<26}{row['rule_count']:>8}"
              f"{row['technique_count']:>8}{marker}")


def _print_folder_recommendations(sources: list[dict],
                                  use_github: bool) -> list[str]:
    """Print recommended Sigma folders (section 4); return them in order."""
    print(f"\n{_BANNER}\n RECOMMENDED SIGMA FOLDERS TO IMPORT\n{_BANNER}")
    print(" Priority order = which active source has the most events.\n"
          " Only folders for sources that are ACTIVELY producing telemetry\n"
          " appear below -- rules for sources you don't ingest add zero\n"
          " detection value and pollute the rule library.\n")

    seen: set[str] = set()
    sparse_paths: list[str] = []
    for src in sources:
        name = src["source_name"]
        events = src["event_count"]
        folders = SOURCE_TO_SIGMA.get(name, [])
        if not folders:
            print(f"--- {name} ({events:,} events) -- no Sigma mapping curated")
            continue
        print(f"\n--- {name} ({events:,} events) ---")
        for folder, why in folders:
            # A folder shared by several sources is listed once, under the
            # highest-volume source that maps to it.
            if folder in seen:
                continue
            seen.add(folder)
            sparse_paths.append(folder)
            count_str = ""
            if use_github:
                count_str = f" [~{github_recursive_count(folder)} rules]"
            print(f" * {folder}{count_str}")
            print(f" {why}")
    return sparse_paths


def _print_import_commands(sparse_paths: list[str], backend: str) -> None:
    """Print the copy/paste git + curl commands (section 5)."""
    print(f"\n{_BANNER}\n COPY/PASTE: import these folders only\n{_BANNER}\n")
    print(" # 1. clone Sigma with sparse-checkout (no full 5GB history)")
    print(" git clone --filter=blob:none --no-checkout "
          f"{SIGMA_REPO}.git /tmp/sigma")
    print(" cd /tmp/sigma")
    print(" git sparse-checkout init --cone")
    if sparse_paths:
        print(" git sparse-checkout set \\")
        # Continuation backslash on every path except the last, so the
        # comment line below is never spliced into the git command (shells
        # without interactive comments would pass it as extra paths).
        print(" \\\n".join(f" {p}" for p in sparse_paths))
    else:
        print(" # (no folders recommended -- nothing to sparse-checkout)")
    print(" # end of folder list")
    # NOTE(review): confirm the Sigma default branch name; this command
    # fails if the default branch is not called "main".
    print(" git checkout main")
    print()
    print(" # 2. push each .yml file into SIEM-toolkit-patched via the")
    print(" # backend's /api/coverage/upload-sigma endpoint (one POST")
    print(" # per file, multipart/form-data):")
    print(
        "\n"
        "find . -path './rules*' -name '*.yml' | while read f ; do\n"
        f'curl -sS -F "file=@$f" {backend}/api/coverage/upload-sigma \\\n'
        '-w "%{http_code} $f\\n" -o /dev/null\n'
        "done\n"
    )


def _print_must_have_rules() -> None:
    """Print the curated high-priority rule list (section 6)."""
    print(f"{_BANNER}\n HIGH-PRIORITY INDIVIDUAL RULES (curated)\n{_BANNER}")
    print(f" {'tactic':<22}{'technique':<35}rule")
    for path, tactic, tech in _MUST_HAVE_RULES:
        print(f" {tactic:<22}{tech:<35}{path}")

    # Derive the count from the list so the sentence cannot go stale.
    print(f"\n These {len(_MUST_HAVE_RULES)} rules close the thinnest gaps "
          "surfaced by the")
    print(" Threat Coverage map above. Import them FIRST, then iterate")
    print(" through the bulk folders.\n")


def main(argv: list[str] | None = None) -> int:
    """Print Sigma import recommendations for the local backend.

    Reads coverage health, the source map and the MITRE tactic table from
    the backend, prints each, then prints the Sigma folders mapped to the
    active sources, copy/paste import commands and a curated rule list.

    Args:
        argv: Command-line arguments without the program name.  ``None``
            (the default) makes argparse read ``sys.argv[1:]``.

    Returns:
        Process exit code: 0 on success, 1 if any backend endpoint could
        not be fetched.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--backend", default="http://localhost:8001",
                    help="SIEM-toolkit-patched backend URL")
    ap.add_argument("--no-github", action="store_true",
                    help="Skip GitHub API calls (offline / rate-limited).")
    args = ap.parse_args(argv)
    # Tolerate a trailing slash so endpoint URLs never contain "//api".
    backend = args.backend.rstrip("/")

    print(f"\n{_BANNER}\n SIGMA IMPORT RECOMMENDATIONS\n{_BANNER}")
    print(f" Backend : {args.backend}")
    print(f" Sigma repo : {SIGMA_REPO}")
    print(f" GitHub lookups : {'disabled' if args.no_github else 'enabled'}")

    # Each section is printed as soon as its data arrives; a failure on any
    # of the three endpoints ends the run with the same fatal message.
    try:
        _print_health(_backend_json(backend, "/api/coverage/health"))
        sources = _print_sources(_backend_json(backend, "/api/coverage/map"))
        _print_tactics(_backend_json(backend, "/api/coverage/mitre"))
    except _BackendError as exc:
        print(f"\n[FATAL] cannot reach backend: {exc}")
        return 1

    sparse_paths = _print_folder_recommendations(
        sources, use_github=not args.no_github)
    _print_import_commands(sparse_paths, backend)
    _print_must_have_rules()
    return 0
|
|
|
|
|
|
# Script entry point: run main() and use its return value as the exit code.
if __name__ == "__main__":
    sys.exit(main())
|