Use parsed event detection in data lake as coverage signal

- sync-sources now runs a second PowerQuery in parallel that counts, per
  source, the events with a non-empty event.type; the count is stored in the
  new active_sources.parser_detected column
- Coverage map marks a source as covered if parser_detected > 0, even
  without a matching local parser file (handles built-in/cloud parsers)
- UI parser cell shows "✓ Parsed (N typed events detected)" for parsers that
  were detected only in the data lake; sources matched to a local parser file
  still show that parser's name
- Runtime ALTER TABLE migration adds parser_detected column to existing DBs

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Mick
2026-05-19 13:06:29 -04:00
parent 81e3656c46
commit 1b07a59991
4 changed files with 49 additions and 10 deletions
+1
View File
@@ -36,6 +36,7 @@ class ActiveSource(Base):
source_name = Column(String, unique=True, index=True)
event_count = Column(Integer, default=0)
synced_at = Column(DateTime, default=datetime.utcnow)
parser_detected = Column(Integer, default=0) # >0 means parsed events seen in data lake
class IngestSnapshot(Base):
+8
View File
@@ -5,6 +5,14 @@ from routers import coverage, ingest, settings, quality
Base.metadata.create_all(bind=engine)
# Runtime migration: add columns that didn't exist in earlier schema versions.
# Base.metadata.create_all() above only creates missing tables; it does not
# alter tables that already exist, so newer columns are added here by hand.
from sqlalchemy import inspect, text

# Look the column up through the inspector rather than relying on
# "ADD COLUMN IF NOT EXISTS": that clause is not portable (SQLite rejects it
# as a syntax error), and a failure here would abort application start-up.
_existing_columns = {
    _col["name"] for _col in inspect(engine).get_columns("active_sources")
}
if "parser_detected" not in _existing_columns:
    with engine.connect() as _conn:
        _conn.execute(text(
            "ALTER TABLE active_sources ADD COLUMN parser_detected INTEGER DEFAULT 0"
        ))
        _conn.commit()
app = FastAPI(title="SIEM Toolkit", version="1.0.0")
app.add_middleware(
+35 -9
View File
@@ -208,21 +208,38 @@ async def load_parser_content(payload: ParserContentPayload, db: Session = Depen
@router.post("/sync-sources")
async def sync_sources(days: int = 7, db: Session = Depends(get_db)):
"""Pull active dataSource.names from the SDL and store them."""
"""Pull active dataSource.names from the SDL and store them.
Also detects whether a parser is already producing structured fields
for each source by checking if event.type is populated in the data lake.
"""
import asyncio
from datetime import datetime, timedelta
now = datetime.utcnow()
from_dt = (now - timedelta(days=days)).strftime("%Y-%m-%dT%H:%M:%S.000Z")
to_dt = now.strftime("%Y-%m-%dT%H:%M:%S.000Z")
try:
result = await s1_client.run_powerquery(
"| group events=count() by dataSource.name | sort -events | limit 200",
from_dt, to_dt
volume_result, parsed_result = await asyncio.gather(
s1_client.run_powerquery(
"| group events=count() by dataSource.name | sort -events | limit 200",
from_dt, to_dt
),
s1_client.run_powerquery(
"| filter event.type != '' | group parsed=count() by dataSource.name | limit 200",
from_dt, to_dt
),
)
except Exception as e:
raise HTTPException(502, f"PowerQuery error: {e}")
rows = result.get("events", [])
# Clear old and insert fresh
# Build lookup: source_name → count of parsed events seen
parsed_by_source: dict[str, int] = {}
for row in parsed_result.get("events", []):
name = row.get("dataSource.name")
if name:
parsed_by_source[name] = row.get("parsed", 0)
rows = volume_result.get("events", [])
db.query(ActiveSource).delete()
synced_at = datetime.utcnow()
seen = 0
@@ -233,6 +250,7 @@ async def sync_sources(days: int = 7, db: Session = Depends(get_db)):
source_name=name,
event_count=row.get("events", 0),
synced_at=synced_at,
parser_detected=parsed_by_source.get(name, 0),
))
seen += 1
db.commit()
@@ -348,15 +366,22 @@ def get_coverage_map(db: Session = Depends(get_db)):
for src in active_sources:
parser_info = _find_parser_info(src.source_name)
parser_in_data = (src.parser_detected or 0) > 0
if parser_info and parser_info["format_type"] == "custom":
status = "covered"
matched_parser = parser_info["parser_name"]
format_type = "custom"
elif parser_info:
# grok or dottedJson — flag as needing a proper parser
elif parser_info and parser_info["format_type"] in ("grok", "dottedJson") and not parser_in_data:
# Known parser but primitive format and no evidence of parsing in data
status = "parser_needed"
matched_parser = parser_info["parser_name"]
format_type = parser_info["format_type"]
elif parser_in_data:
# Parsed fields detected in the data lake — a parser is running
status = "covered"
matched_parser = parser_info["parser_name"] if parser_info else "detected in data"
format_type = parser_info["format_type"] if parser_info else "unknown"
else:
status = "parser_needed"
matched_parser = None
@@ -375,7 +400,8 @@ def get_coverage_map(db: Session = Depends(get_db)):
"status": status,
"parser": matched_parser,
"format_type": format_type,
"parser_fields": len(parser_index.get(matched_parser, set())) if matched_parser else 0,
"parser_fields": len(parser_index.get(matched_parser, set())) if matched_parser and matched_parser != "detected in data" else 0,
"parser_detected": src.parser_detected or 0,
"rules": rules_for_src,
"rule_count": len(rules_for_src),
"synced_at": src.synced_at.isoformat() if src.synced_at else None,
+5 -1
View File
@@ -272,7 +272,11 @@ function cvSetFilter(f) {
function parserCell(s) {
if (s.status === 'covered') {
return `<span class="text-gray-400">${esc(s.parser)} <span class="text-gray-600">(${s.parser_fields} fields)</span></span>`
if (s.parser === 'detected in data') {
return `<span class="text-emerald-400">✓ Parsed <span class="text-emerald-700">(${(s.parser_detected||0).toLocaleString()} typed events detected)</span></span>`
}
const detail = s.parser_fields ? ` (${s.parser_fields} fields)` : ''
return `<span class="text-gray-400">${esc(s.parser)}${detail}</span>`
}
if (s.parser && s.format_type && s.format_type !== 'custom') {
return `<span class="text-amber-400 italic">⚠ ${esc(s.parser)} <span class="text-amber-600">(${esc(s.format_type)} — needs custom parser)</span></span>`