45 Commits

Author SHA1 Message Date
marc 99d63837b5 Add tools/sync-upstream.sh: safe upstream-sync workflow
Wraps the recurring 'fetch upstream, rebase, verify invariants, push'
workflow into a single command with safety nets:

- creates a tag snapshot before mutating the branch
- aborts on dirty tree
- rebase by default (--merge for merge-commit instead)
- after sync, rebuilds the backend container and verifies 5 fork-only
  invariants are still met (parser dropdown filtered, mitre_pct <= 100,
  cache endpoints present, /sample-unlabelled present, prewarmer task
  scheduled when opted in)
- exits non-zero with the recovery command if invariants regress
- optional --dry-run / --no-rebuild / --no-push for ad-hoc inspection
2026-05-22 20:50:28 +02:00
marc fec356829c Ingest Dashboard: optional background cache pre-warmer
Adds an asyncio background task that re-runs the heavy Ingest Dashboard
queries every ~4 min (just under the 5 min TTL) so the in-process cache
is always populated. First user hit on any dashboard widget then returns
from cache (single-digit ms) instead of waiting 30-60s for SDL.

Components:
  - backend/services/prewarmer.py: standalone module, opt-in via
    INGEST_PREWARM=1; configurable windows via INGEST_PREWARM_HOURS /
    INGEST_PREWARM_DAYS / INGEST_PREWARM_DAILY_VOLUME_DAYS and interval
    via INGEST_PREWARM_INTERVAL_SECONDS. Logs through the uvicorn logger
    so cycles are visible in 'docker logs'.
  - backend/main.py: spawn the task on FastAPI startup.
  - docker-compose.yml: forward INGEST_PREWARM* env vars to the
    backend service (default off).

Measured on Purple AI tenant (INGEMeasured on Purple AI tenant (INGEMeasured on Purple fMeasured on Purple AI tenant (INGEMeasured on Purple AI tenant (INGEMeasured on  (INGEST_PREWARM=0) so non-opt-in
users see no behaviour change.
2026-05-22 20:41:36 +02:00
marc 0a01a56218 Ingest Dashboard: 5min TTL cache + days->hours normalisation
Dashboard reloads on multi-day windows could take 30-60s and sometimes
returned HTTP 502 ('internal Scalyr error') when the SDL window was
expressed in days. Two-part fix:

1. In-process async TTL cache (services/async_cache.py)
   - 5 min TTL on top-sources, by-event-type, daily-volume.
   - Single-flight lock per cache key (no thundering herd).
   - Optional ?nocache=1 query param to force a refresh.
   - New endpoints: GET /api/ingest/cache-stats, DELETE /api/ingest/cache.

2. Normalise days -> hours upstream of the PowerQuery
   - SDL is unstable on day-scale windows for large group-by counts on
     this tenant but stable on the equivalent hour-scale window.
   - top-sources?days=1 used to 502; now works.

Measured on Purple AI tenant:
  top-sources?days=7  cold 55.7s -> warm 13ms (~4300x)
  t  t  t  t  t  t  t  t  t    -> 4ms (cold) / 1.4ms (warm)
2026-05-22 20:10:03 +02:00
marc f82115143c Health Score: cap MITRE Coverage at 100% by canonicalising tactics
STAR rules sometimes label tactics with non-canonical names (e.g. 'Stealth',
'Defense Impairment') which were counted as distinct tactics on top of the
14 canonical ATT&CK Enterprise ones, producing percentages > 100%
(observed 15/14 = 107.1% on Purple AI tenant).

Fix in get_health_score():
  - Restrict covered_tactics to the 14 canonical ATT&CK Enterprise tactics.
  - Map known STAR aliases ('Stealth', 'Defense Impairment') -> 'Defense Evasion'.
  - Derive TOTAL_TACTICS from the canonical set (single source of truth).

Result: tactics_covered = 14, mitre_pct = 100.0 (was 15 / 107.1).
2026-05-22 19:41:48 +02:00
marc 70f3f83db3 Parser Test Runner: filter non-parser SDL artefacts from dropdown
SDL /logParsers/ also returns UEBA analytics tables, saved searches and
dashboard configs. They're not valid Test Runner inputs and pollute the
dropdown. Filter list_parser_files in two tiers:
 1) Name denylist (ueba_*, searches, *_baselines_*, *_features_*,
    *_scores_*, bsi-*, *-overview, smoke/test tables).
 2) Content scan: file must contain attributes:/patterns:/formats:/
    patternRefs:/rewrites:/parser: in first 4 KB.

Result: 97 files -> 41 real parsers, 0 false pos/neg.
2026-05-22 19:36:58 +02:00
marc 7c1687efce Sync upstream features; preserve fork KV scanner, parsers, verifier
Brought in 35 upstream commits (MITRE heatmap, health score, dependency map,
PowerQuery playground, onboarding tracker, product grouping, modern UI redesign).

Preserved fork additions:
  backend/routers/quality.py  KV scanner, pattern refs, JS keys, JSON mode,
                              /parsers + /sync-from-sdl endpoints
  parsers/                    96 OCSF + tenant parsers
  tools/stormshield-verify/   end-to-end ingest regression test
  .gitignore                  un-ignored parsers/*
  CHANGES.md, PATCHES.md
2026-05-22 18:19:52 +02:00
Mick a7ebcac9a6 Revert "Add product grouping to rule displays across coverage and threat pages"
This reverts commit 7620d1fcc8.
2026-05-22 12:08:56 -04:00
Mick b494c751aa Revert "Preserve parser_detected across syncs to prevent coverage regression"
This reverts commit 21c8644443.
2026-05-22 12:08:56 -04:00
Mick 21c8644443 Preserve parser_detected across syncs to prevent coverage regression
Before re-creating ActiveSource rows, snapshot existing parser_detected
values. When writing new rows, take max(new, previous) so a source that
was once confirmed as parsed (event.type present in the data lake) never
loses its Covered status due to a sampling gap, partial query result, or
SDL PowerQuery timeout during Sync All.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-22 12:07:03 -04:00
Mick 7620d1fcc8 Add product grouping to rule displays across coverage and threat pages
- Extract product label from rule data_sources in coverage.py via new
  _product_from_data_sources() helper (prefers non-SentinelOne entries
  so product-specific rules get a meaningful label)
- Coverage Map detections column: rules now grouped by product with
  collapsible chevron headers showing fired/silent counts
- Threat Coverage Rule Firing Status: collapsible product group headers
  with active/silent summary; shows all 2066 rules across 30 products
- Threat Coverage Dependency Map: collapsible product groups, at-risk
  products sorted first with risk count in header
- Ingest Dashboard: fix source name truncation — table cells now wrap
  with break-all and title tooltip; bar chart labels extended to 16
  chars with ellipsis and full-name tooltip on hover

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-22 11:56:27 -04:00
Mick bb2c00f2fa Collapse MITRE tactic cards by default — click to expand
Each card shows tactic name, technique count, and rule badge in the header.
Clicking the header toggles the technique chips with an animated chevron.
The existing '+N more' expander still works within the expanded card.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-22 11:31:23 -04:00
Mick 1a2b289f32 Rename pipeline sections to Full Pipeline / Partial Pipeline 2026-05-22 11:27:05 -04:00
Mick 800d3c545a Split onboarding pipeline into detection-mapped vs parser-only groups
Sources without detection rules no longer show stages 5-6 as failures:
- Backend: has_detection_rules flag added per source; progress (pct) calculated
  over 4 core stages for sources with no rules; detection stages marked na:true
- Frontend: pipeline splits into two sections —
    'With Detection Coverage' (6-stage, full pipeline)
    'Parser Only' (4-stage, stages 5-6 shown as — N/A)
  Each section has its own Show/Hide completed toggle
- Collapsed by default; Show Pipeline toggle reveals both sections

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-22 11:26:26 -04:00
Mick 62e29d131d Collapse onboarding pipeline table by default
Shows summary stats (Fully Onboarded / In Progress / Not Started) immediately
on page load; table is hidden until user clicks 'Show Pipeline'. Keeps the
Onboarding page scannable without scrolling past a large table to reach the
prompt template.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-22 11:23:52 -04:00
Mick d0299e0f23 Add health score, coverage trends, dependency map, PowerQuery playground, onboarding tracker
Tenant Health Score:
- CoverageSnapshot table stores daily health metrics (parser %, MITRE %, firing %)
- _compute_health() weighted formula: 40% parser coverage + 35% MITRE + 25% firing
  (reweighted 55/45 when firing cache empty)
- GET /api/coverage/health returns score + delta vs previous snapshot
- GET /api/coverage/snapshots returns chronological history for sparklines
- POST /api/coverage/snapshot for manual recording
- Auto-snapshot recorded at end of every sync-sources call
- Overview dashboard: prominent health score card with color coding, component
  breakdown, delta indicator, and inline SVG sparkline (last 30 points)

Rule Dependency Map:
- GET /api/coverage/dependency-map flips the coverage map — rule → required sources
- Each source flagged healthy/inactive/no_parser; at_risk = any source missing
- New section on Threat Coverage tab with at-risk filter toggle

PowerQuery Playground:
- New query.py router: GET /presets (7 curated queries) + POST /run
- New Query nav tab with time-range pills, preset buttons, localStorage history,
  monospace textarea, auto-column results table, client-side CSV export

Onboarding Tracker:
- GET /api/coverage/onboarding-status returns per-source pipeline progress
  across 6 stages: Data Received → Parser File → Parser Active → Source
  Labeled → Detection Rules → Rules Firing
- New section on Onboarding tab with emoji stage dots, progress bars,
  collapsed completed sources with show/hide toggle

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-22 11:09:43 -04:00
Mick b4314c07df Update README to reflect current feature set
- Add Threat Coverage tab (MITRE heatmap + rule firing status)
- Document Sync All button, SDL Config API parser sync, SDL_CONFIG_READ_KEY
- Update Parser Coverage Map: unlabelled events banner, Attributes Missing filter,
  detections column with firing status badges
- Add Parser Quality sections: unlabelled event sampler, attributes missing audit,
  JSON/NDJSON parser test runner
- Add environment variables reference table (SDL_PQ_TIMEOUT, SDL_CONFIG_READ_KEY)
- Update architecture diagram to include SDL Config File API
- Simplify setup: Sync All replaces manual multi-step first run
- Update project layout to reflect RuleFiringCache model and current file structure
- Switch docker-compose commands to `docker compose` (v2 syntax)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-22 10:46:56 -04:00
Mick 7b4eceefb8 Fix MITRE extraction to use actual S1 API structure + use generatedAlerts for firing status
MITRE fix:
- S1 platform-rules API returns rule["mitre"] = [{tactic, techniques:[{id,title}]}]
  not the flat field names we were checking — updated _extract_mitre to handle
  this as the primary path, keeping flat field fallback for STAR rules
- generatedAlerts field on each platform rule stored in raw JSON during import

Firing status fix:
- sync-rule-firing now reads generatedAlerts from ParsedRule.raw as fast path
  (instant, no SDL PowerQuery needed) since it's returned directly by the
  platform-rules API on every library sync
- SDL PowerQuery retained as fallback for rules imported from detections.json

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-22 10:42:48 -04:00
Mick 7922de315e Add MITRE ATT&CK heatmap and detection rule firing status
MITRE ATT&CK heatmap:
- _extract_mitre() helper extracts tactics/techniques from S1 API rules
  handling multiple field name conventions (tactic, mitreTechniques, etc.)
- _import_from_api_rules and _import_detections now store tactics/techniques
  in raw JSON alongside data_sources
- GET /api/coverage/mitre returns tactic/technique breakdown ordered by
  ATT&CK kill chain with coverage stats
- New "Threat Coverage" tab in frontend: stat cards (total rules, MITRE
  mapped, tactics covered, techniques covered), tactic cards grid with
  left-border color coding and technique chips with "+N more" expander

Detection rule firing status:
- RuleFiringCache table tracks alert_count per rule_name
- POST /api/coverage/sync-rule-firing queries SDL PowerQuery with 3
  field-name patterns to find rule firing data; upserts into cache
- GET /api/coverage/rule-firing-cache returns cache sorted by alert count
- /map now includes alert_count per rule and firing_cache_populated flag
- Coverage map Detections column: when cache populated, shows alert count
  in green or ⚠ amber for rules that have never fired

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-22 10:25:45 -04:00
Mick 2c40bf81ee Cherry-pick improvements from PR #2 (marcredhat)
- s1_client: configurable PowerQuery timeout via SDL_PQ_TIMEOUT env var
  (default 600s, was hardcoded 120s) with separate connect/read timeouts
  via httpx.Timeout; retry on ReadTimeout via SDL_PQ_TIMEOUT_RETRIES;
  better error messages include query snippet and parse non-JSON responses
- ingest: fix simulate-filter SDL syntax (== → =, drop leading | on base
  expression, surface PowerQuery error field, cleaner empty-filter fallback)
- docker-compose: pass SDL_PQ_TIMEOUT and SDL_PQ_TIMEOUT_RETRIES through
  to backend container with sensible defaults

Not taken from PR #2:
- .gitignore parsers/* change — would untrack the 7 committed parser files
- s1_client/quality/coverage changes already present in main from prior work

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-22 10:11:42 -04:00
Mick c5a4f796a0 Add unlabelled event detection, stub parser quality, Sync All, and modern UI redesign
Key changes:
- Unlabelled event banner: shows count only after Sample Events is clicked; uses broad SDL filter expression; time window synced to sync-days dropdown
- Parser Quality: new "Attributes Missing" subsection listing all parsers without dataSource.name regardless of event volume
- Coverage map: filter buttons (All / Complete Parser / Attributes Missing); stat card renamed to "Incomplete Parser"; stub count excluded from sync when no active sources
- Sync All button: runs SDL parser sync → library sync → live sources sync in sequence
- Reset now clears ActiveSource table and resets unlabelled count cache
- run_powerquery: configurable max_count param (default 1000, 50M for count queries)
- _DS_NAME_RE: supports both quoted and unquoted dataSource.name keys in parser files
- Full modern UI redesign: slate palette, gradient cards, ring borders, pill nav, colored stat accents
- Updated 7 tracked parser files synced from SDL

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-22 10:00:21 -04:00
Mick 0013adbe7e Merge pull request #1 from marcredhat/fix/json-parser-and-pq-syntax
Fix Parser Test Runner JSON mode, Filter Simulator PQ syntax, and parser dropdown
2026-05-20 15:25:39 -04:00
Mick 6cd9da82da Auto-load detection library from S1 API, improve coverage map accuracy
- Fetch detection library rules from platform-rules API at startup (falls
  back to extracted.json); adds Sync Detection Library button for refresh
- Parser column simplified to ✓ Parsed / ✗ Not Parsed
- Detection counts now use library rules only (exclude custom STAR rules)
- Add close-match suggestions for dataSource.name mismatches (e.g. CloudTrail
  → AWS CloudTrail, Microsoft 365 Collaboration → Microsoft O365)
- Exclude SentinelOne Ranger AD from coverage map (native S1 source)
- Add success feedback banners to Load SDL Parsers and Sync Library buttons
- Remove rule_counts.json manual override; extracted.json is source of truth
- Remove Load Detections button; rules auto-import on backend startup
- Add get_account_id() and get_platform_rules() to s1_client

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 15:14:10 -04:00
marc d8d62478c0 Add helper scripts: SDL parser sync, PQ probes, test-parser smoke tests 2026-05-20 19:41:00 +02:00
marc 8dbd38f3bb Fix Parser Test Runner JSON mode, Filter Simulator PQ syntax, dropdown source
- backend/routers/quality.py
 * Add GET /api/quality/parsers (lists actual files in /app/parsers)
 * Support SDL JSON auto-extract parsers ($=json{parse=json}$)
 * Apply parser rewrite blocks with correct $0/$N backref translation
 * Accept single JSON / JSON array / NDJSON in test-parser body
 * Flatten JSON inside 'message' for Field Population coverage
- backend/routers/ingest.py
 * Rewrite simulate-filter PowerQuery to valid SDL syntax
 * Correct field name: src.name -> dataSource.name
- frontend/index.html
 * Parser dropdown loads from /api/quality/parsers
 * Add 'Last 7d' lookback option
 * Render JSON-mode test results with badges + payload counter
2026-05-20 19:40:24 +02:00
Mick 6e137438b1 Add Detection Fields Missing column + STAR_LIBRARY_ONLY setting
Coverage Map:
- New "Detection Fields Missing" column shows dotted-path SDL fields that
  associated STAR rules reference but the parser does not provide
- Only dotted field paths (src.ip, winEventLog.channel) are considered;
  single-word correlation variables and metadata tokens are excluded
- Schema fields always present in events (dataSource.name, event.type etc)
  are excluded from the missing list

Settings:
- New STAR_LIBRARY_ONLY field (select: true/false) controls whether
  Load Library STAR Rules filters to @sentinelone.com creators or loads all
- Rendered as a dropdown in the Settings form with a hint description
- saveSettings now always persists select field values (not just non-empty)
- load-star-rules reads STAR_LIBRARY_ONLY env var as its default

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 15:46:05 -04:00
Mick a50fd35934 Filter STAR rules to Library only (creator @sentinelone.com)
load-star-rules now defaults to library_only=true, filtering rules where
the creator email ends in @sentinelone.com. Custom tenant rules are excluded
by default. Pass ?library_only=false to load all rules.
Button label updated to "Load Library STAR Rules" to make intent clear.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 15:42:09 -04:00
Mick 4d6125eb4d Add Default Parser Only and No Parser filters to Coverage Map
Filters are now: All | Custom Parser | Default Parser Only | No Parser

- Custom Parser: covered sources with a loaded SDL parser file
- Default Parser Only: covered via event.type detection in data lake
  but no custom parser file — built-in or cloud-managed parser running
- No Parser: parser_needed sources (no parser found at all)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 15:35:30 -04:00
Mick 1a68fbea2d Rewrite README in the Queen's English, inspired by Pineapple Boy
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 13:28:15 -04:00
Mick 3f80e4c344 Add README with full feature documentation
Covers setup, architecture, all five pages (Coverage Map, Ingest Dashboard,
Parser Quality, Onboarding, Settings), expected results for each tool,
rebuild commands, and project layout.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 13:25:28 -04:00
Mick 74c3a8d6a3 Auto-discover fields from log sample when source selected in Field Population Rate
Selecting a source triggers a 20-event sample; actual field names from the
log are merged with SDL schema defaults (log fields first) and pre-filled
into the fields input. Falls back to SDL defaults if no events found.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 13:23:36 -04:00
Mick 1aca7154c2 Default Live Event Sampler to 10 events
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 13:21:51 -04:00
Mick 799e413041 Add per-row copy button to Live Event Sampler message column
Message column is pinned last, shows 80 chars with tooltip for the full
value, and has a ⎘ copy button that flashes ✓ on success. Other field
cells are unchanged.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 13:18:44 -04:00
Mick 5421b2de61 Populate source dropdowns in Parser Quality from synced active sources
Live Event Sampler and Field Population Rate now load sources from the
coverage map on page render instead of free-text inputs. Sources are sorted
by event count (busiest first) and show event totals. Falls back to a hint
message if no sources have been synced yet.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 13:16:50 -04:00
Mick 1b07a59991 Use parsed event detection in data lake as coverage signal
- sync-sources now runs a parallel PowerQuery checking for event.type
  population per source; count stored in new active_sources.parser_detected
- Coverage map marks a source as covered if parser_detected > 0, even
  without a matching local parser file (handles built-in/cloud parsers)
- UI parser cell shows "Parsed (N typed events detected)" for data-lake-
  detected parsers vs named local parser files
- Runtime ALTER TABLE migration adds parser_detected column to existing DBs

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 13:06:29 -04:00
Mick 81e3656c46 Fix coverage map matching: three-tier lookup for parser-to-source mapping
1. Exact dataSource.name match
2. Normalized substring on parser's dataSource.name attribute
3. Normalized substring on parser filename (catches files with wrong ds name)

Fixes CloudTrail (filename aws_cloudtrail-latest matches "cloudtrail") and
Palo Alto Networks Firewall (ds name "Palo Alto Networks" matches via substring).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 12:56:51 -04:00
Mick 999c0f7b83 Add Parser Quality page: Live Event Sampler, Field Population Rate, Parser Test Runner
- New /api/quality router with three endpoints:
  sample-events: pull raw events from a source via PowerQuery
  field-population: measure % of events with each SDL field populated;
    surfaces dataSource.name correctly (100% when filtered by it) and
    returns fields_seen_in_sample so you can see what IS being extracted
  test-parser: converts SDL \$field=pattern\$ format strings to Python
    named-group regex and tests against a pasted raw log line
- New "Parser Quality" nav item and page with all three tools
- Home page card added for Parser Quality
- Field population UI shows per-field colour-coded progress bars plus
  a chip list of fields actually present in the sample

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 12:53:48 -04:00
Mick 058b1e7cf1 Default Ingest Dashboard to 1h view on load
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 12:46:30 -04:00
Mick a5d0be0a7c Show events-by-source bar chart in 1h mode instead of blank message
When the 1h time filter is active the volume chart now renders the top-sources
data as a by-source bar chart (up to 12 sources) with the gradient fill and a
"Events by Source (Last 1h)" heading. Chart labels are auto-detected as dates
or source names so truncation is applied correctly for both modes.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 12:45:55 -04:00
Mick ac97196435 Improve coverage map matching, bar chart gradients, and add 1h time filter
- Coverage map: replace filename fuzzy-match with exact dataSource.name
  lookup read directly from parser file attributes; grok/dottedJson parsers
  now flagged as "parser_needed" with format type shown in the UI
- Bar chart: SVG linearGradient (light purple → deep violet) replaces flat fill
- Ingest dashboard: add 1h button (first option) backed by new backend
  hours= query param on /api/ingest/top-sources; daily-volume chart shows
  informational message when in 1h mode

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 12:43:10 -04:00
Mick f0bd56aee8 Rewrite coverage map as source-centric view
Previously showed field-level coverage (rule fields vs parser fields).
Now shows per-dataSource.name coverage: is a parser loaded for each
active ingest source?

- New ActiveSource DB model stores live sources from SDL
- New POST /api/coverage/sync-sources endpoint runs PowerQuery to fetch
  current dataSource.names and their event counts, stores in DB
- GET /api/coverage/map now returns per-source status:
    covered       = a loaded parser matches this source name
    parser_needed = source is ingesting but no parser is loaded
- Parser matching uses fuzzy substring (handles "palo"→"Palo Alto Networks Firewall")
- Coverage table shows: source name, 7d event count, status, matched parser + field count, STAR rules
- Frontend: new "Sync Live Sources" button, updated stats cards, updated filter tabs
- Removed field-level view (was confusing — parser_needed on a field ≠ missing parser for a source)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 12:31:48 -04:00
Mick 2262892859 Improve daily volume bar chart readability
- Add event count label on top of each bar (e.g. 220 or 1.2k)
- Add Y-axis grid lines and tick labels so scale is readable
- Label shows MM/DD date format for compact display
- Chart heading now reads "events ingested per day" to clarify
  these are individual daily counts, not cumulative totals

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 11:59:12 -04:00
Mick 08c7a8a5b5 Add Filter Simulator help panel on Ingest Dashboard
Adds a collapsible "How does this work?" panel explaining:
- What the simulator does (live PowerQuery count → GB projection)
- When to use it (after spotting a noisy source in Top Sources)
- How to fill in Source name (copy from dataSource.name column)
- What Event type does (optional narrowing)
- How the GB estimate is calculated
- Warning that it is read-only — no filters are applied automatically

Also updates Source name placeholder to show a concrete example.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 11:56:52 -04:00
Mick 735e364b71 Fix Ingest Dashboard timeout causing failed to fetch
- daily-volume: run per-day PowerQueries in parallel with asyncio.gather
  instead of sequentially with sleeps — 3 days now completes in ~16s vs 140s+
- Default view changed from 7d to 3d; day buttons updated to [3, 5, 7]
- igLoad: fire daily-volume and top-sources simultaneously with Promise.allSettled
  so both panels load in parallel rather than one after the other
- Each panel shows "Querying data lake…" spinner while loading
- Each panel renders independently — one failure doesn't block the other

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 11:53:37 -04:00
Mick 2e55e21a77 Add Settings page with .env manager
- Sidebar: ⚙ Settings link pinned to bottom of nav
- Settings page: view all config keys (secrets masked), edit and save directly to .env
- Show/hide toggle for secret fields (tokens, keys)
- First-time setup banner with cp .env.example .env instructions when .env is missing
- Manual setup section with step-by-step terminal commands and where to find each credential
- New .env.example template with comments for all required variables
- Backend: GET/POST /api/settings/config router reads/writes mounted .env file
- docker-compose: mounts .env into backend container at /app/.env for write access

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 11:43:41 -04:00
Mick c182d837ee Initial commit: SIEM Toolkit for SentinelOne
Dockerized SecOps toolkit with:
- Coverage Map: STAR rule vs SDL parser field coverage analysis
- Ingest Dashboard: PowerQuery-powered event volume and source breakdown
- Onboarding Assistant: AI-guided log source onboarding with Claude
- Parser management via SDL MCP integration

Stack: FastAPI + PostgreSQL backend, nginx-served HTML frontend, Docker Compose.
PowerQuery runs via Scalyr XDR API (SDL_XDR_URL + SDL_LOG_READ_KEY).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 11:39:26 -04:00
18 changed files with 3921 additions and 674 deletions
-5
View File
@@ -22,8 +22,3 @@ SDL_LOG_READ_KEY=
# Anthropic (for Onboarding Accelerator AI features)
# ─ https://console.anthropic.com/settings/api-keys
ANTHROPIC_API_KEY=
# SDL Configuration Read key — used by /api/quality/sync-from-sdl to
# download parser files from /logParsers/ on the SDL tenant.
# Generate in S1 console: Settings -> Integrations -> Data Lake API Keys (Configuration Read scope).
SDL_CONFIG_READ_KEY=
+153 -125
View File
@@ -2,7 +2,7 @@
> *Inspired by Pineapple Boy!* 🍍
A self-hosted troubleshooting and visibility tool for SentinelOne AI-SIEM SecOps engineers. Runs as a Docker Compose stack against your SentinelOne demo or production tenant and provides real-time insight into parser coverage, ingest volume, and data quality — all without leaving a single interface.
A self-hosted troubleshooting and visibility tool for SentinelOne AI-SIEM SecOps engineers. Runs as a Docker Compose stack against your SentinelOne demo or production tenant and provides real-time insight into parser coverage, detection library mapping, ingest volume, and data quality — all without leaving a single interface.
---
@@ -10,10 +10,11 @@ A self-hosted troubleshooting and visibility tool for SentinelOne AI-SIEM SecOps
| Page | Purpose |
|---|---|
| **Overview** | Live health stats — coverage percentage, active sources, top uncovered sources by volume |
| **Parser Coverage Map** | Which active data sources have a parser? Which don't? |
| **Overview** | Live health stats — coverage %, active sources, top uncovered sources by volume |
| **Parser Coverage Map** | Which active data sources have a parser? Detection rule mapping per source. Unlabelled event detection. |
| **Ingest Dashboard** | Event volume, top sources, cost projection, filter simulator |
| **Parser Quality** | Live event sampler, field population rate, parser test runner |
| **Parser Quality** | Live event sampler, field population rate, parser test runner, attributes missing audit |
| **Threat Coverage** | MITRE ATT&CK heatmap across all detection library rules, rule firing status (active vs never-fired) |
| **Onboarding Accelerator** | Prompt template for onboarding new log sources with Claude Code |
| **Settings** | Manage your `.env` credentials directly from the interface |
@@ -27,13 +28,15 @@ browser → nginx (port 3001) → single-page HTML/JS application
FastAPI backend (port 8001)
┌───────────────────────────┐
│ PostgreSQL (SQLAlchemy) │ parser fields, active sources
│ PostgreSQL (SQLAlchemy) │ rules, parser fields, active sources,
│ │ firing cache, coverage snapshots
└───────────────────────────┘
┌───────────────────────────┐
│ SentinelOne APIs │
│ • Management API
│ • XDR PowerQuery │
│ • Management API v2.1 STAR rules, detection library, platform rules
│ • Scalyr XDR PowerQuery │ live event queries, source volumes
│ • SDL Config File API │ parser file sync (/logParsers/)
└───────────────────────────┘
```
@@ -46,48 +49,57 @@ All services run via Docker Compose. The `parsers/` directory is volume-mounted
### 1. Clone and Configure
```bash
git clone
cd SIEM-Toolkit-patched
git clone https://github.com/mickbrowns1/SIEM-Toolkit.git
cd SIEM-Toolkit
cp .env.example .env
```
Edit `.env` with your credentials:
```env
S1_BASE_URL= # Your console URL
S1_API_TOKEN=... # Service user API token (account scope or higher)
SDL_XDR_URL= # XDR endpoint
SDL_LOG_READ_KEY= # Data Lake read key
S1_BASE_URL=https://demo.sentinelone.net # Your console URL
S1_API_TOKEN=eyJ... # Service user API token (account or site scope)
SDL_XDR_URL=https://xdr.us1.sentinelone.net # Scalyr XDR endpoint
SDL_LOG_READ_KEY=1j2IU0S... # Data Lake read key (query events)
SDL_CONFIG_READ_KEY=... # Data Lake config key (sync parser files)
SDL_PQ_TIMEOUT=600 # PowerQuery timeout in seconds (default: 600)
SDL_PQ_TIMEOUT_RETRIES=1 # Retries on timeout (default: 1)
ANTHROPIC_API_KEY= # Optional — not currently used
# SDL Configuration Read key — used by /api/quality/sync-from-sdl to
# download parser files from /logParsers/ on the SDL tenant.
# Generate in S1 console: Settings -> Integrations -> Data Lake API Keys (Configuration Read scope).
SDL_CONFIG_READ_KEY=
```
**S1_API_TOKEN** — generate at *Settings → Users → Service Users* in the console.
Ideally, the service user API token must be at **account scope** or higher. Site-scoped tokens will have limited visibility into rules and may see reduced source counts.
**S1_API_TOKEN** — generate at *Settings → Users → Service Users*. Account scope gives broadest access; site scope works for most features with some limitations.
**SDL_LOG_READ_KEY**
**SDL_LOG_READ_KEY** — found at *Settings → Integrations → Data Lake API Keys → Log Read*.
**SDL_CONFIG_READ_KEY** — found at *Settings → Integrations → Data Lake API Keys → Configuration Read*. Required to sync parser files directly from SDL via the Coverage Map. Without it, you can still load parser files manually from the `parsers/` directory.
### 2. Add Parser Files
Place your SDL parser JSON files into the `parsers/` directory. The backend reads them directly at query time — no rebuild is necessary.
### 2. Start the Stack
```bash
cp ~/my-parsers/*.json parsers/
```
### 3. Start the Stack
```bash
docker-compose up -d --build
docker compose up -d --build
```
Open **http://localhost:3001** in your browser and you're off.
### 3. First Run — Sync Everything
Click **Sync All** on the Parser Coverage Map. This runs three steps in sequence:
1. **Sync SDL Parsers** — downloads all `/logParsers/` parser files from your SDL tenant into the `parsers/` volume (requires `SDL_CONFIG_READ_KEY`)
2. **Sync Detection Library** — imports all platform detection rules from the S1 API, including MITRE ATT&CK tactic/technique mappings and per-rule alert counts
3. **Sync Live Sources** — queries the data lake for every `dataSource.name` active in the last 7 days
### 4. Detection Library (alternative: local file)
If the live API import fails (e.g. token scope is too narrow), the toolkit falls back to a local `detections.json` generated from the [detection-validator](https://github.com/mickbrowns1/detection-validator) repository:
```bash
mkdir -p data
cp /path/to/detection-validator/data/detections/extracted.json data/detections.json
```
The `data/` directory is gitignored and never committed.
---
## Features
@@ -106,29 +118,44 @@ If any sources are uncovered, the **Top Sources Needing a Parser** table lists t
### Parser Coverage Map
Answers the question: *does each active data source have a parser running?*
Answers the question: *does each active data source have a parser running, and is it covered by detection rules?*
**How it works:**
#### Syncing
1. **Sync Live Sources**executes a PowerQuery against your data lake to retrieve every `dataSource.name` seen in the last 7 days, along with event counts.
2. **Load SDL Parsers**reads parser files from `parsers/`, extracts the `dataSource.name` attribute from each, and stores the field list in the database.
- **Sync All** — runs all three sync operations in sequence (SDL parsers → detection library → live sources) with one click
- **Sync SDL Parsers** — downloads parser files from `/logParsers/` on your SDL tenant via the Config File API
- **Sync Detection Library** — imports platform rules from the S1 API with MITRE mappings and alert counts
- **Sync Live Sources** — queries the data lake for active `dataSource.name` values and event counts
#### Matching Logic (three-tier)
**Matching logic (three-tier):**
1. Exact `dataSource.name` match between the active source and the parser attribute
2. Normalised substring match (ignores spaces, dashes, and case) between the active source name and the parser's `dataSource.name`
3. Normalised substring match against the parser filename — catches files where the `dataSource.name` attribute is incorrect or missing
2. Normalised substring match (ignores spaces, dashes, case) between active source name and parser `dataSource.name`
3. Normalised substring match against the parser filename
**Parser detection from data:** During sync, a parallel PowerQuery checks whether each source has events with `event.type` populated in the data lake. If so, a parser is confirmed as running — the source is marked **Covered** even without a local parser file. This handles built-in and cloud-managed parsers that are not present in your `parsers/` folder.
#### Parser Detection from Data
**Status values:**
- 🟢 **Covered** — custom parser confirmed (local file or detected via parsed events in the data lake)
- 🔴 **Parser Needed** — no parser found, or only a grok/dottedJson format (which typically indicates an incomplete parser)
During sync, a parallel PowerQuery checks whether each source has events with `event.type` populated in the data lake. If so, a parser is confirmed running — the source is marked **Covered** even without a local parser file. This handles built-in and cloud-managed parsers not present in `parsers/`.
**Filters:** Use the filter pills to focus on Custom Parser only, Default Parser Only (data lake detected), or No Parser.
#### Status Values
**Deep link:** Click any source name in the table to open it directly in Parser Quality with all dropdowns pre-populated.
- 🟢 **Covered** — parser confirmed (local file or detected via parsed fields in the data lake)
- 🟡 **Incomplete Parser** — parser file exists but is missing `dataSource.name` attribute
- 🔴 **Parser Needed** — no parser found, or only a grok/dottedJson format
**Expected results:** After syncing sources and loading parsers, sources with active SDL parsers will appear as Covered. Sources sending raw, unparsed data — where only `message` and `timestamp` appear in the data lake — will appear as Parser Needed.
#### Filter Pills
- **All** — show every source
- **Complete Parser** — sources with a working custom or detected parser
- **Attributes Missing** — sources whose parser file lacks `dataSource.name`
#### Detections Column
Each source row shows how many detection library rules target it, with close-match suggestions when the `dataSource.name` doesn't align exactly with the library's naming. Once the **Rule Firing Status** cache is populated (via Threat Coverage page), each rule badge also shows its alert count — rules that have never fired are highlighted in amber (⚠).
#### Unlabelled Events Banner
A banner at the bottom of the coverage map lets you sample events that arrived with no `dataSource.name` — these are events whose parser is missing the `dataSource.name` attribute. Click **Sample Events** to run the query; the time window matches the Sync Live Sources period.
---
@@ -136,79 +163,88 @@ Answers the question: *does each active data source have a parser running?*
Answers the question: *where is my event volume coming from, and what would happen if I filtered some of it?*
**Time range:** 1h (default), 3d, 5d, 7d
**Time range:** 1h, 3d, 5d, 7d
**Daily Event Volume** — bar chart of total events per day. In 1h mode, this switches to a by-source breakdown of the current hour's activity.
**Daily Event Volume** — bar chart of total events per day.
**Top Sources** a table of the 25 highest-volume `dataSource.name` values with event count and estimated GB (calculated at 0.5 GB per million events).
**Top Sources** — the 25 highest-volume `dataSource.name` values with event count and estimated GB (at 0.5 GB per million events).
**Filter Simulator** — enter a source name and an optional event type, then press Simulate. The backend runs a live PowerQuery counting matching events and projects:
- Matched events in the selected period
- Estimated GB that would be saved
- Projected monthly events and GB if the filter were applied permanently
This is entirely read-only — no filter is created or applied. Use the results to inform an exclusion rule you apply manually in the console.
**Expected results:** Top sources should reflect what you see in the SentinelOne console PowerQuery tool. The filter simulator provides a reasonable GB estimate assuming uniform event size across the source.
**Filter Simulator** — enter a source name and an optional event type, then press Simulate. The backend runs a live PowerQuery counting matching events and projects matched events, estimated GB saved, and projected monthly figures. Entirely read-only — no filter is created or applied.
---
### Parser Quality
Three tools for diagnosing parser extraction failures.
Four tools for diagnosing and auditing parser health.
#### Live Event Sampler
Pulls raw events from a selected source directly from the data lake and renders every field that came back. The `message` column is pinned to the right of the table, with a **⎘ copy** button on each row for convenient extraction of raw log lines.
Pulls raw events from a selected source directly from the data lake and renders every field that came back. Empty fields display as `∅` in grey — immediately highlighting fields the parser is failing to populate. The `message` column is pinned to the right with a **⎘ copy** button on each row.
- **Empty fields** are displayed as `∅` in grey — immediately highlighting fields the parser is failing to populate
- **Healthy source:** many fields populated (`src.ip`, `user.name`, `event.type`, etc.), with `message` present as the raw log backup
- **Unhealthy source:** only `timestamp` and `message` populated — the parser is not extracting anything of value
#### Unlabelled Event Sampler
Samples events that have *no* `dataSource.name` — events the SDL received but couldn't attribute to any parser. Uses the filter expression `!(dataSource.name = *) !(source = 'scalyr')` to eliminate internal SDL noise. Returns a sample plus a count of how many such events exist in the time window.
#### Field Population Rate
Samples up to 500 events from a source and measures what percentage of them have each field populated. Results are sorted worst-first so the most pressing gaps are immediately visible.
Samples up to 500 events from a source and measures what percentage have each field populated, sorted worst-first.
When you select a source, the tool automatically discovers which fields exist in that source's events and pre-fills the field list — merged with SDL schema defaults. The list is fully editable before running the analysis.
**Colour coding:**
- 🟢 ≥ 80% — healthy extraction
- 🟡 4079% — partial extraction; check your regex patterns
- 🔴 < 40% — field is rarely populated; the parser is likely not matching this log format variant
**Healthy parser:** Key fields such as `src.ip`, `event.type`, and `user.name` should sit between 70100%. Niche fields like `src.process.cmdline` or `tgt.file.path` will naturally be lower, as not every event type produces them.
**Broken parser:** All SDL fields at 0%, with only `timestamp` and `message` visible in the "fields seen in sample" chip list at the bottom of the results.
- 🟡 4079% — partial; check regex patterns
- 🔴 < 40% — rarely populated; parser likely not matching this log format variant
#### Parser Test Runner
Paste a raw log line, select a loaded parser, and press Test. The backend extracts SDL `$field=pattern$` format strings from the parser file, converts them to Python named-group regular expressions, and tries each against your log line.
Paste a raw log line, select a loaded parser, and press Test. Supports:
- **Regex parsers** — extracts SDL `$field=pattern$` format strings and matches against your log line
- **JSON parsers** — parses JSON input directly, flattens to dotted keys, and applies any `input/output/match/replace` rewrite rules
- **NDJSON** — multiple JSON objects separated by newlines
- **Matched:** displays the format string that matched and every field extracted with its value
- **No match:** none of the parser's format strings apply to this log line — the log may contain a format variant the parser does not yet cover
#### Attributes Missing
> **Note:** Only parsers using SDL custom format strings are supported by the test runner. Grok and dottedJson parsers are not currently testable here.
A sub-section listing all parser files in the `parsers/` directory that have a `formats:` section but no `dataSource.name` attribute. These parsers are loaded into SDL but won't attach a source label to events they process — surfaced here regardless of whether they have active traffic.
---
### Threat Coverage
Two views for understanding detection effectiveness across your estate.
#### MITRE ATT&CK Heatmap
Shows which MITRE ATT&CK tactics and techniques are covered by your detection library. Rules are imported from the S1 platform-rules API, which returns structured MITRE metadata per rule.
- **Tactic cards** — ordered by ATT&CK kill chain (Reconnaissance → Impact), colour-coded by rule count
- **Technique chips** — each technique ID and name within a tactic; expands to show all if > 12
- **Stats** — Total Library Rules, Rules with MITRE Mapping, Tactics Covered, Techniques Covered
Click **Sync Detection Library** to re-import rules and refresh MITRE data.
#### Rule Firing Status
Shows which detection rules have actually triggered alerts — and which have never fired.
Click **Sync Alert Firing Status**. The backend reads `generatedAlerts` directly from the platform-rules API data stored during the last Detection Library sync — no SDL PowerQuery needed. Results are cached in the database.
- **Active** (green) — rule has fired at least once in the monitored period
- **Silent** (amber) — rule has never fired; may be misconfigured or require a data source not yet active
The Coverage Map Detections column also reflects this data — fired rule counts appear inline on each source row.
---
### Onboarding Accelerator
A prompt template for using Claude Code to onboard a new log source. Copy the template, paste a sample of raw log lines, and Claude Code will generate:
- An SDL parser skeleton in augmented-JSON format
- Field mappings to the SDL common schema
- Parser test assertions
No Anthropic API key is required — this uses Claude Code directly from your terminal.
A prompt template for using Claude Code to onboard a new log source. Copy the template, paste sample raw log lines, and Claude Code will generate an SDL parser skeleton with field mappings and test assertions. No Anthropic API key required.
---
### Settings
Read and write your `.env` credentials from the interface. Secret fields (API tokens, keys) are masked by default with a show/hide toggle. Changes are written to the mounted `.env` file and take effect after restarting the backend:
Read and write your `.env` credentials from the interface. Secret fields are masked by default with a show/hide toggle. Changes are written to the mounted `.env` file and take effect after restarting the backend:
```bash
docker-compose up -d --build backend
docker compose up -d --build backend
```
---
@@ -217,15 +253,15 @@ docker-compose up -d --build backend
```bash
# Full rebuild
docker-compose up -d --build
docker compose up -d --build
# Backend only (after Python changes)
docker-compose up -d --build backend
docker compose build backend && docker compose up -d backend
# Frontend only (after HTML/JS changes)
docker-compose up -d --build frontend
docker compose build frontend && docker compose up -d frontend
# Reset the database
# Reset the database (clears all synced data)
curl -X DELETE http://localhost:8001/api/coverage/reset
```
@@ -236,21 +272,23 @@ curl -X DELETE http://localhost:8001/api/coverage/reset
```
.
├── backend/
│ ├── main.py # FastAPI application, router registration
│ ├── db.py # SQLAlchemy models
│ ├── main.py # FastAPI app, router registration, startup migrations
│ ├── db.py # SQLAlchemy models (ParsedRule, ActiveSource,
│ │ # ParserField, RuleFiringCache, IngestSnapshot)
│ ├── routers/
│ │ ├── coverage.py # Parser coverage map endpoints
│ │ ├── ingest.py # Ingest dashboard + filter simulator
│ │ ├── quality.py # Parser quality tools
│ │ ├── coverage.py # Coverage map, MITRE heatmap, firing status, SDL sync
│ │ ├── ingest.py # Ingest dashboard, filter simulator
│ │ ├── quality.py # Parser quality tools, unlabelled event sampler
│ │ └── settings.py # .env read/write
│ └── services/
│ ├── s1_client.py # SentinelOne + Scalyr API client
│ ├── s1_client.py # SentinelOne Management API + Scalyr PowerQuery client
│ └── rule_parser.py # SDL format string field extraction
├── frontend/
│ └── index.html # Single-page application (Tailwind, vanilla JS)
├── parsers/ # SDL parser files (volume-mounted)
├── parsers/ # SDL parser files (volume-mounted, gitignored)
├── data/ # detections.json fallback (gitignored)
├── db/
│ └── init.sql # Postgres initialisation (tables created by SQLAlchemy)
│ └── init.sql # Postgres initialisation
├── docker-compose.yml
├── .env.example
└── README.md
@@ -258,35 +296,25 @@ curl -X DELETE http://localhost:8001/api/coverage/reset
---
```
Nothing pushes parsers to the SDL tenant
The data flow is strictly one-way: SDL tenant → local disk.
## Environment Variables Reference
What actually happens
┌──────────────────┐ GET /api/listFiles/logParsers/ ┌──────────────────┐
│ SDL tenant │ ───────────────────────────────────▶ │ tools/sync_sdl_ │
│ │ GET /api/getFile/logParsers/... │ parsers.py │
└──────────────────┘ └────────┬─────────┘
│ writes
./parsers/<name>
│ bind-mount
/app/parsers (in container)
│ read-only
┌──────────────────────────────────┐
│ POST /api/quality/test-parser │
│ POST /api/quality/sync-from-sdl │
│ GET /api/quality/parsers │
└──────────────────────────────────┘
| Variable | Required | Description |
|---|---|---|
| `S1_BASE_URL` | ✅ | SentinelOne console URL (e.g. `https://demo.sentinelone.net`) |
| `S1_API_TOKEN` | ✅ | Service user API token — account scope recommended |
| `SDL_XDR_URL` | ✅ | Scalyr XDR endpoint (e.g. `https://xdr.us1.sentinelone.net`) |
| `SDL_LOG_READ_KEY` | ✅ | Data Lake log read key — for PowerQuery event queries |
| `SDL_CONFIG_READ_KEY` | ⚪ | Data Lake config read key — for SDL parser file sync |
| `SDL_PQ_TIMEOUT` | ⚪ | PowerQuery read timeout in seconds (default: `600`) |
| `SDL_PQ_TIMEOUT_RETRIES` | ⚪ | Extra retries on timeout (default: `1`) |
| `ANTHROPIC_API_KEY` | ⚪ | Not currently used |
Endpoint / What it really does
Sync from SDL (POST /api/quality/sync-from-sdl) Downloads parsers from the tenant into /app/parsers/
Load SDL Parsers (UI button) Just re-indexes whatever files already exist in /app/parsers/
Test Parser (POST /api/quality/test-parser) Runs the parser logic locally in Python; tenant never touched
tools/sync_sdl_parsers.py (helper) Downloads parsers; never uploads
```
---
## Notes
- Parser files in `parsers/` are read at query time — add or update files without rebuilding.
- The filter simulator is entirely read-only and makes no changes to your tenant.
- `SDL_CONFIG_READ_KEY` requires the *Manage config files* permission in the console. Without it, Sync SDL Parsers is skipped but all other features remain available.
- Site-scoped tokens work for most features. Account-scoped tokens are needed for the detection library API and provide broader source visibility.
- The `parsers/` directory is gitignored except for specific tracked parser files. SDL dashboard and saved-search files downloaded during sync are intentionally not committed.
+28 -1
View File
@@ -1,5 +1,5 @@
import os
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, Text
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, Text, Boolean
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import declarative_base, sessionmaker
from datetime import datetime
@@ -37,6 +37,7 @@ class ActiveSource(Base):
event_count = Column(Integer, default=0)
synced_at = Column(DateTime, default=datetime.utcnow)
parser_detected = Column(Integer, default=0) # >0 means parsed events seen in data lake
unlabelled = Column(Boolean, default=False) # True = events had no dataSource.name
class IngestSnapshot(Base):
@@ -47,6 +48,32 @@ class IngestSnapshot(Base):
recorded_at = Column(DateTime, default=datetime.utcnow)
class RuleFiringCache(Base):
__tablename__ = "rule_firing_cache"
id = Column(Integer, primary_key=True)
rule_name = Column(String, unique=True, index=True)
alert_count = Column(Integer, default=0)
period_days = Column(Integer, default=30)
checked_at = Column(DateTime, default=datetime.utcnow)
class CoverageSnapshot(Base):
__tablename__ = "coverage_snapshots"
id = Column(Integer, primary_key=True)
recorded_at = Column(DateTime, default=datetime.utcnow, index=True)
health_score = Column(Float, default=0.0)
parser_pct = Column(Float, default=0.0) # % sources with working parser
mitre_pct = Column(Float, default=0.0) # % ATT&CK tactics covered
firing_pct = Column(Float, default=0.0) # % rules that have fired
active_sources = Column(Integer, default=0)
covered_sources = Column(Integer, default=0)
rules_loaded = Column(Integer, default=0)
tactics_covered = Column(Integer, default=0)
techniques_covered = Column(Integer, default=0)
rules_with_mitre = Column(Integer, default=0)
rules_fired = Column(Integer, default=0)
def get_db():
db = SessionLocal()
try:
+40 -2
View File
@@ -1,7 +1,7 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from db import engine, Base, get_db, ParsedRule
from routers import coverage, ingest, settings, quality
from db import engine, Base, get_db, ParsedRule, RuleFiringCache, CoverageSnapshot
from routers import coverage, ingest, settings, quality, query
Base.metadata.create_all(bind=engine)
@@ -11,11 +11,48 @@ with engine.connect() as _conn:
_conn.execute(text(
"ALTER TABLE active_sources ADD COLUMN IF NOT EXISTS parser_detected INTEGER DEFAULT 0"
))
_conn.execute(text(
"ALTER TABLE active_sources ADD COLUMN IF NOT EXISTS unlabelled BOOLEAN DEFAULT FALSE"
))
_conn.execute(text(
"CREATE TABLE IF NOT EXISTS rule_firing_cache ("
"id SERIAL PRIMARY KEY, "
"rule_name VARCHAR UNIQUE, "
"alert_count INTEGER DEFAULT 0, "
"period_days INTEGER DEFAULT 30, "
"checked_at TIMESTAMP"
")"
))
_conn.execute(text(
"CREATE TABLE IF NOT EXISTS coverage_snapshots ("
"id SERIAL PRIMARY KEY, "
"recorded_at TIMESTAMP, "
"health_score FLOAT DEFAULT 0, "
"parser_pct FLOAT DEFAULT 0, "
"mitre_pct FLOAT DEFAULT 0, "
"firing_pct FLOAT DEFAULT 0, "
"active_sources INTEGER DEFAULT 0, "
"covered_sources INTEGER DEFAULT 0, "
"rules_loaded INTEGER DEFAULT 0, "
"tactics_covered INTEGER DEFAULT 0, "
"techniques_covered INTEGER DEFAULT 0, "
"rules_with_mitre INTEGER DEFAULT 0, "
"rules_fired INTEGER DEFAULT 0"
")"
))
_conn.commit()
app = FastAPI(title="SIEM Toolkit", version="1.0.0")
@app.on_event("startup")
async def start_ingest_prewarmer():
"""Start optional background pre-warmer for the Ingest Dashboard cache.
Opt-in via INGEST_PREWARM=1. See backend/services/prewarmer.py."""
from services import prewarmer
prewarmer.start_if_enabled()
@app.on_event("startup")
async def auto_load_detections():
"""
@@ -61,6 +98,7 @@ app.include_router(coverage.router, prefix="/api/coverage", tags=["Coverage"])
app.include_router(ingest.router, prefix="/api/ingest", tags=["Ingest"])
app.include_router(settings.router, prefix="/api/settings", tags=["Settings"])
app.include_router(quality.router, prefix="/api/quality", tags=["Quality"])
app.include_router(query.router, prefix="/api/query", tags=["Query"])
@app.get("/health")
+787 -20
View File
@@ -4,7 +4,7 @@ from fastapi import APIRouter, UploadFile, File, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy.orm import Session
from datetime import datetime
from db import get_db, ParsedRule, ParserField, ActiveSource
from db import get_db, ParsedRule, ParserField, ActiveSource, RuleFiringCache, CoverageSnapshot
from services import s1_client, rule_parser
DETECTIONS_FILE = os.environ.get("DETECTIONS_FILE", "/app/data/detections.json")
@@ -12,6 +12,75 @@ DETECTIONS_FILE = os.environ.get("DETECTIONS_FILE", "/app/data/detections.json")
router = APIRouter()
def _extract_mitre(rule: dict) -> tuple[list[str], list[dict]]:
"""Extract (tactics, techniques) from a raw S1 rule dict.
Primary format (platform-rules API):
rule["mitre"] = [
{"tactic": "Execution", "techniques": [{"id": "T1204", "title": "User Execution"}]},
...
]
Falls back to flat field names used by older API versions / STAR rules.
"""
tactics: list[str] = []
techniques: list[dict] = []
# ── Primary: structured mitre array (platform-rules API) ──────────────────
mitre_list = rule.get("mitre")
if isinstance(mitre_list, list):
for item in mitre_list:
if not isinstance(item, dict):
continue
tac = item.get("tactic")
if isinstance(tac, str) and tac.strip():
tactics.append(tac.strip())
for tech in item.get("techniques", []):
if isinstance(tech, dict):
tid = str(tech.get("id", "") or "").strip()
tname = str(tech.get("title") or tech.get("name") or tid).strip()
if tid or tname:
techniques.append({"id": tid, "name": tname})
# ── Fallback: flat field names (STAR rules / older API versions) ──────────
if not tactics:
for key in ("tactic", "tactics", "mitreTactic", "mitreTactics"):
val = rule.get(key)
if isinstance(val, str) and val:
tactics.extend(v.strip() for v in val.split(",") if v.strip())
elif isinstance(val, list):
for v in val:
if isinstance(v, str) and v:
tactics.append(v.strip())
elif isinstance(v, dict):
n = v.get("name") or v.get("tactic") or ""
if n:
tactics.append(n.strip())
if not techniques:
for key in ("technique", "techniques", "mitreTechnique", "mitreTechniques", "mitreAttack"):
val = rule.get(key)
if isinstance(val, list):
for v in val:
if isinstance(v, str) and v.strip():
techniques.append({"id": v.strip(), "name": v.strip()})
elif isinstance(v, dict):
tid = str(v.get("id") or v.get("techniqueId") or "").strip()
tname = str(v.get("name") or v.get("title") or v.get("technique") or tid).strip()
if tid or tname:
techniques.append({"id": tid, "name": tname})
# Deduplicate
seen_ids: set = set()
unique_techniques = []
for t in techniques:
key_t = t["id"] or t["name"]
if key_t not in seen_ids:
seen_ids.add(key_t)
unique_techniques.append(t)
return list(dict.fromkeys(tactics)), unique_techniques
def _star_query_texts(rule: dict) -> list[str]:
"""
Extract all PowerQuery/filter strings from a STAR rule.
@@ -94,12 +163,20 @@ def _import_from_api_rules(db, rules: list) -> int:
seen_ids.add(rule_id)
sources = rule.get("sources") or []
tactics, techniques = _extract_mitre(rule)
# generatedAlerts is returned directly by the platform-rules API
generated_alerts = rule.get("generatedAlerts")
db.add(ParsedRule(
rule_id=rule_id,
name=rule.get("name", "unnamed"),
rule_type="library",
fields_used=[], # API rules don't expose field-level info
raw=json.dumps({"data_sources": sources}),
raw=json.dumps({
"data_sources": sources,
"tactics": tactics,
"techniques": techniques,
"generated_alerts": generated_alerts,
}),
))
loaded += 1
if loaded % 500 == 0:
@@ -142,12 +219,17 @@ def _import_detections(db, detections_file: str) -> int:
continue
seen_ids.add(rule_id)
tactics, techniques = _extract_mitre(rule)
db.add(ParsedRule(
rule_id=rule_id,
name=rule.get("name", "unnamed"),
rule_type="library",
fields_used=list(all_fields),
raw=json.dumps({"data_sources": list(set(data_sources))}),
raw=json.dumps({
"data_sources": list(set(data_sources)),
"tactics": tactics,
"techniques": techniques,
}),
))
loaded += 1
if loaded % 500 == 0:
@@ -207,16 +289,109 @@ async def upload_sigma(files: list[UploadFile] = File(...), db: Session = Depend
return {"loaded": len(loaded), "rules": loaded}
def _fetch_parsers_from_console(parsers_dir: str) -> dict:
"""
Fetch every parser under /logParsers/ from the SDL console and write them
to parsers_dir. Uses SDL_CONFIG_READ_KEY (needs 'Manage config files' permission)
and SDL_XDR_URL from the environment.
Returns {"fetched": N, "failed": [...], "skipped": reason_or_None}
"""
import urllib.request, urllib.error, json as _json, os as _os
# Read live from .env file so Settings-page saves are picked up without restart
def _env_val(key: str) -> str:
val = _os.environ.get(key, "")
if not val:
env_path = _os.environ.get("ENV_FILE_PATH", "/app/.env")
try:
for line in open(env_path).read().splitlines():
line = line.strip()
if line and not line.startswith("#") and "=" in line:
k, _, v = line.partition("=")
if k.strip() == key:
val = v.strip()
break
except Exception:
pass
return val
config_key = _env_val("SDL_CONFIG_READ_KEY")
base_url = _env_val("SDL_XDR_URL").rstrip("/")
if not config_key:
return {"fetched": 0, "failed": [], "skipped": "SDL_CONFIG_READ_KEY not set"}
if not base_url:
return {"fetched": 0, "failed": [], "skipped": "SDL_XDR_URL not set"}
def _post(path: str, params: dict) -> dict:
url = f"{base_url}{path}"
body = _json.dumps({**params, "token": config_key}).encode()
req = urllib.request.Request(url, data=body, headers={
"Authorization": f"Bearer {config_key}",
"Content-Type": "application/json",
})
try:
with urllib.request.urlopen(req, timeout=30) as r:
return _json.loads(r.read())
except urllib.error.HTTPError as e:
err_body = e.read().decode(errors="replace")[:300]
raise RuntimeError(f"HTTP {e.code} {path}: {err_body}")
# List all parser paths
res = _post("/api/listFiles", {"pathPrefix": "/logParsers/"})
# Support multiple response shapes: {"paths": [...]} or {"files": [...]}
raw_paths = res.get("paths") or res.get("files") or []
# Each element may be a plain string or a dict with a "path"/"name" key
paths = []
for p in raw_paths:
if isinstance(p, dict):
p = p.get("path") or p.get("name") or ""
if isinstance(p, str) and p.startswith("/logParsers/"):
paths.append(p)
_os.makedirs(parsers_dir, exist_ok=True)
fetched, failed = 0, []
for p in paths:
name = p.rsplit("/", 1)[-1] or "_unnamed"
try:
r = _post("/api/getFile", {"path": p})
content = r.get("content")
if content is None:
failed.append({"path": p, "error": "no content", "raw": r})
continue
with open(_os.path.join(parsers_dir, name), "w", encoding="utf-8") as fh:
fh.write(content)
fetched += 1
except Exception as e:
failed.append({"path": p, "error": str(e)})
# Surface the raw API response so callers can see exactly what was returned.
# Truncate paths list so the response stays readable (first 200).
debug_info = {
"response_keys": list(res.keys()),
"paths_found": len(paths),
"paths_listed": paths[:200],
}
return {"fetched": fetched, "failed": failed, "skipped": None, "debug": debug_info}
@router.post("/load-parsers-from-sdl")
async def load_parsers_from_sdl(db: Session = Depends(get_db)):
"""
Load SDL parsers from the local /app/parsers directory (mounted from ./parsers/).
Files are placed there by the MCP-based loader or by manual copy.
Falls back to a clear error if the directory is empty.
Sync SDL parsers from the console (if SDL_CONFIG_READ_KEY is set) then index
every file in the local /app/parsers directory into the DB.
"""
import os
parsers_dir = "/app/parsers"
# ── Step 1: fetch from console (best-effort) ────────────────────────────
fetch_result = _fetch_parsers_from_console(parsers_dir)
# ── Step 2: load whatever is on disk into the DB ─────────────────────────
try:
entries = [
e for e in os.scandir(parsers_dir)
@@ -225,12 +400,19 @@ async def load_parsers_from_sdl(db: Session = Depends(get_db)):
except FileNotFoundError:
raise HTTPException(503, "parsers/ directory not found — check Docker volume mount")
if not entries and fetch_result["skipped"]:
raise HTTPException(
422,
f"No parser files found in parsers/ directory and console sync was skipped "
f"({fetch_result['skipped']}). "
"Add SDL_CONFIG_READ_KEY in Settings (needs 'Manage config files' permission) "
"or upload a parser file manually."
)
if not entries:
raise HTTPException(
422,
"No parser files found in parsers/ directory. "
"Use 'Load SDL Parsers via MCP' in Claude Code to populate it, "
"or upload a parser file manually."
"No parser files found in parsers/ directory after console sync. "
"Check SDL_CONFIG_READ_KEY permissions ('Manage config files' required)."
)
loaded = []
@@ -258,7 +440,12 @@ async def load_parsers_from_sdl(db: Session = Depends(get_db)):
errors.append({"parser": entry.name, "error": str(e)})
db.commit()
return {"loaded": len(loaded), "parsers": loaded, "errors": errors}
return {
"loaded": len(loaded),
"parsers": loaded,
"errors": errors,
"console_fetch": fetch_result,
}
@router.post("/upload-parser")
@@ -329,6 +516,9 @@ _S1_NATIVE_SOURCES = {
"SentinelOne Ranger AD",
}
# Cached count of events with no dataSource.name — updated on each sync
_unlabelled_event_count: int = -1 # -1 = not yet queried
@router.post("/sync-sources")
async def sync_sources(days: int = 7, db: Session = Depends(get_db)):
@@ -378,28 +568,55 @@ async def sync_sources(days: int = 7, db: Session = Depends(get_db)):
parser_detected=parsed_by_source.get(name, 0),
))
seen += 1
db.commit()
return {"synced": seen, "sources": [r["dataSource.name"] for r in rows if r.get("dataSource.name") and r["dataSource.name"] not in _S1_NATIVE_SOURCES]}
synced_names = [r["dataSource.name"] for r in rows if r.get("dataSource.name") and r["dataSource.name"] not in _S1_NATIVE_SOURCES]
# Auto-record a coverage snapshot after every live-sources sync
try:
h = _compute_health(db)
db.add(CoverageSnapshot(
health_score=h["health_score"],
parser_pct=h["parser_pct"],
mitre_pct=h["mitre_pct"],
firing_pct=h["firing_pct"] or 0.0,
active_sources=h["active_sources"],
covered_sources=h["covered_sources"],
rules_loaded=h["rules_loaded"],
tactics_covered=h["tactics_covered"],
techniques_covered=h["techniques_covered"],
rules_with_mitre=h["rules_with_mitre"],
rules_fired=h["rules_fired"],
))
db.commit()
except Exception:
pass # snapshot failure should never break sync
return {"synced": seen, "sources": synced_names}
def _build_parser_ds_index() -> dict[str, dict]:
def _build_parser_ds_index() -> tuple[dict[str, dict], list[dict]]:
"""
Read all parser files from /app/parsers/ and build an index:
dataSource.name (exact, from parser attributes){parser_name, format_type}
Read all parser files from /app/parsers/ and build:
- index: dataSource.name → {parser_name, format_type} (complete parsers)
- stubs: list of {parser_name} for files with no dataSource.name attribute
Format type is "grok", "dottedJson", or "custom".
Sources with grok/dottedJson parsers are flagged as needing a proper parser.
"""
import os, re
parsers_dir = "/app/parsers"
_DS_NAME_RE = re.compile(r'"dataSource\.name"\s*:\s*"([^"]+)"')
_DS_NAME_RE = re.compile(r'"?dataSource\.name"?\s*:\s*"([^"]+)"')
_FORMAT_TYPE_RE = re.compile(r'"type"\s*:\s*"([^"]+)"')
# Only treat a file as a parser if it has a formats section — rules out dashboards/saved-searches
_HAS_FORMATS_RE = re.compile(r'\bformats\s*:', re.IGNORECASE)
index: dict[str, dict] = {}
stubs: list[dict] = []
try:
entries = [e for e in os.scandir(parsers_dir) if e.is_file() and not e.name.startswith(".")]
except FileNotFoundError:
return index
return index, stubs
for entry in entries:
try:
@@ -408,9 +625,15 @@ def _build_parser_ds_index() -> dict[str, dict]:
except Exception:
continue
# Skip files that have no formats section — they're dashboards/queries, not parsers
if not _HAS_FORMATS_RE.search(content):
continue
# Extract dataSource.name (may appear multiple times — take first)
ds_match = _DS_NAME_RE.search(content)
if not ds_match:
# Has formats but no dataSource.name — genuine stub parser
stubs.append({"parser_name": entry.name})
continue
ds_name = ds_match.group(1).strip()
@@ -425,7 +648,7 @@ def _build_parser_ds_index() -> dict[str, dict]:
index[ds_name] = {"parser_name": entry.name, "format_type": fmt}
return index
return index, stubs
@router.get("/map")
@@ -441,17 +664,32 @@ def get_coverage_map(db: Session = Depends(get_db)):
parser_fields_rows = db.query(ParserField).all()
rules = db.query(ParsedRule).all()
firing_cache: dict[str, int] = {
row.rule_name: row.alert_count
for row in db.query(RuleFiringCache).all()
}
firing_cache_populated = len(firing_cache) > 0
# parser_name → set of field names (for field count display)
parser_index: dict[str, set] = {}
for pf in parser_fields_rows:
parser_index.setdefault(pf.parser_name, set()).add(pf.field_name)
# Build dataSource.name → {parser_name, format_type} index from parser files
ds_index = _build_parser_ds_index()
ds_index, stub_parsers = _build_parser_ds_index()
def _normalize(s: str) -> str:
return s.lower().replace(" ", "").replace("-", "").replace("_", "").replace(".", "")
def _find_stub_match(source_name: str) -> dict | None:
"""Return stub parser info if a stub filename fuzzy-matches this source name."""
sn = _normalize(source_name)
for stub in stub_parsers:
fn = _normalize(stub["parser_name"])
if fn in sn or sn in fn:
return stub
return None
def _find_parser_info(source_name: str) -> dict | None:
"""
Match priority:
@@ -514,6 +752,8 @@ def get_coverage_map(db: Session = Depends(get_db)):
parser_info = _find_parser_info(src.source_name)
parser_in_data = (src.parser_detected or 0) > 0
stub_info = _find_stub_match(src.source_name) if not parser_info else None
if parser_info and parser_info["format_type"] == "custom":
status = "covered"
matched_parser = parser_info["parser_name"]
@@ -528,6 +768,12 @@ def get_coverage_map(db: Session = Depends(get_db)):
status = "covered"
matched_parser = parser_info["parser_name"] if parser_info else "detected in data"
format_type = parser_info["format_type"] if parser_info else "unknown"
elif stub_info:
# A parser file exists but has no dataSource.name — it's a stub/incomplete
status = "stub_parser"
matched_parser = stub_info["parser_name"]
format_type = None
stub_info["suggested_ds_name"] = src.source_name
else:
status = "parser_needed"
matched_parser = None
@@ -536,9 +782,13 @@ def get_coverage_map(db: Session = Depends(get_db)):
if status == "covered":
covered_count += 1
else:
needed_count += 1
needed_count += 1 # stub_parser and parser_needed both count as needing work
rules_for_src: list = [r for r in rule_by_source.get(src.source_name, []) if r["type"] == "library"]
rules_for_src: list = [
{**r, "alert_count": firing_cache.get(r["rule"], 0)}
for r in rule_by_source.get(src.source_name, [])
if r["type"] == "library"
]
# Close-match suggestions — shown when there are no library rules for this source.
close_matches: list = []
@@ -614,6 +864,8 @@ def get_coverage_map(db: Session = Depends(get_db)):
"status": status,
"parser": matched_parser,
"format_type": format_type,
"unlabelled": bool(src.unlabelled),
"stub_suggested_ds_name": stub_info.get("suggested_ds_name") if stub_info and status == "stub_parser" else None,
"parser_fields": len(parser_provides),
"parser_detected": src.parser_detected or 0,
"rules": rules_for_src,
@@ -624,15 +876,23 @@ def get_coverage_map(db: Session = Depends(get_db)):
"synced_at": src.synced_at.isoformat() if src.synced_at else None,
})
# Only surface stub parsers that matched an active source with real events —
# unmatched stubs with zero events are noise and are suppressed.
synced_at = active_sources[0].synced_at.isoformat() if active_sources else None
stub_count = sum(1 for s in sources_out if s["status"] == "stub_parser")
return {
"summary": {
"active_sources": len(active_sources),
"covered": covered_count,
"parser_needed": needed_count,
"stub_parsers": stub_count,
"unlabelled_events": _unlabelled_event_count,
"parsers_loaded": len(parser_index),
"rules_loaded": len(rules),
"firing_cache_populated": firing_cache_populated,
},
"sources": sources_out,
"synced_at": synced_at,
@@ -640,9 +900,516 @@ def get_coverage_map(db: Session = Depends(get_db)):
}
@router.get("/stub-parsers")
def get_stub_parsers():
"""Return all parser files that have a formats: section but no dataSource.name attribute.
Used by Parser Quality — Attributes Missing section. Independent of active sources."""
_, stubs = _build_parser_ds_index()
return {"stubs": stubs, "count": len(stubs)}
@router.get("/mitre")
def get_mitre_coverage(db: Session = Depends(get_db)):
rules = db.query(ParsedRule).filter_by(rule_type="library").all()
TACTIC_ORDER = [
"Reconnaissance", "Resource Development", "Initial Access", "Execution",
"Persistence", "Privilege Escalation", "Defense Evasion", "Credential Access",
"Discovery", "Lateral Movement", "Collection", "Command and Control",
"Exfiltration", "Impact", "Uncategorized",
]
tactic_map: dict[str, dict] = {}
no_mitre_count = 0
for rule in rules:
try:
raw_data = json.loads(rule.raw) if rule.raw else {}
except Exception:
raw_data = {}
tactics = raw_data.get("tactics", [])
techniques = raw_data.get("techniques", [])
if not tactics and not techniques:
no_mitre_count += 1
continue
if not tactics:
tactics = ["Uncategorized"]
for tactic in tactics:
if tactic not in tactic_map:
tactic_map[tactic] = {"techniques": {}, "rule_count": 0}
tactic_map[tactic]["rule_count"] += 1
for tech in techniques:
key_t = tech["id"] or tech["name"]
if key_t:
tactic_map[tactic]["techniques"][key_t] = tech["name"] or key_t
def _tactic_sort_key(name: str) -> int:
try:
return TACTIC_ORDER.index(name)
except ValueError:
return len(TACTIC_ORDER)
tactics_out = []
total_techniques = 0
for tactic_name in sorted(tactic_map.keys(), key=_tactic_sort_key):
tech_dict = tactic_map[tactic_name]["techniques"]
techniques_list = [
{"id": k if (k.startswith("T") and len(k) >= 4) else "", "name": v}
for k, v in sorted(tech_dict.items())
]
total_techniques += len(techniques_list)
tactics_out.append({
"tactic": tactic_name,
"rule_count": tactic_map[tactic_name]["rule_count"],
"technique_count": len(techniques_list),
"techniques": techniques_list,
})
return {
"tactics": tactics_out,
"total_rules": len(rules),
"rules_with_mitre": len(rules) - no_mitre_count,
"rules_without_mitre": no_mitre_count,
"total_techniques": total_techniques,
"tactic_count": len(tactics_out),
}
@router.post("/sync-rule-firing")
async def sync_rule_firing(period_days: int = 30, db: Session = Depends(get_db)):
"""Populate rule firing cache from the generatedAlerts field stored during
the last Detection Library sync (platform-rules API). This is instant and
requires no SDL PowerQuery. Falls back to SDL PowerQuery if the stored data
is missing (e.g. rules were imported from the detections.json file fallback)."""
from datetime import datetime
checked_at = datetime.utcnow()
result_rows = []
source = "api"
# ── Fast path: use generatedAlerts stored in ParsedRule.raw ───────────────
rules = db.query(ParsedRule).filter_by(rule_type="library").all()
for rule in rules:
try:
raw_data = json.loads(rule.raw) if rule.raw else {}
except Exception:
raw_data = {}
ga = raw_data.get("generated_alerts")
if ga is not None: # present means rule was imported from the live API
result_rows.append({"rule_name": rule.name, "alerts": int(ga)})
# ── Fallback: SDL PowerQuery (rules imported from detections.json) ─────────
if not result_rows:
source = "powerquery"
from datetime import timedelta
now = datetime.utcnow()
from_dt = (now - timedelta(days=period_days)).strftime("%Y-%m-%dT%H:%M:%S.000Z")
to_dt = now.strftime("%Y-%m-%dT%H:%M:%S.000Z")
FIRING_QUERIES = [
("| filter ruleName != '' | group alerts=count() by ruleName | sort -alerts | limit 2000", "ruleName"),
("| filter threatInfo.detectionEngineRule.name != '' | group alerts=count() by threatInfo.detectionEngineRule.name | sort -alerts | limit 2000", "threatInfo.detectionEngineRule.name"),
]
for query, name_field in FIRING_QUERIES:
try:
result = await s1_client.run_powerquery(query, from_dt, to_dt, max_count=10_000_000)
rows = result.get("events", []) if isinstance(result, dict) else []
if rows:
result_rows = [
{"rule_name": r.get(name_field, ""), "alerts": r.get("alerts", 0)}
for r in rows if r.get(name_field)
]
if result_rows:
break
except Exception:
continue
if not result_rows:
return {
"synced": 0,
"rules_with_alerts": 0,
"source": source,
"message": "No alert data found. Run Sync Detection Library first to import generatedAlerts from the S1 API.",
}
# Upsert into cache
db.query(RuleFiringCache).delete()
for row in result_rows:
db.add(RuleFiringCache(
rule_name=row["rule_name"],
alert_count=row["alerts"],
period_days=period_days,
checked_at=checked_at,
))
db.commit()
fired = sum(1 for r in result_rows if r["alerts"] > 0)
return {
"synced": len(result_rows),
"rules_with_alerts": fired,
"rules_never_fired": len(result_rows) - fired,
"source": source,
"period_days": period_days,
}
@router.get("/rule-firing-cache")
def get_rule_firing_cache(db: Session = Depends(get_db)):
"""Return all cached rule firing data sorted by alert count descending."""
rows = db.query(RuleFiringCache).order_by(RuleFiringCache.alert_count.desc()).all()
total_rules = db.query(ParsedRule).filter_by(rule_type="library").count()
fired = [r for r in rows if r.alert_count > 0]
never_fired_count = total_rules - len(fired)
period_days = rows[0].period_days if rows else 30
checked_at = rows[0].checked_at.isoformat() if rows and rows[0].checked_at else None
return {
"rules": [
{
"rule_name": r.rule_name,
"alert_count": r.alert_count,
"period_days": r.period_days,
"checked_at": r.checked_at.isoformat() if r.checked_at else None,
}
for r in rows
],
"summary": {
"rules_monitored": len(rows),
"fired_in_period": len(fired),
"never_fired": never_fired_count,
"period_days": period_days,
"checked_at": checked_at,
},
}
def _compute_health(db) -> dict:
"""Compute current health score from DB state.
Weights:
40% parser coverage — what % of active sources have a working parser
35% MITRE coverage — what % of the 14 standard ATT&CK tactics are covered
25% rule firing — what % of library rules have fired (0 if cache empty)
"""
# --- Parser coverage ---
all_sources = db.query(ActiveSource).all()
total_sources = len(all_sources)
# "covered" = parser_detected > 0 (parser running in data lake)
covered_sources = sum(1 for s in all_sources if (s.parser_detected or 0) > 0)
parser_pct = round((covered_sources / total_sources * 100) if total_sources else 0.0, 1)
# --- MITRE coverage ---
# Standard ATT&CK Enterprise tactics (14).
CANONICAL_TACTICS = frozenset({
"Reconnaissance", "Resource Development", "Initial Access", "Execution",
"Persistence", "Privilege Escalation", "Defense Evasion", "Credential Access",
"Discovery", "Lateral Movement", "Collection", "Command and Control",
"Exfiltration", "Impact",
})
# SentinelOne STAR rules sometimes label tactics with non-canonical names.
# Map them to canonical ATT&CK so we don't over-count and exceed 100%.
TACTIC_ALIASES = {
"Stealth": "Defense Evasion",
"Defense Impairment": "Defense Evasion",
}
TOTAL_TACTICS = len(CANONICAL_TACTICS)
rules = db.query(ParsedRule).filter_by(rule_type="library").all()
total_rules = len(rules)
covered_tactics: set = set()
covered_techniques: set = set()
rules_with_mitre = 0
for rule in rules:
try:
raw = json.loads(rule.raw) if rule.raw else {}
except Exception:
raw = {}
tactics = raw.get("tactics", [])
techniques = raw.get("techniques", [])
if tactics or techniques:
rules_with_mitre += 1
for t in tactics:
if not t or t == "Uncategorized":
continue
t = TACTIC_ALIASES.get(t, t)
if t in CANONICAL_TACTICS:
covered_tactics.add(t)
for tech in techniques:
k = tech.get("id") or tech.get("name")
if k:
covered_techniques.add(k)
tactics_covered = len(covered_tactics)
techniques_covered = len(covered_techniques)
mitre_pct = round((tactics_covered / TOTAL_TACTICS * 100), 1)
# --- Rule firing ---
firing_rows = db.query(RuleFiringCache).all()
cache_populated = len(firing_rows) > 0
rules_fired = sum(1 for r in firing_rows if r.alert_count > 0)
if cache_populated and total_rules > 0:
firing_pct = round(rules_fired / total_rules * 100, 1)
else:
firing_pct = 0.0
# --- Weighted health score ---
if cache_populated:
score = round(0.40 * parser_pct + 0.35 * mitre_pct + 0.25 * firing_pct, 1)
else:
# Without firing data, reweight between parser + MITRE
score = round(0.55 * parser_pct + 0.45 * mitre_pct, 1)
return {
"health_score": score,
"parser_pct": parser_pct,
"mitre_pct": mitre_pct,
"firing_pct": firing_pct if cache_populated else None,
"active_sources": total_sources,
"covered_sources": covered_sources,
"rules_loaded": total_rules,
"tactics_covered": tactics_covered,
"techniques_covered": techniques_covered,
"rules_with_mitre": rules_with_mitre,
"rules_fired": rules_fired,
"firing_cache_populated": cache_populated,
"components": {
"parser_coverage": {"value": parser_pct, "weight": 0.40 if cache_populated else 0.55, "label": "Parser Coverage"},
"mitre_coverage": {"value": mitre_pct, "weight": 0.35 if cache_populated else 0.45, "label": "MITRE Coverage"},
"rule_firing": {"value": firing_pct if cache_populated else None, "weight": 0.25 if cache_populated else 0.0, "label": "Rule Firing Rate"},
}
}
@router.get("/health")
def get_health_score(db: Session = Depends(get_db)):
"""Return the current tenant health score and component breakdown."""
h = _compute_health(db)
# Most recent snapshot for trend comparison
prev = db.query(CoverageSnapshot).order_by(CoverageSnapshot.recorded_at.desc()).offset(1).first()
delta = None
if prev:
delta = round(h["health_score"] - prev.health_score, 1)
h["delta_from_previous"] = delta
return h
@router.post("/snapshot")
def record_snapshot(db: Session = Depends(get_db)):
"""Record a coverage snapshot. Called automatically at end of sync-sources."""
h = _compute_health(db)
snap = CoverageSnapshot(
health_score=h["health_score"],
parser_pct=h["parser_pct"],
mitre_pct=h["mitre_pct"],
firing_pct=h["firing_pct"] or 0.0,
active_sources=h["active_sources"],
covered_sources=h["covered_sources"],
rules_loaded=h["rules_loaded"],
tactics_covered=h["tactics_covered"],
techniques_covered=h["techniques_covered"],
rules_with_mitre=h["rules_with_mitre"],
rules_fired=h["rules_fired"],
)
db.add(snap)
db.commit()
return {"recorded": True, "health_score": h["health_score"]}
@router.get("/snapshots")
def get_snapshots(limit: int = 30, db: Session = Depends(get_db)):
"""Return the last N daily snapshots for sparkline charts."""
rows = (
db.query(CoverageSnapshot)
.order_by(CoverageSnapshot.recorded_at.desc())
.limit(limit)
.all()
)
return {
"snapshots": [
{
"date": r.recorded_at.strftime("%Y-%m-%d"),
"health_score": r.health_score,
"parser_pct": r.parser_pct,
"mitre_pct": r.mitre_pct,
"firing_pct": r.firing_pct,
"active_sources": r.active_sources,
"covered_sources": r.covered_sources,
}
for r in reversed(rows) # chronological order
]
}
@router.get("/dependency-map")
def get_dependency_map(db: Session = Depends(get_db)):
"""
Flip of the coverage map: for each detection library rule, show which
data sources it requires. Flags rules as 'at_risk' if any required
source has no parser or has zero recent events.
"""
rules = db.query(ParsedRule).filter_by(rule_type="library").all()
active_sources = {s.source_name: s for s in db.query(ActiveSource).all()}
ds_index, _ = _build_parser_ds_index()
# Build set of source names that are "healthy" (have events + parser)
healthy_sources: set = set()
for name, src in active_sources.items():
has_parser = name in ds_index or (src.parser_detected or 0) > 0
if has_parser and (src.event_count or 0) > 0:
healthy_sources.add(name)
out = []
for rule in rules:
try:
raw_data = json.loads(rule.raw) if rule.raw else {}
except Exception:
raw_data = {}
data_sources = raw_data.get("data_sources", [])
tactics = raw_data.get("tactics", [])
techniques = raw_data.get("techniques", [])
generated_alerts = raw_data.get("generated_alerts")
source_statuses = []
at_risk = False
for ds in data_sources:
src = active_sources.get(ds)
if src is None:
status = "inactive"
at_risk = True
elif ds not in healthy_sources:
status = "no_parser"
at_risk = True
else:
status = "healthy"
source_statuses.append({"source": ds, "status": status})
# Rules with no source requirements are not "at risk" (platform-wide rules)
if not data_sources:
at_risk = False
out.append({
"rule": rule.name,
"rule_id": rule.rule_id,
"sources": source_statuses,
"source_count": len(data_sources),
"tactics": tactics,
"techniques": [t.get("id", "") for t in techniques if t.get("id")],
"generated_alerts": generated_alerts,
"at_risk": at_risk,
"no_sources": len(data_sources) == 0,
})
# Sort: at-risk first, then by source count desc, then alphabetical
out.sort(key=lambda r: (not r["at_risk"], -r["source_count"], r["rule"]))
at_risk_count = sum(1 for r in out if r["at_risk"])
healthy_count = sum(1 for r in out if not r["at_risk"] and not r["no_sources"])
return {
"rules": out,
"total": len(out),
"at_risk": at_risk_count,
"healthy": healthy_count,
"no_source_requirements": sum(1 for r in out if r["no_sources"]),
}
@router.get("/onboarding-status")
def get_onboarding_status(db: Session = Depends(get_db)):
"""
Pipeline status for each active source across 6 lifecycle stages.
Returns per-source progress for the onboarding tracker view.
"""
import re as _re
active_sources = db.query(ActiveSource).order_by(ActiveSource.event_count.desc()).all()
ds_index, stub_parsers = _build_parser_ds_index()
stub_names = {s["parser_name"] for s in stub_parsers}
firing_cache = {r.rule_name: r.alert_count for r in db.query(RuleFiringCache).all()}
# rule_by_source: source_name → list of rule names
rules = db.query(ParsedRule).filter_by(rule_type="library").all()
rule_by_source: dict = {}
for rule in rules:
try:
raw_data = json.loads(rule.raw) if rule.raw else {}
except Exception:
raw_data = {}
for ds in raw_data.get("data_sources", []):
rule_by_source.setdefault(ds, []).append(rule.name)
def _normalize(s):
return _re.sub(r"[^a-z0-9]", "", s.lower())
def _find_parser(source_name):
if source_name in ds_index:
return ds_index[source_name]
sn = _normalize(source_name)
for ds_name, info in ds_index.items():
if _normalize(ds_name) in sn or sn in _normalize(ds_name):
return info
return None
out = []
for src in active_sources:
parser_info = _find_parser(src.source_name)
parser_active = (src.parser_detected or 0) > 0
has_ds_name = parser_info is not None and parser_info.get("parser_name") not in stub_names
rules_for_src = rule_by_source.get(src.source_name, [])
rules_firing = any(firing_cache.get(r, 0) > 0 for r in rules_for_src)
has_detection_rules = len(rules_for_src) > 0
# Core stages (apply to every source)
core_stages = [
{"stage": "Data Received", "done": (src.event_count or 0) > 0},
{"stage": "Parser File Exists", "done": parser_info is not None},
{"stage": "Parser Active", "done": parser_active},
{"stage": "Source Labeled", "done": has_ds_name and parser_active},
]
# Detection stages (only meaningful when detection rules exist)
detection_stages = [
{"stage": "Detection Rules", "done": has_detection_rules, "na": False},
{"stage": "Rules Firing", "done": rules_firing, "na": False},
]
if has_detection_rules:
stages = core_stages + detection_stages
total = 6
else:
# Mark detection stages as N/A — don't count against progress
stages = core_stages + [
{"stage": "Detection Rules", "done": False, "na": True},
{"stage": "Rules Firing", "done": False, "na": True},
]
total = 4 # progress calculated over core stages only
completed = sum(1 for s in stages if s.get("done") and not s.get("na"))
out.append({
"source": src.source_name,
"event_count": src.event_count,
"stages": stages,
"completed": completed,
"total": total,
"has_detection_rules": has_detection_rules,
"pct": round(completed / total * 100) if total else 0,
})
# Sort: incomplete first, then by event volume
out.sort(key=lambda x: (x["completed"] == x["total"], -x["event_count"]))
return {
"sources": out,
"fully_onboarded": sum(1 for s in out if s["completed"] == s["total"]),
"in_progress": sum(1 for s in out if 0 < s["completed"] < s["total"]),
"not_started": sum(1 for s in out if s["completed"] == 0),
}
@router.delete("/reset")
def reset_data(db: Session = Depends(get_db)):
db.query(ParsedRule).delete()
db.query(ParserField).delete()
db.query(ActiveSource).delete()
db.commit()
global _unlabelled_event_count
_unlabelled_event_count = -1
return {"cleared": True}
+69 -18
View File
@@ -2,9 +2,15 @@ from datetime import datetime, timedelta
from fastapi import APIRouter, Query, HTTPException
from pydantic import BaseModel
from services import s1_client
from services.async_cache import async_ttl_cache, cache_stats, cache_clear
router = APIRouter()
# Dashboard endpoints can be expensive on busy tenants. Cache results in-process
# for a short TTL so reloads and parallel widgets are instant. Pass ?nocache=1
# to bypass for a forced refresh.
_DASHBOARD_TTL_SECONDS = 300
def _date_range(days: int) -> tuple[str, str]:
now = datetime.utcnow()
@@ -22,41 +28,65 @@ def _date_range_hours(hours: int) -> tuple[str, str]:
)
@async_ttl_cache(ttl_seconds=_DASHBOARD_TTL_SECONDS)
async def _top_sources_cached(hours: int) -> dict:
"""Cache key: hours only. days is normalised to hours upstream."""
from_dt, to_dt = _date_range_hours(hours)
query = "| group events=count() by dataSource.name | sort -events | limit 25"
result = await s1_client.run_powerquery(query, from_dt, to_dt)
return {"data": result.get("events", [])}
@router.get("/top-sources")
async def get_top_sources(
days: int = Query(None, ge=1, le=90),
hours: int = Query(None, ge=1, le=24),
hours: int = Query(None, ge=1, le=720),
nocache: bool = Query(False, description="Bypass dashboard cache"),
):
"""Top log sources by event count over the given period."""
if hours is not None:
from_dt, to_dt = _date_range_hours(hours)
period_label = f"{hours}h"
"""Top log sources by event count.
Note: SDL returns 'internal Scalyr error' when this query uses day-scale
timestamps on busy tenants, but the same window expressed in hours runs
fine. We normalise days -> hours internally for stability.
"""
if hours is None and days is None:
days = 7
if hours is None:
hours = days * 24
period_label = f"{days}d"
else:
from_dt, to_dt = _date_range(days or 7)
period_label = f"{days or 7}d"
query = "| group events=count() by dataSource.name | sort -events | limit 25"
period_label = f"{hours}h"
try:
result = await s1_client.run_powerquery(query, from_dt, to_dt)
cached = await _top_sources_cached(hours, nocache=nocache)
except Exception as e:
raise HTTPException(502, f"PowerQuery error: {e}")
return {"period": period_label, "data": result.get("events", [])}
return {"period": period_label, "data": cached["data"]}
@async_ttl_cache(ttl_seconds=_DASHBOARD_TTL_SECONDS)
async def _by_event_type_cached(days: int) -> dict:
# Same days->hours normalisation as top-sources for tenant stability.
from_dt, to_dt = _date_range_hours(days * 24)
query = "| group events=count() by dataSource.name, event.type | sort -events | limit 100"
result = await s1_client.run_powerquery(query, from_dt, to_dt)
return {"data": result.get("events", [])}
@router.get("/by-event-type")
async def get_by_event_type(days: int = Query(7, ge=1, le=90)):
async def get_by_event_type(
days: int = Query(7, ge=1, le=90),
nocache: bool = Query(False),
):
"""Event counts grouped by source and event type."""
from_dt, to_dt = _date_range(days)
query = "| group events=count() by dataSource.name, event.type | sort -events | limit 100"
try:
result = await s1_client.run_powerquery(query, from_dt, to_dt)
cached = await _by_event_type_cached(days, nocache=nocache)
except Exception as e:
raise HTTPException(502, f"PowerQuery error: {e}")
return {"period_days": days, "data": result.get("events", [])}
return {"period_days": days, "data": cached["data"]}
@router.get("/daily-volume")
async def get_daily_volume(days: int = Query(5, ge=1, le=7)):
"""Total event count per day — queries run in parallel."""
@async_ttl_cache(ttl_seconds=_DASHBOARD_TTL_SECONDS)
async def _daily_volume_cached(days: int) -> list:
import asyncio
now = datetime.utcnow()
@@ -78,6 +108,27 @@ async def get_daily_volume(days: int = Query(5, ge=1, le=7)):
return list(reversed(results))
@router.get("/daily-volume")
async def get_daily_volume(
days: int = Query(5, ge=1, le=7),
nocache: bool = Query(False),
):
"""Total event count per day — queries run in parallel."""
return await _daily_volume_cached(days, nocache=nocache)
@router.get("/cache-stats")
def ingest_cache_stats():
"""Inspect dashboard cache (entry count + TTL remaining per key)."""
return cache_stats()
@router.delete("/cache")
def ingest_cache_clear():
"""Forcefully wipe the dashboard cache (next call refetches from SDL)."""
return {"cleared": cache_clear()}
class FilterRule(BaseModel):
source: str = ""
event_type: str = ""
+79 -5
View File
@@ -11,16 +11,52 @@ router = APIRouter()
PARSERS_DIR = "/app/parsers"
# Files in /app/parsers/ are also used to hold non-parser SDL artefacts
# (UEBA analytics tables, saved searches, dashboard configs) that the SDL
# config-files API returns from the same directory. Detect real parsers by
# looking for parser-config keywords in the file header.
_PARSER_MARKER_RE = re.compile(
r"^\s*(attributes|patterns|formats|patternRefs|rewrites|parser)\s*[:=]",
re.MULTILINE,
)
# Names known to be non-parser SDL configs even if the marker check is fooled.
_PARSER_NAME_DENYLIST = re.compile(
r"^(ueba[_\-]|searches$|alerts$|.*_baselines?_|.*_features?_|.*_scores?_|"
r"bsi[_\-]|.*-overview$|.*[_\-]membership$|.*[_\-]risk$|.*[_\-]smoke[_\-]test$|"
r".*[_\-]test[_\-](default|merge|replace|same))",
re.IGNORECASE,
)
def _looks_like_parser(path: str, name: str) -> bool:
"""Return True if a file under /app/parsers/ is actually a parser config."""
if _PARSER_NAME_DENYLIST.match(name):
return False
try:
with open(path, "r", encoding="utf-8", errors="replace") as fh:
head = fh.read(4096)
except OSError:
return False
return bool(_PARSER_MARKER_RE.search(head))
@router.get("/parsers")
def list_parser_files():
"""List parser filenames available under /app/parsers/ for the Test Runner."""
"""List parser filenames under /app/parsers/ for the Test Runner.
Excludes non-parser SDL artefacts (UEBA tables, searches, dashboards).
"""
try:
names = sorted(
e.name for e in os.scandir(PARSERS_DIR)
candidates = [
e for e in os.scandir(PARSERS_DIR)
if e.is_file() and not e.name.startswith(".")
)
]
except FileNotFoundError:
names = []
return {"parsers": [], "count": 0}
names = sorted(
e.name for e in candidates
if _looks_like_parser(e.path, e.name)
)
return {"parsers": names, "count": len(names)}
@@ -294,6 +330,44 @@ def _to_py_backref(s: str) -> str:
# Endpoints
# ---------------------------------------------------------------------------
@router.post("/sample-unlabelled")
async def sample_unlabelled(req: SampleEventsRequest):
"""Return a sample of events that have no dataSource.name — these need parsers.
Also runs a count query so the caller can update the banner with the real total.
"""
import asyncio
from routers import coverage as _coverage
filter_expr = "!(dataSource.name = *) !(source = 'scalyr')"
from_dt, to_dt = _date_range_hours(req.hours)
sample_result, count_result = await asyncio.gather(
s1_client.run_powerquery(f"{filter_expr} | limit {req.limit}", from_dt, to_dt),
s1_client.run_powerquery(f"{filter_expr} | group events=count()", from_dt, to_dt, max_count=50_000_000),
)
rows = sample_result if isinstance(sample_result, list) else (sample_result.get("rows") or sample_result.get("events") or [])
events = [_flatten_event(row) for row in rows]
non_empty_keys: set = set()
for ev in events:
for k, v in ev.items():
if v is not None and v != "" and v != "null":
non_empty_keys.add(k)
events = [{k: v for k, v in ev.items() if k in non_empty_keys} for ev in events]
count_rows = count_result.get("events", []) if isinstance(count_result, dict) else []
total = count_rows[0].get("events", 0) if count_rows else 0
_coverage._unlabelled_event_count = total
return {
"events": events,
"count": len(events),
"total": total,
"hours": req.hours,
"columns_seen": sorted(non_empty_keys),
}
@router.post("/sample-events")
async def sample_events(req: SampleEventsRequest):
"""Return a sample of raw events from a given data source."""
+73
View File
@@ -0,0 +1,73 @@
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from datetime import datetime, timedelta
from services import s1_client
router = APIRouter()
def _date_range(hours: int | None = None, days: int | None = None) -> tuple[str, str]:
now = datetime.utcnow()
if hours:
delta = timedelta(hours=hours)
else:
delta = timedelta(days=days or 1)
return (
(now - delta).strftime("%Y-%m-%dT%H:%M:%S.000Z"),
now.strftime("%Y-%m-%dT%H:%M:%S.000Z"),
)
PRESET_QUERIES = [
{"label": "Top sources by volume", "query": "| group events=count() by dataSource.name | sort -events | limit 25"},
{"label": "Unlabelled events", "query": "!(dataSource.name = *) !(source = 'scalyr') | group events=count() by source | sort -events | limit 25"},
{"label": "Events by type", "query": "| group events=count() by dataSource.name, event.type | sort -events | limit 50"},
{"label": "Failed logins", "query": "| filter event.type = 'Logon' | filter event.outcome = 'FAILED' | group count() by user.name, src.ip | sort -count() | limit 25"},
{"label": "Process executions", "query": "| filter event.type = 'Process Creation' | group count() by src.process.name | sort -count() | limit 25"},
{"label": "Network connections by dest", "query": "| filter event.type = 'IP Connect' | group count() by dst.ip | sort -count() | limit 25"},
{"label": "Rules firing (30d)", "query": "| filter ruleName != '' | group alerts=count() by ruleName | sort -alerts | limit 50"},
]
class QueryRequest(BaseModel):
query: str
hours: int | None = None
days: int | None = None
max_count: int = 1000
@router.get("/presets")
def get_presets():
return {"presets": PRESET_QUERIES}
@router.post("/run")
async def run_query(req: QueryRequest):
"""Run a PowerQuery against the Singularity Data Lake."""
if not req.query.strip():
raise HTTPException(400, "Query cannot be empty")
if req.max_count > 10_000:
req.max_count = 10_000
from_dt, to_dt = _date_range(hours=req.hours, days=req.days)
try:
result = await s1_client.run_powerquery(req.query, from_dt, to_dt, max_count=req.max_count)
except Exception as e:
raise HTTPException(502, f"PowerQuery error: {e}")
err = result.get("error") if isinstance(result, dict) else None
if err:
raise HTTPException(502, f"PowerQuery error: {err}")
events = result.get("events", [])
columns = sorted({k for row in events for k in row.keys()}) if events else []
return {
"rows": len(events),
"columns": columns,
"events": events,
"from": from_dt,
"to": to_dt,
"query": req.query,
}
+1
View File
@@ -14,6 +14,7 @@ FIELDS = [
{"key": "S1_API_TOKEN", "label": "Console API Token", "secret": True, "placeholder": "eyJ..."},
{"key": "SDL_XDR_URL", "label": "SDL XDR URL", "secret": False, "placeholder": "https://xdr.us1.sentinelone.net"},
{"key": "SDL_LOG_READ_KEY", "label": "SDL Log Read Key", "secret": True, "placeholder": "1DnK0Y4e..."},
{"key": "SDL_CONFIG_READ_KEY", "label": "SDL Config Read Key", "secret": True, "placeholder": "Needs 'Manage config files' permission"},
{"key": "ANTHROPIC_API_KEY", "label": "Anthropic API Key", "secret": True, "placeholder": "sk-ant-..."},
]
+84
View File
@@ -0,0 +1,84 @@
"""Tiny in-process async TTL cache for backend endpoints.
Two-fold benefit:
* Identical concurrent calls share one upstream PowerQuery (single-flight).
* Repeat reads within TTL return instantly (no SDL round-trip).
Designed for read-only dashboard endpoints. Keep it stdlib-only so it adds
no dependency. Caches live until the process restarts.
Usage:
@async_ttl_cache(ttl_seconds=300)
async def get_top_sources(...): ...
"""
from __future__ import annotations
import asyncio
import functools
import time
from typing import Any, Awaitable, Callable, Tuple
# Maps cache-key -> (expires_at, value)
_STORE: dict[Tuple[str, Tuple[Any, ...], Tuple[Tuple[str, Any], ...]], Tuple[float, Any]] = {}
# Maps cache-key -> asyncio.Lock for single-flight
_LOCKS: dict[Tuple[str, Tuple[Any, ...], Tuple[Tuple[str, Any], ...]], asyncio.Lock] = {}
def _make_key(name: str, args: tuple, kwargs: dict) -> Tuple[str, Tuple[Any, ...], Tuple[Tuple[str, Any], ...]]:
# Skip the special "nocache" kwarg so it doesn't fragment the cache.
kw = tuple(sorted((k, v) for k, v in kwargs.items() if k != "nocache"))
return (name, args, kw)
def async_ttl_cache(ttl_seconds: int = 300) -> Callable:
"""Decorator: cache an async function's result for ttl_seconds.
The wrapped function may accept an optional `nocache=True` kwarg to
bypass + refresh the cache for that call.
"""
def decorator(fn: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
@functools.wraps(fn)
async def wrapper(*args, **kwargs):
nocache = bool(kwargs.pop("nocache", False))
key = _make_key(fn.__qualname__, args, kwargs)
if not nocache:
hit = _STORE.get(key)
if hit and hit[0] > time.monotonic():
return hit[1]
lock = _LOCKS.setdefault(key, asyncio.Lock())
async with lock:
# Re-check after acquiring lock — another caller may have populated.
if not nocache:
hit = _STORE.get(key)
if hit and hit[0] > time.monotonic():
return hit[1]
value = await fn(*args, **kwargs)
_STORE[key] = (time.monotonic() + ttl_seconds, value)
return value
return wrapper
return decorator
def cache_stats() -> dict:
"""Debug helper: return current cache entries (no values)."""
now = time.monotonic()
return {
"entries": len(_STORE),
"live": [
{"key": str(k), "ttl_remaining_s": round(v[0] - now, 1)}
for k, v in _STORE.items()
if v[0] > now
],
}
def cache_clear() -> int:
"""Wipe the cache; returns the number of entries removed."""
n = len(_STORE)
_STORE.clear()
return n
+97
View File
@@ -0,0 +1,97 @@
"""Background pre-warmer for the Ingest Dashboard cache.
Opt-in via env: INGEST_PREWARM=1
Tunable via env: INGEST_PREWARM_INTERVAL_SECONDS (default 240, just under TTL)
INGEST_PREWARM_HOURS (default "1,24,168")
INGEST_PREWARM_DAYS (default "7")
INGEST_PREWARM_DAILY_VOLUME_DAYS (default "5")
The pre-warmer re-runs the heavy Ingest Dashboard queries every ~4 min so the
in-process TTL cache is always populated. First user hit then returns from
cache (sub-millisecond) instead of waiting 30-60s for SDL.
"""
from __future__ import annotations
import asyncio
import logging
import os
import time
# Use the uvicorn logger so messages show up in `docker logs` alongside requests.
log = logging.getLogger("uvicorn.error")
_PREFIX = "prewarmer:"
def _flag_enabled() -> bool:
return os.environ.get("INGEST_PREWARM", "").lower() in ("1", "true", "yes", "on")
def _int_list(env: str, default: str) -> list[int]:
raw = os.environ.get(env, default)
out = []
for tok in raw.split(","):
tok = tok.strip()
if tok and tok.isdigit():
out.append(int(tok))
return out
async def _warm_once() -> dict:
"""Run all configured warm-up queries once. Returns timing summary."""
# Local import to avoid circular dependency with FastAPI router module.
from routers.ingest import (
_top_sources_cached,
_by_event_type_cached,
_daily_volume_cached,
)
hours_list = _int_list("INGEST_PREWARM_HOURS", "1,24,168")
days_list = _int_list("INGEST_PREWARM_DAYS", "7")
dv_days = _int_list("INGEST_PREWARM_DAILY_VOLUME_DAYS", "5") or [5]
tasks: list[tuple[str, asyncio.Task]] = []
for h in hours_list:
tasks.append((f"top-sources hours={h}",
asyncio.create_task(_top_sources_cached(h, nocache=True))))
for d in days_list:
tasks.append((f"by-event-type days={d}",
asyncio.create_task(_by_event_type_cached(d, nocache=True))))
for d in dv_days:
tasks.append((f"daily-volume days={d}",
asyncio.create_task(_daily_volume_cached(d, nocache=True))))
summary: dict[str, str] = {}
for label, task in tasks:
t0 = time.monotonic()
try:
await task
summary[label] = f"OK in {time.monotonic() - t0:.1f}s"
except Exception as e:
summary[label] = f"ERR ({e.__class__.__name__}: {str(e)[:120]})"
return summary
async def _loop():
interval = int(os.environ.get("INGEST_PREWARM_INTERVAL_SECONDS", "240"))
log.info("%s starting (interval=%ds)", _PREFIX, interval)
# Tiny initial delay so we don't compete with startup work.
await asyncio.sleep(5)
while True:
try:
summary = await _warm_once()
for label, status in summary.items():
log.info("%s %s -> %s", _PREFIX, label, status)
except asyncio.CancelledError:
raise
except Exception as e:
log.warning("%s cycle failed: %s", _PREFIX, e)
await asyncio.sleep(interval)
def start_if_enabled() -> asyncio.Task | None:
"""Spawn the pre-warm background task if INGEST_PREWARM is enabled.
Returns the task handle, or None if disabled."""
if not _flag_enabled():
log.info("%s disabled (set INGEST_PREWARM=1 to enable)", _PREFIX)
return None
log.info("%s scheduling background task", _PREFIX)
return asyncio.create_task(_loop(), name="ingest-prewarmer")
+30 -8
View File
@@ -6,6 +6,12 @@ from datetime import datetime, timezone
BASE_URL = os.environ.get("S1_BASE_URL", "https://demo.sentinelone.net").rstrip("/")
TOKEN = os.environ.get("S1_API_TOKEN", "")
# Configurable PowerQuery timeout — SDL queries on large tenants can exceed 2 min.
# Set SDL_PQ_TIMEOUT in .env (seconds). Default: 600.
SDL_PQ_TIMEOUT = int(os.environ.get("SDL_PQ_TIMEOUT", "600"))
# How many times to retry on ReadTimeout before giving up. Default: 1 (one retry).
SDL_PQ_TIMEOUT_RETRIES = int(os.environ.get("SDL_PQ_TIMEOUT_RETRIES", "1"))
# Scalyr/XDR PowerQuery credentials — from SDL_XDR_URL + SDL_LOG_READ_KEY
# in the SentinelOne console: Settings → Integrations → Data Lake API Keys
SDL_XDR_URL = os.environ.get("SDL_XDR_URL", "https://xdr.us1.sentinelone.net").rstrip("/")
@@ -97,7 +103,7 @@ async def get_library_rules(page_size: int = 100) -> list:
return results
async def run_powerquery(query: str, from_date: str, to_date: str) -> dict:
async def run_powerquery(query: str, from_date: str, to_date: str, max_count: int = 1000) -> dict:
"""
Run a PowerQuery against the Singularity Data Lake via the Scalyr XDR API.
Uses SDL_XDR_URL + SDL_LOG_READ_KEY (Scalyr readlog token).
@@ -114,11 +120,15 @@ async def run_powerquery(query: str, from_date: str, to_date: str) -> dict:
"query": query,
"startTime": start_ms,
"endTime": end_ms,
"maxCount": 1000,
"maxCount": max_count,
}
async with httpx.AsyncClient(timeout=120) as client:
for attempt in range(3):
# Use a generous read timeout for PowerQuery — large SDL scans can be slow.
pq_timeout = httpx.Timeout(connect=15.0, read=SDL_PQ_TIMEOUT, write=30.0, pool=15.0)
max_attempts = 2 + SDL_PQ_TIMEOUT_RETRIES # base 2 (rate-limit) + timeout retries
async with httpx.AsyncClient(timeout=pq_timeout) as client:
for attempt in range(max_attempts):
try:
resp = await client.post(
f"{SDL_XDR_URL}/api/powerQuery",
@@ -126,12 +136,24 @@ async def run_powerquery(query: str, from_date: str, to_date: str) -> dict:
)
resp.raise_for_status()
break
except httpx.HTTPStatusError as e:
if e.response.status_code == 429 and attempt < 2:
await asyncio.sleep(10 * (attempt + 1))
except httpx.ReadTimeout:
if attempt < max_attempts - 1:
await asyncio.sleep(5)
continue
raise RuntimeError(
f"HTTP {e.response.status_code} from {e.request.url}: {e.response.text[:500]}"
f"PowerQuery timed out after {SDL_PQ_TIMEOUT}s "
f"(increase SDL_PQ_TIMEOUT in .env). Query: {query[:200]}"
)
except httpx.HTTPStatusError as e:
if e.response.status_code == 429 and attempt < max_attempts - 1:
await asyncio.sleep(10 * (attempt + 1))
continue
try:
detail = e.response.json()
except Exception:
detail = e.response.text[:500]
raise RuntimeError(
f"HTTP {e.response.status_code} from {e.request.url}: {detail}"
) from e
data = resp.json()
+7
View File
@@ -16,9 +16,16 @@ services:
- SDL_XDR_URL=${SDL_XDR_URL}
- SDL_LOG_READ_KEY=${SDL_LOG_READ_KEY}
- SDL_CONFIG_READ_KEY=${SDL_CONFIG_READ_KEY}
- SDL_PQ_TIMEOUT=${SDL_PQ_TIMEOUT:-600}
- SDL_PQ_TIMEOUT_RETRIES=${SDL_PQ_TIMEOUT_RETRIES:-1}
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- DATABASE_URL=postgresql://siem:siem@db:5432/siem
- DETECTIONS_FILE=/app/data/detections.json
- INGEST_PREWARM=${INGEST_PREWARM:-0}
- INGEST_PREWARM_HOURS=${INGEST_PREWARM_HOURS:-1,24,168}
- INGEST_PREWARM_DAYS=${INGEST_PREWARM_DAYS:-7}
- INGEST_PREWARM_DAILY_VOLUME_DAYS=${INGEST_PREWARM_DAILY_VOLUME_DAYS:-5}
- INGEST_PREWARM_INTERVAL_SECONDS=${INGEST_PREWARM_INTERVAL_SECONDS:-240}
depends_on:
db:
condition: service_healthy
+1266 -240
View File
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
+14
View File
@@ -0,0 +1,14 @@
{
attributes: {
"dataSource.category": "security",
"dataSource.name": "Palo Alto Networks",
"dataSource.vendor": "Palo Alto Networks"
}
formats: [
{
id: "traffic-11-0",
format: "$network_activity.future_use_1$,$network_activity.receive_time$,$firewall.serial_number$,$network_activity.sub_type$,$timestamp$,$src.ip.address$,$dst.ip.address$,$network_endpoint.nat_src_ip$,$network_endpoint.nat_dst_ip$,$rule.name$,$user.src_name$,$user.dst_name$,$network_activity.app_name$,$network_traffic.virtual_system_name$,$source_zone$,$destination_zone$,$network_interface.inbound_name$,$network_interface.outbound_name$,$network_activity.log_action$,$session.uid$,$network_activity.repeat_count$,$network_endpoint.src_port$,$network_endpoint.dst_port$,$network_connection_info.flag$,$network_connection_info.protocol_name$,$network_activity.action$,$network_traffic.bytes$,$network_traffic.bytes_out$,$network_traffic.bytes_in$,$network_traffic.packets$,$network_activity.start_time_dt$,$network_activity.elapsed_time$,$network_activity.category_name$,$network_activity.sequence_number$,$network_activity.action_flags$,$location.src_country$,$location.dst_country$,$network_traffic.packets_out$,$network_traffic.packets_in$,$session.expiration_reason$,$device.group_hierarchy.level_1$,$device.group_hierarchy.level_2$,$device.group_hierarchy.level_3$,$device.group_hierarchy.level_4$,$firewall.virtual_system_name$,$device.name$,$network_activity.action_source$,$virtual_machine.src_vm_uuid$,$virtual_machine.dst_vm_uuid$,$device.imsi$,$device.imei$,$session.parent_uid$,$network_activity.parent_start_time_dt$,$network_connection_info.tunnel_type$,$network_connection_info.sctp_id$,$network_connection_info.sctp_chunks$,$network_connection_info.sctp_chunks_out$,$network_connection_info.sctp_chunks_in$,$rule.uid$,$network_activity.http_connection$,$network_connection_info.app_flap_count$,$policy.uid$,$network_connection_info.link_switches$,$network_connection_info.sd_wan_cluster$,$network_connection_info.sd_wan_device_type$,$network_connection_info.sd_wan_cluster_type$,$network_connection_info.sd_wan_site$,$user.groups$,$http_request.x_forwarded_for$,$device.src_type$,$device.src_profile$,$device.src_model$,$device.src_vendor_name$,$device.src_os_edition$,$device.src_os_version$,$network_connection_info.src_hostname$,$device.src_mac$,$device.dst_type$,$device.dst_profile$,$device.dst_model$,$device.dst_vendor_name$,$network_connection_info.dst_hostname$,$network_connection_info.dst_mac$,$container.id$,$container.pod_namespace$,$container.pod_name$,$network_endpoint.src_host_list$,$network_endpoint.dst_host_list$,$network_endpoint.host_id$,$device_hardware_info.serial_number$,$policy.src_group$,$policy.dst_group$,$session.owner$,$network_activity.time$,$network_activity.a_slice.service_type$,$network_activity.a_slice.differentiator$,$network_activity.sub_category$,$network_activity.app_model$,$network_activity.severity$,$network_activity.container.id$,$network_activity.app_tunnel_type$,$network_activity.is_saas$,$network_activity.is_sanctioned$,$network_activity.is_offloaded$,$network_activity.flow_type$,$network_activity.cluster.name$",
halt: true
}
]
}
+91
View File
@@ -0,0 +1,91 @@
{
// specify a time zone if the timestamps in your log are not in GMT
//timezone: "Europe/Prague"
attributes: { "dataset.technology":"firewall", "dataset.vendor":"palo_alto", "dataset.app":"palo_alto" }
patterns: {
//maps to high_resolution_timestamp:
// timestamp: "\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}.\\d{3}(\\+|-)\\d{2}:\\d{2}"
tsPatternPA: "[A-za-z]+\\s+\\d{1,2} [\\d:]+"
//application_characteristic can be a single value, a comma delimited list in quotes, or blank. Null value is handled by format: traffic-2, not by this pattern.
application_characteristic: "(\".*\")|[^,]+"
//description field from system log is wrapped in quotes and may contain commas"
description: "(\".*\")"
//discard future_use fields
misc: "[^,]*"
}
formats: [
//change pattern depending on the timestamp fomat
{
format: ".*$timestamp=tsPatternPA$(\\,)*",
},
{
//match all fields. application_characteristic can be a single value, or a comma delimited list in quotes.
id: "traffic-1",
attributes: { type: "TRAFFIC", format: "traffic-1" },
format: "\\<[^\\>]+\\>$timestamp=tsPatternPA$ $hostname$ $log_version$,$receive_time$,$firewall_serial_number$,TRAFFIC,$sub_type$,\\d+,$generate_time$,$source_address$,$destination_address$,$source_nat_address$,$destination_nat_address$,$rule_name$,$source_user$,$destination_user$,$application$,$virtual_system$,$source_zone$,$destination_zone$,$inbound_interface$,$outbound_interface$,$log_action$,$time_stamp$,$session_id$,$repeat_count$,$source_port$,$destination_port$,$source_nat_port$,$destination_nat_port$,$flag$,$protocol$,$action$,$bytes$,$bytes_sent$,$bytes_received$,$packets$,$start_time$,$elapsed_time$,$category$,$test$,$sequence_number$,$action_flags$,$source_country$,$destination_country$,,$packets_sent$,$packets_received$,$session_end_reason$,\\d+,\\d+,\\d+,\\d+,$virtual_system_name$,$device_name$,$action_source$,$source_vm_uuid$,$destination_vm_uuid$,$tunnel_id_imsi$,$monitor_tag_imei$,$parent_session_id$,$parent_start_time$,$tunnel_type$,$sctp_association_id$,$sctp_chunks$,$sctp_chunks_sent$,$sctp_chunks_received$,$rule_uuid$,$http_2_connection$,$app_flap_count$,$policy_id$,$link_switches$,$sd-wan_cluster$,$sd-wan_device_type$,$sd-wan_cluster_type$,$sd-wan_site$,$dynamic_user_group_name$,$xff_address$,$source_device_category$,$source_device_profile$,$source_device_model$,$source_device_vendor$,$source_device_os_family$,$source_device_os_version$,$source_hostname$,$source_mac_address$,$destination_device_category$,$destination_device_profile$,$destination_device_model$,$destination_device_vendor$,$destination_device_os_family$,$destination_device_os_version$,$destination_hostname$,$destination_mac_address$,$container_id$,$pod_namespace$,$pod_name$,$source_external_dynamic_list$,$destination_external_dynamic_list$,$host_id$,$serial_number$,$source_dynamic_address_group$,$destination_dynamic_address_group$,$session_owner$,$high_resolution_timestamp$,$a_slice_service_type$,$a_slice_differentiator$,$application_subcategory$,$application_category$,$application_technology$,$application_risk$,$application_characteristic=application_characteristic$,$application_container$,$tunneled_application$,$application_saas$,$application_sanctioned_state$,$offloaded$",
halt: true
},
{
//dont match on application_characteristic for cases where is it blank.
id: "traffic-2",
attributes: { type: "TRAFFIC", format: "traffic-2" },
format: "\\<[^\\>]+\\>$timestamp=tsPatternPA$ $hostname$ $log_version$,$receive_time$,$firewall_serial_number$,TRAFFIC,$sub_type$,\\d+,$generate_time$,$source_address$,$destination_address$,$source_nat_address$,$destination_nat_address$,$rule_name$,$source_user$,$destination_user$,$application$,$virtual_system$,$source_zone$,$destination_zone$,$inbound_interface$,$outbound_interface$,$log_action$,$time_stamp$,$session_id$,$repeat_count$,$source_port$,$destination_port$,$source_nat_port$,$destination_nat_port$,$flag$,$protocol$,$action$,$bytes$,$bytes_sent$,$bytes_received$,$packets$,$start_time$,$elapsed_time$,$category$,$test$,$sequence_number$,$action_flags$,$source_country$,$destination_country$,,$packets_sent$,$packets_received$,$session_end_reason$,\\d+,\\d+,\\d+,\\d+,$virtual_system_name$,$device_name$,$action_source$,$source_vm_uuid$,$destination_vm_uuid$,$tunnel_id_imsi$,$monitor_tag_imei$,$parent_session_id$,$parent_start_time$,$tunnel_type$,$sctp_association_id$,$sctp_chunks$,$sctp_chunks_sent$,$sctp_chunks_received$,$rule_uuid$,$http_2_connection$,$app_flap_count$,$policy_id$,$link_switches$,$sd-wan_cluster$,$sd-wan_device_type$,$sd-wan_cluster_type$,$sd-wan_site$,$dynamic_user_group_name$,$xff_address$,$source_device_category$,$source_device_profile$,$source_device_model$,$source_device_vendor$,$source_device_os_family$,$source_device_os_version$,$source_hostname$,$source_mac_address$,$destination_device_category$,$destination_device_profile$,$destination_device_model$,$destination_device_vendor$,$destination_device_os_family$,$destination_device_os_version$,$destination_hostname$,$destination_mac_address$,$container_id$,$pod_namespace$,$pod_name$,$source_external_dynamic_list$,$destination_external_dynamic_list$,$host_id$,$serial_number$,$source_dynamic_address_group$,$destination_dynamic_address_group$,$session_owner$,$high_resolution_timestamp$,$a_slice_service_type$,$a_slice_differentiator$,$application_subcategory$,$application_category$,$application_technology$,$application_risk$,,$application_container$,$tunneled_application$,$application_saas$,$application_sanctioned_state$,$offloaded$",
halt: true
},
{
id: "system",
attributes: { type: "SYSTEM", format: "system" },
format: "\\<[^\\>]+\\>$timestamp=tsPatternPA$ $hostname$ $log_version$,$receive_time$,$serial_number$,SYSTEM,$content_threat_type$,.*,$generated_time$,$virtual_system$,$event_id$,$object$,.*,.*,$module$,$severity$,$description=description$,$sequence_number$,$action_flags$,$device_group_hierarchy_level_1$,$device_group_hierarchy_level_2$,$device_group_hierarchy_level_3$,$device_group_hierarchy_level_4$,$virtual_system_name$,$device_name$,.*,.*,$high_resolution_timestamp$",
halt: true
},
{
//matches THREAT logs with comma surround lists in application_characteristic and url_category_list.
// Matches THREAT logs with commas surrounding user_agent
//PAN OS 10.2 will add $cloud_report_id to the end of this log format. If this no longer matches threat logs, check for the extra field at the end.
id: "threat-0",
attributes: { type: "THREAT", format: "threat-0" },
format: "\\<[^\\>]+\\>$timestamp=tsPatternPA$ $hostname$ $log_version$,$receive_time$,$serial_number$,THREAT,$threat_content_type$,\\d+,$Generated_time$,$source_address$,$destination_address$,$nat_source_address$,$nat_destination_address$,$rule_name$,$source_user$,$destination_user$,$application$,$virtual_system$,$source_zone$,$destination_zone$,$inbound_interface$,$outbound_interface$,$log_action$,$misc=misc$,$Session_id$,$repeat_count$,$source_port$,$destination_port$,$nat_source_port$,$nat_destination_port$,$flags$,$ip_protocol$,$action$,$url_filename$,$threat_id$,$category$,$severity$,$direction$,$sequence_number$,$action_flags$,$source_location$,$destination_location$,$misc=misc$,$Content_type$,$pcap_id$,$file_digest$,$cloud$,$url_index$,\"$user_agent$\",$file_type$,$x-forwarded-for$,$referer$,$sender$,$subject$,$recipient$,$report_id$,$device_group_hierarchy_level_1$,$device_group_hierarchy_level_2$,$device_group_hierarchy_level_3$,$device_group_hierarchy_level_4$,$virtual_system_name$,$device_name$,$misc=misc$,$Source_vm_uuid$,$destination_vm_uuid$,$http_method$,$tunnel_id_imsi$,$monitor_tag_imei$,$parent_session_id$,$parent_start_time$,$tunnel_type$,$threat_category$,$content_version$,$misc=misc$,$Sctp_association_id$,$payload_protocol_id$,$http_headers$,\"$url_category_list$\",$rule_uuid$,$http_2_connection$,$dynamic_user_group_name$,$xff_address$,$source_device_category$,$source_device_profile$,$source_device_model$,$source_device_vendor$,$source_device_os_family$,$source_device_os_version$,$source_hostname$,$source_mac_address$,$destination_device_category$,$destination_device_profile$,$destination_device_model$,$destination_device_vendor$,$destination_device_os_family$,$destination_device_os_version$,$destination_hostname$,$destination_mac_address$,$container_id$,$pod_namespace$,$pod_name$,$source_external_dynamic_list$,$destination_external_dynamic_list$,$host_id$,$serial_number$,$domain_edl$,$source_dynamic_address_group$,$destination_dynamic_address_group$,$partial_hash$,$high_resolution_timestamp$,$reason$,$justification$,$a_slice_service_type$,$application_subcategory$,$application_category$,$application_technology$,$application_risk$,\"$application_characteristic$\",$application_container$,$tunneled_application$,$application_saas$,$application_sanctioned_state$",
halt: true
},
{
//matches THREAT logs with comma surround lists in application_characteristic and url_category_list.
//PAN OS 10.2 will add $cloud_report_id to the end of this log format. If this no longer matches threat logs, check for the extra field at the end.
id: "threat-1",
attributes: { type: "THREAT", format: "threat-1" },
format: "\\<[^\\>]+\\>$timestamp=tsPatternPA$ $hostname$ $log_version$,$receive_time$,$serial_number$,THREAT,$threat_content_type$,\\d+,$Generated_time$,$source_address$,$destination_address$,$nat_source_address$,$nat_destination_address$,$rule_name$,$source_user$,$destination_user$,$application$,$virtual_system$,$source_zone$,$destination_zone$,$inbound_interface$,$outbound_interface$,$log_action$,$misc=misc$,$Session_id$,$repeat_count$,$source_port$,$destination_port$,$nat_source_port$,$nat_destination_port$,$flags$,$ip_protocol$,$action$,$url_filename$,$threat_id$,$category$,$severity$,$direction$,$sequence_number$,$action_flags$,$source_location$,$destination_location$,$misc=misc$,$Content_type$,$pcap_id$,$file_digest$,$cloud$,$url_index$,$user_agent$,$file_type$,$x-forwarded-for$,$referer$,$sender$,$subject$,$recipient$,$report_id$,$device_group_hierarchy_level_1$,$device_group_hierarchy_level_2$,$device_group_hierarchy_level_3$,$device_group_hierarchy_level_4$,$virtual_system_name$,$device_name$,$misc=misc$,$Source_vm_uuid$,$destination_vm_uuid$,$http_method$,$tunnel_id_imsi$,$monitor_tag_imei$,$parent_session_id$,$parent_start_time$,$tunnel_type$,$threat_category$,$content_version$,$misc=misc$,$Sctp_association_id$,$payload_protocol_id$,$http_headers$,\"$url_category_list$\",$rule_uuid$,$http_2_connection$,$dynamic_user_group_name$,$xff_address$,$source_device_category$,$source_device_profile$,$source_device_model$,$source_device_vendor$,$source_device_os_family$,$source_device_os_version$,$source_hostname$,$source_mac_address$,$destination_device_category$,$destination_device_profile$,$destination_device_model$,$destination_device_vendor$,$destination_device_os_family$,$destination_device_os_version$,$destination_hostname$,$destination_mac_address$,$container_id$,$pod_namespace$,$pod_name$,$source_external_dynamic_list$,$destination_external_dynamic_list$,$host_id$,$serial_number$,$domain_edl$,$source_dynamic_address_group$,$destination_dynamic_address_group$,$partial_hash$,$high_resolution_timestamp$,$reason$,$justification$,$a_slice_service_type$,$application_subcategory$,$application_category$,$application_technology$,$application_risk$,\"$application_characteristic$\",$application_container$,$tunneled_application$,$application_saas$,$application_sanctioned_state$",
halt: true
},
{
//matches THREAT logs without comma surround list in url_category_list.
//PAN OS 10.2 will add $cloud_report_id to the end of this log format. If this no longer matches threat logs, check for the extra field at the end.
id: "threat-2",
attributes: { type: "THREAT", format: "threat-2" },
format: "\\<[^\\>]+\\>$timestamp=tsPatternPA$ $hostname$ $log_version$,$receive_time$,$serial_number$,THREAT,$threat_content_type$,\\d+,$Generated_time$,$source_address$,$destination_address$,$nat_source_address$,$nat_destination_address$,$rule_name$,$source_user$,$destination_user$,$application$,$virtual_system$,$source_zone$,$destination_zone$,$inbound_interface$,$outbound_interface$,$log_action$,$misc=misc$,$Session_id$,$repeat_count$,$source_port$,$destination_port$,$nat_source_port$,$nat_destination_port$,$flags$,$ip_protocol$,$action$,$url_filename$,$threat_id$,$category$,$severity$,$direction$,$sequence_number$,$action_flags$,$source_location$,$destination_location$,$misc=misc$,$Content_type$,$pcap_id$,$file_digest$,$cloud$,$url_index$,$user_agent$,$file_type$,$x-forwarded-for$,$referer$,$sender$,$subject$,$recipient$,$report_id$,$device_group_hierarchy_level_1$,$device_group_hierarchy_level_2$,$device_group_hierarchy_level_3$,$device_group_hierarchy_level_4$,$virtual_system_name$,$device_name$,$misc=misc$,$Source_vm_uuid$,$destination_vm_uuid$,$http_method$,$tunnel_id_imsi$,$monitor_tag_imei$,$parent_session_id$,$parent_start_time$,$tunnel_type$,$threat_category$,$content_version$,$misc=misc$,$Sctp_association_id$,$payload_protocol_id$,$http_headers$,$url_category_list$,$rule_uuid$,$http_2_connection$,$dynamic_user_group_name$,$xff_address$,$source_device_category$,$source_device_profile$,$source_device_model$,$source_device_vendor$,$source_device_os_family$,$source_device_os_version$,$source_hostname$,$source_mac_address$,$destination_device_category$,$destination_device_profile$,$destination_device_model$,$destination_device_vendor$,$destination_device_os_family$,$destination_device_os_version$,$destination_hostname$,$destination_mac_address$,$container_id$,$pod_namespace$,$pod_name$,$source_external_dynamic_list$,$destination_external_dynamic_list$,$host_id$,$serial_number$,$domain_edl$,$source_dynamic_address_group$,$destination_dynamic_address_group$,$partial_hash$,$high_resolution_timestamp$,$reason$,$justification$,$a_slice_service_type$,$application_subcategory$,$application_category$,$application_technology$,$application_risk$,\"$application_characteristic$\",$application_container$,$tunneled_application$,$application_saas$,$application_sanctioned_state$",
halt: true
},
{
//matches THREAT logs without comma surround list in url_category_list or application_characteristic.
//PAN OS 10.2 will add $cloud_report_id to the end of this log format. If this no longer matches threat logs, check for the extra field at the end.
id: "threat-3",
attributes: { type: "THREAT", format: "threat-3" },
format: "\\<[^\\>]+\\>$timestamp=tsPatternPA$ $hostname$ $log_version$,$receive_time$,$serial_number$,THREAT,$threat_content_type$,\\d+,$Generated_time$,$source_address$,$destination_address$,$nat_source_address$,$nat_destination_address$,$rule_name$,$source_user$,$destination_user$,$application$,$virtual_system$,$source_zone$,$destination_zone$,$inbound_interface$,$outbound_interface$,$log_action$,$misc=misc$,$Session_id$,$repeat_count$,$source_port$,$destination_port$,$nat_source_port$,$nat_destination_port$,$flags$,$ip_protocol$,$action$,$url_filename$,$threat_id$,$category$,$severity$,$direction$,$sequence_number$,$action_flags$,$source_location$,$destination_location$,$misc=misc$,$Content_type$,$pcap_id$,$file_digest$,$cloud$,$url_index$,$user_agent$,$file_type$,$x-forwarded-for$,$referer$,$sender$,$subject$,$recipient$,$report_id$,$device_group_hierarchy_level_1$,$device_group_hierarchy_level_2$,$device_group_hierarchy_level_3$,$device_group_hierarchy_level_4$,$virtual_system_name$,$device_name$,$misc=misc$,$Source_vm_uuid$,$destination_vm_uuid$,$http_method$,$tunnel_id_imsi$,$monitor_tag_imei$,$parent_session_id$,$parent_start_time$,$tunnel_type$,$threat_category$,$content_version$,$misc=misc$,$Sctp_association_id$,$payload_protocol_id$,$http_headers$,$url_category_list$,$rule_uuid$,$http_2_connection$,$dynamic_user_group_name$,$xff_address$,$source_device_category$,$source_device_profile$,$source_device_model$,$source_device_vendor$,$source_device_os_family$,$source_device_os_version$,$source_hostname$,$source_mac_address$,$destination_device_category$,$destination_device_profile$,$destination_device_model$,$destination_device_vendor$,$destination_device_os_family$,$destination_device_os_version$,$destination_hostname$,$destination_mac_address$,$container_id$,$pod_namespace$,$pod_name$,$source_external_dynamic_list$,$destination_external_dynamic_list$,$host_id$,$serial_number$,$domain_edl$,$source_dynamic_address_group$,$destination_dynamic_address_group$,$partial_hash$,$high_resolution_timestamp$,$reason$,$justification$,$a_slice_service_type$,$application_subcategory$,$application_category$,$application_technology$,$application_risk$,$application_characteristic$,$application_container$,$tunneled_application$,$application_saas$,$application_sanctioned_state$",
halt: true
},
{
id: "userid",
attributes: { type: "USERID", format: "userid" },
format: "\\<[^\\>]+\\>$timestamp=tsPatternPA$ $hostname$ $log_version$,$receive_time$,$serial_number$,USERID,$threat_content_type$,$misc=misc$,$generated_time$,$virtual_system$,$source_ip$,$user$,$data_source_name$,$event_id$,$repeat_count$,$time_out_threshold$,$source_port$,$destination_port$,$data_source$,$data_source_type$,$sequence_number$,$action_flags$,$device_group_hierarchy_level_1$,$device_group_hierarchy_level_2$,$device_group_hierarchy_level_3$,$device_group_hierarchy_level_4$,$virtual_system_name$,$device_name$,$virtual_system_id$,$factor_type$,$factor_completion_time$,$factor_number$,$user_group_flags$,$user_by_source$,$misc=misc$,$high_resolution_timestamp$",
halt: true
},
{
//dont match on application_characteristic for cases where is it blank.
id: "basic",
attributes: { format: "basic" },
format: "\\<[^\\>]+\\>$timestamp=tsPatternPA$ $hostname$ $log_version$,$receive_time$,$firewall_serial_number$,$type$,$sub_type$,\\d+,$generate_time$,$source_address$,$destination_address$,$source_nat_address$,$destination_nat_address$,$rule_name$,$source_user$,$destination_user$,$application$,$virtual_system$,$source_zone$,$destination_zone$,$inbound_interface$,$outbound_interface$,$log_action$,$time_stamp$,$session_id$,$repeat_count$,$source_port$,$destination_port$,$source_nat_port$,$destination_nat_port$,$flag$,$protocol$,$action$,.*",
},
]
}
+222
View File
@@ -0,0 +1,222 @@
#!/usr/bin/env bash
# tools/sync-upstream.sh
# Pull the latest changes from upstream (mickbrowns1/SIEM-Toolkit) while
# preserving the fork's improvements, then verify the fork invariants
# still hold. Designed to be safe to run repeatedly.
#
# Usage:
# ./tools/sync-upstream.sh # rebase (clean linear history)
# ./tools/sync-upstream.sh --merge # merge-commit instead of rebase
# ./tools/sync-upstream.sh --no-rebuild # skip docker rebuild + verify
# ./tools/sync-upstream.sh --no-push # don't auto-push at the end
# ./tools/sync-upstream.sh --dry-run # show what would happen
#
# Exit codes:
# 0 fully up-to-date or sync succeeded and all invariants pass
# 1 pre-condition failed (dirty tree, wrong remote, etc.)
# 2 merge / rebase conflicts (resolve manually, then re-run with --resume)
# 3 one or more fork invariants regressed after sync
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
REPO_DIR="$(cd "$SCRIPT_DIR/.." && pwd)"
cd "$REPO_DIR"
# --- defaults -----------------------------------------------------------
MODE=rebase
DO_REBUILD=1
DO_PUSH=1
DRY_RUN=0
UPSTREAM_REMOTE="${UPSTREAM_REMOTE:-upstream}"
UPSTREAM_BRANCH="${UPSTREAM_BRANCH:-main}"
ORIGIN_REMOTE="${ORIGIN_REMOTE:-origin}"
BACKEND_URL="${BACKEND_URL:-http://localhost:8001}"
BACKEND_CONTAINER="${BACKEND_CONTAINER:-siem-toolkit-patched-backend-1}"
while [[ $# -gt 0 ]]; do
case "$1" in
--merge) MODE=merge ;;
--no-rebuild) DO_REBUILD=0 ;;
--no-push) DO_PUSH=0 ;;
--dry-run) DRY_RUN=1; DO_REBUILD=0; DO_PUSH=0 ;;
-h|--help)
sed -n '2,/^$/p' "$0" | sed 's/^# \{0,1\}//'
exit 0 ;;
*) echo "unknown arg: $1" >&2; exit 1 ;;
esac
shift
done
bold() { printf '\033[1m%s\033[0m\n' "$*"; }
red() { printf '\033[31m%s\033[0m\n' "$*"; }
green(){ printf '\033[32m%s\033[0m\n' "$*"; }
yellow(){ printf '\033[33m%s\033[0m\n' "$*"; }
# --- 1. pre-conditions --------------------------------------------------
bold "== 1. pre-conditions =="
if ! git remote get-url "$UPSTREAM_REMOTE" >/dev/null 2>&1; then
red "no '$UPSTREAM_REMOTE' remote configured. Add with:"
echo " git remote add upstream https://github.com/mickbrowns1/SIEM-Toolkit.git"
exit 1
fi
echo " upstream remote : $(git remote get-url "$UPSTREAM_REMOTE")"
echo " origin remote : $(git remote get-url "$ORIGIN_REMOTE")"
if [[ -n "$(git status --porcelain)" ]]; then
red "working tree is not clean. Commit or stash changes first:"
git status -s
exit 1
fi
green " working tree clean"
CUR_BRANCH=$(git rev-parse --abbrev-ref HEAD)
echo " current branch : $CUR_BRANCH"
# --- 2. snapshot --------------------------------------------------------
SAFETY_TAG="safety/$(date +%Y%m%d-%H%M%S)"
bold "== 2. safety tag =="
if [[ "$DRY_RUN" == 1 ]]; then
echo " [dry-run] would create tag $SAFETY_TAG"
else
git tag "$SAFETY_TAG"
echo " created $SAFETY_TAG"
fi
# --- 3. fetch upstream --------------------------------------------------
bold "== 3. fetch upstream =="
git fetch "$UPSTREAM_REMOTE" --quiet
echo " fetched ${UPSTREAM_REMOTE}/${UPSTREAM_BRANCH}"
HEAD_SHA=$(git rev-parse HEAD)
UP_SHA=$(git rev-parse "${UPSTREAM_REMOTE}/${UPSTREAM_BRANCH}")
MB=$(git merge-base HEAD "${UPSTREAM_REMOTE}/${UPSTREAM_BRANCH}")
NEW_COUNT=$(git rev-list --count "${MB}..${UPSTREAM_REMOTE}/${UPSTREAM_BRANCH}")
OUR_COUNT=$(git rev-list --count "${MB}..HEAD")
echo " HEAD : $HEAD_SHA"
echo " upstream/$UPSTREAM_BRANCH : $UP_SHA"
echo " merge-base : $MB"
echo " upstream commits : $NEW_COUNT new"
echo " our commits ahead : $OUR_COUNT"
if [[ "$NEW_COUNT" == 0 ]]; then
green "== already current with upstream =="
NEW_SYNC=0
else
NEW_SYNC=1
bold "-- new upstream commits --"
git log --oneline "${MB}..${UPSTREAM_REMOTE}/${UPSTREAM_BRANCH}"
fi
# --- 4. apply (rebase or merge) ----------------------------------------
if [[ "$NEW_SYNC" == 1 ]]; then
bold "== 4. applying upstream changes ($MODE) =="
if [[ "$DRY_RUN" == 1 ]]; then
echo " [dry-run] would $MODE $UPSTREAM_REMOTE/$UPSTREAM_BRANCH into $CUR_BRANCH"
else
if [[ "$MODE" == "rebase" ]]; then
if ! git rebase "${UPSTREAM_REMOTE}/${UPSTREAM_BRANCH}"; then
red "rebase has conflicts."
echo "Resolve, then run: git rebase --continue"
echo "Or abort with : git rebase --abort"
echo "Recover snapshot : git reset --hard $SAFETY_TAG"
exit 2
fi
else
if ! git merge --no-ff "${UPSTREAM_REMOTE}/${UPSTREAM_BRANCH}" \
-m "Sync upstream $(date +%Y-%m-%d)"; then
red "merge has conflicts."
echo "Resolve, then commit. Recover with: git reset --hard $SAFETY_TAG"
exit 2
fi
fi
green " ${MODE} succeeded"
fi
fi
# --- 5. rebuild + verify invariants ------------------------------------
if [[ "$DO_REBUILD" == 1 ]]; then
bold "== 5. rebuild backend + run invariants =="
docker compose up -d --force-recreate --build backend 2>&1 | tail -5
echo " waiting 15s for startup..."
sleep 15
FAILS=0
check() {
local label="$1" cmd="$2" expect="$3"
local got
got="$(eval "$cmd" 2>/dev/null || echo '<error>')"
if [[ "$got" == "$expect" ]]; then
green " PASS $label ($got)"
else
red " FAIL $label expected='$expect' got='$got'"
FAILS=$((FAILS + 1))
fi
}
# Invariant 1: Parser dropdown excludes ueba_* artefacts (fix 70f3f83)
check "parser dropdown excludes ueba_*" \
"curl -fsS $BACKEND_URL/api/quality/parsers | python3 -c 'import sys,json; d=json.load(sys.stdin); print(sum(1 for p in d[\"parsers\"] if p.lower().startswith(\"ueba\")))'" \
"0"
# Invariant 2: MITRE coverage is <= 100 (fix f821151)
check "mitre_pct <= 100" \
"curl -fsS $BACKEND_URL/api/coverage/health | python3 -c 'import sys,json; d=json.load(sys.stdin); print(d[\"mitre_pct\"] <= 100)'" \
"True"
# Invariant 3: ingest cache endpoints exist (fix 0a01a56)
check "/api/ingest/cache-stats exists" \
"curl -fsS -o /dev/null -w '%{http_code}' $BACKEND_URL/api/ingest/cache-stats" \
"200"
# Invariant 4: /sample-unlabelled is registered as a POST route (port from
# upstream sync). GET to it should return 405 Method Not Allowed (route
# exists, wrong method) rather than 404 (route missing).
# Note: -f is omitted because 405 is the expected non-2xx status here.
check "/api/quality/sample-unlabelled registered" \
"curl -sS -o /dev/null -w '%{http_code}' -X GET $BACKEND_URL/api/quality/sample-unlabelled" \
"405"
# Invariant 5: prewarmer scheduled (fix fec3568) — only if INGEST_PREWARM=1.
# Poll up to 30s because the task logs 'starting' a few seconds after the
# FastAPI startup phase finishes (postgres + lib autoload first).
if grep -q '^INGEST_PREWARM=1' .env 2>/dev/null; then
prewarm_ok=0
for _ in 1 2 3 4 5 6; do
if docker logs "$BACKEND_CONTAINER" 2>&1 | grep -q 'prewarmer:.*starting'; then
prewarm_ok=1; break
fi
sleep 5
done
if [[ "$prewarm_ok" == 1 ]]; then
green " PASS prewarmer started"
else
red " FAIL prewarmer did not log 'starting' within 30s (INGEST_PREWARM=1 but task missing)"
FAILS=$((FAILS + 1))
fi
else
yellow " SKIP prewarmer (INGEST_PREWARM not enabled in .env)"
fi
if [[ "$FAILS" -gt 0 ]]; then
red "== $FAILS invariant(s) regressed after sync =="
echo "Recover the pre-sync state with: git reset --hard $SAFETY_TAG"
exit 3
fi
green " all invariants pass"
fi
# --- 6. push -----------------------------------------------------------
if [[ "$DO_PUSH" == 1 && "$NEW_SYNC" == 1 ]]; then
bold "== 6. push to $ORIGIN_REMOTE/$CUR_BRANCH =="
git push "$ORIGIN_REMOTE" "$CUR_BRANCH" --force-with-lease
green " pushed"
fi
bold "== done =="
echo " branch : $CUR_BRANCH"
echo " HEAD : $(git rev-parse --short HEAD)"
echo " safety snapshot: $SAFETY_TAG (delete with: git tag -d $SAFETY_TAG)"