From 4df8e844e5f59a40ba7ff6e7b6a697bf56c65687 Mon Sep 17 00:00:00 2001 From: marc Date: Thu, 28 May 2026 12:29:37 +0200 Subject: [PATCH] Sigma -> SentinelOne PowerQuery pipeline End-to-end workflow that turns SigmaHQ rules into SDL Scheduled custom-detection rules: 1. SIEM-toolkit provides the coverage map to find what's thin -- MITRE ATT&CK heatmap across all detection library rules, rule firing status (active vs never-fired). 2. Pick Sigma rules (https://github.com/SigmaHQ/sigma) that target those tactics. 3. Convert the Sigma rules to PowerQuery with pysigma-backend-sentinelone-pq. 4. Smoke-test against your tenant's /api/powerQuery, deploy via /web/api/v2.1/cloud-detection/rules as Scheduled PQ rules in Draft. 5. Re-running on a different tenant is just re-pointing the credentials -- the converted .pq bodies travel as-is. Files: README_sigma_pipeline.md full workflow doc recommend_sigma_imports.py coverage-map reader -> rule shortlist probe_wel_schema.py WEL parser field discovery convert_test_deploy_sigma.py pick + convert + 3 variants + deploy fixup_rules_6_7.py OriginalFileName pre-processor run_sigma_on_tenant.py redeploy already-converted bodies verify_rule_exists_via_put.py PUT-existence test (RBAC workaround) verify_deployed_sigma_rules.py RBAC visibility diagnostic tenant_config.example.json credentials template (gitignored real one) Each converted rule emits three PowerQuery variants: .pq faithful (S1 DV schema) .relaxed.pq drops endpoint.os + event.type clauses .wel.pq rewritten onto microsoft_windows_eventlog-latest All scripts read credentials from tenant_config.json (or the SIEM_TOOLKIT_CONFIG env var), discover the target site_id at runtime, and persist deployed rule IDs to deployed_rule_ids.json so the verify scripts work without hardcoded IDs. 
--- .gitignore | 6 + README_sigma_pipeline.md | 241 +++++++++++++++++++ convert_test_deploy_sigma.py | 406 +++++++++++++++++++++++++++++++++ fixup_rules_6_7.py | 243 ++++++++++++++++++++ probe_wel_schema.py | 98 ++++++++ recommend_sigma_imports.py | 324 ++++++++++++++++++++++++++ run_sigma_on_tenant.py | 295 ++++++++++++++++++++++++ tenant_config.example.json | 8 + verify_deployed_sigma_rules.py | 137 +++++++++++ verify_rule_exists_via_put.py | 124 ++++++++++ 10 files changed, 1882 insertions(+) create mode 100644 README_sigma_pipeline.md create mode 100644 convert_test_deploy_sigma.py create mode 100644 fixup_rules_6_7.py create mode 100644 probe_wel_schema.py create mode 100644 recommend_sigma_imports.py create mode 100644 run_sigma_on_tenant.py create mode 100644 tenant_config.example.json create mode 100644 verify_deployed_sigma_rules.py create mode 100644 verify_rule_exists_via_put.py diff --git a/.gitignore b/.gitignore index ef704ef..617fc88 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,9 @@ data/ # Parsers ARE committed in this fork (snapshot of the demo tenant). # .env still excluded for safety. tools/stormshield-verify/config.json + +# Sigma->PowerQuery pipeline: real tenant credentials live here. +# Use tenant_config.example.json as the template. +tenant_config.json +deployed_rule_ids.json + diff --git a/README_sigma_pipeline.md b/README_sigma_pipeline.md new file mode 100644 index 0000000..e44af5f --- /dev/null +++ b/README_sigma_pipeline.md @@ -0,0 +1,241 @@ +# Sigma → SentinelOne PowerQuery pipeline + +End-to-end workflow that turns SigmaHQ rules into SentinelOne SDL +Scheduled custom-detection rules, **starting from the coverage gaps the +SIEM-toolkit identifies**. + +## TL;DR + +1. **SIEM-toolkit** provides the coverage map to find what's thin — + MITRE ATT&CK heatmap across all detection library rules, rule firing + status (active vs never-fired). +2. 
**Pick Sigma rules** ([SigmaHQ/sigma](https://github.com/SigmaHQ/sigma)) + that target those tactics. +3. **Convert** the Sigma rules to PowerQuery with + [`pysigma-backend-sentinelone-pq`](https://pypi.org/project/pysigma-backend-sentinelone-pq/). +4. **Smoke-test** against your tenant's `/api/powerQuery`, **deploy** + via `/web/api/v2.1/cloud-detection/rules` as Scheduled PQ rules in + Draft. +5. **Re-running on a different tenant** is just re-pointing the + credentials — the converted `.pq` bodies travel as-is. + +## Setup (once) + +```bash +# 1. Tooling +python3 -m venv /tmp/sigma_venv +/tmp/sigma_venv/bin/pip install pysigma pysigma-backend-sentinelone-pq +brew install gh && gh auth login # avoids GitHub rate limits + +# 2. Credentials +cp tenant_config.example.json tenant_config.json +$EDITOR tenant_config.json # fill in 5 keys +# tenant_config.json is gitignored. +``` + +`tenant_config.json` shape: +```json +{ + "S1_CONSOLE_URL": "https://-.example", + "S1_CONSOLE_API_TOKEN": "", + "SDL_XDR_URL": "https://xdr..example", + "SDL_LOG_READ_KEY": "", + "SDL_CONFIG_READ_KEY": "" +} +``` + +Optional environment overrides: + +| Variable | Default | Purpose | +|---|---|---| +| `SIEM_TOOLKIT_CONFIG` | `./tenant_config.json` | path to credentials | +| `SIGMA_OUT_DIR` | `/tmp/sigma_converted_v4` | where `.pq` artefacts land | +| `SIGMA_VENV_PY` | `/tmp/sigma_venv/bin/python3` | Python that hosts pysigma | +| `GH_BIN` | `gh` | GitHub CLI binary | +| `SITE_ID` | (auto-discovered) | force-deploy into a specific site | +| `DEPLOYED_IDS_FILE` | `./deployed_rule_ids.json` | input for verify scripts | + +## The 5-step workflow + +### Step 1 — Find thin tactics + +```bash +python3 recommend_sigma_imports.py +``` + +Reads the SIEM-toolkit coverage endpoints (`/api/coverage/health`, +`/api/coverage/mitre`, `/api/coverage/map`) and prints, in order: + +- Tenant **health row** (`health_score`, `firing_pct`, active sources). 
+- **Active log sources** ranked by event volume — only import Sigma + rules whose `logsource` matches a source that actually produces + events here. +- **MITRE tactic depth** — tactics with `rule_count < 100` and a high + `technique_count` are the THIN ones. Typical findings: + Reconnaissance, Discovery, Lateral Movement, Collection, Exfiltration. +- **Recommended SigmaHQ folders** with GitHub-verified rule counts. +- A curated **14-rule shortlist** for the thinnest gaps. + +### Step 2 — Pick Sigma rules + +The picker in `convert_test_deploy_sigma.py` matches filename-stem +keywords against the SigmaHQ tree it lists via `gh api`. Edit the +`WANTED` table to change the 10 rules. Each row is +`(tactic, technique_label, [keywords], allow_powershell_folder)`. + +The default list (rows 6 and 7 as replaced by `fixup_rules_6_7.py`) covers: + +| Tactic | Technique | Sigma file | +|---|---|---| +| Lateral Movement | T1021.006 WinRM (evil-winrm) | `proc_creation_win_hktl_evil_winrm.yml` | +| Collection | T1113 Screen Capture (Psr.exe) | `proc_creation_win_psr_capture_screenshots.yml` | +| Collection | T1115 Clipboard (Get-Clipboard) | `proc_creation_win_powershell_get_clipboard.yml` | +| Exfiltration | T1560.001 RAR (.dmp files) | `proc_creation_win_winrar_exfil_dmp_files.yml` | +| Exfiltration | T1567.002 rclone | `proc_creation_win_pua_rclone_execution.yml` | +| Reconnaissance | T1016 netsh portproxy | `proc_creation_win_netsh_port_forwarding.yml` | +| Discovery | T1087/T1033 whoami /priv | `proc_creation_win_whoami_priv_discovery.yml` | +| Discovery | T1087/T1482 SharpHound | `proc_creation_win_hktl_bloodhound_sharphound.yml` | +| Credential Access | T1003.001 Mimikatz cmd-line | `proc_creation_win_hktl_mimikatz_command_line.yml` | +| Credential Access | T1003.001 ProcDump LSASS | `proc_creation_win_sysinternals_procdump_lsass.yml` | + +### Step 3 — Convert + smoke-test + deploy + +Optional preliminary: probe what fields the tenant's WEL parser +actually emits so the WEL-mapped variant queries land on real columns: + 
+```bash +python3 probe_wel_schema.py +``` + +Then run the master pipeline: + +```bash +# Convert + smoke-test only: +python3 convert_test_deploy_sigma.py + +# Convert + smoke-test + create SDL Scheduled rules in Draft: +python3 convert_test_deploy_sigma.py --deploy +``` + +For each of the 10 rules the script writes **three** PowerQuery variants: + +| File | Purpose | +|---|---| +| `<stem>.pq` | **faithful** — S1 DV schema (production form) | +| `<stem>.relaxed.pq` | strips `endpoint.os` and `event.type` clauses (useful on tenants where those fields are null) | +| `<stem>.wel.pq` | rewritten onto the `microsoft_windows_eventlog-latest` parser fields (`CommandLine`, `Image`, `ParentImage`, `EventID=4688\|1`, `dataSource.name='Windows Event Logs'`) | + +Each variant is smoke-tested against `POST {SDL_XDR_URL}/api/powerQuery` +(last 24 h). HTTP 200 is what we want; rows=0 simply means no telemetry +matched in the window. + +With `--deploy`, the **faithful** variant is also POSTed to +`/web/api/v2.1/cloud-detection/rules` as a `Scheduled` rule in `Draft` +status, then `deployed_rule_ids.json` is written next to the script +mapping each rule ID back to its source. + +#### Edge cases the converter handles + +- **Unsupported Sigma fields** (e.g. `OriginalFileName`) cause the + backend to print its known-field list as the error. + `fixup_rules_6_7.py` strips those keys from the YAML and re-converts. + The rule keeps its meaning because `Image|endswith:` is the primary + selector. +- **Wrong folder** — some rules live under `rules/windows/powershell/` + not `process_creation/`. The picker can expand its scope. +- **`event.type='Process Creation'` and `endpoint.os='windows'`** are + often empty on real tenants — that's why the **relaxed** and **WEL** + variants exist. + +### Step 4 — Verify + +The service-user role that can POST a rule often **cannot** GET it +back (`cloudDetectionRulesView` missing). 
The collection endpoint +silently filters the rule out, and `GET /rules/{id}` returns HTTP 405 +on this API version. PUT is the definitive existence test: + +```bash +python3 verify_rule_exists_via_put.py +``` + +Reads `deployed_rule_ids.json` and PUTs each rule ID. 200/204 = EXISTS, +404 = NOT FOUND. Optional deeper diagnostic: + +```bash +python3 verify_deployed_sigma_rules.py +``` + +Probes the list endpoint with several scope-filter variants so you can +see exactly which RBAC layer is hiding what. + +### Step 5 — Run on another tenant + +The 30 `.pq` files in `SIGMA_OUT_DIR` are tenant-agnostic. Point the +credentials at a different tenant and re-run only Step 3's deploy + +Step 4: + +```bash +# Option A: replace tenant_config.json +cp tenant_config.example.json tenant_config.json && $EDITOR tenant_config.json +python3 run_sigma_on_tenant.py + +# Option B: keep separate config files +SIEM_TOOLKIT_CONFIG=./tenant_prod.json python3 run_sigma_on_tenant.py +SIEM_TOOLKIT_CONFIG=./tenant_lab.json python3 run_sigma_on_tenant.py +``` + +`run_sigma_on_tenant.py` is a single-shot probe → smoke-test → deploy +→ PUT-verify, useful when you already have the converted bodies and +just want to land them on a new tenant. + +## Files + +| File | Role | +|---|---| +| `recommend_sigma_imports.py` | Reads coverage endpoints, recommends folders + curated rule list | +| `probe_wel_schema.py` | Discovers WEL parser field schema on the tenant | +| `convert_test_deploy_sigma.py` | Master pipeline: pick + convert (3 variants) + smoke + `--deploy` | +| `fixup_rules_6_7.py` | Handles Sigma rules with backend-unsupported keys (e.g. 
`OriginalFileName`) | +| `run_sigma_on_tenant.py` | Re-deploys already-converted bodies to another tenant | +| `verify_rule_exists_via_put.py` | PUT-existence test (definitive when GET is RBAC-blocked) | +| `verify_deployed_sigma_rules.py` | Probes scope/filter variants to diagnose RBAC | +| `tenant_config.example.json` | Template — copy to `tenant_config.json` (gitignored) | + +## Where it fits in the SIEM-toolkit story + +``` +SIEM-toolkit Threat Coverage map + │ + ▼ +recommend_sigma_imports.py ──┐ + │ (suggests SigmaHQ folders) │ + ▼ │ +convert_test_deploy_sigma.py ├── single workflow + │ (Sigma → PQ → SDL) │ + ▼ │ +verify_rule_exists_via_put.py ──┘ + │ + ▼ +Activate rules in console UI + │ + ▼ +Re-run SIEM-toolkit Threat Coverage → firing_pct grows +``` + +## Pitfalls collected so far + +- **`event.type='Process Creation'`** has near-zero population unless a + live S1 EDR agent is reporting; relax variant works around it. +- **`endpoint.os='windows'`** is `null` on many tenants; always strip + for the relaxed variant. +- **GitHub anonymous rate limit** (60 req/h) kills the listing step — + use `gh auth login`. +- **Service-user RBAC** without `cloudDetectionRulesView` makes POSTed + rules invisible to GET. PUT confirms they exist. +- **`OriginalFileName`** in Sigma YAML breaks the S1-PQ backend; strip + with the pre-processor. +- **PowerQuery parser quirks** — bare `*` as a query is rejected; + comments with `/`, `-`, or non-ASCII characters cause Load Failed at + rule-validation time even when the body POSTs fine to + `/api/powerQuery`. Keep comments out of any body that will be + deployed as a Scheduled rule. diff --git a/convert_test_deploy_sigma.py b/convert_test_deploy_sigma.py new file mode 100644 index 0000000..3f632fa --- /dev/null +++ b/convert_test_deploy_sigma.py @@ -0,0 +1,406 @@ +#!/usr/bin/env python3 +""" +convert_test_deploy_sigma.py -- Sigma -> PowerQuery -> SDL Scheduled Rule. 
+ +Master pipeline that addresses every TODO from the v3 review: + + (a) Fixes rule #6 (netsh) by trying multiple candidate filenames AND by + catching the pipeline error so the loop continues. Fixes rule #7 + (AdsiSearcher) by also searching rules/windows/powershell/. + (b) Adds a WEL-mapping post-processor that rewrites the S1 EDR/DV PQ + fields to the microsoft_windows_eventlog-latest parser schema so + the queries can fire against Windows Event Log telemetry. + (c) Deploys every faithful PQ that passes the live /api/powerQuery smoke test + as an SDL Scheduled rule via the S1 Mgmt API (POST + /web/api/v2.1/cloud-detection/rules). Requires --deploy + a valid + S1_CONSOLE_API_TOKEN in tenant_config.json. + +For each rule we emit THREE PowerQuery variants and smoke-test each: + + <stem>.pq -- faithful Sigma -> S1-PQ conversion (DV schema) + <stem>.relaxed.pq -- faithful minus the endpoint.os and event.type + clauses (DV schema but null-os-tolerant) + <stem>.wel.pq -- field-mapped onto microsoft_windows_eventlog- + latest (CommandLine, Image, ParentImage, ...) 
+ +Usage: + python3 convert_test_deploy_sigma.py # convert + test only + python3 convert_test_deploy_sigma.py --deploy # also create SDL rules +""" +from __future__ import annotations +import argparse +import json +import os +import pathlib +import re +import subprocess +import time +import urllib.error +import urllib.request +from typing import Any + +HERE = pathlib.Path(__file__).resolve().parent +VENV_PY = os.environ.get("SIGMA_VENV_PY", "/tmp/sigma_venv/bin/python3") +GH = os.environ.get("GH_BIN", "gh") +OUT = pathlib.Path(os.environ.get( + "SIGMA_OUT_DIR", "/tmp/sigma_converted_v4")); OUT.mkdir(exist_ok=True) + +_CFG_PATH = os.environ.get("SIEM_TOOLKIT_CONFIG", + str(HERE / "tenant_config.json")) +CFG = json.load(open(_CFG_PATH)) +SDL_BASE = CFG["SDL_XDR_URL"].rstrip("/") +SDL_KEY = CFG["SDL_LOG_READ_KEY"] +S1_CONS = CFG.get("S1_CONSOLE_URL", "").rstrip("/") +S1_TOK = CFG.get("S1_CONSOLE_API_TOKEN", "").rstrip(".") +# Site id is discovered at runtime from /sites?limit=10 (first active site). +# Override with SITE_ID env var if you have multiple sites and want a +# specific one. 
+SITE_ID = os.environ.get("SITE_ID", "") +SIGMA_RAW = "https://raw.githubusercontent.com/SigmaHQ/sigma/master" + +# 10 desired (tactic, technique, keyword_list, allow_powershell_folder) +WANTED: list[tuple[str, str, list[str], bool]] = [ + ("Lateral Movement", "T1021.006 WinRM", + ["winrm", "winrs"], False), + ("Collection", "T1113 Screen Capture", + ["screen_capture", "screencapture", "screenshot"], False), + ("Collection", "T1115 Clipboard Data", + ["clipboard"], False), + ("Exfiltration", "T1560.001 Archive via RAR", + ["winrar_compress", "winrar", "rar_compress"], False), + ("Exfiltration", "T1567.002 Exfil via rclone", + ["rclone"], False), + ("Reconnaissance", "T1016 netsh port-fwd", + ["netsh_allowed_ports", "netsh_port_proxy", "netsh_port_fwd", + "netsh_fw", "netsh_portproxy"], False), + ("Discovery", "T1087.002 AdsiSearcher", + ["adsisearcher", "adsi_searcher"], True), # in powershell/ + ("Discovery", "T1087/T1482 SharpHound", + ["sharphound", "bloodhound"], False), + ("Credential Access", "T1003.001 Mimikatz cmdline", + ["mimikatz_command_line", "mimikatz_cli", "mimikatz"], False), + ("Credential Access", "T1003.001 ProcDump LSASS", + ["procdump_lsass", "procdump", "comsvcs_lsass"], False), +] + + +# ============================================================ helpers ====== +def gh_api(path: str) -> Any: + r = subprocess.run([GH, "api", path], capture_output=True, text=True, + timeout=60) + if r.returncode != 0: + raise RuntimeError(f"gh api {path}: {r.stderr.strip()[:300]}") + return json.loads(r.stdout) + + +def fetch(url: str) -> bytes: + req = urllib.request.Request(url, headers={"User-Agent": "siem-toolkit"}) + with urllib.request.urlopen(req, timeout=30) as r: + return r.read() + + +def list_sigma_rules(allow_powershell: bool) -> list[str]: + tree = gh_api("repos/SigmaHQ/sigma/git/trees/master?recursive=1") + prefixes = ["rules/windows/process_creation/"] + if allow_powershell: + prefixes.append("rules/windows/powershell/") + return sorted( + 
e["path"] for e in tree.get("tree", []) + if e.get("type") == "blob" + and e.get("path", "").endswith(".yml") + and any(e["path"].startswith(p) for p in prefixes) + ) + + +def pick(paths: list[str], keywords: list[str]) -> str | None: + for kw in keywords: + for p in paths: + if kw in pathlib.Path(p).stem.lower(): + return p + return None + + +def convert(yaml_text: str) -> str: + code = ( + "import sys\n" + "from sigma.rule import SigmaRule\n" + "from sigma.backends.sentinelone_pq import SentinelOnePQBackend\n" + "r = SigmaRule.from_yaml(sys.stdin.read())\n" + "print(SentinelOnePQBackend().convert_rule(r)[0])\n") + res = subprocess.run([VENV_PY, "-c", code], input=yaml_text, text=True, + capture_output=True, timeout=90) + if res.returncode != 0: + # last line of the trace is usually the most informative + err = res.stderr.strip().splitlines() + msg = err[-1] if err else "(no stderr)" + raise RuntimeError(msg[:300]) + return res.stdout.strip() + + +def relax(pq_body: str) -> str: + """Strip endpoint.os and event.type filter clauses.""" + body = pq_body + body = re.sub(r'endpoint\.os\s*=\s*"[^"]*"\s+and\s+', '', body) + body = re.sub(r'\s+and\s+endpoint\.os\s*=\s*"[^"]*"', '', body) + body = re.sub(r'event\.type\s*=\s*"[^"]*"\s+and\s+', '', body) + body = re.sub(r'\s+and\s+event\.type\s*=\s*"[^"]*"', '', body) + body = re.sub(r'^\(\s*(.*)\s*\)$', r'\1', body.strip()) + return body.strip() + + +# DV schema -> WEL parser schema (microsoft_windows_eventlog-latest). +# Sysmon (EID=1) and Security (EID=4688) channels use slightly different +# field names; the WEL parser exposes Sysmon-style Image/ParentImage AND +# Security-style NewProcessName/ParentProcessName. We rewrite onto the +# more permissive Sysmon names because they're closer to S1 DV. 
+DV_TO_WEL = [ + (r'\btgt\.process\.cmdline\b', 'CommandLine'), + (r'\btgt\.process\.image\.path\b', 'Image'), + (r'\btgt\.process\.displayName\b', 'OriginalFileName'), + (r'\btgt\.process\.publisher\b', 'Company'), + (r'\bsrc\.process\.image\.path\b', 'ParentImage'), + (r'\bsrc\.process\.cmdline\b', 'ParentCommandLine'), + (r'\bsrc\.process\.user\.name\b', 'User'), +] + + +def wel_map(pq_body: str) -> str: + """Rewrite a faithful DV-schema PQ body to query the + microsoft_windows_eventlog-latest parser instead. Strategy: + - replace tgt.process.* / src.process.* with WEL field names + - replace `event.type="Process Creation"` with EID filter + - replace `endpoint.os="windows"` with dataSource.name='Windows Event Logs' + - prepend a parser-name pin so the filter narrows fast + """ + body = pq_body + for pat, repl in DV_TO_WEL: + body = re.sub(pat, repl, body) + body = re.sub(r'event\.type\s*=\s*"Process Creation"', + "(EventID=4688 or EventID=1)", body) + body = re.sub(r'endpoint\.os\s*=\s*"windows"', + "dataSource.name='Windows Event Logs'", body) + # Drop any leftover DV-only field comparisons that didn't map (would + # otherwise null-filter every row). Only one we've seen: integrityLevel. 
+ body = re.sub(r'(?:\(\s*)?[\w.]+\.integrityLevel\s*=\s*"[^"]*"' + r'\s+(?:and|or)\s+', '', body) + return body.strip() + + +def pq(query: str, hours: int = 24) -> tuple[int, str, int]: + end = int(time.time() * 1000); start = end - hours * 3600 * 1000 + payload = {"token": SDL_KEY, "query": query, + "startTime": str(start), "endTime": str(end)} + req = urllib.request.Request( + f"{SDL_BASE}/api/powerQuery", + data=json.dumps(payload).encode(), + headers={"Content-Type": "application/json"}, method="POST") + try: + with urllib.request.urlopen(req, timeout=60) as r: + d = json.loads(r.read()) + return 200, "ok", len(d.get("values") or []) + except urllib.error.HTTPError as e: + return e.code, e.read().decode()[:250], 0 + + +def deploy_rule(name: str, description: str, pq_body: str) -> tuple[int, str]: + """POST a Scheduled-PQ rule to S1 Mgmt API.""" + if not (S1_CONS and S1_TOK): + return 0, "no S1_CONSOLE_URL or S1_CONSOLE_API_TOKEN in config" + payload = { + "data": { + "name": name, + "description": description, + "severity": "Medium", + "expirationMode": "Permanent", + "queryType": "scheduled", + "queryLang": "2.0", + "status": "Draft", + "treatAsThreat": "UNDEFINED", + "networkQuarantine": False, + "coolOffSettings": {"renotifyMinutes": 60}, + "scheduledParams": { + "query": pq_body, + "lookbackWindowMinutes": 30, + "runIntervalMinutes": 5, + "threshold": {"value": 0, "operator": "Greater"}, + }, + }, + "filter": {"siteIds": [SITE_ID]}, + } + req = urllib.request.Request( + f"{S1_CONS}/web/api/v2.1/cloud-detection/rules", + data=json.dumps(payload).encode(), method="POST") + req.add_header("Authorization", f"ApiToken {S1_TOK}") + req.add_header("Content-Type", "application/json") + req.add_header("Accept", "application/json") + try: + with urllib.request.urlopen(req, timeout=30) as r: + d = json.loads(r.read()) + rid = (d.get("data") or {}).get("id") or "?" 
+ return 200, f"created id={rid}" + except urllib.error.HTTPError as e: + return e.code, e.read().decode()[:300] + + +# ============================================================ main ========= +def main() -> int: + ap = argparse.ArgumentParser() + ap.add_argument("--deploy", action="store_true", + help="Also create each valid PQ as an SDL Scheduled rule.") + args = ap.parse_args() + + print(f"\n{'='*78}\n Sigma -> PowerQuery (faithful + relaxed + WEL) " + f"-> SDL rule\n{'='*78}\n") + print(f" Backend : pysigma-backend-sentinelone-pq") + print(f" Tenant SDL : {SDL_BASE}") + print(f" Tenant Mgmt API : {S1_CONS}") + print(f" Deploy rules : {'YES' if args.deploy else 'no (use --deploy)'}") + print(f" Output : {OUT}\n") + + # Site-id auto-discovery (only needed for --deploy). + global SITE_ID + if args.deploy and not SITE_ID: + try: + req = urllib.request.Request( + f"{S1_CONS}/web/api/v2.1/sites?limit=10") + req.add_header("Authorization", f"ApiToken {S1_TOK}") + req.add_header("Accept", "application/json") + with urllib.request.urlopen(req, timeout=20) as r: + sites = ((json.loads(r.read()).get("data") or {}) + .get("sites") or []) + if not sites: + print(" FATAL: --deploy requested but no sites visible " + "to this token.") + return 1 + SITE_ID = sites[0]["id"] + print(f" Site discovered : {SITE_ID} " + f"({sites[0].get('name')})\n") + except urllib.error.HTTPError as e: + print(f" FATAL site discovery: HTTP {e.code} " + f"{e.read().decode()[:200]}") + return 1 + + # Pre-fetch the two relevant trees once + print("--- listing sigmahq/sigma rule paths via gh api ---") + pc_only = list_sigma_rules(allow_powershell=False) + pc_and_pwsh = list_sigma_rules(allow_powershell=True) + print(f" process_creation/ : {len(pc_only)} rules") + print(f" process_creation/ + powershell/ : {len(pc_and_pwsh)} rules\n") + + summary: list[dict[str, Any]] = [] + for i, (tactic, tech, kws, allow_pwsh) in enumerate(WANTED, 1): + paths = pc_and_pwsh if allow_pwsh else pc_only + rec: 
dict[str, Any] = {"i": i, "tactic": tactic, "tech": tech} + print(f"[{i:02d}/10] {tactic} :: {tech}") + path = pick(paths, kws) + if not path: + print(f" PICK : no match for {kws}\n") + rec["status"] = "no_match"; summary.append(rec); continue + print(f" PICK : {path}") + rec["path"] = path + try: + raw = fetch(f"{SIGMA_RAW}/{path}").decode("utf-8") + except Exception as e: + print(f" FETCH : FAIL {e}\n") + rec["status"] = "fetch_failed"; summary.append(rec); continue + stem = pathlib.Path(path).stem + (OUT / f"{stem}.yml").write_text(raw) + + try: + pq_body = convert(raw) + except Exception as e: + print(f" CONVERT : FAIL {e}\n") + rec["status"] = "convert_failed"; rec["err"] = str(e) + summary.append(rec); continue + relaxed_body = relax(pq_body) + wel_body = wel_map(pq_body) + (OUT / f"{stem}.pq").write_text(pq_body) + (OUT / f"{stem}.relaxed.pq").write_text(relaxed_body) + (OUT / f"{stem}.wel.pq").write_text(wel_body) + rec["pq_chars"] = len(pq_body) + rec["relaxed_chars"] = len(relaxed_body) + rec["wel_chars"] = len(wel_body) + print(f" CONVERT : OK faithful={len(pq_body)}c " + f"relaxed={len(relaxed_body)}c wel={len(wel_body)}c") + + # smoke test all three + c1, _, r1 = pq(pq_body) + c2, _, r2 = pq(relaxed_body) + c3, e3, r3 = pq(wel_body) + rec.update({"fa_http": c1, "fa_rows": r1, + "re_http": c2, "re_rows": r2, + "wel_http": c3, "wel_rows": r3, + "wel_err": e3 if c3 != 200 else ""}) + print(f" TEST FA : HTTP {c1} rows={r1}") + print(f" TEST RE : HTTP {c2} rows={r2}") + print(f" TEST WEL: HTTP {c3} rows={r3}" + f"{' err=' + e3[:120] if c3 != 200 else ''}") + + valid = (c1 == 200) or (c3 == 200) + rec["status"] = ("FIRES" if (r1 > 0 or r2 > 0 or r3 > 0) + else "valid_no_data" if valid + else "PQ_ERROR") + + # deploy faithful (only) if requested + valid + if args.deploy and c1 == 200: + rule_name = (f"[Sigma->PQ] {tactic} / {tech} " + f"({pathlib.Path(path).stem})")[:128] + desc = (f"Auto-converted from SigmaHQ/sigma " + f"{path} via 
pysigma-backend-sentinelone-pq. " + f"Faithful S1 DV schema.") + dc, dmsg = deploy_rule(rule_name, desc, pq_body) + rec["deploy_http"] = dc; rec["deploy_msg"] = dmsg + if dc == 200: + # dmsg shape is "created id="; extract just the id + rec["rule_id"] = dmsg.split("id=")[-1].strip() + rec["pq_file"] = f"{pathlib.Path(path).stem}.pq" + print(f" DEPLOY : HTTP {dc} {dmsg[:160]}") + print() + summary.append(rec) + + # --- summary --- + print(f"{'='*78}\n SUMMARY (rows = events matched in last 24 h)\n" + f"{'='*78}") + hdr = (f" {'#':>3} {'tactic':<18}{'technique':<26}" + f"{'fa':>5}{'re':>5}{'wel':>5} status") + print(hdr); print(" " + "-" * (len(hdr) - 2)) + for s in summary: + print(f" {s['i']:>3} {s['tactic']:<18}{s['tech']:<26}" + f"{s.get('fa_rows','-')!s:>5}{s.get('re_rows','-')!s:>5}" + f"{s.get('wel_rows','-')!s:>5} {s.get('status','-')}") + fires = sum(1 for s in summary + if any(s.get(k, 0) and s[k] > 0 + for k in ('fa_rows', 're_rows', 'wel_rows'))) + valid = sum(1 for s in summary + if s.get('status') in ('valid_no_data', 'FIRES')) + failed = sum(1 for s in summary + if s.get('status') in ('no_match', 'fetch_failed', + 'convert_failed', 'PQ_ERROR')) + print(f"\n Rules with any matches : {fires}/10") + print(f" Syntactically valid : {valid}/10") + print(f" Failed / not matched : {failed}/10") + if args.deploy: + deployed = [s for s in summary if s.get('deploy_http') == 200] + print(f" SDL rules created : {len(deployed)}/10") + # Persist the (rule_id, pq_file) map for verify scripts. 
+ ids_file = HERE / "deployed_rule_ids.json" + ids_file.write_text(json.dumps( + {"tenant": S1_CONS, + "site_id": SITE_ID, + "rules": [{"rule_id": s["rule_id"], + "pq_file": s["pq_file"], + "tactic": s["tactic"], + "tech": s["tech"]} + for s in deployed]}, indent=2)) + print(f" Deployed IDs : {ids_file}") + print(f" Artefacts : {OUT}/") + print(f"\n Next steps:") + print(f" - inspect {OUT}/*.wel.pq for WEL variants") + print(f" - re-run with --deploy to create SDL Scheduled rules") + print(f" - verify with verify_rule_exists_via_put.py") + print(f" - check console UI: {S1_CONS}/#/cloud-detection/rules\n") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/fixup_rules_6_7.py b/fixup_rules_6_7.py new file mode 100644 index 0000000..83e2f5c --- /dev/null +++ b/fixup_rules_6_7.py @@ -0,0 +1,243 @@ +#!/usr/bin/env python3 +""" +fixup_rules_6_7.py + +Re-runs the convert -> test -> deploy pipeline for ONLY the 2 rules that +failed in convert_test_deploy_sigma.py: + + #6 Reconnaissance T1016 -- netsh port forwarding (the original + `netsh_fw_add_rule.yml` uses a Sigma `|fieldref` modifier the + S1-PQ backend doesn't support; switch to + `netsh_port_forwarding.yml`). + + #7 Discovery T1087.002 -- AdsiSearcher (no .yml under + rules/windows/process_creation/ or rules/windows/powershell/ is + named adsisearcher; replace with `whoami /priv` which covers + T1033 + T1087 Account Discovery and is highly diagnostic). + +Runs the same 3-variant pipeline (faithful, relaxed, WEL-mapped), +smoke-tests each, and POSTs the faithful PQ as an SDL Scheduled rule. 
+""" +from __future__ import annotations +import json, os, pathlib, re, subprocess, sys, time +import urllib.error, urllib.request + +HERE = pathlib.Path(__file__).resolve().parent +VENV_PY = os.environ.get("SIGMA_VENV_PY", "/tmp/sigma_venv/bin/python3") +OUT = pathlib.Path(os.environ.get( + "SIGMA_OUT_DIR", "/tmp/sigma_converted_v4")) +_CFG_PATH = os.environ.get("SIEM_TOOLKIT_CONFIG", + str(HERE / "tenant_config.json")) +CFG = json.load(open(_CFG_PATH)) +SDL_BASE = CFG["SDL_XDR_URL"].rstrip("/") +SDL_KEY = CFG["SDL_LOG_READ_KEY"] +S1_CONS = CFG["S1_CONSOLE_URL"].rstrip("/") +S1_TOK = CFG["S1_CONSOLE_API_TOKEN"].rstrip(".") +SITE_ID = os.environ.get("SITE_ID", "") # auto-discovered in main() +SIGMA_RAW = "https://raw.githubusercontent.com/SigmaHQ/sigma/master" + +# (tactic, technique, sigmahq/sigma path) +REPLACEMENTS = [ + ("Reconnaissance", "T1016 netsh port forwarding", + "rules/windows/process_creation/" + "proc_creation_win_netsh_port_forwarding.yml"), + ("Discovery", "T1087/T1033 whoami /priv", + "rules/windows/process_creation/" + "proc_creation_win_whoami_priv_discovery.yml"), +] + + +def strip_unsupported_sigma_fields(yaml_text: str) -> str: + """Remove Sigma fields that the S1-PQ backend doesn't map. + + The backend errors with a `{CommandLine}, {Company}, ...` field list + whenever it sees a key it has no mapping for. The only one we hit in + practice is `OriginalFileName`, which most LOLBins-style rules use as + an alternate way to fingerprint a process; the rule remains semantic + once removed because `Image|endswith:` is the primary selector. + + Strategy: drop any selection block that ONLY contains OriginalFileName, + OR delete the lone OriginalFileName line from a mixed list. 
+ """ + out: list[str] = [] + skip_block = False + for line in yaml_text.splitlines(): + s = line.strip() + # Lone OriginalFileName key in a flow style ("- OriginalFileName: 'netsh.exe'") + if s.startswith("- OriginalFileName:") or s.startswith("OriginalFileName:"): + continue + out.append(line) + return "\n".join(out) + + +def fetch(url: str) -> bytes: + req = urllib.request.Request(url, headers={"User-Agent": "siem-toolkit"}) + with urllib.request.urlopen(req, timeout=30) as r: + return r.read() + + +def convert(yaml_text: str) -> str: + code = ( + "import sys\n" + "from sigma.rule import SigmaRule\n" + "from sigma.backends.sentinelone_pq import SentinelOnePQBackend\n" + "r = SigmaRule.from_yaml(sys.stdin.read())\n" + "print(SentinelOnePQBackend().convert_rule(r)[0])\n") + res = subprocess.run([VENV_PY, "-c", code], input=yaml_text, text=True, + capture_output=True, timeout=90) + if res.returncode != 0: + err = res.stderr.strip().splitlines() + raise RuntimeError((err[-1] if err else "(no stderr)")[:300]) + return res.stdout.strip() + + +def relax(pq_body: str) -> str: + b = pq_body + b = re.sub(r'endpoint\.os\s*=\s*"[^"]*"\s+and\s+', '', b) + b = re.sub(r'\s+and\s+endpoint\.os\s*=\s*"[^"]*"', '', b) + b = re.sub(r'event\.type\s*=\s*"[^"]*"\s+and\s+', '', b) + b = re.sub(r'\s+and\s+event\.type\s*=\s*"[^"]*"', '', b) + return re.sub(r'^\(\s*(.*)\s*\)$', r'\1', b.strip()).strip() + + +DV_TO_WEL = [ + (r'\btgt\.process\.cmdline\b', 'CommandLine'), + (r'\btgt\.process\.image\.path\b', 'Image'), + (r'\btgt\.process\.displayName\b', 'OriginalFileName'), + (r'\btgt\.process\.publisher\b', 'Company'), + (r'\bsrc\.process\.image\.path\b', 'ParentImage'), + (r'\bsrc\.process\.cmdline\b', 'ParentCommandLine'), + (r'\bsrc\.process\.user\.name\b', 'User'), +] + + +def wel_map(pq_body: str) -> str: + b = pq_body + for pat, repl in DV_TO_WEL: + b = re.sub(pat, repl, b) + b = re.sub(r'event\.type\s*=\s*"Process Creation"', + "(EventID=4688 or EventID=1)", b) + b = 
def pq(query: str, hours: int = 24) -> tuple[int, str, int]:
    """Run `query` against the SDL /api/powerQuery endpoint.

    The time range is the last `hours` hours. Returns
    (status, message, row_count):

    * (200, "ok", n) on success, n being the length of the "values" list;
    * (http_code, first 250 chars of the error body, 0) on an HTTP error;
    * (0, "<ExceptionName>: <detail>", 0) when the request never produced
      an HTTP response (DNS failure, timeout, refused connection) or the
      body was not valid JSON, so one bad query does not abort the run.
    """
    end = int(time.time() * 1000)
    start = end - hours * 3600 * 1000
    req = urllib.request.Request(
        f"{SDL_BASE}/api/powerQuery",
        data=json.dumps({"token": SDL_KEY, "query": query,
                         "startTime": str(start),
                         "endTime": str(end)}).encode(),
        headers={"Content-Type": "application/json"}, method="POST")
    try:
        with urllib.request.urlopen(req, timeout=60) as r:
            return 200, "ok", len(
                (json.loads(r.read()).get("values") or []))
    except urllib.error.HTTPError as e:
        return e.code, e.read().decode(errors="replace")[:250], 0
    except (OSError, ValueError) as e:
        # URLError and socket timeouts are OSError subclasses; a malformed
        # JSON body raises ValueError.
        return 0, f"{type(e).__name__}: {e}"[:250], 0


def deploy(name: str, desc: str, body: str) -> tuple[int, str]:
    """Create a Draft scheduled cloud-detection rule in site SITE_ID.

    Returns (200, "id=<rule id>") on success, (http_code, error body) on an
    HTTP error, and (0, reason) when SITE_ID is empty or the request failed
    before any HTTP response was received.
    """
    if not SITE_ID:
        return 0, "SITE_ID not set / discoverable"
    payload = {
        "data": {"name": name, "description": desc, "severity": "Medium",
                 "expirationMode": "Permanent", "queryType": "scheduled",
                 "queryLang": "2.0", "status": "Draft",
                 "treatAsThreat": "UNDEFINED", "networkQuarantine": False,
                 "coolOffSettings": {"renotifyMinutes": 60},
                 "scheduledParams": {"query": body,
                                     "lookbackWindowMinutes": 30,
                                     "runIntervalMinutes": 5,
                                     "threshold": {"value": 0,
                                                   "operator": "Greater"}}},
        "filter": {"siteIds": [SITE_ID]}}
    req = urllib.request.Request(
        f"{S1_CONS}/web/api/v2.1/cloud-detection/rules",
        data=json.dumps(payload).encode(), method="POST")
    req.add_header("Authorization", f"ApiToken {S1_TOK}")
    req.add_header("Content-Type", "application/json")
    req.add_header("Accept", "application/json")
    try:
        with urllib.request.urlopen(req, timeout=30) as r:
            d = json.loads(r.read())
            return 200, f"id={(d.get('data') or {}).get('id', '?')}"
    except urllib.error.HTTPError as e:
        return e.code, e.read().decode(errors="replace")[:300]
    except (OSError, ValueError) as e:
        return 0, f"{type(e).__name__}: {e}"[:300]


def main() -> int:
    """Re-convert, smoke-test and deploy the rules listed in REPLACEMENTS.

    Discovers SITE_ID from the first site visible to the token when the
    SITE_ID environment variable was not set. For every rule: fetch the
    Sigma YAML, strip unsupported fields, convert, write the three variants
    into OUT, smoke-test each variant, and deploy the faithful body when its
    smoke test returned HTTP 200. Returns 1 when site discovery fails,
    otherwise 0.
    """
    global SITE_ID
    print(f"\n{'='*78}\n Fix-up: re-convert + deploy rules #6 and #7"
          f"\n{'='*78}\n")
    if not SITE_ID:
        req = urllib.request.Request(
            f"{S1_CONS}/web/api/v2.1/sites?limit=10")
        req.add_header("Authorization", f"ApiToken {S1_TOK}")
        req.add_header("Accept", "application/json")
        try:
            with urllib.request.urlopen(req, timeout=20) as r:
                sites = ((json.loads(r.read()).get("data") or {})
                         .get("sites") or [])
        except urllib.error.HTTPError as e:
            print(f" FATAL site discovery: HTTP {e.code} "
                  f"{e.read().decode(errors='replace')[:200]}")
            return 1
        except (OSError, ValueError) as e:
            print(f" FATAL site discovery: {type(e).__name__} {e}")
            return 1
        if not sites:
            print(" FATAL: no sites visible to this token.")
            return 1
        SITE_ID = sites[0]["id"]
        print(f" Site discovered : {SITE_ID} "
              f"({sites[0].get('name')})\n")

    # The artefact directory may not exist yet (fresh /tmp, custom
    # SIGMA_OUT_DIR); without this the first write_text raises.
    OUT.mkdir(parents=True, exist_ok=True)

    for i, (tactic, tech, path) in enumerate(REPLACEMENTS, start=6):
        idx = f"{i:02d}"
        print(f"[{idx}/10] {tactic} :: {tech}")
        print(f" SIGMA : {path}")
        try:
            raw = fetch(f"{SIGMA_RAW}/{path}").decode("utf-8")
        except Exception as e:
            print(f" FETCH : FAIL {e}\n")
            continue
        stem = pathlib.Path(path).stem
        (OUT / f"{stem}.yml").write_text(raw, encoding="utf-8")
        cleaned = strip_unsupported_sigma_fields(raw)
        # Count removed lines rather than comparing the texts, so a
        # whitespace-only difference is not reported as a strip.
        removed = len(raw.splitlines()) - len(cleaned.splitlines())
        if removed > 0:
            (OUT / f"{stem}.cleaned.yml").write_text(cleaned,
                                                     encoding="utf-8")
            print(f" PREP : stripped {removed} OriginalFileName "
                  f"line(s) the S1-PQ backend can't map")
        try:
            body = convert(cleaned)
        except Exception as e:
            print(f" CONVERT : FAIL {e}\n")
            continue
        re_body = relax(body)
        wel_body = wel_map(body)
        (OUT / f"{stem}.pq").write_text(body, encoding="utf-8")
        (OUT / f"{stem}.relaxed.pq").write_text(re_body, encoding="utf-8")
        (OUT / f"{stem}.wel.pq").write_text(wel_body, encoding="utf-8")
        print(f" CONVERT : OK faithful={len(body)}c "
              f"relaxed={len(re_body)}c wel={len(wel_body)}c")
        print(f" FA : {body[:160]}{'...' if len(body)>160 else ''}")
        print(f" WEL : {wel_body[:160]}"
              f"{'...' if len(wel_body)>160 else ''}")

        c1, _, r1 = pq(body)
        c2, _, r2 = pq(re_body)
        c3, e3, r3 = pq(wel_body)
        print(f" TEST FA : HTTP {c1} rows={r1}")
        print(f" TEST RE : HTTP {c2} rows={r2}")
        print(f" TEST WEL: HTTP {c3} rows={r3}"
              f"{' err=' + e3[:100] if c3 != 200 else ''}")

        # Only the faithful body is deployed, and only when it parsed.
        if c1 == 200:
            rule_name = f"[Sigma->PQ] {tactic} / {tech} ({stem})"[:128]
            dc, dmsg = deploy(rule_name,
                              f"Auto-converted from SigmaHQ/sigma {path}",
                              body)
            print(f" DEPLOY : HTTP {dc} {dmsg[:160]}")
        print()
    return 0
def pq(query: str, hours: int = 24) -> tuple[str, list, list[str]]:
    """Run one read-only PowerQuery over the last `hours` hours.

    Returns (status, rows, column_names):

    * ("OK", values, names) on success;
    * ("HTTP<code>", [first 250 chars of the error body], []) on an HTTP
      error;
    * ("<ExceptionName>", [str(exception)], []) for anything else.
    """
    end = int(time.time() * 1000)
    start = end - hours * 3600 * 1000
    req = urllib.request.Request(
        f"{BASE}/api/powerQuery",
        data=json.dumps({"token": TOK, "query": query,
                         "startTime": str(start),
                         "endTime": str(end)}).encode(),
        headers={"Content-Type": "application/json"}, method="POST")
    try:
        # Context manager so the HTTP response is closed after reading.
        with urllib.request.urlopen(req, timeout=60) as resp:
            d = json.loads(resp.read())
        return ("OK", d.get("values") or [],
                [c.get("name") for c in (d.get("columns") or [])])
    except urllib.error.HTTPError as e:
        return (f"HTTP{e.code}", [e.read().decode()[:250]], [])
    except Exception as e:
        return (f"{type(e).__name__}", [str(e)], [])


# (label printed above the result, PowerQuery text) -- run in this order.
PROBES: list[tuple[str, str]] = [
    ("WEL distribution by EventID",
     "parser.name='microsoft_windows_eventlog-latest' "
     "| group n=count() by EventID | sort -n | limit 20"),
    ("WEL channel / provider distribution",
     "parser.name='microsoft_windows_eventlog-latest' "
     "| group n=count() by Channel | sort -n | limit 15"),
    ("WEL ProviderName distribution",
     "parser.name='microsoft_windows_eventlog-latest' "
     "| group n=count() by ProviderName | sort -n | limit 15"),
    ("WEL EID=4688 row sample (Security: process creation)",
     "parser.name='microsoft_windows_eventlog-latest' EventID=4688 "
     "| columns CommandLine, NewProcessName, ParentProcessName, "
     "SubjectUserName, ProcessId | limit 3"),
    ("WEL EID=1 row sample (Sysmon: process creation)",
     "parser.name='microsoft_windows_eventlog-latest' EventID=1 "
     "| columns CommandLine, Image, ParentImage, User, ProcessGuid | limit 3"),
    ("Probe alternate camelCase fields on the WEL parser",
     "parser.name='microsoft_windows_eventlog-latest' "
     "| columns commandLine, image, parentImage, eventId | limit 3"),
    ("Probe nested process.* fields on the WEL parser",
     "parser.name='microsoft_windows_eventlog-latest' "
     "| columns process.cmdLine, process.image.path, "
     "process.parentImage.path, event.id | limit 3"),
    ("EID=4688 count alone (volume sanity)",
     "parser.name='microsoft_windows_eventlog-latest' EventID=4688 "
     "| group n=count() | limit 1"),
    ("EID=1 count alone",
     "parser.name='microsoft_windows_eventlog-latest' EventID=1 "
     "| group n=count() | limit 1"),
    ("Any cmdline-bearing record sample (raw)",
     "parser.name='microsoft_windows_eventlog-latest' "
     "| columns rawMessage | limit 1"),
]


def main() -> int:
    """Run every probe in PROBES and print status, columns and sample rows.

    At most 10 rows are printed per probe, each truncated to 240
    characters. Always returns 0; probe failures are reported inline.
    """
    print(f"\n{'='*78}\n WEL parser schema probe -- last 24 h\n "
          f"endpoint: {BASE}/api/powerQuery\n{'='*78}")
    for label, query in PROBES:
        status, rows, cols = pq(query)
        oneline = query.replace("\n", " ")
        print(f"\n--- {label} ---")
        print(f" query : {oneline[:160]}{'...' if len(oneline)>160 else ''}")
        print(f" status: {status} cols: {cols}")
        for r in rows[:10]:
            r_str = str(r)
            print(f" {r_str[:240]}{'...' if len(r_str)>240 else ''}")
    return 0
+ +The script therefore: + - Lists every active source the backend has detected (with event counts). + - Lists every covered MITRE technique and per-tactic rule counts. + - Maps each active source -> the Sigma folder(s) under sigmahq/sigma that + target that telemetry. + - Queries the Sigma repo's directory listing on GitHub to confirm the + folders exist and to count available rules. + - Prints a prioritised import list, plus the exact `git sparse-checkout` + commands you can copy/paste. + +Usage +----- + python3 recommend_sigma_imports.py + python3 recommend_sigma_imports.py --backend http://localhost:8001 +""" +from __future__ import annotations +import argparse +import json +import sys +import urllib.request +from typing import Any + + +GITHUB_API = "https://api.github.com/repos/SigmaHQ/sigma/contents" +SIGMA_REPO = "https://github.com/SigmaHQ/sigma" + +# Each active SDL source -> ordered list of (sigma_folder, why_this_folder). +# The folder path is RELATIVE to the sigmahq/sigma repo root. +SOURCE_TO_SIGMA: dict[str, list[tuple[str, str]]] = { + "Windows Event Logs": [ + ("rules/windows/builtin/security", + "Direct match: rules keyed on EventID against Security channel."), + ("rules/windows/builtin/system", + "System channel: service install, driver load, time tampering."), + ("rules/windows/builtin/application", + "Application channel: MSI installs, app crashes used as TTPs."), + ("rules/windows/process_creation", + "Process creation (EID 4688 / Sysmon 1). 
Highest-value Windows folder."), + ("rules/windows/powershell", + "PowerShell Operational/Script-block (EID 4103/4104)."), + ("rules/windows/registry", + "Sysmon registry events for persistence and config tampering."), + ("rules/windows/network_connection", + "Sysmon 3 / 5156 outbound connections from suspicious processes."), + ("rules/windows/file", + "Sysmon 11/15 file create + raw-access read (LSASS dump)."), + ("rules-emerging-threats/2024/Exploits", + "Recent CVE detections, many Windows-targeted."), + ], + "Azure Platform": [ + ("rules/cloud/azure/activity_logs", + "Azure Activity Log -- subscription/resource manager events."), + ("rules/cloud/azure/microsoft365", + "M365 Unified Audit Log."), + ("rules/cloud/azure/signinlogs", + "Azure AD / Entra ID sign-in logs."), + ("rules/cloud/azure/auditlogs", + "Entra ID directory audit (role assignments, app consent)."), + ], + "Identity": [ + ("rules/cloud/azure/signinlogs", + "Same Entra ID sign-in folder -- maps Identity source."), + ("rules/cloud/azure/auditlogs", + "Entra ID directory audit."), + ("rules/category/authentication", + "Cross-vendor authentication category."), + ], + "Mimecast": [ + ("rules/category/proxy", + "Sigma generic proxy category covers email-gateway URL events."), + ("rules-emerging-threats/2024/Malware", + "Recent phishing / malware lure detections."), + ], + "Stormshield": [ + ("rules/network/firewall", + "Vendor-neutral firewall log rules -- works on Stormshield once " + "field-mapped via your existing stormshield parser."), + ("rules/network/cisco", + "Borrow Cisco ASA rules as templates -- many TTPs translate 1:1."), + ], + "Prompt Security": [ + # No first-party Sigma coverage yet; recommend hunting category. + ("rules-threat-hunting/application", + "Generic application hunting rules -- closest fit for LLM prompt-" + "abuse signals until a vendor-specific Sigma category lands."), + ], +} + +# Tactics where rule_count is small enough to be a clear gap. 
def http_json(url: str, timeout: int = 30) -> Any:
    """GET `url` and return the decoded JSON body.

    Raises whatever urllib / json raise on failure; callers decide how to
    degrade.
    """
    req = urllib.request.Request(url, headers={"User-Agent": "siem-toolkit"})
    with urllib.request.urlopen(req, timeout=timeout) as r:
        return json.loads(r.read())


def github_dir_count(path: str) -> tuple[int, str]:
    """Return (rule_count, status) for one Sigma repo sub-directory.

    Only *.yml / *.yaml entries directly inside `path` are counted;
    sub-directories are not descended into. `status` is "OK", "no-list"
    (the API did not return a directory listing), "HTTP <code>", or
    "err <ExceptionName>".
    """
    # Imported here because the block uses urllib.error by name.
    import urllib.error

    url = f"{GITHUB_API}/{path}"
    try:
        data = http_json(url)
        if isinstance(data, list):
            yml = sum(1 for e in data if isinstance(e, dict)
                      and e.get("name", "").endswith((".yml", ".yaml")))
            return yml, "OK"
        return 0, "no-list"
    except urllib.error.HTTPError as e:
        return 0, f"HTTP {e.code}"
    except Exception as e:
        return 0, f"err {type(e).__name__}"


def github_recursive_count(path: str) -> int:
    """Count *.yml / *.yaml files under `path`, descending one directory level.

    Files directly in `path` and files directly inside each of its
    sub-directories are counted. Best effort: on any error the count
    gathered so far is returned.
    """
    total = 0
    try:
        listing = http_json(f"{GITHUB_API}/{path}")
        if not isinstance(listing, list):
            return 0
        for e in listing:
            if not isinstance(e, dict):
                continue
            if e.get("type") == "file" and e["name"].endswith((".yml", ".yaml")):
                total += 1
            elif e.get("type") == "dir":
                sub = http_json(f"{GITHUB_API}/{path}/{e['name']}")
                if isinstance(sub, list):
                    total += sum(1 for s in sub if isinstance(s, dict)
                                 and s.get("type") == "file"
                                 and s["name"].endswith((".yml", ".yaml")))
    except Exception:
        return total
    return total


def main() -> int:
    """Print Sigma import recommendations derived from the backend's coverage.

    Reads /api/coverage/health, /api/coverage/map and /api/coverage/mitre
    from the backend given by --backend, maps every active source to Sigma
    folders via SOURCE_TO_SIGMA, and prints copy/paste import commands plus
    a curated rule list. Returns 1 when the health endpoint is unreachable,
    otherwise 0.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--backend", default="http://localhost:8001",
                    help="SIEM-toolkit-patched backend URL")
    ap.add_argument("--no-github", action="store_true",
                    help="Skip GitHub API calls (offline / rate-limited).")
    args = ap.parse_args()

    print(f"\n{'='*78}\n SIGMA IMPORT RECOMMENDATIONS\n{'='*78}")
    print(f" Backend : {args.backend}")
    print(f" Sigma repo : {SIGMA_REPO}")
    print(f" GitHub lookups : {'disabled' if args.no_github else 'enabled'}")

    # 1) Coverage health
    try:
        health = http_json(f"{args.backend}/api/coverage/health")
    except Exception as e:
        print(f"\n[FATAL] cannot reach backend: {e}")
        return 1

    print("\n--- Current coverage health ---")
    print(f" health_score : {health['health_score']}")
    print(f" parser_pct : {health['parser_pct']}")
    print(f" mitre_pct : {health['mitre_pct']}")
    print(f" firing_pct : {health['firing_pct']} "
          f"(only {health['rules_fired']} of {health['rules_loaded']} "
          f"have fired -- importing rules without verifying they fire is "
          f"the #1 source of dashboard noise)")
    print(f" active_sources : {health['active_sources']}")
    print(f" tactics_covered : {health['tactics_covered']}/15")
    print(f" techniques cov. : {health['techniques_covered']}")

    # 2) Active sources
    cov_map = http_json(f"{args.backend}/api/coverage/map")
    print("\n--- Active log sources (ordered by event volume) ---")
    print(f" {'source':<24}{'events':>10} {'parser':<32} rule_count")
    sources = sorted(cov_map["sources"], key=lambda s: -s["event_count"])
    for s in sources:
        print(f" {s['source_name']:<24}{s['event_count']:>10} "
              f"{(s.get('parser') or '-'):<32}{s.get('rule_count', '-')}")

    # 3) MITRE tactic gaps
    mitre = http_json(f"{args.backend}/api/coverage/mitre")
    print("\n--- MITRE tactic depth (rules / techniques per tactic) ---")
    print(f" {'tactic':<26}{'rules':>8}{'techs':>8} gap?")
    for t in mitre["tactics"]:
        gap = " <-- THIN" if t["tactic"] in GAP_TACTICS else ""
        print(f" {t['tactic']:<26}{t['rule_count']:>8}"
              f"{t['technique_count']:>8}{gap}")

    # 4) Recommended Sigma folders, prioritised by active-source volume
    print(f"\n{'='*78}\n RECOMMENDED SIGMA FOLDERS TO IMPORT\n{'='*78}")
    print(" Priority order = which active source has the most events.\n"
          " Only folders for sources that are ACTIVELY producing telemetry\n"
          " appear below -- rules for sources you don't ingest add zero\n"
          " detection value and pollute the rule library.\n")

    seen = set()
    sparse_paths: list[str] = []
    for s in sources:
        name = s["source_name"]
        evt = s["event_count"]
        folders = SOURCE_TO_SIGMA.get(name, [])
        if not folders:
            print(f"--- {name} ({evt:,} events) -- no Sigma mapping curated")
            continue
        print(f"\n--- {name} ({evt:,} events) ---")
        for folder, why in folders:
            # A folder shared by two sources is listed once, under the
            # higher-volume source.
            if folder in seen:
                continue
            seen.add(folder)
            sparse_paths.append(folder)
            count_str = ""
            if not args.no_github:
                n = github_recursive_count(folder)
                count_str = f" [~{n} rules]"
            print(f" * {folder}{count_str}")
            print(f" {why}")

    # 5) Concrete import commands
    print(f"\n{'='*78}\n COPY/PASTE: import these folders only\n{'='*78}\n")
    print(" # 1. clone Sigma with sparse-checkout (no full 5GB history)")
    print(" git clone --filter=blob:none --no-checkout "
          f"{SIGMA_REPO}.git /tmp/sigma")
    print(" cd /tmp/sigma")
    print(" git sparse-checkout init --cone")
    print(" git sparse-checkout set \\")
    for p in sparse_paths:
        print(f" {p} \\")
    print(" # end of folder list")
    # SigmaHQ/sigma publishes its rules on the `master` branch; checking
    # out `main` fails on a --no-checkout clone.
    print(" git checkout master")
    print()
    print(" # 2. push each .yml file into SIEM-toolkit-patched via the")
    print(" # backend's /api/coverage/upload-sigma endpoint (one POST")
    print(" # per file, multipart/form-data):")
    print(f"""
    find . -path './rules*' -name '*.yml' | while read f ; do
      curl -sS -F "file=@$f" {args.backend}/api/coverage/upload-sigma \\
        -w "%{{http_code}} $f\\n" -o /dev/null
    done
""")

    # 6) High-value individual rules (curated -- always worth importing)
    print(f"{'='*78}\n HIGH-PRIORITY INDIVIDUAL RULES (curated)\n{'='*78}")
    must_have = [
        # Lateral Movement -- weak tactic (83 rules)
        ("rules/windows/builtin/security/win_security_admin_rdp_login.yml",
         "Lateral Movement", "T1021.001 RDP"),
        ("rules/windows/builtin/security/"
         "win_security_susp_smb_share_object_access_lateral_movement.yml",
         "Lateral Movement", "T1021.002 SMB"),
        ("rules/windows/process_creation/"
         "proc_creation_win_winrm_lateral_movement.yml",
         "Lateral Movement", "T1021.006 WinRM"),
        # Collection -- weak tactic (77 rules)
        ("rules/windows/process_creation/"
         "proc_creation_win_susp_screenshot.yml",
         "Collection", "T1113 Screen Capture"),
        ("rules/windows/process_creation/"
         "proc_creation_win_powershell_clipboard.yml",
         "Collection", "T1115 Clipboard Data"),
        # Exfiltration -- weak tactic (91 rules)
        ("rules/windows/network_connection/"
         "net_connection_win_rclone.yml",
         "Exfiltration", "T1567.002 Exfil to Cloud Storage"),
        ("rules/windows/process_creation/"
         "proc_creation_win_rar_compress_data.yml",
         "Exfiltration", "T1560.001 Archive via Utility"),
        # Reconnaissance -- THINNEST tactic (11 rules)
        ("rules/windows/process_creation/"
         "proc_creation_win_susp_netsh_dump_config.yml",
         "Reconnaissance", "T1016 System Network Config Discovery"),
        ("rules/windows/process_creation/"
         "proc_creation_win_susp_adsisearcher.yml",
         "Reconnaissance", "T1087.002 Domain Account Discovery"),
        # Discovery
        ("rules/windows/process_creation/"
         "proc_creation_win_susp_bloodhound_sharphound.yml",
         "Discovery", "T1087/T1482 BloodHound/SharpHound"),
        # Credential Access (already 217 rules but always topical)
        ("rules/windows/process_creation/"
         "proc_creation_win_susp_mimikatz_command_line.yml",
         "Credential Access", "T1003.001 LSASS Memory"),
        ("rules/windows/process_creation/"
         "proc_creation_win_susp_lsass_dump.yml",
         "Credential Access", "T1003.001 LSASS Memory"),
        # Azure -- broad coverage gap
        ("rules/cloud/azure/signinlogs/"
         "azure_aad_sign_ins_from_noninteractive_devices.yml",
         "Initial Access", "T1078.004 Cloud Account abuse"),
        ("rules/cloud/azure/auditlogs/"
         "azure_aad_role_assigned.yml",
         "Privilege Escalation", "T1098 Account Manipulation"),
    ]
    print(f" {'tactic':<22}{'technique':<35}rule")
    for path, tactic, tech in must_have:
        print(f" {tactic:<22}{tech:<35}{path}")

    # Derived from the list so the sentence cannot drift out of sync.
    print(f"\n These {len(must_have)} rules close the thinnest gaps "
          "surfaced by the")
    print(" Threat Coverage map above. Import them FIRST, then iterate")
    print(" through the bulk folders.\n")

    return 0
# ----------------------------------------------------- helpers --------------
def _pq_request(query: str, hours: int) -> urllib.request.Request:
    """Build the POST request for /api/powerQuery over the last `hours` hours."""
    end = int(time.time() * 1000)
    start = end - hours * 3600 * 1000
    body = {"token": SDL_KEY, "query": query,
            "startTime": str(start), "endTime": str(end)}
    return urllib.request.Request(
        f"{SDL_BASE}/api/powerQuery",
        data=json.dumps(body).encode(),
        headers={"Content-Type": "application/json"}, method="POST")


def pq(query: str, hours: int = 24) -> tuple[int, str, int]:
    """Run `query` and return (status, message, row_count).

    * (200, "ok", n) on success, n being the length of the "values" list;
    * (http_code, first 250 chars of the error body, 0) on an HTTP error;
    * (0, "<ExceptionName>: <detail>", 0) when no HTTP response was
      received or the body was not JSON, so callers that test
      `code != 200` treat it as a failed smoke test instead of crashing.
    """
    try:
        with urllib.request.urlopen(_pq_request(query, hours),
                                    timeout=60) as r:
            d = json.loads(r.read())
            return 200, "ok", len(d.get("values") or [])
    except urllib.error.HTTPError as e:
        return e.code, e.read().decode(errors="replace")[:250], 0
    except (OSError, ValueError) as e:
        return 0, f"{type(e).__name__}: {e}"[:250], 0


def pq_count(query: str) -> int:
    """Return the number of events matching `query` in the last 24 h.

    Appends `| group n=count() | limit 1` and reads the single cell of the
    result. Best effort: any failure (HTTP error, transport error, empty or
    non-numeric result) yields 0. The wrapped query is sent exactly once.
    """
    wrapped = f"{query} | group n=count() | limit 1"
    try:
        with urllib.request.urlopen(_pq_request(wrapped, 24),
                                    timeout=60) as r:
            d = json.loads(r.read())
        v = (d.get("values") or [[None]])[0]
        return int(v[0]) if v and v[0] is not None else 0
    except Exception:
        return 0


def mgmt_get(path: str) -> tuple[int, dict]:
    """GET `path` on the management console and return (status, json_body).

    On an HTTP error the decoded error body is returned, or
    {"_body": "(non-json)"} when it is not JSON. When no HTTP response was
    received at all, (0, {"_body": "<ExceptionName>: <detail>"}) is returned.
    """
    req = urllib.request.Request(f"{S1_CONS}{path}")
    req.add_header("Authorization", f"ApiToken {S1_TOK}")
    req.add_header("Accept", "application/json")
    try:
        with urllib.request.urlopen(req, timeout=30) as r:
            return r.status, json.loads(r.read())
    except urllib.error.HTTPError as e:
        try:
            return e.code, json.loads(e.read())
        except Exception:
            return e.code, {"_body": "(non-json)"}
    except (OSError, ValueError) as e:
        return 0, {"_body": f"{type(e).__name__}: {e}"}


def deploy_rule(site_id: str, name: str, desc: str,
                body: str) -> tuple[int, str]:
    """Create a Draft scheduled cloud-detection rule scoped to `site_id`.

    Returns (200, rule_id) on success ("?" when the response carries no
    id), (http_code, first 300 chars of the error body) on an HTTP error,
    and (0, "<ExceptionName>: <detail>") when the request failed before any
    HTTP response.
    """
    payload = {
        "data": {"name": name, "description": desc, "severity": "Medium",
                 "expirationMode": "Permanent", "queryType": "scheduled",
                 "queryLang": "2.0", "status": "Draft",
                 "treatAsThreat": "UNDEFINED", "networkQuarantine": False,
                 "coolOffSettings": {"renotifyMinutes": 60},
                 "scheduledParams": {"query": body,
                                     "lookbackWindowMinutes": 30,
                                     "runIntervalMinutes": 5,
                                     "threshold": {"value": 0,
                                                   "operator": "Greater"}}},
        "filter": {"siteIds": [site_id]}}
    req = urllib.request.Request(
        f"{S1_CONS}/web/api/v2.1/cloud-detection/rules",
        data=json.dumps(payload).encode(), method="POST")
    req.add_header("Authorization", f"ApiToken {S1_TOK}")
    req.add_header("Content-Type", "application/json")
    req.add_header("Accept", "application/json")
    try:
        with urllib.request.urlopen(req, timeout=30) as r:
            d = json.loads(r.read())
            return 200, str((d.get("data") or {}).get("id") or "?")
    except urllib.error.HTTPError as e:
        return e.code, e.read().decode(errors="replace")[:300]
    except (OSError, ValueError) as e:
        return 0, f"{type(e).__name__}: {e}"[:300]
def put_rule(site_id: str, rule_id: str, name: str,
             body: str) -> tuple[int, str]:
    """PUT a full rule definition onto `rule_id` to test that the rule exists.

    Returns (status, "ok") on success, (http_code, first 200 chars of the
    error body) on an HTTP error, and (0, "<ExceptionName>: <detail>") when
    no HTTP response was received.

    NOTE(review): the PUT carries `name` and a "verify-by-PUT for ..."
    description, so a successful call presumably overwrites the name and
    description given at deploy time -- confirm this is intended.
    """
    payload = {
        "data": {"name": name, "description": f"verify-by-PUT for {name}",
                 "severity": "Medium", "expirationMode": "Permanent",
                 "queryType": "scheduled", "queryLang": "2.0",
                 "status": "Draft", "treatAsThreat": "UNDEFINED",
                 "networkQuarantine": False,
                 "coolOffSettings": {"renotifyMinutes": 60},
                 "scheduledParams": {"query": body,
                                     "lookbackWindowMinutes": 30,
                                     "runIntervalMinutes": 5,
                                     "threshold": {"value": 0,
                                                   "operator": "Greater"}}},
        "filter": {"siteIds": [site_id]}}
    req = urllib.request.Request(
        f"{S1_CONS}/web/api/v2.1/cloud-detection/rules/{rule_id}",
        data=json.dumps(payload).encode(), method="PUT")
    req.add_header("Authorization", f"ApiToken {S1_TOK}")
    req.add_header("Content-Type", "application/json")
    req.add_header("Accept", "application/json")
    try:
        with urllib.request.urlopen(req, timeout=30) as r:
            return r.status, "ok"
    except urllib.error.HTTPError as e:
        return e.code, e.read().decode(errors="replace")[:200]
    except (OSError, ValueError) as e:
        return 0, f"{type(e).__name__}: {e}"[:200]


# ----------------------------------------------------- main -----------------
def main() -> int:
    """Probe, smoke-test, deploy and PUT-verify every rule listed in RULES.

    Steps: discover sites, probe 24 h telemetry volume, smoke-test each .pq
    body found in ART, deploy the ones that returned HTTP 200, then
    PUT-verify the deployed IDs and persist them to deployed_rule_ids.json
    next to this script. Returns 1 when site discovery fails, otherwise 0.
    """
    total = len(RULES)
    print(f"\n{'='*78}\n Sigma -> PowerQuery -> SDL on US tenant\n"
          f"{'='*78}")
    print(f" Mgmt API : {S1_CONS}")
    print(f" SDL : {SDL_BASE}")
    print(f" Artefact : {ART}\n")

    # --- 0. discover sites on US tenant ----------------------------------
    print("--- Step 0: discover sites + token identity ---------------------")
    code, d = mgmt_get("/web/api/v2.1/sites?limit=10")
    if code != 200:
        print(f" HTTP {code} {str(d)[:300]}")
        return 1
    sites = (d.get("data") or {}).get("sites") or []
    print(f" Sites visible to token: {len(sites)}")
    for s in sites[:5]:
        print(f" id={s.get('id')} name={s.get('name')} "
              f"state={s.get('state')}")
    if not sites:
        print(" FATAL: no sites visible -- token has no scope here")
        return 1
    # Honour the SITE_ID override; fall back to the first visible site.
    override = os.environ.get("SITE_ID", "")
    if override:
        site_id = override
        print(f" --> deploying into site_id={site_id} (SITE_ID override)\n")
    else:
        site_id = sites[0]["id"]
        print(f" --> deploying into site_id={site_id} "
              f"({sites[0].get('name')})\n")

    # --- 1. tenant schema probe ------------------------------------------
    print("--- Step 1: probe US tenant telemetry (last 24 h) --------------")
    probes = {
        "event.type='Process Creation'":
            "event.type='Process Creation'",
        "endpoint.os='windows'":
            "endpoint.os='windows'",
        "tgt.process.cmdline non-empty":
            "tgt.process.cmdline!=''",
        "src.process.image.path non-empty":
            "src.process.image.path!=''",
    }
    for label, q in probes.items():
        n = pq_count(q)
        print(f" {label:<45}{n}")
    print()

    # --- 2. smoke-test the rules ------------------------------------------
    print(f"--- Step 2: smoke-test {total} faithful PQ bodies "
          "-------------------")
    test_results = []
    for i, (tactic, tech, fname) in enumerate(RULES, 1):
        pq_path = ART / fname
        if not pq_path.exists():
            print(f" [{i:>2}] {tactic:<18}{tech:<32} MISSING {fname}")
            test_results.append((i, tactic, tech, fname, None, None))
            continue
        body = pq_path.read_text()
        code, msg, rows = pq(body)
        print(f" [{i:>2}] {tactic:<18}{tech:<32} HTTP {code} rows={rows}")
        if code != 200:
            print(f" err: {msg[:160]}")
        test_results.append((i, tactic, tech, fname, code, rows))
    print()

    # --- 3. deploy --------------------------------------------------------
    print("--- Step 3: deploy each valid PQ as SDL Scheduled rule ---------")
    deployed: list[tuple[int, str, str, str, str]] = []  # i, tactic, tech, fname, id
    for (i, tactic, tech, fname, code, rows) in test_results:
        if code != 200:
            print(f" [{i:>2}] SKIP (smoke-test failed)")
            continue
        body = (ART / fname).read_text()
        name = f"[Sigma->PQ USEA1] {tactic} / {tech} ({pathlib.Path(fname).stem})"[:128]
        # Record where the body actually came from; ART follows
        # SIGMA_OUT_DIR and is not always the default /tmp directory.
        desc = (f"Auto-converted Sigma rule. "
                f"Source: {ART / fname}. "
                f"Faithful S1 DV schema.")
        dc, dmsg = deploy_rule(site_id, name, desc, body)
        verdict = (f"id={dmsg}" if dc == 200 else f"FAIL HTTP {dc} "
                   f"{dmsg[:160]}")
        print(f" [{i:>2}] DEPLOY HTTP {dc} {verdict}")
        if dc == 200:
            deployed.append((i, tactic, tech, fname, dmsg))
    print()

    # --- 4. PUT verification ---------------------------------------------
    exists = 0
    gone = 0
    if deployed:
        print("--- Step 4: PUT-existence verification --------------------")
        for (i, tactic, tech, fname, rid) in deployed:
            body = (ART / fname).read_text()
            name = f"[Sigma->PQ USEA1 verify] {tactic} / {tech}"[:128]
            pc, pmsg = put_rule(site_id, rid, name, body)
            verdict = ("EXISTS" if pc in (200, 204)
                       else "NOT FOUND" if pc == 404
                       else f"HTTP {pc} {pmsg[:80]}")
            print(f" [{i:>2}] id={rid} PUT HTTP {pc} {verdict}")
            if pc in (200, 204):
                exists += 1
            elif pc == 404:
                gone += 1

    # --- summary ----------------------------------------------------------
    print(f"\n{'='*78}\n SUMMARY\n{'='*78}")
    valid = sum(1 for (_, _, _, _, c, _) in test_results if c == 200)
    print(f" Smoke-test passed : {valid}/{total}")
    print(f" Rules deployed : {len(deployed)}/{total}")
    if deployed:
        ids_file = HERE / "deployed_rule_ids.json"
        ids_file.write_text(json.dumps(
            {"tenant": S1_CONS, "site_id": site_id,
             "rules": [{"rule_id": rid, "pq_file": fname,
                        "tactic": tactic, "tech": tech}
                       for (_, tactic, tech, fname, rid) in deployed]},
            indent=2))
        print(f" Deployed IDs : {ids_file}")
    print(f" PUT-verified exists : {exists}/{len(deployed)} "
          f"(not found: {gone})")
    print(f"\n Console: {S1_CONS}/#/cloud-detection/rules\n")
    return 0
#!/usr/bin/env python3
"""
verify_deployed_sigma_rules.py  (formerly _v3)

Diagnostic for the RBAC visibility quirk: when a service-user role has
`cloudDetectionRulesCreateEdit` but not `cloudDetectionRulesView`, POST
succeeds and returns rule IDs, but GET /rules silently hides those rules.

This script characterises what the token CAN see, in four steps:
  1. token identity (api-token-details, falling back to /user)
  2. list with ?ids=<deployed ids> and no scope filter
  3. list with ?ids=<deployed ids> AND siteIds=<site>
     (skipped when no site id is known)
  4. list with queryType=scheduled, counting the rules whose name
     starts with the "[Sigma->PQ" tag

Reads tenant credentials from tenant_config.json and the rule IDs from
deployed_rule_ids.json (both next to this script).  Set SIEM_TOOLKIT_CONFIG
or DEPLOYED_IDS_FILE env vars to override.
"""
from __future__ import annotations

import json
import os
import pathlib
import urllib.error
import urllib.parse
import urllib.request

HERE = pathlib.Path(__file__).resolve().parent
_CFG_PATH = os.environ.get("SIEM_TOOLKIT_CONFIG",
                           str(HERE / "tenant_config.json"))
# Load through a context manager so the file handle is closed promptly.
with open(_CFG_PATH, encoding="utf-8") as _cfg_file:
    CFG = json.load(_cfg_file)
BASE = CFG["S1_CONSOLE_URL"].rstrip("/")
# NOTE(review): trailing "." characters are stripped from the token --
# presumably to absorb a copy/paste artefact; confirm.
TOK = CFG["S1_CONSOLE_API_TOKEN"].rstrip(".")

_IDS_PATH = pathlib.Path(os.environ.get(
    "DEPLOYED_IDS_FILE", str(HERE / "deployed_rule_ids.json")))
if not _IDS_PATH.exists():
    raise SystemExit(f"{_IDS_PATH} not found. "
                     f"Run convert_test_deploy_sigma.py --deploy first.")
_STATE = json.loads(_IDS_PATH.read_text())
SITE = _STATE.get("site_id") or os.environ.get("SITE_ID") or ""
# str() so the IDs can be comma-joined even if the state file stores numbers.
DEPLOYED_IDS = [str(r["rule_id"]) for r in (_STATE.get("rules") or [])]

_RULES_PATH = "/web/api/v2.1/cloud-detection/rules"
# The deploy/verify scripts name rules "[Sigma->PQ USEA1] ..." or
# "[Sigma->PQ verify] ...", so match on the shared prefix rather than on
# the exact string "[Sigma->PQ]", which none of those names contain.
_SIGMA_NAME_TAG = "[Sigma->PQ"


def get_json(path: str):
    """GET ``BASE + path`` with the API token and return ``(status, body)``.

    Args:
        path: Absolute API path (including any query string) appended to
            the console base URL.

    Returns:
        A ``(http_status, body)`` tuple.  ``body`` is the parsed JSON
        document; when the response body is not valid JSON (on success or
        on an HTTP error) it is ``{"_raw": <first 240 chars>}`` instead,
        or ``{"_raw": "(non-json)"}`` when the body is empty.

    Raises:
        urllib.error.URLError: On connection-level failures (no HTTP
            status was received).
    """
    req = urllib.request.Request(f"{BASE}{path}")
    req.add_header("Authorization", f"ApiToken {TOK}")
    req.add_header("Accept", "application/json")
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            status, raw = resp.status, resp.read()
    except urllib.error.HTTPError as err:
        # Read the error body once so it can be shown even if it isn't JSON.
        status, raw = err.code, err.read()
    try:
        return status, json.loads(raw)
    except ValueError:  # JSONDecodeError / UnicodeDecodeError
        text = raw.decode("utf-8", errors="replace").strip()
        return status, {"_raw": text[:240] or "(non-json)"}


def _rules_url(**params: str) -> str:
    """Return the rules-list path with URL-encoded query parameters.

    Commas are kept literal so a comma-joined ``ids`` value stays readable.
    """
    return f"{_RULES_PATH}?{urllib.parse.urlencode(params, safe=',')}"


def _status_of(rule: dict) -> str:
    """Return the rule status as text, or ``"?"`` when absent or null.

    Formatting ``None`` with a width spec (``:<10``) raises ``TypeError``,
    so the value is always coerced to ``str`` before it is padded.
    """
    return str(rule.get("status") or "?")


def _scope_of(rule: dict) -> str:
    """Return the first available scope label for a rule, else ``"?"``."""
    return ((rule.get("scope") or {}).get("scopeName")
            or rule.get("siteName")
            or rule.get("accountName")
            or "?")


def _print_token_identity() -> None:
    """Print who the token belongs to, trying two introspection endpoints."""
    code, doc = get_json("/web/api/v2.1/users/api-token-details")
    if code == 200:
        data = doc.get("data") or {}
        print(f"  user     : {data.get('email') or data.get('fullName')}")
        print(f"  scope    : {data.get('scope')}")
        print(f"  scope id : {data.get('scopeId')}")
        print(f"  expires  : {data.get('expiresAt') or 'never'}")
        return

    # Service-user JWT often can't introspect itself
    code2, doc2 = get_json("/web/api/v2.1/user")
    if code2 == 200:
        data = doc2.get("data") or {}
        print(f"  user     : {data.get('email')}")
        print(f"  scope    : {data.get('scope')}")
    else:
        print(f"  HTTP {code} / {code2}  cannot introspect token "
              "(common for service-user JWTs)")


def main() -> int:
    """Run the visibility probes and print what the token can see.

    Only GET requests are issued.  Findings are reported on stdout.

    Returns:
        Always 0; the result of each probe is printed rather than
        reflected in the exit status.
    """
    print(f"\n{'='*78}\n  Verify deployed rules via `ids=` filter\n"
          f"{'='*78}\n  Tenant : {BASE}\n  Site   : {SITE or '(unset)'}\n"
          f"  IDs    : {len(DEPLOYED_IDS)} rules from {_IDS_PATH.name}\n")

    # --- 1. token / user identity -----------------------------------
    print("--- Step 1: token identity -------------------------------------")
    _print_token_identity()

    if not DEPLOYED_IDS:
        print("  No deployed rule IDs to verify.")
        return 0

    # --- 2. list with ids= filter, NO scope filter ------------------
    print("\n--- Step 2: list with `ids=` (no scope filter) -----------")
    ids = ",".join(DEPLOYED_IDS)
    code, doc = get_json(_rules_url(ids=ids))
    if code != 200:
        print(f"  HTTP {code}  {json.dumps(doc)[:300]}")
    else:
        rules = doc.get("data") or []
        print(f"  Returned : {len(rules)} of {len(DEPLOYED_IDS)} requested")
        for rule in rules:
            print(f"    id={rule.get('id')}  status={_status_of(rule):<10}  "
                  f"scope={_scope_of(rule)}  "
                  f"name={(rule.get('name') or '')[:65]}")

    # --- 3. list ids= AND siteIds= ----------------------------------
    print("\n--- Step 3: list with `ids=` AND `siteIds=` -------------------")
    if not SITE:
        # An empty siteIds= would not test the site scope at all.
        print("  SKIPPED  : no site_id in the state file or SITE_ID env var")
    else:
        code, doc = get_json(_rules_url(ids=ids, siteIds=SITE))
        if code != 200:
            print(f"  HTTP {code}  {json.dumps(doc)[:300]}")
        else:
            print(f"  Returned : {len(doc.get('data') or [])} of "
                  f"{len(DEPLOYED_IDS)}")

    # --- 4. list all visible scheduled rules without scope ----------
    print("\n--- Step 4: list with queryType= filter ---------------------")
    # Only the first page (limit=200) is inspected; no pagination is done.
    code, doc = get_json(_rules_url(queryType="scheduled", limit="200"))
    if code != 200:
        print(f"  HTTP {code}  {json.dumps(doc)[:300]}")
    else:
        rules = doc.get("data") or []
        sigma = [rule for rule in rules
                 if _SIGMA_NAME_TAG in (rule.get("name") or "")]
        print(f"  visible scheduled rules : {len(rules)}")
        print(f"  of which [Sigma->PQ]    : {len(sigma)}")
        for rule in sigma:
            print(f"    id={rule.get('id')}  status={_status_of(rule):<10}  "
                  f"{(rule.get('name') or '')[:70]}")

    print(f"\n  Console:\n    {BASE}/#/cloud-detection/rules\n")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
#!/usr/bin/env python3
"""
verify_rule_exists_via_put.py

Service-user tokens often have `cloudDetectionRulesCreateEdit` but lack
`cloudDetectionRulesView`.  Result: POST/PUT/DELETE on a rule succeed,
but GET /rules and GET /rules/{id} silently filter the rule out.  PUT
is the definitive existence test -- it returns 200/204 when the rule
exists and 404 when it does not.

NOTE: the PUT carries a full rule payload (name "[Sigma->PQ verify] ...",
description, severity, status "Draft", schedule), so a rule that exists
is presumably overwritten with these values -- confirm before running
this against rules that have been renamed or activated.

Reads the (rule_id, pq_file) map produced by convert_test_deploy_sigma.py
in deployed_rule_ids.json next to this script.

Outputs:
  EXISTS / NOT_FOUND verdict per rule, plus a summary.
"""
from __future__ import annotations

import json
import os
import pathlib
import urllib.error
import urllib.request

HERE = pathlib.Path(__file__).resolve().parent

_CFG_PATH = os.environ.get("SIEM_TOOLKIT_CONFIG",
                           str(HERE / "tenant_config.json"))
# Load through a context manager so the file handle is closed promptly.
with open(_CFG_PATH, encoding="utf-8") as _cfg_file:
    CFG = json.load(_cfg_file)
BASE = CFG["S1_CONSOLE_URL"].rstrip("/")
# NOTE(review): trailing "." characters are stripped from the token --
# presumably to absorb a copy/paste artefact; confirm.
TOK = CFG["S1_CONSOLE_API_TOKEN"].rstrip(".")

IDS_FILE = pathlib.Path(os.environ.get(
    "DEPLOYED_IDS_FILE", str(HERE / "deployed_rule_ids.json")))
ART_DIR = pathlib.Path(os.environ.get(
    "SIGMA_OUT_DIR", "/tmp/sigma_converted_v4"))

# The sibling deploy script truncates rule names to this length.
_MAX_RULE_NAME = 128


def put_rule(site_id: str, rule_id: str, name: str,
             body: str) -> tuple[int, str]:
    """PUT a scheduled Draft rule payload to ``/rules/{rule_id}``.

    Args:
        site_id: Site ID placed in the request's ``filter.siteIds``.
        rule_id: ID of the rule to address.
        name: Rule name sent in the payload.
        body: PowerQuery text sent as ``scheduledParams.query``.

    Returns:
        ``(http_status, text)`` where ``text`` is the first 240 characters
        of the response (or HTTP error) body.

    Raises:
        urllib.error.URLError: On connection-level failures (no HTTP
            status was received).
    """
    payload = {
        "data": {"name": name,
                 "description": f"verify-by-PUT for {name}",
                 "severity": "Medium",
                 "expirationMode": "Permanent",
                 "queryType": "scheduled",
                 "queryLang": "2.0",
                 "status": "Draft",
                 "treatAsThreat": "UNDEFINED",
                 "networkQuarantine": False,
                 "coolOffSettings": {"renotifyMinutes": 60},
                 "scheduledParams": {"query": body,
                                     "lookbackWindowMinutes": 30,
                                     "runIntervalMinutes": 5,
                                     "threshold": {"value": 0,
                                                   "operator": "Greater"}}},
        "filter": {"siteIds": [site_id]}}
    req = urllib.request.Request(
        f"{BASE}/web/api/v2.1/cloud-detection/rules/{rule_id}",
        data=json.dumps(payload).encode(), method="PUT")
    req.add_header("Authorization", f"ApiToken {TOK}")
    req.add_header("Content-Type", "application/json")
    req.add_header("Accept", "application/json")
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            status, raw = resp.status, resp.read()
    except urllib.error.HTTPError as err:
        status, raw = err.code, err.read()
    # errors="replace": a non-UTF-8 error page must not crash the report.
    return status, raw.decode("utf-8", errors="replace")[:240]


def main() -> int:
    """PUT every recorded rule and print an EXISTS / NOT FOUND table.

    Rules whose ``.pq`` artefact is missing from ``ART_DIR`` are not sent;
    they are reported and counted separately so the summary adds up to
    the number of entries in the IDs file.

    Returns:
        1 when the IDs file or the site id is missing, otherwise 0
        (per-rule verdicts are printed, not reflected in the exit status).
    """
    print(f"\n{'='*78}\n  Verify rules via PUT-existence test\n{'='*78}")
    print(f"  Tenant   : {BASE}")
    print(f"  IDs file : {IDS_FILE}")
    print(f"  Artefacts: {ART_DIR}\n")

    if not IDS_FILE.exists():
        print(f"  FATAL: {IDS_FILE} not found.\n"
              f"  Run convert_test_deploy_sigma.py --deploy first.")
        return 1

    state = json.loads(IDS_FILE.read_text())
    rules = state.get("rules") or []
    site = state.get("site_id") or os.environ.get("SITE_ID", "")
    if not site:
        # Name the file actually read; DEPLOYED_IDS_FILE may override it.
        print(f"  FATAL: site_id missing in {IDS_FILE.name} "
              f"and SITE_ID is not set")
        return 1
    print(f"  Site     : {site}")
    print(f"  Rules    : {len(rules)} deployed entries\n")

    print(f"  {'#':>3} {'rule':<32}{'id':<22}{'http':>5}  result")
    print("  " + "-" * 100)
    exists = gone = other = skipped = 0
    for i, entry in enumerate(rules, 1):
        rid = str(entry["rule_id"])
        label = f"{entry.get('tactic', '?')} {entry.get('tech', '?')}"
        pq_path = ART_DIR / entry["pq_file"]
        if not pq_path.exists():
            skipped += 1
            print(f"  {i:>3} {label[:32]:<32}{rid:<22}   --  "
                  f"pq file missing: {pq_path.name}")
            continue

        name = f"[Sigma->PQ verify] {label}"[:_MAX_RULE_NAME]
        code, msg = put_rule(site, rid, name, pq_path.read_text())
        if code in (200, 204):
            verdict = "EXISTS"
            exists += 1
        elif code == 404:
            verdict = "NOT FOUND"
            gone += 1
        else:
            verdict = f"HTTP {code} {msg[:80]}"
            other += 1
        print(f"  {i:>3} {label[:32]:<32}{rid:<22}{code:>5}  {verdict}")

    total = len(rules)
    print("\n  Summary:")
    print(f"    EXISTS (PUT 200/204) : {exists}/{total}")
    print(f"    404 NOT FOUND        : {gone}/{total}")
    print(f"    Other (auth/RBAC)    : {other}/{total}")
    if skipped:
        print(f"    Skipped (no pq file) : {skipped}/{total}")
    if exists:
        print("\n  Rules ARE deployed.  If GET /rules can't see them,")
        print("  the service-user role lacks `cloudDetectionRulesView`.")
        print("  Open the console UI (wider RBAC):")
        print(f"    {BASE}/#/cloud-detection/rules\n")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())