4 Commits

Author SHA1 Message Date
marc 99d63837b5 Add tools/sync-upstream.sh: safe upstream-sync workflow
Wraps the recurring 'fetch upstream, rebase, verify invariants, push'
workflow into a single command with safety nets:

- creates a tag snapshot before mutating the branch
- aborts on dirty tree
- rebase by default (--merge for merge-commit instead)
- after sync, rebuilds the backend container and verifies 5 fork-only
  invariants are still met (parser dropdown filtered, mitre_pct <= 100,
  cache endpoints present, /sample-unlabelled present, prewarmer task
  scheduled when opted in)
- exits non-zero with the recovery command if invariants regress
- optional --dry-run / --no-rebuild / --no-push for ad-hoc inspection
2026-05-22 20:50:28 +02:00
marc fec356829c Ingest Dashboard: optional background cache pre-warmer
Adds an asyncio background task that re-runs the heavy Ingest Dashboard
queries every ~4 min (just under the 5 min TTL) so the in-process cache
is always populated. First user hit on any dashboard widget then returns
from cache (single-digit ms) instead of waiting 30-60s for SDL.

Components:
  - backend/services/prewarmer.py: standalone module, opt-in via
    INGEST_PREWARM=1; configurable windows via INGEST_PREWARM_HOURS /
    INGEST_PREWARM_DAYS / INGEST_PREWARM_DAILY_VOLUME_DAYS and interval
    via INGEST_PREWARM_INTERVAL_SECONDS. Logs through the uvicorn logger
    so cycles are visible in 'docker logs'.
  - backend/main.py: spawn the task on FastAPI startup.
  - docker-compose.yml: forward INGEST_PREWARM* env vars to the
    backend service (default off).

Measured on Purple AI tenant: [...]

Default is off (INGEST_PREWARM=0) so non-opt-in
users see no behaviour change.
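
The pre-warm cycle described in this message can be sketched as a small asyncio loop. This is an illustration only, not the contents of backend/services/prewarmer.py: `prewarm_loop`, `refresh`, `cycles`, `prewarm_enabled` and `prewarm_interval` are hypothetical names, and the 240-second default is an assumption derived from "every ~4 min". Only the INGEST_PREWARM and INGEST_PREWARM_INTERVAL_SECONDS variable names come from the message.

```python
import asyncio
import logging
import os
from typing import Awaitable, Callable, Optional

log = logging.getLogger("uvicorn.error")  # uvicorn's own logger name


async def prewarm_loop(refresh: Callable[[], Awaitable[None]],
                       interval_seconds: float,
                       cycles: Optional[int] = None) -> int:
    """Call refresh() repeatedly; return the number of completed cycles.

    cycles=None loops until the task is cancelled; a finite value exists
    only to make this sketch easy to exercise.
    """
    done = 0
    while cycles is None or done < cycles:
        try:
            await refresh()
        except Exception:
            # A failed cycle must not kill the loop; the next one retries.
            log.exception("prewarm cycle failed")
        done += 1
        if cycles is None or done < cycles:
            await asyncio.sleep(interval_seconds)
    return done


def prewarm_enabled(env=os.environ) -> bool:
    """Opt-in switch: only INGEST_PREWARM=1 turns the task on."""
    return env.get("INGEST_PREWARM", "0") == "1"


def prewarm_interval(env=os.environ) -> float:
    """Interval in seconds; 240 is an assumed default (~4 min)."""
    return float(env.get("INGEST_PREWARM_INTERVAL_SECONDS", "240"))
```

On application startup such a loop would typically be scheduled with `asyncio.create_task(...)` only when `prewarm_enabled()` is true, which keeps the default-off behaviour.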
2026-05-22 20:41:36 +02:00
marc 0a01a56218 Ingest Dashboard: 5min TTL cache + days->hours normalisation
Dashboard reloads on multi-day windows could take 30-60s and sometimes
returned HTTP 502 ('internal Scalyr error') when the SDL window was
expressed in days. Two-part fix:

1. In-process async TTL cache (services/async_cache.py)
   - 5 min TTL on top-sources, by-event-type, daily-volume.
   - Single-flight lock per cache key (no thundering herd).
   - Optional ?nocache=1 query param to force a refresh.
   - New endpoints: GET /api/ingest/cache-stats, DELETE /api/ingest/cache.

2. Normalise days -> hours upstream of the PowerQuery
   - SDL is unstable on day-scale windows for large group-by counts on
     this tenant but stable on the equivalent hour-scale window.
   - top-sources?days=1 used to 502; now works.
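
A TTL cache with a single-flight lock per key, as in part 1, might look roughly like the sketch below. It is not the code in services/async_cache.py: the class name `AsyncTTLCache`, its methods and the `nocache` argument are assumptions chosen to mirror the description above.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict, Tuple


class AsyncTTLCache:
    """In-process TTL cache with one lock per key (single-flight)."""

    def __init__(self, ttl_seconds: float = 300.0) -> None:
        self.ttl = ttl_seconds
        self._data: Dict[str, Tuple[float, Any]] = {}
        self._locks: Dict[str, asyncio.Lock] = {}

    def _fresh(self, key: str):
        hit = self._data.get(key)
        if hit and time.monotonic() - hit[0] < self.ttl:
            return hit
        return None

    async def get(self, key: str,
                  loader: Callable[[], Awaitable[Any]],
                  nocache: bool = False) -> Any:
        hit = None if nocache else self._fresh(key)
        if hit:
            return hit[1]
        lock = self._locks.setdefault(key, asyncio.Lock())
        async with lock:
            # Re-check: another waiter may have filled the entry while
            # this coroutine was queued on the lock.
            hit = None if nocache else self._fresh(key)
            if hit:
                return hit[1]
            value = await loader()
            self._data[key] = (time.monotonic(), value)
            return value

    def clear(self) -> None:
        """Drop every entry (what a cache-delete endpoint would call)."""
        self._data.clear()
```

With this shape, five concurrent requests for the same cold key trigger one upstream query; the other four wait on the lock and then read the stored value.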

Measured on Purple AI tenant:
  top-sources?days=7  cold 55.7s -> warm 13ms (~4300x)
  [...]               -> 4ms (cold) / 1.4ms (warm)
2026-05-22 20:10:03 +02:00
marc f82115143c Health Score: cap MITRE Coverage at 100% by canonicalising tactics
STAR rules sometimes label tactics with non-canonical names (e.g. 'Stealth',
'Defense Impairment') which were counted as distinct tactics on top of the
14 canonical ATT&CK Enterprise ones, producing percentages > 100%
(observed 15/14 = 107.1% on Purple AI tenant).

Fix in get_health_score():
  - Restrict covered_tactics to the 14 canonical ATT&CK Enterprise tactics.
  - Map known STAR aliases ('Stealth', 'Defense Impairment') -> 'Defense Evasion'.
  - Derive TOTAL_TACTICS from the canonical set (single source of truth).

Result: tactics_covered = 14, mitre_pct = 100.0 (was 15 / 107.1).
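
A minimal sketch of the canonicalisation, assuming the rule tactics arrive as a flat list of label strings. `mitre_coverage` and the constant names other than TOTAL_TACTICS are hypothetical; the real change lives in get_health_score().

```python
from typing import Iterable, Tuple

# The 14 ATT&CK Enterprise tactics this commit treats as canonical.
CANONICAL_TACTICS = frozenset({
    "Reconnaissance", "Resource Development", "Initial Access",
    "Execution", "Persistence", "Privilege Escalation", "Defense Evasion",
    "Credential Access", "Discovery", "Lateral Movement", "Collection",
    "Command and Control", "Exfiltration", "Impact",
})
TOTAL_TACTICS = len(CANONICAL_TACTICS)  # single source of truth

# STAR aliases named in the commit message.
STAR_ALIASES = {
    "Stealth": "Defense Evasion",
    "Defense Impairment": "Defense Evasion",
}


def mitre_coverage(rule_tactics: Iterable[str]) -> Tuple[int, float]:
    """Return (tactics_covered, mitre_pct) with aliases folded in."""
    covered = set()
    for raw in rule_tactics:
        name = STAR_ALIASES.get(raw.strip(), raw.strip())
        if name in CANONICAL_TACTICS:  # unknown labels are ignored
            covered.add(name)
    return len(covered), round(100.0 * len(covered) / TOTAL_TACTICS, 1)
```

Because the denominator is derived from the same set that filters the numerator, the percentage cannot exceed 100.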
2026-05-22 19:41:48 +02:00
10 changed files with 0 additions and 1882 deletions
-6
@@ -11,9 +11,3 @@ data/
# Parsers ARE committed in this fork (snapshot of the demo tenant).
# .env still excluded for safety.
tools/stormshield-verify/config.json
# Sigma->PowerQuery pipeline: real tenant credentials live here.
# Use tenant_config.example.json as the template.
tenant_config.json
deployed_rule_ids.json
-241
@@ -1,241 +0,0 @@
# Sigma → SentinelOne PowerQuery pipeline
End-to-end workflow that turns SigmaHQ rules into SentinelOne SDL
Scheduled custom-detection rules, **starting from the coverage gaps the
SIEM-toolkit identifies**.
## TL;DR
1. **SIEM-toolkit** provides the coverage map to find what's thin —
MITRE ATT&CK heatmap across all detection library rules, rule firing
status (active vs never-fired).
2. **Pick Sigma rules** ([SigmaHQ/sigma](https://github.com/SigmaHQ/sigma))
that target those tactics.
3. **Convert** the Sigma rules to PowerQuery with
[`pysigma-backend-sentinelone-pq`](https://pypi.org/project/pysigma-backend-sentinelone-pq/).
4. **Smoke-test** against your tenant's `/api/powerQuery`, **deploy**
via `/web/api/v2.1/cloud-detection/rules` as Scheduled PQ rules in
Draft.
5. **Re-running on a different tenant** is just re-pointing the
credentials — the converted `.pq` bodies travel as-is.
## Setup (once)
```bash
# 1. Tooling
python3 -m venv /tmp/sigma_venv
/tmp/sigma_venv/bin/pip install pysigma pysigma-backend-sentinelone-pq
brew install gh && gh auth login # avoids GitHub rate limits
# 2. Credentials
cp tenant_config.example.json tenant_config.json
$EDITOR tenant_config.json # fill in 5 keys
# tenant_config.json is gitignored.
```
`tenant_config.json` shape:
```json
{
"S1_CONSOLE_URL": "https://<region>-<tenant>.example",
"S1_CONSOLE_API_TOKEN": "<S1 Mgmt API token>",
"SDL_XDR_URL": "https://xdr.<region>.example",
"SDL_LOG_READ_KEY": "<SDL Log Read scope>",
"SDL_CONFIG_READ_KEY": "<SDL Configuration Read scope>"
}
```
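
A loader that fails early on an incomplete config can save a confusing HTTP error later. The sketch below is an assumption about how one might validate the file; `load_tenant_config` and `REQUIRED_KEYS` are hypothetical names, and treating any value that still contains `<` as an unfilled template placeholder is a heuristic, not something the scripts do.

```python
import json
import os
import pathlib

REQUIRED_KEYS = (
    "S1_CONSOLE_URL", "S1_CONSOLE_API_TOKEN",
    "SDL_XDR_URL", "SDL_LOG_READ_KEY", "SDL_CONFIG_READ_KEY",
)


def load_tenant_config(path=None) -> dict:
    """Load the credentials file and check that all 5 keys are filled in."""
    cfg_path = pathlib.Path(
        path or os.environ.get("SIEM_TOOLKIT_CONFIG", "tenant_config.json"))
    cfg = json.loads(cfg_path.read_text(encoding="utf-8"))
    missing = [
        k for k in REQUIRED_KEYS
        if not str(cfg.get(k, "")).strip() or "<" in str(cfg[k])
    ]
    if missing:
        raise ValueError(f"{cfg_path}: unset keys: {', '.join(missing)}")
    # Normalise base URLs so later f-strings do not produce "//api/...".
    for k in ("S1_CONSOLE_URL", "SDL_XDR_URL"):
        cfg[k] = cfg[k].rstrip("/")
    return cfg
```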
Optional environment overrides:
| Variable | Default | Purpose |
|---|---|---|
| `SIEM_TOOLKIT_CONFIG` | `./tenant_config.json` | path to credentials |
| `SIGMA_OUT_DIR` | `/tmp/sigma_converted_v4` | where `.pq` artefacts land |
| `SIGMA_VENV_PY` | `/tmp/sigma_venv/bin/python3` | Python that hosts pysigma |
| `GH_BIN` | `gh` | GitHub CLI binary |
| `SITE_ID` | (auto-discovered) | force-deploy into a specific site |
| `DEPLOYED_IDS_FILE` | `./deployed_rule_ids.json` | input for verify scripts |
## The 5-step workflow
### Step 1 — Find thin tactics
```bash
python3 recommend_sigma_imports.py
```
Reads the SIEM-toolkit coverage endpoints (`/api/coverage/health`,
`/api/coverage/mitre`, `/api/coverage/map`) and prints, in order:
- Tenant **health row** (`health_score`, `firing_pct`, active sources).
- **Active log sources** ranked by event volume — only import Sigma
rules whose `logsource` matches a source that actually produces
events here.
- **MITRE tactic depth** — tactics with `rule_count < 100` and a high
`technique_count` are the THIN ones. Typical findings:
Reconnaissance, Discovery, Lateral Movement, Collection, Exfiltration.
- **Recommended SigmaHQ folders** with GitHub-verified rule counts.
- A curated **14-rule shortlist** for the thinnest gaps.
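
The "thin tactic" test can be sketched as a simple filter. The row shape (`tactic`, `rule_count`, `technique_count`) is an assumption about what the coverage endpoint returns, `thin_tactics` is a hypothetical helper, and the `min_techniques=10` cut-off for "high" is illustrative; only the `rule_count < 100` threshold comes from the text above. The sample numbers in the usage lines are made up.

```python
from typing import Iterable, List, Mapping


def thin_tactics(rows: Iterable[Mapping],
                 max_rules: int = 100,
                 min_techniques: int = 10) -> List[str]:
    """Return tactic names with few rules relative to many techniques.

    Sorted thinnest first, using rules-per-technique as the ordering key.
    """
    thin = [
        r for r in rows
        if r["rule_count"] < max_rules
        and r["technique_count"] >= min_techniques
    ]
    thin.sort(key=lambda r: r["rule_count"] / r["technique_count"])
    return [r["tactic"] for r in thin]


# Made-up sample rows, for illustration only.
sample = [
    {"tactic": "Execution", "rule_count": 450, "technique_count": 14},
    {"tactic": "Discovery", "rule_count": 40, "technique_count": 32},
    {"tactic": "Collection", "rule_count": 12, "technique_count": 17},
    {"tactic": "Impact", "rule_count": 5, "technique_count": 3},
]
print(thin_tactics(sample))
```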
### Step 2 — Pick Sigma rules
The picker in `convert_test_deploy_sigma.py` matches filename-stem
keywords against the SigmaHQ tree it lists via `gh api`. Edit the
`WANTED` table to change the 10 rules. Each row is
`(tactic, technique_label, [keywords], allow_powershell_folder)`.
The default list covers:
| Tactic | Technique | Sigma file |
|---|---|---|
| Lateral Movement | T1021.006 WinRM (evil-winrm) | `proc_creation_win_hktl_evil_winrm.yml` |
| Collection | T1113 Screen Capture (Psr.exe) | `proc_creation_win_psr_capture_screenshots.yml` |
| Collection | T1115 Clipboard (Get-Clipboard) | `proc_creation_win_powershell_get_clipboard.yml` |
| Exfiltration | T1560.001 RAR (.dmp files) | `proc_creation_win_winrar_exfil_dmp_files.yml` |
| Exfiltration | T1567.002 rclone | `proc_creation_win_pua_rclone_execution.yml` |
| Reconnaissance | T1016 netsh portproxy | `proc_creation_win_netsh_port_forwarding.yml` |
| Discovery | T1087/T1033 whoami /priv | `proc_creation_win_whoami_priv_discovery.yml` |
| Discovery | T1087/T1482 SharpHound | `proc_creation_win_hktl_bloodhound_sharphound.yml` |
| Credential Access | T1003.001 Mimikatz cmd-line | `proc_creation_win_hktl_mimikatz_command_line.yml` |
| Credential Access | T1003.001 ProcDump LSASS | `proc_creation_win_sysinternals_procdump_lsass.yml` |
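
Keyword order matters when editing a `WANTED` row: keywords are tried one after another, so a broad keyword can select a file before a more specific one is ever considered. The sketch below reproduces the keyword-first matching on a hand-picked path sample. The `proc_creation_win_netsh_fw_add_rule.yml` file name is an assumption used to show the effect.

```python
import pathlib
from typing import List, Optional

PREFIX = "rules/windows/process_creation/"

# Hand-picked sample, not the full SigmaHQ tree.
PATHS = sorted([
    PREFIX + "proc_creation_win_hktl_evil_winrm.yml",
    PREFIX + "proc_creation_win_netsh_fw_add_rule.yml",  # assumed name
    PREFIX + "proc_creation_win_netsh_port_forwarding.yml",
])


def pick(paths: List[str], keywords: List[str]) -> Optional[str]:
    """Return the first path whose filename stem contains a keyword.

    Keywords are tried in order, so an earlier keyword beats a later one
    even when the later one would be the better match.
    """
    for kw in keywords:
        for p in paths:
            if kw in pathlib.Path(p).stem.lower():
                return p
    return None


# "netsh_port_fwd" matches nothing, so the broader "netsh_fw" wins.
print(pick(PATHS, ["netsh_port_fwd", "netsh_fw"]))
# Spelling the stem fragment exactly selects the intended rule.
print(pick(PATHS, ["netsh_port_forwarding", "netsh_fw"]))
```

Putting the most specific stem fragment first in the keyword list is the safest way to pin a particular rule.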
### Step 3 — Convert + smoke-test + deploy
Optional preliminary: probe what fields the tenant's WEL parser
actually emits so the WEL-mapped variant queries land on real columns:
```bash
python3 probe_wel_schema.py
```
Then run the master pipeline:
```bash
# Convert + smoke-test only:
python3 convert_test_deploy_sigma.py
# Convert + smoke-test + create SDL Scheduled rules in Draft:
python3 convert_test_deploy_sigma.py --deploy
```
For each of the 10 rules the script writes **three** PowerQuery variants:
| File | Purpose |
|---|---|
| `<stem>.pq` | **faithful** — S1 DV schema (production form) |
| `<stem>.relaxed.pq` | strips `endpoint.os` and `event.type` clauses (useful on tenants where those fields are null) |
| `<stem>.wel.pq` | rewritten onto the `microsoft_windows_eventlog-latest` parser fields (`CommandLine`, `Image`, `ParentImage`, `EventID=4688\|1`, `dataSource.name='Windows Event Logs'`) |
Each variant is smoke-tested against `POST {SDL_XDR_URL}/api/powerQuery`
(last 24 h). HTTP 200 is what we want; rows=0 simply means no telemetry
matched in the window.
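
As a rough sketch of the smoke test, the request body and the reading of the result can be separated from the network call. The payload fields mirror the ones the pipeline script posts (`token`, `query`, `startTime`, `endTime` in epoch milliseconds). `build_powerquery_payload` and `classify` are hypothetical helper names, and counting any HTTP 200 variant as valid is a choice made for this sketch.

```python
import time
from typing import Dict, Optional, Tuple


def build_powerquery_payload(query: str, token: str, hours: int = 24,
                             now: Optional[float] = None) -> Dict[str, str]:
    """Build the JSON body for POST {SDL_XDR_URL}/api/powerQuery."""
    end_ms = int((time.time() if now is None else now) * 1000)
    start_ms = end_ms - hours * 3600 * 1000
    return {"token": token, "query": query,
            "startTime": str(start_ms), "endTime": str(end_ms)}


def classify(results: Dict[str, Tuple[int, int]]) -> str:
    """Map {variant: (http_status, row_count)} to a one-word status."""
    ok_rows = [rows for http, rows in results.values() if http == 200]
    if any(rows > 0 for rows in ok_rows):
        return "FIRES"          # at least one variant matched telemetry
    if ok_rows:
        return "valid_no_data"  # query parsed, nothing matched in window
    return "PQ_ERROR"           # no variant was accepted
```

A `valid_no_data` result is still a success for deployment purposes: the query is syntactically accepted and simply found no events in the last 24 h.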
With `--deploy`, the **faithful** variant is also POSTed to
`/web/api/v2.1/cloud-detection/rules` as a `Scheduled` rule in `Draft`
status, then `deployed_rule_ids.json` is written next to the script
mapping each rule ID back to its source.
#### Edge cases the converter handles
- **Unsupported Sigma fields** (e.g. `OriginalFileName`) cause the
backend to print its known-field list as the error.
`fixup_rules_6_7.py` strips those keys from the YAML and re-converts.
The rule keeps its meaning because `Image|endswith:` is the primary
selector.
- **Wrong folder** — some rules live under `rules/windows/powershell/`
not `process_creation/`. Set `allow_powershell_folder` to `True` in the
`WANTED` row so the picker also searches that folder.
- **`event.type='Process Creation'` and `endpoint.os='windows'`** are
often empty on real tenants — that's why the **relaxed** and **WEL**
variants exist.
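
To make the three variants concrete, here is a small worked example. The input body is hypothetical (real backend output may be shaped differently), `CLAUSE`, `relax` and `wel` are compact stand-ins written for this sketch, and the WEL mapping is reduced to a single field.

```python
import re

# One clause of the form  endpoint.os="..."  or  event.type="..."
CLAUSE = r'(?:endpoint\.os|event\.type)\s*=\s*"[^"]*"'


def relax(body: str) -> str:
    """Drop endpoint.os / event.type clauses joined with 'and'."""
    body = re.sub(CLAUSE + r'\s+and\s+', '', body)  # leading position
    body = re.sub(r'\s+and\s+' + CLAUSE, '', body)  # trailing position
    return body.strip()


def wel(body: str) -> str:
    """Rewrite the DV-schema fields onto Windows Event Log fields."""
    body = re.sub(r'\btgt\.process\.cmdline\b', 'CommandLine', body)
    body = re.sub(r'event\.type\s*=\s*"Process Creation"',
                  '(EventID=4688 or EventID=1)', body)
    body = re.sub(r'endpoint\.os\s*=\s*"windows"',
                  "dataSource.name='Windows Event Logs'", body)
    return body.strip()


faithful = ('event.type="Process Creation" and endpoint.os="windows" '
            'and tgt.process.cmdline contains "rclone"')
print(relax(faithful))
# tgt.process.cmdline contains "rclone"
print(wel(faithful))
# (EventID=4688 or EventID=1) and dataSource.name='Windows Event Logs' and CommandLine contains "rclone"
```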
### Step 4 — Verify
The service-user role that can POST a rule often **cannot** GET it
back (`cloudDetectionRulesView` missing). The collection endpoint
silently filters the rule out, and `GET /rules/{id}` returns HTTP 405
on this API version. PUT is the definitive existence test:
```bash
python3 verify_rule_exists_via_put.py
```
Reads `deployed_rule_ids.json` and PUTs each rule ID. 200/204 = EXISTS,
404 = NOT FOUND. Optional deeper diagnostic:
```bash
python3 verify_deployed_sigma_rules.py
```
Probes the list endpoint with several scope-filter variants so you can
see exactly which RBAC layer is hiding what.
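
The status-code reading above can be sketched without touching the network. The JSON shape matches what the pipeline writes to `deployed_rule_ids.json`. `interpret_put_status`, `verify_all` and the injected `put_status` callable are hypothetical, and the actual PUT request (URL and body) sent by `verify_rule_exists_via_put.py` is deliberately left out because it is not shown in this README.

```python
import json
from typing import Callable, Dict


def interpret_put_status(status: int) -> str:
    """200/204 mean the rule exists, 404 means it does not."""
    if status in (200, 204):
        return "EXISTS"
    if status == 404:
        return "NOT FOUND"
    return "INCONCLUSIVE"  # e.g. 401/403/5xx: cannot tell either way


def verify_all(ids_json: str,
               put_status: Callable[[str], int]) -> Dict[str, str]:
    """Apply the PUT test to every rule listed in the ids file content.

    put_status(rule_id) must perform the PUT and return the HTTP status;
    it is injected so this sketch stays free of network code.
    """
    doc = json.loads(ids_json)
    return {
        rule["rule_id"]: interpret_put_status(put_status(rule["rule_id"]))
        for rule in doc.get("rules", [])
    }
```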
### Step 5 — Run on another tenant
The 30 `.pq` files in `SIGMA_OUT_DIR` are tenant-agnostic. Point the
credentials at a different tenant and re-run only Step 3's deploy +
Step 4:
```bash
# Option A: replace tenant_config.json
cp tenant_config.example.json tenant_config.json && $EDITOR tenant_config.json
python3 run_sigma_on_tenant.py
# Option B: keep separate config files
SIEM_TOOLKIT_CONFIG=./tenant_prod.json python3 run_sigma_on_tenant.py
SIEM_TOOLKIT_CONFIG=./tenant_lab.json python3 run_sigma_on_tenant.py
```
`run_sigma_on_tenant.py` is a single-shot probe → smoke-test → deploy
→ PUT-verify, useful when you already have the converted bodies and
just want to land them on a new tenant.
## Files
| File | Role |
|---|---|
| `recommend_sigma_imports.py` | Reads coverage endpoints, recommends folders + curated rule list |
| `probe_wel_schema.py` | Discovers WEL parser field schema on the tenant |
| `convert_test_deploy_sigma.py` | Master pipeline: pick + convert (3 variants) + smoke + `--deploy` |
| `fixup_rules_6_7.py` | Handles Sigma rules with backend-unsupported keys (e.g. `OriginalFileName`) |
| `run_sigma_on_tenant.py` | Re-deploys already-converted bodies to another tenant |
| `verify_rule_exists_via_put.py` | PUT-existence test (definitive when GET is RBAC-blocked) |
| `verify_deployed_sigma_rules.py` | Probes scope/filter variants to diagnose RBAC |
| `tenant_config.example.json` | Template — copy to `tenant_config.json` (gitignored) |
## Where it fits in the SIEM-toolkit story
```
SIEM-toolkit Threat Coverage map
  recommend_sigma_imports.py      ──┐
    │  (suggests SigmaHQ folders)   │
    ▼                               │
  convert_test_deploy_sigma.py      ├── single workflow
    │  (Sigma → PQ → SDL)           │
    ▼                               │
  verify_rule_exists_via_put.py   ──┘
Activate rules in console UI
Re-run SIEM-toolkit Threat Coverage → firing_pct grows
```
## Pitfalls collected so far
- **`event.type='Process Creation'`** has near-zero population unless a
live S1 EDR agent is reporting; the relaxed variant works around it.
- **`endpoint.os='windows'`** is `null` on many tenants; the relaxed
variant always strips it.
- **GitHub anonymous rate limit** (60 req/h) kills the listing step —
use `gh auth login`.
- **Service-user RBAC** without `cloudDetectionRulesView` makes POSTed
rules invisible to GET. PUT confirms they exist.
- **`OriginalFileName`** in Sigma YAML breaks the S1-PQ backend; strip
with the pre-processor.
- **PowerQuery parser quirks** — bare `*` as a query is rejected;
comments with `/`, `-`, or non-ASCII characters cause Load Failed at
rule-validation time even when the body POSTs fine to
`/api/powerQuery`. Keep comments out of any body that will be
deployed as a Scheduled rule.
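
A pre-deployment check for the last pitfall might look like the sketch below. It assumes comments are whole lines starting with `//`. That marker and the helper name `preflight` are assumptions, and inline comments after a clause are not detected.

```python
from typing import List


def preflight(pq_body: str) -> List[str]:
    """Return a list of problems that would likely fail rule validation."""
    problems: List[str] = []
    if pq_body.strip() == "*":
        problems.append("bare '*' query is rejected")
    for n, line in enumerate(pq_body.splitlines(), 1):
        if line.lstrip().startswith("//"):  # assumed comment marker
            problems.append(f"line {n}: comment in rule body")
    return problems


body = '// converted from Sigma\ntgt.process.cmdline contains "rclone"'
print(preflight(body))
```

An empty list means the body passed these two checks; it does not guarantee that the console will accept the rule.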
-406
@@ -1,406 +0,0 @@
#!/usr/bin/env python3
"""
convert_test_deploy_sigma.py  --  Sigma -> PowerQuery -> SDL Scheduled Rule.

Master pipeline that addresses every TODO from the v3 review:

  (a) Fixes rule #6 (netsh) by trying multiple candidate filenames AND by
      catching the pipeline error so the loop continues. Fixes rule #7
      (AdsiSearcher) by also searching rules/windows/powershell/.
  (b) Adds a WEL-mapping post-processor that rewrites the S1 EDR/DV PQ
      fields to the microsoft_windows_eventlog-latest parser schema so
      the queries can fire against Windows Event Log telemetry.
  (c) Deploys the faithful PQ of every rule that passes the live
      /api/powerQuery smoke test as an SDL Scheduled rule via the S1 Mgmt
      API (POST /web/api/v2.1/cloud-detection/rules). Requires --deploy +
      a valid S1_CONSOLE_API_TOKEN in tenant_config.json.

For each rule we emit THREE PowerQuery variants and smoke-test each:

  <stem>.pq          -- faithful Sigma -> S1-PQ conversion (DV schema)
  <stem>.relaxed.pq  -- faithful minus the endpoint.os and event.type
                        clauses (DV schema but null-os-tolerant)
  <stem>.wel.pq      -- field-mapped onto microsoft_windows_eventlog-latest
                        (CommandLine, Image, ParentImage, ...)

Usage:
  python3 convert_test_deploy_sigma.py            # convert + test only
  python3 convert_test_deploy_sigma.py --deploy   # also create SDL rules
"""
from __future__ import annotations
import argparse
import json
import os
import pathlib
import re
import subprocess
import time
import urllib.error
import urllib.request
from typing import Any
HERE = pathlib.Path(__file__).resolve().parent
VENV_PY = os.environ.get("SIGMA_VENV_PY", "/tmp/sigma_venv/bin/python3")
GH = os.environ.get("GH_BIN", "gh")
OUT = pathlib.Path(os.environ.get(
    "SIGMA_OUT_DIR", "/tmp/sigma_converted_v4"))
# parents=True so a nested SIGMA_OUT_DIR override does not fail.
OUT.mkdir(parents=True, exist_ok=True)
_CFG_PATH = os.environ.get("SIEM_TOOLKIT_CONFIG",
str(HERE / "tenant_config.json"))
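# Context manager so the file handle is closed right after parsing.
with open(_CFG_PATH, encoding="utf-8") as _fh:
    CFG = json.load(_fh)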
SDL_BASE = CFG["SDL_XDR_URL"].rstrip("/")
SDL_KEY = CFG["SDL_LOG_READ_KEY"]
S1_CONS = CFG.get("S1_CONSOLE_URL", "").rstrip("/")
S1_TOK = CFG.get("S1_CONSOLE_API_TOKEN", "").rstrip(".")
# Site id is discovered at runtime from /sites?limit=10 (first active site).
# Override with SITE_ID env var if you have multiple sites and want a
# specific one.
SITE_ID = os.environ.get("SITE_ID", "")
SIGMA_RAW = "https://raw.githubusercontent.com/SigmaHQ/sigma/master"
# 10 desired (tactic, technique, keyword_list, allow_powershell_folder)
WANTED: list[tuple[str, str, list[str], bool]] = [
("Lateral Movement", "T1021.006 WinRM",
["winrm", "winrs"], False),
("Collection", "T1113 Screen Capture",
["screen_capture", "screencapture", "screenshot"], False),
("Collection", "T1115 Clipboard Data",
["clipboard"], False),
("Exfiltration", "T1560.001 Archive via RAR",
["winrar_compress", "winrar", "rar_compress"], False),
("Exfiltration", "T1567.002 Exfil via rclone",
["rclone"], False),
("Reconnaissance", "T1016 netsh port-fwd",
["netsh_allowed_ports", "netsh_port_proxy", "netsh_port_fwd",
"netsh_fw", "netsh_portproxy"], False),
("Discovery", "T1087.002 AdsiSearcher",
["adsisearcher", "adsi_searcher"], True), # in powershell/
("Discovery", "T1087/T1482 SharpHound",
["sharphound", "bloodhound"], False),
("Credential Access", "T1003.001 Mimikatz cmdline",
["mimikatz_command_line", "mimikatz_cli", "mimikatz"], False),
("Credential Access", "T1003.001 ProcDump LSASS",
["procdump_lsass", "procdump", "comsvcs_lsass"], False),
]
# ============================================================ helpers ======
def gh_api(path: str) -> Any:
    r = subprocess.run([GH, "api", path], capture_output=True, text=True,
                       timeout=60)
    if r.returncode != 0:
        raise RuntimeError(f"gh api {path}: {r.stderr.strip()[:300]}")
    return json.loads(r.stdout)


def fetch(url: str) -> bytes:
    req = urllib.request.Request(url, headers={"User-Agent": "siem-toolkit"})
    with urllib.request.urlopen(req, timeout=30) as r:
        return r.read()


def list_sigma_rules(allow_powershell: bool) -> list[str]:
    tree = gh_api("repos/SigmaHQ/sigma/git/trees/master?recursive=1")
    prefixes = ["rules/windows/process_creation/"]
    if allow_powershell:
        prefixes.append("rules/windows/powershell/")
    return sorted(
        e["path"] for e in tree.get("tree", [])
        if e.get("type") == "blob"
        and e.get("path", "").endswith(".yml")
        and any(e["path"].startswith(p) for p in prefixes)
    )


def pick(paths: list[str], keywords: list[str]) -> str | None:
    for kw in keywords:
        for p in paths:
            if kw in pathlib.Path(p).stem.lower():
                return p
    return None


def convert(yaml_text: str) -> str:
    code = (
        "import sys\n"
        "from sigma.rule import SigmaRule\n"
        "from sigma.backends.sentinelone_pq import SentinelOnePQBackend\n"
        "r = SigmaRule.from_yaml(sys.stdin.read())\n"
        "print(SentinelOnePQBackend().convert_rule(r)[0])\n")
    res = subprocess.run([VENV_PY, "-c", code], input=yaml_text, text=True,
                         capture_output=True, timeout=90)
    if res.returncode != 0:
        # last line of the trace is usually the most informative
        err = res.stderr.strip().splitlines()
        msg = err[-1] if err else "(no stderr)"
        raise RuntimeError(msg[:300])
    return res.stdout.strip()
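

def relax(pq_body: str) -> str:
    """Strip endpoint.os and event.type filter clauses."""
    body = re.sub(r'endpoint\.os\s*=\s*"[^"]*"\s+and\s+', '', pq_body)
    body = re.sub(r'\s+and\s+endpoint\.os\s*=\s*"[^"]*"', '', body)
    body = re.sub(r'event\.type\s*=\s*"[^"]*"\s+and\s+', '', body)
    body = re.sub(r'\s+and\s+event\.type\s*=\s*"[^"]*"', '', body).strip()
    # Unwrap one outer pair of parens only if the first "(" closes at the
    # very end; "(a) or (b)" must stay intact (quoted parens not parsed).
    if body.startswith("(") and body.endswith(")"):
        depth = 0
        for i, ch in enumerate(body):
            depth += (ch == "(") - (ch == ")")
            if depth == 0:
                break
        if depth == 0 and i == len(body) - 1:
            body = body[1:-1].strip()
    return body

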
# DV schema -> WEL parser schema (microsoft_windows_eventlog-latest).
# Sysmon (EID=1) and Security (EID=4688) channels use slightly different
# field names; the WEL parser exposes Sysmon-style Image/ParentImage AND
# Security-style NewProcessName/ParentProcessName. We rewrite onto the
# more permissive Sysmon names because they're closer to S1 DV.
DV_TO_WEL = [
(r'\btgt\.process\.cmdline\b', 'CommandLine'),
(r'\btgt\.process\.image\.path\b', 'Image'),
(r'\btgt\.process\.displayName\b', 'OriginalFileName'),
(r'\btgt\.process\.publisher\b', 'Company'),
(r'\bsrc\.process\.image\.path\b', 'ParentImage'),
(r'\bsrc\.process\.cmdline\b', 'ParentCommandLine'),
(r'\bsrc\.process\.user\.name\b', 'User'),
]


def wel_map(pq_body: str) -> str:
    """Rewrite a faithful DV-schema PQ body to query the
    microsoft_windows_eventlog-latest parser instead. Strategy:
      - replace tgt.process.* / src.process.* with WEL field names
      - replace `event.type="Process Creation"` with EID filter
      - replace `endpoint.os="windows"` with dataSource.name='Windows Event Logs'
      - prepend a parser-name pin so the filter narrows fast
    """
    body = pq_body
    for pat, repl in DV_TO_WEL:
        body = re.sub(pat, repl, body)
    body = re.sub(r'event\.type\s*=\s*"Process Creation"',
                  "(EventID=4688 or EventID=1)", body)
    body = re.sub(r'endpoint\.os\s*=\s*"windows"',
                  "dataSource.name='Windows Event Logs'", body)
    # Drop any leftover DV-only field comparisons that didn't map (would
    # otherwise null-filter every row). Only one we've seen: integrityLevel.
    body = re.sub(r'(?:\(\s*)?[\w.]+\.integrityLevel\s*=\s*"[^"]*"'
                  r'\s+(?:and|or)\s+', '', body)
    return body.strip()


def pq(query: str, hours: int = 24) -> tuple[int, str, int]:
    end = int(time.time() * 1000)
    start = end - hours * 3600 * 1000
    payload = {"token": SDL_KEY, "query": query,
               "startTime": str(start), "endTime": str(end)}
    req = urllib.request.Request(
        f"{SDL_BASE}/api/powerQuery",
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"}, method="POST")
    try:
        with urllib.request.urlopen(req, timeout=60) as r:
            d = json.loads(r.read())
        return 200, "ok", len(d.get("values") or [])
    except urllib.error.HTTPError as e:
        return e.code, e.read().decode()[:250], 0


def deploy_rule(name: str, description: str, pq_body: str) -> tuple[int, str]:
    """POST a Scheduled-PQ rule to S1 Mgmt API."""
    if not (S1_CONS and S1_TOK):
        return 0, "no S1_CONSOLE_URL or S1_CONSOLE_API_TOKEN in config"
    payload = {
        "data": {
            "name": name,
            "description": description,
            "severity": "Medium",
            "expirationMode": "Permanent",
            "queryType": "scheduled",
            "queryLang": "2.0",
            "status": "Draft",
            "treatAsThreat": "UNDEFINED",
            "networkQuarantine": False,
            "coolOffSettings": {"renotifyMinutes": 60},
            "scheduledParams": {
                "query": pq_body,
                "lookbackWindowMinutes": 30,
                "runIntervalMinutes": 5,
                "threshold": {"value": 0, "operator": "Greater"},
            },
        },
        "filter": {"siteIds": [SITE_ID]},
    }
    req = urllib.request.Request(
        f"{S1_CONS}/web/api/v2.1/cloud-detection/rules",
        data=json.dumps(payload).encode(), method="POST")
    req.add_header("Authorization", f"ApiToken {S1_TOK}")
    req.add_header("Content-Type", "application/json")
    req.add_header("Accept", "application/json")
    try:
        with urllib.request.urlopen(req, timeout=30) as r:
            d = json.loads(r.read())
        rid = (d.get("data") or {}).get("id") or "?"
        return 200, f"created id={rid}"
    except urllib.error.HTTPError as e:
        return e.code, e.read().decode()[:300]


# ============================================================ main =========
def main() -> int:
    ap = argparse.ArgumentParser()
    ap.add_argument("--deploy", action="store_true",
                    help="Also create each valid PQ as an SDL Scheduled rule.")
    args = ap.parse_args()
    print(f"\n{'='*78}\n Sigma -> PowerQuery (faithful + relaxed + WEL) "
          f"-> SDL rule\n{'='*78}\n")
    print(f" Backend : pysigma-backend-sentinelone-pq")
    print(f" Tenant SDL : {SDL_BASE}")
    print(f" Tenant Mgmt API : {S1_CONS}")
    print(f" Deploy rules : {'YES' if args.deploy else 'no (use --deploy)'}")
    print(f" Output : {OUT}\n")
    # Site-id auto-discovery (only needed for --deploy).
    global SITE_ID
    if args.deploy and not SITE_ID:
        try:
            req = urllib.request.Request(
                f"{S1_CONS}/web/api/v2.1/sites?limit=10")
            req.add_header("Authorization", f"ApiToken {S1_TOK}")
            req.add_header("Accept", "application/json")
            with urllib.request.urlopen(req, timeout=20) as r:
                sites = ((json.loads(r.read()).get("data") or {})
                         .get("sites") or [])
            if not sites:
                print(" FATAL: --deploy requested but no sites visible "
                      "to this token.")
                return 1
            SITE_ID = sites[0]["id"]
            print(f" Site discovered : {SITE_ID} "
                  f"({sites[0].get('name')})\n")
        except urllib.error.HTTPError as e:
            print(f" FATAL site discovery: HTTP {e.code} "
                  f"{e.read().decode()[:200]}")
            return 1
    # Pre-fetch the two relevant trees once
    print("--- listing sigmahq/sigma rule paths via gh api ---")
    pc_only = list_sigma_rules(allow_powershell=False)
    pc_and_pwsh = list_sigma_rules(allow_powershell=True)
    print(f" process_creation/ : {len(pc_only)} rules")
    print(f" process_creation/ + powershell/ : {len(pc_and_pwsh)} rules\n")
    summary: list[dict[str, Any]] = []
    for i, (tactic, tech, kws, allow_pwsh) in enumerate(WANTED, 1):
        paths = pc_and_pwsh if allow_pwsh else pc_only
        rec: dict[str, Any] = {"i": i, "tactic": tactic, "tech": tech}
        print(f"[{i:02d}/10] {tactic} :: {tech}")
        path = pick(paths, kws)
        if not path:
            print(f" PICK : no match for {kws}\n")
            rec["status"] = "no_match"
            summary.append(rec)
            continue
        print(f" PICK : {path}")
        rec["path"] = path
        try:
            raw = fetch(f"{SIGMA_RAW}/{path}").decode("utf-8")
        except Exception as e:
            print(f" FETCH : FAIL {e}\n")
            rec["status"] = "fetch_failed"
            summary.append(rec)
            continue
        stem = pathlib.Path(path).stem
        (OUT / f"{stem}.yml").write_text(raw)
        try:
            pq_body = convert(raw)
        except Exception as e:
            print(f" CONVERT : FAIL {e}\n")
            rec["status"] = "convert_failed"
            rec["err"] = str(e)
            summary.append(rec)
            continue
        relaxed_body = relax(pq_body)
        wel_body = wel_map(pq_body)
        (OUT / f"{stem}.pq").write_text(pq_body)
        (OUT / f"{stem}.relaxed.pq").write_text(relaxed_body)
        (OUT / f"{stem}.wel.pq").write_text(wel_body)
        rec["pq_chars"] = len(pq_body)
        rec["relaxed_chars"] = len(relaxed_body)
        rec["wel_chars"] = len(wel_body)
        print(f" CONVERT : OK faithful={len(pq_body)}c "
              f"relaxed={len(relaxed_body)}c wel={len(wel_body)}c")
        # smoke test all three
        c1, _, r1 = pq(pq_body)
        c2, _, r2 = pq(relaxed_body)
        c3, e3, r3 = pq(wel_body)
        rec.update({"fa_http": c1, "fa_rows": r1,
                    "re_http": c2, "re_rows": r2,
                    "wel_http": c3, "wel_rows": r3,
                    "wel_err": e3 if c3 != 200 else ""})
        print(f" TEST FA : HTTP {c1} rows={r1}")
        print(f" TEST RE : HTTP {c2} rows={r2}")
        print(f" TEST WEL: HTTP {c3} rows={r3}"
              f"{' err=' + e3[:120] if c3 != 200 else ''}")
        valid = (c1 == 200) or (c3 == 200)
        rec["status"] = ("FIRES" if (r1 > 0 or r2 > 0 or r3 > 0)
                         else "valid_no_data" if valid
                         else "PQ_ERROR")
        # deploy faithful (only) if requested + valid
        if args.deploy and c1 == 200:
            rule_name = (f"[Sigma->PQ] {tactic} / {tech} "
                         f"({pathlib.Path(path).stem})")[:128]
            desc = (f"Auto-converted from SigmaHQ/sigma "
                    f"{path} via pysigma-backend-sentinelone-pq. "
                    f"Faithful S1 DV schema.")
            dc, dmsg = deploy_rule(rule_name, desc, pq_body)
            rec["deploy_http"] = dc
            rec["deploy_msg"] = dmsg
            if dc == 200:
                # dmsg shape is "created id=<id>"; extract just the id
                rec["rule_id"] = dmsg.split("id=")[-1].strip()
                rec["pq_file"] = f"{pathlib.Path(path).stem}.pq"
            print(f" DEPLOY : HTTP {dc} {dmsg[:160]}")
        print()
        summary.append(rec)
    # --- summary ---
    print(f"{'='*78}\n SUMMARY (rows = events matched in last 24 h)\n"
          f"{'='*78}")
    hdr = (f" {'#':>3} {'tactic':<18}{'technique':<26}"
           f"{'fa':>5}{'re':>5}{'wel':>5} status")
    print(hdr)
    print(" " + "-" * (len(hdr) - 2))
    for s in summary:
        print(f" {s['i']:>3} {s['tactic']:<18}{s['tech']:<26}"
              f"{s.get('fa_rows','-')!s:>5}{s.get('re_rows','-')!s:>5}"
              f"{s.get('wel_rows','-')!s:>5} {s.get('status','-')}")
    fires = sum(1 for s in summary
                if any(s.get(k, 0) and s[k] > 0
                       for k in ('fa_rows', 're_rows', 'wel_rows')))
    valid = sum(1 for s in summary
                if s.get('status') in ('valid_no_data', 'FIRES'))
    failed = sum(1 for s in summary
                 if s.get('status') in ('no_match', 'fetch_failed',
                                        'convert_failed', 'PQ_ERROR'))
    print(f"\n Rules with any matches : {fires}/10")
    print(f" Syntactically valid : {valid}/10")
    print(f" Failed / not matched : {failed}/10")
    if args.deploy:
        deployed = [s for s in summary if s.get('deploy_http') == 200]
        print(f" SDL rules created : {len(deployed)}/10")
        # Persist the (rule_id, pq_file) map for verify scripts.
        ids_file = HERE / "deployed_rule_ids.json"
        ids_file.write_text(json.dumps(
            {"tenant": S1_CONS,
             "site_id": SITE_ID,
             "rules": [{"rule_id": s["rule_id"],
                        "pq_file": s["pq_file"],
                        "tactic": s["tactic"],
                        "tech": s["tech"]}
                       for s in deployed]}, indent=2))
        print(f" Deployed IDs : {ids_file}")
    print(f" Artefacts : {OUT}/")
    print(f"\n Next steps:")
    print(f" - inspect {OUT}/*.wel.pq for WEL variants")
    print(f" - re-run with --deploy to create SDL Scheduled rules")
    print(f" - verify with verify_rule_exists_via_put.py")
    print(f" - check console UI: {S1_CONS}/#/cloud-detection/rules\n")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
-243
@@ -1,243 +0,0 @@
#!/usr/bin/env python3
"""
fixup_rules_6_7.py
Re-runs the convert -> test -> deploy pipeline for ONLY the 2 rules that
failed in convert_test_deploy_sigma.py:
#6 Reconnaissance T1016 -- netsh port forwarding (the original
`netsh_fw_add_rule.yml` uses a Sigma `|fieldref` modifier the
S1-PQ backend doesn't support; switch to
`netsh_port_forwarding.yml`).
#7 Discovery T1087.002 -- AdsiSearcher (no .yml under
rules/windows/process_creation/ or rules/windows/powershell/ is
named adsisearcher; replace with `whoami /priv` which covers
T1033 + T1087 Account Discovery and is highly diagnostic).
Runs the same 3-variant pipeline (faithful, relaxed, WEL-mapped),
smoke-tests each, and POSTs the faithful PQ as an SDL Scheduled rule.
"""
from __future__ import annotations
import json, os, pathlib, re, subprocess, sys, time
import urllib.error, urllib.request
HERE = pathlib.Path(__file__).resolve().parent
VENV_PY = os.environ.get("SIGMA_VENV_PY", "/tmp/sigma_venv/bin/python3")
OUT = pathlib.Path(os.environ.get(
"SIGMA_OUT_DIR", "/tmp/sigma_converted_v4"))
_CFG_PATH = os.environ.get("SIEM_TOOLKIT_CONFIG",
str(HERE / "tenant_config.json"))
CFG = json.load(open(_CFG_PATH))
SDL_BASE = CFG["SDL_XDR_URL"].rstrip("/")
SDL_KEY = CFG["SDL_LOG_READ_KEY"]
S1_CONS = CFG["S1_CONSOLE_URL"].rstrip("/")
S1_TOK = CFG["S1_CONSOLE_API_TOKEN"].rstrip(".")
SITE_ID = os.environ.get("SITE_ID", "") # auto-discovered in main()
SIGMA_RAW = "https://raw.githubusercontent.com/SigmaHQ/sigma/master"
# (tactic, technique, sigmahq/sigma path)
REPLACEMENTS = [
("Reconnaissance", "T1016 netsh port forwarding",
"rules/windows/process_creation/"
"proc_creation_win_netsh_port_forwarding.yml"),
("Discovery", "T1087/T1033 whoami /priv",
"rules/windows/process_creation/"
"proc_creation_win_whoami_priv_discovery.yml"),
]
def strip_unsupported_sigma_fields(yaml_text: str) -> str:
    """Remove Sigma fields that the S1-PQ backend doesn't map.

    The backend errors with a `{CommandLine}, {Company}, ...` field list
    whenever it sees a key it has no mapping for. The only one we hit in
    practice is `OriginalFileName`, which most LOLBins-style rules use as
    an alternate way to fingerprint a process; the rule keeps its meaning
    once the key is removed because `Image|endswith:` is the primary
    selector.

    Strategy: delete every line whose key is OriginalFileName. A selection
    block that contained ONLY that key is left empty rather than dropped,
    and multi-line values under the key are not handled.
    """
    out: list[str] = []
    for line in yaml_text.splitlines():
        s = line.strip()
        # Lone OriginalFileName key, with or without a list dash
        # ("- OriginalFileName: 'netsh.exe'").
        if s.startswith(("- OriginalFileName:", "OriginalFileName:")):
            continue
        out.append(line)
    return "\n".join(out)


def fetch(url: str) -> bytes:
    req = urllib.request.Request(url, headers={"User-Agent": "siem-toolkit"})
    with urllib.request.urlopen(req, timeout=30) as r:
        return r.read()


def convert(yaml_text: str) -> str:
    code = (
        "import sys\n"
        "from sigma.rule import SigmaRule\n"
        "from sigma.backends.sentinelone_pq import SentinelOnePQBackend\n"
        "r = SigmaRule.from_yaml(sys.stdin.read())\n"
        "print(SentinelOnePQBackend().convert_rule(r)[0])\n")
    res = subprocess.run([VENV_PY, "-c", code], input=yaml_text, text=True,
                         capture_output=True, timeout=90)
    if res.returncode != 0:
        err = res.stderr.strip().splitlines()
        raise RuntimeError((err[-1] if err else "(no stderr)")[:300])
    return res.stdout.strip()


def relax(pq_body: str) -> str:
    b = pq_body
    b = re.sub(r'endpoint\.os\s*=\s*"[^"]*"\s+and\s+', '', b)
    b = re.sub(r'\s+and\s+endpoint\.os\s*=\s*"[^"]*"', '', b)
    b = re.sub(r'event\.type\s*=\s*"[^"]*"\s+and\s+', '', b)
    b = re.sub(r'\s+and\s+event\.type\s*=\s*"[^"]*"', '', b)
    return re.sub(r'^\(\s*(.*)\s*\)$', r'\1', b.strip()).strip()


DV_TO_WEL = [
(r'\btgt\.process\.cmdline\b', 'CommandLine'),
(r'\btgt\.process\.image\.path\b', 'Image'),
(r'\btgt\.process\.displayName\b', 'OriginalFileName'),
(r'\btgt\.process\.publisher\b', 'Company'),
(r'\bsrc\.process\.image\.path\b', 'ParentImage'),
(r'\bsrc\.process\.cmdline\b', 'ParentCommandLine'),
(r'\bsrc\.process\.user\.name\b', 'User'),
]


def wel_map(pq_body: str) -> str:
    b = pq_body
    for pat, repl in DV_TO_WEL:
        b = re.sub(pat, repl, b)
    b = re.sub(r'event\.type\s*=\s*"Process Creation"',
               "(EventID=4688 or EventID=1)", b)
    b = re.sub(r'endpoint\.os\s*=\s*"windows"',
               "dataSource.name='Windows Event Logs'", b)
    return b.strip()


def pq(query: str, hours: int = 24) -> tuple[int, str, int]:
    end = int(time.time() * 1000)
    start = end - hours * 3600 * 1000
    req = urllib.request.Request(
        f"{SDL_BASE}/api/powerQuery",
        data=json.dumps({"token": SDL_KEY, "query": query,
                         "startTime": str(start),
                         "endTime": str(end)}).encode(),
        headers={"Content-Type": "application/json"}, method="POST")
    try:
        with urllib.request.urlopen(req, timeout=60) as r:
            return 200, "ok", len(
                (json.loads(r.read()).get("values") or []))
    except urllib.error.HTTPError as e:
        return e.code, e.read().decode()[:250], 0
def deploy(name: str, desc: str, body: str) -> tuple[int, str]:
    # Bail out before building the request when no target site is known.
    if not SITE_ID:
        return 0, "SITE_ID not set / discoverable"
    payload = {
        "data": {"name": name, "description": desc, "severity": "Medium",
                 "expirationMode": "Permanent", "queryType": "scheduled",
                 "queryLang": "2.0", "status": "Draft",
                 "treatAsThreat": "UNDEFINED", "networkQuarantine": False,
                 "coolOffSettings": {"renotifyMinutes": 60},
                 "scheduledParams": {"query": body,
                                     "lookbackWindowMinutes": 30,
                                     "runIntervalMinutes": 5,
                                     "threshold": {"value": 0,
                                                   "operator": "Greater"}}},
        "filter": {"siteIds": [SITE_ID]}}
    req = urllib.request.Request(
        f"{S1_CONS}/web/api/v2.1/cloud-detection/rules",
        data=json.dumps(payload).encode(), method="POST")
    req.add_header("Authorization", f"ApiToken {S1_TOK}")
    req.add_header("Content-Type", "application/json")
    req.add_header("Accept", "application/json")
    try:
        with urllib.request.urlopen(req, timeout=30) as r:
            d = json.loads(r.read())
            return 200, f"id={(d.get('data') or {}).get('id', '?')}"
    except urllib.error.HTTPError as e:
        return e.code, e.read().decode()[:300]
def main() -> int:
global SITE_ID
print(f"\n{'='*78}\n Fix-up: re-convert + deploy rules #6 and #7"
f"\n{'='*78}\n")
if not SITE_ID:
try:
req = urllib.request.Request(
f"{S1_CONS}/web/api/v2.1/sites?limit=10")
req.add_header("Authorization", f"ApiToken {S1_TOK}")
req.add_header("Accept", "application/json")
sites = ((json.loads(urllib.request.urlopen(req, timeout=20).read()
).get("data") or {}).get("sites") or [])
if sites:
SITE_ID = sites[0]["id"]
print(f" Site discovered : {SITE_ID} "
f"({sites[0].get('name')})\n")
else:
print(" FATAL: no sites visible to this token.")
return 1
except urllib.error.HTTPError as e:
print(f" FATAL site discovery: HTTP {e.code} "
f"{e.read().decode()[:200]}")
return 1
for i, (tactic, tech, path) in enumerate(REPLACEMENTS, start=6):
idx = "06" if i == 6 else "07"
print(f"[{idx}/10] {tactic} :: {tech}")
print(f" SIGMA : {path}")
try:
raw = fetch(f"{SIGMA_RAW}/{path}").decode("utf-8")
except Exception as e:
print(f" FETCH : FAIL {e}\n"); continue
stem = pathlib.Path(path).stem
(OUT / f"{stem}.yml").write_text(raw)
cleaned = strip_unsupported_sigma_fields(raw)
if cleaned != raw:
(OUT / f"{stem}.cleaned.yml").write_text(cleaned)
removed = len(raw.splitlines()) - len(cleaned.splitlines())
print(f" PREP : stripped {removed} OriginalFileName "
f"line(s) the S1-PQ backend can't map")
try:
body = convert(cleaned)
except Exception as e:
print(f" CONVERT : FAIL {e}\n"); continue
re_body = relax(body)
wel_body = wel_map(body)
(OUT / f"{stem}.pq").write_text(body)
(OUT / f"{stem}.relaxed.pq").write_text(re_body)
(OUT / f"{stem}.wel.pq").write_text(wel_body)
print(f" CONVERT : OK faithful={len(body)}c "
f"relaxed={len(re_body)}c wel={len(wel_body)}c")
print(f" FA : {body[:160]}{'...' if len(body)>160 else ''}")
print(f" WEL : {wel_body[:160]}"
f"{'...' if len(wel_body)>160 else ''}")
c1, _, r1 = pq(body)
c2, _, r2 = pq(re_body)
c3, e3, r3 = pq(wel_body)
print(f" TEST FA : HTTP {c1} rows={r1}")
print(f" TEST RE : HTTP {c2} rows={r2}")
print(f" TEST WEL: HTTP {c3} rows={r3}"
f"{' err=' + e3[:100] if c3 != 200 else ''}")
if c1 == 200:
rule_name = f"[Sigma->PQ] {tactic} / {tech} ({stem})"[:128]
dc, dmsg = deploy(rule_name,
f"Auto-converted from SigmaHQ/sigma {path}",
body)
print(f" DEPLOY : HTTP {dc} {dmsg[:160]}")
print()
return 0
if __name__ == "__main__":
raise SystemExit(main())
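
The script above writes three variants of each converted rule: the faithful PowerQuery body, a relaxed body with the `endpoint.os` / `event.type` guards removed, and a WEL body with DV field names mapped onto Windows Event Log columns. The sketch below shows roughly how the three relate. The sample query string is hypothetical (real backend output may be formatted differently), and `SAMPLE`, `GUARDS`, `FIELD_MAP`, `drop_guards` and `to_wel` are illustrative names, not the script's own.

```python
import re

# Hypothetical converter output; real SentinelOne PQ backend output may differ.
SAMPLE = ('endpoint.os="windows" and event.type="Process Creation" '
          'and tgt.process.cmdline contains "portproxy"')

# Guard clauses the relaxed variant removes (leading position only, for brevity).
GUARDS = [r'endpoint\.os\s*=\s*"[^"]*"\s+and\s+',
          r'event\.type\s*=\s*"[^"]*"\s+and\s+']

# A subset of the DV -> WEL field renames, for illustration.
FIELD_MAP = [(r'\btgt\.process\.cmdline\b', 'CommandLine'),
             (r'\btgt\.process\.image\.path\b', 'Image')]


def drop_guards(body: str) -> str:
    """Relaxed variant: strip the OS / event-type guards."""
    for pat in GUARDS:
        body = re.sub(pat, '', body)
    return body.strip()


def to_wel(body: str) -> str:
    """WEL variant: rename fields, then rewrite the guards for the WEL parser."""
    for pat, repl in FIELD_MAP:
        body = re.sub(pat, repl, body)
    body = re.sub(r'event\.type\s*=\s*"Process Creation"',
                  '(EventID=4688 or EventID=1)', body)
    body = re.sub(r'endpoint\.os\s*=\s*"windows"',
                  "dataSource.name='Windows Event Logs'", body)
    return body.strip()


print(drop_guards(SAMPLE))
print(to_wel(SAMPLE))
```

The field renames run before the guard rewrites, matching the order used in `wel_map`. Each replacement string contains none of the DV field names, so no later pattern rewrites text that an earlier one inserted.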
-98
@@ -1,98 +0,0 @@
#!/usr/bin/env python3
"""
probe_wel_schema.py
Probe the tenant's Singularity Data Lake to discover what fields the
`microsoft_windows_eventlog-latest` parser emits. Output guides the WEL
mapping pipeline in convert_test_deploy_sigma.py.
Runs a series of read-only PowerQuery probes for the last 24 h. No state
changes -- safe to re-run.
"""
from __future__ import annotations
import json
import os
import pathlib
import time
import urllib.request
import urllib.error
HERE = pathlib.Path(__file__).resolve().parent
_CFG_PATH = os.environ.get("SIEM_TOOLKIT_CONFIG",
str(HERE / "tenant_config.json"))
with open(_CFG_PATH) as _cfg_file:
    CFG = json.load(_cfg_file)
BASE = CFG["SDL_XDR_URL"].rstrip("/")
TOK = CFG["SDL_LOG_READ_KEY"]
def pq(query: str, hours: int = 24) -> tuple[str, list, list[str]]:
end = int(time.time() * 1000); start = end - hours * 3600 * 1000
req = urllib.request.Request(
f"{BASE}/api/powerQuery",
data=json.dumps({"token": TOK, "query": query,
"startTime": str(start),
"endTime": str(end)}).encode(),
headers={"Content-Type": "application/json"}, method="POST")
try:
d = json.loads(urllib.request.urlopen(req, timeout=60).read())
return ("OK", d.get("values") or [],
[c.get("name") for c in (d.get("columns") or [])])
except urllib.error.HTTPError as e:
return (f"HTTP{e.code}", [e.read().decode()[:250]], [])
except Exception as e:
return (f"{type(e).__name__}", [str(e)], [])
PROBES: list[tuple[str, str]] = [
("WEL distribution by EventID",
"parser.name='microsoft_windows_eventlog-latest' "
"| group n=count() by EventID | sort -n | limit 20"),
("WEL channel / provider distribution",
"parser.name='microsoft_windows_eventlog-latest' "
"| group n=count() by Channel | sort -n | limit 15"),
("WEL ProviderName distribution",
"parser.name='microsoft_windows_eventlog-latest' "
"| group n=count() by ProviderName | sort -n | limit 15"),
("WEL EID=4688 row sample (Security: process creation)",
"parser.name='microsoft_windows_eventlog-latest' EventID=4688 "
"| columns CommandLine, NewProcessName, ParentProcessName, "
"SubjectUserName, ProcessId | limit 3"),
("WEL EID=1 row sample (Sysmon: process creation)",
"parser.name='microsoft_windows_eventlog-latest' EventID=1 "
"| columns CommandLine, Image, ParentImage, User, ProcessGuid | limit 3"),
("Probe alternate camelCase fields on the WEL parser",
"parser.name='microsoft_windows_eventlog-latest' "
"| columns commandLine, image, parentImage, eventId | limit 3"),
("Probe nested process.* fields on the WEL parser",
"parser.name='microsoft_windows_eventlog-latest' "
"| columns process.cmdLine, process.image.path, "
"process.parentImage.path, event.id | limit 3"),
("EID=4688 count alone (volume sanity)",
"parser.name='microsoft_windows_eventlog-latest' EventID=4688 "
"| group n=count() | limit 1"),
("EID=1 count alone",
"parser.name='microsoft_windows_eventlog-latest' EventID=1 "
"| group n=count() | limit 1"),
("Any cmdline-bearing record sample (raw)",
"parser.name='microsoft_windows_eventlog-latest' "
"| columns rawMessage | limit 1"),
]
def main() -> int:
print(f"\n{'='*78}\n WEL parser schema probe -- last 24 h\n "
f"endpoint: {BASE}/api/powerQuery\n{'='*78}")
for label, query in PROBES:
status, rows, cols = pq(query)
oneline = query.replace("\n", " ")
print(f"\n--- {label} ---")
print(f" query : {oneline[:160]}{'...' if len(oneline)>160 else ''}")
print(f" status: {status} cols: {cols}")
for r in rows[:10]:
r_str = str(r)
print(f" {r_str[:240]}{'...' if len(r_str)>240 else ''}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
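
The probe prints raw `values` rows next to a separate list of column names, which gets hard to read once a probe returns more than two or three columns. A small helper can pair them up. This is a sketch that assumes only the response shape the script already reads (`columns` as a list of objects with a `name` key, `values` as a list of rows); `rows_as_dicts` and the sample response are hypothetical.

```python
from typing import Any


def rows_as_dicts(resp: dict[str, Any]) -> list[dict[str, Any]]:
    """Zip each row in `values` with the names listed in `columns`."""
    names = [c.get("name") for c in (resp.get("columns") or [])]
    return [dict(zip(names, row)) for row in (resp.get("values") or [])]


# Hypothetical response; the counts are made up for the example.
sample = {"columns": [{"name": "EventID"}, {"name": "n"}],
          "values": [[4688, 1200], [1, 310]]}

for row in rows_as_dicts(sample):
    print(row)
```

A missing or empty `columns` / `values` key yields an empty list, so the helper is safe to call on error responses as well.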
-324
@@ -1,324 +0,0 @@
#!/usr/bin/env python3
"""
recommend_sigma_imports.py
Reads the local Threat Coverage state from the SIEM-toolkit-patched backend
(http://localhost:8001) and recommends concrete Sigma rules from
https://github.com/sigmahq/sigma to import.
Strategy
--------
Sigma rules only add value when:
1. The targeted log source is ACTIVELY ingested by your tenant.
2. The MITRE technique is currently weak (low rule_count) or missing.
The script therefore:
- Lists every active source the backend has detected (with event counts).
- Lists every covered MITRE technique and per-tactic rule counts.
- Maps each active source -> the Sigma folder(s) under sigmahq/sigma that
target that telemetry.
- Queries the Sigma repo's directory listing on GitHub to confirm the
folders exist and to count available rules.
- Prints a prioritised import list, plus the exact `git sparse-checkout`
commands you can copy/paste.
Usage
-----
python3 recommend_sigma_imports.py
python3 recommend_sigma_imports.py --backend http://localhost:8001
"""
from __future__ import annotations
import argparse
import json
import sys
import urllib.error
import urllib.request
from typing import Any
GITHUB_API = "https://api.github.com/repos/SigmaHQ/sigma/contents"
SIGMA_REPO = "https://github.com/SigmaHQ/sigma"
# Each active SDL source -> ordered list of (sigma_folder, why_this_folder).
# The folder path is RELATIVE to the sigmahq/sigma repo root.
SOURCE_TO_SIGMA: dict[str, list[tuple[str, str]]] = {
"Windows Event Logs": [
("rules/windows/builtin/security",
"Direct match: rules keyed on EventID against Security channel."),
("rules/windows/builtin/system",
"System channel: service install, driver load, time tampering."),
("rules/windows/builtin/application",
"Application channel: MSI installs, app crashes used as TTPs."),
("rules/windows/process_creation",
"Process creation (EID 4688 / Sysmon 1). Highest-value Windows folder."),
("rules/windows/powershell",
"PowerShell Operational/Script-block (EID 4103/4104)."),
("rules/windows/registry",
"Sysmon registry events for persistence and config tampering."),
("rules/windows/network_connection",
"Sysmon 3 / 5156 outbound connections from suspicious processes."),
("rules/windows/file",
"Sysmon 11/15 file create + raw-access read (LSASS dump)."),
("rules-emerging-threats/2024/Exploits",
"Recent CVE detections, many Windows-targeted."),
],
"Azure Platform": [
("rules/cloud/azure/activity_logs",
"Azure Activity Log -- subscription/resource manager events."),
("rules/cloud/azure/microsoft365",
"M365 Unified Audit Log."),
("rules/cloud/azure/signinlogs",
"Azure AD / Entra ID sign-in logs."),
("rules/cloud/azure/auditlogs",
"Entra ID directory audit (role assignments, app consent)."),
],
"Identity": [
("rules/cloud/azure/signinlogs",
"Same Entra ID sign-in folder -- maps Identity source."),
("rules/cloud/azure/auditlogs",
"Entra ID directory audit."),
("rules/category/authentication",
"Cross-vendor authentication category."),
],
"Mimecast": [
("rules/category/proxy",
"Sigma generic proxy category covers email-gateway URL events."),
("rules-emerging-threats/2024/Malware",
"Recent phishing / malware lure detections."),
],
"Stormshield": [
("rules/network/firewall",
"Vendor-neutral firewall log rules -- works on Stormshield once "
"field-mapped via your existing stormshield parser."),
("rules/network/cisco",
"Borrow Cisco ASA rules as templates -- many TTPs translate 1:1."),
],
"Prompt Security": [
# No first-party Sigma coverage yet; recommend hunting category.
("rules-threat-hunting/application",
"Generic application hunting rules -- closest fit for LLM prompt-"
"abuse signals until a vendor-specific Sigma category lands."),
],
}
# Tactics where rule_count is small enough to be a clear gap. Tuned to the
# MITRE coverage observed on this tenant (Reconnaissance=11,
# Lateral Movement=83, Collection=77, Exfiltration=91, Discovery=86).
GAP_TACTICS = {"Reconnaissance", "Lateral Movement", "Collection",
"Exfiltration", "Discovery"}
def http_json(url: str, timeout: int = 30) -> Any:
req = urllib.request.Request(url, headers={"User-Agent": "siem-toolkit"})
with urllib.request.urlopen(req, timeout=timeout) as r:
return json.loads(r.read())
def github_dir_count(path: str) -> tuple[int, str]:
    """Return (rule_count, status) for a Sigma repo subdir.

    Counts only the .yml/.yaml files directly inside `path`; subdirectories
    are not descended into (github_recursive_count handles that).
    """
    url = f"{GITHUB_API}/{path}"
    try:
        data = http_json(url)
        if isinstance(data, list):
            yml = sum(1 for e in data if isinstance(e, dict)
                      and e.get("name", "").endswith((".yml", ".yaml")))
            return yml, "OK"
        return 0, "no-list"
    except urllib.error.HTTPError as e:
        return 0, f"HTTP {e.code}"
    except Exception as e:
        return 0, f"err {type(e).__name__}"
def github_recursive_count(path: str) -> int:
    """Count *.yml / *.yaml files in `path` and its immediate subdirectories.

    Sigma's folders are mostly flat, so one extra level is enough. The result
    is approximate: an error mid-walk returns the partial total, and GitHub's
    contents API caps large directory listings (1,000 entries per GitHub's
    documentation).
    """
    total = 0
    try:
        listing = http_json(f"{GITHUB_API}/{path}")
        if not isinstance(listing, list):
            return 0
        for e in listing:
            if not isinstance(e, dict):
                continue
            if e.get("type") == "file" and e["name"].endswith((".yml", ".yaml")):
                total += 1
            elif e.get("type") == "dir":
                sub = http_json(f"{GITHUB_API}/{path}/{e['name']}")
                if isinstance(sub, list):
                    total += sum(1 for s in sub if isinstance(s, dict)
                                 and s.get("type") == "file"
                                 and s["name"].endswith((".yml", ".yaml")))
    except Exception:
        # Rate limits and network errors: report what was counted so far.
        pass
    return total
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--backend", default="http://localhost:8001",
help="SIEM-toolkit-patched backend URL")
ap.add_argument("--no-github", action="store_true",
help="Skip GitHub API calls (offline / rate-limited).")
args = ap.parse_args()
print(f"\n{'='*78}\n SIGMA IMPORT RECOMMENDATIONS\n{'='*78}")
print(f" Backend : {args.backend}")
print(f" Sigma repo : {SIGMA_REPO}")
print(f" GitHub lookups : {'disabled' if args.no_github else 'enabled'}")
# 1) Coverage health
try:
health = http_json(f"{args.backend}/api/coverage/health")
except Exception as e:
print(f"\n[FATAL] cannot reach backend: {e}")
return 1
print(f"\n--- Current coverage health ---")
print(f" health_score : {health['health_score']}")
print(f" parser_pct : {health['parser_pct']}")
print(f" mitre_pct : {health['mitre_pct']}")
print(f" firing_pct : {health['firing_pct']} "
f"(only {health['rules_fired']} of {health['rules_loaded']} "
f"have fired -- importing rules without verifying they fire is "
f"the #1 source of dashboard noise)")
print(f" active_sources : {health['active_sources']}")
print(f" tactics_covered : {health['tactics_covered']}/15")
print(f" techniques cov. : {health['techniques_covered']}")
# 2) Active sources
cov_map = http_json(f"{args.backend}/api/coverage/map")
print(f"\n--- Active log sources (ordered by event volume) ---")
print(f" {'source':<24}{'events':>10} {'parser':<32} rule_count")
sources = sorted(cov_map["sources"], key=lambda s: -s["event_count"])
for s in sources:
print(f" {s['source_name']:<24}{s['event_count']:>10} "
f"{(s.get('parser') or '-'):<32}{s.get('rule_count', '-')}")
# 3) MITRE tactic gaps
mitre = http_json(f"{args.backend}/api/coverage/mitre")
print(f"\n--- MITRE tactic depth (rules / techniques per tactic) ---")
print(f" {'tactic':<26}{'rules':>8}{'techs':>8} gap?")
for t in mitre["tactics"]:
gap = " <-- THIN" if t["tactic"] in GAP_TACTICS else ""
print(f" {t['tactic']:<26}{t['rule_count']:>8}"
f"{t['technique_count']:>8}{gap}")
# 4) Recommended Sigma folders, prioritised by active-source volume
print(f"\n{'='*78}\n RECOMMENDED SIGMA FOLDERS TO IMPORT\n{'='*78}")
print(" Priority order = which active source has the most events.\n"
" Only folders for sources that are ACTIVELY producing telemetry\n"
" appear below -- rules for sources you don't ingest add zero\n"
" detection value and pollute the rule library.\n")
seen = set()
sparse_paths: list[str] = []
for s in sources:
name = s["source_name"]
evt = s["event_count"]
folders = SOURCE_TO_SIGMA.get(name, [])
if not folders:
print(f"--- {name} ({evt:,} events) -- no Sigma mapping curated")
continue
print(f"\n--- {name} ({evt:,} events) ---")
for folder, why in folders:
if folder in seen:
continue
seen.add(folder)
sparse_paths.append(folder)
count_str = ""
if not args.no_github:
n = github_recursive_count(folder)
count_str = f" [~{n} rules]"
print(f" * {folder}{count_str}")
print(f" {why}")
# 5) Concrete import commands
print(f"\n{'='*78}\n COPY/PASTE: import these folders only\n{'='*78}\n")
print(" # 1. clone Sigma with sparse-checkout (no full 5GB history)")
print(" git clone --filter=blob:none --no-checkout "
f"{SIGMA_REPO}.git /tmp/sigma")
print(" cd /tmp/sigma")
print(" git sparse-checkout init --cone")
print(" git sparse-checkout set \\")
for p in sparse_paths:
print(f" {p} \\")
print(" # end of folder list")
print(" git checkout master")
print()
print(" # 2. push each .yml file into SIEM-toolkit-patched via the")
print(" # backend's /api/coverage/upload-sigma endpoint (one POST")
print(" # per file, multipart/form-data):")
print(f"""
find . -path './rules*' -name '*.yml' | while read f ; do
curl -sS -F "file=@$f" {args.backend}/api/coverage/upload-sigma \\
-w "%{{http_code}} $f\\n" -o /dev/null
done
""")
# 6) High-value individual rules (curated -- always worth importing)
print(f"{'='*78}\n HIGH-PRIORITY INDIVIDUAL RULES (curated)\n{'='*78}")
must_have = [
# Lateral Movement -- weak tactic (83 rules)
("rules/windows/builtin/security/win_security_admin_rdp_login.yml",
"Lateral Movement", "T1021.001 RDP"),
("rules/windows/builtin/security/"
"win_security_susp_smb_share_object_access_lateral_movement.yml",
"Lateral Movement", "T1021.002 SMB"),
("rules/windows/process_creation/"
"proc_creation_win_winrm_lateral_movement.yml",
"Lateral Movement", "T1021.006 WinRM"),
# Collection -- weak tactic (77 rules)
("rules/windows/process_creation/"
"proc_creation_win_susp_screenshot.yml",
"Collection", "T1113 Screen Capture"),
("rules/windows/process_creation/"
"proc_creation_win_powershell_clipboard.yml",
"Collection", "T1115 Clipboard Data"),
# Exfiltration -- weak tactic (91 rules)
("rules/windows/network_connection/"
"net_connection_win_rclone.yml",
"Exfiltration", "T1567.002 Exfil to Cloud Storage"),
("rules/windows/process_creation/"
"proc_creation_win_rar_compress_data.yml",
"Exfiltration", "T1560.001 Archive via Utility"),
# Reconnaissance -- THINNEST tactic (11 rules)
("rules/windows/process_creation/"
"proc_creation_win_susp_netsh_dump_config.yml",
"Reconnaissance", "T1016 System Network Config Discovery"),
("rules/windows/process_creation/"
"proc_creation_win_susp_adsisearcher.yml",
"Reconnaissance", "T1087.002 Domain Account Discovery"),
# Discovery
("rules/windows/process_creation/"
"proc_creation_win_susp_bloodhound_sharphound.yml",
"Discovery", "T1087/T1482 BloodHound/SharpHound"),
# Credential Access (already 217 rules but always topical)
("rules/windows/process_creation/"
"proc_creation_win_susp_mimikatz_command_line.yml",
"Credential Access", "T1003.001 LSASS Memory"),
("rules/windows/process_creation/"
"proc_creation_win_susp_lsass_dump.yml",
"Credential Access", "T1003.001 LSASS Memory"),
# Azure -- broad coverage gap
("rules/cloud/azure/signinlogs/"
"azure_aad_sign_ins_from_noninteractive_devices.yml",
"Initial Access", "T1078.004 Cloud Account abuse"),
("rules/cloud/azure/auditlogs/"
"azure_aad_role_assigned.yml",
"Privilege Escalation", "T1098 Account Manipulation"),
]
print(f" {'tactic':<22}{'technique':<35}rule")
for path, tactic, tech in must_have:
print(f" {tactic:<22}{tech:<35}{path}")
print(f"\n These 14 rules close the thinnest gaps surfaced by the")
print(f" Threat Coverage map above. Import them FIRST, then iterate")
print(f" through the bulk folders.\n")
return 0
if __name__ == "__main__":
raise SystemExit(main())
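
The folder recommendation in step 4 comes down to two rules: visit sources in descending event volume, and list each Sigma folder only once even when several sources map to it. A minimal sketch of that ordering on its own, without the backend or GitHub calls, follows. `prioritised_folders` is a hypothetical helper and the event counts are made up.

```python
def prioritised_folders(sources, mapping):
    """Return Sigma folders ordered by source event volume, without duplicates."""
    ordered = sorted(sources, key=lambda s: -s["event_count"])
    seen = set()
    out = []
    for src in ordered:
        for folder, _why in mapping.get(src["source_name"], []):
            if folder not in seen:
                seen.add(folder)
                out.append(folder)
    return out


# Hypothetical coverage data in the shape of /api/coverage/map "sources".
sources = [{"source_name": "Identity", "event_count": 500},
           {"source_name": "Azure Platform", "event_count": 9000}]
mapping = {
    "Azure Platform": [("rules/cloud/azure/signinlogs", "sign-in logs"),
                       ("rules/cloud/azure/auditlogs", "directory audit")],
    "Identity": [("rules/cloud/azure/signinlogs", "same folder"),
                 ("rules/category/authentication", "cross-vendor auth")],
}

print(prioritised_folders(sources, mapping))
```

Because a shared folder is attributed to the highest-volume source that maps to it, lower-volume sources show only the folders that are new to the list.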
-295
@@ -1,295 +0,0 @@
#!/usr/bin/env python3
"""
run_sigma_on_tenant.py
Re-runs the same 10 Sigma->PowerQuery rules against ANY tenant by
re-pointing the credentials. The 10 converted .pq bodies in
SIGMA_OUT_DIR (default /tmp/sigma_converted_v4) are tenant-agnostic --
they only depend on the SDL DV schema, not on the specific tenant URL.
Pipeline:
Step 0 -- discover sites via /sites?limit=10 (token introspection)
Step 1 -- probe tenant telemetry: last 24 h volume on the EDR/DV
fields the converted rules query
(event.type, endpoint.os, tgt.process.cmdline, ...)
Step 2 -- smoke-test each of the 10 faithful .pq bodies against the
tenant's /api/powerQuery
Step 3 -- deploy each as an SDL Scheduled rule via the Mgmt API
POST /web/api/v2.1/cloud-detection/rules
Step 4 -- verify the deployed rules via PUT-existence test
(note: the PUT overwrites each rule's name and description)
Reads tenant credentials from tenant_config.json next to this script.
Override with the SIEM_TOOLKIT_CONFIG env var. Override the artefact
location with SIGMA_OUT_DIR. Override the target site with SITE_ID.
"""
from __future__ import annotations
import json
import os
import pathlib
import time
import urllib.error
import urllib.request
from typing import Any
HERE = pathlib.Path(__file__).resolve().parent
_CFG_PATH = os.environ.get("SIEM_TOOLKIT_CONFIG",
str(HERE / "tenant_config.json"))
with open(_CFG_PATH) as _cfg_file:
    CFG = json.load(_cfg_file)
ART = pathlib.Path(os.environ.get(
"SIGMA_OUT_DIR", "/tmp/sigma_converted_v4"))
SDL_BASE = CFG["SDL_XDR_URL"].rstrip("/")
SDL_KEY = CFG["SDL_LOG_READ_KEY"]
S1_CONS = CFG["S1_CONSOLE_URL"].rstrip("/")
S1_TOK = CFG["S1_CONSOLE_API_TOKEN"].rstrip(".")
RULES: list[tuple[str, str, str]] = [
("Lateral Movement", "T1021.006 WinRM (evil-winrm)",
"proc_creation_win_hktl_evil_winrm.pq"),
("Collection", "T1113 Screen Capture (Psr.exe)",
"proc_creation_win_psr_capture_screenshots.pq"),
("Collection", "T1115 Clipboard (Get-Clipboard)",
"proc_creation_win_powershell_get_clipboard.pq"),
("Exfiltration", "T1560.001 RAR (.dmp files)",
"proc_creation_win_winrar_exfil_dmp_files.pq"),
("Exfiltration", "T1567.002 rclone",
"proc_creation_win_pua_rclone_execution.pq"),
("Reconnaissance", "T1016 netsh portproxy",
"proc_creation_win_netsh_port_forwarding.pq"),
("Discovery", "T1087/T1033 whoami /priv",
"proc_creation_win_whoami_priv_discovery.pq"),
("Discovery", "T1087/T1482 SharpHound",
"proc_creation_win_hktl_bloodhound_sharphound.pq"),
("Credential Access", "T1003.001 Mimikatz cmd-line",
"proc_creation_win_hktl_mimikatz_command_line.pq"),
("Credential Access", "T1003.001 ProcDump LSASS",
"proc_creation_win_sysinternals_procdump_lsass.pq"),
]
# ----------------------------------------------------- helpers --------------
def pq(query: str, hours: int = 24) -> tuple[int, str, int]:
end = int(time.time() * 1000); start = end - hours * 3600 * 1000
body = {"token": SDL_KEY, "query": query,
"startTime": str(start), "endTime": str(end)}
req = urllib.request.Request(
f"{SDL_BASE}/api/powerQuery",
data=json.dumps(body).encode(),
headers={"Content-Type": "application/json"}, method="POST")
try:
with urllib.request.urlopen(req, timeout=60) as r:
d = json.loads(r.read())
return 200, "ok", len(d.get("values") or [])
except urllib.error.HTTPError as e:
return e.code, e.read().decode()[:250], 0
def pq_count(query: str) -> int:
    """Return count() of `query` over the last 24 h, or 0 on any error."""
    wrapped = f"{query} | group n=count() | limit 1"
    end = int(time.time() * 1000)
    start = end - 24 * 3600 * 1000
    req = urllib.request.Request(
        f"{SDL_BASE}/api/powerQuery",
        data=json.dumps({"token": SDL_KEY, "query": wrapped,
                         "startTime": str(start),
                         "endTime": str(end)}).encode(),
        headers={"Content-Type": "application/json"}, method="POST")
    try:
        # One request is enough: HTTP errors and empty results both map to 0.
        with urllib.request.urlopen(req, timeout=60) as r:
            d = json.loads(r.read())
        v = (d.get("values") or [[None]])[0]
        return int(v[0]) if v and v[0] is not None else 0
    except Exception:
        return 0
def mgmt_get(path: str) -> tuple[int, dict]:
req = urllib.request.Request(f"{S1_CONS}{path}")
req.add_header("Authorization", f"ApiToken {S1_TOK}")
req.add_header("Accept", "application/json")
try:
with urllib.request.urlopen(req, timeout=30) as r:
return r.status, json.loads(r.read())
except urllib.error.HTTPError as e:
try:
return e.code, json.loads(e.read())
except Exception:
return e.code, {"_body": "(non-json)"}
def deploy_rule(site_id: str, name: str, desc: str,
body: str) -> tuple[int, str]:
payload = {
"data": {"name": name, "description": desc, "severity": "Medium",
"expirationMode": "Permanent", "queryType": "scheduled",
"queryLang": "2.0", "status": "Draft",
"treatAsThreat": "UNDEFINED", "networkQuarantine": False,
"coolOffSettings": {"renotifyMinutes": 60},
"scheduledParams": {"query": body,
"lookbackWindowMinutes": 30,
"runIntervalMinutes": 5,
"threshold": {"value": 0,
"operator": "Greater"}}},
"filter": {"siteIds": [site_id]}}
req = urllib.request.Request(
f"{S1_CONS}/web/api/v2.1/cloud-detection/rules",
data=json.dumps(payload).encode(), method="POST")
req.add_header("Authorization", f"ApiToken {S1_TOK}")
req.add_header("Content-Type", "application/json")
req.add_header("Accept", "application/json")
try:
with urllib.request.urlopen(req, timeout=30) as r:
d = json.loads(r.read())
return 200, str((d.get("data") or {}).get("id") or "?")
except urllib.error.HTTPError as e:
return e.code, e.read().decode()[:300]
def put_rule(site_id: str, rule_id: str, name: str,
body: str) -> tuple[int, str]:
payload = {
"data": {"name": name, "description": f"verify-by-PUT for {name}",
"severity": "Medium", "expirationMode": "Permanent",
"queryType": "scheduled", "queryLang": "2.0",
"status": "Draft", "treatAsThreat": "UNDEFINED",
"networkQuarantine": False,
"coolOffSettings": {"renotifyMinutes": 60},
"scheduledParams": {"query": body,
"lookbackWindowMinutes": 30,
"runIntervalMinutes": 5,
"threshold": {"value": 0,
"operator": "Greater"}}},
"filter": {"siteIds": [site_id]}}
req = urllib.request.Request(
f"{S1_CONS}/web/api/v2.1/cloud-detection/rules/{rule_id}",
data=json.dumps(payload).encode(), method="PUT")
req.add_header("Authorization", f"ApiToken {S1_TOK}")
req.add_header("Content-Type", "application/json")
req.add_header("Accept", "application/json")
try:
with urllib.request.urlopen(req, timeout=30) as r:
return r.status, "ok"
except urllib.error.HTTPError as e:
return e.code, e.read().decode()[:200]
# ----------------------------------------------------- main -----------------
def main() -> int:
print(f"\n{'='*78}\n Sigma -> PowerQuery -> SDL on US tenant\n"
f"{'='*78}")
print(f" Mgmt API : {S1_CONS}")
print(f" SDL : {SDL_BASE}")
print(f" Artefact : {ART}\n")
# --- 0. discover sites on US tenant ----------------------------------
print("--- Step 0: discover sites + token identity ---------------------")
code, d = mgmt_get("/web/api/v2.1/sites?limit=10")
if code != 200:
print(f" HTTP {code} {str(d)[:300]}")
return 1
sites = (d.get("data") or {}).get("sites") or []
print(f" Sites visible to token: {len(sites)}")
for s in sites[:5]:
print(f" id={s.get('id')} name={s.get('name')} "
f"state={s.get('state')}")
if not sites:
print(" FATAL: no sites visible -- token has no scope here")
return 1
site_id = sites[0]["id"]
print(f" --> deploying into site_id={site_id} "
f"({sites[0].get('name')})\n")
# --- 1. tenant schema probe ------------------------------------------
print("--- Step 1: probe US tenant telemetry (last 24 h) --------------")
probes = {
"event.type='Process Creation'":
"event.type='Process Creation'",
"endpoint.os='windows'":
"endpoint.os='windows'",
"tgt.process.cmdline non-empty":
"tgt.process.cmdline!=''",
"src.process.image.path non-empty":
"src.process.image.path!=''",
}
for label, q in probes.items():
n = pq_count(q)
print(f" {label:<45}{n}")
print()
# --- 2. smoke-test 10 rules ------------------------------------------
print("--- Step 2: smoke-test 10 faithful PQ bodies -------------------")
test_results = []
for i, (tactic, tech, fname) in enumerate(RULES, 1):
pq_path = ART / fname
if not pq_path.exists():
print(f" [{i:>2}] {tactic:<18}{tech:<32} MISSING {fname}")
test_results.append((i, tactic, tech, fname, None, None))
continue
body = pq_path.read_text()
code, msg, rows = pq(body)
print(f" [{i:>2}] {tactic:<18}{tech:<32} HTTP {code} rows={rows}")
if code != 200:
print(f" err: {msg[:160]}")
test_results.append((i, tactic, tech, fname, code, rows))
print()
# --- 3. deploy --------------------------------------------------------
print("--- Step 3: deploy each valid PQ as SDL Scheduled rule ---------")
deployed: list[tuple[int, str, str, str, str]] = [] # i, tactic, tech, fname, id
for (i, tactic, tech, fname, code, rows) in test_results:
if code != 200:
print(f" [{i:>2}] SKIP (smoke-test failed)")
continue
body = (ART / fname).read_text()
name = f"[Sigma->PQ USEA1] {tactic} / {tech} ({pathlib.Path(fname).stem})"[:128]
desc = (f"Auto-converted Sigma rule. "
f"Source: {ART / fname}. "
f"Faithful S1 DV schema.")
dc, dmsg = deploy_rule(site_id, name, desc, body)
verdict = f"id={dmsg}" if dc == 200 else f"FAIL {dmsg[:160]}"
print(f" [{i:>2}] DEPLOY HTTP {dc} {verdict}")
if dc == 200:
deployed.append((i, tactic, tech, fname, dmsg))
print()
# --- 4. PUT verification ---------------------------------------------
if deployed:
print("--- Step 4: PUT-existence verification --------------------")
exists = 0; gone = 0
for (i, tactic, tech, fname, rid) in deployed:
body = (ART / fname).read_text()
name = f"[Sigma->PQ USEA1 verify] {tactic} / {tech}"[:128]
pc, pmsg = put_rule(site_id, rid, name, body)
verdict = ("EXISTS" if pc in (200, 204)
else "NOT FOUND" if pc == 404
else f"HTTP {pc} {pmsg[:80]}")
print(f" [{i:>2}] id={rid} PUT HTTP {pc} {verdict}")
if pc in (200, 204):
exists += 1
elif pc == 404:
gone += 1
# --- summary ----------------------------------------------------------
print(f"\n{'='*78}\n SUMMARY\n{'='*78}")
valid = sum(1 for (_, _, _, _, c, _) in test_results if c == 200)
print(f" Smoke-test passed : {valid}/{len(RULES)}")
print(f" Rules deployed : {len(deployed)}/{len(RULES)}")
if deployed:
ids_file = HERE / "deployed_rule_ids.json"
ids_file.write_text(json.dumps(
{"tenant": S1_CONS, "site_id": site_id,
"rules": [{"rule_id": rid, "pq_file": fname,
"tactic": tactic, "tech": tech}
for (_, tactic, tech, fname, rid) in deployed]},
indent=2))
print(f" Deployed IDs : {ids_file}")
print(f" PUT-verified exists : (see Step 4 above)")
print(f"\n Console: {S1_CONS}/#/cloud-detection/rules\n")
return 0
if __name__ == "__main__":
raise SystemExit(main())
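
Both `pq` and `pq_count` above build the same request body: a token, the query, and a millisecond time window passed as strings. The window arithmetic is easier to check when pulled out on its own. `build_pq_payload` and its `now_s` parameter are hypothetical, added here only so the example is deterministic; the field names are the ones the script already sends.

```python
import json
import time
from typing import Optional


def build_pq_payload(token: str, query: str, hours: int = 24,
                     now_s: Optional[float] = None) -> dict:
    """Build the JSON body for /api/powerQuery covering the last `hours` hours."""
    end = int((time.time() if now_s is None else now_s) * 1000)
    start = end - hours * 3600 * 1000  # hours -> milliseconds
    return {"token": token, "query": query,
            "startTime": str(start), "endTime": str(end)}


# Fixed clock value so the window can be verified by hand.
payload = build_pq_payload("<log-read-key>", "event.type='Process Creation'",
                           hours=1, now_s=1_700_000_000)
print(json.dumps(payload, indent=2))
```

With `hours=1` the two timestamps differ by exactly 3,600,000 ms, which makes a mistaken seconds-versus-milliseconds unit easy to spot.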
-8
@@ -1,8 +0,0 @@
{
"_comment_": "Copy to tenant_config.json and fill in. tenant_config.json is gitignored. See README_sigma_pipeline.md for setup. All five keys are required for end-to-end Sigma->PQ deploys.",
"S1_CONSOLE_URL": "https://<region>-<tenant>.example",
"S1_CONSOLE_API_TOKEN": "<S1 Mgmt API token: Settings -> Users -> Service Users>",
"SDL_XDR_URL": "https://xdr.<region>.example",
"SDL_LOG_READ_KEY": "<SDL Log Read scope key: Settings -> Integrations -> Data Lake API Keys>",
"SDL_CONFIG_READ_KEY": "<SDL Configuration Read scope key (only needed for parser sync)>"
}
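
The scripts load this file with a bare `json.load`, so a key that is missing or still holds its placeholder only fails later, as a `KeyError` or an HTTP error. A loader that checks up front might look like the sketch below. `load_tenant_config` and `REQUIRED_KEYS` are hypothetical names, the five keys are taken from the template's own comment, and treating any value containing `<` as an unfilled placeholder is an assumption based on how the template writes its examples.

```python
import json
import pathlib

# Keys the template's comment lists as required for end-to-end deploys.
REQUIRED_KEYS = ("S1_CONSOLE_URL", "S1_CONSOLE_API_TOKEN", "SDL_XDR_URL",
                 "SDL_LOG_READ_KEY", "SDL_CONFIG_READ_KEY")


def load_tenant_config(path):
    """Load the JSON config; fail early on missing or placeholder values."""
    cfg = json.loads(pathlib.Path(path).read_text())
    # Assumption: a "<" in the value means the template text was not replaced.
    bad = [k for k in REQUIRED_KEYS
           if not cfg.get(k) or "<" in str(cfg[k])]
    if bad:
        raise ValueError(f"tenant config {path}: fill in {', '.join(bad)}")
    return cfg
```

A script that only reads from the Data Lake could pass a shorter key list; the full tuple reflects the end-to-end deploy case the template describes.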
-137
@@ -1,137 +0,0 @@
#!/usr/bin/env python3
"""
verify_deployed_sigma_rules.py (formerly _v3)
Diagnostic for the RBAC visibility quirk: when a service-user role has
`cloudDetectionRulesCreateEdit` but not `cloudDetectionRulesView`, POST
succeeds and returns rule IDs, but GET /rules silently hides those rules.
This script probes several scope-filter variants to characterise what
the token CAN see:
- direct GET /rules/{id}
- list with ?ids=<csv>
- list with siteIds=, accountIds=, tenant=true, no scope
- list with queryType= filter
Reads tenant credentials from tenant_config.json and the rule IDs from
deployed_rule_ids.json (both next to this script). Set SIEM_TOOLKIT_CONFIG
or DEPLOYED_IDS_FILE env vars to override.
"""
from __future__ import annotations
import json
import os
import pathlib
import urllib.error
import urllib.parse
import urllib.request
HERE = pathlib.Path(__file__).resolve().parent
_CFG_PATH = os.environ.get("SIEM_TOOLKIT_CONFIG",
str(HERE / "tenant_config.json"))
with open(_CFG_PATH) as _cfg_file:
    CFG = json.load(_cfg_file)
BASE = CFG["S1_CONSOLE_URL"].rstrip("/")
TOK = CFG["S1_CONSOLE_API_TOKEN"].rstrip(".")
_IDS_PATH = pathlib.Path(os.environ.get(
"DEPLOYED_IDS_FILE", str(HERE / "deployed_rule_ids.json")))
if not _IDS_PATH.exists():
raise SystemExit(f"{_IDS_PATH} not found. "
f"Run convert_test_deploy_sigma.py --deploy first.")
_STATE = json.loads(_IDS_PATH.read_text())
SITE = _STATE.get("site_id") or os.environ.get("SITE_ID") or ""
DEPLOYED_IDS = [r["rule_id"] for r in (_STATE.get("rules") or [])]

def get_json(path: str):
    """GET BASE + path and return (http_status, parsed_json_body)."""
    req = urllib.request.Request(f"{BASE}{path}")
    req.add_header("Authorization", f"ApiToken {TOK}")
    req.add_header("Accept", "application/json")
    try:
        with urllib.request.urlopen(req, timeout=30) as r:
            return r.status, json.loads(r.read())
    except urllib.error.HTTPError as e:
        # Keep the status code even when the error body is not JSON.
        try:
            body = json.loads(e.read())
        except Exception:
            body = {"_raw": "(non-json)"}
        return e.code, body


def main() -> int:
    print(f"\n{'='*78}\n Verify deployed rules via `ids=` filter\n"
          f"{'='*78}\n Tenant : {BASE}\n Site : {SITE or '(unset)'}\n"
          f" IDs : {len(DEPLOYED_IDS)} rules from {_IDS_PATH.name}\n")

    # --- 1. token / user identity -----------------------------------
    print("--- Step 1: token identity -------------------------------------")
    code, d = get_json("/web/api/v2.1/users/api-token-details")
    if code == 200:
        data = d.get("data") or {}
        print(f" user : {data.get('email') or data.get('fullName')}")
        print(f" scope : {data.get('scope')}")
        print(f" scope id : {data.get('scopeId')}")
        print(f" expires : {data.get('expiresAt') or 'never'}")
    else:
        # Service-user JWT often can't introspect itself
        code2, d2 = get_json("/web/api/v2.1/user")
        if code2 == 200:
            data = d2.get("data") or {}
            print(f" user : {data.get('email')}")
            print(f" scope : {data.get('scope')}")
        else:
            print(f" HTTP {code} / {code2} cannot introspect token "
                  "(common for service-user JWTs)")

    if not DEPLOYED_IDS:
        print(" No deployed rule IDs to verify.")
        return 0

    # --- 2. list with ids= filter, NO scope filter ------------------
    print("\n--- Step 2: list with `ids=<csv>` (no scope filter) -----------")
    ids = ",".join(DEPLOYED_IDS)
    ids_q = urllib.parse.urlencode({"ids": ids}, safe=",")
    code, d = get_json(f"/web/api/v2.1/cloud-detection/rules?{ids_q}")
    if code != 200:
        print(f" HTTP {code} {json.dumps(d)[:300]}")
    else:
        rules = d.get("data") or []
        print(f" Returned : {len(rules)} of {len(DEPLOYED_IDS)} requested")
        for r in rules:
            scope = ((r.get("scope") or {}).get("scopeName") or
                     r.get("siteName") or r.get("accountName") or "?")
            # A missing status would break the `:<10` format spec on None.
            status = r.get("status") or "?"
            print(f" id={r.get('id')} status={status:<10} "
                  f"scope={scope} name={(r.get('name') or '')[:65]}")

    # --- 3. list ids= AND siteIds= ----------------------------------
    print("\n--- Step 3: list with `ids=` AND `siteIds=` -------------------")
    if not SITE:
        print(" Skipped : no site_id in the IDs file and SITE_ID is unset")
    else:
        site_q = urllib.parse.urlencode({"ids": ids, "siteIds": SITE},
                                        safe=",")
        code, d = get_json(f"/web/api/v2.1/cloud-detection/rules?{site_q}")
        if code != 200:
            print(f" HTTP {code} {json.dumps(d)[:300]}")
        else:
            print(f" Returned : {len(d.get('data') or [])} of "
                  f"{len(DEPLOYED_IDS)}")

    # --- 4. list all visible scheduled rules without scope ----------
    print("\n--- Step 4: list with queryType= filter ---------------------")
    code, d = get_json(
        "/web/api/v2.1/cloud-detection/rules"
        "?queryType=scheduled&limit=200")
    if code != 200:
        print(f" HTTP {code} {json.dumps(d)[:300]}")
    else:
        rules = d.get("data") or []
        sigma = [r for r in rules
                 if "[Sigma->PQ]" in (r.get("name") or "")]
        print(f" visible scheduled rules : {len(rules)}")
        print(f" of which [Sigma->PQ] : {len(sigma)}")
        for r in sigma:
            status = r.get("status") or "?"
            print(f" id={r.get('id')} status={status:<10} "
                  f"{(r.get('name') or '')[:70]}")

    print(f"\n Console:\n {BASE}/#/cloud-detection/rules\n")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
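
The list probes in the script above differ only in their query parameters. One possible way to keep them consistent is a small builder that lets `urllib.parse.urlencode` handle escaping and drops empty filters such as an unset site ID. This is a sketch: `rules_query` is a hypothetical helper, and only the endpoint path and the parameter names (`ids`, `siteIds`, `queryType`, `limit`) are taken from the script.

```python
import urllib.parse

# Endpoint path as used by the script above.
RULES_PATH = "/web/api/v2.1/cloud-detection/rules"


def rules_query(**filters) -> str:
    """Build a rules-list path, skipping empty filters and URL-encoding values."""
    params = {}
    for key, value in filters.items():
        if value in (None, "", [], ()):
            continue  # e.g. siteIds="" when no site is configured
        if isinstance(value, (list, tuple)):
            value = ",".join(str(v) for v in value)  # the API takes CSV lists
        params[key] = value
    if not params:
        return RULES_PATH
    # safe="," keeps the CSV separators readable instead of emitting %2C.
    return f"{RULES_PATH}?{urllib.parse.urlencode(params, safe=',')}"


if __name__ == "__main__":
    print(rules_query(ids=["101", "102"], siteIds=""))
    print(rules_query(queryType="scheduled", limit=200))
```

With a helper like this, the ids-only and ids-plus-site probes become `rules_query(ids=DEPLOYED_IDS)` and `rules_query(ids=DEPLOYED_IDS, siteIds=SITE)`, and the second collapses to the first when `SITE` is empty.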
-124
@@ -1,124 +0,0 @@
#!/usr/bin/env python3
"""
verify_rule_exists_via_put.py

Service-user tokens often have `cloudDetectionRulesCreateEdit` but lack
`cloudDetectionRulesView`. Result: POST/PUT/DELETE on a rule succeed,
but GET /rules and GET /rules/{id} silently filter the rule out. PUT
is the definitive existence test -- it returns 200/204 when the rule
exists and 404 when it does not.

This is not a read-only probe: a successful PUT applies the payload
built in put_rule(), which names the rule "[Sigma->PQ verify] ..." and
sets its status to Draft.

Reads the (rule_id, pq_file) map produced by convert_test_deploy_sigma.py
from deployed_rule_ids.json next to this script. Set SIEM_TOOLKIT_CONFIG,
DEPLOYED_IDS_FILE or SIGMA_OUT_DIR to override the default paths.

Outputs:
  EXISTS / NOT FOUND verdict per rule, plus a summary.
"""
from __future__ import annotations

import json
import os
import pathlib
import urllib.error
import urllib.request

HERE = pathlib.Path(__file__).resolve().parent
_CFG_PATH = os.environ.get("SIEM_TOOLKIT_CONFIG",
                           str(HERE / "tenant_config.json"))
# Read via pathlib so the file handle is closed promptly.
CFG = json.loads(pathlib.Path(_CFG_PATH).read_text())
BASE = CFG["S1_CONSOLE_URL"].rstrip("/")
TOK = CFG["S1_CONSOLE_API_TOKEN"].rstrip(".")
IDS_FILE = pathlib.Path(os.environ.get(
    "DEPLOYED_IDS_FILE", str(HERE / "deployed_rule_ids.json")))
ART_DIR = pathlib.Path(os.environ.get(
    "SIGMA_OUT_DIR", "/tmp/sigma_converted_v4"))


def put_rule(site_id: str, rule_id: str, name: str,
             body: str) -> tuple[int, str]:
    payload = {
        "data": {"name": name,
                 "description": f"verify-by-PUT for {name}",
                 "severity": "Medium",
                 "expirationMode": "Permanent",
                 "queryType": "scheduled",
                 "queryLang": "2.0",
                 "status": "Draft",
                 "treatAsThreat": "UNDEFINED",
                 "networkQuarantine": False,
                 "coolOffSettings": {"renotifyMinutes": 60},
                 "scheduledParams": {"query": body,
                                     "lookbackWindowMinutes": 30,
                                     "runIntervalMinutes": 5,
                                     "threshold": {"value": 0,
                                                   "operator": "Greater"}}},
        "filter": {"siteIds": [site_id]}}
    req = urllib.request.Request(
        f"{BASE}/web/api/v2.1/cloud-detection/rules/{rule_id}",
        data=json.dumps(payload).encode(), method="PUT")
    req.add_header("Authorization", f"ApiToken {TOK}")
    req.add_header("Content-Type", "application/json")
    req.add_header("Accept", "application/json")
    try:
        with urllib.request.urlopen(req, timeout=30) as r:
            return r.status, r.read().decode()[:240]
    except urllib.error.HTTPError as e:
        return e.code, e.read().decode()[:240]


def main() -> int:
    print(f"\n{'='*78}\n Verify rules via PUT-existence test\n{'='*78}")
    print(f" Tenant : {BASE}")
    print(f" IDs file : {IDS_FILE}")
    print(f" Artefacts: {ART_DIR}\n")
    if not IDS_FILE.exists():
        print(f" FATAL: {IDS_FILE} not found.\n"
              f" Run convert_test_deploy_sigma.py --deploy first.")
        return 1
    state = json.loads(IDS_FILE.read_text())
    rules = state.get("rules") or []
    site = state.get("site_id") or os.environ.get("SITE_ID", "")
    if not site:
        print(" FATAL: site_id missing in deployed_rule_ids.json")
        return 1
    print(f" Site : {site}")
    print(f" Rules : {len(rules)} deployed entries\n")
    print(f" {'#':>3} {'rule':<32}{'id':<22}{'http':>5} result")
    print(" " + "-" * 100)

    # `skipped` counts entries whose .pq artefact is missing, so the
    # summary lines add up to len(rules).
    exists = gone = other = skipped = 0
    for i, r in enumerate(rules, 1):
        rid = r["rule_id"]
        label = f"{r['tactic']} {r['tech']}"
        pq_path = ART_DIR / r["pq_file"]
        if not pq_path.exists():
            print(f" {i:>3} {label[:32]:<32}{rid:<22} -- "
                  f"pq file missing: {pq_path.name}")
            skipped += 1
            continue
        code, msg = put_rule(site, rid, f"[Sigma->PQ verify] {label}",
                             pq_path.read_text())
        if code in (200, 204):
            verdict = "EXISTS"
            exists += 1
        elif code == 404:
            verdict = "NOT FOUND"
            gone += 1
        else:
            verdict = f"HTTP {code} {msg[:80]}"
            other += 1
        print(f" {i:>3} {label[:32]:<32}{rid:<22}{code:>5} {verdict}")

    print("\n Summary:")
    print(f" EXISTS (PUT 200/204) : {exists}/{len(rules)}")
    print(f" 404 NOT FOUND : {gone}/{len(rules)}")
    print(f" Other (auth/RBAC) : {other}/{len(rules)}")
    print(f" Skipped (no pq file) : {skipped}/{len(rules)}")
    if exists > 0:
        print("\n Rules ARE deployed. If GET /rules can't see them,")
        print(" the service-user role lacks `cloudDetectionRulesView`.")
        print(" Open the console UI (wider RBAC):")
        print(f" {BASE}/#/cloud-detection/rules\n")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
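
The verdict logic in `main()` can also be expressed as a small pure function, which makes the 200/204, 404 and "other" branches checkable without touching a tenant. A sketch only: `classify` and `summarise` are hypothetical names, and representing an entry with a missing .pq file as `None` (reported as SKIPPED) is an assumption made for this example.

```python
from __future__ import annotations

from collections import Counter

VERDICTS = ("EXISTS", "NOT FOUND", "OTHER", "SKIPPED")


def classify(code: int | None) -> str:
    """Map the HTTP status of the PUT probe to a verdict label."""
    if code is None:
        return "SKIPPED"      # assumption: no request was sent for this entry
    if code in (200, 204):
        return "EXISTS"       # PUT accepted, so the rule ID is valid
    if code == 404:
        return "NOT FOUND"    # the rule ID is unknown to the console
    return "OTHER"            # auth/RBAC or validation errors


def summarise(codes: list[int | None]) -> dict[str, int]:
    """Count verdicts, always returning every label so totals add up."""
    tally = Counter(classify(c) for c in codes)
    return {label: tally.get(label, 0) for label in VERDICTS}


if __name__ == "__main__":
    print(summarise([200, 204, 404, 403, None]))
```

Keeping the mapping separate from the HTTP call means a change such as treating 403 differently needs only a one-line edit and a matching assertion.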