Files
marcredhat-siem-toolkit-pat…/tools/sync_sdl_parsers.py
T

101 lines
3.3 KiB
Python

#!/usr/bin/env python3
"""
Pull every parser under /logParsers/ from the SDL tenant and drop it into
./parsers/ so the SIEM-Toolkit Parser Test Runner can list it.
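Auth: config_read_key (falling back to console_api_token) read from the JSON
file named by $SDL_CONFIG, or from sdl_config.json next to this script or in
its parent directory.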
"""
from __future__ import annotations
import json
import os
import sys
import urllib.request
import urllib.parse
import urllib.error
def _load_sdl_cfg():
    """Load the SDL settings from the first config file that exists.

    Candidate locations, checked in order:

    1. the path held in the ``SDL_CONFIG`` environment variable, if set;
    2. ``sdl_config.json`` in this script's directory;
    3. ``sdl_config.json`` in the parent of this script's directory.

    Returns:
        The parsed JSON document of the first candidate found.

    If no candidate exists, an error is written to stderr and the process
    exits with status 2.
    """
    here = os.path.dirname(os.path.abspath(__file__))
    candidates = (
        os.environ.get("SDL_CONFIG"),
        os.path.join(here, "sdl_config.json"),
        os.path.join(here, "..", "sdl_config.json"),
    )
    for path in candidates:
        if path and os.path.exists(path):
            # Binary mode lets json detect the encoding itself (UTF-8 with or
            # without a BOM, UTF-16, UTF-32) instead of depending on the
            # platform's locale default.
            with open(path, "rb") as fh:
                return json.load(fh)
    sys.stderr.write(
        "ERROR: no SDL config found. Set $SDL_CONFIG or create sdl_config.json "
        "(see sdl_config.example.json)\n")
    sys.exit(2)
# Placeholder only: the configuration itself is loaded by _load_sdl_cfg()
# when main() runs.
SDL_CFG_PATH = os.environ.get("SDL_CONFIG")

# Directory the downloaded parsers are written to: $PARSERS_DIR when set,
# otherwise ../parsers relative to this script's directory.
DEST = os.environ.get(
    "PARSERS_DIR",
    os.path.join(
        os.path.dirname(os.path.abspath(__file__)),
        "..",
        "parsers",
    ),
)
def call(base_url: str, token: str, path: str, params: dict) -> dict:
    """POST ``params`` as a JSON body to an SDL endpoint and return the reply.

    Used for both listFiles and getFile. The token is sent twice: merged into
    the JSON body under the ``"token"`` key and as a Bearer ``Authorization``
    header.

    Args:
        base_url: Tenant base URL; trailing slashes are stripped before
            ``path`` is appended.
        token: API key used for authentication.
        path: Endpoint path appended to ``base_url`` (e.g. "/api/listFiles").
        params: Request fields. A ``"token"`` entry, if present, is
            overridden by ``token``.

    Returns:
        The response body decoded from JSON.

    Raises:
        RuntimeError: The server answered with an HTTP error status. The
            message carries the status code, the path and the first 300
            characters of the response body; the original ``HTTPError`` is
            attached as ``__cause__``.

    Other failures (connection errors, timeouts, a non-JSON response body)
    propagate unchanged.
    """
    url = f"{base_url.rstrip('/')}{path}"
    payload = json.dumps({**params, "token": token}).encode()
    req = urllib.request.Request(
        url,
        data=payload,
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            return json.loads(resp.read())
    except urllib.error.HTTPError as err:
        # HTTPError wraps the still-open response; close it once the error
        # text has been read so the connection is not leaked.
        with err:
            detail = err.read().decode(errors="replace")[:300]
        raise RuntimeError(f"HTTP {err.code} {path}: {detail}") from err
def main() -> int:
    """Download every file under /logParsers/ into the DEST directory.

    Loads the SDL config, lists the remote paths through /api/listFiles,
    fetches each one through /api/getFile and writes its content to DEST
    under the last component of the remote path. Existing files are
    overwritten. Per-file problems are collected and reported at the end
    instead of aborting the run.

    Returns:
        2 when the config has no base_url or no API token, 1 when the file
        listing itself fails, otherwise 0 (also when individual files failed).
    """
    cfg = _load_sdl_cfg()
    base = cfg.get("base_url")
    if not base:
        print("No base_url in SDL config", file=sys.stderr)
        return 2
    # config_read_key first (per docs), fall back to console_api_token
    token = cfg.get("config_read_key") or cfg.get("console_api_token")
    if not token:
        print("No config_read_key or console_api_token in SDL config", file=sys.stderr)
        return 2

    print(f"Listing /logParsers/ from {base} ...")
    try:
        res = call(base, token, "/api/listFiles", {"pathPrefix": "/logParsers/"})
    except (RuntimeError, OSError, ValueError) as e:
        # Nothing can be synced without the listing; report it cleanly.
        print(f"Listing failed: {e}", file=sys.stderr)
        return 1
    paths = res.get("paths") or []
    print(f"Found {len(paths)} files under /logParsers/")

    os.makedirs(DEST, exist_ok=True)
    fetched = 0
    failed = []
    for p in paths:
        # Keep only the last path component as the local file name.
        # NOTE: remote files sharing a base name in different sub-folders map
        # to the same local file; the one fetched last wins.
        name = p.rsplit("/", 1)[-1] or "_unnamed"
        # The name comes from the server: refuse anything that would not stay
        # a plain file directly inside DEST.
        unsafe = (
            name in (".", "..")
            or "\x00" in name
            or os.sep in name
            or (os.altsep is not None and os.altsep in name)
        )
        if unsafe:
            failed.append((p, f"unsafe file name {name!r}"))
            continue
        try:
            r = call(base, token, "/api/getFile", {"path": p})
        except Exception as e:  # best effort: one bad file must not stop the sync
            failed.append((p, str(e)))
            continue
        content = r.get("content")
        if content is None:
            failed.append((p, "no content"))
            continue
        if not isinstance(content, str):
            failed.append((p, f"unexpected content type {type(content).__name__}"))
            continue
        out = os.path.join(DEST, name)
        try:
            # Always overwrite so the local copy stays fresh.
            with open(out, "w", encoding="utf-8") as fh:
                fh.write(content)
        except (OSError, UnicodeError) as e:
            failed.append((p, f"write failed: {e}"))
            continue
        ver = r.get("version", "?")
        print(f" + {name:<60} v{ver} ({len(content)} bytes)")
        fetched += 1

    print()
    print(f"Done: fetched={fetched}, failed={len(failed)}")
    for p, err in failed[:10]:
        print(f" ! {p}: {err}")
    if len(failed) > 10:
        print(f" ... and {len(failed) - 10} more")
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())