Add unlabelled event detection, stub parser quality, Sync All, and modern UI redesign

Key changes:
- Unlabelled event banner: shows count only after Sample Events is clicked; uses broad SDL filter expression; time window synced to sync-days dropdown
- Parser Quality: new "Attributes Missing" subsection listing all parsers without dataSource.name regardless of event volume
- Coverage map: filter buttons (All / Complete Parser / Attributes Missing); stat card renamed to "Incomplete Parser"; stub count excluded from sync when no active sources
- Sync All button: runs SDL parser sync → library sync → live sources sync in sequence
- Reset now clears ActiveSource table and resets unlabelled count cache
- run_powerquery: configurable max_count param (default 1000, 50M for count queries)
- _DS_NAME_RE: supports both quoted and unquoted dataSource.name keys in parser files
- Full modern UI redesign: slate palette, gradient cards, ring borders, pill nav, colored stat accents
- Updated 7 tracked parser files synced from SDL

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Mick
2026-05-22 10:00:21 -04:00
parent 0013adbe7e
commit c5a4f796a0
15 changed files with 3498 additions and 469 deletions
+2 -1
View File
@@ -1,5 +1,5 @@
import os
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, Text
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, Text, Boolean
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import declarative_base, sessionmaker
from datetime import datetime
@@ -37,6 +37,7 @@ class ActiveSource(Base):
event_count = Column(Integer, default=0)
synced_at = Column(DateTime, default=datetime.utcnow)
parser_detected = Column(Integer, default=0) # >0 means parsed events seen in data lake
unlabelled = Column(Boolean, default=False) # True = events had no dataSource.name
class IngestSnapshot(Base):
+3
View File
@@ -11,6 +11,9 @@ with engine.connect() as _conn:
_conn.execute(text(
"ALTER TABLE active_sources ADD COLUMN IF NOT EXISTS parser_detected INTEGER DEFAULT 0"
))
_conn.execute(text(
"ALTER TABLE active_sources ADD COLUMN IF NOT EXISTS unlabelled BOOLEAN DEFAULT FALSE"
))
_conn.commit()
app = FastAPI(title="SIEM Toolkit", version="1.0.0")
+173 -16
View File
@@ -207,16 +207,109 @@ async def upload_sigma(files: list[UploadFile] = File(...), db: Session = Depend
return {"loaded": len(loaded), "rules": loaded}
def _fetch_parsers_from_console(parsers_dir: str) -> dict:
"""
Fetch every parser under /logParsers/ from the SDL console and write them
to parsers_dir. Uses SDL_CONFIG_READ_KEY (needs 'Manage config files' permission)
and SDL_XDR_URL from the environment.
Returns {"fetched": N, "failed": [...], "skipped": reason_or_None}
"""
import urllib.request, urllib.error, json as _json, os as _os
# Read live from .env file so Settings-page saves are picked up without restart
def _env_val(key: str) -> str:
val = _os.environ.get(key, "")
if not val:
env_path = _os.environ.get("ENV_FILE_PATH", "/app/.env")
try:
for line in open(env_path).read().splitlines():
line = line.strip()
if line and not line.startswith("#") and "=" in line:
k, _, v = line.partition("=")
if k.strip() == key:
val = v.strip()
break
except Exception:
pass
return val
config_key = _env_val("SDL_CONFIG_READ_KEY")
base_url = _env_val("SDL_XDR_URL").rstrip("/")
if not config_key:
return {"fetched": 0, "failed": [], "skipped": "SDL_CONFIG_READ_KEY not set"}
if not base_url:
return {"fetched": 0, "failed": [], "skipped": "SDL_XDR_URL not set"}
def _post(path: str, params: dict) -> dict:
url = f"{base_url}{path}"
body = _json.dumps({**params, "token": config_key}).encode()
req = urllib.request.Request(url, data=body, headers={
"Authorization": f"Bearer {config_key}",
"Content-Type": "application/json",
})
try:
with urllib.request.urlopen(req, timeout=30) as r:
return _json.loads(r.read())
except urllib.error.HTTPError as e:
err_body = e.read().decode(errors="replace")[:300]
raise RuntimeError(f"HTTP {e.code} {path}: {err_body}")
# List all parser paths
res = _post("/api/listFiles", {"pathPrefix": "/logParsers/"})
# Support multiple response shapes: {"paths": [...]} or {"files": [...]}
raw_paths = res.get("paths") or res.get("files") or []
# Each element may be a plain string or a dict with a "path"/"name" key
paths = []
for p in raw_paths:
if isinstance(p, dict):
p = p.get("path") or p.get("name") or ""
if isinstance(p, str) and p.startswith("/logParsers/"):
paths.append(p)
_os.makedirs(parsers_dir, exist_ok=True)
fetched, failed = 0, []
for p in paths:
name = p.rsplit("/", 1)[-1] or "_unnamed"
try:
r = _post("/api/getFile", {"path": p})
content = r.get("content")
if content is None:
failed.append({"path": p, "error": "no content", "raw": r})
continue
with open(_os.path.join(parsers_dir, name), "w", encoding="utf-8") as fh:
fh.write(content)
fetched += 1
except Exception as e:
failed.append({"path": p, "error": str(e)})
# Surface the raw API response so callers can see exactly what was returned.
# Truncate paths list so the response stays readable (first 200).
debug_info = {
"response_keys": list(res.keys()),
"paths_found": len(paths),
"paths_listed": paths[:200],
}
return {"fetched": fetched, "failed": failed, "skipped": None, "debug": debug_info}
@router.post("/load-parsers-from-sdl")
async def load_parsers_from_sdl(db: Session = Depends(get_db)):
"""
Load SDL parsers from the local /app/parsers directory (mounted from ./parsers/).
Files are placed there by the MCP-based loader or by manual copy.
Falls back to a clear error if the directory is empty.
Sync SDL parsers from the console (if SDL_CONFIG_READ_KEY is set) then index
every file in the local /app/parsers directory into the DB.
"""
import os
parsers_dir = "/app/parsers"
# ── Step 1: fetch from console (best-effort) ────────────────────────────
fetch_result = _fetch_parsers_from_console(parsers_dir)
# ── Step 2: load whatever is on disk into the DB ─────────────────────────
try:
entries = [
e for e in os.scandir(parsers_dir)
@@ -225,12 +318,19 @@ async def load_parsers_from_sdl(db: Session = Depends(get_db)):
except FileNotFoundError:
raise HTTPException(503, "parsers/ directory not found — check Docker volume mount")
if not entries and fetch_result["skipped"]:
raise HTTPException(
422,
f"No parser files found in parsers/ directory and console sync was skipped "
f"({fetch_result['skipped']}). "
"Add SDL_CONFIG_READ_KEY in Settings (needs 'Manage config files' permission) "
"or upload a parser file manually."
)
if not entries:
raise HTTPException(
422,
"No parser files found in parsers/ directory. "
"Use 'Load SDL Parsers via MCP' in Claude Code to populate it, "
"or upload a parser file manually."
"No parser files found in parsers/ directory after console sync. "
"Check SDL_CONFIG_READ_KEY permissions ('Manage config files' required)."
)
loaded = []
@@ -258,7 +358,12 @@ async def load_parsers_from_sdl(db: Session = Depends(get_db)):
errors.append({"parser": entry.name, "error": str(e)})
db.commit()
return {"loaded": len(loaded), "parsers": loaded, "errors": errors}
return {
"loaded": len(loaded),
"parsers": loaded,
"errors": errors,
"console_fetch": fetch_result,
}
@router.post("/upload-parser")
@@ -329,6 +434,9 @@ _S1_NATIVE_SOURCES = {
"SentinelOne Ranger AD",
}
# Cached count of events with no dataSource.name — updated on each sync
_unlabelled_event_count: int = -1 # -1 = not yet queried
@router.post("/sync-sources")
async def sync_sources(days: int = 7, db: Session = Depends(get_db)):
@@ -378,28 +486,34 @@ async def sync_sources(days: int = 7, db: Session = Depends(get_db)):
parser_detected=parsed_by_source.get(name, 0),
))
seen += 1
db.commit()
return {"synced": seen, "sources": [r["dataSource.name"] for r in rows if r.get("dataSource.name") and r["dataSource.name"] not in _S1_NATIVE_SOURCES]}
synced_names = [r["dataSource.name"] for r in rows if r.get("dataSource.name") and r["dataSource.name"] not in _S1_NATIVE_SOURCES]
return {"synced": seen, "sources": synced_names}
def _build_parser_ds_index() -> dict[str, dict]:
def _build_parser_ds_index() -> tuple[dict[str, dict], list[dict]]:
"""
Read all parser files from /app/parsers/ and build an index:
dataSource.name (exact, from parser attributes){parser_name, format_type}
Read all parser files from /app/parsers/ and build:
- index: dataSource.name → {parser_name, format_type} (complete parsers)
- stubs: list of {parser_name} for files with no dataSource.name attribute
Format type is "grok", "dottedJson", or "custom".
Sources with grok/dottedJson parsers are flagged as needing a proper parser.
"""
import os, re
parsers_dir = "/app/parsers"
_DS_NAME_RE = re.compile(r'"dataSource\.name"\s*:\s*"([^"]+)"')
_DS_NAME_RE = re.compile(r'"?dataSource\.name"?\s*:\s*"([^"]+)"')
_FORMAT_TYPE_RE = re.compile(r'"type"\s*:\s*"([^"]+)"')
# Only treat a file as a parser if it has a formats section — rules out dashboards/saved-searches
_HAS_FORMATS_RE = re.compile(r'\bformats\s*:', re.IGNORECASE)
index: dict[str, dict] = {}
stubs: list[dict] = []
try:
entries = [e for e in os.scandir(parsers_dir) if e.is_file() and not e.name.startswith(".")]
except FileNotFoundError:
return index
return index, stubs
for entry in entries:
try:
@@ -408,9 +522,15 @@ def _build_parser_ds_index() -> dict[str, dict]:
except Exception:
continue
# Skip files that have no formats section — they're dashboards/queries, not parsers
if not _HAS_FORMATS_RE.search(content):
continue
# Extract dataSource.name (may appear multiple times — take first)
ds_match = _DS_NAME_RE.search(content)
if not ds_match:
# Has formats but no dataSource.name — genuine stub parser
stubs.append({"parser_name": entry.name})
continue
ds_name = ds_match.group(1).strip()
@@ -425,7 +545,7 @@ def _build_parser_ds_index() -> dict[str, dict]:
index[ds_name] = {"parser_name": entry.name, "format_type": fmt}
return index
return index, stubs
@router.get("/map")
@@ -447,11 +567,20 @@ def get_coverage_map(db: Session = Depends(get_db)):
parser_index.setdefault(pf.parser_name, set()).add(pf.field_name)
# Build dataSource.name → {parser_name, format_type} index from parser files
ds_index = _build_parser_ds_index()
ds_index, stub_parsers = _build_parser_ds_index()
def _normalize(s: str) -> str:
return s.lower().replace(" ", "").replace("-", "").replace("_", "").replace(".", "")
def _find_stub_match(source_name: str) -> dict | None:
"""Return stub parser info if a stub filename fuzzy-matches this source name."""
sn = _normalize(source_name)
for stub in stub_parsers:
fn = _normalize(stub["parser_name"])
if fn in sn or sn in fn:
return stub
return None
def _find_parser_info(source_name: str) -> dict | None:
"""
Match priority:
@@ -514,6 +643,8 @@ def get_coverage_map(db: Session = Depends(get_db)):
parser_info = _find_parser_info(src.source_name)
parser_in_data = (src.parser_detected or 0) > 0
stub_info = _find_stub_match(src.source_name) if not parser_info else None
if parser_info and parser_info["format_type"] == "custom":
status = "covered"
matched_parser = parser_info["parser_name"]
@@ -528,6 +659,12 @@ def get_coverage_map(db: Session = Depends(get_db)):
status = "covered"
matched_parser = parser_info["parser_name"] if parser_info else "detected in data"
format_type = parser_info["format_type"] if parser_info else "unknown"
elif stub_info:
# A parser file exists but has no dataSource.name — it's a stub/incomplete
status = "stub_parser"
matched_parser = stub_info["parser_name"]
format_type = None
stub_info["suggested_ds_name"] = src.source_name
else:
status = "parser_needed"
matched_parser = None
@@ -536,7 +673,7 @@ def get_coverage_map(db: Session = Depends(get_db)):
if status == "covered":
covered_count += 1
else:
needed_count += 1
needed_count += 1 # stub_parser and parser_needed both count as needing work
rules_for_src: list = [r for r in rule_by_source.get(src.source_name, []) if r["type"] == "library"]
@@ -614,6 +751,8 @@ def get_coverage_map(db: Session = Depends(get_db)):
"status": status,
"parser": matched_parser,
"format_type": format_type,
"unlabelled": bool(src.unlabelled),
"stub_suggested_ds_name": stub_info.get("suggested_ds_name") if stub_info and status == "stub_parser" else None,
"parser_fields": len(parser_provides),
"parser_detected": src.parser_detected or 0,
"rules": rules_for_src,
@@ -624,13 +763,20 @@ def get_coverage_map(db: Session = Depends(get_db)):
"synced_at": src.synced_at.isoformat() if src.synced_at else None,
})
# Only surface stub parsers that matched an active source with real events —
# unmatched stubs with zero events are noise and are suppressed.
synced_at = active_sources[0].synced_at.isoformat() if active_sources else None
stub_count = sum(1 for s in sources_out if s["status"] == "stub_parser")
return {
"summary": {
"active_sources": len(active_sources),
"covered": covered_count,
"parser_needed": needed_count,
"stub_parsers": stub_count,
"unlabelled_events": _unlabelled_event_count,
"parsers_loaded": len(parser_index),
"rules_loaded": len(rules),
},
@@ -640,9 +786,20 @@ def get_coverage_map(db: Session = Depends(get_db)):
}
@router.get("/stub-parsers")
def get_stub_parsers():
"""Return all parser files that have a formats: section but no dataSource.name attribute.
Used by Parser Quality — Attributes Missing section. Independent of active sources."""
_, stubs = _build_parser_ds_index()
return {"stubs": stubs, "count": len(stubs)}
@router.delete("/reset")
def reset_data(db: Session = Depends(get_db)):
db.query(ParsedRule).delete()
db.query(ParserField).delete()
db.query(ActiveSource).delete()
db.commit()
global _unlabelled_event_count
_unlabelled_event_count = -1
return {"cleared": True}
+146 -28
View File
@@ -38,6 +38,7 @@ class SampleEventsRequest(BaseModel):
source: str
limit: int = 20
hours: int = 1
filter_mode: str = "broad" # reserved for future use
class FieldPopulationRequest(BaseModel):
@@ -107,21 +108,41 @@ def _flatten_event(event: dict) -> dict:
def _extract_format_strings(content: str) -> list[str]:
"""
Extract SDL format string values from augmented-JSON parser content.
Matches: "format": "..." (double-quoted value, supports escaped quotes).
Handles both:
- quoted keys: "format": "..." (valid JSON)
- unquoted keys: format: "..." (SDL augmented-JSON)
Skips commented-out lines (// ...).
"""
pattern = re.compile(r'"format"\s*:\s*"((?:[^"\\]|\\.)*)"')
return pattern.findall(content)
pattern = re.compile(r'(?<!//)\"?format\"?\s*:\s*"((?:[^"\\]|\\.)*)"')
results = []
for line in content.splitlines():
stripped = line.strip()
if stripped.startswith("//"):
continue
results.extend(pattern.findall(line))
return results
def _sdl_format_to_regex(fmt: str) -> tuple[re.Pattern, dict[str, str]]:
"""
Convert an SDL format string to a compiled Python regex.
Returns (compiled_pattern, py_group_to_sdl_field) mapping so callers can
translate group names back to the original SDL field names.
SDL format strings may start with '.*,' to absorb a syslog header. When
used with re.search that prefix is redundant AND harmful (it forces a comma
before the first named field, which won't exist when the log starts with
the field directly). We strip the leading '.*,' so re.search can anchor
to the first real field at any position in the line.
Internal '.*' wildcards (field separators for skipped fields) are kept as
non-greedy '.*?' so they don't consume adjacent named-field values.
Returns (compiled_pattern, py_group_to_sdl_field).
Raises re.error if the resulting pattern cannot be compiled.
"""
# Strip leading/trailing .* wildcards — re.search handles positioning
fmt = re.sub(r'^(\.\*,?)+', '', fmt)
fmt = re.sub(r'(,?\.\*)+$', '', fmt)
# Split on $...$ tokens
token_pattern = re.compile(r'\$([^$]+)\$')
parts = token_pattern.split(fmt)
@@ -131,19 +152,25 @@ def _sdl_format_to_regex(fmt: str) -> tuple[re.Pattern, dict[str, str]]:
py_group_to_sdl: dict[str, str] = {}
seen_groups: dict[str, int] = {}
def _escape_literal(s: str) -> str:
"""Escape literal text but keep internal .* as non-greedy wildcards."""
segments = re.split(r'(\.\*)', s)
return ''.join(r'.*?' if seg == '.*' else re.escape(seg) for seg in segments)
for i, part in enumerate(parts):
if i % 2 == 0:
# Literal text
regex_parts.append(re.escape(part))
# Literal text (possibly containing .* wildcards)
regex_parts.append(_escape_literal(part))
else:
# Token: either "field.name=PATTERN" or just "field.name"
if '=' in part:
field_name, pattern = part.split('=', 1)
else:
field_name = part
pattern = r'[^\s]+'
# Default: match any non-comma chars (SDL CSV fields)
pattern = r'[^,]*'
# Build a valid Python group name
# Build a valid Python named-group identifier
safe = re.sub(r'[.\-]', '_', field_name)
if safe in seen_groups:
seen_groups[safe] += 1
@@ -154,7 +181,7 @@ def _sdl_format_to_regex(fmt: str) -> tuple[re.Pattern, dict[str, str]]:
py_group_to_sdl[safe] = field_name
regex_parts.append(f'(?P<{safe}>{pattern})')
compiled = re.compile(''.join(regex_parts), re.IGNORECASE)
compiled = re.compile(''.join(regex_parts), re.IGNORECASE | re.DOTALL)
return compiled, py_group_to_sdl
@@ -162,6 +189,45 @@ def _sdl_format_to_regex(fmt: str) -> tuple[re.Pattern, dict[str, str]]:
# Endpoints
# ---------------------------------------------------------------------------
@router.post("/sample-unlabelled")
async def sample_unlabelled(req: SampleEventsRequest):
"""Return a sample of events that have no dataSource.name — these need parsers.
Also runs a count query so the caller can update the banner with the real total.
"""
import asyncio
from routers import coverage as _coverage
filter_expr = "!(dataSource.name = *) !(source = 'scalyr')"
from_dt, to_dt = _date_range_hours(req.hours)
sample_result, count_result = await asyncio.gather(
s1_client.run_powerquery(f"{filter_expr} | limit {req.limit}", from_dt, to_dt),
s1_client.run_powerquery(f"{filter_expr} | group events=count()", from_dt, to_dt, max_count=50_000_000),
)
rows = sample_result if isinstance(sample_result, list) else (sample_result.get("rows") or sample_result.get("events") or [])
events = [_flatten_event(row) for row in rows]
non_empty_keys: set = set()
for ev in events:
for k, v in ev.items():
if v is not None and v != "" and v != "null":
non_empty_keys.add(k)
events = [{k: v for k, v in ev.items() if k in non_empty_keys} for ev in events]
count_rows = count_result.get("events", []) if isinstance(count_result, dict) else []
total = count_rows[0].get("events", 0) if count_rows else 0
_coverage._unlabelled_event_count = total
return {
"events": events,
"count": len(events),
"total": total,
"hours": req.hours,
"columns_seen": sorted(non_empty_keys),
}
@router.post("/sample-events")
async def sample_events(req: SampleEventsRequest):
"""Return a sample of raw events from a given data source."""
@@ -196,21 +262,39 @@ async def field_population(req: FieldPopulationRequest):
events = [_flatten_event(row) for row in rows]
if not events:
raise HTTPException(status_code=404, detail=f"No events found for source '{req.source}' in the last {req.hours} hours.")
return {
"source": req.source,
"total_sampled": 0,
"hours": req.hours,
"fields": [],
"fields_seen_in_sample": [],
"message": f"No events found for source '{req.source}' in the last {req.hours} hours.",
}
total = len(events)
_empty = {None, "", "null"}
_empty_scalars = {None, "", "null"}
def _is_empty(val):
"""Return True if the value counts as unpopulated."""
if val is None:
return True
if isinstance(val, list):
return len(val) == 0
if isinstance(val, dict):
return len(val) == 0
return val in _empty_scalars
# Collect all field names seen across the sample (useful for surfacing what IS there)
all_seen_fields = sorted({k for ev in events for k in ev})
all_seen_fields_set = set(all_seen_fields)
field_stats = []
for field in req.fields:
# dataSource.name is always 100% — we filtered by it; Scalyr just doesn't echo it back
if field == "dataSource.name":
populated = total
else:
populated = sum(1 for ev in events if ev.get(field) not in _empty)
# Skip fields that don't appear anywhere in the sample
if field not in all_seen_fields_set:
continue
populated = sum(1 for ev in events if not _is_empty(ev.get(field)))
rate = round((populated / total) * 100, 1)
field_stats.append({
"field": field,
@@ -219,8 +303,8 @@ async def field_population(req: FieldPopulationRequest):
"rate": rate,
})
# Sort ascending by rate (worst coverage first)
field_stats.sort(key=lambda x: x["rate"])
# Sort descending by rate (best coverage first)
field_stats.sort(key=lambda x: x["rate"], reverse=True)
return {
"source": req.source,
@@ -255,7 +339,10 @@ async def test_parser(req: TestParserRequest):
# The regex-based path can't model that — handle it explicitly so users
# can test JSON-shaped logs against JSON-mode parsers.
log_input = req.log_line.strip()
is_json_mode = any("parse=json" in f for f in format_strings) or log_input.startswith("{")
# Only enter JSON mode if the log content actually looks like JSON.
# Don't force it based on the parser type alone — a JSON-capable parser
# should still fall through to regex matching for non-JSON inputs.
is_json_mode = log_input.startswith("{") or log_input.startswith("[")
if is_json_mode:
import json as _json
# Support multi-line input (one JSON object per line, or a JSON array)
@@ -307,18 +394,24 @@ async def test_parser(req: TestParserRequest):
# Use the first payload for the detail table; report totals.
payload = payloads[0]
extracted = _flatten_dict(payload)
# SDL's parse=json puts all keys into unmapped.* namespace first, then
# rewrites map unmapped.X -> ocsf.field. Mirror that so rewrites fire.
unmapped_aliases = {f"unmapped.{k}": v for k, v in extracted.items()}
extracted_with_unmapped = {**extracted, **unmapped_aliases}
# Apply lightweight rewrites if present (input/output/match/replace blocks).
# We only handle simple literal/regex matches with $0 or string replacements;
# this is best-effort, intended for quick visual verification.
rewrites_applied = []
# Handle both quoted keys ("input":) and unquoted keys (input:)
rewrite_re = re.compile(
r'\{\s*input:\s*"([^"]+)"\s*,\s*output:\s*"([^"]+)"\s*,\s*match:\s*"((?:[^"\\]|\\.)*)"\s*,\s*replace:\s*"((?:[^"\\]|\\.)*)"\s*\}',
r'\{\s*"?input"?\s*:\s*"([^"]+)"\s*,\s*"?output"?\s*:\s*"([^"]+)"\s*,\s*"?match"?\s*:\s*"((?:[^"\\]|\\.)*)"\s*,\s*"?replace"?\s*:\s*"((?:[^"\\]|\\.)*)"\s*\}',
re.DOTALL,
)
derived: dict[str, str] = {}
for m in rewrite_re.finditer(content):
in_field, out_field, match_pat, replace_val = m.group(1), m.group(2), m.group(3), m.group(4)
src_val = extracted.get(in_field)
src_val = extracted_with_unmapped.get(in_field)
if src_val is None:
continue
try:
@@ -359,32 +452,57 @@ async def test_parser(req: TestParserRequest):
"showing_payload": 1,
}
# ── Regex format-string path (original) ─────────────────────────────────
# ── Regex format-string path ─────────────────────────────────────────────
def _try_prefix_match(compiled: re.Pattern, py_to_sdl: dict, log_line: str):
"""
Try the full pattern; if it doesn't match, progressively shorten from
the right (group by group) until we get a match. This handles logs
that don't include all the trailing optional fields the parser defines.
Returns (match, truncated) or (None, False).
"""
m = compiled.search(log_line)
if m:
return m, False
# Shorten pattern by removing trailing named groups one at a time
p = compiled.pattern
# Find all (?P<name>...) group end positions (right to left)
group_ends = [m2.end() for m2 in re.finditer(r'\(\?P<[^>]+>[^)]*\)', p)]
for end in reversed(group_ends[1:]): # keep at least 1 group
try:
shorter = re.compile(p[:end], re.IGNORECASE | re.DOTALL)
m2 = shorter.search(log_line)
if m2:
return m2, True
except re.error:
continue
return None, False
for fmt in format_strings:
try:
compiled, py_to_sdl = _sdl_format_to_regex(fmt)
except re.error:
# Skip unparseable format strings
continue
match = compiled.search(req.log_line)
match, truncated = _try_prefix_match(compiled, py_to_sdl, req.log_line)
if match:
fields = [
{"field": py_to_sdl.get(group, group), "value": value}
for group, value in match.groupdict().items()
if value is not None
if value is not None and value != ""
]
return {
"parser_name": req.parser_name,
"matched": True,
"mode": "regex",
"format_matched": fmt,
"format_matched": fmt[:120] + ("" if len(fmt) > 120 else ""),
"fields": fields,
"note": "Partial match — log has fewer fields than the full parser format" if truncated else None,
}
return {
"parser_name": req.parser_name,
"matched": False,
"message": "No format pattern matched",
"message": "No format pattern matched. Check that the log includes the log-type keyword (e.g. TRAFFIC, THREAT) and enough comma-separated fields.",
"fields": [],
}
+1
View File
@@ -14,6 +14,7 @@ FIELDS = [
{"key": "S1_API_TOKEN", "label": "Console API Token", "secret": True, "placeholder": "eyJ..."},
{"key": "SDL_XDR_URL", "label": "SDL XDR URL", "secret": False, "placeholder": "https://xdr.us1.sentinelone.net"},
{"key": "SDL_LOG_READ_KEY", "label": "SDL Log Read Key", "secret": True, "placeholder": "1DnK0Y4e..."},
{"key": "SDL_CONFIG_READ_KEY", "label": "SDL Config Read Key", "secret": True, "placeholder": "Needs 'Manage config files' permission"},
{"key": "ANTHROPIC_API_KEY", "label": "Anthropic API Key", "secret": True, "placeholder": "sk-ant-..."},
]
+121 -17
View File
@@ -11,6 +11,11 @@ TOKEN = os.environ.get("S1_API_TOKEN", "")
SDL_XDR_URL = os.environ.get("SDL_XDR_URL", "https://xdr.us1.sentinelone.net").rstrip("/")
SDL_LOG_READ_KEY = os.environ.get("SDL_LOG_READ_KEY", "")
# SDL Configuration Read Key — used to list/fetch parser files under /logParsers/
# (separate from SDL_LOG_READ_KEY which is for querying events only).
# Find it in the S1 console: Settings → Integrations → Data Lake API Keys → Configuration Read.
SDL_CONFIG_READ_KEY = os.environ.get("SDL_CONFIG_READ_KEY", "")
# Management Console API uses ApiToken auth
HEADERS = {
"Authorization": f"ApiToken {TOKEN}",
@@ -92,7 +97,7 @@ async def get_library_rules(page_size: int = 100) -> list:
return results
async def run_powerquery(query: str, from_date: str, to_date: str) -> dict:
async def run_powerquery(query: str, from_date: str, to_date: str, max_count: int = 1000) -> dict:
"""
Run a PowerQuery against the Singularity Data Lake via the Scalyr XDR API.
Uses SDL_XDR_URL + SDL_LOG_READ_KEY (Scalyr readlog token).
@@ -109,7 +114,7 @@ async def run_powerquery(query: str, from_date: str, to_date: str) -> dict:
"query": query,
"startTime": start_ms,
"endTime": end_ms,
"maxCount": 1000,
"maxCount": max_count,
}
async with httpx.AsyncClient(timeout=120) as client:
@@ -154,8 +159,47 @@ async def run_powerquery(query: str, from_date: str, to_date: str) -> dict:
return {"events": matches}
def _sdl_config_headers() -> dict:
"""Auth headers for the SDL Configuration File API (uses POST /api/listFiles,
POST /api/getFile, etc.). Falls back to SDL_LOG_READ_KEY if no dedicated
Configuration Read key is set — that won't work for all endpoints, but lets
callers fail with a meaningful 401 instead of crashing."""
key = SDL_CONFIG_READ_KEY or SDL_LOG_READ_KEY
return {
"Authorization": f"Bearer {key}",
"Content-Type": "application/json",
}
async def list_sdl_parsers() -> list[str]:
"""List all parser filenames under /logParsers/ in SDL."""
"""List parser paths under /logParsers/ via the SDL Configuration File API.
Requires SDL_CONFIG_READ_KEY (or higher) in .env. The endpoint is
POST <SDL_XDR_URL>/api/listFiles with {"pathPrefix": "/logParsers/"}.
Returns names without the /logParsers/ prefix, suitable for use as
filenames in the local parsers/ directory.
"""
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.post(
f"{SDL_XDR_URL}/api/listFiles",
headers=_sdl_config_headers(),
json={"pathPrefix": "/logParsers/"},
)
resp.raise_for_status()
data = resp.json()
paths = data.get("paths") or data.get("files") or []
# Normalize: strip leading /logParsers/ and ignore anything that isn't there
names: list[str] = []
for p in paths:
if isinstance(p, dict):
p = p.get("path") or p.get("name") or ""
if isinstance(p, str) and p.startswith("/logParsers/"):
names.append(p[len("/logParsers/"):])
return names
async def list_sdl_parsers_legacy() -> list[str]:
"""[Deprecated] Legacy management-console path — kept for reference but unused."""
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.get(
f"{BASE_URL}/api/v1/files/logParsers",
@@ -170,46 +214,106 @@ async def list_sdl_parsers() -> list[str]:
async def get_sdl_parser(filename: str) -> dict:
"""Fetch a single SDL parser file by name."""
"""Fetch a single SDL parser file by name via POST /api/getFile.
Returns the raw SDL response dict, e.g.
{"status": "success", "path": "/logParsers/Foo", "content": "...", "version": 3, ...}
"""
path = filename if filename.startswith("/logParsers/") else f"/logParsers/{filename}"
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.get(
f"{BASE_URL}/api/v1/files/logParsers/{filename}",
headers=HEADERS,
resp = await client.post(
f"{SDL_XDR_URL}/api/getFile",
headers=_sdl_config_headers(),
json={"path": path},
)
resp.raise_for_status()
return resp.json()
async def get_account_id() -> str | None:
"""Return the first account ID visible to the current token."""
"""Return the first account ID visible to the current token.
Tries /accounts first (works for account-scoped or higher tokens). If that
returns 403 (site-scoped token), falls back to /sites and reads accountId
from the first site.
"""
async with httpx.AsyncClient(timeout=15) as client:
# Path 1: account-scoped token
resp = await client.get(
f"{BASE_URL}/web/api/v2.1/accounts",
headers=HEADERS,
params={"limit": 1},
)
resp.raise_for_status()
accounts = resp.json().get("data", [])
return str(accounts[0]["id"]) if accounts else None
if resp.status_code == 200:
accounts = resp.json().get("data", [])
if accounts:
return str(accounts[0]["id"])
# Path 2: site-scoped token — accountId is embedded in sites payload
if resp.status_code in (401, 403):
sresp = await client.get(
f"{BASE_URL}/web/api/v2.1/sites",
headers=HEADERS,
params={"limit": 1},
)
if sresp.status_code == 200:
data = sresp.json().get("data", {})
sites = data.get("sites") if isinstance(data, dict) else data
if sites:
return str(sites[0].get("accountId") or "") or None
return None
async def get_scope_for_platform_rules() -> tuple[str, str] | None:
"""Pick the best scope for /detection-library/platform-rules.
Returns (scopeLevel, scopeId). Tries account first, then site — site-scoped
tokens cannot list accounts but CAN query platform-rules with site scope.
"""
async with httpx.AsyncClient(timeout=15) as client:
# Prefer account scope (broadest)
a = await client.get(
f"{BASE_URL}/web/api/v2.1/accounts",
headers=HEADERS,
params={"limit": 1},
)
if a.status_code == 200:
accounts = a.json().get("data", [])
if accounts:
return ("account", str(accounts[0]["id"]))
# Fall back to site scope (site-scoped tokens land here)
s = await client.get(
f"{BASE_URL}/web/api/v2.1/sites",
headers=HEADERS,
params={"limit": 1},
)
if s.status_code == 200:
data = s.json().get("data", {})
sites = data.get("sites") if isinstance(data, dict) else data
if sites:
sid = sites[0].get("id")
if sid:
return ("site", str(sid))
return None
async def get_platform_rules(page_size: int = 1000) -> list:
"""
Fetch all Detection Library platform rules from /detection-library/platform-rules.
Requires scopeLevel + scopeId — uses account scope with the first visible account.
Returns list of rules, each with a 'sources' list (authoritative data source names).
Requires scopeLevel + scopeId. Tries account scope first, then site scope so
site-scoped tokens also work.
"""
account_id = await get_account_id()
if not account_id:
scope = await get_scope_for_platform_rules()
if not scope:
return []
scope_level, scope_id = scope
all_rules: list = []
cursor: str = ""
async with httpx.AsyncClient(timeout=60) as client:
while True:
params: dict = {
"scopeLevel": "account",
"scopeId": account_id,
"scopeLevel": scope_level,
"scopeId": scope_id,
"limit": page_size,
"cursor": cursor,
}