diff --git a/backend/routers/ingest.py b/backend/routers/ingest.py
index a665731..034ed7d 100644
--- a/backend/routers/ingest.py
+++ b/backend/routers/ingest.py
@@ -2,9 +2,15 @@ from datetime import datetime, timedelta
 from fastapi import APIRouter, Query, HTTPException
 from pydantic import BaseModel
 from services import s1_client
+from services.async_cache import async_ttl_cache, cache_stats, cache_clear
 
 router = APIRouter()
 
+# Dashboard endpoints can be expensive on busy tenants. Cache results in-process
+# for a short TTL so reloads and parallel widgets are instant. Pass ?nocache=1
+# to bypass for a forced refresh.
+_DASHBOARD_TTL_SECONDS = 300
+
 
 def _date_range(days: int) -> tuple[str, str]:
     now = datetime.utcnow()
@@ -22,41 +28,65 @@ def _date_range_hours(hours: int) -> tuple[str, str]:
     )
 
 
+@async_ttl_cache(ttl_seconds=_DASHBOARD_TTL_SECONDS)
+async def _top_sources_cached(hours: int) -> dict:
+    """Cache key: hours only. days is normalised to hours upstream."""
+    from_dt, to_dt = _date_range_hours(hours)
+    query = "| group events=count() by dataSource.name | sort -events | limit 25"
+    result = await s1_client.run_powerquery(query, from_dt, to_dt)
+    return {"data": result.get("events", [])}
+
+
 @router.get("/top-sources")
 async def get_top_sources(
     days: int = Query(None, ge=1, le=90),
-    hours: int = Query(None, ge=1, le=24),
+    hours: int = Query(None, ge=1, le=720),
+    nocache: bool = Query(False, description="Bypass dashboard cache"),
 ):
-    """Top log sources by event count over the given period."""
-    if hours is not None:
-        from_dt, to_dt = _date_range_hours(hours)
-        period_label = f"{hours}h"
+    """Top log sources by event count.
+
+    Note: SDL returns 'internal Scalyr error' when this query uses day-scale
+    timestamps on busy tenants, but the same window expressed in hours runs
+    fine. We normalise days -> hours internally for stability.
+    """
+    if hours is None and days is None:
+        days = 7
+    if hours is None:
+        hours = days * 24
+        period_label = f"{days}d"
     else:
-        from_dt, to_dt = _date_range(days or 7)
-        period_label = f"{days or 7}d"
-    query = "| group events=count() by dataSource.name | sort -events | limit 25"
+        period_label = f"{hours}h"
     try:
-        result = await s1_client.run_powerquery(query, from_dt, to_dt)
+        cached = await _top_sources_cached(hours, nocache=nocache)
     except Exception as e:
         raise HTTPException(502, f"PowerQuery error: {e}")
-    return {"period": period_label, "data": result.get("events", [])}
+    return {"period": period_label, "data": cached["data"]}
+
+
+@async_ttl_cache(ttl_seconds=_DASHBOARD_TTL_SECONDS)
+async def _by_event_type_cached(days: int) -> dict:
+    # Same days->hours normalisation as top-sources for tenant stability.
+    from_dt, to_dt = _date_range_hours(days * 24)
+    query = "| group events=count() by dataSource.name, event.type | sort -events | limit 100"
+    result = await s1_client.run_powerquery(query, from_dt, to_dt)
+    return {"data": result.get("events", [])}
 
 
 @router.get("/by-event-type")
-async def get_by_event_type(days: int = Query(7, ge=1, le=90)):
+async def get_by_event_type(
+    days: int = Query(7, ge=1, le=90),
+    nocache: bool = Query(False),
+):
     """Event counts grouped by source and event type."""
-    from_dt, to_dt = _date_range(days)
-    query = "| group events=count() by dataSource.name, event.type | sort -events | limit 100"
     try:
-        result = await s1_client.run_powerquery(query, from_dt, to_dt)
+        cached = await _by_event_type_cached(days, nocache=nocache)
     except Exception as e:
         raise HTTPException(502, f"PowerQuery error: {e}")
-    return {"period_days": days, "data": result.get("events", [])}
+    return {"period_days": days, "data": cached["data"]}
 
 
-@router.get("/daily-volume")
-async def get_daily_volume(days: int = Query(5, ge=1, le=7)):
-    """Total event count per day — queries run in parallel."""
+@async_ttl_cache(ttl_seconds=_DASHBOARD_TTL_SECONDS)
+async def _daily_volume_cached(days: int) -> list:
     import asyncio
 
     now = datetime.utcnow()
@@ -78,6 +108,27 @@ async def get_daily_volume(days: int = Query(5, ge=1, le=7)):
     return list(reversed(results))
 
 
+@router.get("/daily-volume")
+async def get_daily_volume(
+    days: int = Query(5, ge=1, le=7),
+    nocache: bool = Query(False),
+):
+    """Total event count per day — queries run in parallel."""
+    return await _daily_volume_cached(days, nocache=nocache)
+
+
+@router.get("/cache-stats")
+def ingest_cache_stats():
+    """Inspect dashboard cache (entry count + TTL remaining per key)."""
+    return cache_stats()
+
+
+@router.delete("/cache")
+def ingest_cache_clear():
+    """Forcefully wipe the dashboard cache (next call refetches from SDL)."""
+    return {"cleared": cache_clear()}
+
+
 class FilterRule(BaseModel):
     source: str = ""
     event_type: str = ""
diff --git a/backend/services/async_cache.py b/backend/services/async_cache.py
new file mode 100644
--- /dev/null
+++ b/backend/services/async_cache.py
@@ -0,0 +1,136 @@
+"""Tiny in-process async TTL cache for backend endpoints.
+
+Two-fold benefit:
+  * Identical concurrent calls share one upstream PowerQuery (single-flight).
+  * Repeat reads within TTL return instantly (no SDL round-trip).
+
+Designed for read-only dashboard endpoints. Keep it stdlib-only so it adds
+no dependency. Caches live until the process restarts.
+
+Caveats:
+  * Cached values are returned by reference; callers must not mutate them.
+  * Call arguments must be hashable, and positional vs keyword spellings of
+    the same call produce different cache keys.
+  * Exceptions are never cached; the next caller simply retries.
+
+Usage:
+    @async_ttl_cache(ttl_seconds=300)
+    async def get_top_sources(...): ...
+
+    await get_top_sources(24)                # served from cache when fresh
+    await get_top_sources(24, nocache=True)  # forced refresh
+"""
+
+from __future__ import annotations
+import asyncio
+import functools
+import time
+from typing import Any, Awaitable, Callable, Tuple
+
+
+# Cache key: (qualified function name, positional args, sorted keyword args).
+_Key = Tuple[str, Tuple[Any, ...], Tuple[Tuple[str, Any], ...]]
+
+# Maps cache-key -> (expires_at, value); expires_at is a time.monotonic() stamp.
+_STORE: dict[_Key, Tuple[float, Any]] = {}
+# Maps cache-key -> asyncio.Lock for single-flight. Locks are kept for the
+# life of the process, so this grows with the number of distinct keys.
+_LOCKS: dict[_Key, asyncio.Lock] = {}
+# Sentinel for "no live entry"; lets None and other falsy values be cached.
+_MISS = object()
+
+
+def _make_key(name: str, args: tuple, kwargs: dict) -> _Key:
+    """Build the cache key for one call of the function identified by name."""
+    # Skip the special "nocache" kwarg so it doesn't fragment the cache.
+    kw = tuple(sorted((k, v) for k, v in kwargs.items() if k != "nocache"))
+    return (name, args, kw)
+
+
+def _lookup(key: _Key) -> Any:
+    """Return the unexpired value stored under key, or _MISS if there is none."""
+    hit = _STORE.get(key)
+    if hit is not None and hit[0] > time.monotonic():
+        return hit[1]
+    return _MISS
+
+
+def _purge_expired(now: float) -> None:
+    """Delete every entry whose deadline is at or before now."""
+    for key in [k for k, entry in _STORE.items() if entry[0] <= now]:
+        del _STORE[key]
+
+
+def async_ttl_cache(ttl_seconds: int = 300) -> Callable:
+    """Decorator: cache an async function's result for ttl_seconds.
+
+    The returned wrapper accepts one extra keyword argument, nocache=True,
+    which skips the cache read and stores the fresh result. The flag is
+    consumed by the wrapper and is never forwarded to the wrapped function.
+
+    Calls that share a key are serialised on a per-key lock: the first one
+    runs the wrapped function and the others reuse its result, except that
+    every nocache=True caller refetches in turn. Exceptions raised by the
+    wrapped function propagate to the caller and nothing is stored.
+    """
+    def decorator(fn: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
+        # Qualify with the module so same-named functions defined in different
+        # modules can never read each other's entries.
+        name = f"{fn.__module__}.{fn.__qualname__}"
+
+        @functools.wraps(fn)
+        async def wrapper(*args, **kwargs):
+            nocache = bool(kwargs.pop("nocache", False))
+            key = _make_key(name, args, kwargs)
+
+            if not nocache:
+                value = _lookup(key)
+                if value is not _MISS:
+                    return value
+
+            # Create the lock only on first use of a key. There is no await
+            # between the lookup and the insert, so tasks cannot interleave.
+            lock = _LOCKS.get(key)
+            if lock is None:
+                lock = _LOCKS[key] = asyncio.Lock()
+            async with lock:
+                # Re-check after acquiring lock — another caller may have populated.
+                if not nocache:
+                    value = _lookup(key)
+                    if value is not _MISS:
+                        return value
+
+                value = await fn(*args, **kwargs)
+                now = time.monotonic()
+                # Sweep dead entries on every write so keys that are never
+                # requested again do not stay in memory forever.
+                _purge_expired(now)
+                _STORE[key] = (now + ttl_seconds, value)
+                return value
+
+        return wrapper
+    return decorator
+
+
+def cache_stats() -> dict:
+    """Debug helper: return current cache entries (no values).
+
+    "entries" counts everything still in the store, including expired entries
+    not yet swept; "live" lists only the unexpired ones.
+    """
+    now = time.monotonic()
+    return {
+        "entries": len(_STORE),
+        "live": [
+            {"key": str(k), "ttl_remaining_s": round(v[0] - now, 1)}
+            for k, v in _STORE.items()
+            if v[0] > now
+        ],
+    }
+
+
+def cache_clear() -> int:
+    """Wipe the cache; returns the number of entries removed."""
+    n = len(_STORE)
+    _STORE.clear()
+    return n