From 1b07a59991dcf97bd3c0d0789454fcaf27003b8e Mon Sep 17 00:00:00 2001 From: Mick <119439091+mickbrowns1@users.noreply.github.com> Date: Tue, 19 May 2026 13:06:29 -0400 Subject: [PATCH] Use parsed event detection in data lake as coverage signal - sync-sources now runs a parallel PowerQuery checking for event.type population per source; count stored in new active_sources.parser_detected - Coverage map marks a source as covered if parser_detected > 0, even without a matching local parser file (handles built-in/cloud parsers) - UI parser cell shows "Parsed (N typed events detected)" for data-lake- detected parsers vs named local parser files - Runtime ALTER TABLE migration adds parser_detected column to existing DBs Co-Authored-By: Claude Sonnet 4.6 --- backend/db.py | 1 + backend/main.py | 8 +++++++ backend/routers/coverage.py | 44 +++++++++++++++++++++++++++++-------- frontend/index.html | 6 ++++- 4 files changed, 49 insertions(+), 10 deletions(-) diff --git a/backend/db.py b/backend/db.py index 15708aa..aaa93fa 100644 --- a/backend/db.py +++ b/backend/db.py @@ -36,6 +36,7 @@ class ActiveSource(Base): source_name = Column(String, unique=True, index=True) event_count = Column(Integer, default=0) synced_at = Column(DateTime, default=datetime.utcnow) + parser_detected = Column(Integer, default=0) # >0 means parsed events seen in data lake class IngestSnapshot(Base): diff --git a/backend/main.py b/backend/main.py index a17f3eb..9a7fec9 100644 --- a/backend/main.py +++ b/backend/main.py @@ -5,6 +5,14 @@ from routers import coverage, ingest, settings, quality Base.metadata.create_all(bind=engine) +# Runtime migration: add columns that didn't exist in earlier schema versions +from sqlalchemy import text +with engine.connect() as _conn: + _conn.execute(text( + "ALTER TABLE active_sources ADD COLUMN IF NOT EXISTS parser_detected INTEGER DEFAULT 0" + )) + _conn.commit() + app = FastAPI(title="SIEM Toolkit", version="1.0.0") app.add_middleware( diff --git a/backend/routers/coverage.py b/backend/routers/coverage.py index 534d34e..1f34757 100644 --- a/backend/routers/coverage.py +++ b/backend/routers/coverage.py @@ -208,21 +208,38 @@ async def load_parser_content(payload: ParserContentPayload, db: Session = Depen @router.post("/sync-sources") async def sync_sources(days: int = 7, db: Session = Depends(get_db)): - """Pull active dataSource.names from the SDL and store them.""" + """Pull active dataSource.names from the SDL and store them. + Also detects whether a parser is already producing structured fields + for each source by checking if event.type is populated in the data lake. + """ + import asyncio from datetime import datetime, timedelta now = datetime.utcnow() from_dt = (now - timedelta(days=days)).strftime("%Y-%m-%dT%H:%M:%S.000Z") to_dt = now.strftime("%Y-%m-%dT%H:%M:%S.000Z") + try: - result = await s1_client.run_powerquery( - "| group events=count() by dataSource.name | sort -events | limit 200", - from_dt, to_dt + volume_result, parsed_result = await asyncio.gather( + s1_client.run_powerquery( + "| group events=count() by dataSource.name | sort -events | limit 200", + from_dt, to_dt + ), + s1_client.run_powerquery( + "| filter event.type != '' | group parsed=count() by dataSource.name | limit 200", + from_dt, to_dt + ), ) except Exception as e: raise HTTPException(502, f"PowerQuery error: {e}") - rows = result.get("events", []) - # Clear old and insert fresh + # Build lookup: source_name → count of parsed events seen + parsed_by_source: dict[str, int] = {} + for row in parsed_result.get("events", []): + name = row.get("dataSource.name") + if name: + parsed_by_source[name] = row.get("parsed", 0) + + rows = volume_result.get("events", []) db.query(ActiveSource).delete() synced_at = datetime.utcnow() seen = 0 @@ -233,6 +250,7 @@ async def sync_sources(days: int = 7, db: Session = Depends(get_db)): source_name=name, event_count=row.get("events", 0), synced_at=synced_at, + parser_detected=parsed_by_source.get(name, 0), )) seen += 1 db.commit() @@ -348,15 +366,22 @@ def get_coverage_map(db: Session = Depends(get_db)): for src in active_sources: parser_info = _find_parser_info(src.source_name) + parser_in_data = (src.parser_detected or 0) > 0 + if parser_info and parser_info["format_type"] == "custom": status = "covered" matched_parser = parser_info["parser_name"] format_type = "custom" - elif parser_info: - # grok or dottedJson — flag as needing a proper parser + elif parser_info and parser_info["format_type"] in ("grok", "dottedJson") and not parser_in_data: + # Known parser but primitive format and no evidence of parsing in data status = "parser_needed" matched_parser = parser_info["parser_name"] format_type = parser_info["format_type"] + elif parser_in_data: + # Parsed fields detected in the data lake — a parser is running + status = "covered" + matched_parser = parser_info["parser_name"] if parser_info else "detected in data" + format_type = parser_info["format_type"] if parser_info else "unknown" else: status = "parser_needed" matched_parser = None @@ -375,7 +400,8 @@ def get_coverage_map(db: Session = Depends(get_db)): "status": status, "parser": matched_parser, "format_type": format_type, - "parser_fields": len(parser_index.get(matched_parser, set())) if matched_parser else 0, + "parser_fields": len(parser_index.get(matched_parser, set())) if matched_parser and matched_parser != "detected in data" else 0, + "parser_detected": src.parser_detected or 0, "rules": rules_for_src, "rule_count": len(rules_for_src), "synced_at": src.synced_at.isoformat() if src.synced_at else None, diff --git a/frontend/index.html b/frontend/index.html index ae80c32..7ae3d96 100644 --- a/frontend/index.html +++ b/frontend/index.html @@ -272,7 +272,11 @@ function cvSetFilter(f) { function parserCell(s) { if (s.status === 'covered') { - return `${esc(s.parser)} (${s.parser_fields} fields)` + if (s.parser === 'detected in data') { + return `✓ Parsed (${(s.parser_detected||0).toLocaleString()} typed events detected)` + } + const detail = s.parser_fields ? ` (${s.parser_fields} fields)` : '' + return `${esc(s.parser)}${detail}` } if (s.parser && s.format_type && s.format_type !== 'custom') { return `⚠ ${esc(s.parser)} (${esc(s.format_type)} — needs custom parser)`