Improve coverage map matching, bar chart gradients, and add 1h time filter

- Coverage map: replace filename fuzzy-match with exact dataSource.name
  lookup read directly from parser file attributes; grok/dottedJson parsers
  now flagged as "parser_needed" with format type shown in the UI
- Bar chart: SVG linearGradient (light purple → deep violet) replaces flat fill
- Ingest dashboard: add 1h button (first option) backed by new backend
  hours= query param on /api/ingest/top-sources; daily-volume chart shows
  informational message when in 1h mode

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Mick
2026-05-19 12:43:10 -04:00
parent f0bd56aee8
commit ac97196435
3 changed files with 142 additions and 36 deletions
+67 -16
View File
@@ -239,35 +239,72 @@ async def sync_sources(days: int = 7, db: Session = Depends(get_db)):
return {"synced": seen, "sources": [r["dataSource.name"] for r in rows if r.get("dataSource.name")]}
def _build_parser_ds_index() -> dict[str, dict]:
"""
Read all parser files from /app/parsers/ and build an index:
dataSource.name (exact, from parser attributes) → {parser_name, format_type}
Format type is "grok", "dottedJson", or "custom".
Sources with grok/dottedJson parsers are flagged as needing a proper parser.
"""
import os, re
parsers_dir = "/app/parsers"
_DS_NAME_RE = re.compile(r'"dataSource\.name"\s*:\s*"([^"]+)"')
_FORMAT_TYPE_RE = re.compile(r'"type"\s*:\s*"([^"]+)"')
index: dict[str, dict] = {}
try:
entries = [e for e in os.scandir(parsers_dir) if e.is_file() and not e.name.startswith(".")]
except FileNotFoundError:
return index
for entry in entries:
try:
with open(entry.path, "r", encoding="utf-8", errors="replace") as fh:
content = fh.read()
except Exception:
continue
# Extract dataSource.name (may appear multiple times — take first)
ds_match = _DS_NAME_RE.search(content)
if not ds_match:
continue
ds_name = ds_match.group(1).strip()
# Determine format type — look for grok/dottedJson/custom in "type" values
format_types = {m.group(1).lower() for m in _FORMAT_TYPE_RE.finditer(content)}
if "grok" in format_types:
fmt = "grok"
elif "dottedjson" in format_types:
fmt = "dottedJson"
else:
fmt = "custom"
index[ds_name] = {"parser_name": entry.name, "format_type": fmt}
return index
@router.get("/map")
def get_coverage_map(db: Session = Depends(get_db)):
"""
Source-centric coverage map.
For each active dataSource.name in the SDL:
- covered = a parser is loaded for it
- parser_needed = no parser loaded
- covered = a custom parser is loaded for it (dataSource.name matches)
- parser_needed = no parser, OR parser uses grok/dottedJson format
Also surfaces which STAR rules reference each source.
"""
active_sources = db.query(ActiveSource).order_by(ActiveSource.event_count.desc()).all()
parser_fields_rows = db.query(ParserField).all()
rules = db.query(ParsedRule).all()
# parser_name → set of field names
# parser_name → set of field names (for field count display)
parser_index: dict[str, set] = {}
for pf in parser_fields_rows:
parser_index.setdefault(pf.parser_name, set()).add(pf.field_name)
# Build a fuzzy match: dataSource.name → parser_name
# Parser names like "paloalto", "palo", "okta_authentication-latest" need to match
# "Palo Alto Networks Firewall", "Okta", etc.
def _find_parser(source_name: str) -> str | None:
sn = source_name.lower().replace(" ", "").replace("-", "").replace("_", "")
for pname in parser_index:
pn = pname.lower().replace(" ", "").replace("-", "").replace("_", "")
# Direct substring match in either direction
if pn in sn or sn in pn:
return pname
return None
# Build exact dataSource.name → {parser_name, format_type} index from parser files
ds_index = _build_parser_ds_index()
# Build rule index: source_name → rules that reference it
rule_by_source: dict[str, list] = {}
@@ -285,8 +322,21 @@ def get_coverage_map(db: Session = Depends(get_db)):
needed_count = 0
for src in active_sources:
matched_parser = _find_parser(src.source_name)
status = "covered" if matched_parser else "parser_needed"
parser_info = ds_index.get(src.source_name)
if parser_info and parser_info["format_type"] == "custom":
status = "covered"
matched_parser = parser_info["parser_name"]
format_type = "custom"
elif parser_info:
# grok or dottedJson — flag as needing a proper parser
status = "parser_needed"
matched_parser = parser_info["parser_name"]
format_type = parser_info["format_type"]
else:
status = "parser_needed"
matched_parser = None
format_type = None
if status == "covered":
covered_count += 1
else:
@@ -299,6 +349,7 @@ def get_coverage_map(db: Session = Depends(get_db)):
"event_count": src.event_count,
"status": status,
"parser": matched_parser,
"format_type": format_type,
"parser_fields": len(parser_index.get(matched_parser, set())) if matched_parser else 0,
"rules": rules_for_src,
"rule_count": len(rules_for_src),