Add Stormshield ingest verifier

End-to-end regression test for the SDL Stormshield parser:
- test.py        single upload + 150s polling verifier
- send_burst.py  4 varied events (different users, IPs, actions) with current timestamps
- verify_query.py  query last 15 min of stormshield events
- run_and_verify.sh  burst + 40s wait + verify
- config.example.json  template (config.json is gitignored)
- README.md     setup, run, behaviour-quirks docs

Use against a real SDL tenant after deploying parsers/stormshield. Confirms
parser='stormshield', dataSource.name='Stormshield', and the 5 OCSF rewrites
(src_endpoint.ip/port, dst_endpoint.ip/port, actor.user.name).
This commit is contained in:
marc
2026-05-22 17:03:26 +02:00
parent 12fec66d9a
commit d6d0faf218
7 changed files with 422 additions and 0 deletions
+1
View File
@@ -10,3 +10,4 @@ data/
# Parsers ARE committed in this fork (snapshot of the demo tenant).
# .env still excluded for safety.
tools/stormshield-verify/config.json
+59
View File
@@ -0,0 +1,59 @@
# Stormshield ingest verifier
End-to-end regression test for the SDL Stormshield parser. Sends raw syslog
events to `/api/uploadLogs`, waits for ingest, and confirms the OCSF rewrites
(`src_endpoint.ip`, `dst_endpoint.ip`, `actor.user.name`, ...) populated by
the parser at ingest time.
## Setup
```bash
cp config.example.json config.json
chmod 600 config.json
# Fill in log_write_key, log_read_key — both are SDL Data Lake API keys.
# Generate them in the S1 console: Singularity Data Lake -> API Keys.
```
`config.json` is gitignored. Never commit real tokens.
## Run
```bash
# Single-event upload + 150s polling verifier (prints which OCSF fields landed)
python3 test.py
# Burst of 4 varied events with current timestamps (different users, IPs, actions)
python3 send_burst.py
# One-shot regression: burst + 40s wait + query last 15 min
bash run_and_verify.sh
```
## How to find the events afterwards
The SDL console search field (and PowerQuery) attribute for the parser name
is **`parser`**, not `parser.name`:
```
parser="stormshield" | sort -timestamp | limit 10
```
## Behaviour quirks worth knowing
1. **`server-host` HTTP header is overwritten** to the literal string
`uploadLogs` on this tenant. Don't try to filter by `serverHost` for
precise event matching; use `parser='stormshield'` instead.
2. **`parser.name` is always None** on `uploadLogs`-ingested events.
Use the bare `parser` attribute.
3. **Embedded `time="..."`** in the syslog body is taken as the event's
canonical timestamp via `$timestamp=tsPattern$`. The scripts rewrite
this to "now" so events appear under recent activity in the console.
4. **Ingest latency** is 5-60s. `test.py` polls for up to 150s.
## Files
- `test.py` — single upload + polling verifier
- `send_burst.py` — N varied events with current timestamps
- `verify_query.py` — query last 15 min of stormshield events
- `run_and_verify.sh` — burst + sleep + verify (regression test)
- `config.example.json` — template, copy to `config.json`
@@ -0,0 +1,12 @@
{
"_comment": "Copy to config.json (gitignored) and fill in your SDL keys. Generate them in the SentinelOne console under Singularity Data Lake -> API Keys. log_write_key needs 'Log Write Access'. log_read_key needs 'Log Read Access'. config_read_key needs 'Configuration Read'. config_write_key needs 'Configuration Write'. console_api_token is a regular console user/service-user API token; it works for query and config methods but NOT for uploadLogs (uploadLogs requires a real Log Write key).",
"base_url": "https://xdr.us1.sentinelone.net/",
"log_write_key": "REPLACE_WITH_LOG_WRITE_KEY",
"log_read_key": "REPLACE_WITH_LOG_READ_KEY",
"config_read_key": "REPLACE_WITH_CONFIG_READ_KEY",
"config_write_key": "REPLACE_WITH_CONFIG_WRITE_KEY",
"console_api_token": "REPLACE_WITH_CONSOLE_API_TOKEN_OR_LEAVE_EMPTY",
"s1_scope": "",
"verify_tls": true,
"timeout_seconds": 30
}
+11
View File
@@ -0,0 +1,11 @@
#!/usr/bin/env bash
set -e
cd /tmp/stormshield-verify
echo "============ STEP 1: send burst ============"
python3 send_burst.py
echo
echo "============ STEP 2: wait 40s for ingest ============"
sleep 40
echo
echo "============ STEP 3: query SDL ============"
python3 verify_query.py
+89
View File
@@ -0,0 +1,89 @@
#!/usr/bin/env python3
"""Send N Stormshield events with current timestamps, varied src IPs/users,
so they appear as a recognizable cluster in the SDL console under
parser="stormshield"."""
import json, time, uuid, urllib.request, urllib.error
from datetime import datetime, timezone, timedelta
CFG = json.load(open("./config.json"))
BASE = CFG["base_url"].rstrip("/")
WRITE_KEY = CFG["log_write_key"]
PARSER = "stormshield"
# A handful of plausible variations
USERS = ["aimee.ndzodo", "luc.martin", "claire.dubois", "fatima.khelifi"]
SRCS = ["10.200.0.82", "10.200.0.91", "10.200.1.14", "10.200.2.55"]
DSTS = [("192.168.10.7","53","dns_udp","53"),
("192.168.10.7","53","dns_udp","53"),
("8.8.8.8","53","dns_udp","53"),
("1.1.1.1","443","https","443")]
ACTIONS = ["pass", "pass", "pass", "block"]
def _local_now():
tz = datetime.now(timezone.utc).astimezone().tzinfo
return datetime.now(tz).replace(microsecond=0)
def _ts(now):
syslog = now.strftime("%Y-%m-%dT%H:%M:%S%z")
syslog = syslog[:-2] + ":" + syslog[-2:]
time_ = now.strftime("%Y-%m-%d %H:%M:%S")
return syslog, time_
def build_line(i):
now = _local_now() + timedelta(seconds=i)
syslog, time_ = _ts(now)
start = (now - timedelta(seconds=120)).strftime("%Y-%m-%d %H:%M:%S")
u, src, (dst, dport, dpname, dportname), act = USERS[i % 4], SRCS[i % 4], DSTS[i % 4], ACTIONS[i % 4]
sport = 50000 + i * 137
return (
f'<14>1 {syslog} stormshield-v.univ-evry.fr asqd - - - '
f'?id=firewall time="{time_}" fw="stormshield-v.univ-evry.fr" '
f'tz=+0200 startime="{start}" pri=5 confid=01 slotlevel=2 ruleid={34+i} '
f'rulename="17209b9db27_{i+1}" user="{u}" domain="ueve.local" '
f'srcif="sslvpn0" srcifname="sslvpn" ipproto=udp dstif="Ethernet1" dstifname="in" '
f'proto={dpname} src={src} srcport={sport} srcportname=ephemeral_fw_udp '
f'dst={dst} dstport={dport} dstportname={dportname} dstname=resolver.example.com '
f'modsrc={src} modsrcport={sport} origdst={dst} origdstport={dport} '
f'ipv=4 sent={80+i*8} rcvd={196+i*16} duration=0.0{i} action={act} logtype="connection"'
)
def send_one(body, idx):
nonce = str(uuid.uuid4())
req = urllib.request.Request(
f"{BASE}/api/uploadLogs",
method="POST",
data=body.encode(),
headers={
"Authorization": f"Bearer {WRITE_KEY}",
"Content-Type": "text/plain",
"parser": PARSER,
"Nonce": nonce,
},
)
try:
with urllib.request.urlopen(req, timeout=30) as r:
print(f"[{idx}] HTTP {r.status} nonce={nonce[:8]}… body=`{body[:90]}...`")
return r.status
except urllib.error.HTTPError as e:
print(f"[{idx}] HTTP {e.code} {e.read().decode()[:120]}")
return e.code
def main():
n = 4
print(f"Sending {n} Stormshield events to {BASE} ...")
for i in range(n):
send_one(build_line(i), i)
time.sleep(1)
print(f"\nDone. Wait ~30-60s, then in https://demo.sentinelone.net search:")
print(f" parser=\"stormshield\"")
print("or run:")
print(f" parser='stormshield' | sort -timestamp | limit 10")
if __name__ == "__main__":
main()
+181
View File
@@ -0,0 +1,181 @@
#!/usr/bin/env python3
"""
End-to-end test that the Stormshield parser is actually applied at ingest by
SentinelOne SDL.
1. POSTs a raw Stormshield syslog line to /api/uploadLogs with `parser: stormshield`.
2. Polls SDL with PowerQuery to find the event we just ingested.
3. Inspects which OCSF fields are populated to confirm SDL parsed it correctly.
Requires: log_write_key + log_read_key in ./config.json (see config.example.json)
"""
from __future__ import annotations
import json, time, uuid, urllib.request, urllib.error, sys, os
CFG_PATH = "./config.json"
PARSER = "stormshield"
SERVER_HOST = f"siemtoolkit-test-{int(time.time())}" # unique tag to find our event back
# Use current timestamps so events show up under "now" in the SDL console.
# The parser extracts `time="..."` as the canonical event timestamp via
# $timestamp=tsPattern$, so we must rewrite that field (not just the syslog
# header) to see the event under recent activity in https://demo.sentinelone.net.
from datetime import datetime, timezone, timedelta
import time as _time
_local_tz = datetime.now(timezone.utc).astimezone().tzinfo
_now = datetime.now(_local_tz).replace(microsecond=0)
_start = _now - timedelta(minutes=2)
SYSLOG_TS = _now.strftime("%Y-%m-%dT%H:%M:%S%z") # 2026-05-22T16:32:00+0200
SYSLOG_TS = SYSLOG_TS[:-2] + ":" + SYSLOG_TS[-2:] # → 2026-05-22T16:32:00+02:00
TIME_TS = _now.strftime("%Y-%m-%d %H:%M:%S")
START_TS = _start.strftime("%Y-%m-%d %H:%M:%S")
TZ_OFFSET = _now.strftime("%z") # +0200
TZ_OFFSET = TZ_OFFSET[:-2] + TZ_OFFSET[-2:] # keep +0200 form
LOG_LINE = (
f'<14>1 {SYSLOG_TS} stormshield-v.univ-evry.fr asqd - - - '
f'?id=firewall time="{TIME_TS}" fw="stormshield-v.univ-evry.fr" '
f'tz={TZ_OFFSET} startime="{START_TS}" pri=5 confid=01 slotlevel=2 ruleid=34 '
'rulename="17209b9db27_4" user="aimee.ndzodo" domain="ueve.local" '
'srcif="sslvpn0" srcifname="sslvpn" ipproto=udp dstif="Ethernet1" dstifname="in" '
'proto=dns_udp src=10.200.0.82 srcport=56637 srcportname=ephemeral_fw_udp '
'dst=192.168.10.7 dstport=53 dstportname=dns_udp dstname=hyperion.univ-evry.fr '
'modsrc=10.200.0.82 modsrcport=56637 origdst=192.168.10.7 origdstport=53 '
'ipv=4 sent=80 rcvd=196 duration=0.00 action=pass logtype="connection"'
)
def _http(method, url, *, headers=None, data=None, timeout=60):
req = urllib.request.Request(url, method=method, headers=headers or {}, data=data)
try:
with urllib.request.urlopen(req, timeout=timeout) as r:
return r.status, r.read().decode("utf-8", "replace")
except urllib.error.HTTPError as e:
return e.code, e.read().decode("utf-8", "replace")
def main():
with open(CFG_PATH) as f:
cfg = json.load(f)
base = cfg["base_url"].rstrip("/")
write_key = cfg["log_write_key"]
read_key = cfg["log_read_key"]
nonce = str(uuid.uuid4())
headers = {
"Authorization": f"Bearer {write_key}",
"Content-Type": "text/plain",
"parser": PARSER,
"server-host": SERVER_HOST,
"Nonce": nonce,
}
print("=" * 70)
print("STEP 1 — POST /api/uploadLogs")
print("=" * 70)
print(f" url = {base}/api/uploadLogs")
print(f" parser = {PARSER}")
print(f" server_host = {SERVER_HOST}")
print(f" nonce = {nonce}")
print(f" body bytes = {len(LOG_LINE)}")
print(f" embedded ts = time=\"{TIME_TS}\" (parser uses this as event time)")
print(f" log line = {LOG_LINE[:140]}...")
status, body = _http("POST", f"{base}/api/uploadLogs",
headers=headers, data=LOG_LINE.encode())
print(f" -> HTTP {status}")
print(f" -> {body[:300]}")
if status >= 400:
sys.exit(f"uploadLogs failed: {status}")
# ── STEP 3: poll for the event ──────────────────────────────────────
# SDL ingest is typically visible in ~5-30s but can take up to 2 min.
# Note: `server-host` HTTP header is overwritten to "uploadLogs" by SDL,
# and `parser.name` is None on uploadLogs-ingested events. The reliable
# filter is `parser='stormshield' and dataSource.name='Stormshield'`
# constrained by Nonce (echoed back as an attribute) for our exact upload.
query = (
f"parser='{PARSER}' and dataSource.name='Stormshield' "
"| columns timestamp, dataSource.name, parser, "
"src_endpoint.ip, src_endpoint.port, dst_endpoint.ip, dst_endpoint.port, "
"actor.user.name, unmapped.action, unmapped.proto, unmapped.fw, "
"unmapped.rulename, unmapped.duration, message "
"| sort -timestamp | limit 5"
)
print("=" * 70)
print(f"STEP 2 — poll /api/powerQuery (up to 150s)")
print("=" * 70)
print(f" query = {query}\n")
matches: list = []
columns: list = []
deadline = time.time() + 150
waited = 0
while time.time() < deadline:
time.sleep(10); waited += 10
end_ms = int(time.time() * 1000)
start_ms = end_ms - 15 * 60 * 1000
pq_body = {"query": query, "startTime": str(start_ms), "endTime": str(end_ms)}
status, body = _http(
"POST",
f"{base}/api/powerQuery",
headers={"Authorization": f"Bearer {read_key}",
"Content-Type": "application/json"},
data=json.dumps(pq_body).encode(),
)
if status != 200:
print(f" t+{waited:3d}s: HTTP {status}{body[:200]}")
continue
result = json.loads(body)
columns = result.get("columns") or []
values = result.get("values") or []
n = result.get("matchingEvents", len(values))
print(f" t+{waited:3d}s: matchingEvents={n}")
if values:
matches = [{"values": v} for v in values]
break
if not matches:
print("\n No events found after 150s. Either ingest is slow today, "
"or the upload was rejected silently. Inspect upload response above.")
sys.exit(2)
# The response uses a columns/values layout. Discover column order.
columns = result.get("columns") or []
col_names = [c.get("name") if isinstance(c, dict) else str(c) for c in columns]
print(f"\ncolumns: {col_names}")
print(f"matches: {len(matches)}")
print("\n" + "=" * 70)
print("STEP 4 — parse results, check OCSF fields are populated")
print("=" * 70)
EXPECTED = {
"src_endpoint.ip": "10.200.0.82",
"src_endpoint.port": "56637",
"dst_endpoint.ip": "192.168.10.7",
"dst_endpoint.port": "53",
"actor.user.name": "aimee.ndzodo",
}
for i, m in enumerate(matches, 1):
vals = m.get("values") or m
row = dict(zip(col_names, vals)) if isinstance(vals, list) else vals
print(f"\n--- match {i} ---")
for k in col_names:
v = row.get(k)
mark = ""
if k in EXPECTED:
mark = "" if str(v) == EXPECTED[k] else f" ❌ (expected {EXPECTED[k]!r})"
print(f" {k:25s} = {v!r}{mark}")
# Summary
hits = sum(1 for k, want in EXPECTED.items() if str(row.get(k)) == want)
print(f"\n OCSF rewrites populated: {hits}/{len(EXPECTED)}")
if hits == len(EXPECTED):
print(" → SDL parser applied the rewrites correctly. ✅")
else:
print(" → Some rewrites missing — the SDL parser may not have run.")
if __name__ == "__main__":
main()
+69
View File
@@ -0,0 +1,69 @@
#!/usr/bin/env python3
"""Query SDL to verify recent Stormshield events landed and were parsed."""
import json, time, urllib.request, sys
CFG = json.load(open("./config.json"))
BASE = CFG["base_url"].rstrip("/")
READ_KEY = CFG["log_read_key"]
now_ms = int(time.time() * 1000)
start_ms = now_ms - 15 * 60 * 1000 # last 15 minutes
QUERY = (
"parser='stormshield' "
"| columns timestamp, dataSource.name, parser, "
"src_endpoint.ip, src_endpoint.port, dst_endpoint.ip, dst_endpoint.port, "
"actor.user.name, unmapped.action, unmapped.proto, unmapped.fw, unmapped.rulename "
"| sort -timestamp | limit 10"
)
body = json.dumps({
"query": QUERY,
"startTime": str(start_ms),
"endTime": str(now_ms),
}).encode()
req = urllib.request.Request(
f"{BASE}/api/powerQuery",
method="POST",
data=body,
headers={
"Authorization": f"Bearer {READ_KEY}",
"Content-Type": "application/json",
},
)
with urllib.request.urlopen(req, timeout=60) as r:
resp = json.loads(r.read())
cols = [c["name"] for c in resp.get("columns", [])]
values = resp.get("values", [])
total = resp.get("matchingEvents", len(values))
print(f"query = {QUERY}")
print(f"window = last 15 min")
print(f"matchingEvents = {total}")
print(f"cols = {cols}")
print()
if not values:
print("No events visible yet. SDL ingest can take 30-90s; re-run verify_query.py in a minute.")
sys.exit(1)
print(f"{'timestamp(ns)':>20} {'src':<16} {'sport':<6} -> {'dst':<16} {'dport':<6} {'user':<20} {'action':<8} {'proto':<8}")
print("-" * 110)
for row in values:
d = dict(zip(cols, row))
print(
f"{d.get('timestamp',''):>20} "
f"{str(d.get('src_endpoint.ip','')):<16} "
f"{str(d.get('src_endpoint.port','')):<6} -> "
f"{str(d.get('dst_endpoint.ip','')):<16} "
f"{str(d.get('dst_endpoint.port','')):<6} "
f"{str(d.get('actor.user.name','')):<20} "
f"{str(d.get('unmapped.action','')):<8} "
f"{str(d.get('unmapped.proto','')):<8}"
)
print()
print("✅ Events are visible in the SDL data lake under parser='stormshield'")
print(" Search in https://demo.sentinelone.net with: parser=\"stormshield\"")