mirror of
https://github.com/marcredhat/SIEM-toolkit-patched
synced 2026-06-08 20:37:12 +00:00
Stormshield/F5/WatchGuard parser test fix + SDL KV-scanner support
- Format & rewrite extractors now accept JS-style unquoted keys
- Resolve $var=PatternName$ against parser's patterns: {} block
- Implement SDL key=value scanner ($_$=$prefix._$ + repeat: true)
- Apply rewrites across union of fields from all formats
- Fix $0/$N backref translation; remove shadowing of _to_py_backref
This commit is contained in:
+153
-24
@@ -157,9 +157,10 @@ def _flatten_event(event: dict) -> dict:
|
||||
def _extract_format_strings(content: str) -> list[str]:
|
||||
"""
|
||||
Extract SDL format string values from augmented-JSON parser content.
|
||||
Matches: "format": "..." (double-quoted value, supports escaped quotes).
|
||||
Matches: format: "..." or "format": "..." (SDL parser files are
|
||||
JS-style JSON: keys may or may not be quoted). Supports escaped quotes.
|
||||
"""
|
||||
pattern = re.compile(r'"format"\s*:\s*"((?:[^"\\]|\\.)*)"')
|
||||
pattern = re.compile(r'(?:"format"|format)\s*:\s*"((?:[^"\\]|\\.)*)"')
|
||||
return pattern.findall(content)
|
||||
|
||||
|
||||
@@ -208,6 +209,87 @@ def _sdl_format_to_regex(fmt: str) -> tuple[re.Pattern, dict[str, str]]:
|
||||
return compiled, py_group_to_sdl
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# SDL parser helpers: pattern refs, key=value scanner, rewrites
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _extract_patterns_block(content: str) -> dict[str, str]:
|
||||
"""Extract the top-level `patterns: { name: "regex", ... }` block."""
|
||||
m = re.search(r'patterns\s*:\s*\{', content)
|
||||
if not m:
|
||||
return {}
|
||||
depth, i = 1, m.end()
|
||||
while i < len(content) and depth > 0:
|
||||
c = content[i]
|
||||
if c == '{':
|
||||
depth += 1
|
||||
elif c == '}':
|
||||
depth -= 1
|
||||
i += 1
|
||||
block = content[m.end():i - 1]
|
||||
return dict(re.findall(r'([A-Za-z_]\w*)\s*:\s*"((?:[^"\\]|\\.)*)"', block))
|
||||
|
||||
|
||||
def _resolve_pattern_refs(fmt: str, patterns: dict[str, str]) -> str:
|
||||
"""Replace $var=PatternName$ with $var=<resolved>$ when PatternName is defined."""
|
||||
if not patterns:
|
||||
return fmt
|
||||
|
||||
def sub(m: re.Match) -> str:
|
||||
token = m.group(1)
|
||||
if '=' in token:
|
||||
name, pat = token.split('=', 1)
|
||||
if pat in patterns:
|
||||
return f"${name}={patterns[pat]}$"
|
||||
return m.group(0)
|
||||
return re.sub(r'\$([^$]+)\$', sub, fmt)
|
||||
|
||||
|
||||
_KV_TOKEN_RE = re.compile(r'\$_\$=\$([^$]+)\._\$')
|
||||
_KV_SCAN_RE = re.compile(r'([A-Za-z_][\w.-]*)=(?:"((?:[^"\\]|\\.)*)"|([^\s"]+))')
|
||||
|
||||
|
||||
def _is_kv_format(fmt: str) -> bool:
|
||||
"""SDL key=value scanner idiom: $_$=$<prefix>._$."""
|
||||
return bool(_KV_TOKEN_RE.search(fmt))
|
||||
|
||||
|
||||
def _scan_kv(line: str, fmt: str) -> dict[str, str]:
|
||||
"""Extract key=value pairs (supports quoted values) and prefix the keys."""
|
||||
m = _KV_TOKEN_RE.search(fmt)
|
||||
prefix = m.group(1) if m else "unmapped"
|
||||
out: dict[str, str] = {}
|
||||
for km in _KV_SCAN_RE.finditer(line):
|
||||
k = km.group(1)
|
||||
v = km.group(2) if km.group(2) is not None else km.group(3)
|
||||
out[f"{prefix}.{k}"] = v
|
||||
return out
|
||||
|
||||
|
||||
_REWRITE_RE = re.compile(
|
||||
# JS-style or strict JSON: keys may or may not be quoted, in any order with
|
||||
# commas between. We assume the canonical SDL ordering input/output/match/replace.
|
||||
r'\{\s*(?:"input"|input)\s*:\s*"([^"]+)"\s*,'
|
||||
r'\s*(?:"output"|output)\s*:\s*"([^"]+)"\s*,'
|
||||
r'\s*(?:"match"|match)\s*:\s*"((?:[^"\\]|\\.)*)"\s*,'
|
||||
r'\s*(?:"replace"|replace)\s*:\s*"((?:[^"\\]|\\.)*)"',
|
||||
re.DOTALL,
|
||||
)
|
||||
|
||||
|
||||
def _extract_rewrites(content: str) -> list[dict]:
|
||||
return [
|
||||
{"input": m.group(1), "output": m.group(2),
|
||||
"match": m.group(3), "replace": m.group(4)}
|
||||
for m in _REWRITE_RE.finditer(content)
|
||||
]
|
||||
|
||||
|
||||
def _to_py_backref(s: str) -> str:
|
||||
"""Translate SDL $0/$N backrefs to Python \\g<0>/\\g<N>."""
|
||||
return re.sub(r"\$(\d+)", lambda mm: f"\\g<{mm.group(1)}>", s)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Endpoints
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -377,10 +459,7 @@ async def test_parser(req: TestParserRequest):
|
||||
continue
|
||||
if not m2:
|
||||
continue
|
||||
# SDL uses $0 for whole match, $1.. for groups. Translate to Python
|
||||
# \g<0>, \g<1>, ... so re.sub doesn't read \0 as a null byte.
|
||||
def _to_py_backref(s: str) -> str:
|
||||
return re.sub(r"\$(\d+)", lambda mm: f"\\g<{mm.group(1)}>", s)
|
||||
# SDL uses $0/$N backrefs; module-level _to_py_backref translates them.
|
||||
try:
|
||||
val = re.sub(match_pat, _to_py_backref(replace_val), str(src_val), count=1)
|
||||
except re.error:
|
||||
@@ -409,32 +488,82 @@ async def test_parser(req: TestParserRequest):
|
||||
"showing_payload": 1,
|
||||
}
|
||||
|
||||
# ── Regex format-string path (original) ─────────────────────────────────
|
||||
# ── Regex / KV / pattern-ref path ───────────────────────────────────────
|
||||
# Accumulate fields across all matching formats so that a parser like
|
||||
# Stormshield (one format for the timestamp + a KV scanner for the rest +
|
||||
# a third format to drive rewrites) returns a complete picture.
|
||||
patterns_block = _extract_patterns_block(content)
|
||||
extracted_fields: dict[str, str] = {}
|
||||
formats_matched: list[str] = []
|
||||
|
||||
for fmt in format_strings:
|
||||
resolved = _resolve_pattern_refs(fmt, patterns_block)
|
||||
|
||||
# SDL key=value scanner idiom (handles `$_$=$prefix._$` w/ repeat:true)
|
||||
if _is_kv_format(resolved):
|
||||
kv = _scan_kv(req.log_line, resolved)
|
||||
if kv:
|
||||
extracted_fields.update(kv)
|
||||
formats_matched.append(fmt)
|
||||
continue
|
||||
|
||||
try:
|
||||
compiled, py_to_sdl = _sdl_format_to_regex(fmt)
|
||||
compiled, py_to_sdl = _sdl_format_to_regex(resolved)
|
||||
except re.error:
|
||||
# Skip unparseable format strings
|
||||
continue
|
||||
|
||||
match = compiled.search(req.log_line)
|
||||
if match:
|
||||
fields = [
|
||||
{"field": py_to_sdl.get(group, group), "value": value}
|
||||
for group, value in match.groupdict().items()
|
||||
if value is not None
|
||||
]
|
||||
return {
|
||||
"parser_name": req.parser_name,
|
||||
"matched": True,
|
||||
"mode": "regex",
|
||||
"format_matched": fmt,
|
||||
"fields": fields,
|
||||
}
|
||||
for group, value in match.groupdict().items():
|
||||
if value is None:
|
||||
continue
|
||||
extracted_fields[py_to_sdl.get(group, group)] = value
|
||||
formats_matched.append(fmt)
|
||||
|
||||
if not extracted_fields:
|
||||
return {
|
||||
"parser_name": req.parser_name,
|
||||
"matched": False,
|
||||
"message": (
|
||||
"No format pattern matched. This parser may use SDL features "
|
||||
"the test runner doesn't model (e.g. dottedJson, grok, multi-line). "
|
||||
"Fields can still be parsed correctly at ingest time."
|
||||
),
|
||||
"fields": [],
|
||||
}
|
||||
|
||||
# Apply rewrites declared anywhere in the parser file.
|
||||
derived: dict[str, str] = {}
|
||||
rewrites_applied = []
|
||||
for rw in _extract_rewrites(content):
|
||||
src_val = extracted_fields.get(rw["input"])
|
||||
if src_val is None:
|
||||
continue
|
||||
try:
|
||||
if not re.search(rw["match"], str(src_val)):
|
||||
continue
|
||||
val = re.sub(rw["match"], _to_py_backref(rw["replace"]), str(src_val), count=1)
|
||||
except re.error:
|
||||
continue
|
||||
derived[rw["output"]] = val
|
||||
rewrites_applied.append({
|
||||
"input": rw["input"], "input_value": src_val,
|
||||
"output": rw["output"], "matched_on": rw["match"], "result": val,
|
||||
})
|
||||
|
||||
fields = (
|
||||
[{"field": k, "value": v, "source": "extract"}
|
||||
for k, v in sorted(extracted_fields.items())]
|
||||
+ [{"field": k, "value": v, "source": "rewrite"}
|
||||
for k, v in sorted(derived.items())]
|
||||
)
|
||||
return {
|
||||
"parser_name": req.parser_name,
|
||||
"matched": False,
|
||||
"message": "No format pattern matched",
|
||||
"fields": [],
|
||||
"matched": True,
|
||||
"mode": "regex",
|
||||
"format_matched": " + ".join(formats_matched) or "(none)",
|
||||
"fields": fields,
|
||||
"rewrites_applied": rewrites_applied,
|
||||
"extracted_count": len(extracted_fields),
|
||||
"derived_count": len(derived),
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user