mirror of
https://github.com/marcredhat/SIEM-toolkit-patched
synced 2026-06-10 05:17:18 +00:00
v0.1 Mick Marc merged
This commit is contained in:
@@ -0,0 +1,209 @@
|
||||
import re
|
||||
import json
|
||||
import yaml
|
||||
from typing import Set, List
|
||||
|
||||
_DS_PATTERN = re.compile(
|
||||
r"dataSource\.name\s*[=in]+\s*[\('\"]([^'\"),]+)['\")]",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
|
||||
# STAR PowerQuery operators that follow a field name
|
||||
_STAR_OPS = [
|
||||
"ContainsCIS", "NotContainsCIS", "Contains", "NotContains",
|
||||
"StartsWith", "EndsWith", "In", "NotIn",
|
||||
"IsEmpty", "IsNotEmpty", "Matches", "NotMatches",
|
||||
"GreaterOrEqual", "LessOrEqual", "GreaterThan", "LessThan",
|
||||
"Between", "=", "!=",
|
||||
]
|
||||
_STAR_KEYWORD = {"and", "or", "not", "true", "false", "null"}
|
||||
_OP_PATTERN = re.compile(
|
||||
r"([\w.]+)\s*(?:" + "|".join(re.escape(op) for op in _STAR_OPS) + r")\b"
|
||||
r"|([\w.]+)\s*=", # also catch field= (no-space form used in subQuery strings)
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
|
||||
def extract_star_fields(query: str) -> Set[str]:
|
||||
"""Extract field names referenced in a STAR PowerQuery/subQuery string."""
|
||||
fields: Set[str] = set()
|
||||
for match in _OP_PATTERN.finditer(query):
|
||||
field = match.group(1) or match.group(2)
|
||||
if field and field.lower() not in _STAR_KEYWORD and not field[0].isdigit():
|
||||
fields.add(field)
|
||||
return fields
|
||||
|
||||
|
||||
def extract_sigma_fields(sigma_content: str) -> Set[str]:
|
||||
"""Extract field names from a Sigma rule YAML."""
|
||||
try:
|
||||
rule = yaml.safe_load(sigma_content)
|
||||
except Exception:
|
||||
return set()
|
||||
|
||||
fields: Set[str] = set()
|
||||
detection = rule.get("detection", {}) if isinstance(rule, dict) else {}
|
||||
|
||||
def _walk(node):
|
||||
if isinstance(node, dict):
|
||||
for key, val in node.items():
|
||||
if key == "condition":
|
||||
continue
|
||||
# Strip pipe modifiers: CommandLine|contains → CommandLine
|
||||
clean = key.split("|")[0]
|
||||
if clean and clean not in ("keywords",):
|
||||
fields.add(clean)
|
||||
_walk(val)
|
||||
elif isinstance(node, list):
|
||||
for item in node:
|
||||
_walk(item)
|
||||
|
||||
_walk(detection)
|
||||
return fields
|
||||
|
||||
|
||||
def extract_data_sources(texts: List[str]) -> List[str]:
|
||||
"""Extract dataSource.name values from a list of query strings."""
|
||||
sources: Set[str] = set()
|
||||
for text in texts:
|
||||
for match in _DS_PATTERN.finditer(text):
|
||||
sources.add(match.group(1).strip())
|
||||
return sorted(sources)
|
||||
|
||||
|
||||
_SDL_FIELD_PAT = re.compile(r'\$([a-zA-Z][a-zA-Z0-9._]*)(?:=[^$]*)?\$')
|
||||
_SDL_ATTR_KEY_PAT = re.compile(r'"([a-zA-Z][a-zA-Z0-9._]+)"\s*:')
|
||||
# Matches both quoted and unquoted output/to keys in rewrites:
|
||||
# output: "user.name" OR "output": "user.name"
|
||||
# "to": "src_endpoint.ip"
|
||||
_SDL_REWRITE_OUT_PAT = re.compile(
|
||||
r'(?:"output"|output|"to"|"replace")\s*:\s*"([a-zA-Z][a-zA-Z0-9._]+)"'
|
||||
)
|
||||
|
||||
|
||||
def extract_parser_fields_from_content(content: str) -> Set[str]:
|
||||
"""
|
||||
Extract output field names from SDL augmented-JSON parser content string.
|
||||
Handles:
|
||||
- $field.name$ and $field.name=pattern$ from format strings
|
||||
- "output": "field.name" and output: "field.name" from rewrites
|
||||
- quoted attribute keys from attributes{} blocks
|
||||
"""
|
||||
fields: Set[str] = set()
|
||||
|
||||
# Fields from format strings: $field.name$ or $field.name=pattern_var$
|
||||
for match in _SDL_FIELD_PAT.finditer(content):
|
||||
field = match.group(1)
|
||||
# Skip pattern variable names (no dot, short, all lowercase)
|
||||
if "." in field or field[0].isupper() or len(field) > 6:
|
||||
fields.add(field)
|
||||
|
||||
# Rewrite output targets: output: "field.name" / "output": "field.name"
|
||||
_skip_values = {"$0", "1", "2", "3", "4", "99"}
|
||||
for match in _SDL_REWRITE_OUT_PAT.finditer(content):
|
||||
val = match.group(1)
|
||||
if val not in _skip_values and "." in val:
|
||||
fields.add(val)
|
||||
|
||||
# Quoted attribute keys (skip single-word SDL builtins)
|
||||
_skip_keys = {"id", "format", "halt", "input", "output", "match", "replace",
|
||||
"timezone", "attribute", "attributes", "patterns", "formats",
|
||||
"rewrites", "type", "version"}
|
||||
for match in _SDL_ATTR_KEY_PAT.finditer(content):
|
||||
key = match.group(1)
|
||||
if key not in _skip_keys and ("." in key or len(key) > 8):
|
||||
fields.add(key)
|
||||
|
||||
return fields
|
||||
|
||||
|
||||
_SKIP_FIELD_NAMES = {
|
||||
"id", "format", "halt", "input", "output", "match", "replace",
|
||||
"timezone", "attribute", "attributes", "patterns", "formats",
|
||||
"rewrites", "type", "version", "source", "dataset", "predicate",
|
||||
"transformations", "mappings", "observables", "fields", "constant",
|
||||
"copy", "from", "to", "value", "field", "name",
|
||||
}
|
||||
|
||||
|
||||
def _extract_rewrite_fields(rewrites) -> Set[str]:
|
||||
"""Extract 'output' field names from a rewrites list."""
|
||||
fields: Set[str] = set()
|
||||
if not isinstance(rewrites, list):
|
||||
return fields
|
||||
for rw in rewrites:
|
||||
if not isinstance(rw, dict):
|
||||
continue
|
||||
# Standard SDL rewrite: {"input": "...", "output": "field.name"}
|
||||
out = rw.get("output") or rw.get("to")
|
||||
if out and isinstance(out, str) and "." in out and out not in _SKIP_FIELD_NAMES:
|
||||
fields.add(out)
|
||||
return fields
|
||||
|
||||
|
||||
def _walk_mappings(node) -> Set[str]:
|
||||
"""Recursively extract copy.to and constant.field from SDL mappings blocks."""
|
||||
fields: Set[str] = set()
|
||||
if isinstance(node, dict):
|
||||
# transformations copy: {"copy": {"from": "...", "to": "field.name"}}
|
||||
if "copy" in node and isinstance(node["copy"], dict):
|
||||
to = node["copy"].get("to")
|
||||
if to and isinstance(to, str) and "." in to:
|
||||
fields.add(to)
|
||||
# transformations constant: {"constant": {"value": ..., "field": "field.name"}}
|
||||
if "constant" in node and isinstance(node["constant"], dict):
|
||||
f = node["constant"].get("field")
|
||||
if f and isinstance(f, str) and "." in f:
|
||||
fields.add(f)
|
||||
for v in node.values():
|
||||
fields |= _walk_mappings(v)
|
||||
elif isinstance(node, list):
|
||||
for item in node:
|
||||
fields |= _walk_mappings(item)
|
||||
return fields
|
||||
|
||||
|
||||
def extract_parser_fields(parser_json: dict) -> Set[str]:
|
||||
"""
|
||||
Extract output field names from an SDL parser JSON dict.
|
||||
Handles: attributes lists, fields lists, mappings targets,
|
||||
rewrites[].output, rewrites[].to, copy.to, constant.field.
|
||||
"""
|
||||
fields: Set[str] = set()
|
||||
|
||||
# Legacy: attributes as list of {name: ...}
|
||||
for attr in parser_json.get("attributes", []):
|
||||
if isinstance(attr, dict) and "name" in attr:
|
||||
fields.add(attr["name"])
|
||||
|
||||
# Legacy: fields list
|
||||
for field in parser_json.get("fields", []):
|
||||
if isinstance(field, str):
|
||||
fields.add(field)
|
||||
elif isinstance(field, dict) and "name" in field:
|
||||
fields.add(field["name"])
|
||||
|
||||
# Legacy: flat mappings list with "target"
|
||||
for mapping in parser_json.get("mappings", []):
|
||||
if isinstance(mapping, dict) and "target" in mapping:
|
||||
fields.add(mapping["target"])
|
||||
|
||||
# SDL rewrites[].output in top-level formats[]
|
||||
for fmt in parser_json.get("formats", []):
|
||||
if isinstance(fmt, dict):
|
||||
fields |= _extract_rewrite_fields(fmt.get("rewrites", []))
|
||||
|
||||
# SDL mappings block (nested transformations with copy.to / constant.field)
|
||||
mappings_block = parser_json.get("mappings", {})
|
||||
if isinstance(mappings_block, dict):
|
||||
fields |= _walk_mappings(mappings_block)
|
||||
|
||||
# observables[].name
|
||||
for obs in parser_json.get("observables", {}).get("fields", []):
|
||||
if isinstance(obs, dict) and "name" in obs:
|
||||
n = obs["name"]
|
||||
if "." in n:
|
||||
fields.add(n)
|
||||
|
||||
return fields
|
||||
@@ -0,0 +1,344 @@
|
||||
import os
|
||||
import asyncio
|
||||
import httpx
|
||||
from datetime import datetime, timezone
|
||||
|
||||
BASE_URL = os.environ.get("S1_BASE_URL", "https://demo.sentinelone.net").rstrip("/")
|
||||
TOKEN = os.environ.get("S1_API_TOKEN", "")
|
||||
|
||||
# Scalyr/XDR PowerQuery credentials — from SDL_XDR_URL + SDL_LOG_READ_KEY
|
||||
# in the SentinelOne console: Settings → Integrations → Data Lake API Keys
|
||||
SDL_XDR_URL = os.environ.get("SDL_XDR_URL", "https://xdr.us1.sentinelone.net").rstrip("/")
|
||||
SDL_LOG_READ_KEY = os.environ.get("SDL_LOG_READ_KEY", "")
|
||||
|
||||
# SDL Configuration Read Key — used to list/fetch parser files under /logParsers/
|
||||
# (separate from SDL_LOG_READ_KEY which is for querying events only).
|
||||
# Find it in the S1 console: Settings → Integrations → Data Lake API Keys → Configuration Read.
|
||||
SDL_CONFIG_READ_KEY = os.environ.get("SDL_CONFIG_READ_KEY", "")
|
||||
|
||||
# Management Console API uses ApiToken auth
|
||||
HEADERS = {
|
||||
"Authorization": f"ApiToken {TOKEN}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
|
||||
def _iso_to_epoch_ms(iso_str: str) -> int:
|
||||
"""Convert ISO-8601 UTC string to epoch milliseconds for Scalyr API."""
|
||||
dt = datetime.fromisoformat(iso_str.replace("Z", "+00:00"))
|
||||
return int(dt.timestamp() * 1000)
|
||||
|
||||
|
||||
async def get_star_rules(page_size: int = 100) -> list:
|
||||
"""Fetch custom STAR rules from /cloud-detection/rules, paginating via cursor."""
|
||||
all_rules = []
|
||||
cursor = None
|
||||
async with httpx.AsyncClient(timeout=30) as client:
|
||||
while True:
|
||||
params = {"limit": page_size}
|
||||
if cursor:
|
||||
params["cursor"] = cursor
|
||||
resp = await client.get(
|
||||
f"{BASE_URL}/web/api/v2.1/cloud-detection/rules",
|
||||
headers=HEADERS,
|
||||
params=params,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
body = resp.json()
|
||||
all_rules.extend(body.get("data", []))
|
||||
cursor = body.get("pagination", {}).get("nextCursor")
|
||||
if not cursor:
|
||||
break
|
||||
return all_rules
|
||||
|
||||
|
||||
async def get_library_rules(page_size: int = 100) -> list:
|
||||
"""
|
||||
Fetch Detection Library (OOTB/Platform) rules from /web/api/v2.1/detection-library/rules.
|
||||
Requires an account-level or higher API token — site-scoped tokens will receive a 400.
|
||||
Returns an empty list gracefully if the token lacks sufficient scope.
|
||||
"""
|
||||
all_rules = []
|
||||
cursor = None
|
||||
async with httpx.AsyncClient(timeout=60) as client:
|
||||
while True:
|
||||
params: dict = {"limit": page_size}
|
||||
if cursor:
|
||||
params["cursor"] = cursor
|
||||
resp = await client.get(
|
||||
f"{BASE_URL}/web/api/v2.1/detection-library/rules",
|
||||
headers=HEADERS,
|
||||
params=params,
|
||||
)
|
||||
# 400 typically means site-scoped token — return empty rather than crash
|
||||
if resp.status_code == 400:
|
||||
return []
|
||||
resp.raise_for_status()
|
||||
body = resp.json()
|
||||
batch = body.get("data", [])
|
||||
all_rules.extend(batch)
|
||||
cursor = body.get("pagination", {}).get("nextCursor")
|
||||
if not cursor:
|
||||
break
|
||||
|
||||
results = []
|
||||
for rule in all_rules:
|
||||
results.append({
|
||||
"id": str(rule.get("id", "")),
|
||||
"name": rule.get("name", "unnamed"),
|
||||
"s1ql": rule.get("s1ql") or rule.get("query", ""),
|
||||
"queryType": rule.get("queryType", "events"),
|
||||
"severity": rule.get("severity", ""),
|
||||
"description": rule.get("description", ""),
|
||||
"gdlRuleId": rule.get("id", ""),
|
||||
"creator": "SentinelOne",
|
||||
"expirationMode": rule.get("expirationMode", "Permanent"),
|
||||
})
|
||||
return results
|
||||
|
||||
|
||||
async def run_powerquery(query: str, from_date: str, to_date: str) -> dict:
|
||||
"""
|
||||
Run a PowerQuery against the Singularity Data Lake via the Scalyr XDR API.
|
||||
Uses SDL_XDR_URL + SDL_LOG_READ_KEY (Scalyr readlog token).
|
||||
The Scalyr PowerQuery API is synchronous — results return in one request.
|
||||
"""
|
||||
if not SDL_LOG_READ_KEY:
|
||||
return {"events": [], "error": "SDL_LOG_READ_KEY not configured — add it to .env"}
|
||||
|
||||
start_ms = _iso_to_epoch_ms(from_date)
|
||||
end_ms = _iso_to_epoch_ms(to_date)
|
||||
|
||||
payload = {
|
||||
"token": SDL_LOG_READ_KEY,
|
||||
"query": query,
|
||||
"startTime": start_ms,
|
||||
"endTime": end_ms,
|
||||
"maxCount": 1000,
|
||||
}
|
||||
|
||||
async with httpx.AsyncClient(timeout=120) as client:
|
||||
for attempt in range(3):
|
||||
try:
|
||||
resp = await client.post(
|
||||
f"{SDL_XDR_URL}/api/powerQuery",
|
||||
json=payload,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
break
|
||||
except httpx.HTTPStatusError as e:
|
||||
if e.response.status_code == 429 and attempt < 2:
|
||||
await asyncio.sleep(10 * (attempt + 1))
|
||||
continue
|
||||
raise RuntimeError(
|
||||
f"HTTP {e.response.status_code} from {e.request.url}: {e.response.text[:500]}"
|
||||
) from e
|
||||
|
||||
data = resp.json()
|
||||
status = data.get("status", "")
|
||||
|
||||
if status != "success":
|
||||
# Return full response as error detail for debugging
|
||||
return {"events": [], "error": f"PowerQuery status={status}: {str(data)[:400]}"}
|
||||
|
||||
# Scalyr PowerQuery returns: {"status":"success","columns":[{"name":"..."},...], "values":[[...],...],...}
|
||||
raw_cols = data.get("columns", [])
|
||||
values = data.get("values", [])
|
||||
|
||||
if raw_cols and values:
|
||||
# columns may be list of strings or list of {"name":...} dicts
|
||||
col_names = [
|
||||
c["name"] if isinstance(c, dict) else c
|
||||
for c in raw_cols
|
||||
]
|
||||
rows = [dict(zip(col_names, row)) for row in values]
|
||||
return {"events": rows}
|
||||
|
||||
# Fallback: return raw matches array
|
||||
matches = data.get("matches", [])
|
||||
return {"events": matches}
|
||||
|
||||
|
||||
def _sdl_config_headers() -> dict:
|
||||
"""Auth headers for the SDL Configuration File API (uses POST /api/listFiles,
|
||||
POST /api/getFile, etc.). Falls back to SDL_LOG_READ_KEY if no dedicated
|
||||
Configuration Read key is set — that won't work for all endpoints, but lets
|
||||
callers fail with a meaningful 401 instead of crashing."""
|
||||
key = SDL_CONFIG_READ_KEY or SDL_LOG_READ_KEY
|
||||
return {
|
||||
"Authorization": f"Bearer {key}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
|
||||
async def list_sdl_parsers() -> list[str]:
|
||||
"""List parser paths under /logParsers/ via the SDL Configuration File API.
|
||||
|
||||
Requires SDL_CONFIG_READ_KEY (or higher) in .env. The endpoint is
|
||||
POST <SDL_XDR_URL>/api/listFiles with {"pathPrefix": "/logParsers/"}.
|
||||
Returns names without the /logParsers/ prefix, suitable for use as
|
||||
filenames in the local parsers/ directory.
|
||||
"""
|
||||
async with httpx.AsyncClient(timeout=30) as client:
|
||||
resp = await client.post(
|
||||
f"{SDL_XDR_URL}/api/listFiles",
|
||||
headers=_sdl_config_headers(),
|
||||
json={"pathPrefix": "/logParsers/"},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
paths = data.get("paths") or data.get("files") or []
|
||||
# Normalize: strip leading /logParsers/ and ignore anything that isn't there
|
||||
names: list[str] = []
|
||||
for p in paths:
|
||||
if isinstance(p, dict):
|
||||
p = p.get("path") or p.get("name") or ""
|
||||
if isinstance(p, str) and p.startswith("/logParsers/"):
|
||||
names.append(p[len("/logParsers/"):])
|
||||
return names
|
||||
|
||||
|
||||
async def list_sdl_parsers_legacy() -> list[str]:
|
||||
"""[Deprecated] Legacy management-console path — kept for reference but unused."""
|
||||
async with httpx.AsyncClient(timeout=30) as client:
|
||||
resp = await client.get(
|
||||
f"{BASE_URL}/api/v1/files/logParsers",
|
||||
headers=HEADERS,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
# Response is a list of file objects or a dict with 'files' key
|
||||
if isinstance(data, list):
|
||||
return [f.get("name") or f.get("path", "") for f in data if isinstance(f, dict)]
|
||||
return [f.get("name") or f.get("path", "") for f in data.get("files", [])]
|
||||
|
||||
|
||||
async def get_sdl_parser(filename: str) -> dict:
|
||||
"""Fetch a single SDL parser file by name via POST /api/getFile.
|
||||
|
||||
Returns the raw SDL response dict, e.g.
|
||||
{"status": "success", "path": "/logParsers/Foo", "content": "...", "version": 3, ...}
|
||||
"""
|
||||
path = filename if filename.startswith("/logParsers/") else f"/logParsers/{filename}"
|
||||
async with httpx.AsyncClient(timeout=30) as client:
|
||||
resp = await client.post(
|
||||
f"{SDL_XDR_URL}/api/getFile",
|
||||
headers=_sdl_config_headers(),
|
||||
json={"path": path},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
return resp.json()
|
||||
|
||||
|
||||
async def get_account_id() -> str | None:
|
||||
"""Return the first account ID visible to the current token.
|
||||
|
||||
Tries /accounts first (works for account-scoped or higher tokens). If that
|
||||
returns 403 (site-scoped token), falls back to /sites and reads accountId
|
||||
from the first site.
|
||||
"""
|
||||
async with httpx.AsyncClient(timeout=15) as client:
|
||||
# Path 1: account-scoped token
|
||||
resp = await client.get(
|
||||
f"{BASE_URL}/web/api/v2.1/accounts",
|
||||
headers=HEADERS,
|
||||
params={"limit": 1},
|
||||
)
|
||||
if resp.status_code == 200:
|
||||
accounts = resp.json().get("data", [])
|
||||
if accounts:
|
||||
return str(accounts[0]["id"])
|
||||
# Path 2: site-scoped token — accountId is embedded in sites payload
|
||||
if resp.status_code in (401, 403):
|
||||
sresp = await client.get(
|
||||
f"{BASE_URL}/web/api/v2.1/sites",
|
||||
headers=HEADERS,
|
||||
params={"limit": 1},
|
||||
)
|
||||
if sresp.status_code == 200:
|
||||
data = sresp.json().get("data", {})
|
||||
sites = data.get("sites") if isinstance(data, dict) else data
|
||||
if sites:
|
||||
return str(sites[0].get("accountId") or "") or None
|
||||
return None
|
||||
|
||||
|
||||
async def get_scope_for_platform_rules() -> tuple[str, str] | None:
|
||||
"""Pick the best scope for /detection-library/platform-rules.
|
||||
|
||||
Returns (scopeLevel, scopeId). Tries account first, then site — site-scoped
|
||||
tokens cannot list accounts but CAN query platform-rules with site scope.
|
||||
"""
|
||||
async with httpx.AsyncClient(timeout=15) as client:
|
||||
# Prefer account scope (broadest)
|
||||
a = await client.get(
|
||||
f"{BASE_URL}/web/api/v2.1/accounts",
|
||||
headers=HEADERS,
|
||||
params={"limit": 1},
|
||||
)
|
||||
if a.status_code == 200:
|
||||
accounts = a.json().get("data", [])
|
||||
if accounts:
|
||||
return ("account", str(accounts[0]["id"]))
|
||||
# Fall back to site scope (site-scoped tokens land here)
|
||||
s = await client.get(
|
||||
f"{BASE_URL}/web/api/v2.1/sites",
|
||||
headers=HEADERS,
|
||||
params={"limit": 1},
|
||||
)
|
||||
if s.status_code == 200:
|
||||
data = s.json().get("data", {})
|
||||
sites = data.get("sites") if isinstance(data, dict) else data
|
||||
if sites:
|
||||
sid = sites[0].get("id")
|
||||
if sid:
|
||||
return ("site", str(sid))
|
||||
return None
|
||||
|
||||
|
||||
async def get_platform_rules(page_size: int = 1000) -> list:
|
||||
"""
|
||||
Fetch all Detection Library platform rules from /detection-library/platform-rules.
|
||||
Requires scopeLevel + scopeId. Tries account scope first, then site scope so
|
||||
site-scoped tokens also work.
|
||||
"""
|
||||
scope = await get_scope_for_platform_rules()
|
||||
if not scope:
|
||||
return []
|
||||
scope_level, scope_id = scope
|
||||
|
||||
all_rules: list = []
|
||||
cursor: str = ""
|
||||
async with httpx.AsyncClient(timeout=60) as client:
|
||||
while True:
|
||||
params: dict = {
|
||||
"scopeLevel": scope_level,
|
||||
"scopeId": scope_id,
|
||||
"limit": page_size,
|
||||
"cursor": cursor,
|
||||
}
|
||||
resp = await client.get(
|
||||
f"{BASE_URL}/web/api/v2.1/detection-library/platform-rules",
|
||||
headers=HEADERS,
|
||||
params=params,
|
||||
)
|
||||
if resp.status_code == 400:
|
||||
return []
|
||||
resp.raise_for_status()
|
||||
body = resp.json()
|
||||
all_rules.extend(body.get("data", []))
|
||||
cursor = body.get("pagination", {}).get("nextCursor") or ""
|
||||
if not cursor:
|
||||
break
|
||||
return all_rules
|
||||
|
||||
|
||||
async def get_sites() -> list:
|
||||
async with httpx.AsyncClient(timeout=30) as client:
|
||||
resp = await client.get(
|
||||
f"{BASE_URL}/web/api/v2.1/sites",
|
||||
headers=HEADERS,
|
||||
params={"limit": 100},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
return resp.json().get("data", {}).get("sites", [])
|
||||
Reference in New Issue
Block a user