Files
nox-project-nox-framework/sources/helpers/scanner.py
T
2026-04-07 10:17:43 +02:00

526 lines
23 KiB
Python

"""
sources/helpers/scanner.py
Recursive Avalanche Engine for NOX autoscan.
Pipeline per asset (sequential phases):
Phase 1 — Breach scan
Phase 2 — Hash crack (hashes are cracked concurrently; the pipeline waits for them before Phase 3)
Phase 3 — Dork
Phase 4 — Scrape
→ Harvest new identifiers from all phases
→ Reinject every new unique identifier (not seen before) recursively, up to the configured pivot depth
"""
import asyncio
import logging
import re
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple
if TYPE_CHECKING:
from nox import Orchestrator
_syslog = logging.getLogger("nox.system")
_EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+\.[\w.]+")
_USERNAME_RE = re.compile(r"(?:github\.com|twitter\.com|linkedin\.com/in|reddit\.com/u)/([A-Za-z0-9_.-]{3,39})", re.I)
_PHONE_RE = re.compile(r"\+\d[\d\s.\-()]{7,14}\d|\b\d{3}[\s.\-]\d{3}[\s.\-]\d{4}\b")
_NAME_RE = re.compile(r"\b([A-Z][a-z]{1,20}(?:\s+[A-Z][a-z]{1,20}){1,3})\b")
_DORK_LIMIT = 20
_PIVOT_TYPES = {"email", "username", "phone", "name", "ip", "domain"}
def _cfg_depth(orc=None) -> int:
# A7/A10: read from orchestrator config if available
if orc is not None:
cfg = getattr(orc, "config", None)
if cfg is not None:
v = getattr(cfg, "pivot_depth", None)
if v is not None:
return int(v)
try:
from nox import Cfg # type: ignore
return Cfg.PIVOT_DEPTH
except ImportError:
return 2
def _cfg_concurrency(orc=None) -> int:
# A7: read from orchestrator config if available
if orc is not None:
cfg = getattr(orc, "config", None)
if cfg is not None:
v = getattr(cfg, "concurrency", None)
if v is not None:
return int(v)
try:
from nox import Cfg # type: ignore
return Cfg.CONCURRENCY
except ImportError:
return 15
def _out(level: str, msg: str) -> None:
try:
from nox import out as _nox_out # type: ignore
_nox_out(level, msg)
except Exception:
import sys
print(f"[{level}] {msg}", file=sys.stderr)
def _extract_ids_from_text(text: str, exclude: str = "") -> List[Tuple[str, str]]:
    """Extract pivotable identifiers from free text, excluding the current asset.

    Args:
        text: Free text such as a dork title/snippet, a paste body or a
            Telegram message. Empty or ``None`` text yields no identifiers.
        exclude: The asset currently being processed. Candidates equal to it
            (case-insensitively, ignoring surrounding whitespace; phone
            candidates after the same punctuation stripping applied to phone
            matches) are skipped so an asset does not rediscover itself.

    Returns:
        ``(value, qtype)`` pairs, grouped as emails, usernames, phones, then
        names. Duplicates are not removed. Emails and usernames are lower-cased
        with trailing dots removed. Phones have spaces, dots, dashes and
        parentheses removed. Names keep their original case.
    """
    if not text:
        return []
    found: List[Tuple[str, str]] = []
    excl = (exclude or "").strip().lower()
    # Normalise the excluded value like phone matches, so a formatted seed
    # ("+1 555 123 4567") is still recognised in compact form ("+15551234567").
    excl_phone = re.sub(r"[\s.\-()]", "", excl)
    for m in _EMAIL_RE.findall(text):
        # Sentence punctuation can end up inside a match ("a@b.com.").
        v = m.lower().rstrip(".")
        if v and v != excl:
            found.append((v, "email"))
    for m in _USERNAME_RE.findall(text):
        v = m.lower().rstrip(".")
        if len(v) >= 3 and v != excl:
            found.append((v, "username"))
    for m in _PHONE_RE.findall(text):
        clean = re.sub(r"[\s.\-()]", "", m)
        if 8 <= len(clean) <= 15 and clean != excl_phone:
            found.append((clean, "phone"))
    for m in _NAME_RE.findall(text):
        if len(m.split()) >= 2 and m.lower() != excl:
            found.append((m, "name"))
    return found
def _ids_from_records(records: list, exclude: str = "") -> List[Tuple[str, str, str]]:
"""
Extract pivotable identifiers from breach records.
Returns (value, qtype, ref) where ref is the source/breach name for logging.
"""
found: List[Tuple[str, str, str]] = []
excl = exclude.lower()
for r in records:
src = getattr(r, "source", "") or ""
breach = getattr(r, "breach_name", "") or src
for val, qtype in [
(getattr(r, "email", ""), "email"),
(getattr(r, "username", ""), "username"),
(getattr(r, "phone", ""), "phone"),
(getattr(r, "full_name", ""), "name"),
(getattr(r, "ip_address", ""), "ip"),
(getattr(r, "domain", ""), "domain"),
]:
if val and len(val) > 2 and val.lower() != excl:
found.append((val.strip(), qtype, breach))
meta = getattr(r, "metadata", {}) or {}
for em in meta.get("emails", []):
if em and em.lower() != excl:
found.append((em.lower(), "email", breach))
return found
# ── Pivot log entry schema ─────────────────────────────────────────────────
# {
# "asset": str, # identifier scanned
# "qtype": str, # email/username/phone/name/domain/ip
# "depth": int, # 0=seed, 1=first pivot, …
# "parent": str|None, # asset that discovered this one
# "found_in": str, # phase that found this asset: seed/breach/dork/scrape/hash_crack
# "records": int, # breach records found for this asset
# "dorks": int, # new dork hits recorded for this asset (URL/title not already recorded earlier in the run)
# "scrape": int, # scrape items found for this asset
# "children": List[dict], # [{asset, qtype, found_in, ref}] — new assets discovered
# "cracked": List[str], # plaintexts cracked from hashes in breach results
# }
class AvalancheScanner:
    """Recursive ("avalanche") pivot scanner driven by a NOX orchestrator.

    Starting from a seed, each asset goes through breach scan → hash crack →
    dork → scrape. Identifiers harvested from those results that have not been
    seen yet are processed the same way, one depth level deeper, up to the
    configured pivot depth.

    The orchestrator is used only through ``config`` (optional),
    ``_full_async_scan``, ``dorking_engine``, ``dork``, ``scrape`` and ``db``.
    Results accumulate on the instance. :meth:`run` returns them, and
    provenance is kept in :attr:`pivot_log` and :attr:`discovered_assets`.
    """

    def __init__(self, orchestrator: "Orchestrator") -> None:
        self._orc = orchestrator
        # Lower-cased, stripped keys of every asset claimed for processing.
        self.seen_assets: Set[str] = set()
        # A2: single semaphore for the entire run, created lazily inside the event loop
        self._sem: Optional[asyncio.Semaphore] = None
        self._all_records: List = []
        self._dork_hits: List[dict] = []
        # URL (or title when no URL) of every dork hit already recorded.
        self._seen_dork_urls: Set[str] = set()
        # A6: scrape_hits merged atomically per _do_process call
        self._scrape_hits: Dict = {"pastes": [], "credentials": [], "hashes": [],
                                   "telegram": [], "dork_misconfigs": []}
        self._max_depth: int = 0
        # Completion futures of assets currently being processed (keyed like seen_assets).
        self._in_flight: Dict[str, asyncio.Future] = {}
        self.pivot_log: List[dict] = []
        # A8: global set to prevent duplicate entries in discovered_assets
        self._seen_discovered: Set[str] = set()
        self.discovered_assets: List[dict] = []

    def _get_sem(self) -> asyncio.Semaphore:
        """Return the run-wide semaphore that bounds concurrent breach scans.

        A2: created lazily, on first use inside the running event loop, and
        sized by ``_cfg_concurrency``.
        """
        if self._sem is None:
            self._sem = asyncio.Semaphore(_cfg_concurrency(self._orc))
        return self._sem

    @staticmethod
    def _detect_qtype(asset: str) -> str:
        """Classify *asset* via ``nox.Detect.qtype``; ``"email"`` if nox is not importable."""
        try:
            from nox import Detect  # type: ignore
            return Detect.qtype(asset)
        except ImportError:
            return "email"

    async def run(self, target: str) -> tuple:
        """Scan *target* and, unless pivoting is disabled, everything it leads to.

        Returns:
            ``(records, dork_hits, scrape_hits)`` — the instance's accumulated
            list, list and dict (not copies).
        """
        # A9: respect no_pivot flag from config
        cfg = getattr(self._orc, "config", None)
        no_pivot = getattr(cfg, "no_pivot", False) if cfg else False
        if no_pivot:
            # Breach scan of the seed only: no crack/dork/scrape, no recursion.
            qtype = self._detect_qtype(target)
            async with self._get_sem():
                try:
                    records = list(await self._orc._full_async_scan(target, qtype) or [])
                except Exception as exc:
                    _syslog.warning("BREACH_FAIL asset=%s err=%s", target, exc)
                    records = []
            self._all_records.extend(records)
            self.seen_assets.add(target.lower().strip())
            self.pivot_log.append({
                "asset": target, "qtype": qtype, "depth": 0, "parent": None,
                "found_in": "seed", "records": len(records), "dorks": 0,
                "scrape": 0, "children": [], "cracked": [],
            })
            return self._all_records, self._dork_hits, self._scrape_hits
        await self._process(target, depth=0, parent=None, found_in="seed")
        return self._all_records, self._dork_hits, self._scrape_hits

    def get_discovered_assets(self) -> List[dict]:
        """Return flat list of all discovered assets with full provenance."""
        return self.discovered_assets

    def get_max_depth(self) -> int:
        """Return the deepest depth at which an asset was actually processed."""
        return self._max_depth

    # ── Dedup gate ────────────────────────────────────────────────────
    async def _process(self, asset: str, depth: int,
                       parent: Optional[str], found_in: str) -> None:
        """Dedup gate: process each asset at most once per run.

        Assets deeper than the configured pivot depth are ignored. If *asset*
        was already claimed and is still being processed, this waits for that
        processing to finish. In every case it returns without redoing the work.
        """
        # A10: use per-run depth from orchestrator config
        if depth > _cfg_depth(self._orc):
            _syslog.debug("avalanche depth cap reached for %s", asset)
            return
        key = asset.lower().strip()
        if not key:
            return
        # A1: check-and-claim happens before the first await, so under the
        # single-threaded event loop no other coroutine can claim the key too.
        if key in self.seen_assets:
            pending = self._in_flight.get(key)
            if pending is not None:
                # shield(): if this waiter is cancelled, the shared future must
                # not be cancelled for the owner and the other waiters.
                await asyncio.shield(pending)
            return
        self.seen_assets.add(key)
        fut: asyncio.Future = asyncio.get_running_loop().create_future()
        self._in_flight[key] = fut
        try:
            await self._do_process(asset, depth, parent, found_in)
        finally:
            if not fut.done():
                fut.set_result(None)
            # Finished: later callers return on the seen_assets check alone.
            self._in_flight.pop(key, None)

    # ── Core pipeline ─────────────────────────────────────────────────
    async def _do_process(self, asset: str, depth: int,
                          parent: Optional[str], found_in: str) -> None:
        """
        Run the per-asset pipeline, then recurse into newly discovered assets.

        Phase 1 — Breach scan (under the shared concurrency semaphore)
        Phase 2 — Hash crack (all crackable hashes concurrently; awaited before Phase 3)
        Phase 3 — Dork (first ``_DORK_LIMIT`` hits, de-duplicated by URL/title run-wide)
        Phase 4 — Scrape
        → Harvest all new identifiers with phase+ref annotation
        → Reinject every unseen identifier one level deeper

        This node's ``pivot_log`` entry is appended only after all of its
        children have finished (A5).
        """
        if depth > self._max_depth:
            self._max_depth = depth
        qtype = self._detect_qtype(asset)
        indent = " " * depth
        origin = f" [{found_in} via {parent}]" if parent else " [SEED]"
        _out("pivot" if depth > 0 else "info",
             f"{indent}[depth={depth}] {asset} ({qtype}){origin}")
        _syslog.info("AVALANCHE asset=%s depth=%d parent=%s found_in=%s",
                     asset, depth, parent or "", found_in)

        # ── Phase 1: Breach scan ──────────────────────────────────────
        async with self._get_sem():
            try:
                records: List = list(await self._orc._full_async_scan(asset, qtype) or [])
            except Exception as exc:
                _syslog.warning("BREACH_FAIL asset=%s err=%s", asset, exc)
                records = []
        _out("ok" if records else "dim",
             f"{indent} [breach] {len(records)} records")
        _syslog.info("BREACH_DONE asset=%s records=%d", asset, len(records))
        self._all_records.extend(records)

        # ── Phase 2: Hash crack ───────────────────────────────────────
        cracked_plaintexts = await self._crack_hashes(records, depth, asset)

        # ── Phase 3: Dork ─────────────────────────────────────────────
        _out("info", f"{indent} [dork] querying for {asset}")
        try:
            dork_res = await self._async_dork(asset, qtype)
        except Exception as exc:
            _syslog.warning("DORK_FAIL asset=%s err=%s", asset, exc)
            dork_res = []
        # Only dict hits can be annotated and harvested; anything else is ignored.
        dork_hits = [h for h in (dork_res or [])[:_DORK_LIMIT] if isinstance(h, dict)]
        dork_count = 0
        for hit in dork_hits:
            url = hit.get("url", "") or hit.get("title", "")
            if url and url not in self._seen_dork_urls:
                self._seen_dork_urls.add(url)
                hit["pivot_asset"] = asset
                hit["pivot_depth"] = depth
                self._dork_hits.append(hit)
                dork_count += 1
        _out("ok" if dork_count else "dim",
             f"{indent} [dork] {dork_count} hits")
        _syslog.info("DORK_DONE asset=%s hits=%d", asset, dork_count)

        # ── Phase 4: Scrape ───────────────────────────────────────────
        _out("info", f"{indent} [scrape] querying for {asset}")
        try:
            scrape_res = await self._async_scrape(asset)
        except Exception as exc:
            _syslog.warning("SCRAPE_FAIL asset=%s err=%s", asset, exc)
            scrape_res = {}
        if not isinstance(scrape_res, dict):
            scrape_res = {}
        # A6: collect scrape results locally, then merge atomically
        scrape_count = 0
        local_scrape: Dict = {k: [] for k in self._scrape_hits}
        for k in self._scrape_hits:
            for item in scrape_res.get(k) or []:
                if isinstance(item, dict):
                    item["pivot_asset"] = asset
                    item["pivot_depth"] = depth
                    local_scrape[k].append(item)
                    scrape_count += 1
        # Atomic merge into shared dict (no await in between — single-threaded loop)
        for k, items in local_scrape.items():
            self._scrape_hits[k].extend(items)
        _out("ok" if scrape_count else "dim",
             f"{indent} [scrape] {scrape_count} items")
        _syslog.info("SCRAPE_DONE asset=%s items=%d", asset, scrape_count)

        # ── Harvest, deduplicate and queue children ───────────────────
        new_ids = self._harvest_new_ids(asset, records, dork_hits, scrape_res)
        children: List[dict] = []
        child_tasks = []
        queued: Set[str] = set()
        for val, vqtype, phase, ref in new_ids:
            child_key = val.lower().strip()
            if not child_key or child_key in self.seen_assets or child_key in queued:
                continue
            queued.add(child_key)
            children.append({"asset": val, "qtype": vqtype, "found_in": phase, "ref": ref})
            # A8: prevent duplicate entries in discovered_assets across parallel parents
            if child_key not in self._seen_discovered:
                self._seen_discovered.add(child_key)
                self.discovered_assets.append({
                    "asset": val,
                    "qtype": vqtype,
                    "phase": phase,
                    "ref": ref,
                    "parent": asset,
                    "depth": depth + 1,
                })
            _out("pivot",
                 f"{indent} ↳ new asset [{phase}]: {val} ({vqtype}) ref: {ref[:60]}")
            _syslog.info("PIVOT_QUEUE asset=%s qtype=%s phase=%s ref=%s parent=%s depth=%d",
                         val, vqtype, phase, ref[:80], asset, depth + 1)
            child_tasks.append(
                self._process(val, depth + 1, parent=asset, found_in=phase)
            )
        # A5: run child tasks FIRST, then append pivot_log so the log reflects actual outcomes
        if child_tasks:
            _out("info", f"{indent} → reinjecting {len(child_tasks)} new asset(s)…")
            await asyncio.gather(*child_tasks, return_exceptions=True)

        # ── Log this node (after children complete — A5) ──────────────
        self.pivot_log.append({
            "asset": asset,
            "qtype": qtype,
            "depth": depth,
            "parent": parent,
            "found_in": found_in,
            "records": len(records),
            "dorks": dork_count,
            "scrape": scrape_count,
            "children": children,
            "cracked": cracked_plaintexts or [],
        })

    async def _crack_hashes(self, records: List, depth: int, asset: str) -> List[str]:
        """Phase 2: try to crack every recognised hash on records lacking a plaintext.

        Hashes are cracked concurrently through ``_crack_and_inject``, which may
        recurse into ``_process`` for the plaintext. Returns the plaintexts
        cracked for *asset*. Nothing is done when the cracker module or aiohttp
        cannot be imported.
        """
        cracked: List[str] = []
        try:
            from sources.helpers.cracker import detect_hash  # type: ignore
            import aiohttp as _aio  # type: ignore
        except ImportError:
            return cracked
        targets = [
            r for r in records
            if getattr(r, "password_hash", "") and not getattr(r, "password", "")
            and detect_hash(getattr(r, "password_hash", ""))
        ]
        if not targets:
            return cracked  # no HTTP session needed
        async with _aio.ClientSession(connector=_aio.TCPConnector(limit=5)) as session:
            await asyncio.gather(
                *(_crack_and_inject(session, getattr(r, "password_hash", ""), r,
                                    self.seen_assets, self._all_records,
                                    self, depth, asset, cracked)
                  for r in targets),
                return_exceptions=True,
            )
        return cracked

    @staticmethod
    def _harvest_new_ids(asset: str, records: List, dork_hits: List[dict],
                         scrape_res: dict) -> List[Tuple[str, str, str, str]]:
        """Collect ``(value, qtype, phase, ref)`` pivot candidates from all phase results.

        Only qtypes in ``_PIVOT_TYPES`` are kept, and *asset* itself is
        excluded. The result may still contain already-seen or duplicate
        values; the caller deduplicates.
        """
        new_ids: List[Tuple[str, str, str, str]] = []

        def add_from_text(text: str, phase: str, ref: str) -> None:
            for val, vqtype in _extract_ids_from_text(text, exclude=asset):
                if vqtype in _PIVOT_TYPES:
                    new_ids.append((val, vqtype, phase, ref))

        # Breach records: ref is the breach/source name.
        for val, vqtype, ref in _ids_from_records(records, exclude=asset):
            if vqtype in _PIVOT_TYPES:
                new_ids.append((val, vqtype, "breach", ref))

        # Dork hits: ref is the hit URL, or the dork query when there is no URL.
        for hit in dork_hits:
            url = hit.get("url", "") or ""
            dork = hit.get("dork", "") or ""
            text = f"{hit.get('title','')} {hit.get('snippet','')} {url} {dork}"
            add_from_text(text, "dork", str(url or dork)[:120])

        # Scraped credentials: ref is the paste id, or the source when there is none.
        for cred in scrape_res.get("credentials") or []:
            if not isinstance(cred, dict):
                continue
            paste_id = cred.get("paste_id")
            ref = f"paste:{paste_id}" if paste_id else str(cred.get("source") or "scrape")
            add_from_text(cred.get("raw") or "", "scrape", ref)

        # Telegram messages: ref is the channel link.
        for tg in scrape_res.get("telegram") or []:
            if not isinstance(tg, dict):
                continue
            add_from_text(tg.get("text") or "", "scrape", f"t.me/{tg.get('channel','')}")

        # Misconfiguration dorks: ref is the URL, else the title.
        for mc in scrape_res.get("dork_misconfigs") or []:
            if not isinstance(mc, dict):
                continue
            ref = str(mc.get("url") or mc.get("title") or "misconfig")
            add_from_text(f"{mc.get('title','')} {mc.get('snippet','')}", "scrape", ref[:120])

        return new_ids

    # ── Dork dispatcher ───────────────────────────────────────────────
    async def _async_dork(self, asset: str, qtype: str = "email") -> list:
        """Run dork searches for *asset* and return the hits as plain dicts.

        The preferred path calls ``orchestrator.dorking_engine.async_search``
        over a dedicated aiohttp session. Each hit then has the keys ``url``,
        ``title`` (the URL, or the dork if there is none), ``snippet`` (always
        empty), ``dork`` and ``engine`` (always ``"DDG"``). If the imports
        fail, the synchronous ``orchestrator.dork`` runs in the default
        executor and its result is returned when it is a list. Other errors
        are logged at DEBUG and yield ``[]``.
        """
        try:
            import aiohttp as _aio  # type: ignore
            import ssl as _ssl
            connector = _aio.TCPConnector(limit=10, ssl=_ssl.create_default_context(), family=0)
            async with _aio.ClientSession(connector=connector) as session:
                recs = await self._orc.dorking_engine.async_search(session, asset, qtype)
                hits = []
                for r in recs or []:
                    # A record without usable raw_data still yields an (empty) hit
                    # instead of aborting the whole conversion.
                    raw = getattr(r, "raw_data", None)
                    if raw is None or not hasattr(raw, "get"):
                        raw = {}
                    hits.append({
                        "url": raw.get("url", ""),
                        "title": raw.get("url", raw.get("dork", "")),
                        "snippet": "",
                        "dork": raw.get("dork", ""),
                        "engine": "DDG",
                    })
                return hits
        except ImportError:
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(None, self._orc.dork, asset)
            return result if isinstance(result, list) else []
        except Exception as exc:
            _syslog.debug("DORK_ERR asset=%s err=%s", asset, exc)
            return []

    # ── Scrape dispatcher ─────────────────────────────────────────────
    async def _async_scrape(self, asset: str) -> dict:
        """Run the scrape engine for *asset* in the default executor.

        A3: a fresh ``Session`` + ``ScrapeEngine`` is built per call, so no
        session object is shared between concurrent coroutines. If building
        or running it fails, ``orchestrator.scrape`` is tried instead. A
        non-dict result or any remaining error yields a dict of empty
        category lists.
        """
        _empty: dict = {"pastes": [], "credentials": [], "hashes": [],
                        "telegram": [], "dork_misconfigs": []}
        try:
            loop = asyncio.get_running_loop()
            try:
                from nox import Session, NoxConfig, ScrapeEngine  # type: ignore
                _cfg = getattr(self._orc, "config", None) or NoxConfig()
                _session = Session(_cfg)
                _engine = ScrapeEngine(_session, self._orc.db)
                try:
                    qtype = self._detect_qtype(asset)
                except Exception:
                    qtype = "email"
                result = await loop.run_in_executor(None, _engine.run, asset, qtype)
            except Exception:
                result = await loop.run_in_executor(None, self._orc.scrape, asset)
            return result if isinstance(result, dict) else _empty
        except Exception as exc:
            _syslog.debug("SCRAPE_ERR asset=%s err=%s", asset, exc)
            return _empty
# ── Hash crack helper ──────────────────────────────────────────────────────
async def _crack_and_inject(session, hash_value: str, record_ref,
                            seen_assets: Set[str], all_records: list,
                            scanner: "AvalancheScanner",
                            depth: int, parent_asset: str,
                            cracked_out: List[str]) -> None:
    """Try to crack one password hash and, on success, pivot on the plaintext.

    On success:
      - the breach record is updated in place (``password``, ``hash_type``,
        and a ``"Cracked"`` entry added to ``data_types``);
      - the plaintext is appended to *cracked_out*;
      - if its key is unseen and the pivot depth allows, the plaintext is fed
        into ``scanner._process`` at ``depth + 1``.
    Failures (unrecognised hash, timeout, lookup error, empty result) are
    logged at DEBUG and otherwise ignored.

    Args:
        session: HTTP session passed to ``async_crack`` (the caller in this
            file passes an ``aiohttp.ClientSession``).
        hash_value: Hash string taken from the record.
        record_ref: The breach record to annotate.
        seen_assets: The scanner's set of claimed asset keys.
        all_records: Kept for interface compatibility; not used here.
        scanner: The running scanner, used to recurse into ``_process``.
        depth: Depth of *parent_asset*.
        parent_asset: The asset whose breach results contained the hash.
        cracked_out: Accumulator for the parent's pivot-log ``cracked`` list.
    """
    from sources.helpers.cracker import detect_hash, async_crack, CRACK_TIMEOUT  # type: ignore
    hash_type = detect_hash(hash_value)
    if not hash_type:
        return
    try:
        plaintext = await asyncio.wait_for(
            async_crack(session, hash_value, hash_type), timeout=CRACK_TIMEOUT)
    except Exception as exc:  # asyncio.TimeoutError is an Exception subclass
        _syslog.debug("CRACK_FAIL hash=%s reason=%s", hash_value[:16], exc)
        return
    if not isinstance(plaintext, str) or not plaintext:
        _syslog.debug("CRACK_FAIL hash=%s reason=no_result", hash_value[:16])
        return
    record_ref.password = plaintext
    record_ref.hash_type = hash_type
    # data_types may be missing or None on some records; treat that as empty.
    data_types = list(getattr(record_ref, "data_types", None) or [])
    if "Cracked" not in data_types:
        record_ref.data_types = data_types + ["Cracked"]
    # NOTE(review): the plaintext is written to the "nox.system" log at INFO —
    # confirm that persisting cracked passwords there is intended.
    _syslog.info("CRACK_OK hash=%s plain=%s parent=%s", hash_value[:16], plaintext, parent_asset)
    _out("ok", f" [crack] {hash_value[:16]}… → {plaintext} (from {parent_asset})")
    cracked_out.append(plaintext)
    # A4: the plaintext is re-entered through scanner._process, which re-detects
    # its qtype itself; no explicit "password" qtype is passed from here.
    # NOTE(review): the original intent was to pivot only when sources support
    # password-recycling queries — no such check exists in this code.
    key = plaintext.lower().strip()  # same normalisation as _process keys
    if key and key not in seen_assets and depth + 1 <= _cfg_depth(scanner._orc):
        await scanner._process(plaintext, depth + 1,
                               parent=parent_asset, found_in="hash_crack")