4 Commits

Author SHA1 Message Date
marc 99d63837b5 Add tools/sync-upstream.sh: safe upstream-sync workflow
Wraps the recurring 'fetch upstream, rebase, verify invariants, push'
workflow into a single command with safety nets:

- creates a tag snapshot before mutating the branch
- aborts on a dirty tree
- rebases by default (--merge creates a merge commit instead)
- after sync, rebuilds the backend container and verifies 5 fork-only
  invariants are still met (parser dropdown filtered, mitre_pct <= 100,
  cache endpoints present, /sample-unlabelled present, prewarmer task
  scheduled when opted in)
- exits non-zero with the recovery command if invariants regress
- optional --dry-run / --no-rebuild / --no-push for ad-hoc inspection
2026-05-22 20:50:28 +02:00
marc fec356829c Ingest Dashboard: optional background cache pre-warmer
Adds an asyncio background task that re-runs the heavy Ingest Dashboard
queries every ~4 min (just under the 5 min TTL) so the in-process cache
is always populated. First user hit on any dashboard widget then returns
from cache (single-digit ms) instead of waiting 30-60s for SDL.

Components:
  - backend/services/prewarmer.py: standalone module, opt-in via
    INGEST_PREWARM=1; configurable windows via INGEST_PREWARM_HOURS /
    INGEST_PREWARM_DAYS / INGEST_PREWARM_DAILY_VOLUME_DAYS and interval
    via INGEST_PREWARM_INTERVAL_SECONDS. Logs through the uvicorn logger
    so cycles are visible in 'docker logs'.
  - backend/main.py: spawn the task on FastAPI startup.
  - docker-compose.yml: forward INGEST_PREWARM* env vars to the
    backend service (default off).

Measured on Purple AI tenant. Default off (INGEST_PREWARM=0) so non-opt-in
users see no behaviour change.
2026-05-22 20:41:36 +02:00
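The pre-warm cycle described in this commit can be sketched as a plain asyncio loop. This is a minimal illustration, not the contents of backend/services/prewarmer.py: `refresh_cache` is a hypothetical stand-in for the dashboard queries, and the 0.01 s interval is chosen only so the demo finishes quickly (the commit's default is 240 s).

```python
import asyncio

refreshes = 0  # counts completed refresh cycles in this demo


async def refresh_cache() -> None:
    """Hypothetical stand-in for re-running the heavy dashboard queries."""
    global refreshes
    refreshes += 1


async def prewarm_loop(interval: float) -> None:
    """Refresh forever; a failed cycle is reported and the loop carries on."""
    while True:
        try:
            await refresh_cache()
        except asyncio.CancelledError:
            raise  # let cancellation stop the loop
        except Exception as exc:
            print(f"prewarm cycle failed: {exc}")
        await asyncio.sleep(interval)


async def main() -> None:
    # Keep a reference to the task so it cannot be garbage-collected mid-run.
    task = asyncio.create_task(prewarm_loop(interval=0.01))
    await asyncio.sleep(0.05)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass


asyncio.run(main())
print(f"refresh cycles run: {refreshes}")
```

With the real settings the interval (240 s) is shorter than the TTL (300 s), so an entry written by one cycle should still be live when the next cycle starts, provided that cycle finishes within roughly the remaining 60 s.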
marc 0a01a56218 Ingest Dashboard: 5min TTL cache + days->hours normalisation
Dashboard reloads on multi-day windows could take 30-60s and sometimes
returned HTTP 502 ('internal Scalyr error') when the SDL window was
expressed in days. Two-part fix:

1. In-process async TTL cache (services/async_cache.py)
   - 5 min TTL on top-sources, by-event-type, daily-volume.
   - Single-flight lock per cache key (no thundering herd).
   - Optional ?nocache=1 query param to force a refresh.
   - New endpoints: GET /api/ingest/cache-stats, DELETE /api/ingest/cache.

2. Normalise days -> hours upstream of the PowerQuery
   - SDL is unstable on day-scale windows for large group-by counts on
     this tenant but stable on the equivalent hour-scale window.
   - top-sources?days=1 used to 502; now works.

Measured on Purple AI tenant:
  top-sources?days=7  cold 55.7s -> warm 13ms (~4300x)
  …                   -> 4ms (cold) / 1.4ms (warm)
2026-05-22 20:10:03 +02:00
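The days -> hours normalisation in part 2 amounts to a small pure function. A minimal sketch follows, assuming the 7-day default that the /top-sources handler in this change applies when neither parameter is given; `normalise_window` is a hypothetical name, not a function in the codebase.

```python
from typing import Optional, Tuple


def normalise_window(days: Optional[int] = None,
                     hours: Optional[int] = None) -> Tuple[int, str]:
    """Return (hours, period_label); an explicit hours value takes priority."""
    if hours is not None:
        return hours, f"{hours}h"
    if days is None:
        days = 7  # default window when the caller passes nothing
    return days * 24, f"{days}d"


print(normalise_window(days=1))    # (24, '1d')
print(normalise_window(hours=24))  # (24, '24h')
print(normalise_window())          # (168, '7d')
```

Both `days=1` and `hours=24` resolve to the same hour count, so with a cache keyed on hours only they would share a single cache entry while keeping their own period labels.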
marc f82115143c Health Score: cap MITRE Coverage at 100% by canonicalising tactics
STAR rules sometimes label tactics with non-canonical names (e.g. 'Stealth',
'Defense Impairment') which were counted as distinct tactics on top of the
14 canonical ATT&CK Enterprise ones, producing percentages > 100%
(observed 15/14 = 107.1% on Purple AI tenant).

Fix in get_health_score():
  - Restrict covered_tactics to the 14 canonical ATT&CK Enterprise tactics.
  - Map known STAR aliases ('Stealth', 'Defense Impairment') -> 'Defense Evasion'.
  - Derive TOTAL_TACTICS from the canonical set (single source of truth).

Result: tactics_covered = 14, mitre_pct = 100.0 (was 15 / 107.1).
2026-05-22 19:41:48 +02:00
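The over-count is easy to reproduce in isolation. The sketch below is a standalone illustration, not the code from get_health_score(): the `coverage` helper and the sample input are hypothetical, while the canonical set and the alias map mirror the ones introduced by this commit.

```python
CANONICAL_TACTICS = frozenset({
    "Reconnaissance", "Resource Development", "Initial Access", "Execution",
    "Persistence", "Privilege Escalation", "Defense Evasion", "Credential Access",
    "Discovery", "Lateral Movement", "Collection", "Command and Control",
    "Exfiltration", "Impact",
})
TACTIC_ALIASES = {
    "Stealth": "Defense Evasion",
    "Defense Impairment": "Defense Evasion",
}


def coverage(raw_tactics):
    """Hypothetical helper: return (tactics_covered, mitre_pct)."""
    covered = set()
    for t in raw_tactics:
        if not t or t == "Uncategorized":
            continue
        t = TACTIC_ALIASES.get(t, t)   # fold known aliases into canonical names
        if t in CANONICAL_TACTICS:     # ignore anything still non-canonical
            covered.add(t)
    return len(covered), round(len(covered) / len(CANONICAL_TACTICS) * 100, 1)


# Sample input: all 14 canonical tactics plus one aliased label.
raw = sorted(CANONICAL_TACTICS) + ["Stealth", "Uncategorized", ""]

# Old behaviour: every distinct label counts, divided by a hard-coded 14.
naive = {t for t in raw if t and t != "Uncategorized"}
print(len(naive), round(len(naive) / 14 * 100, 1))  # 15 107.1

# New behaviour: aliases collapse, the total is derived from the set.
print(coverage(raw))  # (14, 100.0)
```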
7 changed files with 504 additions and 20 deletions
+8
@@ -45,6 +45,14 @@ with engine.connect() as _conn:
app = FastAPI(title="SIEM Toolkit", version="1.0.0")
@app.on_event("startup")
async def start_ingest_prewarmer():
"""Start optional background pre-warmer for the Ingest Dashboard cache.
Opt-in via INGEST_PREWARM=1. See backend/services/prewarmer.py."""
from services import prewarmer
app.state.ingest_prewarmer_task = prewarmer.start_if_enabled()  # keep a reference so the task is not garbage-collected
@app.on_event("startup")
async def auto_load_detections():
"""
+19 -2
@@ -1098,7 +1098,21 @@ def _compute_health(db) -> dict:
parser_pct = round((covered_sources / total_sources * 100) if total_sources else 0.0, 1)
# --- MITRE coverage ---
TOTAL_TACTICS = 14 # standard ATT&CK Enterprise tactic count
# Standard ATT&CK Enterprise tactics (14).
CANONICAL_TACTICS = frozenset({
"Reconnaissance", "Resource Development", "Initial Access", "Execution",
"Persistence", "Privilege Escalation", "Defense Evasion", "Credential Access",
"Discovery", "Lateral Movement", "Collection", "Command and Control",
"Exfiltration", "Impact",
})
# SentinelOne STAR rules sometimes label tactics with non-canonical names.
# Map them to canonical ATT&CK so we don't over-count and exceed 100%.
TACTIC_ALIASES = {
"Stealth": "Defense Evasion",
"Defense Impairment": "Defense Evasion",
}
TOTAL_TACTICS = len(CANONICAL_TACTICS)
rules = db.query(ParsedRule).filter_by(rule_type="library").all()
total_rules = len(rules)
covered_tactics: set = set()
@@ -1114,7 +1128,10 @@ def _compute_health(db) -> dict:
if tactics or techniques:
rules_with_mitre += 1
for t in tactics:
if t and t != "Uncategorized":
if not t or t == "Uncategorized":
continue
t = TACTIC_ALIASES.get(t, t)
if t in CANONICAL_TACTICS:
covered_tactics.add(t)
for tech in techniques:
k = tech.get("id") or tech.get("name")
+69 -18
@@ -2,9 +2,15 @@ from datetime import datetime, timedelta
from fastapi import APIRouter, Query, HTTPException
from pydantic import BaseModel
from services import s1_client
from services.async_cache import async_ttl_cache, cache_stats, cache_clear
router = APIRouter()
# Dashboard endpoints can be expensive on busy tenants. Cache results in-process
# for a short TTL so reloads and parallel widgets are instant. Pass ?nocache=1
# to bypass for a forced refresh.
_DASHBOARD_TTL_SECONDS = 300
def _date_range(days: int) -> tuple[str, str]:
now = datetime.utcnow()
@@ -22,41 +28,65 @@ def _date_range_hours(hours: int) -> tuple[str, str]:
)
@async_ttl_cache(ttl_seconds=_DASHBOARD_TTL_SECONDS)
async def _top_sources_cached(hours: int) -> dict:
"""Cache key: hours only. days is normalised to hours upstream."""
from_dt, to_dt = _date_range_hours(hours)
query = "| group events=count() by dataSource.name | sort -events | limit 25"
result = await s1_client.run_powerquery(query, from_dt, to_dt)
return {"data": result.get("events", [])}
@router.get("/top-sources")
async def get_top_sources(
days: int = Query(None, ge=1, le=90),
hours: int = Query(None, ge=1, le=24),
hours: int = Query(None, ge=1, le=720),
nocache: bool = Query(False, description="Bypass dashboard cache"),
):
"""Top log sources by event count over the given period."""
if hours is not None:
from_dt, to_dt = _date_range_hours(hours)
period_label = f"{hours}h"
"""Top log sources by event count.
Note: SDL returns 'internal Scalyr error' when this query uses day-scale
timestamps on busy tenants, but the same window expressed in hours runs
fine. We normalise days -> hours internally for stability.
"""
if hours is None and days is None:
days = 7
if hours is None:
hours = days * 24
period_label = f"{days}d"
else:
from_dt, to_dt = _date_range(days or 7)
period_label = f"{days or 7}d"
query = "| group events=count() by dataSource.name | sort -events | limit 25"
period_label = f"{hours}h"
try:
result = await s1_client.run_powerquery(query, from_dt, to_dt)
cached = await _top_sources_cached(hours, nocache=nocache)
except Exception as e:
raise HTTPException(502, f"PowerQuery error: {e}")
return {"period": period_label, "data": result.get("events", [])}
return {"period": period_label, "data": cached["data"]}
@async_ttl_cache(ttl_seconds=_DASHBOARD_TTL_SECONDS)
async def _by_event_type_cached(days: int) -> dict:
# Same days->hours normalisation as top-sources for tenant stability.
from_dt, to_dt = _date_range_hours(days * 24)
query = "| group events=count() by dataSource.name, event.type | sort -events | limit 100"
result = await s1_client.run_powerquery(query, from_dt, to_dt)
return {"data": result.get("events", [])}
@router.get("/by-event-type")
async def get_by_event_type(days: int = Query(7, ge=1, le=90)):
async def get_by_event_type(
days: int = Query(7, ge=1, le=90),
nocache: bool = Query(False),
):
"""Event counts grouped by source and event type."""
from_dt, to_dt = _date_range(days)
query = "| group events=count() by dataSource.name, event.type | sort -events | limit 100"
try:
result = await s1_client.run_powerquery(query, from_dt, to_dt)
cached = await _by_event_type_cached(days, nocache=nocache)
except Exception as e:
raise HTTPException(502, f"PowerQuery error: {e}")
return {"period_days": days, "data": result.get("events", [])}
return {"period_days": days, "data": cached["data"]}
@router.get("/daily-volume")
async def get_daily_volume(days: int = Query(5, ge=1, le=7)):
"""Total event count per day — queries run in parallel."""
@async_ttl_cache(ttl_seconds=_DASHBOARD_TTL_SECONDS)
async def _daily_volume_cached(days: int) -> list:
import asyncio
now = datetime.utcnow()
@@ -78,6 +108,27 @@ async def get_daily_volume(days: int = Query(5, ge=1, le=7)):
return list(reversed(results))
@router.get("/daily-volume")
async def get_daily_volume(
days: int = Query(5, ge=1, le=7),
nocache: bool = Query(False),
):
"""Total event count per day — queries run in parallel."""
return await _daily_volume_cached(days, nocache=nocache)
@router.get("/cache-stats")
def ingest_cache_stats():
"""Inspect dashboard cache (entry count + TTL remaining per key)."""
return cache_stats()
@router.delete("/cache")
def ingest_cache_clear():
"""Forcefully wipe the dashboard cache (next call refetches from SDL)."""
return {"cleared": cache_clear()}
class FilterRule(BaseModel):
source: str = ""
event_type: str = ""
+84
@@ -0,0 +1,84 @@
"""Tiny in-process async TTL cache for backend endpoints.
Two-fold benefit:
* Identical concurrent calls share one upstream PowerQuery (single-flight).
* Repeat reads within TTL return instantly (no SDL round-trip).
Designed for read-only dashboard endpoints. Keep it stdlib-only so it adds
no dependency. Caches live until the process restarts.
Usage:
@async_ttl_cache(ttl_seconds=300)
async def get_top_sources(...): ...
"""
from __future__ import annotations
import asyncio
import functools
import time
from typing import Any, Awaitable, Callable, Tuple
# Maps cache-key -> (expires_at, value)
_STORE: dict[Tuple[str, Tuple[Any, ...], Tuple[Tuple[str, Any], ...]], Tuple[float, Any]] = {}
# Maps cache-key -> asyncio.Lock for single-flight
_LOCKS: dict[Tuple[str, Tuple[Any, ...], Tuple[Tuple[str, Any], ...]], asyncio.Lock] = {}
def _make_key(name: str, args: tuple, kwargs: dict) -> Tuple[str, Tuple[Any, ...], Tuple[Tuple[str, Any], ...]]:
# Skip the special "nocache" kwarg so it doesn't fragment the cache.
kw = tuple(sorted((k, v) for k, v in kwargs.items() if k != "nocache"))
return (name, args, kw)
def async_ttl_cache(ttl_seconds: int = 300) -> Callable:
"""Decorator: cache an async function's result for ttl_seconds.
The wrapped function may accept an optional `nocache=True` kwarg to
bypass + refresh the cache for that call.
"""
def decorator(fn: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
@functools.wraps(fn)
async def wrapper(*args, **kwargs):
nocache = bool(kwargs.pop("nocache", False))
key = _make_key(fn.__qualname__, args, kwargs)
if not nocache:
hit = _STORE.get(key)
if hit and hit[0] > time.monotonic():
return hit[1]
lock = _LOCKS.setdefault(key, asyncio.Lock())
async with lock:
# Re-check after acquiring lock — another caller may have populated.
if not nocache:
hit = _STORE.get(key)
if hit and hit[0] > time.monotonic():
return hit[1]
value = await fn(*args, **kwargs)
_STORE[key] = (time.monotonic() + ttl_seconds, value)
return value
return wrapper
return decorator
def cache_stats() -> dict:
"""Debug helper: return current cache entries (no values)."""
now = time.monotonic()
return {
"entries": len(_STORE),
"live": [
{"key": str(k), "ttl_remaining_s": round(v[0] - now, 1)}
for k, v in _STORE.items()
if v[0] > now
],
}
def cache_clear() -> int:
"""Wipe the cache; returns the number of entries removed."""
n = len(_STORE)
_STORE.clear()
return n
+97
@@ -0,0 +1,97 @@
"""Background pre-warmer for the Ingest Dashboard cache.
Opt-in via env: INGEST_PREWARM=1
Tunable via env: INGEST_PREWARM_INTERVAL_SECONDS (default 240, just under TTL)
INGEST_PREWARM_HOURS (default "1,24,168")
INGEST_PREWARM_DAYS (default "7")
INGEST_PREWARM_DAILY_VOLUME_DAYS (default "5")
The pre-warmer re-runs the heavy Ingest Dashboard queries every ~4 min so the
in-process TTL cache is always populated. First user hit then returns from
cache (milliseconds) instead of waiting 30-60s for SDL.
"""
from __future__ import annotations
import asyncio
import logging
import os
import time
# Use the uvicorn logger so messages show up in `docker logs` alongside requests.
log = logging.getLogger("uvicorn.error")
_PREFIX = "prewarmer:"
def _flag_enabled() -> bool:
return os.environ.get("INGEST_PREWARM", "").lower() in ("1", "true", "yes", "on")
def _int_list(env: str, default: str) -> list[int]:
raw = os.environ.get(env, default)
out = []
for tok in raw.split(","):
tok = tok.strip()
if tok and tok.isdigit():
out.append(int(tok))
return out
async def _warm_once() -> dict:
"""Run all configured warm-up queries once. Returns timing summary."""
# Local import to avoid circular dependency with FastAPI router module.
from routers.ingest import (
_top_sources_cached,
_by_event_type_cached,
_daily_volume_cached,
)
hours_list = _int_list("INGEST_PREWARM_HOURS", "1,24,168")
days_list = _int_list("INGEST_PREWARM_DAYS", "7")
dv_days = _int_list("INGEST_PREWARM_DAILY_VOLUME_DAYS", "5") or [5]
tasks: list[tuple[str, asyncio.Task]] = []
for h in hours_list:
tasks.append((f"top-sources hours={h}",
asyncio.create_task(_top_sources_cached(h, nocache=True))))
for d in days_list:
tasks.append((f"by-event-type days={d}",
asyncio.create_task(_by_event_type_cached(d, nocache=True))))
for d in dv_days:
tasks.append((f"daily-volume days={d}",
asyncio.create_task(_daily_volume_cached(d, nocache=True))))
summary: dict[str, str] = {}
for label, task in tasks:
t0 = time.monotonic()
try:
await task
summary[label] = f"OK in {time.monotonic() - t0:.1f}s"
except Exception as e:
summary[label] = f"ERR ({e.__class__.__name__}: {str(e)[:120]})"
return summary
async def _loop():
interval = int(os.environ.get("INGEST_PREWARM_INTERVAL_SECONDS", "240"))
log.info("%s starting (interval=%ds)", _PREFIX, interval)
# Tiny initial delay so we don't compete with startup work.
await asyncio.sleep(5)
while True:
try:
summary = await _warm_once()
for label, status in summary.items():
log.info("%s %s -> %s", _PREFIX, label, status)
except asyncio.CancelledError:
raise
except Exception as e:
log.warning("%s cycle failed: %s", _PREFIX, e)
await asyncio.sleep(interval)
def start_if_enabled() -> asyncio.Task | None:
"""Spawn the pre-warm background task if INGEST_PREWARM is enabled.
Returns the task handle, or None if disabled."""
if not _flag_enabled():
log.info("%s disabled (set INGEST_PREWARM=1 to enable)", _PREFIX)
return None
log.info("%s scheduling background task", _PREFIX)
return asyncio.create_task(_loop(), name="ingest-prewarmer")
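The list-valued settings in this file (INGEST_PREWARM_HOURS and friends) are parsed leniently, which is worth seeing on a few inputs. The sketch below copies the parsing logic of `_int_list` into a standalone function; `DEMO_HOURS` is a hypothetical variable name used only for this demonstration.

```python
import os


def int_list(env: str, default: str) -> list:
    """Keep the comma-separated tokens that consist only of digits."""
    raw = os.environ.get(env, default)
    out = []
    for tok in raw.split(","):
        tok = tok.strip()
        if tok and tok.isdigit():
            out.append(int(tok))
    return out


# Whitespace is tolerated; negative or non-numeric tokens are dropped silently.
os.environ["DEMO_HOURS"] = "1, 24, -5, abc, 168"
print(int_list("DEMO_HOURS", "7"))  # [1, 24, 168]

# A variable that is set but empty yields an empty list, not the default.
os.environ["DEMO_HOURS"] = ""
print(int_list("DEMO_HOURS", "7"))  # []

# Only an unset variable falls back to the default string.
del os.environ["DEMO_HOURS"]
print(int_list("DEMO_HOURS", "7"))  # [7]
```

One consequence, if the real helper behaves like this copy: setting a window list to an empty string disables warming for that endpoint, and a typo in a token is skipped without a log line.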
+5
@@ -21,6 +21,11 @@ services:
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- DATABASE_URL=postgresql://siem:siem@db:5432/siem
- DETECTIONS_FILE=/app/data/detections.json
- INGEST_PREWARM=${INGEST_PREWARM:-0}
- INGEST_PREWARM_HOURS=${INGEST_PREWARM_HOURS:-1,24,168}
- INGEST_PREWARM_DAYS=${INGEST_PREWARM_DAYS:-7}
- INGEST_PREWARM_DAILY_VOLUME_DAYS=${INGEST_PREWARM_DAILY_VOLUME_DAYS:-5}
- INGEST_PREWARM_INTERVAL_SECONDS=${INGEST_PREWARM_INTERVAL_SECONDS:-240}
depends_on:
db:
condition: service_healthy
+222
@@ -0,0 +1,222 @@
#!/usr/bin/env bash
# tools/sync-upstream.sh
# Pull the latest changes from upstream (mickbrowns1/SIEM-Toolkit) while
# preserving the fork's improvements, then verify the fork invariants
# still hold. Designed to be safe to run repeatedly.
#
# Usage:
# ./tools/sync-upstream.sh # rebase (clean linear history)
# ./tools/sync-upstream.sh --merge # merge-commit instead of rebase
# ./tools/sync-upstream.sh --no-rebuild # skip docker rebuild + verify
# ./tools/sync-upstream.sh --no-push # don't auto-push at the end
# ./tools/sync-upstream.sh --dry-run # show what would happen
#
# Exit codes:
# 0 fully up-to-date or sync succeeded and all invariants pass
# 1 pre-condition failed (dirty tree, wrong remote, etc.)
# 2 merge / rebase conflicts (resolve manually, then re-run)
# 3 one or more fork invariants regressed after sync
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
REPO_DIR="$(cd "$SCRIPT_DIR/.." && pwd)"
cd "$REPO_DIR"
# --- defaults -----------------------------------------------------------
MODE=rebase
DO_REBUILD=1
DO_PUSH=1
DRY_RUN=0
UPSTREAM_REMOTE="${UPSTREAM_REMOTE:-upstream}"
UPSTREAM_BRANCH="${UPSTREAM_BRANCH:-main}"
ORIGIN_REMOTE="${ORIGIN_REMOTE:-origin}"
BACKEND_URL="${BACKEND_URL:-http://localhost:8001}"
BACKEND_CONTAINER="${BACKEND_CONTAINER:-siem-toolkit-patched-backend-1}"
while [[ $# -gt 0 ]]; do
case "$1" in
--merge) MODE=merge ;;
--no-rebuild) DO_REBUILD=0 ;;
--no-push) DO_PUSH=0 ;;
--dry-run) DRY_RUN=1; DO_REBUILD=0; DO_PUSH=0 ;;
-h|--help)
sed -n '2,/^$/p' "$0" | sed 's/^# \{0,1\}//'
exit 0 ;;
*) echo "unknown arg: $1" >&2; exit 1 ;;
esac
shift
done
bold() { printf '\033[1m%s\033[0m\n' "$*"; }
red() { printf '\033[31m%s\033[0m\n' "$*"; }
green(){ printf '\033[32m%s\033[0m\n' "$*"; }
yellow(){ printf '\033[33m%s\033[0m\n' "$*"; }
# --- 1. pre-conditions --------------------------------------------------
bold "== 1. pre-conditions =="
if ! git remote get-url "$UPSTREAM_REMOTE" >/dev/null 2>&1; then
red "no '$UPSTREAM_REMOTE' remote configured. Add with:"
echo " git remote add upstream https://github.com/mickbrowns1/SIEM-Toolkit.git"
exit 1
fi
echo " upstream remote : $(git remote get-url "$UPSTREAM_REMOTE")"
echo " origin remote : $(git remote get-url "$ORIGIN_REMOTE")"
if [[ -n "$(git status --porcelain)" ]]; then
red "working tree is not clean. Commit or stash changes first:"
git status -s
exit 1
fi
green " working tree clean"
CUR_BRANCH=$(git rev-parse --abbrev-ref HEAD)
echo " current branch : $CUR_BRANCH"
# --- 2. snapshot --------------------------------------------------------
SAFETY_TAG="safety/$(date +%Y%m%d-%H%M%S)"
bold "== 2. safety tag =="
if [[ "$DRY_RUN" == 1 ]]; then
echo " [dry-run] would create tag $SAFETY_TAG"
else
git tag "$SAFETY_TAG"
echo " created $SAFETY_TAG"
fi
# --- 3. fetch upstream --------------------------------------------------
bold "== 3. fetch upstream =="
git fetch "$UPSTREAM_REMOTE" --quiet
echo " fetched ${UPSTREAM_REMOTE}/${UPSTREAM_BRANCH}"
HEAD_SHA=$(git rev-parse HEAD)
UP_SHA=$(git rev-parse "${UPSTREAM_REMOTE}/${UPSTREAM_BRANCH}")
MB=$(git merge-base HEAD "${UPSTREAM_REMOTE}/${UPSTREAM_BRANCH}")
NEW_COUNT=$(git rev-list --count "${MB}..${UPSTREAM_REMOTE}/${UPSTREAM_BRANCH}")
OUR_COUNT=$(git rev-list --count "${MB}..HEAD")
echo " HEAD : $HEAD_SHA"
echo " upstream/$UPSTREAM_BRANCH : $UP_SHA"
echo " merge-base : $MB"
echo " upstream commits : $NEW_COUNT new"
echo " our commits ahead : $OUR_COUNT"
if [[ "$NEW_COUNT" == 0 ]]; then
green "== already current with upstream =="
NEW_SYNC=0
else
NEW_SYNC=1
bold "-- new upstream commits --"
git log --oneline "${MB}..${UPSTREAM_REMOTE}/${UPSTREAM_BRANCH}"
fi
# --- 4. apply (rebase or merge) ----------------------------------------
if [[ "$NEW_SYNC" == 1 ]]; then
bold "== 4. applying upstream changes ($MODE) =="
if [[ "$DRY_RUN" == 1 ]]; then
echo " [dry-run] would $MODE $UPSTREAM_REMOTE/$UPSTREAM_BRANCH into $CUR_BRANCH"
else
if [[ "$MODE" == "rebase" ]]; then
if ! git rebase "${UPSTREAM_REMOTE}/${UPSTREAM_BRANCH}"; then
red "rebase has conflicts."
echo "Resolve, then run: git rebase --continue"
echo "Or abort with : git rebase --abort"
echo "Recover snapshot : git reset --hard $SAFETY_TAG"
exit 2
fi
else
if ! git merge --no-ff "${UPSTREAM_REMOTE}/${UPSTREAM_BRANCH}" \
-m "Sync upstream $(date +%Y-%m-%d)"; then
red "merge has conflicts."
echo "Resolve, then commit. Recover with: git reset --hard $SAFETY_TAG"
exit 2
fi
fi
green " ${MODE} succeeded"
fi
fi
# --- 5. rebuild + verify invariants ------------------------------------
if [[ "$DO_REBUILD" == 1 ]]; then
bold "== 5. rebuild backend + run invariants =="
docker compose up -d --force-recreate --build backend 2>&1 | tail -5
echo " waiting 15s for startup..."
sleep 15
FAILS=0
check() {
local label="$1" cmd="$2" expect="$3"
local got
got="$(eval "$cmd" 2>/dev/null || echo '<error>')"
if [[ "$got" == "$expect" ]]; then
green " PASS $label ($got)"
else
red " FAIL $label expected='$expect' got='$got'"
FAILS=$((FAILS + 1))
fi
}
# Invariant 1: Parser dropdown excludes ueba_* artefacts (fix 70f3f83)
check "parser dropdown excludes ueba_*" \
"curl -fsS $BACKEND_URL/api/quality/parsers | python3 -c 'import sys,json; d=json.load(sys.stdin); print(sum(1 for p in d[\"parsers\"] if p.lower().startswith(\"ueba\")))'" \
"0"
# Invariant 2: MITRE coverage is <= 100 (fix f821151)
check "mitre_pct <= 100" \
"curl -fsS $BACKEND_URL/api/coverage/health | python3 -c 'import sys,json; d=json.load(sys.stdin); print(d[\"mitre_pct\"] <= 100)'" \
"True"
# Invariant 3: ingest cache endpoints exist (fix 0a01a56)
check "/api/ingest/cache-stats exists" \
"curl -fsS -o /dev/null -w '%{http_code}' $BACKEND_URL/api/ingest/cache-stats" \
"200"
# Invariant 4: /sample-unlabelled is registered as a POST route (port from
# upstream sync). GET to it should return 405 Method Not Allowed (route
# exists, wrong method) rather than 404 (route missing).
# Note: -f is omitted because 405 is the expected non-2xx status here.
check "/api/quality/sample-unlabelled registered" \
"curl -sS -o /dev/null -w '%{http_code}' -X GET $BACKEND_URL/api/quality/sample-unlabelled" \
"405"
# Invariant 5: prewarmer scheduled (fix fec3568) — only if INGEST_PREWARM=1.
# Poll up to 30s because the task logs 'starting' a few seconds after the
# FastAPI startup phase finishes (postgres + lib autoload first).
if grep -q '^INGEST_PREWARM=1' .env 2>/dev/null; then
prewarm_ok=0
for _ in 1 2 3 4 5 6; do
if docker logs "$BACKEND_CONTAINER" 2>&1 | grep -q 'prewarmer:.*starting'; then
prewarm_ok=1; break
fi
sleep 5
done
if [[ "$prewarm_ok" == 1 ]]; then
green " PASS prewarmer started"
else
red " FAIL prewarmer did not log 'starting' within 30s (INGEST_PREWARM=1 but task missing)"
FAILS=$((FAILS + 1))
fi
else
yellow " SKIP prewarmer (INGEST_PREWARM not enabled in .env)"
fi
if [[ "$FAILS" -gt 0 ]]; then
red "== $FAILS invariant(s) regressed after sync =="
echo "Recover the pre-sync state with: git reset --hard $SAFETY_TAG"
exit 3
fi
green " all invariants pass"
fi
# --- 6. push -----------------------------------------------------------
if [[ "$DO_PUSH" == 1 && "$NEW_SYNC" == 1 ]]; then
bold "== 6. push to $ORIGIN_REMOTE/$CUR_BRANCH =="
git push "$ORIGIN_REMOTE" "$CUR_BRANCH" --force-with-lease
green " pushed"
fi
bold "== done =="
echo " branch : $CUR_BRANCH"
echo " HEAD : $(git rev-parse --short HEAD)"
echo " safety snapshot: $SAFETY_TAG (delete with: git tag -d $SAFETY_TAG)"