From 5a7e8b868923da5690013e6c5367a075fca1837a Mon Sep 17 00:00:00 2001 From: Jason <83615043+JJassonn69@users.noreply.github.com> Date: Sat, 2 May 2026 15:23:17 +0545 Subject: [PATCH] =?UTF-8?q?feat(gui):=20PR-Q.5=20=E2=80=94=203-PRI=20CRT?= =?UTF-8?q?=20Doppler=20unfolder=20+=20cluster=20extractor=20(C-5)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add host-side 3-PRI Chinese-Remainder velocity unfolding and a cluster extractor that reads the 48-bin Doppler frame, splits it into the 3 sub-frames (SHORT/MEDIUM/LONG), and resolves Doppler aliases across coprime PRIs. Resolves the algorithm half of audit C-5; the data is now in extract_targets_from_frame_crt's hands but workers still call the legacy single-PRI extractor (PR-Q.6 wires it). v7/processing.py: - unfold_velocity_crt(v_meas, v_unamb, v_res, max_alias_k=6, tol_factor=0.5) -> (v_est, confidence, alias_set). Brute-force candidate search over PRI-0 fold depth, per-PRI half-bin tolerance. Confidence: CONFIRMED (3-PRI unique), LIKELY (3-PRI with 2 cands, or 2-PRI with unique cand), AMBIGUOUS (1-PRI, 3+ cands, 2-PRI multi-cand, or no fold within tol). - extract_targets_from_frame_crt(frame, waveform, gps, max_alias_k): groups detections by range bin, picks strongest bin per (rbin, sf), decodes signed Doppler via sub_frame = dbin // 16 / bin_in_sf = dbin % 16, calls unfold_velocity_crt, attaches velocity_confidence and alias_set to RadarTarget. Falls back to legacy extract_targets_from_frame for non-48-bin frames. v7/models.py: - RadarTarget gains velocity_confidence (str default "UNKNOWN") and alias_set (list[float] | None). v7/__init__.py: - Re-exports unfold_velocity_crt + extract_targets_from_frame_crt. test_v7.py (16 new tests, 0 failures): - TestUnfoldVelocityCRT (8): zero-velocity CONFIRMED, below per-PRI v_unamb CONFIRMED, above per-PRI (100 m/s) CONFIRMED, near CRT ceiling (~261 m/s) CONFIRMED, negative velocity, 1-PRI AMBIGUOUS, 2-PRI LIKELY, inconsistent measurements AMBIGUOUS+fallback. - TestExtractTargetsFromFrameCrt (8): 3-PRI CONFIRMED target, LONG-only AMBIGUOUS (the 20-km blindspot regime), 2-PRI LIKELY, strongest-bin picking, two targets at distinct ranges, legacy 32-bin frame fallback, no-detections empty, GPS georef. Local: test_v7 100/0/0 (9 graceful skips), test_GUI_V65_Tk 117/0/2. --- 9_Firmware/9_3_GUI/test_v7.py | 279 ++++++++++++++++++++++++++++ 9_Firmware/9_3_GUI/v7/__init__.py | 5 +- 9_Firmware/9_3_GUI/v7/models.py | 9 + 9_Firmware/9_3_GUI/v7/processing.py | 219 ++++++++++++++++++++++ 4 files changed, 511 insertions(+), 1 deletion(-) diff --git a/9_Firmware/9_3_GUI/test_v7.py b/9_Firmware/9_3_GUI/test_v7.py index e3d13a2..191acc3 100644 --- a/9_Firmware/9_3_GUI/test_v7.py +++ b/9_Firmware/9_3_GUI/test_v7.py @@ -1011,6 +1011,285 @@ class TestExtractTargetsFromFrame(unittest.TestCase): self.assertEqual([t.id for t in targets], [0, 1, 2]) +# ============================================================================= +# Test: v7.processing.unfold_velocity_crt (PR-Q.5, audit C-5) +# ============================================================================= + +def _fold_v(v: float, v_unamb: float) -> float: + """Helper: fold v into signed [-v_unamb, +v_unamb] (FFT convention).""" + span = 2.0 * v_unamb + return ((v + v_unamb) % span) - v_unamb + + +class TestUnfoldVelocityCRT(unittest.TestCase): + """3-PRI Chinese-Remainder Doppler unfolding.""" + + def _vu_vr(self): + from v7.models import WaveformConfig + wc = WaveformConfig() + v_unamb = [ + wc.max_velocity_short_mps, + wc.max_velocity_medium_mps, + wc.max_velocity_long_mps, + ] + v_res = [ + wc.velocity_resolution_short_mps, + wc.velocity_resolution_medium_mps, + wc.velocity_resolution_long_mps, + ] + return v_unamb, v_res + + def test_zero_velocity_three_pri_confirmed(self): + """All zero measurements → v=0, single fold, CONFIRMED.""" + from v7.processing import unfold_velocity_crt + v_unamb, v_res = self._vu_vr() + v_est, conf, alias = unfold_velocity_crt([0.0, 0.0, 0.0], v_unamb, v_res) + self.assertAlmostEqual(v_est, 0.0, places=2) + self.assertEqual(conf, "CONFIRMED") + self.assertEqual(len(alias), 1) + + def test_below_per_pri_unamb_three_pri_confirmed(self): + """v_true=30 m/s (below per-PRI v_unamb ~42 m/s): all 3 PRIs measure +30 directly.""" + from v7.processing import unfold_velocity_crt + v_unamb, v_res = self._vu_vr() + v_true = 30.0 + v_meas = [_fold_v(v_true, vu) for vu in v_unamb] + # Sanity: each |v_meas| ≤ v_unamb + for vm, vu in zip(v_meas, v_unamb, strict=False): + self.assertLessEqual(abs(vm), vu) + v_est, conf, alias = unfold_velocity_crt(v_meas, v_unamb, v_res) + self.assertAlmostEqual(v_est, v_true, places=1) + self.assertEqual(conf, "CONFIRMED") + self.assertEqual(len(alias), 1) + + def test_above_per_pri_unamb_crt_unfolds_correctly(self): + """v_true=100 m/s (above any per-PRI v_unamb): 3-PRI CRT unfolds.""" + from v7.processing import unfold_velocity_crt + v_unamb, v_res = self._vu_vr() + v_true = 100.0 + v_meas = [_fold_v(v_true, vu) for vu in v_unamb] + # Each per-PRI fold differs (since each PRI has different v_unamb) + self.assertNotAlmostEqual(v_meas[0], v_meas[1], places=1) + self.assertNotAlmostEqual(v_meas[1], v_meas[2], places=1) + v_est, conf, alias = unfold_velocity_crt(v_meas, v_unamb, v_res) + self.assertAlmostEqual(v_est, v_true, places=1) + self.assertEqual(conf, "CONFIRMED") + self.assertEqual(len(alias), 1) + + def test_negative_velocity_crt_unfolds(self): + """v_true=-75 m/s: CRT unfolds to a negative velocity.""" + from v7.processing import unfold_velocity_crt + v_unamb, v_res = self._vu_vr() + v_true = -75.0 + v_meas = [_fold_v(v_true, vu) for vu in v_unamb] + v_est, conf, alias = unfold_velocity_crt(v_meas, v_unamb, v_res) + self.assertAlmostEqual(v_est, v_true, places=1) + self.assertEqual(conf, "CONFIRMED") + + def test_long_only_single_pri_ambiguous(self): + """1-PRI input → AMBIGUOUS (LONG-only-at-20-km regime).""" + from v7.processing import unfold_velocity_crt + v_unamb, v_res = self._vu_vr() + # Only LONG sub-frame seeing the target. + v_est, conf, alias = unfold_velocity_crt( + [15.0], [v_unamb[2]], [v_res[2]], + ) + self.assertAlmostEqual(v_est, 15.0, places=2) + self.assertEqual(conf, "AMBIGUOUS") + self.assertEqual(alias, [15.0]) + + def test_two_pri_consistent_likely(self): + """2-PRI consistent measurements → LIKELY (less constraint than 3-PRI).""" + from v7.processing import unfold_velocity_crt + v_unamb, v_res = self._vu_vr() + v_true = 25.0 + # SHORT + MEDIUM only (LONG dropped out, e.g. clutter). + v_meas = [_fold_v(v_true, v_unamb[0]), _fold_v(v_true, v_unamb[1])] + v_est, conf, alias = unfold_velocity_crt( + v_meas, [v_unamb[0], v_unamb[1]], [v_res[0], v_res[1]], + ) + self.assertAlmostEqual(v_est, v_true, places=1) + self.assertEqual(conf, "LIKELY") + + def test_inconsistent_measurements_ambiguous_fallback(self): + """Bogus per-PRI measurements that no fold reconciles → AMBIGUOUS, return PRI-0.""" + from v7.processing import unfold_velocity_crt + v_unamb, v_res = self._vu_vr() + # Random per-PRI values that do not correspond to any v_true. + v_meas = [10.0, -30.0, 35.0] + v_est, conf, alias = unfold_velocity_crt(v_meas, v_unamb, v_res) + self.assertEqual(conf, "AMBIGUOUS") + self.assertAlmostEqual(v_est, 10.0, places=2) # PRI-0 fallback + + def test_search_depth_covers_extended_ceiling(self): + """K=6 covers ±extended_max_velocity_mps_crt ≈ 266 m/s.""" + from v7.processing import unfold_velocity_crt + from v7.models import WaveformConfig + wc = WaveformConfig() + v_unamb, v_res = self._vu_vr() + # Pick v_true near the advertised CRT ceiling. + v_true = wc.extended_max_velocity_mps_crt(max_alias_k=6) - 5.0 # ~261 m/s + v_meas = [_fold_v(v_true, vu) for vu in v_unamb] + v_est, conf, alias = unfold_velocity_crt(v_meas, v_unamb, v_res, max_alias_k=6) + self.assertAlmostEqual(v_est, v_true, places=0) # within 1 m/s + # Should still be CONFIRMED for a real velocity at this scale. + self.assertIn(conf, ("CONFIRMED", "LIKELY")) + + +# ============================================================================= +# Test: v7.processing.extract_targets_from_frame_crt (PR-Q.5) +# ============================================================================= + +class TestExtractTargetsFromFrameCrt(unittest.TestCase): + """3-PRI cluster extractor with CRT unfolding.""" + + def _make_frame(self, det_cells_with_mag=None): + """Create RadarFrame; det_cells_with_mag is list of (rbin, dbin, mag).""" + from radar_protocol import RadarFrame + frame = RadarFrame() + if det_cells_with_mag: + for rbin, dbin, mag in det_cells_with_mag: + frame.detections[rbin, dbin] = 1 + frame.magnitude[rbin, dbin] = mag + frame.detection_count = int(frame.detections.sum()) + frame.timestamp = 1.0 + return frame + + def test_three_pri_target_confirmed(self): + """Detection at rbin=10 in all 3 sub-frames at bin 3 → CONFIRMED, v ≈ 15 m/s.""" + from v7.processing import extract_targets_from_frame_crt + from v7.models import WaveformConfig + wc = WaveformConfig() + # bins 3 / 19 / 35 = sub-frame {0, 1, 2} bin-in-sf 3. + frame = self._make_frame([ + (10, 3, 1000.0), + (10, 19, 800.0), + (10, 35, 1200.0), + ]) + targets = extract_targets_from_frame_crt(frame, wc) + self.assertEqual(len(targets), 1) + t = targets[0] + self.assertAlmostEqual(t.range, 10 * wc.range_resolution_m, places=1) + # bin 3 across PRIs maps to ~ 15 m/s (≈ 3 · v_res ≈ 15.3 / 16.6 / 16.0) + self.assertGreater(t.velocity, 12.0) + self.assertLess(t.velocity, 18.0) + self.assertEqual(t.velocity_confidence, "CONFIRMED") + self.assertIsNotNone(t.alias_set) + self.assertEqual(len(t.alias_set), 1) + + def test_long_only_target_ambiguous(self): + """Detection only in LONG sub-frame at rbin=20 → AMBIGUOUS, single-PRI v.""" + from v7.processing import extract_targets_from_frame_crt + from v7.models import WaveformConfig + wc = WaveformConfig() + # dbin = 32 + 5 = 37 → LONG sub-frame, bin 5 (positive). + frame = self._make_frame([(20, 37, 1500.0)]) + targets = extract_targets_from_frame_crt(frame, wc) + self.assertEqual(len(targets), 1) + t = targets[0] + self.assertEqual(t.velocity_confidence, "AMBIGUOUS") + # v should be close to 5 · v_res_long ≈ 26.7 m/s + expected_v = 5.0 * wc.velocity_resolution_long_mps + self.assertAlmostEqual(t.velocity, expected_v, places=1) + + def test_two_pri_target_likely(self): + """Detection in SHORT + MEDIUM but not LONG → LIKELY.""" + from v7.processing import extract_targets_from_frame_crt + from v7.models import WaveformConfig + wc = WaveformConfig() + # bin 4 in SHORT (dbin=4), bin 4 in MEDIUM (dbin=20). + frame = self._make_frame([ + (15, 4, 900.0), + (15, 20, 700.0), + ]) + targets = extract_targets_from_frame_crt(frame, wc) + self.assertEqual(len(targets), 1) + self.assertEqual(targets[0].velocity_confidence, "LIKELY") + + def test_strongest_bin_per_subframe_picked(self): + """Two detections in same sub-frame at same rbin: stronger one wins.""" + from v7.processing import extract_targets_from_frame_crt + from v7.models import WaveformConfig + wc = WaveformConfig() + # SHORT sub-frame: bins 3 (mag=500) and 5 (mag=1500) — bin 5 stronger. + # MEDIUM: bin 5 (mag=1200) — matches. + # LONG: bin 5 (mag=1100). + frame = self._make_frame([ + (8, 3, 500.0), + (8, 5, 1500.0), + (8, 21, 1200.0), + (8, 37, 1100.0), + ]) + targets = extract_targets_from_frame_crt(frame, wc) + self.assertEqual(len(targets), 1) + t = targets[0] + # Expected v ≈ 5 · v_res ≈ 25.5 m/s (3-PRI CRT picks the bin-5 fold). + self.assertGreater(t.velocity, 23.0) + self.assertLess(t.velocity, 28.0) + self.assertEqual(t.velocity_confidence, "CONFIRMED") + + def test_two_targets_at_different_ranges(self): + """Two targets at distinct rbins → 2 RadarTargets, IDs sequential.""" + from v7.processing import extract_targets_from_frame_crt + from v7.models import WaveformConfig + wc = WaveformConfig() + frame = self._make_frame([ + # Target A at rbin 5, all 3 sub-frames bin 2. + (5, 2, 800.0), (5, 18, 700.0), (5, 34, 750.0), + # Target B at rbin 30, all 3 sub-frames bin 12 (negative velocity). + (30, 12, 600.0), (30, 28, 550.0), (30, 44, 580.0), + ]) + targets = extract_targets_from_frame_crt(frame, wc) + self.assertEqual(len(targets), 2) + self.assertEqual([t.id for t in targets], [0, 1]) + # rbin 5 should come first (sorted), with positive v; rbin 30 negative. + self.assertGreater(targets[0].velocity, 0) + self.assertLess(targets[1].velocity, 0) + for t in targets: + self.assertEqual(t.velocity_confidence, "CONFIRMED") + + def test_falls_back_to_legacy_for_non_48_bin_frame(self): + """Frame with n_doppler != 48 → calls legacy extract_targets_from_frame.""" + from v7.processing import extract_targets_from_frame_crt + from v7.models import WaveformConfig + from radar_protocol import RadarFrame + # Synthesize a 32-bin frame manually. + frame = RadarFrame() + frame.detections = np.zeros((64, 32), dtype=np.uint8) + frame.magnitude = np.zeros((64, 32), dtype=np.float64) + frame.detections[5, 16] = 1 # legacy center + frame.magnitude[5, 16] = 1000.0 + frame.detection_count = 1 + frame.timestamp = 1.0 + wc = WaveformConfig() + targets = extract_targets_from_frame_crt(frame, wc) + self.assertEqual(len(targets), 1) + # Legacy path → velocity_confidence stays default "UNKNOWN". + self.assertEqual(targets[0].velocity_confidence, "UNKNOWN") + self.assertIsNone(targets[0].alias_set) + + def test_no_detections_returns_empty(self): + from v7.processing import extract_targets_from_frame_crt + from v7.models import WaveformConfig + wc = WaveformConfig() + frame = self._make_frame([]) + targets = extract_targets_from_frame_crt(frame, wc) + self.assertEqual(targets, []) + + def test_gps_georef_with_crt(self): + """GPS-georef populates lat/lon (smoke test).""" + from v7.processing import extract_targets_from_frame_crt + from v7.models import WaveformConfig, GPSData + wc = WaveformConfig() + gps = GPSData(latitude=41.9, longitude=12.5, altitude=0.0, + pitch=0.0, heading=90.0) + frame = self._make_frame([(10, 3, 1000.0), (10, 19, 800.0), (10, 35, 1200.0)]) + targets = extract_targets_from_frame_crt(frame, wc, gps=gps) + self.assertEqual(len(targets), 1) + self.assertAlmostEqual(targets[0].latitude, 41.9, places=2) + self.assertGreater(targets[0].longitude, 12.5) + + # ============================================================================= # Helper: lazy import of v7.models # ============================================================================= diff --git a/9_Firmware/9_3_GUI/v7/__init__.py b/9_Firmware/9_3_GUI/v7/__init__.py index 3789667..4a6a716 100644 --- a/9_Firmware/9_3_GUI/v7/__init__.py +++ b/9_Firmware/9_3_GUI/v7/__init__.py @@ -43,6 +43,8 @@ from .processing import ( apply_pitch_correction, polar_to_geographic, extract_targets_from_frame, + extract_targets_from_frame_crt, + unfold_velocity_crt, ) # Software FPGA (depends on golden_reference.py in FPGA cosim tree) @@ -96,7 +98,8 @@ __all__ = [ # noqa: RUF022 # processing "RadarProcessor", "USBPacketParser", "apply_pitch_correction", "polar_to_geographic", - "extract_targets_from_frame", + "extract_targets_from_frame", "extract_targets_from_frame_crt", + "unfold_velocity_crt", # software FPGA + replay "SoftwareFPGA", "quantize_raw_iq", "ReplayEngine", "ReplayFormat", diff --git a/9_Firmware/9_3_GUI/v7/models.py b/9_Firmware/9_3_GUI/v7/models.py index 4ce782e..9392a72 100644 --- a/9_Firmware/9_3_GUI/v7/models.py +++ b/9_Firmware/9_3_GUI/v7/models.py @@ -90,6 +90,15 @@ class RadarTarget: timestamp: float = 0.0 track_id: int = -1 classification: str = "unknown" + # PR-Q.5 (audit C-5): 3-PRI Doppler unfolding output. + # velocity_confidence: + # "CONFIRMED" — 3 sub-frames agree on a unique alias fold + # "LIKELY" — 2 sub-frames agree, or 3 sub-frames with 2 candidate folds + # "AMBIGUOUS" — only 1 sub-frame saw the target (no CRT possible), or + # multiple aliases survive within tolerance + # "UNKNOWN" — extractor did not run CRT (legacy single-PRI path) + velocity_confidence: str = "UNKNOWN" + alias_set: list[float] | None = None # Candidate v_true folds (m/s), best first def to_dict(self) -> dict: """Convert to dictionary for JSON serialization.""" diff --git a/9_Firmware/9_3_GUI/v7/processing.py b/9_Firmware/9_3_GUI/v7/processing.py index 6b8cc66..acc8bf4 100644 --- a/9_Firmware/9_3_GUI/v7/processing.py +++ b/9_Firmware/9_3_GUI/v7/processing.py @@ -551,3 +551,222 @@ def extract_targets_from_frame( timestamp=frame.timestamp, )) return targets + + +# ============================================================================ +# PR-Q.5 — 3-PRI Chinese-Remainder Doppler unfolding (audit C-5) +# ============================================================================ + +def unfold_velocity_crt( + v_meas_per_sf: list[float], + v_unamb_per_sf: list[float], + v_res_per_sf: list[float] | None = None, + max_alias_k: int = 6, + tol_factor: float = 0.5, +) -> tuple[float, str, list[float]]: + """3-PRI Chinese-Remainder Doppler velocity unfolding. + + Each per-subframe FFT measures v_true folded into a signed + [-v_unamb_i, +v_unamb_i] interval (the standard fftshift + convention). With 3 coprime PRIs (PR-Q ladder: 175/161/167 us, + giving v_unamb ≈ 40.79/44.34/42.79 m/s), brute-force search over + alias depth k_0 ∈ [-K, K] generates candidates + ``v_true = v_meas_0 + k_0 · 2 · v_unamb_0``. A candidate is + *valid* when it folds back into all other active PRIs to within + ``tol_factor × max(v_res)``. + + Args: + v_meas_per_sf: signed velocity measurement per active sub-frame + (m/s), already folded by the FFT. Length 1, 2, or 3. + v_unamb_per_sf: per-sub-frame v_unamb (m/s), same length. + v_res_per_sf: per-sub-frame v_res (m/s). If None, assumes + ``v_res = v_unamb / 8`` (matches chirps_per_subframe = 16). + max_alias_k: alias search depth in PRI-0 fold steps. K=6 covers + ±6 · 2 · v_unamb_0 ≈ ±490 m/s, well above + ``WaveformConfig.extended_max_velocity_mps_crt(K=6) ≈ ±266``. + tol_factor: per-PRI agreement tolerance, in units of max(v_res). + 1.0 = within one bin width. + + Returns: + (v_est, confidence, alias_set): + + - v_est (m/s): best-fit unfolded velocity. Falls back to PRI-0's + measurement if no candidate satisfies all PRIs within tolerance. + - confidence: ``"CONFIRMED"`` / ``"LIKELY"`` / ``"AMBIGUOUS"``. + * CONFIRMED — 3-PRI input, exactly one fold within tolerance. + * LIKELY — 3-PRI input with 2 candidates, or 2-PRI input + with a unique solution. + * AMBIGUOUS — 1-PRI input (no CRT possible), 3+ candidates, + 2-PRI input with 2 candidates, or no candidate + within tolerance. + - alias_set (m/s): all candidate v_true within tolerance, sorted + by goodness-of-fit (best first). + """ + n_sf = len(v_meas_per_sf) + if n_sf != len(v_unamb_per_sf): + raise ValueError("v_meas_per_sf and v_unamb_per_sf must have same length") + if n_sf == 0: + return (0.0, "AMBIGUOUS", []) + + # 1-PRI input — no CRT possible (LONG-only-at-20-km regime). + if n_sf == 1: + return (v_meas_per_sf[0], "AMBIGUOUS", [v_meas_per_sf[0]]) + + if v_res_per_sf is None: + v_res_per_sf = [vu / 8.0 for vu in v_unamb_per_sf] + elif len(v_res_per_sf) != n_sf: + raise ValueError("v_res_per_sf, when provided, must match v_meas_per_sf length") + + pri0_meas = v_meas_per_sf[0] + pri0_span = 2.0 * v_unamb_per_sf[0] + + candidates: list[tuple[float, float]] = [] # (v_candidate, max_err) + for k in range(-max_alias_k, max_alias_k + 1): + v_cand = pri0_meas + k * pri0_span + max_err = 0.0 + rejected = False + for i in range(1, n_sf): + vu_i = v_unamb_per_sf[i] + span_i = 2.0 * vu_i + v_pred_i = ((v_cand + vu_i) % span_i) - vu_i + err = abs(v_pred_i - v_meas_per_sf[i]) + tol_i = tol_factor * v_res_per_sf[i] + if err > tol_i: + rejected = True + break + if err > max_err: + max_err = err + if not rejected: + candidates.append((v_cand, max_err)) + + if not candidates: + # No fold satisfies all PRIs — fall back to PRI-0, mark AMBIGUOUS. + return (pri0_meas, "AMBIGUOUS", [pri0_meas]) + + candidates.sort(key=lambda c: c[1]) + v_best = candidates[0][0] + alias_set = [v for (v, _) in candidates] + n_cands = len(alias_set) + + if n_cands >= 3: + confidence = "AMBIGUOUS" + elif n_sf == 3 and n_cands == 1: + confidence = "CONFIRMED" + elif n_sf == 3 and n_cands == 2: + confidence = "LIKELY" + elif n_sf == 2 and n_cands == 1: + confidence = "LIKELY" + else: # n_sf == 2 and n_cands == 2 + confidence = "AMBIGUOUS" + + return (v_best, confidence, alias_set) + + +def extract_targets_from_frame_crt( + frame, + waveform, + gps: GPSData | None = None, + max_alias_k: int = 6, +) -> list[RadarTarget]: + """Extract RadarTargets from a 48-bin frame using 3-PRI CRT unfolding. + + The 48 Doppler bins are organized as 3 sub-frames of 16: + + bins 0..15: SHORT PRI (``waveform.pri_short_s``) + bins 16..31: MEDIUM PRI (``waveform.pri_medium_s``) + bins 32..47: LONG PRI (``waveform.pri_long_s``) + + Within each sub-frame, the 16-pt FFT uses the standard signed-bin + convention: bin 0 = DC, bins 1..7 = positive v, bin 8 = Nyquist + (treated as +v_unamb), bins 9..15 = negative v. + + Detections at the same range bin across different sub-frames are + grouped, and the strongest bin per (rbin, sub-frame) is taken as + that PRI's primary Doppler measurement. ``unfold_velocity_crt`` + resolves aliases when ≥2 sub-frames see the target. + + Falls back to the legacy single-PRI ``extract_targets_from_frame`` + when the frame is not 48-bin (e.g. 32-bin legacy recordings). + """ + if frame.detections.ndim != 2 or frame.detections.shape[1] != 48: + return extract_targets_from_frame( + frame, + range_resolution=waveform.range_resolution_m, + velocity_resolution=waveform.velocity_resolution_long_mps, + gps=gps, + ) + + chirps_per_sf = waveform.chirps_per_subframe # 16 + v_res_per_sf_all = [ + waveform.velocity_resolution_short_mps, + waveform.velocity_resolution_medium_mps, + waveform.velocity_resolution_long_mps, + ] + v_unamb_per_sf_all = [ + waveform.max_velocity_short_mps, + waveform.max_velocity_medium_mps, + waveform.max_velocity_long_mps, + ] + + # Group detections: rbin -> {sf_id: (peak_bin_in_sf, peak_mag)} + clusters: dict[int, dict[int, tuple[int, float]]] = {} + det_indices = np.argwhere(frame.detections > 0) + for idx in det_indices: + rbin, dbin = int(idx[0]), int(idx[1]) + sf_id = dbin // chirps_per_sf + bin_in_sf = dbin % chirps_per_sf + mag = float(frame.magnitude[rbin, dbin]) + existing = clusters.setdefault(rbin, {}).get(sf_id) + if existing is None or mag > existing[1]: + clusters[rbin][sf_id] = (bin_in_sf, mag) + + targets: list[RadarTarget] = [] + range_resolution = waveform.range_resolution_m + + for rbin in sorted(clusters.keys()): + sf_map = clusters[rbin] + active_sfs = sorted(sf_map.keys()) + v_meas_list: list[float] = [] + v_unamb_list: list[float] = [] + v_res_list: list[float] = [] + peak_mag = 0.0 + for sf_id in active_sfs: + bin_in_sf, mag = sf_map[sf_id] + # Signed bin: 0..7 positive, 8 = Nyquist (treat as +8), + # 9..15 negative. Yields v in [-8·v_res, +8·v_res]. + signed_bin = bin_in_sf if bin_in_sf <= 8 else bin_in_sf - chirps_per_sf + v_meas_list.append(float(signed_bin) * v_res_per_sf_all[sf_id]) + v_unamb_list.append(v_unamb_per_sf_all[sf_id]) + v_res_list.append(v_res_per_sf_all[sf_id]) + if mag > peak_mag: + peak_mag = mag + + v_est, confidence, alias_set = unfold_velocity_crt( + v_meas_list, v_unamb_list, v_res_list, max_alias_k=max_alias_k, + ) + + range_m = float(rbin) * range_resolution + snr = 10.0 * math.log10(max(peak_mag, 1.0)) if peak_mag > 0 else 0.0 + + lat, lon, azimuth, elevation = 0.0, 0.0, 0.0, 0.0 + if gps is not None: + azimuth = gps.heading + lat, lon = polar_to_geographic( + gps.latitude, gps.longitude, range_m, azimuth, + ) + + targets.append(RadarTarget( + id=len(targets), + range=range_m, + velocity=v_est, + azimuth=azimuth, + elevation=elevation, + latitude=lat, + longitude=lon, + snr=snr, + timestamp=frame.timestamp, + velocity_confidence=confidence, + alias_set=alias_set if alias_set else None, + )) + + return targets