Add product grouping to rule displays across coverage and threat pages

- Extract product label from rule data_sources in coverage.py via new
  _product_from_data_sources() helper (prefers non-SentinelOne entries
  so product-specific rules get a meaningful label)
- Coverage Map detections column: rules now grouped by product with
  collapsible chevron headers showing fired/silent counts
- Threat Coverage Rule Firing Status: collapsible product group headers
  with active/silent summary; shows all 2066 rules across 30 products
- Threat Coverage Dependency Map: collapsible product groups, at-risk
  products sorted first with risk count in header
- Ingest Dashboard: fix source name truncation — table cells now wrap
  with break-all and title tooltip; bar chart labels extended to 16
  chars with ellipsis and full-name tooltip on hover

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Mick
2026-05-22 11:56:27 -04:00
parent bb2c00f2fa
commit 7620d1fcc8
2 changed files with 192 additions and 55 deletions
+28 -1
View File
@@ -81,6 +81,17 @@ def _extract_mitre(rule: dict) -> tuple[list[str], list[dict]]:
return list(dict.fromkeys(tactics)), unique_techniques
def _product_from_data_sources(data_sources: list) -> str:
"""Derive a product label from a rule's data_sources list.
Prefers the first non-SentinelOne entry (e.g. 'AWS CloudTrail', 'Okta'),
falls back to 'SentinelOne' for generic endpoint rules.
"""
if not data_sources:
return "SentinelOne"
non_s1 = [d for d in data_sources if d.lower() not in ("sentinelone", "s1")]
return non_s1[0] if non_s1 else data_sources[0]
def _star_query_texts(rule: dict) -> list[str]:
"""
Extract all PowerQuery/filter strings from a STAR rule.
@@ -732,8 +743,10 @@ def get_coverage_map(db: Session = Depends(get_db)):
query_texts = _star_query_texts(raw_data)
data_sources = rule_parser.extract_data_sources(query_texts)
product = _product_from_data_sources(data_sources)
for ds in data_sources:
rule_by_source.setdefault(ds, []).append({"rule": rule.name, "type": rule.rule_type})
rule_by_source.setdefault(ds, []).append({"rule": rule.name, "type": rule.rule_type, "product": product})
# Fields to ignore when computing "missing" — these are metadata/schema fields
# always present in events regardless of the parser
@@ -789,6 +802,8 @@ def get_coverage_map(db: Session = Depends(get_db)):
for r in rule_by_source.get(src.source_name, [])
if r["type"] == "library"
]
# Sort rules so grouped-by-product rendering is stable
rules_for_src.sort(key=lambda r: (r.get("product", ""), r["rule"]))
# Close-match suggestions — shown when there are no library rules for this source.
close_matches: list = []
@@ -1062,6 +1077,16 @@ def get_rule_firing_cache(db: Session = Depends(get_db)):
never_fired_count = total_rules - len(fired)
period_days = rows[0].period_days if rows else 30
checked_at = rows[0].checked_at.isoformat() if rows and rows[0].checked_at else None
# Build rule_name → product lookup from ParsedRule raw JSON
rule_product: dict[str, str] = {}
for rule in db.query(ParsedRule).filter_by(rule_type="library").all():
try:
raw_data = json.loads(rule.raw) if rule.raw else {}
except Exception:
raw_data = {}
rule_product[rule.name] = _product_from_data_sources(raw_data.get("data_sources", []))
return {
"rules": [
{
@@ -1069,6 +1094,7 @@ def get_rule_firing_cache(db: Session = Depends(get_db)):
"alert_count": r.alert_count,
"period_days": r.period_days,
"checked_at": r.checked_at.isoformat() if r.checked_at else None,
"product": rule_product.get(r.rule_name, "SentinelOne"),
}
for r in rows
],
@@ -1279,6 +1305,7 @@ def get_dependency_map(db: Session = Depends(get_db)):
"generated_alerts": generated_alerts,
"at_risk": at_risk,
"no_sources": len(data_sources) == 0,
"product": _product_from_data_sources(data_sources),
})
# Sort: at-risk first, then by source count desc, then alphabetical