marcredhat-siem-toolkit-pat…/backend/main.py
Mick d0299e0f23 Add health score, coverage trends, dependency map, PowerQuery playground, onboarding tracker
Tenant Health Score:
- CoverageSnapshot table stores daily health metrics (parser %, MITRE %, firing %)
- _compute_health() weighted formula: 40% parser coverage + 35% MITRE + 25% firing
  (reweighted 55/45 when firing cache empty)
- GET /api/coverage/health returns score + delta vs previous snapshot
- GET /api/coverage/snapshots returns chronological history for sparklines
- POST /api/coverage/snapshot for manual recording
- Auto-snapshot recorded at end of every sync-sources call
- Overview dashboard: prominent health score card with color coding, component
  breakdown, delta indicator, and inline SVG sparkline (last 30 points)
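
The weighting described above can be sketched as follows. This is a minimal illustration, not the actual `_compute_health()` body: the function names, the 0-100 input scale, the use of `None` to model an empty firing cache, and the reading of 55/45 as 55% parser and 45% MITRE are all assumptions.

```python
from typing import Optional


def compute_health(parser_pct: float, mitre_pct: float,
                   firing_pct: Optional[float]) -> float:
    """Weighted health score on a 0-100 scale (hypothetical helper).

    firing_pct=None stands in for an empty firing cache, in which case
    the score is reweighted to 55% parser coverage and 45% MITRE coverage.
    """
    if firing_pct is None:
        score = 0.55 * parser_pct + 0.45 * mitre_pct
    else:
        score = 0.40 * parser_pct + 0.35 * mitre_pct + 0.25 * firing_pct
    return round(score, 1)


def health_delta(current: float, previous: Optional[float]) -> Optional[float]:
    """Change versus the previous snapshot; None when no snapshot exists yet."""
    if previous is None:
        return None
    return round(current - previous, 1)
```

With 80% parser coverage, 60% MITRE coverage and 40% of rules firing, the sketch gives 63.0. The same inputs with an empty firing cache give 71.0, so a snapshot taken before firing data arrives can read higher than one taken after.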

Rule Dependency Map:
- GET /api/coverage/dependency-map flips the coverage map — rule → required sources
- Each source flagged healthy/inactive/no_parser; at_risk = any source missing
- New section on Threat Coverage tab with at-risk filter toggle
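
One way to picture the flip is the sketch below. It assumes the coverage map is a plain source → rules dictionary, that "inactive" means the source is not sending data, and that "no_parser" means it is active but has no parser. It also treats a rule as at risk when any required source is not healthy. None of these names or definitions are taken from the endpoint itself.

```python
from typing import Dict, List, Set


def flip_coverage_map(coverage: Dict[str, List[str]]) -> Dict[str, List[str]]:
    """Invert source -> rules into rule -> required sources."""
    flipped: Dict[str, List[str]] = {}
    for source, rules in coverage.items():
        for rule in rules:
            flipped.setdefault(rule, []).append(source)
    return flipped


def source_status(source: str, active: Set[str], parsed: Set[str]) -> str:
    """Classify one source (status definitions are assumptions)."""
    if source not in active:
        return "inactive"
    if source not in parsed:
        return "no_parser"
    return "healthy"


def build_dependency_map(coverage: Dict[str, List[str]],
                         active: Set[str], parsed: Set[str]) -> List[dict]:
    """Return one entry per rule with per-source status and an at_risk flag."""
    entries = []
    for rule, sources in sorted(flip_coverage_map(coverage).items()):
        statuses = {s: source_status(s, active, parsed) for s in sources}
        entries.append({
            "rule": rule,
            "sources": statuses,
            # Assumed rule: any non-healthy source puts the rule at risk
            "at_risk": any(v != "healthy" for v in statuses.values()),
        })
    return entries
```

The at-risk filter toggle on the Threat Coverage tab would then reduce to keeping only the entries whose `at_risk` flag is true.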

PowerQuery Playground:
- New query.py router: GET /presets (7 curated queries) + POST /run
- New Query nav tab with time-range pills, preset buttons, localStorage history,
  monospace textarea, auto-column results table, client-side CSV export

Onboarding Tracker:
- GET /api/coverage/onboarding-status returns per-source pipeline progress
  across 6 stages: Data Received → Parser File → Parser Active → Source
  Labeled → Detection Rules → Rules Firing
- New section on Onboarding tab with emoji stage dots, progress bars,
  collapsed completed sources with show/hide toggle
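
A per-source progress summary over the six stages might look like the sketch below. The stage names come from the list above. The function name, the input shape (a stage → completed mapping), and the response fields are hypothetical, and stages are counted independently rather than assumed to complete strictly in order.

```python
from typing import Dict, Optional

# Stage names as listed in the commit message, in pipeline order
STAGES = [
    "Data Received",
    "Parser File",
    "Parser Active",
    "Source Labeled",
    "Detection Rules",
    "Rules Firing",
]


def onboarding_progress(completed: Dict[str, bool]) -> dict:
    """Summarise pipeline progress for a single source (hypothetical shape)."""
    done = [s for s in STAGES if completed.get(s, False)]
    # First stage in pipeline order that is not yet complete, if any
    next_stage: Optional[str] = next(
        (s for s in STAGES if not completed.get(s, False)), None
    )
    return {
        "stages_done": len(done),
        "percent": round(100 * len(done) / len(STAGES)),
        "next_stage": next_stage,
        "complete": next_stage is None,
    }
```

The `complete` flag is the sort of field a UI could use to collapse finished sources behind a show/hide toggle.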

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-22 11:09:43 -04:00

99 lines
3.3 KiB
Python

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy import text

from db import engine, Base, get_db, ParsedRule, RuleFiringCache, CoverageSnapshot
from routers import coverage, ingest, settings, quality, query

Base.metadata.create_all(bind=engine)

# Runtime migration: add columns that didn't exist in earlier schema versions
with engine.connect() as _conn:
    _conn.execute(text(
        "ALTER TABLE active_sources ADD COLUMN IF NOT EXISTS parser_detected INTEGER DEFAULT 0"
    ))
    _conn.execute(text(
        "ALTER TABLE active_sources ADD COLUMN IF NOT EXISTS unlabelled BOOLEAN DEFAULT FALSE"
    ))
    _conn.execute(text(
        "CREATE TABLE IF NOT EXISTS rule_firing_cache ("
        "id SERIAL PRIMARY KEY, "
        "rule_name VARCHAR UNIQUE, "
        "alert_count INTEGER DEFAULT 0, "
        "period_days INTEGER DEFAULT 30, "
        "checked_at TIMESTAMP"
        ")"
    ))
    _conn.execute(text(
        "CREATE TABLE IF NOT EXISTS coverage_snapshots ("
        "id SERIAL PRIMARY KEY, "
        "recorded_at TIMESTAMP, "
        "health_score FLOAT DEFAULT 0, "
        "parser_pct FLOAT DEFAULT 0, "
        "mitre_pct FLOAT DEFAULT 0, "
        "firing_pct FLOAT DEFAULT 0, "
        "active_sources INTEGER DEFAULT 0, "
        "covered_sources INTEGER DEFAULT 0, "
        "rules_loaded INTEGER DEFAULT 0, "
        "tactics_covered INTEGER DEFAULT 0, "
        "techniques_covered INTEGER DEFAULT 0, "
        "rules_with_mitre INTEGER DEFAULT 0, "
        "rules_fired INTEGER DEFAULT 0"
        ")"
    ))
    _conn.commit()

app = FastAPI(title="SIEM Toolkit", version="1.0.0")


@app.on_event("startup")
async def auto_load_detections():
    """
    Auto-load detection library rules on startup.
    Tries the live S1 API first (accurate 'sources' field); falls back to the local
    detections file (path from DETECTIONS_FILE, default /app/data/detections.json).
    Skips if rules are already loaded; use the 'Sync Library' button to force a refresh.
    """
    import os
    from sqlalchemy.orm import Session
    from services import s1_client

    db: Session = next(get_db())
    try:
        existing = db.query(ParsedRule).filter_by(rule_type="library").count()
        if existing > 0:
            return  # Already loaded; skip until the user manually refreshes

        # Try live API first
        try:
            rules = await s1_client.get_platform_rules()
            if rules:
                coverage._import_from_api_rules(db, rules)
                return
        except Exception:
            # Best effort: any API failure falls through to the local file
            pass

        # Fall back to local file
        detections_file = os.environ.get("DETECTIONS_FILE", "/app/data/detections.json")
        if os.path.exists(detections_file):
            coverage._import_detections(db, detections_file)
    finally:
        db.close()


app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3001"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.include_router(coverage.router, prefix="/api/coverage", tags=["Coverage"])
app.include_router(ingest.router, prefix="/api/ingest", tags=["Ingest"])
app.include_router(settings.router, prefix="/api/settings", tags=["Settings"])
app.include_router(quality.router, prefix="/api/quality", tags=["Quality"])
app.include_router(query.router, prefix="/api/query", tags=["Query"])


@app.get("/health")
def health():
    return {"status": "ok"}