marcredhat-siem-toolkit-pat…/backend/db.py
Mick 7922de315e Add MITRE ATT&CK heatmap and detection rule firing status
MITRE ATT&CK heatmap:
- _extract_mitre() helper extracts tactics/techniques from S1 API rules
  handling multiple field name conventions (tactic, mitreTechniques, etc.)
- _import_from_api_rules and _import_detections now store tactics/techniques
  in raw JSON alongside data_sources
- GET /api/coverage/mitre returns tactic/technique breakdown ordered by
  ATT&CK kill chain with coverage stats
- New "Threat Coverage" tab in frontend: stat cards (total rules, MITRE
  mapped, tactics covered, techniques covered), tactic cards grid with
  left-border color coding and technique chips with "+N more" expander
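
The extraction and ordering described above can be sketched roughly as follows. This is not the project's `_extract_mitre()`; it is a minimal illustration that assumes a rule is a plain dict. Only the key names `tactic` and `mitreTechniques` come from the commit message; the other keys, the `extract_mitre_sketch` name, and the accepted value shapes are hypothetical.

```python
from typing import Any, Dict, List

# Enterprise ATT&CK tactics in matrix (kill chain) order.
TACTIC_ORDER = [
    "Reconnaissance", "Resource Development", "Initial Access", "Execution",
    "Persistence", "Privilege Escalation", "Defense Evasion",
    "Credential Access", "Discovery", "Lateral Movement", "Collection",
    "Command and Control", "Exfiltration", "Impact",
]

# Assumption: only "tactic" and "mitreTechniques" are named in the commit
# message; the remaining keys are hypothetical alternatives.
TACTIC_KEYS = ("tactic", "tactics", "mitreTactics")
TECHNIQUE_KEYS = ("technique", "techniques", "mitreTechniques")


def _as_names(value: Any) -> List[str]:
    """Flatten a string, a dict with a 'name' key, or a list of either."""
    if isinstance(value, str):
        return [value.strip()] if value.strip() else []
    if isinstance(value, dict):
        name = value.get("name")
        return [name] if isinstance(name, str) and name else []
    if isinstance(value, (list, tuple)):
        names: List[str] = []
        for item in value:
            names.extend(_as_names(item))
        return names
    return []  # None or an unsupported type


def extract_mitre_sketch(rule: Dict[str, Any]) -> Dict[str, List[str]]:
    """Collect tactics and techniques from whichever keys are present."""
    tactics: List[str] = []
    techniques: List[str] = []
    for key in TACTIC_KEYS:
        tactics.extend(_as_names(rule.get(key)))
    for key in TECHNIQUE_KEYS:
        techniques.extend(_as_names(rule.get(key)))

    # Drop duplicates while keeping first-seen order.
    tactics = list(dict.fromkeys(tactics))
    techniques = list(dict.fromkeys(techniques))

    # Order tactics by kill chain position; unknown names sort last.
    rank = {name.lower(): i for i, name in enumerate(TACTIC_ORDER)}
    tactics.sort(key=lambda t: rank.get(t.lower(), len(TACTIC_ORDER)))
    return {"tactics": tactics, "techniques": techniques}
```

Sorting by position in a fixed tactic list, rather than alphabetically, is what lets a heatmap read left to right along the kill chain.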

Detection rule firing status:
- RuleFiringCache table tracks alert_count per rule_name
- POST /api/coverage/sync-rule-firing queries SDL PowerQuery with 3
  field-name patterns to find rule firing data; upserts into cache
- GET /api/coverage/rule-firing-cache returns cache sorted by alert count
- /map now includes alert_count per rule and firing_cache_populated flag
- Coverage map Detections column: when cache populated, shows alert count
  in green or ⚠ amber for rules that have never fired
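
The upsert and the "never fired" flag described above could look something like the sketch below. It uses the standard-library `sqlite3` module so it runs without a PostgreSQL instance; the project itself goes through SQLAlchemy, so this is an illustration of the idea, not the actual endpoint code. The helper names (`make_cache`, `upsert_rule_firing`, `firing_report`) are hypothetical.

```python
import sqlite3
from datetime import datetime, timezone


def make_cache() -> sqlite3.Connection:
    """Create an in-memory table shaped like rule_firing_cache."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE rule_firing_cache (
            id INTEGER PRIMARY KEY,
            rule_name TEXT UNIQUE,
            alert_count INTEGER DEFAULT 0,
            period_days INTEGER DEFAULT 30,
            checked_at TEXT
        )
        """
    )
    return conn


def upsert_rule_firing(conn, rule_name, alert_count, period_days=30):
    """Insert a row, or update the existing row with the same rule_name."""
    checked_at = datetime.now(timezone.utc).isoformat()
    with conn:  # commits on success, rolls back on error
        conn.execute(
            """
            INSERT INTO rule_firing_cache
                (rule_name, alert_count, period_days, checked_at)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(rule_name) DO UPDATE SET
                alert_count = excluded.alert_count,
                period_days = excluded.period_days,
                checked_at = excluded.checked_at
            """,
            (rule_name, alert_count, period_days, checked_at),
        )


def firing_report(conn):
    """Return cache rows sorted by alert count, flagging rules with zero alerts."""
    rows = conn.execute(
        "SELECT rule_name, alert_count FROM rule_firing_cache "
        "ORDER BY alert_count DESC, rule_name"
    ).fetchall()
    return [
        {"rule_name": name, "alert_count": count, "never_fired": count == 0}
        for name, count in rows
    ]
```

The unique constraint on `rule_name` is what makes the conflict clause work: a repeated sync overwrites the count for a rule instead of adding a second row.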

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-22 10:25:45 -04:00

66 lines
2.1 KiB
Python

import os
from datetime import datetime

from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, Text, Boolean
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import declarative_base, sessionmaker

DATABASE_URL = os.environ.get("DATABASE_URL", "postgresql://siem:siem@db:5432/siem")
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()


class ParsedRule(Base):
    __tablename__ = "parsed_rules"

    id = Column(Integer, primary_key=True)
    rule_id = Column(String, unique=True, index=True)
    name = Column(String)
    rule_type = Column(String)  # 'star' or 'sigma'
    fields_used = Column(JSONB)
    raw = Column(Text)
    cached_at = Column(DateTime, default=datetime.utcnow)


class ParserField(Base):
    __tablename__ = "parser_fields"

    id = Column(Integer, primary_key=True)
    parser_name = Column(String, index=True)
    field_name = Column(String)
    field_type = Column(String)


class ActiveSource(Base):
    __tablename__ = "active_sources"

    id = Column(Integer, primary_key=True)
    source_name = Column(String, unique=True, index=True)
    event_count = Column(Integer, default=0)
    synced_at = Column(DateTime, default=datetime.utcnow)
    parser_detected = Column(Integer, default=0)  # >0 means parsed events seen in data lake
    unlabelled = Column(Boolean, default=False)  # True = events had no dataSource.name


class IngestSnapshot(Base):
    __tablename__ = "ingest_snapshots"

    id = Column(Integer, primary_key=True)
    period_days = Column(Integer)
    data = Column(JSONB)
    recorded_at = Column(DateTime, default=datetime.utcnow)


class RuleFiringCache(Base):
    """Tracks alert_count per rule_name over a lookback window of period_days."""

    __tablename__ = "rule_firing_cache"

    id = Column(Integer, primary_key=True)
    rule_name = Column(String, unique=True, index=True)
    alert_count = Column(Integer, default=0)  # 0 means the rule has not fired in the window
    period_days = Column(Integer, default=30)
    checked_at = Column(DateTime, default=datetime.utcnow)
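

def get_db():
    """Yield a database session and close it once the caller is done."""
    db = SessionLocal()
    try:
        yield db
    finally:
        # Runs even if the caller raises, so the connection is always released.
        db.close()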
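
`get_db()` is a generator: it yields a session and closes it in `finally`. As a rough sketch of how a caller might drive it outside a web framework, the example below wraps the same pattern with `contextlib.contextmanager`. To run without a database it uses a stand-in `FakeSession` class and adds a `session_factory` parameter; both are hypothetical and not part of the original file, as is `run_with_session`.

```python
from contextlib import contextmanager


class FakeSession:
    """Stand-in for a SQLAlchemy session; records whether close() ran."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def get_db(session_factory=FakeSession):
    # Same shape as the original get_db(), with the factory made injectable
    # (an assumption for this sketch; the original calls SessionLocal()).
    db = session_factory()
    try:
        yield db
    finally:
        db.close()


# Wrapping the generator gives scripts and tests a with-block.
db_session = contextmanager(get_db)


def run_with_session(work):
    """Call work(db) inside a managed session and return the session."""
    with db_session() as db:
        work(db)
    return db
```

Because cleanup sits in `finally`, the session is closed whether `work` returns normally or raises.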