6 Commits

Author SHA1 Message Date
marc d6d0faf218 Add Stormshield ingest verifier
End-to-end regression test for the SDL Stormshield parser:
- test.py        single upload + 150s polling verifier
- send_burst.py  4 varied events (different users, IPs, actions) with current timestamps
- verify_query.py  query last 15 min of stormshield events
- run_and_verify.sh  burst + 40s wait + verify
- config.example.json  template (config.json is gitignored)
- README.md     setup, run, behaviour-quirks docs

Use against a real SDL tenant after deploying parsers/stormshield. Confirms
parser='stormshield', dataSource.name='Stormshield', and the 5 OCSF rewrites
(src_endpoint.ip/port, dst_endpoint.ip/port, actor.user.name).
2026-05-22 17:06:08 +02:00
Marc Chisinevski 12fec66d9a Update README.md 2026-05-22 16:17:44 +02:00
marc a9dcf48e65 Snapshot 95 demo-tenant parsers (incl. stormshield) + un-ignore parsers/
The original upstream gitignores parsers/* on the assumption that each tenant
has its own set. This fork commits a working snapshot so the Parser Test Runner
and Parser Coverage features are usable out of the box.

Stormshield parser exercises the new SDL key=value scanner, pattern references,
and JS-style unquoted format keys added to backend/routers/quality.py.
2026-05-22 14:11:56 +02:00
Marc Chisinevski 1e61fa9814 Update README.md 2026-05-22 13:58:13 +02:00
marc d1d92d3967 Stormshield/F5/WatchGuard parser test fix + SDL KV-scanner support
- Format & rewrite extractors now accept JS-style unquoted keys
- Resolve $var=PatternName$ against parser's patterns: {} block
- Implement SDL key=value scanner ($_$=$prefix._$ + repeat: true)
- Apply rewrites across union of fields from all formats
- Fix $0/$N backref translation; remove shadowing of _to_py_backref
2026-05-22 13:45:23 +02:00
marc 79efb6bf7d v0.1 Mick Marc merged 2026-05-20 23:44:53 +02:00
18 changed files with 667 additions and 3914 deletions
+5
@@ -22,3 +22,8 @@ SDL_LOG_READ_KEY=
# Anthropic (for Onboarding Accelerator AI features) # Anthropic (for Onboarding Accelerator AI features)
# ─ https://console.anthropic.com/settings/api-keys # ─ https://console.anthropic.com/settings/api-keys
ANTHROPIC_API_KEY= ANTHROPIC_API_KEY=
# SDL Configuration Read key — used by /api/quality/sync-from-sdl to
# download parser files from /logParsers/ on the SDL tenant.
# Generate in S1 console: Settings -> Integrations -> Data Lake API Keys (Configuration Read scope).
SDL_CONFIG_READ_KEY=
+125 -153
@@ -2,7 +2,7 @@
> *Inspired by Pineapple Boy!* 🍍 > *Inspired by Pineapple Boy!* 🍍
A self-hosted troubleshooting and visibility tool for SentinelOne AI-SIEM SecOps engineers. Runs as a Docker Compose stack against your SentinelOne demo or production tenant and provides real-time insight into parser coverage, detection library mapping, ingest volume, and data quality — all without leaving a single interface. A self-hosted troubleshooting and visibility tool for SentinelOne AI-SIEM SecOps engineers. Runs as a Docker Compose stack against your SentinelOne demo or production tenant and provides real-time insight into parser coverage, ingest volume, and data quality — all without leaving a single interface.
--- ---
@@ -10,11 +10,10 @@ A self-hosted troubleshooting and visibility tool for SentinelOne AI-SIEM SecOps
| Page | Purpose | | Page | Purpose |
|---|---| |---|---|
| **Overview** | Live health stats — coverage %, active sources, top uncovered sources by volume | | **Overview** | Live health stats — coverage percentage, active sources, top uncovered sources by volume |
| **Parser Coverage Map** | Which active data sources have a parser? Detection rule mapping per source. Unlabelled event detection. | | **Parser Coverage Map** | Which active data sources have a parser? Which don't? |
| **Ingest Dashboard** | Event volume, top sources, cost projection, filter simulator | | **Ingest Dashboard** | Event volume, top sources, cost projection, filter simulator |
| **Parser Quality** | Live event sampler, field population rate, parser test runner, attributes missing audit | | **Parser Quality** | Live event sampler, field population rate, parser test runner |
| **Threat Coverage** | MITRE ATT&CK heatmap across all detection library rules, rule firing status (active vs never-fired) |
| **Onboarding Accelerator** | Prompt template for onboarding new log sources with Claude Code | | **Onboarding Accelerator** | Prompt template for onboarding new log sources with Claude Code |
| **Settings** | Manage your `.env` credentials directly from the interface | | **Settings** | Manage your `.env` credentials directly from the interface |
@@ -28,15 +27,13 @@ browser → nginx (port 3001) → single-page HTML/JS application
FastAPI backend (port 8001) FastAPI backend (port 8001)
┌───────────────────────────┐ ┌───────────────────────────┐
│ PostgreSQL (SQLAlchemy) │ rules, parser fields, active sources, │ PostgreSQL (SQLAlchemy) │ parser fields, active sources
│ │ firing cache, coverage snapshots
└───────────────────────────┘ └───────────────────────────┘
┌───────────────────────────┐ ┌───────────────────────────┐
│ SentinelOne APIs │ │ SentinelOne APIs │
│ • Management API v2.1 STAR rules, detection library, platform rules │ • Management API
│ • Scalyr XDR PowerQuery │ live event queries, source volumes │ • XDR PowerQuery │
│ • SDL Config File API │ parser file sync (/logParsers/)
└───────────────────────────┘ └───────────────────────────┘
``` ```
@@ -49,57 +46,48 @@ All services run via Docker Compose. The `parsers/` directory is volume-mounted
### 1. Clone and Configure ### 1. Clone and Configure
```bash ```bash
git clone https://github.com/mickbrowns1/SIEM-Toolkit.git git clone
cd SIEM-Toolkit cd SIEM-Toolkit-patched
cp .env.example .env cp .env.example .env
``` ```
Edit `.env` with your credentials: Edit `.env` with your credentials:
```env ```env
S1_BASE_URL=https://demo.sentinelone.net # Your console URL S1_BASE_URL= # Your console URL
S1_API_TOKEN=eyJ... # Service user API token (account or site scope) S1_API_TOKEN=... # Service user API token (account scope or higher)
SDL_XDR_URL=https://xdr.us1.sentinelone.net # Scalyr XDR endpoint SDL_XDR_URL= # XDR endpoint
SDL_LOG_READ_KEY=1j2IU0S... # Data Lake read key (query events) SDL_LOG_READ_KEY= # Data Lake read key
SDL_CONFIG_READ_KEY=... # Data Lake config key (sync parser files)
SDL_PQ_TIMEOUT=600 # PowerQuery timeout in seconds (default: 600)
SDL_PQ_TIMEOUT_RETRIES=1 # Retries on timeout (default: 1)
ANTHROPIC_API_KEY= # Optional — not currently used ANTHROPIC_API_KEY= # Optional — not currently used
# SDL Configuration Read key — used by /api/quality/sync-from-sdl to
# download parser files from /logParsers/ on the SDL tenant.
# Generate in S1 console: Settings -> Integrations -> Data Lake API Keys (Configuration Read scope).
SDL_CONFIG_READ_KEY=
``` ```
**S1_API_TOKEN** — generate at *Settings → Users → Service Users*. Account scope gives broadest access; site scope works for most features with some limitations. **S1_API_TOKEN** — generate at *Settings → Users → Service Users* in the console.
The service user API token should ideally be at **account scope** or higher. Site-scoped tokens have limited visibility into rules and may report reduced source counts.
**SDL_LOG_READ_KEY** — found at *Settings → Integrations → Data Lake API Keys → Log Read*. **SDL_LOG_READ_KEY**
**SDL_CONFIG_READ_KEY** — found at *Settings → Integrations → Data Lake API Keys → Configuration Read*. Required to sync parser files directly from SDL via the Coverage Map. Without it, you can still load parser files manually from the `parsers/` directory.
### 2. Start the Stack
### 2. Add Parser Files
Place your SDL parser JSON files into the `parsers/` directory. The backend reads them directly at query time — no rebuild is necessary.
```bash ```bash
docker compose up -d --build cp ~/my-parsers/*.json parsers/
```
### 3. Start the Stack
```bash
docker-compose up -d --build
``` ```
Open **http://localhost:3001** in your browser and you're off. Open **http://localhost:3001** in your browser and you're off.
### 3. First Run — Sync Everything
Click **Sync All** on the Parser Coverage Map. This runs three steps in sequence:
1. **Sync SDL Parsers** — downloads all `/logParsers/` parser files from your SDL tenant into the `parsers/` volume (requires `SDL_CONFIG_READ_KEY`)
2. **Sync Detection Library** — imports all platform detection rules from the S1 API, including MITRE ATT&CK tactic/technique mappings and per-rule alert counts
3. **Sync Live Sources** — queries the data lake for every `dataSource.name` active in the last 7 days
### 4. Detection Library (alternative: local file)
If the live API import fails (e.g. token scope is too narrow), the toolkit falls back to a local `detections.json` generated from the [detection-validator](https://github.com/mickbrowns1/detection-validator) repository:
```bash
mkdir -p data
cp /path/to/detection-validator/data/detections/extracted.json data/detections.json
```
The `data/` directory is gitignored and never committed.
--- ---
## Features ## Features
@@ -118,44 +106,29 @@ If any sources are uncovered, the **Top Sources Needing a Parser** table lists t
### Parser Coverage Map ### Parser Coverage Map
Answers the question: *does each active data source have a parser running, and is it covered by detection rules?* Answers the question: *does each active data source have a parser running?*
#### Syncing **How it works:**
- **Sync All** — runs all three sync operations in sequence (SDL parsers → detection library → live sources) with one click 1. **Sync Live Sources**: executes a PowerQuery against your data lake to retrieve every `dataSource.name` seen in the last 7 days, along with event counts.
- **Sync SDL Parsers** — downloads parser files from `/logParsers/` on your SDL tenant via the Config File API 2. **Load SDL Parsers**: reads parser files from `parsers/`, extracts the `dataSource.name` attribute from each, and stores the field list in the database.
- **Sync Detection Library** — imports platform rules from the S1 API with MITRE mappings and alert counts
- **Sync Live Sources** — queries the data lake for active `dataSource.name` values and event counts
#### Matching Logic (three-tier)
**Matching logic (three-tier):**
1. Exact `dataSource.name` match between the active source and the parser attribute 1. Exact `dataSource.name` match between the active source and the parser attribute
2. Normalised substring match (ignores spaces, dashes, case) between active source name and parser `dataSource.name` 2. Normalised substring match (ignores spaces, dashes, and case) between the active source name and the parser's `dataSource.name`
3. Normalised substring match against the parser filename 3. Normalised substring match against the parser filename — catches files where the `dataSource.name` attribute is incorrect or missing
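The three tiers above can be sketched roughly as follows. This is a minimal illustration, not the toolkit's actual code: `normalise` and `match_tier` are hypothetical names, and the direction of the substring test (either name may contain the other in tier 2) is an assumption.

```python
import re


def normalise(name):
    """Lower-case the name and drop spaces and dashes (assumed normalisation)."""
    return re.sub(r"[\s\-]+", "", name.lower())


def match_tier(source_name, parser_ds_name, parser_filename):
    """Return 1, 2 or 3 for the tier that matched, or None if nothing matched."""
    # Tier 1: exact dataSource.name match.
    if parser_ds_name and source_name == parser_ds_name:
        return 1
    src = normalise(source_name)
    if not src:
        return None
    # Tier 2: normalised substring match against the parser's dataSource.name.
    ds = normalise(parser_ds_name or "")
    if ds and (src in ds or ds in src):
        return 2
    # Tier 3: normalised substring match against the parser filename.
    if src in normalise(parser_filename):
        return 3
    return None
```

Returning the tier number rather than a boolean makes it easy to show in the interface how confident a given match is.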
#### Parser Detection from Data **Parser detection from data:** During sync, a parallel PowerQuery checks whether each source has events with `event.type` populated in the data lake. If so, a parser is confirmed as running — the source is marked **Covered** even without a local parser file. This handles built-in and cloud-managed parsers that are not present in your `parsers/` folder.
During sync, a parallel PowerQuery checks whether each source has events with `event.type` populated in the data lake. If so, a parser is confirmed running — the source is marked **Covered** even without a local parser file. This handles built-in and cloud-managed parsers not present in `parsers/`. **Status values:**
- 🟢 **Covered** — custom parser confirmed (local file or detected via parsed events in the data lake)
- 🔴 **Parser Needed** — no parser found, or only a grok/dottedJson format (which typically indicates an incomplete parser)
#### Status Values **Filters:** Use the filter pills to focus on Custom Parser only, Default Parser Only (data lake detected), or No Parser.
- 🟢 **Covered** — parser confirmed (local file or detected via parsed fields in the data lake) **Deep link:** Click any source name in the table to open it directly in Parser Quality with all dropdowns pre-populated.
- 🟡 **Incomplete Parser** — parser file exists but is missing `dataSource.name` attribute
- 🔴 **Parser Needed** — no parser found, or only a grok/dottedJson format
#### Filter Pills **Expected results:** After syncing sources and loading parsers, sources with active SDL parsers will appear as Covered. Sources sending raw, unparsed data — where only `message` and `timestamp` appear in the data lake — will appear as Parser Needed.
- **All** — show every source
- **Complete Parser** — sources with a working custom or detected parser
- **Attributes Missing** — sources whose parser file lacks `dataSource.name`
#### Detections Column
Each source row shows how many detection library rules target it, with close-match suggestions when the `dataSource.name` doesn't align exactly with the library's naming. Once the **Rule Firing Status** cache is populated (via Threat Coverage page), each rule badge also shows its alert count — rules that have never fired are highlighted in amber (⚠).
#### Unlabelled Events Banner
A banner at the bottom of the coverage map lets you sample events that arrived with no `dataSource.name` — these are events whose parser is missing the `dataSource.name` attribute. Click **Sample Events** to run the query; the time window matches the Sync Live Sources period.
--- ---
@@ -163,88 +136,79 @@ A banner at the bottom of the coverage map lets you sample events that arrived w
Answers the question: *where is my event volume coming from, and what would happen if I filtered some of it?* Answers the question: *where is my event volume coming from, and what would happen if I filtered some of it?*
**Time range:** 1h, 3d, 5d, 7d **Time range:** 1h (default), 3d, 5d, 7d
**Daily Event Volume** — bar chart of total events per day. **Daily Event Volume** — bar chart of total events per day. In 1h mode, this switches to a by-source breakdown of the current hour's activity.
**Top Sources** — the 25 highest-volume `dataSource.name` values with event count and estimated GB (at 0.5 GB per million events). **Top Sources**: a table of the 25 highest-volume `dataSource.name` values with event count and estimated GB (calculated at 0.5 GB per million events).
**Filter Simulator** — enter a source name and an optional event type, then press Simulate. The backend runs a live PowerQuery counting matching events and projects matched events, estimated GB saved, and projected monthly figures. Entirely read-only — no filter is created or applied. **Filter Simulator** — enter a source name and an optional event type, then press Simulate. The backend runs a live PowerQuery counting matching events and projects:
- Matched events in the selected period
- Estimated GB that would be saved
- Projected monthly events and GB if the filter were applied permanently
This is entirely read-only — no filter is created or applied. Use the results to inform an exclusion rule you apply manually in the console.
**Expected results:** Top sources should reflect what you see in the SentinelOne console PowerQuery tool. The filter simulator provides a reasonable GB estimate assuming uniform event size across the source.
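As a worked illustration of the estimate described above, the sketch below applies the 0.5 GB per million events figure to a matched event count. The function names are hypothetical, and the monthly projection (linear scaling of the selected period to 30 days) is an assumption about how such a projection could be made, not a description of the backend.

```python
GB_PER_MILLION_EVENTS = 0.5  # figure quoted in the README


def estimate_gb(event_count):
    """Estimate volume in GB, assuming uniform event size across the source."""
    return event_count / 1_000_000 * GB_PER_MILLION_EVENTS


def project_filter(matched_events, period_days, days_per_month=30):
    """Project savings for a filter; linear scaling to a month is an assumption."""
    monthly_events = matched_events * days_per_month / period_days
    return {
        "matched_events": matched_events,
        "gb_saved": estimate_gb(matched_events),
        "monthly_events": monthly_events,
        "monthly_gb": estimate_gb(monthly_events),
    }


result = project_filter(7_000_000, period_days=7)
print(result["gb_saved"], result["monthly_gb"])  # 3.5 15.0
```

Because the estimate assumes every event is the same size, sources with very large or very small events will deviate from it.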
--- ---
### Parser Quality ### Parser Quality
Four tools for diagnosing and auditing parser health. Three tools for diagnosing parser extraction failures.
#### Live Event Sampler #### Live Event Sampler
Pulls raw events from a selected source directly from the data lake and renders every field that came back. Empty fields display as `∅` in grey — immediately highlighting fields the parser is failing to populate. The `message` column is pinned to the right with a **⎘ copy** button on each row. Pulls raw events from a selected source directly from the data lake and renders every field that came back. The `message` column is pinned to the right of the table, with a **⎘ copy** button on each row for convenient extraction of raw log lines.
#### Unlabelled Event Sampler - **Empty fields** are displayed as `∅` in grey — immediately highlighting fields the parser is failing to populate
- **Healthy source:** many fields populated (`src.ip`, `user.name`, `event.type`, etc.), with `message` present as the raw log backup
Samples events that have *no* `dataSource.name` — events the SDL received but couldn't attribute to any parser. Uses the filter expression `!(dataSource.name = *) !(source = 'scalyr')` to eliminate internal SDL noise. Returns a sample plus a count of how many such events exist in the time window. - **Unhealthy source:** only `timestamp` and `message` populated — the parser is not extracting anything of value
#### Field Population Rate #### Field Population Rate
Samples up to 500 events from a source and measures what percentage have each field populated, sorted worst-first. Samples up to 500 events from a source and measures what percentage of them have each field populated. Results are sorted worst-first so the most pressing gaps are immediately visible.
When you select a source, the tool automatically discovers which fields exist in that source's events and pre-fills the field list — merged with SDL schema defaults. The list is fully editable before running the analysis.
**Colour coding:**
- 🟢 ≥ 80% — healthy extraction - 🟢 ≥ 80% — healthy extraction
- 🟡 40–79% — partial; check regex patterns - 🟡 40–79% — partial extraction; check your regex patterns
- 🔴 < 40% — rarely populated; parser likely not matching this log format variant - 🔴 < 40% — field is rarely populated; the parser is likely not matching this log format variant
**Healthy parser:** Key fields such as `src.ip`, `event.type`, and `user.name` should sit between 70–100%. Niche fields like `src.process.cmdline` or `tgt.file.path` will naturally be lower, as not every event type produces them.
**Broken parser:** All SDL fields at 0%, with only `timestamp` and `message` visible in the "fields seen in sample" chip list at the bottom of the results.
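The population-rate calculation and colour thresholds described above can be sketched as below. This is an illustrative approximation: `population_rates` and `bucket` are hypothetical names, and treating `None` and the empty string as "not populated" is an assumption.

```python
def population_rates(events, fields):
    """Return (field, percent populated) pairs, sorted worst-first."""
    total = len(events)
    rates = {}
    for field in fields:
        # Assumption: a missing key, None, or "" all count as unpopulated.
        populated = sum(1 for e in events if e.get(field) not in (None, ""))
        rates[field] = 100.0 * populated / total if total else 0.0
    return sorted(rates.items(), key=lambda kv: kv[1])


def bucket(pct):
    """Map a percentage to the README's colour bands."""
    if pct >= 80:
        return "green"   # healthy extraction
    if pct >= 40:
        return "yellow"  # partial extraction
    return "red"         # rarely populated


sample = [
    {"src.ip": "10.0.0.1", "user.name": ""},
    {"src.ip": "10.0.0.2", "user.name": "bob"},
    {"src.ip": "10.0.0.3"},
    {"src.ip": None, "user.name": None},
]
for field, pct in population_rates(sample, ["src.ip", "user.name"]):
    print(field, pct, bucket(pct))
```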
#### Parser Test Runner #### Parser Test Runner
Paste a raw log line, select a loaded parser, and press Test. Supports: Paste a raw log line, select a loaded parser, and press Test. The backend extracts SDL `$field=pattern$` format strings from the parser file, converts them to Python named-group regular expressions, and tries each against your log line.
- **Regex parsers** — extracts SDL `$field=pattern$` format strings and matches against your log line
- **JSON parsers** — parses JSON input directly, flattens to dotted keys, and applies any `input/output/match/replace` rewrite rules
- **NDJSON** — multiple JSON objects separated by newlines
#### Attributes Missing - **Matched:** displays the format string that matched and every field extracted with its value
- **No match:** none of the parser's format strings apply to this log line — the log may contain a format variant the parser does not yet cover
A sub-section listing all parser files in the `parsers/` directory that have a `formats:` section but no `dataSource.name` attribute. These parsers are loaded into SDL but won't attach a source label to events they process — surfaced here regardless of whether they have active traffic. > **Note:** Only parsers using SDL custom format strings are supported by the test runner. Grok and dottedJson parsers are not currently testable here.
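To illustrate the conversion of `$field=pattern$` format strings into Python named-group regular expressions, here is a simplified sketch. It is not the code in `backend/routers/quality.py`: the helper names are hypothetical, the placeholder grammar is reduced to `$name=PatternName$`, and pattern names are assumed to resolve against a plain dict standing in for the parser's `patterns: {}` block. Because dotted field names such as `src.ip` are not valid Python group names, the sketch uses generated group names and maps them back afterwards.

```python
import re

# Simplified placeholder grammar (assumption): $field.name=PatternName$
PLACEHOLDER = re.compile(r"\$([A-Za-z_][\w.]*)=([A-Za-z_]\w*)\$")


def format_to_regex(fmt, patterns):
    """Build a compiled regex plus a group-name -> field-name mapping."""
    parts, names, pos = [], {}, 0
    for i, m in enumerate(PLACEHOLDER.finditer(fmt)):
        parts.append(re.escape(fmt[pos:m.start()]))  # literal text between fields
        group = f"g{i}"                              # safe Python group name
        names[group] = m.group(1)
        parts.append(f"(?P<{group}>{patterns[m.group(2)]})")
        pos = m.end()
    parts.append(re.escape(fmt[pos:]))
    return re.compile("".join(parts)), names


def extract(fmt, patterns, line):
    """Return extracted fields as a dict, or None when the format does not match."""
    regex, names = format_to_regex(fmt, patterns)
    m = regex.search(line)
    if m is None:
        return None
    return {names[g]: value for g, value in m.groupdict().items()}


patterns = {"ip": r"\d{1,3}(?:\.\d{1,3}){3}", "word": r"\w+"}
fmt = "src=$src.ip=ip$ user=$user.name=word$"
print(extract(fmt, patterns, "src=10.0.0.5 user=alice"))
```

A test runner built this way would try each of a parser's format strings in turn and report the first one that returns a non-`None` result.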
---
### Threat Coverage
Two views for understanding detection effectiveness across your estate.
#### MITRE ATT&CK Heatmap
Shows which MITRE ATT&CK tactics and techniques are covered by your detection library. Rules are imported from the S1 platform-rules API, which returns structured MITRE metadata per rule.
- **Tactic cards** — ordered by ATT&CK kill chain (Reconnaissance → Impact), colour-coded by rule count
- **Technique chips** — each technique ID and name within a tactic; expands to show all if > 12
- **Stats** — Total Library Rules, Rules with MITRE Mapping, Tactics Covered, Techniques Covered
Click **Sync Detection Library** to re-import rules and refresh MITRE data.
#### Rule Firing Status
Shows which detection rules have actually triggered alerts — and which have never fired.
Click **Sync Alert Firing Status**. The backend reads `generatedAlerts` directly from the platform-rules API data stored during the last Detection Library sync — no SDL PowerQuery needed. Results are cached in the database.
- **Active** (green) — rule has fired at least once in the monitored period
- **Silent** (amber) — rule has never fired; may be misconfigured or require a data source not yet active
The Coverage Map Detections column also reflects this data — fired rule counts appear inline on each source row.
--- ---
### Onboarding Accelerator ### Onboarding Accelerator
A prompt template for using Claude Code to onboard a new log source. Copy the template, paste sample raw log lines, and Claude Code will generate an SDL parser skeleton with field mappings and test assertions. No Anthropic API key required. A prompt template for using Claude Code to onboard a new log source. Copy the template, paste a sample of raw log lines, and Claude Code will generate:
- An SDL parser skeleton in augmented-JSON format
- Field mappings to the SDL common schema
- Parser test assertions
No Anthropic API key is required — this uses Claude Code directly from your terminal.
--- ---
### Settings ### Settings
Read and write your `.env` credentials from the interface. Secret fields are masked by default with a show/hide toggle. Changes are written to the mounted `.env` file and take effect after restarting the backend: Read and write your `.env` credentials from the interface. Secret fields (API tokens, keys) are masked by default with a show/hide toggle. Changes are written to the mounted `.env` file and take effect after restarting the backend:
```bash ```bash
docker compose up -d --build backend docker-compose up -d --build backend
``` ```
--- ---
@@ -253,15 +217,15 @@ docker compose up -d --build backend
```bash ```bash
# Full rebuild # Full rebuild
docker compose up -d --build docker-compose up -d --build
# Backend only (after Python changes) # Backend only (after Python changes)
docker compose build backend && docker compose up -d backend docker-compose up -d --build backend
# Frontend only (after HTML/JS changes) # Frontend only (after HTML/JS changes)
docker compose build frontend && docker compose up -d frontend docker-compose up -d --build frontend
# Reset the database (clears all synced data) # Reset the database
curl -X DELETE http://localhost:8001/api/coverage/reset curl -X DELETE http://localhost:8001/api/coverage/reset
``` ```
@@ -272,23 +236,21 @@ curl -X DELETE http://localhost:8001/api/coverage/reset
``` ```
. .
├── backend/ ├── backend/
│ ├── main.py # FastAPI app, router registration, startup migrations │ ├── main.py # FastAPI application, router registration
│ ├── db.py # SQLAlchemy models (ParsedRule, ActiveSource, │ ├── db.py # SQLAlchemy models
│ │ # ParserField, RuleFiringCache, IngestSnapshot)
│ ├── routers/ │ ├── routers/
│ │ ├── coverage.py # Coverage map, MITRE heatmap, firing status, SDL sync │ │ ├── coverage.py # Parser coverage map endpoints
│ │ ├── ingest.py # Ingest dashboard, filter simulator │ │ ├── ingest.py # Ingest dashboard + filter simulator
│ │ ├── quality.py # Parser quality tools, unlabelled event sampler │ │ ├── quality.py # Parser quality tools
│ │ └── settings.py # .env read/write │ │ └── settings.py # .env read/write
│ └── services/ │ └── services/
│ ├── s1_client.py # SentinelOne Management API + Scalyr PowerQuery client │ ├── s1_client.py # SentinelOne + Scalyr API client
│ └── rule_parser.py # SDL format string field extraction │ └── rule_parser.py # SDL format string field extraction
├── frontend/ ├── frontend/
│ └── index.html # Single-page application (Tailwind, vanilla JS) │ └── index.html # Single-page application (Tailwind, vanilla JS)
├── parsers/ # SDL parser files (volume-mounted, gitignored) ├── parsers/ # SDL parser files (volume-mounted)
├── data/ # detections.json fallback (gitignored)
├── db/ ├── db/
│ └── init.sql # Postgres initialisation │ └── init.sql # Postgres initialisation (tables created by SQLAlchemy)
├── docker-compose.yml ├── docker-compose.yml
├── .env.example ├── .env.example
└── README.md └── README.md
@@ -296,25 +258,35 @@ curl -X DELETE http://localhost:8001/api/coverage/reset
--- ---
## Environment Variables Reference ```
Nothing pushes parsers to the SDL tenant
The data flow is strictly one-way: SDL tenant → local disk.
| Variable | Required | Description | What actually happens
|---|---|---| ┌──────────────────┐ GET /api/listFiles/logParsers/ ┌──────────────────┐
| `S1_BASE_URL` | ✅ | SentinelOne console URL (e.g. `https://demo.sentinelone.net`) | │ SDL tenant │ ───────────────────────────────────▶ │ tools/sync_sdl_ │
| `S1_API_TOKEN` | ✅ | Service user API token — account scope recommended | │ │ GET /api/getFile/logParsers/... │ parsers.py │
| `SDL_XDR_URL` | ✅ | Scalyr XDR endpoint (e.g. `https://xdr.us1.sentinelone.net`) | └──────────────────┘ └────────┬─────────┘
| `SDL_LOG_READ_KEY` | ✅ | Data Lake log read key — for PowerQuery event queries | │ writes
| `SDL_CONFIG_READ_KEY` | ⚪ | Data Lake config read key — for SDL parser file sync |
| `SDL_PQ_TIMEOUT` | ⚪ | PowerQuery read timeout in seconds (default: `600`) | ./parsers/<name>
| `SDL_PQ_TIMEOUT_RETRIES` | ⚪ | Extra retries on timeout (default: `1`) |
| `ANTHROPIC_API_KEY` | ⚪ | Not currently used | │ bind-mount
/app/parsers (in container)
│ read-only
┌──────────────────────────────────┐
│ POST /api/quality/test-parser │
│ POST /api/quality/sync-from-sdl │
│ GET /api/quality/parsers │
└──────────────────────────────────┘
--- Endpoint / What it really does
Sync from SDL (POST /api/quality/sync-from-sdl) Downloads parsers from the tenant into /app/parsers/
Load SDL Parsers (UI button) Just re-indexes whatever files already exist in /app/parsers/
Test Parser (POST /api/quality/test-parser) Runs the parser logic locally in Python; tenant never touched
tools/sync_sdl_parsers.py (helper) Downloads parsers; never uploads
```
## Notes
- Parser files in `parsers/` are read at query time — add or update files without rebuilding.
- The filter simulator is entirely read-only and makes no changes to your tenant.
- `SDL_CONFIG_READ_KEY` requires the *Manage config files* permission in the console. Without it, Sync SDL Parsers is skipped but all other features remain available.
- Site-scoped tokens work for most features. Account-scoped tokens are needed for the detection library API and provide broader source visibility.
- The `parsers/` directory is gitignored except for specific tracked parser files. SDL dashboard and saved-search files downloaded during sync are intentionally not committed.
+1 -28
@@ -1,5 +1,5 @@
import os import os
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, Text, Boolean from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, Text
from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import declarative_base, sessionmaker from sqlalchemy.orm import declarative_base, sessionmaker
from datetime import datetime from datetime import datetime
@@ -37,7 +37,6 @@ class ActiveSource(Base):
event_count = Column(Integer, default=0) event_count = Column(Integer, default=0)
synced_at = Column(DateTime, default=datetime.utcnow) synced_at = Column(DateTime, default=datetime.utcnow)
parser_detected = Column(Integer, default=0) # >0 means parsed events seen in data lake parser_detected = Column(Integer, default=0) # >0 means parsed events seen in data lake
unlabelled = Column(Boolean, default=False) # True = events had no dataSource.name
class IngestSnapshot(Base): class IngestSnapshot(Base):
@@ -48,32 +47,6 @@ class IngestSnapshot(Base):
recorded_at = Column(DateTime, default=datetime.utcnow) recorded_at = Column(DateTime, default=datetime.utcnow)
class RuleFiringCache(Base):
__tablename__ = "rule_firing_cache"
id = Column(Integer, primary_key=True)
rule_name = Column(String, unique=True, index=True)
alert_count = Column(Integer, default=0)
period_days = Column(Integer, default=30)
checked_at = Column(DateTime, default=datetime.utcnow)
class CoverageSnapshot(Base):
__tablename__ = "coverage_snapshots"
id = Column(Integer, primary_key=True)
recorded_at = Column(DateTime, default=datetime.utcnow, index=True)
health_score = Column(Float, default=0.0)
parser_pct = Column(Float, default=0.0) # % sources with working parser
mitre_pct = Column(Float, default=0.0) # % ATT&CK tactics covered
firing_pct = Column(Float, default=0.0) # % rules that have fired
active_sources = Column(Integer, default=0)
covered_sources = Column(Integer, default=0)
rules_loaded = Column(Integer, default=0)
tactics_covered = Column(Integer, default=0)
techniques_covered = Column(Integer, default=0)
rules_with_mitre = Column(Integer, default=0)
rules_fired = Column(Integer, default=0)
def get_db(): def get_db():
db = SessionLocal() db = SessionLocal()
try: try:
+2 -40
@@ -1,7 +1,7 @@
from fastapi import FastAPI from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from db import engine, Base, get_db, ParsedRule, RuleFiringCache, CoverageSnapshot from db import engine, Base, get_db, ParsedRule
from routers import coverage, ingest, settings, quality, query from routers import coverage, ingest, settings, quality
Base.metadata.create_all(bind=engine) Base.metadata.create_all(bind=engine)
@@ -11,48 +11,11 @@ with engine.connect() as _conn:
_conn.execute(text( _conn.execute(text(
"ALTER TABLE active_sources ADD COLUMN IF NOT EXISTS parser_detected INTEGER DEFAULT 0" "ALTER TABLE active_sources ADD COLUMN IF NOT EXISTS parser_detected INTEGER DEFAULT 0"
)) ))
_conn.execute(text(
"ALTER TABLE active_sources ADD COLUMN IF NOT EXISTS unlabelled BOOLEAN DEFAULT FALSE"
))
_conn.execute(text(
"CREATE TABLE IF NOT EXISTS rule_firing_cache ("
"id SERIAL PRIMARY KEY, "
"rule_name VARCHAR UNIQUE, "
"alert_count INTEGER DEFAULT 0, "
"period_days INTEGER DEFAULT 30, "
"checked_at TIMESTAMP"
")"
))
_conn.execute(text(
"CREATE TABLE IF NOT EXISTS coverage_snapshots ("
"id SERIAL PRIMARY KEY, "
"recorded_at TIMESTAMP, "
"health_score FLOAT DEFAULT 0, "
"parser_pct FLOAT DEFAULT 0, "
"mitre_pct FLOAT DEFAULT 0, "
"firing_pct FLOAT DEFAULT 0, "
"active_sources INTEGER DEFAULT 0, "
"covered_sources INTEGER DEFAULT 0, "
"rules_loaded INTEGER DEFAULT 0, "
"tactics_covered INTEGER DEFAULT 0, "
"techniques_covered INTEGER DEFAULT 0, "
"rules_with_mitre INTEGER DEFAULT 0, "
"rules_fired INTEGER DEFAULT 0"
")"
))
_conn.commit() _conn.commit()
app = FastAPI(title="SIEM Toolkit", version="1.0.0") app = FastAPI(title="SIEM Toolkit", version="1.0.0")
@app.on_event("startup")
async def start_ingest_prewarmer():
"""Start optional background pre-warmer for the Ingest Dashboard cache.
Opt-in via INGEST_PREWARM=1. See backend/services/prewarmer.py."""
from services import prewarmer
prewarmer.start_if_enabled()
@app.on_event("startup") @app.on_event("startup")
async def auto_load_detections(): async def auto_load_detections():
""" """
@@ -98,7 +61,6 @@ app.include_router(coverage.router, prefix="/api/coverage", tags=["Coverage"])
app.include_router(ingest.router, prefix="/api/ingest", tags=["Ingest"]) app.include_router(ingest.router, prefix="/api/ingest", tags=["Ingest"])
app.include_router(settings.router, prefix="/api/settings", tags=["Settings"]) app.include_router(settings.router, prefix="/api/settings", tags=["Settings"])
app.include_router(quality.router, prefix="/api/quality", tags=["Quality"]) app.include_router(quality.router, prefix="/api/quality", tags=["Quality"])
app.include_router(query.router, prefix="/api/query", tags=["Query"])
@app.get("/health") @app.get("/health")
+20 -787
@@ -4,7 +4,7 @@ from fastapi import APIRouter, UploadFile, File, Depends, HTTPException
from pydantic import BaseModel from pydantic import BaseModel
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from datetime import datetime from datetime import datetime
from db import get_db, ParsedRule, ParserField, ActiveSource, RuleFiringCache, CoverageSnapshot from db import get_db, ParsedRule, ParserField, ActiveSource
from services import s1_client, rule_parser from services import s1_client, rule_parser
DETECTIONS_FILE = os.environ.get("DETECTIONS_FILE", "/app/data/detections.json") DETECTIONS_FILE = os.environ.get("DETECTIONS_FILE", "/app/data/detections.json")
@@ -12,75 +12,6 @@ DETECTIONS_FILE = os.environ.get("DETECTIONS_FILE", "/app/data/detections.json")
router = APIRouter() router = APIRouter()
def _extract_mitre(rule: dict) -> tuple[list[str], list[dict]]:
"""Extract (tactics, techniques) from a raw S1 rule dict.
Primary format (platform-rules API):
rule["mitre"] = [
{"tactic": "Execution", "techniques": [{"id": "T1204", "title": "User Execution"}]},
...
]
Falls back to flat field names used by older API versions / STAR rules.
"""
tactics: list[str] = []
techniques: list[dict] = []
# ── Primary: structured mitre array (platform-rules API) ──────────────────
mitre_list = rule.get("mitre")
if isinstance(mitre_list, list):
for item in mitre_list:
if not isinstance(item, dict):
continue
tac = item.get("tactic")
if isinstance(tac, str) and tac.strip():
tactics.append(tac.strip())
for tech in item.get("techniques", []):
if isinstance(tech, dict):
tid = str(tech.get("id", "") or "").strip()
tname = str(tech.get("title") or tech.get("name") or tid).strip()
if tid or tname:
techniques.append({"id": tid, "name": tname})
# ── Fallback: flat field names (STAR rules / older API versions) ──────────
if not tactics:
for key in ("tactic", "tactics", "mitreTactic", "mitreTactics"):
val = rule.get(key)
if isinstance(val, str) and val:
tactics.extend(v.strip() for v in val.split(",") if v.strip())
elif isinstance(val, list):
for v in val:
if isinstance(v, str) and v:
tactics.append(v.strip())
elif isinstance(v, dict):
n = v.get("name") or v.get("tactic") or ""
if n:
tactics.append(n.strip())
if not techniques:
for key in ("technique", "techniques", "mitreTechnique", "mitreTechniques", "mitreAttack"):
val = rule.get(key)
if isinstance(val, list):
for v in val:
if isinstance(v, str) and v.strip():
techniques.append({"id": v.strip(), "name": v.strip()})
elif isinstance(v, dict):
tid = str(v.get("id") or v.get("techniqueId") or "").strip()
tname = str(v.get("name") or v.get("title") or v.get("technique") or tid).strip()
if tid or tname:
techniques.append({"id": tid, "name": tname})
# Deduplicate
seen_ids: set = set()
unique_techniques = []
for t in techniques:
key_t = t["id"] or t["name"]
if key_t not in seen_ids:
seen_ids.add(key_t)
unique_techniques.append(t)
return list(dict.fromkeys(tactics)), unique_techniques
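As a rough illustration of what the `_extract_mitre` helper on the removed side of this diff does with the primary (structured `mitre` array) format, here is a minimal standalone sketch. The function name `flatten_mitre` and the sample rule are hypothetical and only mirror the shape described in the docstring; the flat-field fallback is left out.

```python
from typing import Any


def flatten_mitre(rule: dict[str, Any]) -> tuple[list[str], list[dict[str, str]]]:
    """Flatten a structured ``mitre`` array into (tactics, techniques)."""
    tactics: list[str] = []
    techniques: list[dict[str, str]] = []
    for item in rule.get("mitre") or []:
        if not isinstance(item, dict):
            continue
        tactic = item.get("tactic")
        if isinstance(tactic, str) and tactic.strip():
            tactics.append(tactic.strip())
        for tech in item.get("techniques") or []:
            if isinstance(tech, dict):
                tid = str(tech.get("id") or "").strip()
                name = str(tech.get("title") or tech.get("name") or tid).strip()
                if tid or name:
                    techniques.append({"id": tid, "name": name})

    # dict.fromkeys drops duplicate tactics while keeping first-seen order.
    unique_tactics = list(dict.fromkeys(tactics))

    # Techniques are deduplicated on id, falling back to name when id is empty.
    seen: set[str] = set()
    unique_techniques: list[dict[str, str]] = []
    for t in techniques:
        key = t["id"] or t["name"]
        if key not in seen:
            seen.add(key)
            unique_techniques.append(t)
    return unique_tactics, unique_techniques


# Hypothetical rule payload, shaped like the docstring's example.
sample_rule = {
    "mitre": [
        {"tactic": "Execution", "techniques": [{"id": "T1204", "title": "User Execution"}]},
        {"tactic": "Execution", "techniques": [{"id": "T1204", "title": "User Execution"}]},
        {"tactic": "Persistence", "techniques": [{"id": "T1547", "name": "Boot or Logon Autostart Execution"}]},
    ]
}
tactics, techniques = flatten_mitre(sample_rule)
```

The repeated "Execution" entry collapses to a single tactic and a single technique, which is the behaviour the MITRE coverage counts elsewhere in this file appear to rely on.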
def _star_query_texts(rule: dict) -> list[str]: def _star_query_texts(rule: dict) -> list[str]:
""" """
Extract all PowerQuery/filter strings from a STAR rule. Extract all PowerQuery/filter strings from a STAR rule.
@@ -163,20 +94,12 @@ def _import_from_api_rules(db, rules: list) -> int:
seen_ids.add(rule_id) seen_ids.add(rule_id)
sources = rule.get("sources") or [] sources = rule.get("sources") or []
tactics, techniques = _extract_mitre(rule)
# generatedAlerts is returned directly by the platform-rules API
generated_alerts = rule.get("generatedAlerts")
db.add(ParsedRule( db.add(ParsedRule(
rule_id=rule_id, rule_id=rule_id,
name=rule.get("name", "unnamed"), name=rule.get("name", "unnamed"),
rule_type="library", rule_type="library",
fields_used=[], # API rules don't expose field-level info fields_used=[], # API rules don't expose field-level info
raw=json.dumps({ raw=json.dumps({"data_sources": sources}),
"data_sources": sources,
"tactics": tactics,
"techniques": techniques,
"generated_alerts": generated_alerts,
}),
)) ))
loaded += 1 loaded += 1
if loaded % 500 == 0: if loaded % 500 == 0:
@@ -219,17 +142,12 @@ def _import_detections(db, detections_file: str) -> int:
continue continue
seen_ids.add(rule_id) seen_ids.add(rule_id)
tactics, techniques = _extract_mitre(rule)
db.add(ParsedRule( db.add(ParsedRule(
rule_id=rule_id, rule_id=rule_id,
name=rule.get("name", "unnamed"), name=rule.get("name", "unnamed"),
rule_type="library", rule_type="library",
fields_used=list(all_fields), fields_used=list(all_fields),
raw=json.dumps({ raw=json.dumps({"data_sources": list(set(data_sources))}),
"data_sources": list(set(data_sources)),
"tactics": tactics,
"techniques": techniques,
}),
)) ))
loaded += 1 loaded += 1
if loaded % 500 == 0: if loaded % 500 == 0:
@@ -289,109 +207,16 @@ async def upload_sigma(files: list[UploadFile] = File(...), db: Session = Depend
return {"loaded": len(loaded), "rules": loaded} return {"loaded": len(loaded), "rules": loaded}
def _fetch_parsers_from_console(parsers_dir: str) -> dict:
"""
Fetch every parser under /logParsers/ from the SDL console and write them
to parsers_dir. Uses SDL_CONFIG_READ_KEY (needs 'Manage config files' permission)
and SDL_XDR_URL from the environment.
Returns {"fetched": N, "failed": [...], "skipped": reason_or_None}
"""
import urllib.request, urllib.error, json as _json, os as _os
# Read live from .env file so Settings-page saves are picked up without restart
def _env_val(key: str) -> str:
val = _os.environ.get(key, "")
if not val:
env_path = _os.environ.get("ENV_FILE_PATH", "/app/.env")
try:
for line in open(env_path).read().splitlines():
line = line.strip()
if line and not line.startswith("#") and "=" in line:
k, _, v = line.partition("=")
if k.strip() == key:
val = v.strip()
break
except Exception:
pass
return val
config_key = _env_val("SDL_CONFIG_READ_KEY")
base_url = _env_val("SDL_XDR_URL").rstrip("/")
if not config_key:
return {"fetched": 0, "failed": [], "skipped": "SDL_CONFIG_READ_KEY not set"}
if not base_url:
return {"fetched": 0, "failed": [], "skipped": "SDL_XDR_URL not set"}
def _post(path: str, params: dict) -> dict:
url = f"{base_url}{path}"
body = _json.dumps({**params, "token": config_key}).encode()
req = urllib.request.Request(url, data=body, headers={
"Authorization": f"Bearer {config_key}",
"Content-Type": "application/json",
})
try:
with urllib.request.urlopen(req, timeout=30) as r:
return _json.loads(r.read())
except urllib.error.HTTPError as e:
err_body = e.read().decode(errors="replace")[:300]
raise RuntimeError(f"HTTP {e.code} {path}: {err_body}")
# List all parser paths
res = _post("/api/listFiles", {"pathPrefix": "/logParsers/"})
# Support multiple response shapes: {"paths": [...]} or {"files": [...]}
raw_paths = res.get("paths") or res.get("files") or []
# Each element may be a plain string or a dict with a "path"/"name" key
paths = []
for p in raw_paths:
if isinstance(p, dict):
p = p.get("path") or p.get("name") or ""
if isinstance(p, str) and p.startswith("/logParsers/"):
paths.append(p)
_os.makedirs(parsers_dir, exist_ok=True)
fetched, failed = 0, []
for p in paths:
name = p.rsplit("/", 1)[-1] or "_unnamed"
try:
r = _post("/api/getFile", {"path": p})
content = r.get("content")
if content is None:
failed.append({"path": p, "error": "no content", "raw": r})
continue
with open(_os.path.join(parsers_dir, name), "w", encoding="utf-8") as fh:
fh.write(content)
fetched += 1
except Exception as e:
failed.append({"path": p, "error": str(e)})
# Surface the raw API response so callers can see exactly what was returned.
# Truncate paths list so the response stays readable (first 200).
debug_info = {
"response_keys": list(res.keys()),
"paths_found": len(paths),
"paths_listed": paths[:200],
}
return {"fetched": fetched, "failed": failed, "skipped": None, "debug": debug_info}
@router.post("/load-parsers-from-sdl") @router.post("/load-parsers-from-sdl")
async def load_parsers_from_sdl(db: Session = Depends(get_db)): async def load_parsers_from_sdl(db: Session = Depends(get_db)):
""" """
Sync SDL parsers from the console (if SDL_CONFIG_READ_KEY is set) then index Load SDL parsers from the local /app/parsers directory (mounted from ./parsers/).
every file in the local /app/parsers directory into the DB. Files are placed there by the MCP-based loader or by manual copy.
Falls back to a clear error if the directory is empty.
""" """
import os import os
parsers_dir = "/app/parsers" parsers_dir = "/app/parsers"
# ── Step 1: fetch from console (best-effort) ────────────────────────────
fetch_result = _fetch_parsers_from_console(parsers_dir)
# ── Step 2: load whatever is on disk into the DB ─────────────────────────
try: try:
entries = [ entries = [
e for e in os.scandir(parsers_dir) e for e in os.scandir(parsers_dir)
@@ -400,19 +225,12 @@ async def load_parsers_from_sdl(db: Session = Depends(get_db)):
except FileNotFoundError: except FileNotFoundError:
raise HTTPException(503, "parsers/ directory not found — check Docker volume mount") raise HTTPException(503, "parsers/ directory not found — check Docker volume mount")
if not entries and fetch_result["skipped"]:
raise HTTPException(
422,
f"No parser files found in parsers/ directory and console sync was skipped "
f"({fetch_result['skipped']}). "
"Add SDL_CONFIG_READ_KEY in Settings (needs 'Manage config files' permission) "
"or upload a parser file manually."
)
if not entries: if not entries:
raise HTTPException( raise HTTPException(
422, 422,
"No parser files found in parsers/ directory after console sync. " "No parser files found in parsers/ directory. "
"Check SDL_CONFIG_READ_KEY permissions ('Manage config files' required)." "Use 'Load SDL Parsers via MCP' in Claude Code to populate it, "
"or upload a parser file manually."
) )
loaded = [] loaded = []
@@ -440,12 +258,7 @@ async def load_parsers_from_sdl(db: Session = Depends(get_db)):
errors.append({"parser": entry.name, "error": str(e)}) errors.append({"parser": entry.name, "error": str(e)})
db.commit() db.commit()
return { return {"loaded": len(loaded), "parsers": loaded, "errors": errors}
"loaded": len(loaded),
"parsers": loaded,
"errors": errors,
"console_fetch": fetch_result,
}
@router.post("/upload-parser") @router.post("/upload-parser")
@@ -516,9 +329,6 @@ _S1_NATIVE_SOURCES = {
"SentinelOne Ranger AD", "SentinelOne Ranger AD",
} }
# Cached count of events with no dataSource.name — updated on each sync
_unlabelled_event_count: int = -1 # -1 = not yet queried
@router.post("/sync-sources") @router.post("/sync-sources")
async def sync_sources(days: int = 7, db: Session = Depends(get_db)): async def sync_sources(days: int = 7, db: Session = Depends(get_db)):
@@ -568,55 +378,28 @@ async def sync_sources(days: int = 7, db: Session = Depends(get_db)):
parser_detected=parsed_by_source.get(name, 0), parser_detected=parsed_by_source.get(name, 0),
)) ))
seen += 1 seen += 1
db.commit() db.commit()
synced_names = [r["dataSource.name"] for r in rows if r.get("dataSource.name") and r["dataSource.name"] not in _S1_NATIVE_SOURCES] return {"synced": seen, "sources": [r["dataSource.name"] for r in rows if r.get("dataSource.name") and r["dataSource.name"] not in _S1_NATIVE_SOURCES]}
# Auto-record a coverage snapshot after every live-sources sync
try:
h = _compute_health(db)
db.add(CoverageSnapshot(
health_score=h["health_score"],
parser_pct=h["parser_pct"],
mitre_pct=h["mitre_pct"],
firing_pct=h["firing_pct"] or 0.0,
active_sources=h["active_sources"],
covered_sources=h["covered_sources"],
rules_loaded=h["rules_loaded"],
tactics_covered=h["tactics_covered"],
techniques_covered=h["techniques_covered"],
rules_with_mitre=h["rules_with_mitre"],
rules_fired=h["rules_fired"],
))
db.commit()
except Exception:
pass # snapshot failure should never break sync
return {"synced": seen, "sources": synced_names}
def _build_parser_ds_index() -> tuple[dict[str, dict], list[dict]]: def _build_parser_ds_index() -> dict[str, dict]:
""" """
Read all parser files from /app/parsers/ and build: Read all parser files from /app/parsers/ and build an index:
- index: dataSource.name → {parser_name, format_type} (complete parsers) dataSource.name (exact, from parser attributes) → {parser_name, format_type}
- stubs: list of {parser_name} for files with no dataSource.name attribute
Format type is "grok", "dottedJson", or "custom". Format type is "grok", "dottedJson", or "custom".
Sources with grok/dottedJson parsers are flagged as needing a proper parser. Sources with grok/dottedJson parsers are flagged as needing a proper parser.
""" """
import os, re import os, re
parsers_dir = "/app/parsers" parsers_dir = "/app/parsers"
_DS_NAME_RE = re.compile(r'"?dataSource\.name"?\s*:\s*"([^"]+)"') _DS_NAME_RE = re.compile(r'"dataSource\.name"\s*:\s*"([^"]+)"')
_FORMAT_TYPE_RE = re.compile(r'"type"\s*:\s*"([^"]+)"') _FORMAT_TYPE_RE = re.compile(r'"type"\s*:\s*"([^"]+)"')
# Only treat a file as a parser if it has a formats section — rules out dashboards/saved-searches
_HAS_FORMATS_RE = re.compile(r'\bformats\s*:', re.IGNORECASE)
index: dict[str, dict] = {} index: dict[str, dict] = {}
stubs: list[dict] = []
try: try:
entries = [e for e in os.scandir(parsers_dir) if e.is_file() and not e.name.startswith(".")] entries = [e for e in os.scandir(parsers_dir) if e.is_file() and not e.name.startswith(".")]
except FileNotFoundError: except FileNotFoundError:
return index, stubs return index
for entry in entries: for entry in entries:
try: try:
@@ -625,15 +408,9 @@ def _build_parser_ds_index() -> tuple[dict[str, dict], list[dict]]:
except Exception: except Exception:
continue continue
# Skip files that have no formats section — they're dashboards/queries, not parsers
if not _HAS_FORMATS_RE.search(content):
continue
# Extract dataSource.name (may appear multiple times — take first) # Extract dataSource.name (may appear multiple times — take first)
ds_match = _DS_NAME_RE.search(content) ds_match = _DS_NAME_RE.search(content)
if not ds_match: if not ds_match:
# Has formats but no dataSource.name — genuine stub parser
stubs.append({"parser_name": entry.name})
continue continue
ds_name = ds_match.group(1).strip() ds_name = ds_match.group(1).strip()
@@ -648,7 +425,7 @@ def _build_parser_ds_index() -> tuple[dict[str, dict], list[dict]]:
index[ds_name] = {"parser_name": entry.name, "format_type": fmt} index[ds_name] = {"parser_name": entry.name, "format_type": fmt}
return index, stubs return index
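One visible difference between the two sides of `_build_parser_ds_index` is the `_DS_NAME_RE` pattern: one side makes the quotes around `dataSource.name` optional, the other requires them. The sketch below compares the two patterns on two hypothetical parser fragments (the fragment strings are assumptions, not taken from a real parser file).

```python
import re

# Pattern copied from the side that requires a quoted key.
QUOTED_ONLY = re.compile(r'"dataSource\.name"\s*:\s*"([^"]+)"')
# Pattern copied from the side that also accepts an unquoted (JS-style) key.
QUOTED_OR_BARE = re.compile(r'"?dataSource\.name"?\s*:\s*"([^"]+)"')

# Hypothetical attribute lines as they might appear in a parser file.
quoted_fragment = '"dataSource.name": "Stormshield"'
bare_fragment = 'dataSource.name: "Stormshield"'

quoted_only_on_quoted = QUOTED_ONLY.search(quoted_fragment)
quoted_only_on_bare = QUOTED_ONLY.search(bare_fragment)
either_on_quoted = QUOTED_OR_BARE.search(quoted_fragment)
either_on_bare = QUOTED_OR_BARE.search(bare_fragment)
```

With the quoted-only pattern, a parser that writes the key without quotes yields no match, so the loop would `continue` and leave that parser out of the index. Whether that matters depends on how the parsers in `parsers/` are actually written.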
@router.get("/map") @router.get("/map")
@@ -664,32 +441,17 @@ def get_coverage_map(db: Session = Depends(get_db)):
parser_fields_rows = db.query(ParserField).all() parser_fields_rows = db.query(ParserField).all()
rules = db.query(ParsedRule).all() rules = db.query(ParsedRule).all()
firing_cache: dict[str, int] = {
row.rule_name: row.alert_count
for row in db.query(RuleFiringCache).all()
}
firing_cache_populated = len(firing_cache) > 0
# parser_name → set of field names (for field count display) # parser_name → set of field names (for field count display)
parser_index: dict[str, set] = {} parser_index: dict[str, set] = {}
for pf in parser_fields_rows: for pf in parser_fields_rows:
parser_index.setdefault(pf.parser_name, set()).add(pf.field_name) parser_index.setdefault(pf.parser_name, set()).add(pf.field_name)
# Build dataSource.name → {parser_name, format_type} index from parser files # Build dataSource.name → {parser_name, format_type} index from parser files
ds_index, stub_parsers = _build_parser_ds_index() ds_index = _build_parser_ds_index()
def _normalize(s: str) -> str: def _normalize(s: str) -> str:
return s.lower().replace(" ", "").replace("-", "").replace("_", "").replace(".", "") return s.lower().replace(" ", "").replace("-", "").replace("_", "").replace(".", "")
def _find_stub_match(source_name: str) -> dict | None:
"""Return stub parser info if a stub filename fuzzy-matches this source name."""
sn = _normalize(source_name)
for stub in stub_parsers:
fn = _normalize(stub["parser_name"])
if fn in sn or sn in fn:
return stub
return None
def _find_parser_info(source_name: str) -> dict | None: def _find_parser_info(source_name: str) -> dict | None:
""" """
Match priority: Match priority:
@@ -752,8 +514,6 @@ def get_coverage_map(db: Session = Depends(get_db)):
parser_info = _find_parser_info(src.source_name) parser_info = _find_parser_info(src.source_name)
parser_in_data = (src.parser_detected or 0) > 0 parser_in_data = (src.parser_detected or 0) > 0
stub_info = _find_stub_match(src.source_name) if not parser_info else None
if parser_info and parser_info["format_type"] == "custom": if parser_info and parser_info["format_type"] == "custom":
status = "covered" status = "covered"
matched_parser = parser_info["parser_name"] matched_parser = parser_info["parser_name"]
@@ -768,12 +528,6 @@ def get_coverage_map(db: Session = Depends(get_db)):
status = "covered" status = "covered"
matched_parser = parser_info["parser_name"] if parser_info else "detected in data" matched_parser = parser_info["parser_name"] if parser_info else "detected in data"
format_type = parser_info["format_type"] if parser_info else "unknown" format_type = parser_info["format_type"] if parser_info else "unknown"
elif stub_info:
# A parser file exists but has no dataSource.name — it's a stub/incomplete
status = "stub_parser"
matched_parser = stub_info["parser_name"]
format_type = None
stub_info["suggested_ds_name"] = src.source_name
else: else:
status = "parser_needed" status = "parser_needed"
matched_parser = None matched_parser = None
@@ -782,13 +536,9 @@ def get_coverage_map(db: Session = Depends(get_db)):
if status == "covered": if status == "covered":
covered_count += 1 covered_count += 1
else: else:
needed_count += 1 # stub_parser and parser_needed both count as needing work needed_count += 1
rules_for_src: list = [ rules_for_src: list = [r for r in rule_by_source.get(src.source_name, []) if r["type"] == "library"]
{**r, "alert_count": firing_cache.get(r["rule"], 0)}
for r in rule_by_source.get(src.source_name, [])
if r["type"] == "library"
]
# Close-match suggestions — shown when there are no library rules for this source. # Close-match suggestions — shown when there are no library rules for this source.
close_matches: list = [] close_matches: list = []
@@ -864,8 +614,6 @@ def get_coverage_map(db: Session = Depends(get_db)):
"status": status, "status": status,
"parser": matched_parser, "parser": matched_parser,
"format_type": format_type, "format_type": format_type,
"unlabelled": bool(src.unlabelled),
"stub_suggested_ds_name": stub_info.get("suggested_ds_name") if stub_info and status == "stub_parser" else None,
"parser_fields": len(parser_provides), "parser_fields": len(parser_provides),
"parser_detected": src.parser_detected or 0, "parser_detected": src.parser_detected or 0,
"rules": rules_for_src, "rules": rules_for_src,
@@ -876,23 +624,15 @@ def get_coverage_map(db: Session = Depends(get_db)):
"synced_at": src.synced_at.isoformat() if src.synced_at else None, "synced_at": src.synced_at.isoformat() if src.synced_at else None,
}) })
# Only surface stub parsers that matched an active source with real events —
# unmatched stubs with zero events are noise and are suppressed.
synced_at = active_sources[0].synced_at.isoformat() if active_sources else None synced_at = active_sources[0].synced_at.isoformat() if active_sources else None
stub_count = sum(1 for s in sources_out if s["status"] == "stub_parser")
return { return {
"summary": { "summary": {
"active_sources": len(active_sources), "active_sources": len(active_sources),
"covered": covered_count, "covered": covered_count,
"parser_needed": needed_count, "parser_needed": needed_count,
"stub_parsers": stub_count,
"unlabelled_events": _unlabelled_event_count,
"parsers_loaded": len(parser_index), "parsers_loaded": len(parser_index),
"rules_loaded": len(rules), "rules_loaded": len(rules),
"firing_cache_populated": firing_cache_populated,
}, },
"sources": sources_out, "sources": sources_out,
"synced_at": synced_at, "synced_at": synced_at,
@@ -900,516 +640,9 @@ def get_coverage_map(db: Session = Depends(get_db)):
} }
@router.get("/stub-parsers")
def get_stub_parsers():
"""Return all parser files that have a formats: section but no dataSource.name attribute.
Used by Parser Quality — Attributes Missing section. Independent of active sources."""
_, stubs = _build_parser_ds_index()
return {"stubs": stubs, "count": len(stubs)}
@router.get("/mitre")
def get_mitre_coverage(db: Session = Depends(get_db)):
rules = db.query(ParsedRule).filter_by(rule_type="library").all()
TACTIC_ORDER = [
"Reconnaissance", "Resource Development", "Initial Access", "Execution",
"Persistence", "Privilege Escalation", "Defense Evasion", "Credential Access",
"Discovery", "Lateral Movement", "Collection", "Command and Control",
"Exfiltration", "Impact", "Uncategorized",
]
tactic_map: dict[str, dict] = {}
no_mitre_count = 0
for rule in rules:
try:
raw_data = json.loads(rule.raw) if rule.raw else {}
except Exception:
raw_data = {}
tactics = raw_data.get("tactics", [])
techniques = raw_data.get("techniques", [])
if not tactics and not techniques:
no_mitre_count += 1
continue
if not tactics:
tactics = ["Uncategorized"]
for tactic in tactics:
if tactic not in tactic_map:
tactic_map[tactic] = {"techniques": {}, "rule_count": 0}
tactic_map[tactic]["rule_count"] += 1
for tech in techniques:
key_t = tech["id"] or tech["name"]
if key_t:
tactic_map[tactic]["techniques"][key_t] = tech["name"] or key_t
def _tactic_sort_key(name: str) -> int:
try:
return TACTIC_ORDER.index(name)
except ValueError:
return len(TACTIC_ORDER)
tactics_out = []
total_techniques = 0
for tactic_name in sorted(tactic_map.keys(), key=_tactic_sort_key):
tech_dict = tactic_map[tactic_name]["techniques"]
techniques_list = [
{"id": k if (k.startswith("T") and len(k) >= 4) else "", "name": v}
for k, v in sorted(tech_dict.items())
]
total_techniques += len(techniques_list)
tactics_out.append({
"tactic": tactic_name,
"rule_count": tactic_map[tactic_name]["rule_count"],
"technique_count": len(techniques_list),
"techniques": techniques_list,
})
return {
"tactics": tactics_out,
"total_rules": len(rules),
"rules_with_mitre": len(rules) - no_mitre_count,
"rules_without_mitre": no_mitre_count,
"total_techniques": total_techniques,
"tactic_count": len(tactics_out),
}
@router.post("/sync-rule-firing")
async def sync_rule_firing(period_days: int = 30, db: Session = Depends(get_db)):
"""Populate rule firing cache from the generatedAlerts field stored during
the last Detection Library sync (platform-rules API). This is instant and
requires no SDL PowerQuery. Falls back to SDL PowerQuery if the stored data
is missing (e.g. rules were imported from the detections.json file fallback)."""
from datetime import datetime
checked_at = datetime.utcnow()
result_rows = []
source = "api"
# ── Fast path: use generatedAlerts stored in ParsedRule.raw ───────────────
rules = db.query(ParsedRule).filter_by(rule_type="library").all()
for rule in rules:
try:
raw_data = json.loads(rule.raw) if rule.raw else {}
except Exception:
raw_data = {}
ga = raw_data.get("generated_alerts")
if ga is not None: # present means rule was imported from the live API
result_rows.append({"rule_name": rule.name, "alerts": int(ga)})
# ── Fallback: SDL PowerQuery (rules imported from detections.json) ─────────
if not result_rows:
source = "powerquery"
from datetime import timedelta
now = datetime.utcnow()
from_dt = (now - timedelta(days=period_days)).strftime("%Y-%m-%dT%H:%M:%S.000Z")
to_dt = now.strftime("%Y-%m-%dT%H:%M:%S.000Z")
FIRING_QUERIES = [
("| filter ruleName != '' | group alerts=count() by ruleName | sort -alerts | limit 2000", "ruleName"),
("| filter threatInfo.detectionEngineRule.name != '' | group alerts=count() by threatInfo.detectionEngineRule.name | sort -alerts | limit 2000", "threatInfo.detectionEngineRule.name"),
]
for query, name_field in FIRING_QUERIES:
try:
result = await s1_client.run_powerquery(query, from_dt, to_dt, max_count=10_000_000)
rows = result.get("events", []) if isinstance(result, dict) else []
if rows:
result_rows = [
{"rule_name": r.get(name_field, ""), "alerts": r.get("alerts", 0)}
for r in rows if r.get(name_field)
]
if result_rows:
break
except Exception:
continue
if not result_rows:
return {
"synced": 0,
"rules_with_alerts": 0,
"source": source,
"message": "No alert data found. Run Sync Detection Library first to import generatedAlerts from the S1 API.",
}
# Upsert into cache
db.query(RuleFiringCache).delete()
for row in result_rows:
db.add(RuleFiringCache(
rule_name=row["rule_name"],
alert_count=row["alerts"],
period_days=period_days,
checked_at=checked_at,
))
db.commit()
fired = sum(1 for r in result_rows if r["alerts"] > 0)
return {
"synced": len(result_rows),
"rules_with_alerts": fired,
"rules_never_fired": len(result_rows) - fired,
"source": source,
"period_days": period_days,
}
@router.get("/rule-firing-cache")
def get_rule_firing_cache(db: Session = Depends(get_db)):
"""Return all cached rule firing data sorted by alert count descending."""
rows = db.query(RuleFiringCache).order_by(RuleFiringCache.alert_count.desc()).all()
total_rules = db.query(ParsedRule).filter_by(rule_type="library").count()
fired = [r for r in rows if r.alert_count > 0]
never_fired_count = total_rules - len(fired)
period_days = rows[0].period_days if rows else 30
checked_at = rows[0].checked_at.isoformat() if rows and rows[0].checked_at else None
return {
"rules": [
{
"rule_name": r.rule_name,
"alert_count": r.alert_count,
"period_days": r.period_days,
"checked_at": r.checked_at.isoformat() if r.checked_at else None,
}
for r in rows
],
"summary": {
"rules_monitored": len(rows),
"fired_in_period": len(fired),
"never_fired": never_fired_count,
"period_days": period_days,
"checked_at": checked_at,
},
}
def _compute_health(db) -> dict:
"""Compute current health score from DB state.
Weights:
40% parser coverage — what % of active sources have a working parser
35% MITRE coverage — what % of the 14 standard ATT&CK tactics are covered
25% rule firing — what % of library rules have fired (0 if cache empty)
"""
# --- Parser coverage ---
all_sources = db.query(ActiveSource).all()
total_sources = len(all_sources)
# "covered" = parser_detected > 0 (parser running in data lake)
covered_sources = sum(1 for s in all_sources if (s.parser_detected or 0) > 0)
parser_pct = round((covered_sources / total_sources * 100) if total_sources else 0.0, 1)
# --- MITRE coverage ---
# Standard ATT&CK Enterprise tactics (14).
CANONICAL_TACTICS = frozenset({
"Reconnaissance", "Resource Development", "Initial Access", "Execution",
"Persistence", "Privilege Escalation", "Defense Evasion", "Credential Access",
"Discovery", "Lateral Movement", "Collection", "Command and Control",
"Exfiltration", "Impact",
})
# SentinelOne STAR rules sometimes label tactics with non-canonical names.
# Map them to canonical ATT&CK so we don't over-count and exceed 100%.
TACTIC_ALIASES = {
"Stealth": "Defense Evasion",
"Defense Impairment": "Defense Evasion",
}
TOTAL_TACTICS = len(CANONICAL_TACTICS)
rules = db.query(ParsedRule).filter_by(rule_type="library").all()
total_rules = len(rules)
covered_tactics: set = set()
covered_techniques: set = set()
rules_with_mitre = 0
for rule in rules:
try:
raw = json.loads(rule.raw) if rule.raw else {}
except Exception:
raw = {}
tactics = raw.get("tactics", [])
techniques = raw.get("techniques", [])
if tactics or techniques:
rules_with_mitre += 1
for t in tactics:
if not t or t == "Uncategorized":
continue
t = TACTIC_ALIASES.get(t, t)
if t in CANONICAL_TACTICS:
covered_tactics.add(t)
for tech in techniques:
k = tech.get("id") or tech.get("name")
if k:
covered_techniques.add(k)
tactics_covered = len(covered_tactics)
techniques_covered = len(covered_techniques)
mitre_pct = round((tactics_covered / TOTAL_TACTICS * 100), 1)
# --- Rule firing ---
firing_rows = db.query(RuleFiringCache).all()
cache_populated = len(firing_rows) > 0
rules_fired = sum(1 for r in firing_rows if r.alert_count > 0)
if cache_populated and total_rules > 0:
firing_pct = round(rules_fired / total_rules * 100, 1)
else:
firing_pct = 0.0
# --- Weighted health score ---
if cache_populated:
score = round(0.40 * parser_pct + 0.35 * mitre_pct + 0.25 * firing_pct, 1)
else:
# Without firing data, reweight between parser + MITRE
score = round(0.55 * parser_pct + 0.45 * mitre_pct, 1)
return {
"health_score": score,
"parser_pct": parser_pct,
"mitre_pct": mitre_pct,
"firing_pct": firing_pct if cache_populated else None,
"active_sources": total_sources,
"covered_sources": covered_sources,
"rules_loaded": total_rules,
"tactics_covered": tactics_covered,
"techniques_covered": techniques_covered,
"rules_with_mitre": rules_with_mitre,
"rules_fired": rules_fired,
"firing_cache_populated": cache_populated,
"components": {
"parser_coverage": {"value": parser_pct, "weight": 0.40 if cache_populated else 0.55, "label": "Parser Coverage"},
"mitre_coverage": {"value": mitre_pct, "weight": 0.35 if cache_populated else 0.45, "label": "MITRE Coverage"},
"rule_firing": {"value": firing_pct if cache_populated else None, "weight": 0.25 if cache_populated else 0.0, "label": "Rule Firing Rate"},
}
}
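The weighting described in the `_compute_health` docstring can be checked by hand. The following sketch reproduces only the final scoring step with the weights shown above; the function name `weighted_health` and the input percentages are made up for illustration.

```python
from typing import Optional


def weighted_health(parser_pct: float, mitre_pct: float, firing_pct: Optional[float]) -> float:
    """Combine component percentages using the weights from _compute_health.

    firing_pct=None stands in for an empty rule firing cache.
    """
    if firing_pct is not None:
        return round(0.40 * parser_pct + 0.35 * mitre_pct + 0.25 * firing_pct, 1)
    # No firing data: the weight is redistributed across parser and MITRE coverage.
    return round(0.55 * parser_pct + 0.45 * mitre_pct, 1)


# Made-up inputs: 80% of sources parsed, 7 of 14 tactics covered, 20% of rules fired.
with_firing = weighted_health(80.0, 50.0, 20.0)    # 32 + 17.5 + 5
without_firing = weighted_health(80.0, 50.0, None)  # 44 + 22.5
```

Note that the same tenant scores higher without firing data in this example (66.5 versus 54.5), so scores taken before and after the first rule firing sync are not directly comparable.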
@router.get("/health")
def get_health_score(db: Session = Depends(get_db)):
"""Return the current tenant health score and component breakdown."""
h = _compute_health(db)
# Most recent snapshot for trend comparison
prev = db.query(CoverageSnapshot).order_by(CoverageSnapshot.recorded_at.desc()).offset(1).first()
delta = None
if prev:
delta = round(h["health_score"] - prev.health_score, 1)
h["delta_from_previous"] = delta
return h
@router.post("/snapshot")
def record_snapshot(db: Session = Depends(get_db)):
"""Record a coverage snapshot. Called automatically at end of sync-sources."""
h = _compute_health(db)
snap = CoverageSnapshot(
health_score=h["health_score"],
parser_pct=h["parser_pct"],
mitre_pct=h["mitre_pct"],
firing_pct=h["firing_pct"] or 0.0,
active_sources=h["active_sources"],
covered_sources=h["covered_sources"],
rules_loaded=h["rules_loaded"],
tactics_covered=h["tactics_covered"],
techniques_covered=h["techniques_covered"],
rules_with_mitre=h["rules_with_mitre"],
rules_fired=h["rules_fired"],
)
db.add(snap)
db.commit()
return {"recorded": True, "health_score": h["health_score"]}
@router.get("/snapshots")
def get_snapshots(limit: int = 30, db: Session = Depends(get_db)):
"""Return the last N daily snapshots for sparkline charts."""
rows = (
db.query(CoverageSnapshot)
.order_by(CoverageSnapshot.recorded_at.desc())
.limit(limit)
.all()
)
return {
"snapshots": [
{
"date": r.recorded_at.strftime("%Y-%m-%d"),
"health_score": r.health_score,
"parser_pct": r.parser_pct,
"mitre_pct": r.mitre_pct,
"firing_pct": r.firing_pct,
"active_sources": r.active_sources,
"covered_sources": r.covered_sources,
}
for r in reversed(rows) # chronological order
]
}
@router.get("/dependency-map")
def get_dependency_map(db: Session = Depends(get_db)):
"""
Flip of the coverage map: for each detection library rule, show which
data sources it requires. Flags rules as 'at_risk' if any required
source has no parser or has zero recent events.
"""
rules = db.query(ParsedRule).filter_by(rule_type="library").all()
active_sources = {s.source_name: s for s in db.query(ActiveSource).all()}
ds_index, _ = _build_parser_ds_index()
# Build set of source names that are "healthy" (have events + parser)
healthy_sources: set = set()
for name, src in active_sources.items():
has_parser = name in ds_index or (src.parser_detected or 0) > 0
if has_parser and (src.event_count or 0) > 0:
healthy_sources.add(name)
out = []
for rule in rules:
try:
raw_data = json.loads(rule.raw) if rule.raw else {}
except Exception:
raw_data = {}
data_sources = raw_data.get("data_sources", [])
tactics = raw_data.get("tactics", [])
techniques = raw_data.get("techniques", [])
generated_alerts = raw_data.get("generated_alerts")
source_statuses = []
at_risk = False
for ds in data_sources:
src = active_sources.get(ds)
if src is None:
status = "inactive"
at_risk = True
elif ds not in healthy_sources:
status = "no_parser"
at_risk = True
else:
status = "healthy"
source_statuses.append({"source": ds, "status": status})
# Rules with no source requirements are not "at risk" (platform-wide rules)
if not data_sources:
at_risk = False
out.append({
"rule": rule.name,
"rule_id": rule.rule_id,
"sources": source_statuses,
"source_count": len(data_sources),
"tactics": tactics,
"techniques": [t.get("id", "") for t in techniques if t.get("id")],
"generated_alerts": generated_alerts,
"at_risk": at_risk,
"no_sources": len(data_sources) == 0,
})
# Sort: at-risk first, then by source count desc, then alphabetical
out.sort(key=lambda r: (not r["at_risk"], -r["source_count"], r["rule"]))
at_risk_count = sum(1 for r in out if r["at_risk"])
healthy_count = sum(1 for r in out if not r["at_risk"] and not r["no_sources"])
return {
"rules": out,
"total": len(out),
"at_risk": at_risk_count,
"healthy": healthy_count,
"no_source_requirements": sum(1 for r in out if r["no_sources"]),
}
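The at-risk logic in get_dependency_map can be restated as a small pure function. This is a minimal sketch, not the project's code: `classify_rule` and the sample source names are hypothetical, and plain sets stand in for the database rows. As in the endpoint, the `no_parser` label goes to any active source that is not healthy, which includes a source that has a parser but zero events.

```python
def classify_rule(data_sources, active_sources, healthy_sources):
    """Return (source_statuses, at_risk) for one rule.

    data_sources:    source names the rule requires
    active_sources:  set of source names seen in the tenant
    healthy_sources: subset that has both a parser and recent events
    """
    statuses = []
    at_risk = False
    for ds in data_sources:
        if ds not in active_sources:
            status = "inactive"
        elif ds not in healthy_sources:
            # Covers "no parser" and "parser but zero events" alike.
            status = "no_parser"
        else:
            status = "healthy"
        at_risk = at_risk or status != "healthy"
        statuses.append({"source": ds, "status": status})
    # A rule with no source requirements never enters the loop,
    # so it is reported as not at risk.
    return statuses, at_risk


# Hypothetical sample data, for illustration only.
active = {"Okta", "Stormshield"}
healthy = {"Stormshield"}
statuses, at_risk = classify_rule(["Okta", "Stormshield", "Zscaler"], active, healthy)
```

Keeping the classification free of database access makes the three outcomes easy to unit-test without a tenant.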
@router.get("/onboarding-status")
def get_onboarding_status(db: Session = Depends(get_db)):
"""
Pipeline status for each active source across 6 lifecycle stages.
Returns per-source progress for the onboarding tracker view.
"""
import re as _re
active_sources = db.query(ActiveSource).order_by(ActiveSource.event_count.desc()).all()
ds_index, stub_parsers = _build_parser_ds_index()
stub_names = {s["parser_name"] for s in stub_parsers}
firing_cache = {r.rule_name: r.alert_count for r in db.query(RuleFiringCache).all()}
# rule_by_source: source_name → list of rule names
rules = db.query(ParsedRule).filter_by(rule_type="library").all()
rule_by_source: dict = {}
for rule in rules:
try:
raw_data = json.loads(rule.raw) if rule.raw else {}
except Exception:
raw_data = {}
for ds in raw_data.get("data_sources", []):
rule_by_source.setdefault(ds, []).append(rule.name)
def _normalize(s):
return _re.sub(r"[^a-z0-9]", "", s.lower())
def _find_parser(source_name):
if source_name in ds_index:
return ds_index[source_name]
sn = _normalize(source_name)
for ds_name, info in ds_index.items():
if _normalize(ds_name) in sn or sn in _normalize(ds_name):
return info
return None
out = []
for src in active_sources:
parser_info = _find_parser(src.source_name)
parser_active = (src.parser_detected or 0) > 0
has_ds_name = parser_info is not None and parser_info.get("parser_name") not in stub_names
rules_for_src = rule_by_source.get(src.source_name, [])
rules_firing = any(firing_cache.get(r, 0) > 0 for r in rules_for_src)
has_detection_rules = len(rules_for_src) > 0
# Core stages (apply to every source)
core_stages = [
{"stage": "Data Received", "done": (src.event_count or 0) > 0},
{"stage": "Parser File Exists", "done": parser_info is not None},
{"stage": "Parser Active", "done": parser_active},
{"stage": "Source Labeled", "done": has_ds_name and parser_active},
]
# Detection stages (only meaningful when detection rules exist)
detection_stages = [
{"stage": "Detection Rules", "done": has_detection_rules, "na": False},
{"stage": "Rules Firing", "done": rules_firing, "na": False},
]
if has_detection_rules:
stages = core_stages + detection_stages
total = 6
else:
# Mark detection stages as N/A — don't count against progress
stages = core_stages + [
{"stage": "Detection Rules", "done": False, "na": True},
{"stage": "Rules Firing", "done": False, "na": True},
]
total = 4 # progress calculated over core stages only
completed = sum(1 for s in stages if s.get("done") and not s.get("na"))
out.append({
"source": src.source_name,
"event_count": src.event_count,
"stages": stages,
"completed": completed,
"total": total,
"has_detection_rules": has_detection_rules,
"pct": round(completed / total * 100) if total else 0,
})
# Sort: incomplete first, then by event volume
out.sort(key=lambda x: (x["completed"] == x["total"], -x["event_count"]))
return {
"sources": out,
"fully_onboarded": sum(1 for s in out if s["completed"] == s["total"]),
"in_progress": sum(1 for s in out if 0 < s["completed"] < s["total"]),
"not_started": sum(1 for s in out if s["completed"] == 0),
}
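The parser lookup in get_onboarding_status tries an exact name first and then falls back to a substring match on normalised names. The sketch below reproduces that idea under stated assumptions: `normalize`, `find_parser`, and the index contents are illustrative, and the guard against an empty normalised name is an addition that the endpoint above does not have (an empty string is a substring of every name, so without the guard it would match the first index entry).

```python
import re


def normalize(name: str) -> str:
    """Lower-case and drop everything except letters and digits."""
    return re.sub(r"[^a-z0-9]", "", name.lower())


def find_parser(source_name, ds_index):
    """Exact match first, then substring match in either direction."""
    if source_name in ds_index:
        return ds_index[source_name]
    sn = normalize(source_name)
    if not sn:
        return None  # assumption: treat names with no alphanumerics as unmatched
    for ds_name, info in ds_index.items():
        dn = normalize(ds_name)
        if dn and (dn in sn or sn in dn):
            return info
    return None


# Hypothetical index entry, for illustration only.
ds_index = {"Palo Alto Networks": {"parser_name": "paloalto"}}
fuzzy = find_parser("palo-alto networks firewall", ds_index)
missing = find_parser("Stormshield", ds_index)
```

Substring matching is permissive by design; short source names can match unrelated parsers, so an exact match should be preferred whenever the names can be aligned.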
@router.delete("/reset") @router.delete("/reset")
def reset_data(db: Session = Depends(get_db)): def reset_data(db: Session = Depends(get_db)):
db.query(ParsedRule).delete() db.query(ParsedRule).delete()
db.query(ParserField).delete() db.query(ParserField).delete()
db.query(ActiveSource).delete()
db.commit() db.commit()
global _unlabelled_event_count
_unlabelled_event_count = -1
return {"cleared": True} return {"cleared": True}
+18 -69
@@ -2,15 +2,9 @@ from datetime import datetime, timedelta
from fastapi import APIRouter, Query, HTTPException from fastapi import APIRouter, Query, HTTPException
from pydantic import BaseModel from pydantic import BaseModel
from services import s1_client from services import s1_client
from services.async_cache import async_ttl_cache, cache_stats, cache_clear
router = APIRouter() router = APIRouter()
# Dashboard endpoints can be expensive on busy tenants. Cache results in-process
# for a short TTL so reloads and parallel widgets are instant. Pass ?nocache=1
# to bypass for a forced refresh.
_DASHBOARD_TTL_SECONDS = 300
def _date_range(days: int) -> tuple[str, str]: def _date_range(days: int) -> tuple[str, str]:
now = datetime.utcnow() now = datetime.utcnow()
@@ -28,65 +22,41 @@ def _date_range_hours(hours: int) -> tuple[str, str]:
) )
@async_ttl_cache(ttl_seconds=_DASHBOARD_TTL_SECONDS)
async def _top_sources_cached(hours: int) -> dict:
"""Cache key: hours only. days is normalised to hours upstream."""
from_dt, to_dt = _date_range_hours(hours)
query = "| group events=count() by dataSource.name | sort -events | limit 25"
result = await s1_client.run_powerquery(query, from_dt, to_dt)
return {"data": result.get("events", [])}
@router.get("/top-sources") @router.get("/top-sources")
async def get_top_sources( async def get_top_sources(
days: int = Query(None, ge=1, le=90), days: int = Query(None, ge=1, le=90),
hours: int = Query(None, ge=1, le=720), hours: int = Query(None, ge=1, le=24),
nocache: bool = Query(False, description="Bypass dashboard cache"),
): ):
"""Top log sources by event count. """Top log sources by event count over the given period."""
if hours is not None:
Note: SDL returns 'internal Scalyr error' when this query uses day-scale from_dt, to_dt = _date_range_hours(hours)
timestamps on busy tenants, but the same window expressed in hours runs
fine. We normalise days -> hours internally for stability.
"""
if hours is None and days is None:
days = 7
if hours is None:
hours = days * 24
period_label = f"{days}d"
else:
period_label = f"{hours}h" period_label = f"{hours}h"
else:
from_dt, to_dt = _date_range(days or 7)
period_label = f"{days or 7}d"
query = "| group events=count() by dataSource.name | sort -events | limit 25"
try: try:
cached = await _top_sources_cached(hours, nocache=nocache) result = await s1_client.run_powerquery(query, from_dt, to_dt)
except Exception as e: except Exception as e:
raise HTTPException(502, f"PowerQuery error: {e}") raise HTTPException(502, f"PowerQuery error: {e}")
return {"period": period_label, "data": cached["data"]} return {"period": period_label, "data": result.get("events", [])}
@async_ttl_cache(ttl_seconds=_DASHBOARD_TTL_SECONDS)
async def _by_event_type_cached(days: int) -> dict:
# Same days->hours normalisation as top-sources for tenant stability.
from_dt, to_dt = _date_range_hours(days * 24)
query = "| group events=count() by dataSource.name, event.type | sort -events | limit 100"
result = await s1_client.run_powerquery(query, from_dt, to_dt)
return {"data": result.get("events", [])}
@router.get("/by-event-type") @router.get("/by-event-type")
async def get_by_event_type( async def get_by_event_type(days: int = Query(7, ge=1, le=90)):
days: int = Query(7, ge=1, le=90),
nocache: bool = Query(False),
):
"""Event counts grouped by source and event type.""" """Event counts grouped by source and event type."""
from_dt, to_dt = _date_range(days)
query = "| group events=count() by dataSource.name, event.type | sort -events | limit 100"
try: try:
cached = await _by_event_type_cached(days, nocache=nocache) result = await s1_client.run_powerquery(query, from_dt, to_dt)
except Exception as e: except Exception as e:
raise HTTPException(502, f"PowerQuery error: {e}") raise HTTPException(502, f"PowerQuery error: {e}")
return {"period_days": days, "data": cached["data"]} return {"period_days": days, "data": result.get("events", [])}
@async_ttl_cache(ttl_seconds=_DASHBOARD_TTL_SECONDS) @router.get("/daily-volume")
async def _daily_volume_cached(days: int) -> list: async def get_daily_volume(days: int = Query(5, ge=1, le=7)):
"""Total event count per day — queries run in parallel."""
import asyncio import asyncio
now = datetime.utcnow() now = datetime.utcnow()
@@ -108,27 +78,6 @@ async def _daily_volume_cached(days: int) -> list:
return list(reversed(results)) return list(reversed(results))
@router.get("/daily-volume")
async def get_daily_volume(
days: int = Query(5, ge=1, le=7),
nocache: bool = Query(False),
):
"""Total event count per day — queries run in parallel."""
return await _daily_volume_cached(days, nocache=nocache)
@router.get("/cache-stats")
def ingest_cache_stats():
"""Inspect dashboard cache (entry count + TTL remaining per key)."""
return cache_stats()
@router.delete("/cache")
def ingest_cache_clear():
"""Forcefully wipe the dashboard cache (next call refetches from SDL)."""
return {"cleared": cache_clear()}
class FilterRule(BaseModel): class FilterRule(BaseModel):
source: str = "" source: str = ""
event_type: str = "" event_type: str = ""
+5 -79
@@ -11,52 +11,16 @@ router = APIRouter()
PARSERS_DIR = "/app/parsers" PARSERS_DIR = "/app/parsers"
# Files in /app/parsers/ are also used to hold non-parser SDL artefacts
# (UEBA analytics tables, saved searches, dashboard configs) that the SDL
# config-files API returns from the same directory. Detect real parsers by
# looking for parser-config keywords in the file header.
_PARSER_MARKER_RE = re.compile(
r"^\s*(attributes|patterns|formats|patternRefs|rewrites|parser)\s*[:=]",
re.MULTILINE,
)
# Names known to be non-parser SDL configs even if the marker check is fooled.
_PARSER_NAME_DENYLIST = re.compile(
r"^(ueba[_\-]|searches$|alerts$|.*_baselines?_|.*_features?_|.*_scores?_|"
r"bsi[_\-]|.*-overview$|.*[_\-]membership$|.*[_\-]risk$|.*[_\-]smoke[_\-]test$|"
r".*[_\-]test[_\-](default|merge|replace|same))",
re.IGNORECASE,
)
def _looks_like_parser(path: str, name: str) -> bool:
"""Return True if a file under /app/parsers/ is actually a parser config."""
if _PARSER_NAME_DENYLIST.match(name):
return False
try:
with open(path, "r", encoding="utf-8", errors="replace") as fh:
head = fh.read(4096)
except OSError:
return False
return bool(_PARSER_MARKER_RE.search(head))
@router.get("/parsers") @router.get("/parsers")
def list_parser_files(): def list_parser_files():
"""List parser filenames under /app/parsers/ for the Test Runner. """List parser filenames available under /app/parsers/ for the Test Runner."""
Excludes non-parser SDL artefacts (UEBA tables, searches, dashboards).
"""
try: try:
candidates = [ names = sorted(
e for e in os.scandir(PARSERS_DIR) e.name for e in os.scandir(PARSERS_DIR)
if e.is_file() and not e.name.startswith(".") if e.is_file() and not e.name.startswith(".")
] )
except FileNotFoundError: except FileNotFoundError:
return {"parsers": [], "count": 0} names = []
names = sorted(
e.name for e in candidates
if _looks_like_parser(e.path, e.name)
)
return {"parsers": names, "count": len(names)} return {"parsers": names, "count": len(names)}
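The marker check that this hunk removes decides whether a file is a parser by looking for a config keyword at the start of a line. A minimal sketch of that test follows; it works on a string instead of a file, `looks_like_parser_text` is a hypothetical name, and the two sample documents are made up. One consequence of the pattern is that a JSON-style quoted key such as `"attributes":` does not match, because the keyword must follow the leading whitespace directly.

```python
import re

# Same keyword list as the removed _PARSER_MARKER_RE.
PARSER_MARKER_RE = re.compile(
    r"^\s*(attributes|patterns|formats|patternRefs|rewrites|parser)\s*[:=]",
    re.MULTILINE,
)


def looks_like_parser_text(head: str) -> bool:
    """Return True if the text contains an unquoted parser-config key."""
    return bool(PARSER_MARKER_RE.search(head))


# Illustrative inputs only.
parser_text = '{\n  attributes: {\n    "dataSource.name": "Stormshield"\n  }\n}'
saved_search = '{\n  "searches": []\n}'
quoted_keys = '{\n  "attributes": {}\n}'

is_parser = looks_like_parser_text(parser_text)
is_search = looks_like_parser_text(saved_search)
is_quoted = looks_like_parser_text(quoted_keys)
```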
@@ -330,44 +294,6 @@ def _to_py_backref(s: str) -> str:
# Endpoints # Endpoints
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@router.post("/sample-unlabelled")
async def sample_unlabelled(req: SampleEventsRequest):
"""Return a sample of events that have no dataSource.name — these need parsers.
Also runs a count query so the caller can update the banner with the real total.
"""
import asyncio
from routers import coverage as _coverage
filter_expr = "!(dataSource.name = *) !(source = 'scalyr')"
from_dt, to_dt = _date_range_hours(req.hours)
sample_result, count_result = await asyncio.gather(
s1_client.run_powerquery(f"{filter_expr} | limit {req.limit}", from_dt, to_dt),
s1_client.run_powerquery(f"{filter_expr} | group events=count()", from_dt, to_dt, max_count=50_000_000),
)
rows = sample_result if isinstance(sample_result, list) else (sample_result.get("rows") or sample_result.get("events") or [])
events = [_flatten_event(row) for row in rows]
non_empty_keys: set = set()
for ev in events:
for k, v in ev.items():
if v is not None and v != "" and v != "null":
non_empty_keys.add(k)
events = [{k: v for k, v in ev.items() if k in non_empty_keys} for ev in events]
count_rows = count_result.get("events", []) if isinstance(count_result, dict) else []
total = count_rows[0].get("events", 0) if count_rows else 0
_coverage._unlabelled_event_count = total
return {
"events": events,
"count": len(events),
"total": total,
"hours": req.hours,
"columns_seen": sorted(non_empty_keys),
}
@router.post("/sample-events") @router.post("/sample-events")
async def sample_events(req: SampleEventsRequest): async def sample_events(req: SampleEventsRequest):
"""Return a sample of raw events from a given data source.""" """Return a sample of raw events from a given data source."""
-73
@@ -1,73 +0,0 @@
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from datetime import datetime, timedelta
from services import s1_client
router = APIRouter()
def _date_range(hours: int | None = None, days: int | None = None) -> tuple[str, str]:
now = datetime.utcnow()
if hours:
delta = timedelta(hours=hours)
else:
delta = timedelta(days=days or 1)
return (
(now - delta).strftime("%Y-%m-%dT%H:%M:%S.000Z"),
now.strftime("%Y-%m-%dT%H:%M:%S.000Z"),
)
PRESET_QUERIES = [
{"label": "Top sources by volume", "query": "| group events=count() by dataSource.name | sort -events | limit 25"},
{"label": "Unlabelled events", "query": "!(dataSource.name = *) !(source = 'scalyr') | group events=count() by source | sort -events | limit 25"},
{"label": "Events by type", "query": "| group events=count() by dataSource.name, event.type | sort -events | limit 50"},
{"label": "Failed logins", "query": "| filter event.type = 'Logon' | filter event.outcome = 'FAILED' | group count() by user.name, src.ip | sort -count() | limit 25"},
{"label": "Process executions", "query": "| filter event.type = 'Process Creation' | group count() by src.process.name | sort -count() | limit 25"},
{"label": "Network connections by dest", "query": "| filter event.type = 'IP Connect' | group count() by dst.ip | sort -count() | limit 25"},
{"label": "Rules firing (30d)", "query": "| filter ruleName != '' | group alerts=count() by ruleName | sort -alerts | limit 50"},
]
class QueryRequest(BaseModel):
query: str
hours: int | None = None
days: int | None = None
max_count: int = 1000
@router.get("/presets")
def get_presets():
return {"presets": PRESET_QUERIES}
@router.post("/run")
async def run_query(req: QueryRequest):
"""Run a PowerQuery against the Singularity Data Lake."""
if not req.query.strip():
raise HTTPException(400, "Query cannot be empty")
if req.max_count > 10_000:
req.max_count = 10_000
from_dt, to_dt = _date_range(hours=req.hours, days=req.days)
try:
result = await s1_client.run_powerquery(req.query, from_dt, to_dt, max_count=req.max_count)
except Exception as e:
raise HTTPException(502, f"PowerQuery error: {e}")
err = result.get("error") if isinstance(result, dict) else None
if err:
raise HTTPException(502, f"PowerQuery error: {err}")
events = result.get("events", [])
columns = sorted({k for row in events for k in row.keys()}) if events else []
return {
"rows": len(events),
"columns": columns,
"events": events,
"from": from_dt,
"to": to_dt,
"query": req.query,
}
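Two small steps in run_query are worth isolating: the row limit is clamped to 10,000, and the column list is the sorted union of keys across all returned rows, since PowerQuery rows need not share the same keys. The sketch below shows both under stated assumptions; `clamp_max_count`, `summarize`, and the sample events are hypothetical.

```python
MAX_ROWS = 10_000  # same ceiling as the endpoint above


def clamp_max_count(requested: int) -> int:
    """Cap the requested row count at MAX_ROWS."""
    return min(requested, MAX_ROWS)


def summarize(events):
    """Return the row count and the sorted union of column names."""
    columns = sorted({key for row in events for key in row})
    return {"rows": len(events), "columns": columns}


# Illustrative rows with different key sets.
events = [
    {"dataSource.name": "Stormshield", "events": 10},
    {"dataSource.name": "Okta", "event.type": "Logon"},
]
summary = summarize(events)
```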
-1
@@ -14,7 +14,6 @@ FIELDS = [
{"key": "S1_API_TOKEN", "label": "Console API Token", "secret": True, "placeholder": "eyJ..."}, {"key": "S1_API_TOKEN", "label": "Console API Token", "secret": True, "placeholder": "eyJ..."},
{"key": "SDL_XDR_URL", "label": "SDL XDR URL", "secret": False, "placeholder": "https://xdr.us1.sentinelone.net"}, {"key": "SDL_XDR_URL", "label": "SDL XDR URL", "secret": False, "placeholder": "https://xdr.us1.sentinelone.net"},
{"key": "SDL_LOG_READ_KEY", "label": "SDL Log Read Key", "secret": True, "placeholder": "1DnK0Y4e..."}, {"key": "SDL_LOG_READ_KEY", "label": "SDL Log Read Key", "secret": True, "placeholder": "1DnK0Y4e..."},
{"key": "SDL_CONFIG_READ_KEY", "label": "SDL Config Read Key", "secret": True, "placeholder": "Needs 'Manage config files' permission"},
{"key": "ANTHROPIC_API_KEY", "label": "Anthropic API Key", "secret": True, "placeholder": "sk-ant-..."}, {"key": "ANTHROPIC_API_KEY", "label": "Anthropic API Key", "secret": True, "placeholder": "sk-ant-..."},
] ]
-84
@@ -1,84 +0,0 @@
"""Tiny in-process async TTL cache for backend endpoints.
Two-fold benefit:
* Identical concurrent calls share one upstream PowerQuery (single-flight).
* Repeat reads within TTL return instantly (no SDL round-trip).
Designed for read-only dashboard endpoints. Keep it stdlib-only so it adds
no dependency. Caches live until the process restarts.
Usage:
@async_ttl_cache(ttl_seconds=300)
async def get_top_sources(...): ...
"""
from __future__ import annotations
import asyncio
import functools
import time
from typing import Any, Awaitable, Callable, Tuple
# Maps cache-key -> (expires_at, value)
_STORE: dict[Tuple[str, Tuple[Any, ...], Tuple[Tuple[str, Any], ...]], Tuple[float, Any]] = {}
# Maps cache-key -> asyncio.Lock for single-flight
_LOCKS: dict[Tuple[str, Tuple[Any, ...], Tuple[Tuple[str, Any], ...]], asyncio.Lock] = {}
def _make_key(name: str, args: tuple, kwargs: dict) -> Tuple[str, Tuple[Any, ...], Tuple[Tuple[str, Any], ...]]:
# Skip the special "nocache" kwarg so it doesn't fragment the cache.
kw = tuple(sorted((k, v) for k, v in kwargs.items() if k != "nocache"))
return (name, args, kw)
def async_ttl_cache(ttl_seconds: int = 300) -> Callable:
"""Decorator: cache an async function's result for ttl_seconds.
The wrapped function may accept an optional `nocache=True` kwarg to
bypass + refresh the cache for that call.
"""
def decorator(fn: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
@functools.wraps(fn)
async def wrapper(*args, **kwargs):
nocache = bool(kwargs.pop("nocache", False))
key = _make_key(fn.__qualname__, args, kwargs)
if not nocache:
hit = _STORE.get(key)
if hit and hit[0] > time.monotonic():
return hit[1]
lock = _LOCKS.setdefault(key, asyncio.Lock())
async with lock:
# Re-check after acquiring lock — another caller may have populated.
if not nocache:
hit = _STORE.get(key)
if hit and hit[0] > time.monotonic():
return hit[1]
value = await fn(*args, **kwargs)
_STORE[key] = (time.monotonic() + ttl_seconds, value)
return value
return wrapper
return decorator
def cache_stats() -> dict:
"""Debug helper: return current cache entries (no values)."""
now = time.monotonic()
return {
"entries": len(_STORE),
"live": [
{"key": str(k), "ttl_remaining_s": round(v[0] - now, 1)}
for k, v in _STORE.items()
if v[0] > now
],
}
def cache_clear() -> int:
"""Wipe the cache; returns the number of entries removed."""
n = len(_STORE)
_STORE.clear()
return n
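The single-flight behaviour described in the module docstring can be checked with a short experiment: several concurrent calls with the same argument should trigger only one upstream call. The following is a self-contained sketch, not the module above. `ttl_cache` is a reduced re-implementation without the `nocache` option, and `slow_query` is a hypothetical stand-in for a PowerQuery call.

```python
import asyncio
import functools
import time


def ttl_cache(ttl_seconds: float):
    """Reduced TTL cache with a per-key lock for single-flight."""
    def decorator(fn):
        store = {}  # args -> (expires_at, value)
        locks = {}  # args -> asyncio.Lock

        @functools.wraps(fn)
        async def wrapper(*args):
            hit = store.get(args)
            if hit and hit[0] > time.monotonic():
                return hit[1]
            lock = locks.setdefault(args, asyncio.Lock())
            async with lock:
                # Re-check: another caller may have filled the entry
                # while this one was waiting for the lock.
                hit = store.get(args)
                if hit and hit[0] > time.monotonic():
                    return hit[1]
                value = await fn(*args)
                store[args] = (time.monotonic() + ttl_seconds, value)
                return value
        return wrapper
    return decorator


calls = 0


@ttl_cache(ttl_seconds=60)
async def slow_query(hours: int) -> dict:
    """Hypothetical upstream call; counts how often it really runs."""
    global calls
    calls += 1
    await asyncio.sleep(0.05)
    return {"hours": hours}


async def main():
    # Five identical requests issued at the same time.
    return await asyncio.gather(*(slow_query(24) for _ in range(5)))


results = asyncio.run(main())
```

The second lookup inside the lock is what turns a plain TTL cache into a single-flight one; without it, every waiter would re-run the query after acquiring the lock.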
-97
@@ -1,97 +0,0 @@
"""Background pre-warmer for the Ingest Dashboard cache.
Opt-in via env: INGEST_PREWARM=1
Tunable via env: INGEST_PREWARM_INTERVAL_SECONDS (default 240, just under TTL)
INGEST_PREWARM_HOURS (default "1,24,168")
INGEST_PREWARM_DAYS (default "7")
INGEST_PREWARM_DAILY_VOLUME_DAYS (default "5")
The pre-warmer re-runs the heavy Ingest Dashboard queries every ~4 min so the
in-process TTL cache is always populated. First user hit then returns from
cache (sub-millisecond) instead of waiting 30-60s for SDL.
"""
from __future__ import annotations
import asyncio
import logging
import os
import time
# Use the uvicorn logger so messages show up in `docker logs` alongside requests.
log = logging.getLogger("uvicorn.error")
_PREFIX = "prewarmer:"
def _flag_enabled() -> bool:
return os.environ.get("INGEST_PREWARM", "").lower() in ("1", "true", "yes", "on")
def _int_list(env: str, default: str) -> list[int]:
raw = os.environ.get(env, default)
out = []
for tok in raw.split(","):
tok = tok.strip()
if tok and tok.isdigit():
out.append(int(tok))
return out
async def _warm_once() -> dict:
"""Run all configured warm-up queries once. Returns timing summary."""
# Local import to avoid circular dependency with FastAPI router module.
from routers.ingest import (
_top_sources_cached,
_by_event_type_cached,
_daily_volume_cached,
)
hours_list = _int_list("INGEST_PREWARM_HOURS", "1,24,168")
days_list = _int_list("INGEST_PREWARM_DAYS", "7")
dv_days = _int_list("INGEST_PREWARM_DAILY_VOLUME_DAYS", "5") or [5]
tasks: list[tuple[str, asyncio.Task]] = []
for h in hours_list:
tasks.append((f"top-sources hours={h}",
asyncio.create_task(_top_sources_cached(h, nocache=True))))
for d in days_list:
tasks.append((f"by-event-type days={d}",
asyncio.create_task(_by_event_type_cached(d, nocache=True))))
for d in dv_days:
tasks.append((f"daily-volume days={d}",
asyncio.create_task(_daily_volume_cached(d, nocache=True))))
summary: dict[str, str] = {}
for label, task in tasks:
t0 = time.monotonic()
try:
await task
summary[label] = f"OK in {time.monotonic() - t0:.1f}s"
except Exception as e:
summary[label] = f"ERR ({e.__class__.__name__}: {str(e)[:120]})"
return summary
async def _loop():
interval = int(os.environ.get("INGEST_PREWARM_INTERVAL_SECONDS", "240"))
log.info("%s starting (interval=%ds)", _PREFIX, interval)
# Tiny initial delay so we don't compete with startup work.
await asyncio.sleep(5)
while True:
try:
summary = await _warm_once()
for label, status in summary.items():
log.info("%s %s -> %s", _PREFIX, label, status)
except asyncio.CancelledError:
raise
except Exception as e:
log.warning("%s cycle failed: %s", _PREFIX, e)
await asyncio.sleep(interval)
def start_if_enabled() -> asyncio.Task | None:
"""Spawn the pre-warm background task if INGEST_PREWARM is enabled.
Returns the task handle, or None if disabled."""
if not _flag_enabled():
log.info("%s disabled (set INGEST_PREWARM=1 to enable)", _PREFIX)
return None
log.info("%s scheduling background task", _PREFIX)
return asyncio.create_task(_loop(), name="ingest-prewarmer")
+6 -28
@@ -6,12 +6,6 @@ from datetime import datetime, timezone
BASE_URL = os.environ.get("S1_BASE_URL", "https://demo.sentinelone.net").rstrip("/") BASE_URL = os.environ.get("S1_BASE_URL", "https://demo.sentinelone.net").rstrip("/")
TOKEN = os.environ.get("S1_API_TOKEN", "") TOKEN = os.environ.get("S1_API_TOKEN", "")
# Configurable PowerQuery timeout — SDL queries on large tenants can exceed 2 min.
# Set SDL_PQ_TIMEOUT in .env (seconds). Default: 600.
SDL_PQ_TIMEOUT = int(os.environ.get("SDL_PQ_TIMEOUT", "600"))
# How many times to retry on ReadTimeout before giving up. Default: 1 (one retry).
SDL_PQ_TIMEOUT_RETRIES = int(os.environ.get("SDL_PQ_TIMEOUT_RETRIES", "1"))
# Scalyr/XDR PowerQuery credentials — from SDL_XDR_URL + SDL_LOG_READ_KEY # Scalyr/XDR PowerQuery credentials — from SDL_XDR_URL + SDL_LOG_READ_KEY
# in the SentinelOne console: Settings → Integrations → Data Lake API Keys # in the SentinelOne console: Settings → Integrations → Data Lake API Keys
SDL_XDR_URL = os.environ.get("SDL_XDR_URL", "https://xdr.us1.sentinelone.net").rstrip("/") SDL_XDR_URL = os.environ.get("SDL_XDR_URL", "https://xdr.us1.sentinelone.net").rstrip("/")
@@ -103,7 +97,7 @@ async def get_library_rules(page_size: int = 100) -> list:
return results return results
async def run_powerquery(query: str, from_date: str, to_date: str, max_count: int = 1000) -> dict: async def run_powerquery(query: str, from_date: str, to_date: str) -> dict:
""" """
Run a PowerQuery against the Singularity Data Lake via the Scalyr XDR API. Run a PowerQuery against the Singularity Data Lake via the Scalyr XDR API.
Uses SDL_XDR_URL + SDL_LOG_READ_KEY (Scalyr readlog token). Uses SDL_XDR_URL + SDL_LOG_READ_KEY (Scalyr readlog token).
@@ -120,15 +114,11 @@ async def run_powerquery(query: str, from_date: str, to_date: str, max_count: in
"query": query, "query": query,
"startTime": start_ms, "startTime": start_ms,
"endTime": end_ms, "endTime": end_ms,
"maxCount": max_count, "maxCount": 1000,
} }
# Use a generous read timeout for PowerQuery — large SDL scans can be slow. async with httpx.AsyncClient(timeout=120) as client:
pq_timeout = httpx.Timeout(connect=15.0, read=SDL_PQ_TIMEOUT, write=30.0, pool=15.0) for attempt in range(3):
max_attempts = 2 + SDL_PQ_TIMEOUT_RETRIES # base 2 (rate-limit) + timeout retries
async with httpx.AsyncClient(timeout=pq_timeout) as client:
for attempt in range(max_attempts):
try: try:
resp = await client.post( resp = await client.post(
f"{SDL_XDR_URL}/api/powerQuery", f"{SDL_XDR_URL}/api/powerQuery",
@@ -136,24 +126,12 @@ async def run_powerquery(query: str, from_date: str, to_date: str, max_count: in
) )
resp.raise_for_status() resp.raise_for_status()
break break
except httpx.ReadTimeout:
if attempt < max_attempts - 1:
await asyncio.sleep(5)
continue
raise RuntimeError(
f"PowerQuery timed out after {SDL_PQ_TIMEOUT}s "
f"(increase SDL_PQ_TIMEOUT in .env). Query: {query[:200]}"
)
except httpx.HTTPStatusError as e: except httpx.HTTPStatusError as e:
if e.response.status_code == 429 and attempt < max_attempts - 1: if e.response.status_code == 429 and attempt < 2:
await asyncio.sleep(10 * (attempt + 1)) await asyncio.sleep(10 * (attempt + 1))
continue continue
try:
detail = e.response.json()
except Exception:
detail = e.response.text[:500]
raise RuntimeError( raise RuntimeError(
f"HTTP {e.response.status_code} from {e.request.url}: {detail}" f"HTTP {e.response.status_code} from {e.request.url}: {e.response.text[:500]}"
) from e ) from e
data = resp.json() data = resp.json()
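The retry loop in run_powerquery waits 10 seconds after the first HTTP 429 and 20 seconds after the second before giving up. The sketch below isolates that linear backoff so it can be exercised without a network. It is an illustration under stated assumptions: `RateLimited` stands in for an httpx 429 error, and `with_rate_limit_retry`, `flaky`, and the injectable `sleep` parameter are hypothetical.

```python
import asyncio


class RateLimited(Exception):
    """Hypothetical stand-in for an HTTP 429 response."""


async def with_rate_limit_retry(call, attempts=3, base_delay=10.0, sleep=asyncio.sleep):
    """Retry `call` on RateLimited, waiting base_delay * attempt number."""
    for attempt in range(attempts):
        try:
            return await call()
        except RateLimited:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the error
            await sleep(base_delay * (attempt + 1))


delays = []


async def fake_sleep(seconds):
    """Record the requested delay instead of actually waiting."""
    delays.append(seconds)


state = {"n": 0}


async def flaky():
    """Fail twice with RateLimited, then succeed."""
    state["n"] += 1
    if state["n"] < 3:
        raise RateLimited()
    return {"events": []}


result = asyncio.run(with_rate_limit_retry(flaky, sleep=fake_sleep))
```

Passing the sleep function as a parameter is only a testing convenience; the loop itself follows the same attempt count and delays as the three-attempt version shown in the diff.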
-7
@@ -16,16 +16,9 @@ services:
- SDL_XDR_URL=${SDL_XDR_URL} - SDL_XDR_URL=${SDL_XDR_URL}
- SDL_LOG_READ_KEY=${SDL_LOG_READ_KEY} - SDL_LOG_READ_KEY=${SDL_LOG_READ_KEY}
- SDL_CONFIG_READ_KEY=${SDL_CONFIG_READ_KEY} - SDL_CONFIG_READ_KEY=${SDL_CONFIG_READ_KEY}
- SDL_PQ_TIMEOUT=${SDL_PQ_TIMEOUT:-600}
- SDL_PQ_TIMEOUT_RETRIES=${SDL_PQ_TIMEOUT_RETRIES:-1}
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY} - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- DATABASE_URL=postgresql://siem:siem@db:5432/siem - DATABASE_URL=postgresql://siem:siem@db:5432/siem
- DETECTIONS_FILE=/app/data/detections.json - DETECTIONS_FILE=/app/data/detections.json
- INGEST_PREWARM=${INGEST_PREWARM:-0}
- INGEST_PREWARM_HOURS=${INGEST_PREWARM_HOURS:-1,24,168}
- INGEST_PREWARM_DAYS=${INGEST_PREWARM_DAYS:-7}
- INGEST_PREWARM_DAILY_VOLUME_DAYS=${INGEST_PREWARM_DAILY_VOLUME_DAYS:-5}
- INGEST_PREWARM_INTERVAL_SECONDS=${INGEST_PREWARM_INTERVAL_SECONDS:-240}
depends_on: depends_on:
db: db:
condition: service_healthy condition: service_healthy
+240 -1266
File diff suppressed because it is too large Load Diff
-14
@@ -1,14 +0,0 @@
{
attributes: {
"dataSource.category": "security",
"dataSource.name": "Palo Alto Networks",
"dataSource.vendor": "Palo Alto Networks"
}
formats: [
{
id: "traffic-11-0",
format: "$network_activity.future_use_1$,$network_activity.receive_time$,$firewall.serial_number$,$network_activity.sub_type$,$timestamp$,$src.ip.address$,$dst.ip.address$,$network_endpoint.nat_src_ip$,$network_endpoint.nat_dst_ip$,$rule.name$,$user.src_name$,$user.dst_name$,$network_activity.app_name$,$network_traffic.virtual_system_name$,$source_zone$,$destination_zone$,$network_interface.inbound_name$,$network_interface.outbound_name$,$network_activity.log_action$,$session.uid$,$network_activity.repeat_count$,$network_endpoint.src_port$,$network_endpoint.dst_port$,$network_connection_info.flag$,$network_connection_info.protocol_name$,$network_activity.action$,$network_traffic.bytes$,$network_traffic.bytes_out$,$network_traffic.bytes_in$,$network_traffic.packets$,$network_activity.start_time_dt$,$network_activity.elapsed_time$,$network_activity.category_name$,$network_activity.sequence_number$,$network_activity.action_flags$,$location.src_country$,$location.dst_country$,$network_traffic.packets_out$,$network_traffic.packets_in$,$session.expiration_reason$,$device.group_hierarchy.level_1$,$device.group_hierarchy.level_2$,$device.group_hierarchy.level_3$,$device.group_hierarchy.level_4$,$firewall.virtual_system_name$,$device.name$,$network_activity.action_source$,$virtual_machine.src_vm_uuid$,$virtual_machine.dst_vm_uuid$,$device.imsi$,$device.imei$,$session.parent_uid$,$network_activity.parent_start_time_dt$,$network_connection_info.tunnel_type$,$network_connection_info.sctp_id$,$network_connection_info.sctp_chunks$,$network_connection_info.sctp_chunks_out$,$network_connection_info.sctp_chunks_in$,$rule.uid$,$network_activity.http_connection$,$network_connection_info.app_flap_count$,$policy.uid$,$network_connection_info.link_switches$,$network_connection_info.sd_wan_cluster$,$network_connection_info.sd_wan_device_type$,$network_connection_info.sd_wan_cluster_type$,$network_connection_info.sd_wan_site$,$user.groups$,$http_request.x_forwarded_for$,$device.src_type$,$device.src_profile$,$device.src_model$,$device.src_vendor_name$,$device.src_os_edition$,$device.src_os_version$,$network_connection_info.src_hostname$,$device.src_mac$,$device.dst_type$,$device.dst_profile$,$device.dst_model$,$device.dst_vendor_name$,$network_connection_info.dst_hostname$,$network_connection_info.dst_mac$,$container.id$,$container.pod_namespace$,$container.pod_name$,$network_endpoint.src_host_list$,$network_endpoint.dst_host_list$,$network_endpoint.host_id$,$device_hardware_info.serial_number$,$policy.src_group$,$policy.dst_group$,$session.owner$,$network_activity.time$,$network_activity.a_slice.service_type$,$network_activity.a_slice.differentiator$,$network_activity.sub_category$,$network_activity.app_model$,$network_activity.severity$,$network_activity.container.id$,$network_activity.app_tunnel_type$,$network_activity.is_saas$,$network_activity.is_sanctioned$,$network_activity.is_offloaded$,$network_activity.flow_type$,$network_activity.cluster.name$",
halt: true
}
]
}
-91
@@ -1,91 +0,0 @@
{
// specify a time zone if the timestamps in your log are not in GMT
//timezone: "Europe/Prague"
attributes: { "dataset.technology":"firewall", "dataset.vendor":"palo_alto", "dataset.app":"palo_alto" }
patterns: {
//maps to high_resolution_timestamp:
// timestamp: "\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}.\\d{3}(\\+|-)\\d{2}:\\d{2}"
tsPatternPA: "[A-za-z]+\\s+\\d{1,2} [\\d:]+"
//application_characteristic can be a single value, a comma delimited list in quotes, or blank. Null value is handled by format: traffic-2, not by this pattern.
application_characteristic: "(\".*\")|[^,]+"
//description field from system log is wrapped in quotes and may contain commas"
description: "(\".*\")"
//discard future_use fields
misc: "[^,]*"
}
formats: [
//change pattern depending on the timestamp fomat
{
format: ".*$timestamp=tsPatternPA$(\\,)*",
},
{
//match all fields. application_characteristic can be a single value, or a comma delimited list in quotes.
id: "traffic-1",
attributes: { type: "TRAFFIC", format: "traffic-1" },
format: "\\<[^\\>]+\\>$timestamp=tsPatternPA$ $hostname$ $log_version$,$receive_time$,$firewall_serial_number$,TRAFFIC,$sub_type$,\\d+,$generate_time$,$source_address$,$destination_address$,$source_nat_address$,$destination_nat_address$,$rule_name$,$source_user$,$destination_user$,$application$,$virtual_system$,$source_zone$,$destination_zone$,$inbound_interface$,$outbound_interface$,$log_action$,$time_stamp$,$session_id$,$repeat_count$,$source_port$,$destination_port$,$source_nat_port$,$destination_nat_port$,$flag$,$protocol$,$action$,$bytes$,$bytes_sent$,$bytes_received$,$packets$,$start_time$,$elapsed_time$,$category$,$test$,$sequence_number$,$action_flags$,$source_country$,$destination_country$,,$packets_sent$,$packets_received$,$session_end_reason$,\\d+,\\d+,\\d+,\\d+,$virtual_system_name$,$device_name$,$action_source$,$source_vm_uuid$,$destination_vm_uuid$,$tunnel_id_imsi$,$monitor_tag_imei$,$parent_session_id$,$parent_start_time$,$tunnel_type$,$sctp_association_id$,$sctp_chunks$,$sctp_chunks_sent$,$sctp_chunks_received$,$rule_uuid$,$http_2_connection$,$app_flap_count$,$policy_id$,$link_switches$,$sd-wan_cluster$,$sd-wan_device_type$,$sd-wan_cluster_type$,$sd-wan_site$,$dynamic_user_group_name$,$xff_address$,$source_device_category$,$source_device_profile$,$source_device_model$,$source_device_vendor$,$source_device_os_family$,$source_device_os_version$,$source_hostname$,$source_mac_address$,$destination_device_category$,$destination_device_profile$,$destination_device_model$,$destination_device_vendor$,$destination_device_os_family$,$destination_device_os_version$,$destination_hostname$,$destination_mac_address$,$container_id$,$pod_namespace$,$pod_name$,$source_external_dynamic_list$,$destination_external_dynamic_list$,$host_id$,$serial_number$,$source_dynamic_address_group$,$destination_dynamic_address_group$,$session_owner$,$high_resolution_timestamp$,$a_slice_service_type$,$a_slice_differentiator$,$application_subcategory$,$application_category$,$application_technology$,$application_risk$,$application_characteristic=application_characteristic$,$application_container$,$tunneled_application$,$application_saas$,$application_sanctioned_state$,$offloaded$",
halt: true
},
{
// Don't match on application_characteristic, for cases where it is blank.
id: "traffic-2",
attributes: { type: "TRAFFIC", format: "traffic-2" },
format: "\\<[^\\>]+\\>$timestamp=tsPatternPA$ $hostname$ $log_version$,$receive_time$,$firewall_serial_number$,TRAFFIC,$sub_type$,\\d+,$generate_time$,$source_address$,$destination_address$,$source_nat_address$,$destination_nat_address$,$rule_name$,$source_user$,$destination_user$,$application$,$virtual_system$,$source_zone$,$destination_zone$,$inbound_interface$,$outbound_interface$,$log_action$,$time_stamp$,$session_id$,$repeat_count$,$source_port$,$destination_port$,$source_nat_port$,$destination_nat_port$,$flag$,$protocol$,$action$,$bytes$,$bytes_sent$,$bytes_received$,$packets$,$start_time$,$elapsed_time$,$category$,$test$,$sequence_number$,$action_flags$,$source_country$,$destination_country$,,$packets_sent$,$packets_received$,$session_end_reason$,\\d+,\\d+,\\d+,\\d+,$virtual_system_name$,$device_name$,$action_source$,$source_vm_uuid$,$destination_vm_uuid$,$tunnel_id_imsi$,$monitor_tag_imei$,$parent_session_id$,$parent_start_time$,$tunnel_type$,$sctp_association_id$,$sctp_chunks$,$sctp_chunks_sent$,$sctp_chunks_received$,$rule_uuid$,$http_2_connection$,$app_flap_count$,$policy_id$,$link_switches$,$sd-wan_cluster$,$sd-wan_device_type$,$sd-wan_cluster_type$,$sd-wan_site$,$dynamic_user_group_name$,$xff_address$,$source_device_category$,$source_device_profile$,$source_device_model$,$source_device_vendor$,$source_device_os_family$,$source_device_os_version$,$source_hostname$,$source_mac_address$,$destination_device_category$,$destination_device_profile$,$destination_device_model$,$destination_device_vendor$,$destination_device_os_family$,$destination_device_os_version$,$destination_hostname$,$destination_mac_address$,$container_id$,$pod_namespace$,$pod_name$,$source_external_dynamic_list$,$destination_external_dynamic_list$,$host_id$,$serial_number$,$source_dynamic_address_group$,$destination_dynamic_address_group$,$session_owner$,$high_resolution_timestamp$,$a_slice_service_type$,$a_slice_differentiator$,$application_subcategory$,$application_category$,$application_technology$,$application_risk$,,$application_container$,$tunneled_application$,$application_saas$,$application_sanctioned_state$,$offloaded$",
halt: true
},
{
id: "system",
attributes: { type: "SYSTEM", format: "system" },
format: "\\<[^\\>]+\\>$timestamp=tsPatternPA$ $hostname$ $log_version$,$receive_time$,$serial_number$,SYSTEM,$content_threat_type$,.*,$generated_time$,$virtual_system$,$event_id$,$object$,.*,.*,$module$,$severity$,$description=description$,$sequence_number$,$action_flags$,$device_group_hierarchy_level_1$,$device_group_hierarchy_level_2$,$device_group_hierarchy_level_3$,$device_group_hierarchy_level_4$,$virtual_system_name$,$device_name$,.*,.*,$high_resolution_timestamp$",
halt: true
},
{
// Matches THREAT logs where application_characteristic and url_category_list are quoted, comma-separated lists
// and user_agent is quoted as well.
//PAN OS 10.2 will add $cloud_report_id to the end of this log format. If this no longer matches threat logs, check for the extra field at the end.
id: "threat-0",
attributes: { type: "THREAT", format: "threat-0" },
format: "\\<[^\\>]+\\>$timestamp=tsPatternPA$ $hostname$ $log_version$,$receive_time$,$serial_number$,THREAT,$threat_content_type$,\\d+,$Generated_time$,$source_address$,$destination_address$,$nat_source_address$,$nat_destination_address$,$rule_name$,$source_user$,$destination_user$,$application$,$virtual_system$,$source_zone$,$destination_zone$,$inbound_interface$,$outbound_interface$,$log_action$,$misc=misc$,$Session_id$,$repeat_count$,$source_port$,$destination_port$,$nat_source_port$,$nat_destination_port$,$flags$,$ip_protocol$,$action$,$url_filename$,$threat_id$,$category$,$severity$,$direction$,$sequence_number$,$action_flags$,$source_location$,$destination_location$,$misc=misc$,$Content_type$,$pcap_id$,$file_digest$,$cloud$,$url_index$,\"$user_agent$\",$file_type$,$x-forwarded-for$,$referer$,$sender$,$subject$,$recipient$,$report_id$,$device_group_hierarchy_level_1$,$device_group_hierarchy_level_2$,$device_group_hierarchy_level_3$,$device_group_hierarchy_level_4$,$virtual_system_name$,$device_name$,$misc=misc$,$Source_vm_uuid$,$destination_vm_uuid$,$http_method$,$tunnel_id_imsi$,$monitor_tag_imei$,$parent_session_id$,$parent_start_time$,$tunnel_type$,$threat_category$,$content_version$,$misc=misc$,$Sctp_association_id$,$payload_protocol_id$,$http_headers$,\"$url_category_list$\",$rule_uuid$,$http_2_connection$,$dynamic_user_group_name$,$xff_address$,$source_device_category$,$source_device_profile$,$source_device_model$,$source_device_vendor$,$source_device_os_family$,$source_device_os_version$,$source_hostname$,$source_mac_address$,$destination_device_category$,$destination_device_profile$,$destination_device_model$,$destination_device_vendor$,$destination_device_os_family$,$destination_device_os_version$,$destination_hostname$,$destination_mac_address$,$container_id$,$pod_namespace$,$pod_name$,$source_external_dynamic_list$,$destination_external_dynamic_list$,$host_id$,$serial_number$,$domain_edl$,$source_dynamic_address_group$,$destination_dynamic_address_group$,$partial_hash$,$high_resolution_timestamp$,$reason$,$justification$,$a_slice_service_type$,$application_subcategory$,$application_category$,$application_technology$,$application_risk$,\"$application_characteristic$\",$application_container$,$tunneled_application$,$application_saas$,$application_sanctioned_state$",
halt: true
},
{
// Matches THREAT logs where application_characteristic and url_category_list are quoted, comma-separated lists (user_agent unquoted).
//PAN OS 10.2 will add $cloud_report_id to the end of this log format. If this no longer matches threat logs, check for the extra field at the end.
id: "threat-1",
attributes: { type: "THREAT", format: "threat-1" },
format: "\\<[^\\>]+\\>$timestamp=tsPatternPA$ $hostname$ $log_version$,$receive_time$,$serial_number$,THREAT,$threat_content_type$,\\d+,$Generated_time$,$source_address$,$destination_address$,$nat_source_address$,$nat_destination_address$,$rule_name$,$source_user$,$destination_user$,$application$,$virtual_system$,$source_zone$,$destination_zone$,$inbound_interface$,$outbound_interface$,$log_action$,$misc=misc$,$Session_id$,$repeat_count$,$source_port$,$destination_port$,$nat_source_port$,$nat_destination_port$,$flags$,$ip_protocol$,$action$,$url_filename$,$threat_id$,$category$,$severity$,$direction$,$sequence_number$,$action_flags$,$source_location$,$destination_location$,$misc=misc$,$Content_type$,$pcap_id$,$file_digest$,$cloud$,$url_index$,$user_agent$,$file_type$,$x-forwarded-for$,$referer$,$sender$,$subject$,$recipient$,$report_id$,$device_group_hierarchy_level_1$,$device_group_hierarchy_level_2$,$device_group_hierarchy_level_3$,$device_group_hierarchy_level_4$,$virtual_system_name$,$device_name$,$misc=misc$,$Source_vm_uuid$,$destination_vm_uuid$,$http_method$,$tunnel_id_imsi$,$monitor_tag_imei$,$parent_session_id$,$parent_start_time$,$tunnel_type$,$threat_category$,$content_version$,$misc=misc$,$Sctp_association_id$,$payload_protocol_id$,$http_headers$,\"$url_category_list$\",$rule_uuid$,$http_2_connection$,$dynamic_user_group_name$,$xff_address$,$source_device_category$,$source_device_profile$,$source_device_model$,$source_device_vendor$,$source_device_os_family$,$source_device_os_version$,$source_hostname$,$source_mac_address$,$destination_device_category$,$destination_device_profile$,$destination_device_model$,$destination_device_vendor$,$destination_device_os_family$,$destination_device_os_version$,$destination_hostname$,$destination_mac_address$,$container_id$,$pod_namespace$,$pod_name$,$source_external_dynamic_list$,$destination_external_dynamic_list$,$host_id$,$serial_number$,$domain_edl$,$source_dynamic_address_group$,$destination_dynamic_address_group$,$partial_hash$,$high_resolution_timestamp$,$reason$,$justification$,$a_slice_service_type$,$application_subcategory$,$application_category$,$application_technology$,$application_risk$,\"$application_characteristic$\",$application_container$,$tunneled_application$,$application_saas$,$application_sanctioned_state$",
halt: true
},
{
// Matches THREAT logs where url_category_list is unquoted (application_characteristic is still a quoted list).
//PAN OS 10.2 will add $cloud_report_id to the end of this log format. If this no longer matches threat logs, check for the extra field at the end.
id: "threat-2",
attributes: { type: "THREAT", format: "threat-2" },
format: "\\<[^\\>]+\\>$timestamp=tsPatternPA$ $hostname$ $log_version$,$receive_time$,$serial_number$,THREAT,$threat_content_type$,\\d+,$Generated_time$,$source_address$,$destination_address$,$nat_source_address$,$nat_destination_address$,$rule_name$,$source_user$,$destination_user$,$application$,$virtual_system$,$source_zone$,$destination_zone$,$inbound_interface$,$outbound_interface$,$log_action$,$misc=misc$,$Session_id$,$repeat_count$,$source_port$,$destination_port$,$nat_source_port$,$nat_destination_port$,$flags$,$ip_protocol$,$action$,$url_filename$,$threat_id$,$category$,$severity$,$direction$,$sequence_number$,$action_flags$,$source_location$,$destination_location$,$misc=misc$,$Content_type$,$pcap_id$,$file_digest$,$cloud$,$url_index$,$user_agent$,$file_type$,$x-forwarded-for$,$referer$,$sender$,$subject$,$recipient$,$report_id$,$device_group_hierarchy_level_1$,$device_group_hierarchy_level_2$,$device_group_hierarchy_level_3$,$device_group_hierarchy_level_4$,$virtual_system_name$,$device_name$,$misc=misc$,$Source_vm_uuid$,$destination_vm_uuid$,$http_method$,$tunnel_id_imsi$,$monitor_tag_imei$,$parent_session_id$,$parent_start_time$,$tunnel_type$,$threat_category$,$content_version$,$misc=misc$,$Sctp_association_id$,$payload_protocol_id$,$http_headers$,$url_category_list$,$rule_uuid$,$http_2_connection$,$dynamic_user_group_name$,$xff_address$,$source_device_category$,$source_device_profile$,$source_device_model$,$source_device_vendor$,$source_device_os_family$,$source_device_os_version$,$source_hostname$,$source_mac_address$,$destination_device_category$,$destination_device_profile$,$destination_device_model$,$destination_device_vendor$,$destination_device_os_family$,$destination_device_os_version$,$destination_hostname$,$destination_mac_address$,$container_id$,$pod_namespace$,$pod_name$,$source_external_dynamic_list$,$destination_external_dynamic_list$,$host_id$,$serial_number$,$domain_edl$,$source_dynamic_address_group$,$destination_dynamic_address_group$,$partial_hash$,$high_resolution_timestamp$,$reason$,$justification$,$a_slice_service_type$,$application_subcategory$,$application_category$,$application_technology$,$application_risk$,\"$application_characteristic$\",$application_container$,$tunneled_application$,$application_saas$,$application_sanctioned_state$",
halt: true
},
{
// Matches THREAT logs where neither url_category_list nor application_characteristic is a quoted list.
//PAN OS 10.2 will add $cloud_report_id to the end of this log format. If this no longer matches threat logs, check for the extra field at the end.
id: "threat-3",
attributes: { type: "THREAT", format: "threat-3" },
format: "\\<[^\\>]+\\>$timestamp=tsPatternPA$ $hostname$ $log_version$,$receive_time$,$serial_number$,THREAT,$threat_content_type$,\\d+,$Generated_time$,$source_address$,$destination_address$,$nat_source_address$,$nat_destination_address$,$rule_name$,$source_user$,$destination_user$,$application$,$virtual_system$,$source_zone$,$destination_zone$,$inbound_interface$,$outbound_interface$,$log_action$,$misc=misc$,$Session_id$,$repeat_count$,$source_port$,$destination_port$,$nat_source_port$,$nat_destination_port$,$flags$,$ip_protocol$,$action$,$url_filename$,$threat_id$,$category$,$severity$,$direction$,$sequence_number$,$action_flags$,$source_location$,$destination_location$,$misc=misc$,$Content_type$,$pcap_id$,$file_digest$,$cloud$,$url_index$,$user_agent$,$file_type$,$x-forwarded-for$,$referer$,$sender$,$subject$,$recipient$,$report_id$,$device_group_hierarchy_level_1$,$device_group_hierarchy_level_2$,$device_group_hierarchy_level_3$,$device_group_hierarchy_level_4$,$virtual_system_name$,$device_name$,$misc=misc$,$Source_vm_uuid$,$destination_vm_uuid$,$http_method$,$tunnel_id_imsi$,$monitor_tag_imei$,$parent_session_id$,$parent_start_time$,$tunnel_type$,$threat_category$,$content_version$,$misc=misc$,$Sctp_association_id$,$payload_protocol_id$,$http_headers$,$url_category_list$,$rule_uuid$,$http_2_connection$,$dynamic_user_group_name$,$xff_address$,$source_device_category$,$source_device_profile$,$source_device_model$,$source_device_vendor$,$source_device_os_family$,$source_device_os_version$,$source_hostname$,$source_mac_address$,$destination_device_category$,$destination_device_profile$,$destination_device_model$,$destination_device_vendor$,$destination_device_os_family$,$destination_device_os_version$,$destination_hostname$,$destination_mac_address$,$container_id$,$pod_namespace$,$pod_name$,$source_external_dynamic_list$,$destination_external_dynamic_list$,$host_id$,$serial_number$,$domain_edl$,$source_dynamic_address_group$,$destination_dynamic_address_group$,$partial_hash$,$high_resolution_timestamp$,$reason$,$justification$,$a_slice_service_type$,$application_subcategory$,$application_category$,$application_technology$,$application_risk$,$application_characteristic$,$application_container$,$tunneled_application$,$application_saas$,$application_sanctioned_state$",
halt: true
},
{
id: "userid",
attributes: { type: "USERID", format: "userid" },
format: "\\<[^\\>]+\\>$timestamp=tsPatternPA$ $hostname$ $log_version$,$receive_time$,$serial_number$,USERID,$threat_content_type$,$misc=misc$,$generated_time$,$virtual_system$,$source_ip$,$user$,$data_source_name$,$event_id$,$repeat_count$,$time_out_threshold$,$source_port$,$destination_port$,$data_source$,$data_source_type$,$sequence_number$,$action_flags$,$device_group_hierarchy_level_1$,$device_group_hierarchy_level_2$,$device_group_hierarchy_level_3$,$device_group_hierarchy_level_4$,$virtual_system_name$,$device_name$,$virtual_system_id$,$factor_type$,$factor_completion_time$,$factor_number$,$user_group_flags$,$user_by_source$,$misc=misc$,$high_resolution_timestamp$",
halt: true
},
{
// Don't match on application_characteristic, for cases where it is blank.
id: "basic",
attributes: { format: "basic" },
format: "\\<[^\\>]+\\>$timestamp=tsPatternPA$ $hostname$ $log_version$,$receive_time$,$firewall_serial_number$,$type$,$sub_type$,\\d+,$generate_time$,$source_address$,$destination_address$,$source_nat_address$,$destination_nat_address$,$rule_name$,$source_user$,$destination_user$,$application$,$virtual_system$,$source_zone$,$destination_zone$,$inbound_interface$,$outbound_interface$,$log_action$,$time_stamp$,$session_id$,$repeat_count$,$source_port$,$destination_port$,$source_nat_port$,$destination_nat_port$,$flag$,$protocol$,$action$,.*",
},
]
}
-222
@@ -1,222 +0,0 @@
#!/usr/bin/env bash
# tools/sync-upstream.sh
# Pull the latest changes from upstream (mickbrowns1/SIEM-Toolkit) while
# preserving the fork's improvements, then verify the fork invariants
# still hold. Designed to be safe to run repeatedly.
#
# Usage:
#   ./tools/sync-upstream.sh                # rebase (clean linear history)
#   ./tools/sync-upstream.sh --merge        # merge-commit instead of rebase
#   ./tools/sync-upstream.sh --no-rebuild   # skip docker rebuild + verify
#   ./tools/sync-upstream.sh --no-push      # don't auto-push at the end
#   ./tools/sync-upstream.sh --dry-run      # show what would happen
#
# Exit codes:
#   0  fully up-to-date or sync succeeded and all invariants pass
#   1  pre-condition failed (dirty tree, wrong remote, etc.)
#   2  merge / rebase conflicts (resolve manually, then `git rebase --continue` or commit the merge)
#   3  one or more fork invariants regressed after sync

set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
REPO_DIR="$(cd "$SCRIPT_DIR/.." && pwd)"
cd "$REPO_DIR"
# --- defaults -----------------------------------------------------------
MODE=rebase
DO_REBUILD=1
DO_PUSH=1
DRY_RUN=0
UPSTREAM_REMOTE="${UPSTREAM_REMOTE:-upstream}"
UPSTREAM_BRANCH="${UPSTREAM_BRANCH:-main}"
ORIGIN_REMOTE="${ORIGIN_REMOTE:-origin}"
BACKEND_URL="${BACKEND_URL:-http://localhost:8001}"
BACKEND_CONTAINER="${BACKEND_CONTAINER:-siem-toolkit-patched-backend-1}"
while [[ $# -gt 0 ]]; do
  case "$1" in
    --merge)       MODE=merge ;;
    --no-rebuild)  DO_REBUILD=0 ;;
    --no-push)     DO_PUSH=0 ;;
    --dry-run)     DRY_RUN=1; DO_REBUILD=0; DO_PUSH=0 ;;
    -h|--help)
      sed -n '2,/^$/p' "$0" | sed 's/^# \{0,1\}//'
      exit 0 ;;
    *) echo "unknown arg: $1" >&2; exit 1 ;;
  esac
  shift
done
bold() { printf '\033[1m%s\033[0m\n' "$*"; }
red() { printf '\033[31m%s\033[0m\n' "$*"; }
green(){ printf '\033[32m%s\033[0m\n' "$*"; }
yellow(){ printf '\033[33m%s\033[0m\n' "$*"; }
# --- 1. pre-conditions --------------------------------------------------
bold "== 1. pre-conditions =="
if ! git remote get-url "$UPSTREAM_REMOTE" >/dev/null 2>&1; then
  red "no '$UPSTREAM_REMOTE' remote configured. Add with:"
  # Use the configured remote name so the hint stays correct when UPSTREAM_REMOTE is overridden.
  echo " git remote add $UPSTREAM_REMOTE https://github.com/mickbrowns1/SIEM-Toolkit.git"
  exit 1
fi
echo " upstream remote : $(git remote get-url "$UPSTREAM_REMOTE")"
echo " origin remote : $(git remote get-url "$ORIGIN_REMOTE")"
if [[ -n "$(git status --porcelain)" ]]; then
  red "working tree is not clean. Commit or stash changes first:"
  git status -s
  exit 1
fi
green " working tree clean"
CUR_BRANCH=$(git rev-parse --abbrev-ref HEAD)
echo " current branch : $CUR_BRANCH"
# --- 2. snapshot --------------------------------------------------------
SAFETY_TAG="safety/$(date +%Y%m%d-%H%M%S)"
bold "== 2. safety tag =="
if [[ "$DRY_RUN" == 1 ]]; then
  echo " [dry-run] would create tag $SAFETY_TAG"
else
  git tag "$SAFETY_TAG"
  echo " created $SAFETY_TAG"
fi
# --- 3. fetch upstream --------------------------------------------------
bold "== 3. fetch upstream =="
git fetch "$UPSTREAM_REMOTE" --quiet
echo " fetched ${UPSTREAM_REMOTE}/${UPSTREAM_BRANCH}"
HEAD_SHA=$(git rev-parse HEAD)
UP_SHA=$(git rev-parse "${UPSTREAM_REMOTE}/${UPSTREAM_BRANCH}")
MB=$(git merge-base HEAD "${UPSTREAM_REMOTE}/${UPSTREAM_BRANCH}")
NEW_COUNT=$(git rev-list --count "${MB}..${UPSTREAM_REMOTE}/${UPSTREAM_BRANCH}")
OUR_COUNT=$(git rev-list --count "${MB}..HEAD")
echo " HEAD : $HEAD_SHA"
echo " upstream/$UPSTREAM_BRANCH : $UP_SHA"
echo " merge-base : $MB"
echo " upstream commits : $NEW_COUNT new"
echo " our commits ahead : $OUR_COUNT"
if [[ "$NEW_COUNT" == 0 ]]; then
  green "== already current with upstream =="
  NEW_SYNC=0
else
  NEW_SYNC=1
  bold "-- new upstream commits --"
  git log --oneline "${MB}..${UPSTREAM_REMOTE}/${UPSTREAM_BRANCH}"
fi
# --- 4. apply (rebase or merge) ----------------------------------------
if [[ "$NEW_SYNC" == 1 ]]; then
  bold "== 4. applying upstream changes ($MODE) =="
  if [[ "$DRY_RUN" == 1 ]]; then
    echo " [dry-run] would $MODE $UPSTREAM_REMOTE/$UPSTREAM_BRANCH into $CUR_BRANCH"
  else
    if [[ "$MODE" == "rebase" ]]; then
      if ! git rebase "${UPSTREAM_REMOTE}/${UPSTREAM_BRANCH}"; then
        red "rebase has conflicts."
        echo "Resolve, then run: git rebase --continue"
        echo "Or abort with : git rebase --abort"
        echo "Recover snapshot : git reset --hard $SAFETY_TAG"
        exit 2
      fi
    else
      if ! git merge --no-ff "${UPSTREAM_REMOTE}/${UPSTREAM_BRANCH}" \
          -m "Sync upstream $(date +%Y-%m-%d)"; then
        red "merge has conflicts."
        echo "Resolve, then commit. Recover with: git reset --hard $SAFETY_TAG"
        exit 2
      fi
    fi
    green " ${MODE} succeeded"
  fi
fi
# --- 5. rebuild + verify invariants ------------------------------------
if [[ "$DO_REBUILD" == 1 ]]; then
  bold "== 5. rebuild backend + run invariants =="
  docker compose up -d --force-recreate --build backend 2>&1 | tail -5
  echo " waiting 15s for startup..."
  sleep 15
  FAILS=0
  # check LABEL CMD EXPECT: run CMD via eval and compare its stdout to EXPECT.
  check() {
    local label="$1" cmd="$2" expect="$3"
    local got
    got="$(eval "$cmd" 2>/dev/null || echo '<error>')"
    if [[ "$got" == "$expect" ]]; then
      green " PASS $label ($got)"
    else
      red " FAIL $label expected='$expect' got='$got'"
      FAILS=$((FAILS + 1))
    fi
  }
  # Invariant 1: Parser dropdown excludes ueba_* artefacts (fix 70f3f83)
  check "parser dropdown excludes ueba_*" \
    "curl -fsS $BACKEND_URL/api/quality/parsers | python3 -c 'import sys,json; d=json.load(sys.stdin); print(sum(1 for p in d[\"parsers\"] if p.lower().startswith(\"ueba\")))'" \
    "0"
  # Invariant 2: MITRE coverage is <= 100 (fix f821151)
  check "mitre_pct <= 100" \
    "curl -fsS $BACKEND_URL/api/coverage/health | python3 -c 'import sys,json; d=json.load(sys.stdin); print(d[\"mitre_pct\"] <= 100)'" \
    "True"
  # Invariant 3: ingest cache endpoints exist (fix 0a01a56)
  check "/api/ingest/cache-stats exists" \
    "curl -fsS -o /dev/null -w '%{http_code}' $BACKEND_URL/api/ingest/cache-stats" \
    "200"
  # Invariant 4: /sample-unlabelled is registered as a POST route (port from
  # upstream sync). GET to it should return 405 Method Not Allowed (route
  # exists, wrong method) rather than 404 (route missing).
  # Note: -f is omitted because 405 is the expected non-2xx status here.
  check "/api/quality/sample-unlabelled registered" \
    "curl -sS -o /dev/null -w '%{http_code}' -X GET $BACKEND_URL/api/quality/sample-unlabelled" \
    "405"
  # Invariant 5: prewarmer scheduled (fix fec3568); only checked if INGEST_PREWARM=1.
  # Poll up to 30s because the task logs 'starting' a few seconds after the
  # FastAPI startup phase finishes (postgres + lib autoload first).
  if grep -q '^INGEST_PREWARM=1' .env 2>/dev/null; then
    prewarm_ok=0
    for _ in 1 2 3 4 5 6; do
      # Plain grep (not -q) reads the whole stream. With -q, grep exits on the
      # first match, `docker logs` can be killed by SIGPIPE, and pipefail then
      # reports the pipeline as failed even though the line was found.
      if docker logs "$BACKEND_CONTAINER" 2>&1 | grep 'prewarmer:.*starting' >/dev/null; then
        prewarm_ok=1; break
      fi
      sleep 5
    done
    if [[ "$prewarm_ok" == 1 ]]; then
      green " PASS prewarmer started"
    else
      red " FAIL prewarmer did not log 'starting' within 30s (INGEST_PREWARM=1 but task missing)"
      FAILS=$((FAILS + 1))
    fi
  else
    yellow " SKIP prewarmer (INGEST_PREWARM not enabled in .env)"
  fi
  if [[ "$FAILS" -gt 0 ]]; then
    red "== $FAILS invariant(s) regressed after sync =="
    echo "Recover the pre-sync state with: git reset --hard $SAFETY_TAG"
    exit 3
  fi
  green " all invariants pass"
fi
# --- 6. push -----------------------------------------------------------
if [[ "$DO_PUSH" == 1 && "$NEW_SYNC" == 1 ]]; then
  bold "== 6. push to $ORIGIN_REMOTE/$CUR_BRANCH =="
  git push "$ORIGIN_REMOTE" "$CUR_BRANCH" --force-with-lease
  green " pushed"
fi
bold "== done =="
echo " branch : $CUR_BRANCH"
echo " HEAD : $(git rev-parse --short HEAD)"
echo " safety snapshot: $SAFETY_TAG (delete with: git tag -d $SAFETY_TAG)"
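
The invariant step above compares a command's output with an expected string and counts mismatches. The sketch below isolates that pattern so it can be tried without Docker or a running backend. It is a variation, not the script's own helper: it takes the expected value before the command and runs the command as an argument list instead of through `eval`, and the `fake_*` functions are hypothetical stand-ins for the `curl` probes.

```shell
#!/usr/bin/env bash
set -euo pipefail

FAILS=0

# check LABEL EXPECTED CMD [ARGS...]
# Runs CMD, captures its stdout, and compares it with EXPECTED.
# A failing CMD appends '<error>' to the captured output, so it never matches by accident.
check() {
  local label="$1" expect="$2"
  shift 2
  local got
  got="$("$@" 2>/dev/null || echo '<error>')"
  if [[ "$got" == "$expect" ]]; then
    printf 'PASS  %s (%s)\n' "$label" "$got"
  else
    printf 'FAIL  %s expected=%s got=%s\n' "$label" "$expect" "$got"
    FAILS=$((FAILS + 1))
  fi
}

# Hypothetical stand-ins for the HTTP probes (assumptions for illustration only).
fake_status_ok()      { echo 200; }   # route exists
fake_status_missing() { echo 404; }   # route missing, where 405 was expected
fake_probe_error()    { return 7; }   # probe itself failed (e.g. connection refused)

check "cache-stats exists"          200 fake_status_ok
check "sample-unlabelled registered" 405 fake_status_missing
check "probe failure is a FAIL"      200 fake_probe_error

echo "failures: $FAILS"
```

Passing the command as separate arguments avoids the nested quoting that the `eval`-based form needs, at the cost of not supporting pipelines directly; a probe that needs a pipeline can be wrapped in a small function, as the stand-ins are here.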