class CoverageSnapshot(Base):
    """Point-in-time record of the coverage/health metrics.

    One row stores the weighted health score, its three percentage
    components, and the raw counts those percentages were derived from.
    Every numeric column defaults to zero; ``recorded_at`` defaults to the
    insertion time and is indexed because readers order by it.
    """

    __tablename__ = "coverage_snapshots"
    id = Column(Integer, primary_key=True)
    # Naive UTC timestamp filled in by the ORM at insert time.
    recorded_at = Column(DateTime, default=datetime.utcnow, index=True)
    # Weighted overall score built from the three percentages below.
    health_score = Column(Float, default=0.0)
    parser_pct = Column(Float, default=0.0)  # % sources with working parser
    mitre_pct = Column(Float, default=0.0)  # % ATT&CK tactics covered
    firing_pct = Column(Float, default=0.0)  # % rules that have fired
    # Raw counts behind the percentages above.
    active_sources = Column(Integer, default=0)
    covered_sources = Column(Integer, default=0)
    rules_loaded = Column(Integer, default=0)
    tactics_covered = Column(Integer, default=0)
    techniques_covered = Column(Integer, default=0)
    rules_with_mitre = Column(Integer, default=0)
    rules_fired = Column(Integer, default=0)
0, " + "tactics_covered INTEGER DEFAULT 0, " + "techniques_covered INTEGER DEFAULT 0, " + "rules_with_mitre INTEGER DEFAULT 0, " + "rules_fired INTEGER DEFAULT 0" + ")" + )) _conn.commit() app = FastAPI(title="SIEM Toolkit", version="1.0.0") @@ -73,6 +90,7 @@ app.include_router(coverage.router, prefix="/api/coverage", tags=["Coverage"]) app.include_router(ingest.router, prefix="/api/ingest", tags=["Ingest"]) app.include_router(settings.router, prefix="/api/settings", tags=["Settings"]) app.include_router(quality.router, prefix="/api/quality", tags=["Quality"]) +app.include_router(query.router, prefix="/api/query", tags=["Query"]) @app.get("/health") diff --git a/backend/routers/coverage.py b/backend/routers/coverage.py index 39b6cf0..db69729 100644 --- a/backend/routers/coverage.py +++ b/backend/routers/coverage.py @@ -4,7 +4,7 @@ from fastapi import APIRouter, UploadFile, File, Depends, HTTPException from pydantic import BaseModel from sqlalchemy.orm import Session from datetime import datetime -from db import get_db, ParsedRule, ParserField, ActiveSource, RuleFiringCache +from db import get_db, ParsedRule, ParserField, ActiveSource, RuleFiringCache, CoverageSnapshot from services import s1_client, rule_parser DETECTIONS_FILE = os.environ.get("DETECTIONS_FILE", "/app/data/detections.json") @@ -571,6 +571,27 @@ async def sync_sources(days: int = 7, db: Session = Depends(get_db)): db.commit() synced_names = [r["dataSource.name"] for r in rows if r.get("dataSource.name") and r["dataSource.name"] not in _S1_NATIVE_SOURCES] + + # Auto-record a coverage snapshot after every live-sources sync + try: + h = _compute_health(db) + db.add(CoverageSnapshot( + health_score=h["health_score"], + parser_pct=h["parser_pct"], + mitre_pct=h["mitre_pct"], + firing_pct=h["firing_pct"] or 0.0, + active_sources=h["active_sources"], + covered_sources=h["covered_sources"], + rules_loaded=h["rules_loaded"], + tactics_covered=h["tactics_covered"], + 
def _health_pct(part: int, whole: int) -> float:
    """Return ``part / whole`` as a percentage, rounded to 0.1 and clamped to 0-100."""
    if whole <= 0:
        return 0.0
    return round(min(100.0, max(0.0, part / whole * 100)), 1)


def _compute_health(db) -> dict:
    """Compute the current health score from DB state.

    Weights:
      40% parser coverage - what % of active sources have a working parser
      35% MITRE coverage  - what % of the 14 standard ATT&CK tactics are covered
      25% rule firing     - what % of library rules have fired

    When the rule-firing cache is empty the firing component is dropped and
    the score is reweighted to 55% parser / 45% MITRE.

    Args:
        db: SQLAlchemy session used to read ActiveSource, ParsedRule and
            RuleFiringCache rows.

    Returns:
        dict with ``health_score``, the three percentages (``firing_pct`` is
        ``None`` when the firing cache is empty), the raw counts behind them,
        ``firing_cache_populated`` and a ``components`` breakdown carrying the
        weight applied to each component.
    """
    # --- Parser coverage ---
    sources = db.query(ActiveSource).all()
    total_sources = len(sources)
    # "covered" = parser_detected > 0; the column may be NULL, hence `or 0`.
    covered_sources = sum(1 for s in sources if (s.parser_detected or 0) > 0)
    parser_pct = _health_pct(covered_sources, total_sources)

    # --- MITRE coverage ---
    TOTAL_TACTICS = 14  # standard ATT&CK Enterprise tactic count
    rules = db.query(ParsedRule).filter_by(rule_type="library").all()
    total_rules = len(rules)
    covered_tactics: set = set()
    covered_techniques: set = set()
    rules_with_mitre = 0
    for rule in rules:
        try:
            raw = json.loads(rule.raw) if rule.raw else {}
        except (TypeError, ValueError):
            raw = {}
        # A rule whose JSON is valid but not an object carries no MITRE data.
        if not isinstance(raw, dict):
            raw = {}
        tactics = raw.get("tactics") or []
        techniques = raw.get("techniques") or []
        if tactics or techniques:
            rules_with_mitre += 1
        for tactic in tactics:
            if isinstance(tactic, str) and tactic and tactic != "Uncategorized":
                covered_tactics.add(tactic)
        for tech in techniques:
            # Technique entries are normally objects; tolerate bare strings.
            if isinstance(tech, dict):
                key = tech.get("id") or tech.get("name")
            elif isinstance(tech, str):
                key = tech
            else:
                key = None
            if key:
                covered_techniques.add(key)
    tactics_covered = len(covered_tactics)
    techniques_covered = len(covered_techniques)
    # Clamped: more than 14 distinct tactic labels must not push this past 100.
    mitre_pct = _health_pct(tactics_covered, TOTAL_TACTICS)

    # --- Rule firing ---
    firing_rows = db.query(RuleFiringCache).all()
    cache_populated = bool(firing_rows)
    # alert_count may be NULL; treat that as "never fired" instead of raising.
    rules_fired = sum(1 for r in firing_rows if (r.alert_count or 0) > 0)
    firing_pct = _health_pct(rules_fired, total_rules) if cache_populated else 0.0

    # --- Weighted health score ---
    if cache_populated:
        w_parser, w_mitre, w_firing = 0.40, 0.35, 0.25
    else:
        # Without firing data, reweight between parser + MITRE
        w_parser, w_mitre, w_firing = 0.55, 0.45, 0.0
    score = round(w_parser * parser_pct + w_mitre * mitre_pct + w_firing * firing_pct, 1)

    reported_firing = firing_pct if cache_populated else None
    return {
        "health_score": score,
        "parser_pct": parser_pct,
        "mitre_pct": mitre_pct,
        "firing_pct": reported_firing,
        "active_sources": total_sources,
        "covered_sources": covered_sources,
        "rules_loaded": total_rules,
        "tactics_covered": tactics_covered,
        "techniques_covered": techniques_covered,
        "rules_with_mitre": rules_with_mitre,
        "rules_fired": rules_fired,
        "firing_cache_populated": cache_populated,
        "components": {
            "parser_coverage": {"value": parser_pct, "weight": w_parser, "label": "Parser Coverage"},
            "mitre_coverage": {"value": mitre_pct, "weight": w_mitre, "label": "MITRE Coverage"},
            "rule_firing": {"value": reported_firing, "weight": w_firing, "label": "Rule Firing Rate"},
        },
    }
@router.get("/dependency-map")
def get_dependency_map(db: Session = Depends(get_db)):
    """Return, for each detection library rule, the data sources it requires.

    This is the flip of the coverage map. Each required source gets one of
    three statuses:

      * ``inactive``  - the source is not among the active sources
      * ``no_parser`` - the source is active but not "healthy", i.e. it has
        no parser OR it has a parser but zero events
      * ``healthy``   - the source has a parser and a non-zero event count

    A rule is ``at_risk`` when any required source is not ``healthy``. Rules
    with no source requirements (platform-wide rules) are never at risk.

    Returns:
        dict with the per-rule list (``rules``) sorted at-risk first, then by
        source count descending, then by name, plus ``total``, ``at_risk``,
        ``healthy`` and ``no_source_requirements`` counts.
    """
    rules = db.query(ParsedRule).filter_by(rule_type="library").all()
    active_sources = {s.source_name: s for s in db.query(ActiveSource).all()}
    ds_index, _ = _build_parser_ds_index()

    # Sources that are "healthy": a parser exists AND events were seen.
    healthy_sources = {
        name
        for name, src in active_sources.items()
        if (name in ds_index or (src.parser_detected or 0) > 0)
        and (src.event_count or 0) > 0
    }

    out = []
    for rule in rules:
        try:
            raw_data = json.loads(rule.raw) if rule.raw else {}
        except (TypeError, ValueError):
            raw_data = {}
        # Valid JSON that is not an object carries no usable metadata.
        if not isinstance(raw_data, dict):
            raw_data = {}

        # Only string names can be matched against active source names.
        data_sources = [
            ds for ds in (raw_data.get("data_sources") or []) if isinstance(ds, str)
        ]
        tactics = raw_data.get("tactics") or []
        techniques = raw_data.get("techniques") or []
        generated_alerts = raw_data.get("generated_alerts")

        source_statuses = []
        for ds in data_sources:
            if ds not in active_sources:
                status = "inactive"
            elif ds not in healthy_sources:
                status = "no_parser"
            else:
                status = "healthy"
            source_statuses.append({"source": ds, "status": status})

        # With no required sources this is False, so platform-wide rules
        # are never reported as at risk.
        at_risk = any(s["status"] != "healthy" for s in source_statuses)

        out.append({
            "rule": rule.name,
            "rule_id": rule.rule_id,
            "sources": source_statuses,
            "source_count": len(data_sources),
            "tactics": tactics,
            # Skip technique entries that are not objects or lack an id.
            "techniques": [
                t["id"] for t in techniques if isinstance(t, dict) and t.get("id")
            ],
            "generated_alerts": generated_alerts,
            "at_risk": at_risk,
            "no_sources": not data_sources,
        })

    # Sort: at-risk first, then by source count desc, then alphabetical.
    # `or ""` keeps the sort total when a rule has no name.
    out.sort(key=lambda r: (not r["at_risk"], -r["source_count"], r["rule"] or ""))

    at_risk_count = sum(1 for r in out if r["at_risk"])
    healthy_count = sum(1 for r in out if not r["at_risk"] and not r["no_sources"])

    return {
        "rules": out,
        "total": len(out),
        "at_risk": at_risk_count,
        "healthy": healthy_count,
        "no_source_requirements": sum(1 for r in out if r["no_sources"]),
    }
Firing", "done": rules_firing}, + ] + completed = sum(1 for s in stages if s["done"]) + out.append({ + "source": src.source_name, + "event_count": src.event_count, + "stages": stages, + "completed": completed, + "total": len(stages), + "pct": round(completed / len(stages) * 100), + }) + + # Sort: incomplete first, then by event volume + out.sort(key=lambda x: (x["completed"] == x["total"], -x["event_count"])) + + return { + "sources": out, + "fully_onboarded": sum(1 for s in out if s["completed"] == s["total"]), + "in_progress": sum(1 for s in out if 0 < s["completed"] < s["total"]), + "not_started": sum(1 for s in out if s["completed"] == 0), + } + + @router.delete("/reset") def reset_data(db: Session = Depends(get_db)): db.query(ParsedRule).delete() diff --git a/backend/routers/query.py b/backend/routers/query.py new file mode 100644 index 0000000..331f595 --- /dev/null +++ b/backend/routers/query.py @@ -0,0 +1,73 @@ +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel +from datetime import datetime, timedelta +from services import s1_client + +router = APIRouter() + + +def _date_range(hours: int | None = None, days: int | None = None) -> tuple[str, str]: + now = datetime.utcnow() + if hours: + delta = timedelta(hours=hours) + else: + delta = timedelta(days=days or 1) + return ( + (now - delta).strftime("%Y-%m-%dT%H:%M:%S.000Z"), + now.strftime("%Y-%m-%dT%H:%M:%S.000Z"), + ) + + +PRESET_QUERIES = [ + {"label": "Top sources by volume", "query": "| group events=count() by dataSource.name | sort -events | limit 25"}, + {"label": "Unlabelled events", "query": "!(dataSource.name = *) !(source = 'scalyr') | group events=count() by source | sort -events | limit 25"}, + {"label": "Events by type", "query": "| group events=count() by dataSource.name, event.type | sort -events | limit 50"}, + {"label": "Failed logins", "query": "| filter event.type = 'Logon' | filter event.outcome = 'FAILED' | group count() by user.name, src.ip | sort -count() | 
@router.post("/run")
async def run_query(req: QueryRequest):
    """Run a PowerQuery against the Singularity Data Lake.

    The time window comes from ``req.hours`` / ``req.days`` via
    ``_date_range``; ``req.max_count`` is clamped to the range 1-10,000.

    Args:
        req: Query text, optional window (hours or days) and row limit.

    Returns:
        dict with ``rows`` (event count), ``columns`` (sorted union of all
        event keys), ``events``, the ``from`` / ``to`` window and the
        original ``query``.

    Raises:
        HTTPException: 400 when the query is blank; 502 when the client
            call raises, returns something that is not a dict, or returns
            an ``error`` value.
    """
    if not req.query.strip():
        raise HTTPException(400, "Query cannot be empty")

    # Clamp on both sides: zero or negative limits are meaningless upstream.
    max_count = max(1, min(req.max_count, 10_000))

    from_dt, to_dt = _date_range(hours=req.hours, days=req.days)

    try:
        result = await s1_client.run_powerquery(req.query, from_dt, to_dt, max_count=max_count)
    except Exception as e:
        # Boundary translation: any upstream failure becomes a 502.
        raise HTTPException(502, f"PowerQuery error: {e}") from e

    # Anything other than a dict cannot be inspected for errors or events.
    if not isinstance(result, dict):
        raise HTTPException(502, "PowerQuery error: unexpected response from data lake")

    err = result.get("error")
    if err:
        raise HTTPException(502, f"PowerQuery error: {err}")

    # "events" may be absent or null; both mean "no rows".
    events = result.get("events") or []
    columns = sorted({k for row in events if isinstance(row, dict) for k in row})

    return {
        "rows": len(events),
        "columns": columns,
        "events": events,
        "from": from_dt,
        "to": to_dt,
        "query": req.query,
    }
SentinelOne AI-SIEM · demo.sentinelone.net
Use Claude Code directly — no API key required
6-stage lifecycle tracker for every active data source
+Loading pipeline…
${esc(PROMPT)}
| Source | +Pipeline Stages | +Progress | +Events | +
|---|
No active sources found — sync sources on the Coverage Map first.
' : ''}` + } catch(e) { + if (tableEl) tableEl.innerHTML = `${esc(e.message)}
` + } +} + +function obToggleCompleted() { + _obShowCompleted = !_obShowCompleted + const rows = document.getElementById('ob-complete-rows') + const label = document.getElementById('ob-complete-toggle-label') + if (rows) rows.classList.toggle('hidden', !_obShowCompleted) + if (label) { + const count = rows?.querySelectorAll('tr').length || 0 + label.textContent = (_obShowCompleted ? 'Hide' : 'Show') + ` completed (${count})` + } +} + // ── Settings ────────────────────────────────────────────────────────────── async function renderSettings() { @@ -1407,6 +1607,201 @@ async function qtTest() { } finally { setBtn('btn-qt', false, 'Test') } } +// ── PowerQuery Playground ───────────────────────────────────────────────── + +let _pqResults = [] // last query results for CSV export +let _pqHistory = [] // localStorage history + +function _pqLoadHistory() { + try { _pqHistory = JSON.parse(localStorage.getItem('pq_history') || '[]') } catch { _pqHistory = [] } +} + +function _pqSaveHistory(q) { + _pqHistory = [q, ..._pqHistory.filter(h => h !== q)].slice(0, 10) + try { localStorage.setItem('pq_history', JSON.stringify(_pqHistory)) } catch {} +} + +function _pqRenderHistory() { + const el = document.getElementById('pq-history') + if (!el) return + if (!_pqHistory.length) { el.innerHTML = ''; return } + el.innerHTML = `Run Scalyr PowerQueries directly against the Singularity Data Lake
+Querying data lake…
' + document.getElementById('btn-pq-csv')?.classList.add('hidden') + document.getElementById('pq-row-info').textContent = '' + + try { + const body = { + query, + max_count: Math.min(+(document.getElementById('pq-max')?.value || 1000), 10000), + } + if (window._pqRangeUnit === 'hours') body.hours = window._pqRangeVal + else body.days = window._pqRangeVal + + const r = await apiPost('/api/query/run', body) + _pqResults = r.events || [] + _pqSaveHistory(query) + _pqRenderHistory() + + const rowInfoEl = document.getElementById('pq-row-info') + if (rowInfoEl) rowInfoEl.textContent = `${r.rows} row${r.rows !== 1 ? 's' : ''} · ${r.from?.slice(0,10)} → ${r.to?.slice(0,10)}` + + if (!_pqResults.length) { + document.getElementById('pq-results').innerHTML = 'No results returned.
' + return + } + + const cols = r.columns || [] + const MAX_ROWS = 500 + const displayRows = _pqResults.slice(0, MAX_ROWS) + + const headers = cols.map(c => `Which data sources each detection rule requires — flags rules whose sources are missing or have no parser.
+Loading dependency map…
Loading dependency map…
' + try { + _depMapData = await apiGet('/api/coverage/dependency-map') + if (statsEl) { + statsEl.innerHTML = ` + ${statCard('At Risk', _depMapData.at_risk, _depMapData.at_risk > 0 ? 'text-red-400' : 'text-gray-500')} + ${statCard('Healthy', _depMapData.healthy, 'text-emerald-400')} + ${statCard('No Source Requirements', _depMapData.no_source_requirements, 'text-slate-400')}` + } + depMapRender() + } catch(e) { + if (tableEl) tableEl.innerHTML = `${esc(e.message)}
` + } +} + +function depMapRender() { + const tableEl = document.getElementById('depmap-table') + if (!tableEl || !_depMapData) return + const atRiskOnly = document.getElementById('depmap-at-risk-only')?.checked ?? false + + let rules = _depMapData.rules || [] + if (atRiskOnly) rules = rules.filter(r => r.at_risk) + // When filtering, also hide no-source rules unless explicitly showing all + const display = atRiskOnly ? rules : rules.filter(r => !r.no_sources).concat(rules.filter(r => r.no_sources)) + + if (!display.length) { + tableEl.innerHTML = 'No rules match the current filter.
' + return + } + + const SOURCE_STATUS_STYLE = { + healthy: 'bg-emerald-900/50 text-emerald-300 border-emerald-700', + inactive: 'bg-red-900/50 text-red-300 border-red-700', + no_parser:'bg-amber-900/50 text-amber-300 border-amber-700', + } + + const rows = display.map((r, i) => { + const statusBadge = r.at_risk + ? `⚠ At Risk` + : `✓ Covered` + + const sourceBadges = r.no_sources + ? `—` + : r.sources.map(s => + `${esc(s.source)}` + ).join(' ') + + const alerts = r.generated_alerts != null + ? `${r.generated_alerts.toLocaleString()}` + : `—` + + return `| Rule Name | +Required Sources | +Status | +Alerts | +
|---|
Source badges: green = healthy · red = inactive · amber = no parser
` +} + // ── Router ──────────────────────────────────────────────────────────────── function set(html) { document.getElementById('main').innerHTML = html } @@ -1640,6 +2134,7 @@ function route() { else if (h === '#/ingest') { updateNav('ingest'); renderIngest() } else if (h === '#/quality') { updateNav('quality'); renderQuality() } else if (h === '#/onboarding') { updateNav('onboarding'); renderOnboarding() } + else if (h === '#/query') { updateNav('query'); renderQuery() } else if (h === '#/threat') { updateNav('threat'); renderThreat() } else if (h === '#/settings') { updateNav('settings'); renderSettings() } else { updateNav('home'); renderHome() }