mirror of
https://github.com/marcredhat/SIEM-toolkit-patched
synced 2026-06-09 12:57:13 +00:00
Fix coverage map matching: three-tier lookup for parser-to-source mapping
1. Exact dataSource.name match 2. Normalized substring on parser's dataSource.name attribute 3. Normalized substring on parser filename (catches files with wrong ds name) Fixes CloudTrail (filename aws_cloudtrail-latest matches "cloudtrail") and Palo Alto Networks Firewall (ds name "Palo Alto Networks" matches via substring). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -303,9 +303,34 @@ def get_coverage_map(db: Session = Depends(get_db)):
|
||||
for pf in parser_fields_rows:
|
||||
parser_index.setdefault(pf.parser_name, set()).add(pf.field_name)
|
||||
|
||||
# Build exact dataSource.name → {parser_name, format_type} index from parser files
|
||||
# Build dataSource.name → {parser_name, format_type} index from parser files
|
||||
ds_index = _build_parser_ds_index()
|
||||
|
||||
def _normalize(s: str) -> str:
|
||||
return s.lower().replace(" ", "").replace("-", "").replace("_", "").replace(".", "")
|
||||
|
||||
def _find_parser_info(source_name: str) -> dict | None:
|
||||
"""
|
||||
Match priority:
|
||||
1. Exact dataSource.name match
|
||||
2. Normalized substring: active source name ↔ parser dataSource.name
|
||||
3. Normalized substring: active source name ↔ parser filename
|
||||
(catches cases where the parser file has a wrong dataSource.name)
|
||||
"""
|
||||
# 1. Exact match on dataSource.name
|
||||
if source_name in ds_index:
|
||||
return ds_index[source_name]
|
||||
sn = _normalize(source_name)
|
||||
# 2. Normalized ds_name substring
|
||||
for ds_name, info in ds_index.items():
|
||||
if _normalize(ds_name) in sn or sn in _normalize(ds_name):
|
||||
return info
|
||||
# 3. Normalized filename substring
|
||||
for info in ds_index.values():
|
||||
if _normalize(info["parser_name"]) in sn or sn in _normalize(info["parser_name"]):
|
||||
return info
|
||||
return None
|
||||
|
||||
# Build rule index: source_name → rules that reference it
|
||||
rule_by_source: dict[str, list] = {}
|
||||
for rule in rules:
|
||||
@@ -322,7 +347,7 @@ def get_coverage_map(db: Session = Depends(get_db)):
|
||||
needed_count = 0
|
||||
|
||||
for src in active_sources:
|
||||
parser_info = ds_index.get(src.source_name)
|
||||
parser_info = _find_parser_info(src.source_name)
|
||||
if parser_info and parser_info["format_type"] == "custom":
|
||||
status = "covered"
|
||||
matched_parser = parser_info["parser_name"]
|
||||
|
||||
Reference in New Issue
Block a user