mirror of
https://github.com/nox-project/nox-framework.git
synced 2026-06-08 16:07:17 +00:00
91 lines
3.1 KiB
Python
91 lines
3.1 KiB
Python
"""tests/test_scanner.py — Unit tests for AvalancheScanner dedup and depth cap."""
|
|
import asyncio
|
|
import sys, os
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
|
|
|
from sources.helpers.scanner import AvalancheScanner, _extract_ids_from_text as _extract_new_ids, _ids_from_records
|
|
|
|
|
|
# ── _extract_new_ids ──────────────────────────────────────────────────
|
|
|
|
def test_extract_email():
    """An email address embedded in free text is extracted as an ``(value, "email")`` pair."""
    ids = _extract_new_ids("contact user@example.com for info")
    assert ("user@example.com", "email") in ids
|
|
|
|
|
|
def test_extract_username_from_github():
    """A ``github.com/<name>`` reference in text yields ``(<name>, "username")``."""
    ids = _extract_new_ids("see github.com/johndoe for code")
    assert ("johndoe", "username") in ids
|
|
|
|
|
|
def test_extract_no_false_positives():
    """Plain text without any identifier produces an empty list."""
    ids = _extract_new_ids("no identifiers here at all")
    assert ids == []
|
|
|
|
|
|
# ── seen_assets dedup ─────────────────────────────────────────────────
|
|
|
|
class _FakeOrchestrator:
    """Minimal orchestrator stub — records how many times each asset is scanned."""

    def __init__(self):
        # One entry is appended per `_full_async_scan` call, in call order.
        self.scan_calls = []
        self.dorking_engine = _FakeDorkingEngine()

    async def _full_async_scan(self, asset, qtype):
        """Record *asset* as scanned and return no results."""
        self.scan_calls.append(asset)
        return []

    def dork(self, asset, query_type=None):
        """Return no dork hits, whatever the input."""
        return []

    def scrape(self, asset, query_type=None):
        """Return a scrape result in which every category is empty."""
        return {"pastes": [], "credentials": [], "hashes": [], "telegram": [], "dork_misconfigs": []}
|
|
|
|
|
|
class _FakeDorkingEngine:
    """Dorking-engine stub whose asynchronous search never finds anything."""

    async def async_search(self, session, asset, qtype):
        """Ignore all arguments and return an empty hit list."""
        return []
|
|
|
|
|
|
def test_seen_assets_prevents_duplicate_scan():
    """Processing the same asset twice concurrently triggers at most one scan.

    The asset is placed in ``scanner.seen_assets`` before two concurrent
    ``_process`` calls are issued for it; the stub orchestrator's
    ``scan_calls`` list is then checked for duplicates.
    """
    orc = _FakeOrchestrator()
    scanner = AvalancheScanner(orc)
    asset = "target@example.com"

    async def _run():
        scanner.seen_assets.add(asset)
        await asyncio.gather(
            scanner._process(asset, depth=0, parent=None, found_in="seed"),
            scanner._process(asset, depth=0, parent=None, found_in="seed"),
        )

    asyncio.run(_run())
    # Should only have been scanned once (or zero times since it was pre-added to seen_assets)
    assert orc.scan_calls.count(asset) <= 1
|
|
|
|
|
|
def test_depth_cap_respected():
    """An asset submitted at depth 99 is never handed to the orchestrator for scanning.

    NOTE(review): 99 is assumed to be above the scanner's depth cap — confirm
    against the AvalancheScanner defaults.
    """
    orc = _FakeOrchestrator()
    scanner = AvalancheScanner(orc)

    async def _run():
        await scanner._process("deep@example.com", depth=99, parent=None, found_in="seed")

    asyncio.run(_run())
    assert "deep@example.com" not in orc.scan_calls
|
|
|
|
|
|
def test_global_dork_url_dedup():
    """A dork hit whose URL is already in ``_seen_dork_urls`` is not accumulated again.

    NOTE(review): the accumulation check below is written out inline in the
    test rather than invoked through a scanner method — presumably it mirrors
    the scanner's own logic; verify against AvalancheScanner.
    """
    orc = _FakeOrchestrator()
    scanner = AvalancheScanner(orc)
    scanner._seen_dork_urls.add("https://example.com/leak")

    # Simulate accumulating a hit with a URL already seen
    hit = {"url": "https://example.com/leak", "title": "Leak", "snippet": ""}
    initial_len = len(scanner._dork_hits)
    url = hit.get("url", "")
    if url and url not in scanner._seen_dork_urls:
        scanner._seen_dork_urls.add(url)
        scanner._dork_hits.append(hit)

    assert len(scanner._dork_hits) == initial_len  # not added — already seen
|