mirror of
https://github.com/marcredhat/SIEM-toolkit-patched
synced 2026-06-08 12:33:51 +00:00
101 lines
3.3 KiB
Python
101 lines
3.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Pull every parser under /logParsers/ from the SDL tenant and drop it into
|
|
./parsers/ so the SIEM-Toolkit Parser Test Runner can list it.
|
|
|
|
Auth: config_read_key from sentinelone-sdl-api/config.json
|
|
"""
|
|
from __future__ import annotations
|
|
import json
|
|
import os
|
|
import sys
|
|
import urllib.request
|
|
import urllib.parse
|
|
import urllib.error
|
|
|
|
def _load_sdl_cfg():
|
|
import json as _j, os as _o, sys as _s
|
|
here = _o.path.dirname(_o.path.abspath(__file__))
|
|
candidates = [
|
|
_o.environ.get("SDL_CONFIG"),
|
|
_o.path.join(here, "sdl_config.json"),
|
|
_o.path.join(here, "..", "sdl_config.json"),
|
|
]
|
|
for p in candidates:
|
|
if p and _o.path.exists(p):
|
|
with open(p) as fh:
|
|
return _j.load(fh)
|
|
_s.stderr.write(
|
|
"ERROR: no SDL config found. Set $SDL_CONFIG or create sdl_config.json "
|
|
"(see sdl_config.example.json)\n")
|
|
_s.exit(2)
|
|
|
|
|
|
SDL_CFG_PATH = os.environ.get('SDL_CONFIG') # placeholder; cfg loaded below
|
|
DEST = os.environ.get('PARSERS_DIR', os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'parsers'))
|
|
def call(base_url: str, token: str, path: str, params: dict) -> dict:
|
|
"""POST with JSON body — works for both listFiles and getFile on SDL."""
|
|
url = f"{base_url.rstrip('/')}{path}"
|
|
body = json.dumps({**params, "token": token}).encode()
|
|
req = urllib.request.Request(url, data=body, headers={
|
|
"Authorization": f"Bearer {token}",
|
|
"Content-Type": "application/json",
|
|
})
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=30) as r:
|
|
return json.loads(r.read())
|
|
except urllib.error.HTTPError as e:
|
|
body = e.read().decode(errors="replace")[:300]
|
|
raise RuntimeError(f"HTTP {e.code} {path}: {body}")
|
|
|
|
|
|
def main() -> int:
|
|
cfg = _load_sdl_cfg()
|
|
base = cfg["base_url"]
|
|
# config_read_key first (per docs), fall back to console_api_token
|
|
token = cfg.get("config_read_key") or cfg.get("console_api_token")
|
|
if not token:
|
|
print("No config_read_key or console_api_token in config.json", file=sys.stderr)
|
|
return 2
|
|
|
|
print(f"Listing /logParsers/ from {base} ...")
|
|
res = call(base, token, "/api/listFiles", {"pathPrefix": "/logParsers/"})
|
|
paths = res.get("paths", [])
|
|
print(f"Found {len(paths)} files under /logParsers/")
|
|
|
|
os.makedirs(DEST, exist_ok=True)
|
|
fetched, skipped, failed = 0, 0, []
|
|
|
|
for p in paths:
|
|
# Strip leading /logParsers/, sanitize for filesystem
|
|
name = p.rsplit("/", 1)[-1] or "_unnamed"
|
|
# Avoid colliding with existing sample files? Always overwrite to keep fresh.
|
|
try:
|
|
r = call(base, token, "/api/getFile", {"path": p})
|
|
except Exception as e:
|
|
failed.append((p, str(e)))
|
|
continue
|
|
|
|
content = r.get("content")
|
|
if content is None:
|
|
failed.append((p, "no content"))
|
|
continue
|
|
|
|
out = os.path.join(DEST, name)
|
|
with open(out, "w", encoding="utf-8") as fh:
|
|
fh.write(content)
|
|
ver = r.get("version", "?")
|
|
print(f" + {name:<60} v{ver} ({len(content)} bytes)")
|
|
fetched += 1
|
|
|
|
print()
|
|
print(f"Done: fetched={fetched}, failed={len(failed)}")
|
|
if failed:
|
|
for p, err in failed[:10]:
|
|
print(f" ! {p}: {err}")
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|