diff --git a/backend/main.py b/backend/main.py index 5e2d532..a17f3eb 100644 --- a/backend/main.py +++ b/backend/main.py @@ -1,7 +1,7 @@ from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from db import engine, Base -from routers import coverage, ingest, settings +from routers import coverage, ingest, settings, quality Base.metadata.create_all(bind=engine) @@ -18,6 +18,7 @@ app.add_middleware( app.include_router(coverage.router, prefix="/api/coverage", tags=["Coverage"]) app.include_router(ingest.router, prefix="/api/ingest", tags=["Ingest"]) app.include_router(settings.router, prefix="/api/settings", tags=["Settings"]) +app.include_router(quality.router, prefix="/api/quality", tags=["Quality"]) @app.get("/health") diff --git a/backend/routers/quality.py b/backend/routers/quality.py new file mode 100644 index 0000000..7b266b7 --- /dev/null +++ b/backend/routers/quality.py @@ -0,0 +1,233 @@ +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel +from datetime import datetime, timedelta +from services import s1_client +import re + +router = APIRouter() + + +def _date_range_hours(hours: int) -> tuple[str, str]: + now = datetime.utcnow() + return ( + (now - timedelta(hours=hours)).strftime("%Y-%m-%dT%H:%M:%S.000Z"), + now.strftime("%Y-%m-%dT%H:%M:%S.000Z"), + ) + + +# --------------------------------------------------------------------------- +# Models +# --------------------------------------------------------------------------- + +class SampleEventsRequest(BaseModel): + source: str + limit: int = 20 + hours: int = 1 + + +class FieldPopulationRequest(BaseModel): + source: str + hours: int = 24 + fields: list[str] = [ + "src.ip", + "src.port", + "dst.ip", + "dst.port", + "user.name", + "event.type", + "src.process.name", + "src.process.cmdline", + "tgt.file.path", + "network.direction", + "dataSource.name", + ] + + +class TestParserRequest(BaseModel): + parser_name: str + log_line: str + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _flatten_event(event: dict) -> dict: + """Return a flat field→value dict from a PowerQuery result row.""" + if isinstance(event, dict): + return {k: v for k, v in event.items()} + return {} + + +def _extract_format_strings(content: str) -> list[str]: + """ + Extract SDL format string values from augmented-JSON parser content. + Matches: "format": "..." (double-quoted value, supports escaped quotes). + """ + pattern = re.compile(r'"format"\s*:\s*"((?:[^"\\]|\\.)*)"') + return pattern.findall(content) + + +def _sdl_format_to_regex(fmt: str) -> tuple[re.Pattern, dict[str, str]]: + """ + Convert an SDL format string to a compiled Python regex. + + Returns (compiled_pattern, py_group_to_sdl_field) mapping so callers can + translate group names back to the original SDL field names. + + Raises re.error if the resulting pattern cannot be compiled. + """ + # Split on $...$ tokens + token_pattern = re.compile(r'\$([^$]+)\$') + parts = token_pattern.split(fmt) + # parts alternates: literal, token, literal, token, ... + + regex_parts: list[str] = [] + py_group_to_sdl: dict[str, str] = {} + seen_groups: dict[str, int] = {} + + for i, part in enumerate(parts): + if i % 2 == 0: + # Literal text + regex_parts.append(re.escape(part)) + else: + # Token: either "field.name=PATTERN" or just "field.name" + if '=' in part: + field_name, pattern = part.split('=', 1) + else: + field_name = part + pattern = r'[^\s]+' + + # Build a valid Python group name + safe = re.sub(r'[.\-]', '_', field_name) + if safe in seen_groups: + seen_groups[safe] += 1 + safe = f"{safe}_{seen_groups[safe]}" + else: + seen_groups[safe] = 0 + + py_group_to_sdl[safe] = field_name + regex_parts.append(f'(?P<{safe}>{pattern})') + + compiled = re.compile(''.join(regex_parts), re.IGNORECASE) + return compiled, py_group_to_sdl + + +# --------------------------------------------------------------------------- +# Endpoints +# --------------------------------------------------------------------------- + +@router.post("/sample-events") +async def sample_events(req: SampleEventsRequest): + """Return a sample of raw events from a given data source.""" + query = f'| filter dataSource.name = "{req.source}" | limit {req.limit}' + from_dt, to_dt = _date_range_hours(req.hours) + + result = await s1_client.run_powerquery(query, from_dt, to_dt) + + rows = result if isinstance(result, list) else (result.get("rows") or result.get("events") or []) + events = [_flatten_event(row) for row in rows] + + return { + "source": req.source, + "events": events, + "count": len(events), + "hours": req.hours, + } + + +@router.post("/field-population") +async def field_population(req: FieldPopulationRequest): + """ + Analyse how consistently each requested field is populated across a sample + of events from a data source. + """ + query = f'| filter dataSource.name = "{req.source}" | limit 500' + from_dt, to_dt = _date_range_hours(req.hours) + + result = await s1_client.run_powerquery(query, from_dt, to_dt) + + rows = result if isinstance(result, list) else (result.get("rows") or result.get("events") or []) + events = [_flatten_event(row) for row in rows] + + if not events: + raise HTTPException(status_code=404, detail=f"No events found for source '{req.source}' in the last {req.hours} hours.") + + total = len(events) + _empty = {None, "", "null"} + + # Collect all field names seen across the sample (useful for surfacing what IS there) + all_seen_fields = sorted({k for ev in events for k in ev}) + + field_stats = [] + for field in req.fields: + # dataSource.name is always 100% — we filtered by it; Scalyr just doesn't echo it back + if field == "dataSource.name": + populated = total + else: + populated = sum(1 for ev in events if ev.get(field) not in _empty) + rate = round((populated / total) * 100, 1) + field_stats.append({ + "field": field, + "populated": populated, + "total": total, + "rate": rate, + }) + + # Sort ascending by rate (worst coverage first) + field_stats.sort(key=lambda x: x["rate"]) + + return { + "source": req.source, + "total_sampled": total, + "hours": req.hours, + "fields": field_stats, + "fields_seen_in_sample": all_seen_fields, + } + + +@router.post("/test-parser") +async def test_parser(req: TestParserRequest): + """ + Test a parser against a raw log line by extracting and matching SDL format + strings found in the parser file. + """ + parser_path = f"/app/parsers/{req.parser_name}" + + try: + with open(parser_path, "r", encoding="utf-8") as fh: + content = fh.read() + except FileNotFoundError: + raise HTTPException(status_code=404, detail=f"Parser file not found: {req.parser_name}") + except OSError as exc: + raise HTTPException(status_code=500, detail=f"Could not read parser file: {exc}") + + format_strings = _extract_format_strings(content) + + for fmt in format_strings: + try: + compiled, py_to_sdl = _sdl_format_to_regex(fmt) + except re.error: + # Skip unparseable format strings + continue + + match = compiled.search(req.log_line) + if match: + fields = [ + {"field": py_to_sdl.get(group, group), "value": value} + for group, value in match.groupdict().items() + if value is not None + ] + return { + "parser_name": req.parser_name, + "matched": True, + "format_matched": fmt, + "fields": fields, + } + + return { + "parser_name": req.parser_name, + "matched": False, + "message": "No format pattern matched", + "fields": [], + } diff --git a/frontend/index.html b/frontend/index.html index 8b35b57..ae80c32 100644 --- a/frontend/index.html +++ b/frontend/index.html @@ -20,6 +20,7 @@ Overview Parser Coverage Ingest Dashboard + Parser Quality Onboarding
Inspect live events · measure field coverage · test parser patterns
+Pull recent raw events from a source and see exactly which fields landed — and which are missing.
+Sample up to 500 events and measure what % have each key field populated. Low rates flag parser extraction failures.
+Paste a raw log line and pick a loaded parser — see which fields the format patterns would extract without deploying anything.
+Querying data lake…
' + try { + const r = await apiPost('/api/quality/sample-events', { + source, + limit: +document.getElementById('qs-limit').value, + hours: +document.getElementById('qs-hours').value, + }) + if (!r.events?.length) { + document.getElementById('qs-result').innerHTML = 'No events found for this source in the selected window.
' + return + } + // Collect all field names across events + const allFields = [...new Set(r.events.flatMap(e => Object.keys(e)))].sort() + const rows = r.events.map(ev => { + const cells = allFields.map(f => { + const v = ev[f] + const empty = v === null || v === undefined || v === '' || v === 'null' + return `${r.count} events · ${r.hours}h window · ${allFields.length} fields seen
+Sampling events…
' + try { + const fieldsRaw = document.getElementById('qp-fields').value + const fields = fieldsRaw.split(',').map(f => f.trim()).filter(Boolean) + const r = await apiPost('/api/quality/field-population', { + source, hours: +document.getElementById('qp-hours').value, fields + }) + const rows = r.fields.map(f => { + const pct = f.rate + const color = pct >= 80 ? 'bg-emerald-500' : pct >= 40 ? 'bg-amber-500' : 'bg-red-500' + const textColor = pct >= 80 ? 'text-emerald-400' : pct >= 40 ? 'text-amber-400' : 'text-red-400' + return `${r.total_sampled} events sampled · ${r.hours}h window — sorted by worst coverage first
+| Field | +Rate | +Coverage | +Events | +
|---|
Fields actually present in sample (${r.fields_seen_in_sample.length} total)
+Testing…
' + try { + const r = await apiPost('/api/quality/test-parser', { parser_name: parser, log_line: log }) + if (!r.matched) { + document.getElementById('qt-result').innerHTML = ` +The parser's format strings didn't produce a match. Check that the log sample matches the expected format, or that the parser has SDL format strings (some parsers use grok/dottedJson which aren't tested here).
+| Field | +Extracted Value | +
|---|