fix(gui): P-6 / PR-Q.6 — workers route detections through CRT extractor (C-5)

Both live and replay paths used the legacy single-PRI extractor with the
LONG-PRI v_res placeholder, which yielded wrong velocities for the SHORT
and MEDIUM sub-frames. PR-Q.5 already provided extract_targets_from_frame_crt
(48-bin, 3-PRI Chinese-Remainder-Theorem unfolding) — this PR wires it in.

Changes:
  - workers.py imports extract_targets_from_frame_crt at module scope.
  - RadarDataWorker._run_host_dsp delegates to the CRT extractor and then
    applies GPS pitch correction + DBSCAN clustering + Kalman tracking
    on the returned targets. Inline det_indices loop and
    velocity_resolution_long_mps placeholder removed.
  - ReplayWorker.__init__ binds _extract_targets to the CRT extractor;
    _emit_frame call simplifies to (frame, waveform, gps=).
  - 32-bin legacy recordings still work via the CRT extractor's internal
    fallback to extract_targets_from_frame.
  - Module docstring stale "(64x32)" -> "(512x48)".
  - Dropped unused `import numpy as np` from workers.py (no remaining users).

Tests (TestWorkersRouteThroughCrt, +4):
  - 3-PRI detection produces CONFIRMED + alias_set (was UNKNOWN before).
  - GPS pitch correction applied post-CRT to elevation.
  - Both clustering+tracking off → returns [] (no DSP work).
  - ReplayWorker._extract_targets is exactly the CRT function reference.

Regression: 232/232 (test_v7 115 + test_GUI_V65_Tk 117). Ruff clean.
Closes audit P-6 / task PR-Q.6 — C-5 host wiring complete (PR-Q.7 dashboard
display column is the remaining piece).
This commit is contained in:
Jason
2026-05-02 16:47:05 +05:45
parent b505266f33
commit 3401d05eca
2 changed files with 117 additions and 82 deletions
+89
View File
@@ -1492,6 +1492,95 @@ class TestExtractTargetsFromFrameCrt(unittest.TestCase):
self.assertGreater(targets[0].longitude, 12.5)
# =============================================================================
# Test: PR-Q.6 — workers route through extract_targets_from_frame_crt
# RadarDataWorker._run_host_dsp + ReplayWorker._extract_targets must use the
# 3-PRI CRT extractor, not the legacy single-PRI placeholder.
# =============================================================================
@unittest.skipUnless(_pyqt6_available(), "PyQt6 not installed")
class TestWorkersRouteThroughCrt(unittest.TestCase):
"""Audit P-6: live and replay paths must use CRT extractor on 48-bin frames."""
def _make_48bin_frame(self, det_cells_with_mag):
from radar_protocol import RadarFrame
frame = RadarFrame()
for rbin, dbin, mag in det_cells_with_mag:
frame.detections[rbin, dbin] = 1
frame.magnitude[rbin, dbin] = mag
frame.detection_count = int(frame.detections.sum())
frame.timestamp = 1.0
return frame
def test_radar_data_worker_run_host_dsp_uses_crt(self):
"""3-sub-frame detection → target with CRT confidence (not UNKNOWN)."""
from v7.workers import RadarDataWorker
from v7.processing import RadarProcessor
from v7.models import ProcessingConfig
proc = RadarProcessor()
cfg = ProcessingConfig()
cfg.clustering_enabled = False
cfg.tracking_enabled = True
proc.set_config(cfg)
worker = RadarDataWorker(connection=None, processor=proc)
frame = self._make_48bin_frame([
(10, 3, 1000.0), (10, 19, 800.0), (10, 35, 1200.0),
])
targets = worker._run_host_dsp(frame)
self.assertEqual(len(targets), 1)
# Legacy path returned UNKNOWN; CRT returns CONFIRMED for 3-PRI.
self.assertEqual(targets[0].velocity_confidence, "CONFIRMED")
self.assertIsNotNone(targets[0].alias_set)
def test_radar_data_worker_pitch_correction_applied_post_crt(self):
"""GPS pitch is applied to elevation after CRT extraction."""
from v7.workers import RadarDataWorker
from v7.processing import RadarProcessor
from v7.models import ProcessingConfig, GPSData
proc = RadarProcessor()
cfg = ProcessingConfig()
cfg.clustering_enabled = False
cfg.tracking_enabled = True
proc.set_config(cfg)
gps = GPSData(latitude=0.0, longitude=0.0, altitude=0.0,
pitch=12.5, heading=0.0)
worker = RadarDataWorker(connection=None, processor=proc,
gps_data_ref=gps)
frame = self._make_48bin_frame([
(10, 3, 1000.0), (10, 19, 800.0), (10, 35, 1200.0),
])
targets = worker._run_host_dsp(frame)
self.assertEqual(len(targets), 1)
# apply_pitch_correction(raw=0.0, pitch=12.5) = raw - pitch = -12.5.
self.assertAlmostEqual(targets[0].elevation, -12.5, places=2)
def test_radar_data_worker_skips_dsp_when_both_disabled(self):
"""When clustering and tracking are off, _run_host_dsp returns []."""
from v7.workers import RadarDataWorker
from v7.processing import RadarProcessor
from v7.models import ProcessingConfig
proc = RadarProcessor()
cfg = ProcessingConfig()
cfg.clustering_enabled = False
cfg.tracking_enabled = False
proc.set_config(cfg)
worker = RadarDataWorker(connection=None, processor=proc)
frame = self._make_48bin_frame([
(10, 3, 1000.0), (10, 19, 800.0), (10, 35, 1200.0),
])
self.assertEqual(worker._run_host_dsp(frame), [])
def test_replay_worker_extract_bound_to_crt(self):
"""ReplayWorker._extract_targets must be the CRT function, not legacy."""
from v7.workers import ReplayWorker
from v7.processing import extract_targets_from_frame_crt
class _DummyEngine:
total_frames = 0
worker = ReplayWorker(replay_engine=_DummyEngine())
self.assertIs(worker._extract_targets, extract_targets_from_frame_crt)
# =============================================================================
# Helper: lazy import of v7.models
# =============================================================================