diff --git a/backend/db.py b/backend/db.py
index 15708aa..aaa93fa 100644
--- a/backend/db.py
+++ b/backend/db.py
@@ -36,6 +36,7 @@ class ActiveSource(Base):
source_name = Column(String, unique=True, index=True)
event_count = Column(Integer, default=0)
synced_at = Column(DateTime, default=datetime.utcnow)
+ parser_detected = Column(Integer, default=0) # >0 means parsed events seen in data lake
class IngestSnapshot(Base):
diff --git a/backend/main.py b/backend/main.py
index a17f3eb..9a7fec9 100644
--- a/backend/main.py
+++ b/backend/main.py
@@ -5,6 +5,14 @@ from routers import coverage, ingest, settings, quality
Base.metadata.create_all(bind=engine)
+# Runtime migration: add columns that didn't exist in earlier schema versions
+from sqlalchemy import text
+with engine.connect() as _conn:
+ _conn.execute(text(
+ "ALTER TABLE active_sources ADD COLUMN IF NOT EXISTS parser_detected INTEGER DEFAULT 0"
+ ))
+ _conn.commit()
+
app = FastAPI(title="SIEM Toolkit", version="1.0.0")
app.add_middleware(
diff --git a/backend/routers/coverage.py b/backend/routers/coverage.py
index 534d34e..1f34757 100644
--- a/backend/routers/coverage.py
+++ b/backend/routers/coverage.py
@@ -208,21 +208,38 @@ async def load_parser_content(payload: ParserContentPayload, db: Session = Depen
@router.post("/sync-sources")
async def sync_sources(days: int = 7, db: Session = Depends(get_db)):
- """Pull active dataSource.names from the SDL and store them."""
+ """Pull active dataSource.names from the SDL and store them.
+ Also detects whether a parser is already producing structured fields
+ for each source by checking if event.type is populated in the data lake.
+ """
+ import asyncio
from datetime import datetime, timedelta
now = datetime.utcnow()
from_dt = (now - timedelta(days=days)).strftime("%Y-%m-%dT%H:%M:%S.000Z")
to_dt = now.strftime("%Y-%m-%dT%H:%M:%S.000Z")
+
try:
- result = await s1_client.run_powerquery(
- "| group events=count() by dataSource.name | sort -events | limit 200",
- from_dt, to_dt
+ volume_result, parsed_result = await asyncio.gather(
+ s1_client.run_powerquery(
+ "| group events=count() by dataSource.name | sort -events | limit 200",
+ from_dt, to_dt
+ ),
+ s1_client.run_powerquery(
+ "| filter event.type != '' | group parsed=count() by dataSource.name | limit 200",
+ from_dt, to_dt
+ ),
)
except Exception as e:
raise HTTPException(502, f"PowerQuery error: {e}")
- rows = result.get("events", [])
- # Clear old and insert fresh
+ # Build lookup: source_name → count of parsed events seen
+ parsed_by_source: dict[str, int] = {}
+ for row in parsed_result.get("events", []):
+ name = row.get("dataSource.name")
+ if name:
+ parsed_by_source[name] = row.get("parsed", 0)
+
+ rows = volume_result.get("events", [])
db.query(ActiveSource).delete()
synced_at = datetime.utcnow()
seen = 0
@@ -233,6 +250,7 @@ async def sync_sources(days: int = 7, db: Session = Depends(get_db)):
source_name=name,
event_count=row.get("events", 0),
synced_at=synced_at,
+ parser_detected=parsed_by_source.get(name, 0),
))
seen += 1
db.commit()
@@ -348,15 +366,22 @@ def get_coverage_map(db: Session = Depends(get_db)):
for src in active_sources:
parser_info = _find_parser_info(src.source_name)
+ parser_in_data = (src.parser_detected or 0) > 0
+
if parser_info and parser_info["format_type"] == "custom":
status = "covered"
matched_parser = parser_info["parser_name"]
format_type = "custom"
- elif parser_info:
- # grok or dottedJson — flag as needing a proper parser
+ elif parser_info and parser_info["format_type"] in ("grok", "dottedJson") and not parser_in_data:
+ # Known parser but primitive format and no evidence of parsing in data
status = "parser_needed"
matched_parser = parser_info["parser_name"]
format_type = parser_info["format_type"]
+ elif parser_in_data:
+ # Parsed fields detected in the data lake — a parser is running
+ status = "covered"
+ matched_parser = parser_info["parser_name"] if parser_info else "detected in data"
+ format_type = parser_info["format_type"] if parser_info else "unknown"
else:
status = "parser_needed"
matched_parser = None
@@ -375,7 +400,8 @@ def get_coverage_map(db: Session = Depends(get_db)):
"status": status,
"parser": matched_parser,
"format_type": format_type,
- "parser_fields": len(parser_index.get(matched_parser, set())) if matched_parser else 0,
+ "parser_fields": len(parser_index.get(matched_parser, set())) if matched_parser and matched_parser != "detected in data" else 0,
+ "parser_detected": src.parser_detected or 0,
"rules": rules_for_src,
"rule_count": len(rules_for_src),
"synced_at": src.synced_at.isoformat() if src.synced_at else None,
diff --git a/frontend/index.html b/frontend/index.html
index ae80c32..7ae3d96 100644
--- a/frontend/index.html
+++ b/frontend/index.html
@@ -272,7 +272,11 @@ function cvSetFilter(f) {
function parserCell(s) {
if (s.status === 'covered') {
- return `${esc(s.parser)} (${s.parser_fields} fields)`
+ if (s.parser === 'detected in data') {
+ return `✓ Parsed (${(s.parser_detected||0).toLocaleString()} typed events detected)`
+ }
+ const detail = s.parser_fields ? ` (${s.parser_fields} fields)` : ''
+ return `${esc(s.parser)}${detail}`
}
if (s.parser && s.format_type && s.format_type !== 'custom') {
return `⚠ ${esc(s.parser)} (${esc(s.format_type)} — needs custom parser)`