Auto-load detection library from S1 API, improve coverage map accuracy

- Fetch detection library rules from platform-rules API at startup (falls
  back to extracted.json); adds Sync Detection Library button for refresh
- Parser column simplified to ✓ Parsed / ✗ Not Parsed
- Detection counts now use library rules only (exclude custom STAR rules)
- Add close-match suggestions for dataSource.name mismatches (e.g. CloudTrail
  → AWS CloudTrail, Microsoft 365 Collaboration → Microsoft O365)
- Exclude SentinelOne Ranger AD from coverage map (native S1 source)
- Add success feedback banners to Load SDL Parsers and Sync Library buttons
- Remove rule_counts.json manual override; extracted.json is source of truth
- Remove Load Detections button; rules auto-import on backend startup
- Add get_account_id() and get_platform_rules() to s1_client

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Mick
2026-05-20 15:14:10 -04:00
parent 6e137438b1
commit 6cd9da82da
8 changed files with 580 additions and 90 deletions
+208 -30
View File
@@ -1,4 +1,5 @@
import json
import os
from fastapi import APIRouter, UploadFile, File, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy.orm import Session
@@ -6,6 +7,8 @@ from datetime import datetime
from db import get_db, ParsedRule, ParserField, ActiveSource
from services import s1_client, rule_parser
DETECTIONS_FILE = os.environ.get("DETECTIONS_FILE", "/app/data/detections.json")
router = APIRouter()
@@ -40,22 +43,12 @@ def _star_query_texts(rule: dict) -> list[str]:
@router.post("/load-star-rules")
async def load_star_rules(library_only: bool = None, db: Session = Depends(get_db)):
"""Fetch STAR rules from SentinelOne and index their fields.
library_only defaults to the STAR_LIBRARY_ONLY env var (default true).
Pass ?library_only=false to include custom tenant rules as well.
"""
import os
if library_only is None:
library_only = os.environ.get("STAR_LIBRARY_ONLY", "true").lower() != "false"
async def load_star_rules(db: Session = Depends(get_db)):
"""Fetch all STAR rules from the Management Console API and index their fields."""
try:
rules = await s1_client.get_star_rules()
except Exception as e:
raise HTTPException(502, f"S1 API error: {e}")
if library_only:
rules = [r for r in rules if str(r.get("creator", "")).lower().endswith("@sentinelone.com")]
raise HTTPException(502, f"S1 API error: {type(e).__name__}: {e}")
# Replace all existing STAR rules cleanly to avoid duplicate key errors
db.query(ParsedRule).filter_by(rule_type="star").delete()
@@ -81,6 +74,118 @@ async def load_star_rules(library_only: bool = None, db: Session = Depends(get_d
return {"loaded": len(loaded), "rules": loaded}
# Path prefixes of detection-file entries that must not be imported.
# _import_detections drops every result whose "file" value starts with one
# of these. Kept as a tuple so it can be handed to str.startswith directly.
_EXCLUDED_PATHS = (
    "/rules/silent/",
    "/rules/dev/",
)
def _import_from_api_rules(db, rules: list) -> int:
    """
    Import platform rules fetched directly from the S1 API into the database.

    Replaces every existing ParsedRule with rule_type="library". The delete
    and the inserts run in a single transaction, so a failure part-way
    through rolls back and leaves the previous library rules in place
    instead of an empty table.

    Args:
        db: SQLAlchemy session used for the delete, inserts and commit.
        rules: Rule dicts from the API. Each is read for "id", "name" and
            "sources"; "sources" is stored as the rule's data_sources list
            (the original author describes it as the authoritative
            dataSource.name values).

    Returns:
        Number of rules inserted. Rules whose id repeats an earlier one in
        the same batch are skipped and not counted.

    Raises:
        Exception: whatever the session raised, re-raised after rollback.
    """
    try:
        # The bulk delete is emitted inside the current transaction; it is
        # deliberately NOT committed on its own so the replace stays atomic.
        db.query(ParsedRule).filter_by(rule_type="library").delete()

        loaded = 0
        seen_ids: set = set()

        for rule in rules:
            # Rules without an id get a positional fallback id.
            rule_id = str(rule.get("id", f"lib_{loaded}"))
            if rule_id in seen_ids:
                continue
            seen_ids.add(rule_id)

            sources = rule.get("sources") or []

            db.add(ParsedRule(
                rule_id=rule_id,
                name=rule.get("name", "unnamed"),
                rule_type="library",
                fields_used=[],  # API rules don't expose field-level info
                # Same {"data_sources": [...]} shape that the file importer
                # writes, so both import paths store library rules alike.
                raw=json.dumps({"data_sources": sources}),
            ))
            loaded += 1

            # Flush in batches so pending objects don't pile up in the session.
            if loaded % 500 == 0:
                db.flush()

        db.commit()
    except Exception:
        # Undo the delete and any partial inserts, and leave the session
        # usable for the caller (e.g. a fallback import).
        db.rollback()
        raise

    return loaded
def _import_detections(db, detections_file: str) -> int:
    """
    Import library detection rules from a local detections JSON file.

    Replaces every existing ParsedRule with rule_type="library". The file is
    read and parsed before the database is touched, and the delete plus the
    inserts run in one transaction, so a failure leaves the previous library
    rules in place.

    The file is read as a JSON object with a "results" list. Each result is
    read for "id", "name", "file" and "queries"; each query is read for
    "keys" (field names) and "pairs" (whose "dataSource.name" entry supplies
    the rule's data sources). Results whose "file" starts with one of
    _EXCLUDED_PATHS are ignored.

    Args:
        db: SQLAlchemy session used for the delete, inserts and commit.
        detections_file: Path to the detections JSON file.

    Returns:
        Number of rules inserted. Results whose id repeats an earlier one
        are skipped and not counted.

    Raises:
        OSError, json.JSONDecodeError: the file cannot be read or parsed
            (raised before any database change).
        Exception: any error during the import, re-raised after rollback.
    """
    with open(detections_file, "r", encoding="utf-8") as fh:
        data = json.load(fh)

    # str.startswith accepts a tuple of prefixes; a null "file" is treated
    # as an empty path rather than crashing.
    results = [
        r for r in data.get("results", [])
        if not (r.get("file") or "").startswith(_EXCLUDED_PATHS)
    ]

    try:
        # Not committed on its own: the replace must be all-or-nothing.
        db.query(ParsedRule).filter_by(rule_type="library").delete()

        loaded = 0
        seen_ids: set = set()

        for rule in results:
            # De-duplicate first so no work is spent on rules we will skip.
            rule_id = str(rule.get("id", f"lib_{loaded}"))
            if rule_id in seen_ids:
                continue
            seen_ids.add(rule_id)

            all_fields: set = set()
            data_sources: set = set()
            for q in rule.get("queries", []):
                all_fields.update(q.get("keys", []))

                ds_vals = (q.get("pairs") or {}).get("dataSource.name") or []
                if isinstance(ds_vals, str):
                    # A bare string would otherwise be iterated per character.
                    ds_vals = [ds_vals]
                for v in ds_vals:
                    if isinstance(v, str):
                        data_sources.add(v)
                    elif isinstance(v, list):
                        data_sources.update(str(x) for x in v)

            db.add(ParsedRule(
                rule_id=rule_id,
                name=rule.get("name", "unnamed"),
                rule_type="library",
                fields_used=list(all_fields),
                # Sorted so the stored JSON is deterministic across runs.
                raw=json.dumps({"data_sources": sorted(data_sources)}),
            ))
            loaded += 1

            # Flush in batches so pending objects don't pile up in the session.
            if loaded % 500 == 0:
                db.flush()

        db.commit()
    except Exception:
        # Undo the delete and any partial inserts before propagating.
        db.rollback()
        raise

    return loaded
@router.post("/load-detections")
async def load_detections(db: Session = Depends(get_db)):
    """
    Reload detection library rules.

    Tries the live S1 API first (platform-rules endpoint). Falls back to the
    local file at DETECTIONS_FILE when the API call raises, returns no
    rules, or importing the API rules fails.

    Returns:
        {"loaded": <number of rules imported>, "source": "api" or "file"}

    Raises:
        HTTPException(404): the API path produced nothing and the
            detections file does not exist.
        HTTPException(500): the detections file exists but importing it
            failed.
    """
    # Function-scope import, matching the other local imports in this module.
    import logging

    log = logging.getLogger(__name__)

    # Prefer the live API — gives accurate 'sources' and is always up to date
    try:
        rules = await s1_client.get_platform_rules()
        if rules:
            loaded = _import_from_api_rules(db, rules)
            return {"loaded": loaded, "source": "api"}
    except Exception:
        # Best-effort path: the file fallback below still runs, but the
        # failure is recorded instead of silently discarded, and the session
        # is reset in case the import failed mid-transaction.
        log.warning(
            "Platform-rules API import failed; falling back to %s",
            DETECTIONS_FILE,
            exc_info=True,
        )
        db.rollback()

    # Fall back to the local detections file
    if not os.path.exists(DETECTIONS_FILE):
        raise HTTPException(
            404,
            "S1 API unavailable and no detections file found — "
            "ensure the data/ volume is mounted with detections.json"
        )

    try:
        loaded = _import_detections(db, DETECTIONS_FILE)
    except Exception as e:
        raise HTTPException(500, f"Failed to import detections: {e}") from e

    return {"loaded": loaded, "source": "file"}
@router.post("/upload-sigma")
async def upload_sigma(files: list[UploadFile] = File(...), db: Session = Depends(get_db)):
"""Upload one or more Sigma YAML files and index their fields."""
@@ -216,11 +321,21 @@ async def load_parser_content(payload: ParserContentPayload, db: Session = Depen
return {"parser": payload.parser_name, "fields": list(fields), "field_count": len(fields)}
# dataSource.name values that belong to the SentinelOne platform itself.
# The platform parses these, not SDL parsers, so they need no custom parser
# coverage; sync_sources skips any row whose name appears here, which keeps
# them out of the coverage map.
_S1_NATIVE_SOURCES = {
    "ActivityFeed",
    "SentinelOne",
    "SentinelOne Ranger AD",
    "alert",
    "asset",
    "indicator",
    "misconfiguration",
    "vulnerability",
}
@router.post("/sync-sources")
async def sync_sources(days: int = 7, db: Session = Depends(get_db)):
"""Pull active dataSource.names from the SDL and store them.
Also detects whether a parser is already producing structured fields
for each source by checking if event.type is populated in the data lake.
Native S1 platform sources are excluded as they do not require SDL parsers.
"""
import asyncio
from datetime import datetime, timedelta
@@ -255,7 +370,7 @@ async def sync_sources(days: int = 7, db: Session = Depends(get_db)):
seen = 0
for row in rows:
name = row.get("dataSource.name")
if name:
if name and name not in _S1_NATIVE_SOURCES:
db.add(ActiveSource(
source_name=name,
event_count=row.get("events", 0),
@@ -264,7 +379,7 @@ async def sync_sources(days: int = 7, db: Session = Depends(get_db)):
))
seen += 1
db.commit()
return {"synced": seen, "sources": [r["dataSource.name"] for r in rows if r.get("dataSource.name")]}
return {"synced": seen, "sources": [r["dataSource.name"] for r in rows if r.get("dataSource.name") and r["dataSource.name"] not in _S1_NATIVE_SOURCES]}
def _build_parser_ds_index() -> dict[str, dict]:
@@ -367,19 +482,28 @@ def get_coverage_map(db: Session = Depends(get_db)):
# Build rule index: source_name → rules that reference it
rule_by_source: dict[str, list] = {}
for rule in rules:
query_texts = _star_query_texts(json.loads(rule.raw)) if rule.rule_type == "star" else []
data_sources = rule_parser.extract_data_sources(query_texts)
try:
raw_data = json.loads(rule.raw) if rule.raw else {}
except Exception:
raw_data = {}
if rule.rule_type == "library":
# Library rules store pre-extracted data_sources list in raw
data_sources = raw_data.get("data_sources", [])
else:
query_texts = _star_query_texts(raw_data)
data_sources = rule_parser.extract_data_sources(query_texts)
for ds in data_sources:
rule_by_source.setdefault(ds, []).append({"rule": rule.name, "type": rule.rule_type})
if not data_sources:
# Rule with no explicit source filter — applies to all
rule_by_source.setdefault("__any__", []).append({"rule": rule.name, "type": rule.rule_type})
# Fields to ignore when computing "missing" — these are metadata/schema fields
# always present in events regardless of the parser
_SCHEMA_FIELDS = {
"dataSource.name", "dataSource.vendor", "dataSource.category",
"event.type", "timestamp", "src.endpoint.ip", "src.endpoint.name",
# Endpoint agent fields — populated by the SentinelOne agent, not by SDL parsers
"cmdScript.content", "endpoint.os", "endpoint.name", "endpoint.uid",
}
sources_out = []
@@ -414,22 +538,75 @@ def get_coverage_map(db: Session = Depends(get_db)):
else:
needed_count += 1
rules_for_src = rule_by_source.get(src.source_name, []) + rule_by_source.get("__any__", [])
rules_for_src: list = [r for r in rule_by_source.get(src.source_name, []) if r["type"] == "library"]
# Fields all associated rules need, minus schema fields always present
rule_fields_needed: set = set()
# Close-match suggestions — shown when there are no library rules for this source.
close_matches: list = []
if not rules_for_src:
import re as _re
def _word_tokens(s: str) -> set:
"""Split on non-alphanumeric boundaries, lowercase, drop single chars."""
return {t for t in _re.split(r"[^a-z0-9]+", s.lower()) if len(t) >= 2}
def _is_close(a: str, b: str) -> bool:
na, nb = _normalize(a), _normalize(b)
# 1. Simple substring match
if na in nb or nb in na:
return True
# 2. Token-level: handles "Microsoft 365 Collaboration" vs "Microsoft O365"
# — "365" is inside "o365", and they share "microsoft"
ta, tb = _word_tokens(a), _word_tokens(b)
shared_exact = ta & tb
if not shared_exact:
return False # Must share at least one word exactly
# Check that a DISTINCTIVE (non-shared) token from one name
# appears as a substring inside a token from the other.
# This avoids matching "Azure AD" to "Azure Platform" on "azure" alone.
unique_a = ta - shared_exact
unique_b = tb - shared_exact
return any(
ua in ub or ub in ua
for ua in unique_a for ub in unique_b
if len(ua) >= 2 and len(ub) >= 2
)
sn = _normalize(src.source_name)
for lib_ds, lib_rules in rule_by_source.items():
lib_only = [r for r in lib_rules if r["type"] == "library"]
if not lib_only:
continue
if _is_close(src.source_name, lib_ds):
close_matches.append({
"library_name": lib_ds,
"rule_count": len(lib_only),
})
close_matches.sort(key=lambda x: x["rule_count"], reverse=True)
close_matches = close_matches[:3]
# Count how many rules reference each field (frequency)
field_freq: dict[str, int] = {}
for r in rules_for_src:
rule_fields_needed |= rule_fields_index.get(r["rule"], set())
rule_fields_needed -= _SCHEMA_FIELDS
for f in rule_fields_index.get(r["rule"], set()):
field_freq[f] = field_freq.get(f, 0) + 1
# Fields the parser provides
parser_provides = parser_index.get(matched_parser, set()) if matched_parser and matched_parser != "detected in data" else set()
# Missing = fields rules need that the parser doesn't provide.
# Only consider dotted-path fields (e.g. src.ip, winEventLog.channel) —
# single-word tokens are typically correlation variables or rule metadata.
rule_fields_dotted = {f for f in rule_fields_needed if "." in f}
missing_fields = sorted(rule_fields_dotted - parser_provides)
# Minimum number of rules that must reference a field before we flag it.
# Scales with rule count so single-rule oddities don't dominate.
rule_count = len(rules_for_src)
min_rules = max(2, round(rule_count * 0.05)) if rule_count >= 10 else 2
# Missing = dotted-path fields needed by >= min_rules rules,
# not in schema constants, not provided by the parser.
missing_fields = sorted(
f for f, count in field_freq.items()
if count >= min_rules
and "." in f
and f not in _SCHEMA_FIELDS
and f not in parser_provides
)
sources_out.append({
"source_name": src.source_name,
@@ -441,6 +618,7 @@ def get_coverage_map(db: Session = Depends(get_db)):
"parser_detected": src.parser_detected or 0,
"rules": rules_for_src,
"rule_count": len(rules_for_src),
"close_matches": close_matches,
"missing_fields": missing_fields,
"missing_fields_count": len(missing_fields),
"synced_at": src.synced_at.isoformat() if src.synced_at else None,
-3
View File
@@ -15,9 +15,6 @@ FIELDS = [
{"key": "SDL_XDR_URL", "label": "SDL XDR URL", "secret": False, "placeholder": "https://xdr.us1.sentinelone.net"},
{"key": "SDL_LOG_READ_KEY", "label": "SDL Log Read Key", "secret": True, "placeholder": "1DnK0Y4e..."},
{"key": "ANTHROPIC_API_KEY", "label": "Anthropic API Key", "secret": True, "placeholder": "sk-ant-..."},
{"key": "STAR_LIBRARY_ONLY", "label": "STAR Rules — Library Only", "secret": False, "placeholder": "true",
"type": "select", "options": ["true", "false"],
"hint": "true = load only SentinelOne Library rules (@sentinelone.com creators). false = include custom tenant rules as well."},
]
FIELD_KEYS = {f["key"] for f in FIELDS}