From 81e3656c46a47f3667725fc721ca2c73437c05cb Mon Sep 17 00:00:00 2001
From: Mick <119439091+mickbrowns1@users.noreply.github.com>
Date: Tue, 19 May 2026 12:56:51 -0400
Subject: [PATCH] Fix coverage map matching: three-tier lookup for parser-to-source mapping

1. Exact dataSource.name match
2. Normalized substring on parser's dataSource.name attribute
3. Normalized substring on parser filename (catches files with wrong ds name)

Fixes CloudTrail (filename aws_cloudtrail-latest matches "cloudtrail") and
Palo Alto Networks Firewall (ds name "Palo Alto Networks" matches via substring).

Names that normalize to an empty string are skipped in tiers 2 and 3, because
an empty string is a substring of everything and would match the first parser.

Co-Authored-By: Claude Sonnet 4.6
---
 backend/routers/coverage.py | 39 +++++++++++++++++++++++++++++++++++++--
 1 file changed, 37 insertions(+), 2 deletions(-)

diff --git a/backend/routers/coverage.py b/backend/routers/coverage.py
index adb7b82..534d34e 100644
--- a/backend/routers/coverage.py
+++ b/backend/routers/coverage.py
@@ -303,9 +303,44 @@ def get_coverage_map(db: Session = Depends(get_db)):
     for pf in parser_fields_rows:
         parser_index.setdefault(pf.parser_name, set()).add(pf.field_name)
 
-    # Build exact dataSource.name → {parser_name, format_type} index from parser files
+    # Build dataSource.name → {parser_name, format_type} index from parser files
     ds_index = _build_parser_ds_index()
 
+    def _normalize(s: str) -> str:
+        """Lower-case *s* and strip spaces, hyphens, underscores and dots."""
+        return s.lower().replace(" ", "").replace("-", "").replace("_", "").replace(".", "")
+
+    def _find_parser_info(source_name: str) -> dict | None:
+        """
+        Return the ds_index entry for an active source, or None if no parser matches.
+
+        Match priority:
+        1. Exact dataSource.name match
+        2. Normalized substring: active source name ↔ parser dataSource.name
+        3. Normalized substring: active source name ↔ parser filename
+           (catches cases where the parser file has a wrong dataSource.name)
+
+        Names that normalize to an empty string never take part in tiers 2-3:
+        "" is a substring of every string and would match the first entry.
+        """
+        # 1. Exact match on dataSource.name
+        if source_name in ds_index:
+            return ds_index[source_name]
+        sn = _normalize(source_name or "")
+        if not sn:
+            return None
+        # 2. Normalized ds_name substring
+        for ds_name, info in ds_index.items():
+            ds_norm = _normalize(ds_name)
+            if ds_norm and (ds_norm in sn or sn in ds_norm):
+                return info
+        # 3. Normalized filename substring
+        for info in ds_index.values():
+            file_norm = _normalize(info["parser_name"])
+            if file_norm and (file_norm in sn or sn in file_norm):
+                return info
+        return None
+
     # Build rule index: source_name → rules that reference it
     rule_by_source: dict[str, list] = {}
     for rule in rules:
@@ -322,7 +357,7 @@ def get_coverage_map(db: Session = Depends(get_db)):
     needed_count = 0
 
     for src in active_sources:
-        parser_info = ds_index.get(src.source_name)
+        parser_info = _find_parser_info(src.source_name)
         if parser_info and parser_info["format_type"] == "custom":
             status = "covered"
             matched_parser = parser_info["parser_name"]