Files
NawfalMotii79-PLFM_RADAR/9_Firmware/9_3_GUI/test_v7.py
T
Jason 21bf7a0228 feat(fpga,gui): PR-AC.1 — M-5 status pkt 30→34 B for medium_chirp/medium_listen readback
Closes the 161-µs MEDIUM PRI visibility gap from the 2026-05-02 e2e audit.
PR-G ran out of reserved bits in status_words[3] to fit a second 16-bit
pair, so this PR adds status_words[7] = {medium_chirp[15:0], medium_listen[15:0]}
and bumps STATUS_PKT_LEN 30→34, STATUS_PACKET_SIZE 30→34.

RTL (usb_data_interface_ft2232h.v):
  - Two new input ports `status_medium_chirp` / `status_medium_listen`.
  - status_words array bound 6→7, init loop 7→8, snapshot block packs word 7.
  - STATUS_PKT_LEN width 5→6 bits to hold 34, byte-mux index widened
    [4:0]→[5:0] with 4 new entries for word 7 + footer at index 33.
  - Header docstring + length-localparam comment refreshed.

Top-level (radar_system_top.v): wire `host_medium_chirp_cycles` /
`host_medium_listen_cycles` into the FT2232H usb_inst (gen_ft2232h branch
only; legacy FT601 path retains its pre-PR-G 6-word / 26-byte layout —
no host code reads it today).

Host parser (radar_protocol.py):
  - STATUS_PACKET_SIZE 30→34. Module docstring + parse_status_packet
    docstring + find_bulk_frame_boundaries note refreshed.
  - StatusResponse gains `medium_chirp` / `medium_listen` fields.
  - Word-loop bounds 7→8; word-7 decode added.

Tests:
  - tb_usb_protocol_v2 — drive default RP_DEF_MEDIUM_*_CYCLES (500/15600),
    move T3.2 footer assertion 29→33, add T3.8..T3.11 for word-7 bytes.
    Manual run: 31/31 PASS (TB not in run_regression.sh).
  - tb_ft2232h_frame_drop, tb_e2e_dsp_to_host — tie new DUT ports to
    16'd0 (these TBs don't exercise status reads).
  - test_GUI_V65_Tk — extend _make_status_packet builder kwargs, rename
    test_status_packet_is_30_bytes → _is_34_bytes, add round-trip +
    16-bit-max tests for word 7.
  - test_v7 — TestStatusPacketV2RoundTrip rewired for 8 words / 34 B,
    add test_word7_medium_pri_decoded + test_pre_M5_30byte_packet_rejected.

Verification:
  - FPGA regression (iverilog + xsim cosim): 42/0/0.
  - tb_usb_protocol_v2 standalone: 31/0.
  - Host test_GUI_V65_Tk: 119/119.
  - Host test_v7: 152/152.
2026-05-07 15:20:09 +05:45

2000 lines
85 KiB
Python

"""
V7-specific unit tests for the PLFM Radar GUI V7 modules.
Tests cover:
- v7.models: RadarTarget, RadarSettings, GPSData, ProcessingConfig
- v7.processing: RadarProcessor, USBPacketParser, apply_pitch_correction
- v7.workers: polar_to_geographic
- v7.hardware: STM32USBInterface (basic), production protocol re-exports
Does NOT require a running Qt event loop — only unit-testable components.
Run with: python -m unittest test_v7 -v
"""
import os
import struct
import unittest
from dataclasses import asdict
import numpy as np
# =============================================================================
# Test: v7.models
# =============================================================================
class TestRadarTarget(unittest.TestCase):
"""RadarTarget dataclass."""
def test_defaults(self):
t = _models().RadarTarget(id=1, range=1000.0, velocity=5.0,
azimuth=45.0, elevation=2.0)
self.assertEqual(t.id, 1)
self.assertEqual(t.range, 1000.0)
self.assertEqual(t.snr, 0.0)
self.assertEqual(t.track_id, -1)
self.assertEqual(t.classification, "unknown")
def test_to_dict(self):
t = _models().RadarTarget(id=1, range=500.0, velocity=-10.0,
azimuth=0.0, elevation=0.0, snr=15.0)
d = t.to_dict()
self.assertIsInstance(d, dict)
self.assertEqual(d["range"], 500.0)
self.assertEqual(d["snr"], 15.0)
class TestRadarSettings(unittest.TestCase):
"""RadarSettings — verify stale STM32 fields are removed."""
def test_no_stale_fields(self):
"""chirp_duration, freq_min/max, prf1/2 must NOT exist."""
s = _models().RadarSettings()
d = asdict(s)
for stale in ["chirp_duration_1", "chirp_duration_2",
"freq_min", "freq_max", "prf1", "prf2",
"chirps_per_position"]:
self.assertNotIn(stale, d, f"Stale field '{stale}' still present")
def test_has_physical_conversion_fields(self):
s = _models().RadarSettings()
self.assertIsInstance(s.range_resolution, float)
self.assertIsInstance(s.velocity_resolution, float)
self.assertGreater(s.range_resolution, 0)
self.assertGreater(s.velocity_resolution, 0)
def test_defaults(self):
s = _models().RadarSettings()
self.assertEqual(s.system_frequency, 10.5e9)
self.assertEqual(s.coverage_radius, 3072)
self.assertEqual(s.max_distance, 3072)
class TestGPSData(unittest.TestCase):
def test_to_dict(self):
g = _models().GPSData(latitude=41.9, longitude=12.5,
altitude=100.0, pitch=2.5)
d = g.to_dict()
self.assertAlmostEqual(d["latitude"], 41.9)
self.assertAlmostEqual(d["pitch"], 2.5)
class TestProcessingConfig(unittest.TestCase):
def test_defaults(self):
cfg = _models().ProcessingConfig()
self.assertTrue(cfg.clustering_enabled)
self.assertTrue(cfg.tracking_enabled)
self.assertFalse(cfg.mti_enabled)
self.assertFalse(cfg.cfar_enabled)
class TestNoCrcmodDependency(unittest.TestCase):
"""crcmod was removed — verify it's not exported."""
def test_no_crcmod_available(self):
models = _models()
self.assertFalse(hasattr(models, "CRCMOD_AVAILABLE"),
"CRCMOD_AVAILABLE should be removed from models")
# =============================================================================
# Test: v7.processing
# =============================================================================
class TestApplyPitchCorrection(unittest.TestCase):
def test_positive_pitch(self):
from v7.processing import apply_pitch_correction
self.assertAlmostEqual(apply_pitch_correction(10.0, 3.0), 7.0)
def test_zero_pitch(self):
from v7.processing import apply_pitch_correction
self.assertAlmostEqual(apply_pitch_correction(5.0, 0.0), 5.0)
class TestRadarProcessorMTI(unittest.TestCase):
def test_mti_order1(self):
from v7.processing import RadarProcessor
from v7.models import ProcessingConfig
proc = RadarProcessor()
proc.set_config(ProcessingConfig(mti_enabled=True, mti_order=1))
frame1 = np.ones((64, 32))
frame2 = np.ones((64, 32)) * 3
result1 = proc.mti_filter(frame1)
np.testing.assert_array_equal(result1, np.zeros((64, 32)),
err_msg="First frame should be zeros (no history)")
result2 = proc.mti_filter(frame2)
expected = frame2 - frame1
np.testing.assert_array_almost_equal(result2, expected)
def test_mti_order2(self):
from v7.processing import RadarProcessor
from v7.models import ProcessingConfig
proc = RadarProcessor()
proc.set_config(ProcessingConfig(mti_enabled=True, mti_order=2))
f1 = np.ones((4, 4))
f2 = np.ones((4, 4)) * 2
f3 = np.ones((4, 4)) * 5
proc.mti_filter(f1) # zeros (need 3 frames)
proc.mti_filter(f2) # zeros
result = proc.mti_filter(f3)
# Order 2: x[n] - 2*x[n-1] + x[n-2] = 5 - 4 + 1 = 2
np.testing.assert_array_almost_equal(result, np.ones((4, 4)) * 2)
class TestRadarProcessorCFAR(unittest.TestCase):
def test_cfar_1d_detects_peak(self):
from v7.processing import RadarProcessor
signal = np.ones(64) * 10
signal[32] = 500 # inject a strong target
det = RadarProcessor.cfar_1d(signal, guard=2, train=4,
threshold_factor=3.0, cfar_type="CA-CFAR")
self.assertTrue(det[32], "Should detect strong peak at bin 32")
def test_cfar_1d_no_false_alarm(self):
from v7.processing import RadarProcessor
signal = np.ones(64) * 10 # uniform — no target
det = RadarProcessor.cfar_1d(signal, guard=2, train=4,
threshold_factor=3.0)
self.assertEqual(det.sum(), 0, "Should have no detections in flat noise")
class TestRadarProcessorProcessFrame(unittest.TestCase):
def test_process_frame_returns_shapes(self):
from v7.processing import RadarProcessor
proc = RadarProcessor()
frame = np.random.randn(64, 32) * 10
frame[20, 8] = 5000 # inject a target
power, mask = proc.process_frame(frame)
self.assertEqual(power.shape, (64, 32))
self.assertEqual(mask.shape, (64, 32))
self.assertEqual(mask.dtype, bool)
class TestRadarProcessorWindowing(unittest.TestCase):
def test_hann_window(self):
from v7.processing import RadarProcessor
data = np.ones((4, 32))
windowed = RadarProcessor.apply_window(data, "Hann")
# Hann window tapers to ~0 at edges
self.assertLess(windowed[0, 0], 0.1)
self.assertGreater(windowed[0, 16], 0.5)
def test_none_window(self):
from v7.processing import RadarProcessor
data = np.ones((4, 32))
result = RadarProcessor.apply_window(data, "None")
np.testing.assert_array_equal(result, data)
class TestRadarProcessorDCNotch(unittest.TestCase):
def test_dc_removal(self):
from v7.processing import RadarProcessor
data = np.ones((4, 8)) * 100
data[0, :] += 50 # DC offset in range bin 0
result = RadarProcessor.dc_notch(data)
# Mean along axis=1 should be ~0
row_means = np.mean(result, axis=1)
for m in row_means:
self.assertAlmostEqual(m, 0, places=10)
class TestRadarProcessorClustering(unittest.TestCase):
def test_clustering_empty(self):
from v7.processing import RadarProcessor
result = RadarProcessor.clustering([], eps=100, min_samples=2)
self.assertEqual(result, [])
class TestUSBPacketParser(unittest.TestCase):
def test_parse_gps_text(self):
from v7.processing import USBPacketParser
parser = USBPacketParser()
data = b"GPS:41.9028,12.4964,100.0,2.5\r\n"
gps = parser.parse_gps_data(data)
self.assertIsNotNone(gps)
self.assertAlmostEqual(gps.latitude, 41.9028, places=3)
self.assertAlmostEqual(gps.longitude, 12.4964, places=3)
self.assertAlmostEqual(gps.altitude, 100.0)
self.assertAlmostEqual(gps.pitch, 2.5)
def test_parse_gps_text_invalid(self):
from v7.processing import USBPacketParser
parser = USBPacketParser()
self.assertIsNone(parser.parse_gps_data(b"NOT_GPS_DATA"))
self.assertIsNone(parser.parse_gps_data(b""))
self.assertIsNone(parser.parse_gps_data(None))
def test_parse_binary_gps(self):
from v7.processing import USBPacketParser
parser = USBPacketParser()
# Build a valid binary GPS packet
pkt = bytearray(b"GPSB")
pkt += struct.pack(">d", 41.9028) # lat
pkt += struct.pack(">d", 12.4964) # lon
pkt += struct.pack(">f", 100.0) # alt
pkt += struct.pack(">f", 2.5) # pitch
# Simple checksum
cksum = sum(pkt) & 0xFFFF
pkt += struct.pack(">H", cksum)
self.assertEqual(len(pkt), 30)
gps = parser.parse_gps_data(bytes(pkt))
self.assertIsNotNone(gps)
self.assertAlmostEqual(gps.latitude, 41.9028, places=3)
def test_no_crc16_func_attribute(self):
"""crcmod was removed — USBPacketParser should not have crc16_func."""
from v7.processing import USBPacketParser
parser = USBPacketParser()
self.assertFalse(hasattr(parser, "crc16_func"),
"crc16_func should be removed (crcmod dead code)")
def test_no_multi_prf_unwrap(self):
"""multi_prf_unwrap was removed (never called, prf fields removed)."""
from v7.processing import RadarProcessor
self.assertFalse(hasattr(RadarProcessor, "multi_prf_unwrap"),
"multi_prf_unwrap should be removed")
# =============================================================================
# Test: v7.workers — polar_to_geographic
# =============================================================================
def _pyqt6_available():
try:
import PyQt6.QtCore # noqa: F401
return True
except ImportError:
return False
@unittest.skipUnless(_pyqt6_available(), "PyQt6 not installed")
class TestPolarToGeographic(unittest.TestCase):
def test_north_bearing(self):
from v7.workers import polar_to_geographic
lat, lon = polar_to_geographic(0.0, 0.0, 1000.0, 0.0)
# Moving 1km north from equator
self.assertGreater(lat, 0.0)
self.assertAlmostEqual(lon, 0.0, places=4)
def test_east_bearing(self):
from v7.workers import polar_to_geographic
lat, lon = polar_to_geographic(0.0, 0.0, 1000.0, 90.0)
self.assertAlmostEqual(lat, 0.0, places=4)
self.assertGreater(lon, 0.0)
def test_zero_range(self):
from v7.workers import polar_to_geographic
lat, lon = polar_to_geographic(41.9, 12.5, 0.0, 0.0)
self.assertAlmostEqual(lat, 41.9, places=6)
self.assertAlmostEqual(lon, 12.5, places=6)
# =============================================================================
# Test: v7.hardware — production protocol re-exports
# =============================================================================
class TestHardwareReExports(unittest.TestCase):
"""Verify hardware.py re-exports all production protocol classes."""
def test_exports(self):
from v7.hardware import (
FT2232HConnection,
RadarProtocol,
STM32USBInterface,
)
# Verify these are actual classes/types, not None
self.assertTrue(callable(FT2232HConnection))
self.assertTrue(callable(RadarProtocol))
self.assertTrue(callable(STM32USBInterface))
def test_stm32_list_devices_no_crash(self):
from v7.hardware import STM32USBInterface
stm = STM32USBInterface()
self.assertFalse(stm.is_open)
# list_devices should return empty list (no USB in test env), not crash
devs = stm.list_devices()
self.assertIsInstance(devs, list)
# =============================================================================
# Test: v7.workers.RadarDataWorker initialization
# (Audit P-1: __init__ must populate _frame_queue/_acquisition/counters
# without requiring an explicit set_waveform call. Dashboard constructs
# the worker and calls .start() directly; missing init causes AttributeError
# on first frame in production.)
# =============================================================================
@unittest.skipUnless(_pyqt6_available(), "PyQt6 not installed")
class TestRadarDataWorkerInit(unittest.TestCase):
def test_init_sets_runtime_attrs(self):
from v7.workers import RadarDataWorker
worker = RadarDataWorker(connection=None)
self.assertIsNotNone(worker._frame_queue)
self.assertEqual(worker._frame_queue.maxsize, 4)
self.assertIsNone(worker._acquisition)
self.assertEqual(worker._frame_count, 0)
self.assertEqual(worker._byte_count, 0)
self.assertEqual(worker._error_count, 0)
self.assertFalse(worker._running)
def test_set_waveform_does_not_reset_counters(self):
from v7.workers import RadarDataWorker
from v7.models import WaveformConfig
worker = RadarDataWorker(connection=None)
worker._frame_count = 7
worker.set_waveform(WaveformConfig())
self.assertEqual(worker._frame_count, 7,
"set_waveform must not reset runtime counters")
# =============================================================================
# Test: radar_protocol PR-G v2 bulk frame + status round-trip
# Audit P-2/P-3: GUI parser must agree byte-for-byte with the FPGA emit
# (usb_data_interface_ft2232h.v). Build synthetic frames the way the FPGA
# does, then parse them back and check every field. Catches:
# - 8 vs 9 byte header (version byte at offset 1)
# - reserved-bit mask 0xC0 vs 0xF8
# - 1-bit vs 2-bit detect packing
# - 26 vs 30 byte status, 6 vs 7 status_words
# =============================================================================
class TestBulkFrameV2RoundTrip(unittest.TestCase):
"""PR-G v2 bulk frame: build synthetic FPGA emit, parse back, check."""
def _build_v2_frame(self, flags: int, frame_num: int = 0,
doppler: np.ndarray | None = None,
cfar_codes: np.ndarray | None = None,
range_profile: np.ndarray | None = None,
subframe_enable: int = 0b111) -> bytes:
"""Construct a v2 frame the way usb_data_interface_ft2232h.v emits.
``subframe_enable`` lands in byte 2 bits[5:3] (PR-U / M-8). Caller
passes raw stream bits in ``flags`` (low 3 bits); helper composes the
full byte 2 = {2'b00, subframe_enable[2:0], stream[2:0]}.
"""
from radar_protocol import (
HEADER_BYTE, FOOTER_BYTE, RP_USB_PROTOCOL_VERSION,
NUM_RANGE_BINS, NUM_DOPPLER_BINS,
BULK_FLAG_STREAM_RANGE, BULK_FLAG_STREAM_DOPPLER, BULK_FLAG_STREAM_CFAR,
BULK_DETECT_BYTES_PER_RANGE,
BULK_SUBFRAME_ENABLE_SHIFT,
)
flags_byte = (((subframe_enable & 0x07) << BULK_SUBFRAME_ENABLE_SHIFT)
| (flags & 0x07)
| (flags & 0xC0)) # preserve reserved bits if caller injects them
parts = [
bytes([HEADER_BYTE, RP_USB_PROTOCOL_VERSION, flags_byte & 0xFF]),
struct.pack(">H", frame_num),
struct.pack(">H", NUM_RANGE_BINS),
struct.pack(">H", NUM_DOPPLER_BINS),
]
if flags & BULK_FLAG_STREAM_RANGE:
rp = (range_profile if range_profile is not None
else np.arange(NUM_RANGE_BINS, dtype=np.uint16))
parts.append(rp.astype(">u2").tobytes())
if flags & BULK_FLAG_STREAM_DOPPLER:
d = (doppler if doppler is not None
else np.zeros((NUM_RANGE_BINS, NUM_DOPPLER_BINS), dtype=np.uint16))
parts.append(d.astype(">u2").tobytes())
if flags & BULK_FLAG_STREAM_CFAR:
codes = (cfar_codes if cfar_codes is not None
else np.zeros((NUM_RANGE_BINS, NUM_DOPPLER_BINS), dtype=np.uint8))
# Pack 4 cells per byte, MSB-first within byte
packed = np.zeros((NUM_RANGE_BINS, BULK_DETECT_BYTES_PER_RANGE), dtype=np.uint8)
for d_idx in range(NUM_DOPPLER_BINS):
byte_idx = d_idx // 4
shift = (3 - (d_idx % 4)) * 2 # MSB-first
packed[:, byte_idx] |= ((codes[:, d_idx] & 0x03) << shift).astype(np.uint8)
parts.append(packed.tobytes())
parts.append(bytes([FOOTER_BYTE]))
return b"".join(parts)
def test_full_frame_round_trip(self):
from radar_protocol import (
RadarProtocol, NUM_RANGE_BINS, NUM_DOPPLER_BINS,
BULK_FLAG_STREAM_RANGE, BULK_FLAG_STREAM_DOPPLER, BULK_FLAG_STREAM_CFAR,
BULK_FRAME_HEADER_SIZE,
)
# Synthetic detection map: scatter all 4 codes across the grid
codes = np.zeros((NUM_RANGE_BINS, NUM_DOPPLER_BINS), dtype=np.uint8)
codes[10, 5] = 1 # CAND
codes[100, 17] = 2 # CONFIRM
codes[300, 47] = 3 # reserved (still legal on the wire)
doppler = np.full((NUM_RANGE_BINS, NUM_DOPPLER_BINS), 1234, dtype=np.uint16)
rp = np.arange(NUM_RANGE_BINS, dtype=np.uint16)
flags = (BULK_FLAG_STREAM_RANGE | BULK_FLAG_STREAM_DOPPLER
| BULK_FLAG_STREAM_CFAR)
frame = self._build_v2_frame(flags, frame_num=42,
doppler=doppler, cfar_codes=codes,
range_profile=rp)
# 9 + 1024 + 49152 + 6144 + 1 = 56330
self.assertEqual(len(frame), BULK_FRAME_HEADER_SIZE + 1024 + 49152 + 6144 + 1)
self.assertEqual(BULK_FRAME_HEADER_SIZE, 9)
parsed = RadarProtocol.parse_bulk_frame(frame)
self.assertIsNotNone(parsed)
self.assertEqual(parsed["frame_number"], 42)
# PR-U / M-8: byte 2 now packs subframe_enable into bits[5:3]; helper
# defaults to 0b111 (production 3-PRI ladder) so the wire flags byte
# is (0b111 << 3) | 0x07 = 0x3F.
self.assertEqual(parsed["flags"], flags | (0b111 << 3))
self.assertEqual(parsed["subframe_enable"], 0b111)
self.assertEqual(parsed["n_range"], NUM_RANGE_BINS)
self.assertEqual(parsed["n_doppler"], NUM_DOPPLER_BINS)
np.testing.assert_array_equal(parsed["range_profile"], rp)
np.testing.assert_array_equal(parsed["doppler_mag"], doppler)
np.testing.assert_array_equal(parsed["cfar_dense"], codes)
def test_reject_wrong_version_byte(self):
from radar_protocol import RadarProtocol
frame = self._build_v2_frame(0x07)
# Corrupt the version byte
bad = bytes([frame[0], 0x01]) + frame[2:]
self.assertIsNone(RadarProtocol.parse_bulk_frame(bad))
def test_reject_reserved_flag_bits(self):
from radar_protocol import RadarProtocol
# Set bit 7 (reserved); byte order: HEADER, ver, flags
frame = self._build_v2_frame(0x07)
bad = bytes([frame[0], frame[1], frame[2] | 0x80]) + frame[3:]
self.assertIsNone(RadarProtocol.parse_bulk_frame(bad))
def test_detect_2bit_codes_independently(self):
"""Each cell decodes to the same 2-bit code that was packed."""
from radar_protocol import (
RadarProtocol, NUM_RANGE_BINS, NUM_DOPPLER_BINS,
BULK_FLAG_STREAM_CFAR,
)
# All four codes in adjacent cells of the same byte
codes = np.zeros((NUM_RANGE_BINS, NUM_DOPPLER_BINS), dtype=np.uint8)
codes[0, 0] = 0
codes[0, 1] = 1
codes[0, 2] = 2
codes[0, 3] = 3
frame = self._build_v2_frame(BULK_FLAG_STREAM_CFAR, cfar_codes=codes)
parsed = RadarProtocol.parse_bulk_frame(frame)
self.assertEqual(parsed["cfar_dense"][0, 0], 0)
self.assertEqual(parsed["cfar_dense"][0, 1], 1)
self.assertEqual(parsed["cfar_dense"][0, 2], 2)
self.assertEqual(parsed["cfar_dense"][0, 3], 3)
def test_find_boundaries_on_back_to_back_frames(self):
from radar_protocol import (
RadarProtocol, BULK_FLAG_STREAM_DOPPLER, BULK_FLAG_STREAM_CFAR,
)
flags = BULK_FLAG_STREAM_DOPPLER | BULK_FLAG_STREAM_CFAR
f1 = self._build_v2_frame(flags, frame_num=1)
f2 = self._build_v2_frame(flags, frame_num=2)
boundaries = RadarProtocol.find_bulk_frame_boundaries(f1 + f2)
self.assertEqual(len(boundaries), 2)
self.assertEqual(boundaries[0], (0, len(f1), "data"))
self.assertEqual(boundaries[1], (len(f1), len(f1) + len(f2), "data"))
class TestSubframeEnableRoundTrip(TestBulkFrameV2RoundTrip):
"""PR-U / M-8: byte 2 bits[5:3] carry the per-frame sub-frame mask."""
def test_default_mask_round_trip(self):
"""Production default 0b111 round-trips and is the helper default."""
from radar_protocol import (
RadarProtocol, BULK_FLAG_STREAM_DOPPLER,
)
frame = self._build_v2_frame(BULK_FLAG_STREAM_DOPPLER, frame_num=1)
parsed = RadarProtocol.parse_bulk_frame(frame)
self.assertIsNotNone(parsed)
self.assertEqual(parsed["subframe_enable"], 0b111)
def test_short_disabled_mask(self):
"""subframe_enable = 0b110 (LONG|MEDIUM, no SHORT) survives the wire."""
from radar_protocol import (
RadarProtocol, BULK_FLAG_STREAM_DOPPLER,
)
frame = self._build_v2_frame(BULK_FLAG_STREAM_DOPPLER, frame_num=1,
subframe_enable=0b110)
parsed = RadarProtocol.parse_bulk_frame(frame)
self.assertIsNotNone(parsed)
self.assertEqual(parsed["subframe_enable"], 0b110)
def test_short_only_mask(self):
"""subframe_enable = 0b001 (SHORT only) survives the wire."""
from radar_protocol import (
RadarProtocol, BULK_FLAG_STREAM_DOPPLER,
)
frame = self._build_v2_frame(BULK_FLAG_STREAM_DOPPLER, frame_num=2,
subframe_enable=0b001)
parsed = RadarProtocol.parse_bulk_frame(frame)
self.assertIsNotNone(parsed)
self.assertEqual(parsed["subframe_enable"], 0b001)
def test_subframe_bits_no_longer_in_reserved_mask(self):
"""Bits[5:3] are now valid SF mask, not reserved — must NOT reject."""
from radar_protocol import (
RadarProtocol, BULK_FLAGS_RESERVED_MASK,
BULK_SUBFRAME_ENABLE_MASK,
)
# The new reserved mask must not overlap the SF-enable bit field.
self.assertEqual(BULK_FLAGS_RESERVED_MASK & BULK_SUBFRAME_ENABLE_MASK, 0)
# And bit 6 (top of new reserved mask) STILL rejects.
from radar_protocol import BULK_FLAG_STREAM_RANGE
frame = self._build_v2_frame(BULK_FLAG_STREAM_RANGE | 0x40)
bad = bytes([frame[0], frame[1], frame[2] | 0x40]) + frame[3:]
self.assertIsNone(RadarProtocol.parse_bulk_frame(bad))
class TestStatusPacketV2RoundTrip(unittest.TestCase):
"""M-5 status packet: 8 status_words / 34 bytes."""
def _build_status(self, words: list[int]) -> bytes:
from radar_protocol import STATUS_HEADER_BYTE, FOOTER_BYTE
assert len(words) == 8
body = b"".join(struct.pack(">I", w & 0xFFFFFFFF) for w in words)
return bytes([STATUS_HEADER_BYTE]) + body + bytes([FOOTER_BYTE])
def test_size_is_34(self):
from radar_protocol import STATUS_PACKET_SIZE
self.assertEqual(STATUS_PACKET_SIZE, 34)
pkt = self._build_status([0] * 8)
self.assertEqual(len(pkt), 34)
def test_word6_telemetry_decoded(self):
"""word[6] = {detect_count_cand[31:16], detect_threshold_soft[15:0]}"""
from radar_protocol import RadarProtocol
word6 = (0x1234 << 16) | 0xABCD # cand=0x1234, thr_soft=0xABCD
pkt = self._build_status([0, 0, 0, 0, 0, 0, word6, 0])
sr = RadarProtocol.parse_status_packet(pkt)
self.assertIsNotNone(sr)
self.assertEqual(sr.detect_count_cand, 0x1234)
self.assertEqual(sr.detect_threshold_soft, 0xABCD)
def test_word7_medium_pri_decoded(self):
"""M-5: word[7] = {medium_chirp[31:16], medium_listen[15:0]}"""
from radar_protocol import RadarProtocol
# Production defaults: 500 chirp / 15600 listen (RP_DEF_MEDIUM_*).
word7 = (500 << 16) | 15600
pkt = self._build_status([0, 0, 0, 0, 0, 0, 0, word7])
sr = RadarProtocol.parse_status_packet(pkt)
self.assertIsNotNone(sr)
self.assertEqual(sr.medium_chirp, 500)
self.assertEqual(sr.medium_listen, 15600)
def test_short_packet_returns_none(self):
from radar_protocol import RadarProtocol
pkt = self._build_status([0] * 8)
self.assertIsNone(RadarProtocol.parse_status_packet(pkt[:25]))
def test_pre_PR_G_26byte_packet_rejected(self):
"""Old 26-byte status packets must NOT silently parse — they're stale."""
from radar_protocol import RadarProtocol, STATUS_HEADER_BYTE, FOOTER_BYTE
# Build a 26-byte packet (legacy format).
old_pkt = (bytes([STATUS_HEADER_BYTE])
+ b"\x00" * 24 + bytes([FOOTER_BYTE]))
self.assertEqual(len(old_pkt), 26)
self.assertIsNone(RadarProtocol.parse_status_packet(old_pkt))
def test_pre_M5_30byte_packet_rejected(self):
"""Pre-M-5 30-byte (PR-G 7-word) status packets must reject after the bump."""
from radar_protocol import RadarProtocol, STATUS_HEADER_BYTE, FOOTER_BYTE
old_pkt = (bytes([STATUS_HEADER_BYTE])
+ b"\x00" * 28 + bytes([FOOTER_BYTE]))
self.assertEqual(len(old_pkt), 30)
self.assertIsNone(RadarProtocol.parse_status_packet(old_pkt))
# =============================================================================
# Test: v7.__init__ — clean exports
# =============================================================================
class TestV7Init(unittest.TestCase):
"""Verify top-level v7 package exports."""
def test_no_crcmod_export(self):
import v7
self.assertFalse(hasattr(v7, "CRCMOD_AVAILABLE"),
"CRCMOD_AVAILABLE should not be in v7.__all__")
def test_key_exports(self):
import v7
# Core exports (no PyQt6 required)
for name in ["RadarTarget", "RadarSettings", "GPSData",
"ProcessingConfig", "FT2232HConnection",
"RadarProtocol", "RadarProcessor"]:
self.assertTrue(hasattr(v7, name), f"v7 missing export: {name}")
# PyQt6-dependent exports — only present when PyQt6 is installed
if _pyqt6_available():
for name in ["RadarDataWorker", "RadarMapWidget",
"RadarDashboard"]:
self.assertTrue(hasattr(v7, name), f"v7 missing export: {name}")
# =============================================================================
# Test: AGC Visualization data model
# =============================================================================
class TestAGCVisualizationV7(unittest.TestCase):
"""AGC visualization ring buffer and data model tests (no Qt required)."""
def _make_deque(self, maxlen=256):
from collections import deque
return deque(maxlen=maxlen)
def test_ring_buffer_basics(self):
d = self._make_deque(maxlen=4)
for i in range(6):
d.append(i)
self.assertEqual(list(d), [2, 3, 4, 5])
def test_gain_range_4bit(self):
"""AGC gain is 4-bit (0-15)."""
from radar_protocol import StatusResponse
for g in [0, 7, 15]:
sr = StatusResponse(agc_current_gain=g)
self.assertEqual(sr.agc_current_gain, g)
def test_peak_range_8bit(self):
"""Peak magnitude is 8-bit (0-255)."""
from radar_protocol import StatusResponse
for p in [0, 128, 255]:
sr = StatusResponse(agc_peak_magnitude=p)
self.assertEqual(sr.agc_peak_magnitude, p)
def test_saturation_accumulation(self):
"""Saturation ring buffer sum tracks total events."""
sat = self._make_deque(maxlen=256)
for s in [0, 5, 0, 10, 3]:
sat.append(s)
self.assertEqual(sum(sat), 18)
def test_mode_label_logic(self):
"""AGC mode string from enable field."""
from radar_protocol import StatusResponse
self.assertEqual(
"AUTO" if StatusResponse(agc_enable=1).agc_enable else "MANUAL",
"AUTO")
self.assertEqual(
"AUTO" if StatusResponse(agc_enable=0).agc_enable else "MANUAL",
"MANUAL")
def test_history_len_default(self):
"""Default history length should be 256."""
d = self._make_deque(maxlen=256)
self.assertEqual(d.maxlen, 256)
def test_color_thresholds(self):
"""Saturation color: green=0, warning=1-10, error>10."""
from v7.models import DARK_SUCCESS, DARK_WARNING, DARK_ERROR
def pick_color(total):
if total > 10:
return DARK_ERROR
if total > 0:
return DARK_WARNING
return DARK_SUCCESS
self.assertEqual(pick_color(0), DARK_SUCCESS)
self.assertEqual(pick_color(5), DARK_WARNING)
self.assertEqual(pick_color(11), DARK_ERROR)
# =============================================================================
# Test: v7.models.WaveformConfig
# =============================================================================
class TestWaveformConfig(unittest.TestCase):
"""WaveformConfig dataclass and derived physical properties."""
def test_defaults(self):
from v7.models import WaveformConfig
wc = WaveformConfig()
self.assertEqual(wc.sample_rate_hz, 100e6)
self.assertEqual(wc.bandwidth_hz, 20e6)
self.assertEqual(wc.chirp_duration_s, 30e-6)
# PR-Q: 3 staggered PRIs (SHORT 175, MEDIUM 161, LONG 167 us)
self.assertEqual(wc.pri_short_s, 175e-6)
self.assertEqual(wc.pri_medium_s, 161e-6)
self.assertEqual(wc.pri_long_s, 167e-6)
self.assertEqual(wc.center_freq_hz, 10.5e9)
self.assertEqual(wc.n_range_bins, 512)
self.assertEqual(wc.n_doppler_bins, 48)
self.assertEqual(wc.num_subframes, 3)
self.assertEqual(wc.chirps_per_subframe, 16)
self.assertEqual(wc.fft_size, 2048)
self.assertEqual(wc.decimation_factor, 4)
def test_range_resolution(self):
"""range_resolution_m should be ~6.0 m/bin (matched filter, 100 MSPS, decim 4)."""
from v7.models import WaveformConfig
wc = WaveformConfig()
self.assertAlmostEqual(wc.range_resolution_m, 5.996, places=2)
def test_velocity_resolution_per_subframe(self):
"""Per-subframe v_res = lambda / (2 * 16 * PRI), PR-Q stagger."""
from v7.models import WaveformConfig
wc = WaveformConfig()
# lambda = c / 10.5e9 = 0.02856 m
# SHORT 175 us: 0.02856 / (32 * 175e-6) = 5.099 m/s/bin
# MEDIUM 161 us: 0.02856 / (32 * 161e-6) = 5.543 m/s/bin
# LONG 167 us: 0.02856 / (32 * 167e-6) = 5.343 m/s/bin
self.assertAlmostEqual(wc.velocity_resolution_short_mps, 5.099, places=2)
self.assertAlmostEqual(wc.velocity_resolution_medium_mps, 5.543, places=2)
self.assertAlmostEqual(wc.velocity_resolution_long_mps, 5.343, places=2)
# Smallest PRI (MEDIUM) gives largest v_res → largest v_unamb.
self.assertGreater(wc.velocity_resolution_medium_mps, wc.velocity_resolution_long_mps)
self.assertGreater(wc.velocity_resolution_medium_mps, wc.velocity_resolution_short_mps)
def test_max_range(self):
"""max_range_m = range_resolution * n_range_bins."""
from v7.models import WaveformConfig
wc = WaveformConfig()
self.assertAlmostEqual(wc.max_range_m, wc.range_resolution_m * 512, places=1)
def test_max_velocity_per_subframe(self):
"""Per-subframe v_unamb = v_res * chirps_per_subframe / 2."""
from v7.models import WaveformConfig
wc = WaveformConfig()
for vmax, vres in [
(wc.max_velocity_short_mps, wc.velocity_resolution_short_mps),
(wc.max_velocity_medium_mps, wc.velocity_resolution_medium_mps),
(wc.max_velocity_long_mps, wc.velocity_resolution_long_mps),
]:
self.assertAlmostEqual(vmax, vres * 8.0, places=2)
def test_extended_max_velocity_crt(self):
"""CRT-extended v_unamb = max(per-subframe v_unamb) * K."""
from v7.models import WaveformConfig
wc = WaveformConfig()
# MEDIUM has the largest per-subframe v_unamb (smallest PRI).
# K=6 default -> ~266 m/s; well above UAS speeds 50-80 m/s.
v6 = wc.extended_max_velocity_mps_crt()
self.assertAlmostEqual(v6, wc.max_velocity_medium_mps * 6, places=2)
# K=3 should give half of K=6.
v3 = wc.extended_max_velocity_mps_crt(max_alias_k=3)
self.assertAlmostEqual(v3, wc.max_velocity_medium_mps * 3, places=2)
self.assertAlmostEqual(v6, 2.0 * v3, places=2)
def test_custom_params(self):
"""Non-default parameters correctly change derived values."""
from v7.models import WaveformConfig
wc1 = WaveformConfig()
wc2 = WaveformConfig(sample_rate_hz=200e6) # double Fs → halve range bin
self.assertAlmostEqual(wc2.range_resolution_m, wc1.range_resolution_m / 2, places=2)
def test_zero_center_freq_velocity(self):
"""Zero center freq should ZeroDivisionError in any per-subframe velocity calc."""
from v7.models import WaveformConfig
wc = WaveformConfig(center_freq_hz=0.0)
with self.assertRaises(ZeroDivisionError):
_ = wc.velocity_resolution_long_mps
with self.assertRaises(ZeroDivisionError):
_ = wc.velocity_resolution_short_mps
with self.assertRaises(ZeroDivisionError):
_ = wc.velocity_resolution_medium_mps
# =============================================================================
# Test: v7.software_fpga.SoftwareFPGA
# =============================================================================
class TestSoftwareFPGA(unittest.TestCase):
"""SoftwareFPGA register interface and signal chain."""
def _make_fpga(self):
from v7.software_fpga import SoftwareFPGA
return SoftwareFPGA()
def test_reset_defaults(self):
"""Register reset values match FPGA RTL (radar_system_top.v)."""
fpga = self._make_fpga()
self.assertEqual(fpga.detect_threshold, 10_000)
self.assertEqual(fpga.gain_shift, 0)
self.assertFalse(fpga.cfar_enable)
self.assertEqual(fpga.cfar_guard, 2)
self.assertEqual(fpga.cfar_train, 8)
self.assertEqual(fpga.cfar_alpha, 0x30)
self.assertEqual(fpga.cfar_mode, 0)
self.assertFalse(fpga.mti_enable)
self.assertEqual(fpga.dc_notch_width, 0)
self.assertFalse(fpga.agc_enable)
self.assertEqual(fpga.agc_target, 200)
self.assertEqual(fpga.agc_attack, 1)
self.assertEqual(fpga.agc_decay, 1)
self.assertEqual(fpga.agc_holdoff, 4)
def test_setter_detect_threshold(self):
fpga = self._make_fpga()
fpga.set_detect_threshold(5000)
self.assertEqual(fpga.detect_threshold, 5000)
def test_setter_detect_threshold_clamp_16bit(self):
fpga = self._make_fpga()
fpga.set_detect_threshold(0x1FFFF) # 17-bit
self.assertEqual(fpga.detect_threshold, 0xFFFF)
def test_setter_gain_shift_clamp_4bit(self):
fpga = self._make_fpga()
fpga.set_gain_shift(0xFF)
self.assertEqual(fpga.gain_shift, 0x0F)
def test_setter_cfar_enable(self):
fpga = self._make_fpga()
fpga.set_cfar_enable(True)
self.assertTrue(fpga.cfar_enable)
fpga.set_cfar_enable(False)
self.assertFalse(fpga.cfar_enable)
def test_setter_cfar_guard_clamp_4bit(self):
fpga = self._make_fpga()
fpga.set_cfar_guard(0x1F)
self.assertEqual(fpga.cfar_guard, 0x0F)
def test_setter_cfar_train_min_1(self):
"""CFAR train cells clamped to min 1."""
fpga = self._make_fpga()
fpga.set_cfar_train(0)
self.assertEqual(fpga.cfar_train, 1)
def test_setter_cfar_train_clamp_5bit(self):
fpga = self._make_fpga()
fpga.set_cfar_train(0x3F)
self.assertEqual(fpga.cfar_train, 0x1F)
def test_setter_cfar_alpha_clamp_8bit(self):
fpga = self._make_fpga()
fpga.set_cfar_alpha(0x1FF)
self.assertEqual(fpga.cfar_alpha, 0xFF)
def test_setter_cfar_mode_clamp_2bit(self):
fpga = self._make_fpga()
fpga.set_cfar_mode(7)
self.assertEqual(fpga.cfar_mode, 3)
def test_setter_mti_enable(self):
fpga = self._make_fpga()
fpga.set_mti_enable(True)
self.assertTrue(fpga.mti_enable)
def test_setter_dc_notch_clamp_3bit(self):
fpga = self._make_fpga()
fpga.set_dc_notch_width(0xFF)
self.assertEqual(fpga.dc_notch_width, 7)
def test_setter_agc_params_selective(self):
"""set_agc_params only changes provided fields."""
fpga = self._make_fpga()
fpga.set_agc_params(target=100)
self.assertEqual(fpga.agc_target, 100)
self.assertEqual(fpga.agc_attack, 1) # unchanged
fpga.set_agc_params(attack=3, decay=5)
self.assertEqual(fpga.agc_attack, 3)
self.assertEqual(fpga.agc_decay, 5)
self.assertEqual(fpga.agc_target, 100) # unchanged
def test_setter_agc_params_clamp(self):
fpga = self._make_fpga()
fpga.set_agc_params(target=0xFFF, attack=0xFF, decay=0xFF, holdoff=0xFF)
self.assertEqual(fpga.agc_target, 0xFF)
self.assertEqual(fpga.agc_attack, 0x0F)
self.assertEqual(fpga.agc_decay, 0x0F)
self.assertEqual(fpga.agc_holdoff, 0x0F)
class TestSoftwareFPGASignalChain(unittest.TestCase):
"""SoftwareFPGA.process_chirps with real co-sim data."""
COSIM_DIR = os.path.join(
os.path.dirname(__file__), "..", "9_2_FPGA", "tb", "cosim",
"real_data", "hex"
)
def _cosim_available(self):
return os.path.isfile(os.path.join(self.COSIM_DIR, "doppler_map_i.npy"))
def test_process_chirps_returns_radar_frame(self):
"""process_chirps produces a RadarFrame with production shapes (PR-O.6 / PR-F)."""
if not self._cosim_available():
self.skipTest("co-sim data not found")
from v7.software_fpga import SoftwareFPGA
from radar_protocol import RadarFrame, NUM_RANGE_BINS, NUM_DOPPLER_BINS
# Load decimated range data and pad up to current FPGA chirp width.
dec_i = np.load(os.path.join(self.COSIM_DIR, "decimated_range_i.npy"))
dec_q = np.load(os.path.join(self.COSIM_DIR, "decimated_range_q.npy"))
# Production chirp width = FFT_SIZE = 2048 samples; pad with zeros.
n_chirps = dec_i.shape[0]
iq_i = np.zeros((n_chirps, 2048), dtype=np.int64)
iq_q = np.zeros((n_chirps, 2048), dtype=np.int64)
n_copy = min(2048, dec_i.shape[1])
iq_i[:, :n_copy] = dec_i[:, :n_copy]
iq_q[:, :n_copy] = dec_q[:, :n_copy]
fpga = SoftwareFPGA()
frame = fpga.process_chirps(iq_i, iq_q, frame_number=42, timestamp=1.0)
self.assertIsInstance(frame, RadarFrame)
self.assertEqual(frame.frame_number, 42)
self.assertAlmostEqual(frame.timestamp, 1.0)
# Doppler width tracks input n_chirps (16-multiple → that-many sub-frames).
n_dop = (n_chirps // 16) * 16
self.assertEqual(frame.range_doppler_i.shape, (NUM_RANGE_BINS, n_dop))
self.assertEqual(frame.range_doppler_q.shape, (NUM_RANGE_BINS, n_dop))
self.assertEqual(frame.magnitude.shape, (NUM_RANGE_BINS, n_dop))
self.assertEqual(frame.detections.shape, (NUM_RANGE_BINS, n_dop))
self.assertEqual(frame.range_profile.shape, (NUM_RANGE_BINS,))
self.assertEqual(frame.detection_count, int(frame.detections.sum()))
# Sanity: NUM_DOPPLER_BINS is the production max (48).
self.assertLessEqual(n_dop, NUM_DOPPLER_BINS)
def test_cfar_enable_changes_detections(self):
"""Enabling CFAR vs simple threshold should yield different detection counts."""
from v7.software_fpga import SoftwareFPGA
from radar_protocol import NUM_RANGE_BINS, NUM_DOPPLER_BINS
# Production dimensions: 48 chirps x 2048 samples.
iq_i = np.zeros((NUM_DOPPLER_BINS, 2048), dtype=np.int64)
iq_q = np.zeros((NUM_DOPPLER_BINS, 2048), dtype=np.int64)
# Inject a single strong tone in bin 10 of every chirp.
iq_i[:, 10] = 5000
iq_q[:, 10] = 3000
fpga_thresh = SoftwareFPGA()
fpga_thresh.set_detect_threshold(1) # very low → many detections
frame_thresh = fpga_thresh.process_chirps(iq_i, iq_q)
fpga_cfar = SoftwareFPGA()
fpga_cfar.set_cfar_enable(True)
fpga_cfar.set_cfar_alpha(0x10)
frame_cfar = fpga_cfar.process_chirps(iq_i, iq_q)
self.assertIsNotNone(frame_thresh)
self.assertIsNotNone(frame_cfar)
self.assertEqual(frame_thresh.magnitude.shape, (NUM_RANGE_BINS, NUM_DOPPLER_BINS))
self.assertEqual(frame_cfar.magnitude.shape, (NUM_RANGE_BINS, NUM_DOPPLER_BINS))
class TestQuantizeRawIQ(unittest.TestCase):
"""quantize_raw_iq utility function."""
def test_3d_input(self):
"""3-D (frames, chirps, samples) → uses first frame."""
from v7.software_fpga import quantize_raw_iq
raw = np.random.randn(5, 32, 1024) + 1j * np.random.randn(5, 32, 1024)
iq_i, iq_q = quantize_raw_iq(raw)
self.assertEqual(iq_i.shape, (32, 1024))
self.assertEqual(iq_q.shape, (32, 1024))
self.assertTrue(np.all(np.abs(iq_i) <= 32767))
self.assertTrue(np.all(np.abs(iq_q) <= 32767))
def test_2d_input(self):
"""2-D (chirps, samples) → works directly."""
from v7.software_fpga import quantize_raw_iq
raw = np.random.randn(32, 1024) + 1j * np.random.randn(32, 1024)
iq_i, _iq_q = quantize_raw_iq(raw)
self.assertEqual(iq_i.shape, (32, 1024))
def test_zero_input(self):
"""All-zero complex input → all-zero output."""
from v7.software_fpga import quantize_raw_iq
raw = np.zeros((32, 1024), dtype=np.complex128)
iq_i, iq_q = quantize_raw_iq(raw)
self.assertTrue(np.all(iq_i == 0))
self.assertTrue(np.all(iq_q == 0))
def test_peak_target_scaling(self):
"""Peak of output should be near peak_target."""
from v7.software_fpga import quantize_raw_iq
raw = np.zeros((32, 1024), dtype=np.complex128)
raw[0, 0] = 1.0 + 0j # single peak
iq_i, _iq_q = quantize_raw_iq(raw, peak_target=500)
# The peak I value should be exactly 500 (sole max)
self.assertEqual(int(iq_i[0, 0]), 500)
# =============================================================================
# Test: v7.replay (ReplayEngine, detect_format)
# =============================================================================
class TestDetectFormat(unittest.TestCase):
"""detect_format auto-detection logic."""
COSIM_DIR = os.path.join(
os.path.dirname(__file__), "..", "9_2_FPGA", "tb", "cosim",
"real_data", "hex"
)
def test_cosim_dir(self):
if not os.path.isdir(self.COSIM_DIR):
self.skipTest("co-sim dir not found")
# COSIM_DIR format requires doppler_map_i/q.npy; skip if absent.
if not os.path.isfile(os.path.join(self.COSIM_DIR, "doppler_map_i.npy")):
self.skipTest("co-sim doppler_map .npy files not present")
from v7.replay import detect_format, ReplayFormat
self.assertEqual(detect_format(self.COSIM_DIR), ReplayFormat.COSIM_DIR)
def test_npy_file(self):
"""A .npy file → RAW_IQ_NPY."""
from v7.replay import detect_format, ReplayFormat
import tempfile
with tempfile.NamedTemporaryFile(suffix=".npy", delete=False) as f:
np.save(f, np.zeros((2, 32, 1024), dtype=np.complex128))
tmp = f.name
try:
self.assertEqual(detect_format(tmp), ReplayFormat.RAW_IQ_NPY)
finally:
os.unlink(tmp)
def test_h5_file(self):
"""A .h5 file → HDF5."""
from v7.replay import detect_format, ReplayFormat
self.assertEqual(detect_format("/tmp/fake_recording.h5"), ReplayFormat.HDF5)
def test_unknown_extension_raises(self):
from v7.replay import detect_format
with self.assertRaises(ValueError):
detect_format("/tmp/data.csv")
def test_empty_dir_raises(self):
"""Directory without co-sim files → ValueError."""
from v7.replay import detect_format
import tempfile
with tempfile.TemporaryDirectory() as td, self.assertRaises(ValueError):
detect_format(td)
class TestReplayEngineCosim(unittest.TestCase):
"""ReplayEngine loading from FPGA co-sim directory."""
COSIM_DIR = os.path.join(
os.path.dirname(__file__), "..", "9_2_FPGA", "tb", "cosim",
"real_data", "hex"
)
def _available(self):
return os.path.isfile(os.path.join(self.COSIM_DIR, "doppler_map_i.npy"))
def test_load_cosim(self):
if not self._available():
self.skipTest("co-sim data not found")
from v7.replay import ReplayEngine, ReplayFormat
engine = ReplayEngine(self.COSIM_DIR)
self.assertEqual(engine.fmt, ReplayFormat.COSIM_DIR)
self.assertEqual(engine.total_frames, 1)
def test_get_frame_cosim(self):
if not self._available():
self.skipTest("co-sim data not found")
from v7.replay import ReplayEngine
from radar_protocol import RadarFrame, NUM_RANGE_BINS, NUM_DOPPLER_BINS
engine = ReplayEngine(self.COSIM_DIR)
frame = engine.get_frame(0)
self.assertIsInstance(frame, RadarFrame)
self.assertEqual(frame.range_doppler_i.shape, (NUM_RANGE_BINS, NUM_DOPPLER_BINS))
self.assertEqual(frame.magnitude.shape, (NUM_RANGE_BINS, NUM_DOPPLER_BINS))
def test_get_frame_out_of_range(self):
if not self._available():
self.skipTest("co-sim data not found")
from v7.replay import ReplayEngine
engine = ReplayEngine(self.COSIM_DIR)
with self.assertRaises(IndexError):
engine.get_frame(1)
with self.assertRaises(IndexError):
engine.get_frame(-1)
class TestReplayEngineRawIQ(unittest.TestCase):
"""ReplayEngine loading from raw IQ .npy cube."""
def test_load_raw_iq_synthetic(self):
"""Synthetic raw IQ cube loads and produces correct frame count."""
import tempfile
from v7.replay import ReplayEngine, ReplayFormat
from v7.software_fpga import SoftwareFPGA
raw = np.random.randn(3, 32, 1024) + 1j * np.random.randn(3, 32, 1024)
with tempfile.NamedTemporaryFile(suffix=".npy", delete=False) as f:
np.save(f, raw)
tmp = f.name
try:
fpga = SoftwareFPGA()
engine = ReplayEngine(tmp, software_fpga=fpga)
self.assertEqual(engine.fmt, ReplayFormat.RAW_IQ_NPY)
self.assertEqual(engine.total_frames, 3)
finally:
os.unlink(tmp)
def test_get_frame_raw_iq_synthetic(self):
"""get_frame on raw IQ runs SoftwareFPGA and returns RadarFrame."""
import tempfile
from v7.replay import ReplayEngine
from v7.software_fpga import SoftwareFPGA
from radar_protocol import RadarFrame, NUM_RANGE_BINS, NUM_DOPPLER_BINS
# Production dimensions: 48 chirps x 2048 samples per frame.
raw = (np.random.randn(2, NUM_DOPPLER_BINS, 2048)
+ 1j * np.random.randn(2, NUM_DOPPLER_BINS, 2048))
with tempfile.NamedTemporaryFile(suffix=".npy", delete=False) as f:
np.save(f, raw)
tmp = f.name
try:
fpga = SoftwareFPGA()
engine = ReplayEngine(tmp, software_fpga=fpga)
frame = engine.get_frame(0)
self.assertIsInstance(frame, RadarFrame)
self.assertEqual(frame.range_doppler_i.shape, (NUM_RANGE_BINS, NUM_DOPPLER_BINS))
self.assertEqual(frame.frame_number, 0)
finally:
os.unlink(tmp)
def test_raw_iq_no_fpga_raises(self):
"""Raw IQ get_frame without SoftwareFPGA → RuntimeError."""
import tempfile
from v7.replay import ReplayEngine
raw = np.random.randn(1, 32, 1024) + 1j * np.random.randn(1, 32, 1024)
with tempfile.NamedTemporaryFile(suffix=".npy", delete=False) as f:
np.save(f, raw)
tmp = f.name
try:
engine = ReplayEngine(tmp)
with self.assertRaises(RuntimeError):
engine.get_frame(0)
finally:
os.unlink(tmp)
class TestReplayEngineHDF5(unittest.TestCase):
"""ReplayEngine loading from HDF5 recordings."""
def _skip_no_h5py(self):
try:
import h5py # noqa: F401
except ImportError:
self.skipTest("h5py not installed")
def test_load_hdf5_synthetic(self):
"""Synthetic HDF5 loads and iterates frames."""
self._skip_no_h5py()
import tempfile
import h5py
from v7.replay import ReplayEngine, ReplayFormat
from radar_protocol import RadarFrame
with tempfile.NamedTemporaryFile(suffix=".h5", delete=False) as f:
tmp = f.name
try:
with h5py.File(tmp, "w") as hf:
hf.attrs["creator"] = "test"
hf.attrs["range_bins"] = 64
hf.attrs["doppler_bins"] = 32
grp = hf.create_group("frames")
for i in range(3):
fg = grp.create_group(f"frame_{i:06d}")
fg.attrs["timestamp"] = float(i)
fg.attrs["frame_number"] = i
fg.attrs["detection_count"] = 0
fg.create_dataset("range_doppler_i",
data=np.zeros((64, 32), dtype=np.int16))
fg.create_dataset("range_doppler_q",
data=np.zeros((64, 32), dtype=np.int16))
fg.create_dataset("magnitude",
data=np.zeros((64, 32), dtype=np.float64))
fg.create_dataset("detections",
data=np.zeros((64, 32), dtype=np.uint8))
fg.create_dataset("range_profile",
data=np.zeros(64, dtype=np.float64))
engine = ReplayEngine(tmp)
self.assertEqual(engine.fmt, ReplayFormat.HDF5)
self.assertEqual(engine.total_frames, 3)
frame = engine.get_frame(1)
self.assertIsInstance(frame, RadarFrame)
self.assertEqual(frame.frame_number, 1)
self.assertEqual(frame.range_doppler_i.shape, (64, 32))
engine.close()
finally:
os.unlink(tmp)
# =============================================================================
# Test: v7.processing.extract_targets_from_frame
# =============================================================================
class TestExtractTargetsFromFrame(unittest.TestCase):
"""extract_targets_from_frame bin-to-physical conversion."""
def _make_frame(self, det_cells=None):
"""Create a minimal RadarFrame with optional detection cells."""
from radar_protocol import RadarFrame
frame = RadarFrame()
if det_cells:
for rbin, dbin in det_cells:
frame.detections[rbin, dbin] = 1
frame.magnitude[rbin, dbin] = 1000.0
frame.detection_count = int(frame.detections.sum())
frame.timestamp = 1.0
return frame
def test_no_detections(self):
from v7.processing import extract_targets_from_frame
frame = self._make_frame()
targets = extract_targets_from_frame(frame)
self.assertEqual(len(targets), 0)
def test_single_detection_range(self):
"""Detection at range bin 10 → range = 10 * range_resolution."""
from v7.processing import extract_targets_from_frame
# PR-Q: n_doppler_bins=48 → centre bin = 24 (was 16 in 32-bin world).
frame = self._make_frame(det_cells=[(10, 24)])
targets = extract_targets_from_frame(frame, range_resolution=5.996)
self.assertEqual(len(targets), 1)
self.assertAlmostEqual(targets[0].range, 10 * 5.996, places=1)
self.assertAlmostEqual(targets[0].velocity, 0.0, places=2)
def test_velocity_sign(self):
"""Doppler bin < center → negative velocity, > center → positive."""
from v7.processing import extract_targets_from_frame
# PR-Q: centre = 24 in 48-bin frame. dbin=10 below, dbin=30 above.
frame = self._make_frame(det_cells=[(5, 10), (5, 30)])
targets = extract_targets_from_frame(frame, velocity_resolution=1.484)
# dbin=10: vel = (10-24)*1.484 = -20.776 (approaching)
# dbin=30: vel = (30-24)*1.484 = +8.904 (receding)
self.assertLess(targets[0].velocity, 0)
self.assertGreater(targets[1].velocity, 0)
def test_snr_positive_for_nonzero_mag(self):
from v7.processing import extract_targets_from_frame
frame = self._make_frame(det_cells=[(3, 16)])
targets = extract_targets_from_frame(frame)
self.assertGreater(targets[0].snr, 0)
def test_gps_georef(self):
"""With GPS data, targets get non-zero lat/lon."""
from v7.processing import extract_targets_from_frame
from v7.models import GPSData
gps = GPSData(latitude=41.9, longitude=12.5, altitude=0.0,
pitch=0.0, heading=90.0)
frame = self._make_frame(det_cells=[(10, 16)])
targets = extract_targets_from_frame(
frame, range_resolution=100.0, gps=gps)
# Should be roughly east of radar position
self.assertAlmostEqual(targets[0].latitude, 41.9, places=2)
self.assertGreater(targets[0].longitude, 12.5)
def test_multiple_detections(self):
from v7.processing import extract_targets_from_frame
frame = self._make_frame(det_cells=[(0, 0), (10, 10), (63, 31)])
targets = extract_targets_from_frame(frame)
self.assertEqual(len(targets), 3)
# IDs should be sequential 0, 1, 2
self.assertEqual([t.id for t in targets], [0, 1, 2])
# =============================================================================
# Test: v7.processing.unfold_velocity_crt (PR-Q.5, audit C-5)
# =============================================================================
def _fold_v(v: float, v_unamb: float) -> float:
"""Helper: fold v into signed [-v_unamb, +v_unamb] (FFT convention)."""
span = 2.0 * v_unamb
return ((v + v_unamb) % span) - v_unamb
class TestUnfoldVelocityCRT(unittest.TestCase):
"""3-PRI Chinese-Remainder Doppler unfolding."""
def _vu_vr(self):
from v7.models import WaveformConfig
wc = WaveformConfig()
v_unamb = [
wc.max_velocity_short_mps,
wc.max_velocity_medium_mps,
wc.max_velocity_long_mps,
]
v_res = [
wc.velocity_resolution_short_mps,
wc.velocity_resolution_medium_mps,
wc.velocity_resolution_long_mps,
]
return v_unamb, v_res
def test_zero_velocity_three_pri_confirmed(self):
"""All zero measurements → v=0, single fold, CONFIRMED."""
from v7.processing import unfold_velocity_crt
v_unamb, v_res = self._vu_vr()
v_est, conf, alias = unfold_velocity_crt([0.0, 0.0, 0.0], v_unamb, v_res)
self.assertAlmostEqual(v_est, 0.0, places=2)
self.assertEqual(conf, "CONFIRMED")
self.assertEqual(len(alias), 1)
def test_below_per_pri_unamb_three_pri_confirmed(self):
"""v_true=30 m/s (below per-PRI v_unamb ~42 m/s): all 3 PRIs measure +30 directly."""
from v7.processing import unfold_velocity_crt
v_unamb, v_res = self._vu_vr()
v_true = 30.0
v_meas = [_fold_v(v_true, vu) for vu in v_unamb]
# Sanity: each |v_meas| ≤ v_unamb
for vm, vu in zip(v_meas, v_unamb, strict=False):
self.assertLessEqual(abs(vm), vu)
v_est, conf, alias = unfold_velocity_crt(v_meas, v_unamb, v_res)
self.assertAlmostEqual(v_est, v_true, places=1)
self.assertEqual(conf, "CONFIRMED")
self.assertEqual(len(alias), 1)
def test_above_per_pri_unamb_crt_unfolds_correctly(self):
"""v_true=100 m/s (above any per-PRI v_unamb): 3-PRI CRT unfolds."""
from v7.processing import unfold_velocity_crt
v_unamb, v_res = self._vu_vr()
v_true = 100.0
v_meas = [_fold_v(v_true, vu) for vu in v_unamb]
# Each per-PRI fold differs (since each PRI has different v_unamb)
self.assertNotAlmostEqual(v_meas[0], v_meas[1], places=1)
self.assertNotAlmostEqual(v_meas[1], v_meas[2], places=1)
v_est, conf, alias = unfold_velocity_crt(v_meas, v_unamb, v_res)
self.assertAlmostEqual(v_est, v_true, places=1)
self.assertEqual(conf, "CONFIRMED")
self.assertEqual(len(alias), 1)
def test_negative_velocity_crt_unfolds(self):
"""v_true=-75 m/s: CRT unfolds to a negative velocity."""
from v7.processing import unfold_velocity_crt
v_unamb, v_res = self._vu_vr()
v_true = -75.0
v_meas = [_fold_v(v_true, vu) for vu in v_unamb]
v_est, conf, _alias = unfold_velocity_crt(v_meas, v_unamb, v_res)
self.assertAlmostEqual(v_est, v_true, places=1)
self.assertEqual(conf, "CONFIRMED")
def test_long_only_single_pri_ambiguous(self):
"""1-PRI input → AMBIGUOUS (LONG-only-at-20-km regime)."""
from v7.processing import unfold_velocity_crt
v_unamb, v_res = self._vu_vr()
# Only LONG sub-frame seeing the target.
v_est, conf, alias = unfold_velocity_crt(
[15.0], [v_unamb[2]], [v_res[2]],
)
self.assertAlmostEqual(v_est, 15.0, places=2)
self.assertEqual(conf, "AMBIGUOUS")
self.assertEqual(alias, [15.0])
def test_two_pri_consistent_likely(self):
"""2-PRI consistent measurements → LIKELY (less constraint than 3-PRI)."""
from v7.processing import unfold_velocity_crt
v_unamb, v_res = self._vu_vr()
v_true = 25.0
# SHORT + MEDIUM only (LONG dropped out, e.g. clutter).
v_meas = [_fold_v(v_true, v_unamb[0]), _fold_v(v_true, v_unamb[1])]
v_est, conf, _alias = unfold_velocity_crt(
v_meas, [v_unamb[0], v_unamb[1]], [v_res[0], v_res[1]],
)
self.assertAlmostEqual(v_est, v_true, places=1)
self.assertEqual(conf, "LIKELY")
def test_inconsistent_measurements_ambiguous_fallback(self):
"""Bogus per-PRI measurements that no fold reconciles → AMBIGUOUS, return PRI-0."""
from v7.processing import unfold_velocity_crt
v_unamb, v_res = self._vu_vr()
# Random per-PRI values that do not correspond to any v_true.
v_meas = [10.0, -30.0, 35.0]
v_est, conf, _alias = unfold_velocity_crt(v_meas, v_unamb, v_res)
self.assertEqual(conf, "AMBIGUOUS")
self.assertAlmostEqual(v_est, 10.0, places=2) # PRI-0 fallback
def test_search_depth_covers_extended_ceiling(self):
"""K=6 covers ±extended_max_velocity_mps_crt ≈ 266 m/s."""
from v7.processing import unfold_velocity_crt
from v7.models import WaveformConfig
wc = WaveformConfig()
v_unamb, v_res = self._vu_vr()
# Pick v_true near the advertised CRT ceiling.
v_true = wc.extended_max_velocity_mps_crt(max_alias_k=6) - 5.0 # ~261 m/s
v_meas = [_fold_v(v_true, vu) for vu in v_unamb]
v_est, conf, _alias = unfold_velocity_crt(v_meas, v_unamb, v_res, max_alias_k=6)
self.assertAlmostEqual(v_est, v_true, places=0) # within 1 m/s
# Should still be CONFIRMED for a real velocity at this scale.
self.assertIn(conf, ("CONFIRMED", "LIKELY"))
# =============================================================================
# Test: v7.processing.extract_targets_from_frame_crt (PR-Q.5)
# =============================================================================
class TestExtractTargetsFromFrameCrt(unittest.TestCase):
"""3-PRI cluster extractor with CRT unfolding."""
def _make_frame(self, det_cells_with_mag=None):
"""Create RadarFrame; det_cells_with_mag is list of (rbin, dbin, mag)."""
from radar_protocol import RadarFrame
frame = RadarFrame()
if det_cells_with_mag:
for rbin, dbin, mag in det_cells_with_mag:
frame.detections[rbin, dbin] = 1
frame.magnitude[rbin, dbin] = mag
frame.detection_count = int(frame.detections.sum())
frame.timestamp = 1.0
return frame
def test_three_pri_target_confirmed(self):
"""Detection at rbin=10 in all 3 sub-frames at bin 3 → CONFIRMED, v ≈ 15 m/s."""
from v7.processing import extract_targets_from_frame_crt
from v7.models import WaveformConfig
wc = WaveformConfig()
# bins 3 / 19 / 35 = sub-frame {0, 1, 2} bin-in-sf 3.
frame = self._make_frame([
(10, 3, 1000.0),
(10, 19, 800.0),
(10, 35, 1200.0),
])
targets = extract_targets_from_frame_crt(frame, wc)
self.assertEqual(len(targets), 1)
t = targets[0]
self.assertAlmostEqual(t.range, 10 * wc.range_resolution_m, places=1)
# bin 3 across PRIs maps to ~ 15 m/s (≈ 3 · v_res ≈ 15.3 / 16.6 / 16.0)
self.assertGreater(t.velocity, 12.0)
self.assertLess(t.velocity, 18.0)
self.assertEqual(t.velocity_confidence, "CONFIRMED")
self.assertIsNotNone(t.alias_set)
self.assertEqual(len(t.alias_set), 1)
def test_long_only_target_ambiguous(self):
"""Detection only in LONG sub-frame at rbin=20 → AMBIGUOUS, single-PRI v."""
from v7.processing import extract_targets_from_frame_crt
from v7.models import WaveformConfig
wc = WaveformConfig()
# dbin = 32 + 5 = 37 → LONG sub-frame, bin 5 (positive).
frame = self._make_frame([(20, 37, 1500.0)])
targets = extract_targets_from_frame_crt(frame, wc)
self.assertEqual(len(targets), 1)
t = targets[0]
self.assertEqual(t.velocity_confidence, "AMBIGUOUS")
# v should be close to 5 · v_res_long ≈ 26.7 m/s
expected_v = 5.0 * wc.velocity_resolution_long_mps
self.assertAlmostEqual(t.velocity, expected_v, places=1)
def test_two_pri_target_likely(self):
"""Detection in SHORT + MEDIUM but not LONG → LIKELY."""
from v7.processing import extract_targets_from_frame_crt
from v7.models import WaveformConfig
wc = WaveformConfig()
# bin 4 in SHORT (dbin=4), bin 4 in MEDIUM (dbin=20).
frame = self._make_frame([
(15, 4, 900.0),
(15, 20, 700.0),
])
targets = extract_targets_from_frame_crt(frame, wc)
self.assertEqual(len(targets), 1)
self.assertEqual(targets[0].velocity_confidence, "LIKELY")
def test_strongest_bin_per_subframe_picked(self):
"""Two detections in same sub-frame at same rbin: stronger one wins."""
from v7.processing import extract_targets_from_frame_crt
from v7.models import WaveformConfig
wc = WaveformConfig()
# SHORT sub-frame: bins 3 (mag=500) and 5 (mag=1500) — bin 5 stronger.
# MEDIUM: bin 5 (mag=1200) — matches.
# LONG: bin 5 (mag=1100).
frame = self._make_frame([
(8, 3, 500.0),
(8, 5, 1500.0),
(8, 21, 1200.0),
(8, 37, 1100.0),
])
targets = extract_targets_from_frame_crt(frame, wc)
self.assertEqual(len(targets), 1)
t = targets[0]
# Expected v ≈ 5 · v_res ≈ 25.5 m/s (3-PRI CRT picks the bin-5 fold).
self.assertGreater(t.velocity, 23.0)
self.assertLess(t.velocity, 28.0)
self.assertEqual(t.velocity_confidence, "CONFIRMED")
def test_two_targets_at_different_ranges(self):
"""Two targets at distinct rbins → 2 RadarTargets, IDs sequential."""
from v7.processing import extract_targets_from_frame_crt
from v7.models import WaveformConfig
wc = WaveformConfig()
frame = self._make_frame([
# Target A at rbin 5, all 3 sub-frames bin 2.
(5, 2, 800.0), (5, 18, 700.0), (5, 34, 750.0),
# Target B at rbin 30, all 3 sub-frames bin 12 (negative velocity).
(30, 12, 600.0), (30, 28, 550.0), (30, 44, 580.0),
])
targets = extract_targets_from_frame_crt(frame, wc)
self.assertEqual(len(targets), 2)
self.assertEqual([t.id for t in targets], [0, 1])
# rbin 5 should come first (sorted), with positive v; rbin 30 negative.
self.assertGreater(targets[0].velocity, 0)
self.assertLess(targets[1].velocity, 0)
for t in targets:
self.assertEqual(t.velocity_confidence, "CONFIRMED")
def test_falls_back_to_legacy_for_non_48_bin_frame(self):
"""Frame with n_doppler != 48 → calls legacy extract_targets_from_frame."""
from v7.processing import extract_targets_from_frame_crt
from v7.models import WaveformConfig
from radar_protocol import RadarFrame
# Synthesize a 32-bin frame manually.
frame = RadarFrame()
frame.detections = np.zeros((64, 32), dtype=np.uint8)
frame.magnitude = np.zeros((64, 32), dtype=np.float64)
frame.detections[5, 16] = 1 # legacy center
frame.magnitude[5, 16] = 1000.0
frame.detection_count = 1
frame.timestamp = 1.0
wc = WaveformConfig()
targets = extract_targets_from_frame_crt(frame, wc)
self.assertEqual(len(targets), 1)
# Legacy path → velocity_confidence stays default "UNKNOWN".
self.assertEqual(targets[0].velocity_confidence, "UNKNOWN")
self.assertIsNone(targets[0].alias_set)
def test_no_detections_returns_empty(self):
from v7.processing import extract_targets_from_frame_crt
from v7.models import WaveformConfig
wc = WaveformConfig()
frame = self._make_frame([])
targets = extract_targets_from_frame_crt(frame, wc)
self.assertEqual(targets, [])
def test_gps_georef_with_crt(self):
"""GPS-georef populates lat/lon (smoke test)."""
from v7.processing import extract_targets_from_frame_crt
from v7.models import WaveformConfig, GPSData
wc = WaveformConfig()
gps = GPSData(latitude=41.9, longitude=12.5, altitude=0.0,
pitch=0.0, heading=90.0)
frame = self._make_frame([(10, 3, 1000.0), (10, 19, 800.0), (10, 35, 1200.0)])
targets = extract_targets_from_frame_crt(frame, wc, gps=gps)
self.assertEqual(len(targets), 1)
self.assertAlmostEqual(targets[0].latitude, 41.9, places=2)
self.assertGreater(targets[0].longitude, 12.5)
class TestCrtSubframeMaskGating(unittest.TestCase):
"""PR-U / M-8: CRT downgrades confidence to AMBIGUOUS when SF mask != 0b111."""
def _make_3pri_frame(self, subframe_enable: int):
from radar_protocol import RadarFrame
frame = RadarFrame()
# Detection at rbin=10 in all 3 sub-frames at bin 3 — would normally
# CONFIRM, but a non-default mask must force AMBIGUOUS.
for rbin, dbin, mag in [(10, 3, 1000.0), (10, 19, 800.0), (10, 35, 1200.0)]:
frame.detections[rbin, dbin] = 1
frame.magnitude[rbin, dbin] = mag
frame.detection_count = int(frame.detections.sum())
frame.timestamp = 1.0
frame.subframe_enable = subframe_enable
return frame
def test_default_mask_keeps_confirmed_path(self):
from v7.processing import extract_targets_from_frame_crt
from v7.models import WaveformConfig
wc = WaveformConfig()
frame = self._make_3pri_frame(0b111)
targets = extract_targets_from_frame_crt(frame, wc)
self.assertEqual(len(targets), 1)
self.assertEqual(targets[0].velocity_confidence, "CONFIRMED")
def test_short_disabled_forces_ambiguous(self):
"""SHORT off → CRT can't trust dbin // 16 attribution → AMBIGUOUS."""
from v7.processing import extract_targets_from_frame_crt
from v7.models import WaveformConfig
wc = WaveformConfig()
frame = self._make_3pri_frame(0b110)
targets = extract_targets_from_frame_crt(frame, wc)
self.assertEqual(len(targets), 1)
self.assertEqual(targets[0].velocity_confidence, "AMBIGUOUS")
def test_long_only_forces_ambiguous(self):
"""LONG only mask: scheduler skips SHORT+MEDIUM, all targets AMBIGUOUS."""
from v7.processing import extract_targets_from_frame_crt
from v7.models import WaveformConfig
wc = WaveformConfig()
frame = self._make_3pri_frame(0b100)
targets = extract_targets_from_frame_crt(frame, wc)
self.assertEqual(len(targets), 1)
self.assertEqual(targets[0].velocity_confidence, "AMBIGUOUS")
# =============================================================================
# Test: PR-Q.6 — workers route through extract_targets_from_frame_crt
# RadarDataWorker._run_host_dsp + ReplayWorker._extract_targets must use the
# 3-PRI CRT extractor, not the legacy single-PRI placeholder.
# =============================================================================
@unittest.skipUnless(_pyqt6_available(), "PyQt6 not installed")
class TestWorkersRouteThroughCrt(unittest.TestCase):
"""Audit P-6: live and replay paths must use CRT extractor on 48-bin frames."""
def _make_48bin_frame(self, det_cells_with_mag):
from radar_protocol import RadarFrame
frame = RadarFrame()
for rbin, dbin, mag in det_cells_with_mag:
frame.detections[rbin, dbin] = 1
frame.magnitude[rbin, dbin] = mag
frame.detection_count = int(frame.detections.sum())
frame.timestamp = 1.0
return frame
def test_radar_data_worker_run_host_dsp_uses_crt(self):
"""3-sub-frame detection → target with CRT confidence (not UNKNOWN)."""
from v7.workers import RadarDataWorker
from v7.processing import RadarProcessor
from v7.models import ProcessingConfig
proc = RadarProcessor()
cfg = ProcessingConfig()
cfg.clustering_enabled = False
cfg.tracking_enabled = True
proc.set_config(cfg)
worker = RadarDataWorker(connection=None, processor=proc)
frame = self._make_48bin_frame([
(10, 3, 1000.0), (10, 19, 800.0), (10, 35, 1200.0),
])
targets = worker._run_host_dsp(frame)
self.assertEqual(len(targets), 1)
# Legacy path returned UNKNOWN; CRT returns CONFIRMED for 3-PRI.
self.assertEqual(targets[0].velocity_confidence, "CONFIRMED")
self.assertIsNotNone(targets[0].alias_set)
def test_radar_data_worker_pitch_correction_applied_post_crt(self):
"""GPS pitch is applied to elevation after CRT extraction."""
from v7.workers import RadarDataWorker
from v7.processing import RadarProcessor
from v7.models import ProcessingConfig, GPSData
proc = RadarProcessor()
cfg = ProcessingConfig()
cfg.clustering_enabled = False
cfg.tracking_enabled = True
proc.set_config(cfg)
gps = GPSData(latitude=0.0, longitude=0.0, altitude=0.0,
pitch=12.5, heading=0.0)
worker = RadarDataWorker(connection=None, processor=proc,
gps_data_ref=gps)
frame = self._make_48bin_frame([
(10, 3, 1000.0), (10, 19, 800.0), (10, 35, 1200.0),
])
targets = worker._run_host_dsp(frame)
self.assertEqual(len(targets), 1)
# apply_pitch_correction(raw=0.0, pitch=12.5) = raw - pitch = -12.5.
self.assertAlmostEqual(targets[0].elevation, -12.5, places=2)
def test_radar_data_worker_skips_dsp_when_both_disabled(self):
"""When clustering and tracking are off, _run_host_dsp returns []."""
from v7.workers import RadarDataWorker
from v7.processing import RadarProcessor
from v7.models import ProcessingConfig
proc = RadarProcessor()
cfg = ProcessingConfig()
cfg.clustering_enabled = False
cfg.tracking_enabled = False
proc.set_config(cfg)
worker = RadarDataWorker(connection=None, processor=proc)
frame = self._make_48bin_frame([
(10, 3, 1000.0), (10, 19, 800.0), (10, 35, 1200.0),
])
self.assertEqual(worker._run_host_dsp(frame), [])
def test_replay_worker_extract_bound_to_crt(self):
"""ReplayWorker._extract_targets must be the CRT function, not legacy."""
from v7.workers import ReplayWorker
from v7.processing import extract_targets_from_frame_crt
class _DummyEngine:
total_frames = 0
worker = ReplayWorker(replay_engine=_DummyEngine())
self.assertIs(worker._extract_targets, extract_targets_from_frame_crt)
# =============================================================================
# Test: PR-Q.7 / audit M-1 — dashboard confidence column display helper
# =============================================================================
@unittest.skipUnless(_pyqt6_available(), "PyQt6 not installed")
class TestDashboardConfidenceDisplay(unittest.TestCase):
"""_confidence_display maps RadarTarget.velocity_confidence to (text, QColor)."""
def test_confirmed_is_green_no_prefix(self):
from v7.dashboard import _confidence_display
from v7.models import DARK_SUCCESS
text, color = _confidence_display("CONFIRMED")
self.assertEqual(text, "CONFIRMED")
self.assertEqual(color.name().upper(), DARK_SUCCESS.upper())
def test_likely_is_amber(self):
from v7.dashboard import _confidence_display
from v7.models import DARK_WARNING
text, color = _confidence_display("LIKELY")
self.assertEqual(text, "LIKELY")
self.assertEqual(color.name().upper(), DARK_WARNING.upper())
def test_ambiguous_gets_question_mark_prefix_and_red(self):
from v7.dashboard import _confidence_display
from v7.models import DARK_ERROR
text, color = _confidence_display("AMBIGUOUS")
self.assertTrue(text.startswith("?"),
"AMBIGUOUS must lead with '?' so it's visible without color")
self.assertIn("AMBIGUOUS", text)
self.assertEqual(color.name().upper(), DARK_ERROR.upper())
def test_unknown_falls_back_to_text_color(self):
from v7.dashboard import _confidence_display
from v7.models import DARK_TEXT
text, color = _confidence_display("UNKNOWN")
self.assertEqual(text, "UNKNOWN")
self.assertEqual(color.name().upper(), DARK_TEXT.upper())
def test_unrecognised_label_falls_through_to_unknown(self):
from v7.dashboard import _confidence_display
from v7.models import DARK_TEXT
text, color = _confidence_display("BANANA")
self.assertEqual(text, "UNKNOWN")
self.assertEqual(color.name().upper(), DARK_TEXT.upper())
# =============================================================================
# Test: PR-R / audit M-2..M-7 — host control surface fill-in
# =============================================================================
class TestOpcodeEnumFillIn(unittest.TestCase):
"""M-2 + M-3: enum gains MEDIUM_CHIRP, MEDIUM_LISTEN, CFAR_ALPHA_SOFT, ADC_PWDN."""
def test_medium_chirp_listen_opcodes(self):
from radar_protocol import Opcode
self.assertEqual(Opcode.MEDIUM_CHIRP.value, 0x17)
self.assertEqual(Opcode.MEDIUM_LISTEN.value, 0x18)
def test_cfar_alpha_soft_opcode(self):
from radar_protocol import Opcode
self.assertEqual(Opcode.CFAR_ALPHA_SOFT.value, 0x2D)
def test_adc_pwdn_opcode(self):
from radar_protocol import Opcode
self.assertEqual(Opcode.ADC_PWDN.value, 0x32)
def test_adc_format_opcode_unchanged(self):
from radar_protocol import Opcode
self.assertEqual(Opcode.ADC_FORMAT.value, 0x33)
def test_subframe_enable_opcode(self):
"""PR-U / M-8: 0x19 sets host_subframe_enable mask."""
from radar_protocol import Opcode
self.assertEqual(Opcode.SUBFRAME_ENABLE.value, 0x19)
def test_no_duplicate_opcodes(self):
"""All Opcode values are unique (catches accidental collisions)."""
from radar_protocol import Opcode
values = [op.value for op in Opcode]
self.assertEqual(len(values), len(set(values)),
"duplicate opcode values would silently shadow earlier entries")
class TestSoftwareFpgaCfarAlphaSoft(unittest.TestCase):
"""M-6: SoftwareFPGA mirrors the soft-tier alpha and clamps to 8 bits."""
def test_default(self):
from v7.software_fpga import SoftwareFPGA
fpga = SoftwareFPGA()
self.assertEqual(fpga.cfar_alpha_soft, 0x18) # RP_DEF_CFAR_ALPHA_SOFT
def test_setter_masks_to_8_bits(self):
from v7.software_fpga import SoftwareFPGA
fpga = SoftwareFPGA()
fpga.set_cfar_alpha_soft(0x1234)
self.assertEqual(fpga.cfar_alpha_soft, 0x34)
class TestSoftwareFpgaSubframeEnable(unittest.TestCase):
"""PR-U / M-8: SoftwareFPGA mirrors host_subframe_enable, masks to 3 bits."""
def test_default(self):
from v7.software_fpga import SoftwareFPGA
fpga = SoftwareFPGA()
self.assertEqual(fpga.subframe_enable, 0b111) # RP_DEF_SUBFRAME_ENABLE
def test_setter_masks_to_3_bits(self):
from v7.software_fpga import SoftwareFPGA
fpga = SoftwareFPGA()
fpga.set_subframe_enable(0xFE)
self.assertEqual(fpga.subframe_enable, 0b110)
@unittest.skipUnless(_pyqt6_available(), "PyQt6 not installed")
class TestReplayOpcodeDispatch(unittest.TestCase):
"""M-6: replay dispatch routes 0x2D to SoftwareFPGA + acknowledges inert opcodes."""
def _dashboard_with_replay(self):
"""Build a minimal dashboard-like object: just what _dispatch_to_software_fpga needs."""
from v7.software_fpga import SoftwareFPGA
from v7.dashboard import RadarDashboard
# Bypass full QMainWindow init — call the unbound method against a
# fake `self` that only carries the two attributes the dispatch reads.
class _Fake:
pass
fake = _Fake()
fake._software_fpga = SoftwareFPGA()
return RadarDashboard._dispatch_to_software_fpga, fake
def test_0x2d_routed_to_set_cfar_alpha_soft(self):
dispatch, fake = self._dashboard_with_replay()
dispatch(fake, 0x2D, 42)
self.assertEqual(fake._software_fpga.cfar_alpha_soft, 42)
def test_0x19_routed_to_set_subframe_enable(self):
"""PR-U / M-8: 0x19 lands on SoftwareFPGA.set_subframe_enable."""
dispatch, fake = self._dashboard_with_replay()
dispatch(fake, 0x19, 0b101)
self.assertEqual(fake._software_fpga.subframe_enable, 0b101)
def test_inert_opcode_does_not_raise(self):
"""Inert opcodes (e.g. 0x32 ADC_PWDN) accepted without exception."""
dispatch, fake = self._dashboard_with_replay()
for inert in (0x10, 0x15, 0x17, 0x18, 0x20, 0x32, 0x33, 0xFF):
dispatch(fake, inert, 1) # should not raise
def test_unknown_opcode_does_not_raise(self):
dispatch, fake = self._dashboard_with_replay()
dispatch(fake, 0xEE, 0) # unmapped — debug-log only, no exception
# =============================================================================
# Test: live vs replay physical-unit parity — regression guard for unit drift
#
# Origin: f895c02 (Serhii <jshmitz@me.com>) — adapted for the post-PR-Q.6
# code structure where both worker paths delegate to
# extract_targets_from_frame_crt instead of doing in-method bin->units math.
#
# Uses AST parse of workers.py (not inspect.getsource / import) so the test
# runs in headless CI without PyQt6 — v7.workers imports PyQt6 unconditionally,
# and contract enforcement must not be gated on GUI deps.
#
# Asserts on AST nodes (Call / Attribute), not source substrings, so false-pass
# on comments or docstring wording is impossible.
# =============================================================================
class TestLiveReplayPhysicalUnitsParity(unittest.TestCase):
"""Live (RadarDataWorker._run_host_dsp) and replay (ReplayWorker._emit_frame)
must both delegate detection->target conversion to
extract_targets_from_frame_crt with self._waveform as the units source.
Regression context: before PR-Q.6, RadarDataWorker computed velocity from
self._settings.velocity_resolution (RadarSettings default 1.0 m/s/bin)
while ReplayWorker used WaveformConfig (~5.343 m/s/bin) — live GUI
under-reported velocity by ~5.34x. PR-Q.6 unified both paths through
extract_targets_from_frame_crt(frame, self._waveform, ...). This test
fails if either path drifts back to per-method bin->units math or to
self._settings-based resolution.
"""
@staticmethod
def _parse_method(class_name: str, method_name: str):
"""Return AST FunctionDef for class_name.method_name from workers.py
without importing v7.workers (PyQt6-independent)."""
import ast
from pathlib import Path
path = Path(__file__).parent / "v7" / "workers.py"
tree = ast.parse(path.read_text(encoding="utf-8"))
for node in tree.body:
if isinstance(node, ast.ClassDef) and node.name == class_name:
for item in node.body:
if isinstance(item, ast.FunctionDef) and item.name == method_name:
return item
raise RuntimeError(f"{class_name}.{method_name} not found in workers.py")
@staticmethod
def _calls_extract_with_self_waveform(tree):
"""True if tree contains a call to extract_targets_from_frame_crt
(or self._extract_targets, which ReplayWorker.__init__ binds to it)
passing self._waveform as a positional or keyword argument."""
import ast
target_names = {"extract_targets_from_frame_crt", "_extract_targets"}
for node in ast.walk(tree):
if not isinstance(node, ast.Call):
continue
f = node.func
name = f.id if isinstance(f, ast.Name) else (
f.attr if isinstance(f, ast.Attribute) else None
)
if name not in target_names:
continue
args_iter = list(node.args) + [kw.value for kw in node.keywords]
for arg in args_iter:
if (isinstance(arg, ast.Attribute)
and arg.attr == "_waveform"
and isinstance(arg.value, ast.Name)
and arg.value.id == "self"):
return True
return False
@staticmethod
def _reads_settings_resolution(tree):
"""True if tree reads self._settings.velocity_resolution or
self._settings.range_resolution — the regression pattern."""
import ast
for node in ast.walk(tree):
if not (isinstance(node, ast.Attribute)
and node.attr in ("velocity_resolution", "range_resolution")):
continue
if (isinstance(node.value, ast.Attribute)
and node.value.attr == "_settings"
and isinstance(node.value.value, ast.Name)
and node.value.value.id == "self"):
return True
return False
def test_live_path_uses_self_waveform_for_extraction(self):
method = self._parse_method("RadarDataWorker", "_run_host_dsp")
self.assertTrue(
self._calls_extract_with_self_waveform(method),
"RadarDataWorker._run_host_dsp must call "
"extract_targets_from_frame_crt(..., self._waveform, ...) — "
"see f895c02 / PR-Q.6 for context.",
)
def test_live_path_does_not_read_settings_resolution(self):
method = self._parse_method("RadarDataWorker", "_run_host_dsp")
self.assertFalse(
self._reads_settings_resolution(method),
"RadarDataWorker._run_host_dsp must NOT read "
"self._settings.{velocity,range}_resolution — those default to 1.0 "
"(RadarSettings) and produce ~5.34x velocity under-reporting vs "
"replay. Use self._waveform via extract_targets_from_frame_crt.",
)
def test_replay_path_uses_self_waveform_for_extraction(self):
method = self._parse_method("ReplayWorker", "_emit_frame")
self.assertTrue(
self._calls_extract_with_self_waveform(method),
"ReplayWorker._emit_frame must call "
"self._extract_targets(..., self._waveform, ...) "
"(where _extract_targets = extract_targets_from_frame_crt at "
"__init__) — see f895c02 / PR-Q.6 for context.",
)
def test_replay_path_does_not_read_settings_resolution(self):
method = self._parse_method("ReplayWorker", "_emit_frame")
self.assertFalse(
self._reads_settings_resolution(method),
"ReplayWorker._emit_frame must NOT read "
"self._settings.{velocity,range}_resolution — use self._waveform "
"via extract_targets_from_frame_crt.",
)
# =============================================================================
# Helper: lazy import of v7.models
# =============================================================================
def _models():
import v7.models
return v7.models
if __name__ == "__main__":
unittest.main()