mirror of
https://github.com/NawfalMotii79/PLFM_RADAR.git
synced 2026-06-09 23:17:33 +00:00
feat(rtl,gui): PR-U / M-8 — sub-frame enable mask routed end-to-end (C-5 hardening)
The chirp_scheduler had a 3-bit host_subframe_enable input {LONG, MEDIUM, SHORT}
that was tied to the constant RP_DEF_SUBFRAME_ENABLE at the receiver instance,
so the host could neither change it nor know what mask was active. With the
mask not at 3'b111 the scheduler skips a sub-frame at TX but doppler_processor
still writes 48 chirp slots, so the host CRT (`dbin // 16 → {SHORT, MED, LONG}`)
silently mis-attributes the SF axis and unfolds to the wrong velocity.
Plumb the mask through:
- radar_system_top.v: new reg [2:0] host_subframe_enable, cold-reset
RP_DEF_SUBFRAME_ENABLE, opcode 0x19 setter, wired to rx_inst and usb_inst.
- radar_receiver_final.v: new host_subframe_enable[2:0] input port; the
chirp_scheduler instance is untied from the constant.
- usb_data_interface_ft2232h.v: new subframe_enable[2:0] input + per-frame
snapshot reg latched at frame_complete (stable for ft_clk read, same
pattern as stream_flags_snapshot). Byte 2 emission is now
{2'b00, subframe_enable[2:0], stream_flags[2:0]} — was {5'b00000, stream}.
- radar_protocol.py: Opcode.SUBFRAME_ENABLE = 0x19; RadarFrame.subframe_enable
field; parse_bulk_frame surfaces bits[5:3]; reserved-mask 0xF8 → 0xC0.
Bulk-frame mock encodes the mask in its emit so dashboard replay is correct.
- v7/processing.py: extract_targets_from_frame_crt forces every target to
AMBIGUOUS when frame.subframe_enable != 0b111. Operator sees the red `?`
flag in the targets table instead of a silently-wrong velocity.
- v7/software_fpga.py + v7/dashboard.py: subframe_enable mirror + setter, and
replay dispatch routes 0x19 to set_subframe_enable.
Tests (test_v7.py): TestSubframeEnableRoundTrip (4), TestSoftwareFpgaSubframeEnable
(2), TestCrtSubframeMaskGating (3), 0x19 added to TestOpcodeEnumFillIn and
TestReplayOpcodeDispatch. Existing test_full_frame_round_trip updated to expect
byte 2 = 0x3F (mask 0b111 default + stream 0x07).
Cosim TBs (tb/tb_usb_protocol_v2.v, tb/tb_ft2232h_frame_drop.v) drive the new
input with 3'b111 and assert the new byte-2 layout (T2.3: 0x00 → 0x38).
Regression: test_v7 146/146, test_GUI_V65_Tk 117/117, ruff clean.
iverilog: tb_usb_protocol_v2 27/27 PASS, tb_ft2232h_frame_drop 10/10 PASS.
This commit is contained in:
@@ -370,16 +370,26 @@ class TestBulkFrameV2RoundTrip(unittest.TestCase):
|
||||
def _build_v2_frame(self, flags: int, frame_num: int = 0,
|
||||
doppler: np.ndarray | None = None,
|
||||
cfar_codes: np.ndarray | None = None,
|
||||
range_profile: np.ndarray | None = None) -> bytes:
|
||||
"""Construct a v2 frame the way usb_data_interface_ft2232h.v emits."""
|
||||
range_profile: np.ndarray | None = None,
|
||||
subframe_enable: int = 0b111) -> bytes:
|
||||
"""Construct a v2 frame the way usb_data_interface_ft2232h.v emits.
|
||||
|
||||
``subframe_enable`` lands in byte 2 bits[5:3] (PR-U / M-8). Caller
|
||||
passes raw stream bits in ``flags`` (low 3 bits); helper composes the
|
||||
full byte 2 = {2'b00, subframe_enable[2:0], stream[2:0]}.
|
||||
"""
|
||||
from radar_protocol import (
|
||||
HEADER_BYTE, FOOTER_BYTE, RP_USB_PROTOCOL_VERSION,
|
||||
NUM_RANGE_BINS, NUM_DOPPLER_BINS,
|
||||
BULK_FLAG_STREAM_RANGE, BULK_FLAG_STREAM_DOPPLER, BULK_FLAG_STREAM_CFAR,
|
||||
BULK_DETECT_BYTES_PER_RANGE,
|
||||
BULK_SUBFRAME_ENABLE_SHIFT,
|
||||
)
|
||||
flags_byte = (((subframe_enable & 0x07) << BULK_SUBFRAME_ENABLE_SHIFT)
|
||||
| (flags & 0x07)
|
||||
| (flags & 0xC0)) # preserve reserved bits if caller injects them
|
||||
parts = [
|
||||
bytes([HEADER_BYTE, RP_USB_PROTOCOL_VERSION, flags & 0xFF]),
|
||||
bytes([HEADER_BYTE, RP_USB_PROTOCOL_VERSION, flags_byte & 0xFF]),
|
||||
struct.pack(">H", frame_num),
|
||||
struct.pack(">H", NUM_RANGE_BINS),
|
||||
struct.pack(">H", NUM_DOPPLER_BINS),
|
||||
@@ -431,7 +441,11 @@ class TestBulkFrameV2RoundTrip(unittest.TestCase):
|
||||
parsed = RadarProtocol.parse_bulk_frame(frame)
|
||||
self.assertIsNotNone(parsed)
|
||||
self.assertEqual(parsed["frame_number"], 42)
|
||||
self.assertEqual(parsed["flags"], flags)
|
||||
# PR-U / M-8: byte 2 now packs subframe_enable into bits[5:3]; helper
|
||||
# defaults to 0b111 (production 3-PRI ladder) so the wire flags byte
|
||||
# is (0b111 << 3) | 0x07 = 0x3F.
|
||||
self.assertEqual(parsed["flags"], flags | (0b111 << 3))
|
||||
self.assertEqual(parsed["subframe_enable"], 0b111)
|
||||
self.assertEqual(parsed["n_range"], NUM_RANGE_BINS)
|
||||
self.assertEqual(parsed["n_doppler"], NUM_DOPPLER_BINS)
|
||||
np.testing.assert_array_equal(parsed["range_profile"], rp)
|
||||
@@ -484,6 +498,56 @@ class TestBulkFrameV2RoundTrip(unittest.TestCase):
|
||||
self.assertEqual(boundaries[1], (len(f1), len(f1) + len(f2), "data"))
|
||||
|
||||
|
||||
class TestSubframeEnableRoundTrip(TestBulkFrameV2RoundTrip):
|
||||
"""PR-U / M-8: byte 2 bits[5:3] carry the per-frame sub-frame mask."""
|
||||
|
||||
def test_default_mask_round_trip(self):
|
||||
"""Production default 0b111 round-trips and is the helper default."""
|
||||
from radar_protocol import (
|
||||
RadarProtocol, BULK_FLAG_STREAM_DOPPLER,
|
||||
)
|
||||
frame = self._build_v2_frame(BULK_FLAG_STREAM_DOPPLER, frame_num=1)
|
||||
parsed = RadarProtocol.parse_bulk_frame(frame)
|
||||
self.assertIsNotNone(parsed)
|
||||
self.assertEqual(parsed["subframe_enable"], 0b111)
|
||||
|
||||
def test_short_disabled_mask(self):
|
||||
"""subframe_enable = 0b110 (LONG|MEDIUM, no SHORT) survives the wire."""
|
||||
from radar_protocol import (
|
||||
RadarProtocol, BULK_FLAG_STREAM_DOPPLER,
|
||||
)
|
||||
frame = self._build_v2_frame(BULK_FLAG_STREAM_DOPPLER, frame_num=1,
|
||||
subframe_enable=0b110)
|
||||
parsed = RadarProtocol.parse_bulk_frame(frame)
|
||||
self.assertIsNotNone(parsed)
|
||||
self.assertEqual(parsed["subframe_enable"], 0b110)
|
||||
|
||||
def test_short_only_mask(self):
|
||||
"""subframe_enable = 0b001 (SHORT only) survives the wire."""
|
||||
from radar_protocol import (
|
||||
RadarProtocol, BULK_FLAG_STREAM_DOPPLER,
|
||||
)
|
||||
frame = self._build_v2_frame(BULK_FLAG_STREAM_DOPPLER, frame_num=2,
|
||||
subframe_enable=0b001)
|
||||
parsed = RadarProtocol.parse_bulk_frame(frame)
|
||||
self.assertIsNotNone(parsed)
|
||||
self.assertEqual(parsed["subframe_enable"], 0b001)
|
||||
|
||||
def test_subframe_bits_no_longer_in_reserved_mask(self):
|
||||
"""Bits[5:3] are now valid SF mask, not reserved — must NOT reject."""
|
||||
from radar_protocol import (
|
||||
RadarProtocol, BULK_FLAGS_RESERVED_MASK,
|
||||
BULK_SUBFRAME_ENABLE_MASK,
|
||||
)
|
||||
# The new reserved mask must not overlap the SF-enable bit field.
|
||||
self.assertEqual(BULK_FLAGS_RESERVED_MASK & BULK_SUBFRAME_ENABLE_MASK, 0)
|
||||
# And bit 6 (top of new reserved mask) STILL rejects.
|
||||
from radar_protocol import BULK_FLAG_STREAM_RANGE
|
||||
frame = self._build_v2_frame(BULK_FLAG_STREAM_RANGE | 0x40)
|
||||
bad = bytes([frame[0], frame[1], frame[2] | 0x40]) + frame[3:]
|
||||
self.assertIsNone(RadarProtocol.parse_bulk_frame(bad))
|
||||
|
||||
|
||||
class TestStatusPacketV2RoundTrip(unittest.TestCase):
|
||||
"""PR-G v2 status packet: 7 status_words / 30 bytes."""
|
||||
|
||||
@@ -1492,6 +1556,52 @@ class TestExtractTargetsFromFrameCrt(unittest.TestCase):
|
||||
self.assertGreater(targets[0].longitude, 12.5)
|
||||
|
||||
|
||||
class TestCrtSubframeMaskGating(unittest.TestCase):
|
||||
"""PR-U / M-8: CRT downgrades confidence to AMBIGUOUS when SF mask != 0b111."""
|
||||
|
||||
def _make_3pri_frame(self, subframe_enable: int):
|
||||
from radar_protocol import RadarFrame
|
||||
frame = RadarFrame()
|
||||
# Detection at rbin=10 in all 3 sub-frames at bin 3 — would normally
|
||||
# CONFIRM, but a non-default mask must force AMBIGUOUS.
|
||||
for rbin, dbin, mag in [(10, 3, 1000.0), (10, 19, 800.0), (10, 35, 1200.0)]:
|
||||
frame.detections[rbin, dbin] = 1
|
||||
frame.magnitude[rbin, dbin] = mag
|
||||
frame.detection_count = int(frame.detections.sum())
|
||||
frame.timestamp = 1.0
|
||||
frame.subframe_enable = subframe_enable
|
||||
return frame
|
||||
|
||||
def test_default_mask_keeps_confirmed_path(self):
|
||||
from v7.processing import extract_targets_from_frame_crt
|
||||
from v7.models import WaveformConfig
|
||||
wc = WaveformConfig()
|
||||
frame = self._make_3pri_frame(0b111)
|
||||
targets = extract_targets_from_frame_crt(frame, wc)
|
||||
self.assertEqual(len(targets), 1)
|
||||
self.assertEqual(targets[0].velocity_confidence, "CONFIRMED")
|
||||
|
||||
def test_short_disabled_forces_ambiguous(self):
|
||||
"""SHORT off → CRT can't trust dbin // 16 attribution → AMBIGUOUS."""
|
||||
from v7.processing import extract_targets_from_frame_crt
|
||||
from v7.models import WaveformConfig
|
||||
wc = WaveformConfig()
|
||||
frame = self._make_3pri_frame(0b110)
|
||||
targets = extract_targets_from_frame_crt(frame, wc)
|
||||
self.assertEqual(len(targets), 1)
|
||||
self.assertEqual(targets[0].velocity_confidence, "AMBIGUOUS")
|
||||
|
||||
def test_long_only_forces_ambiguous(self):
|
||||
"""LONG only mask: scheduler skips SHORT+MEDIUM, all targets AMBIGUOUS."""
|
||||
from v7.processing import extract_targets_from_frame_crt
|
||||
from v7.models import WaveformConfig
|
||||
wc = WaveformConfig()
|
||||
frame = self._make_3pri_frame(0b100)
|
||||
targets = extract_targets_from_frame_crt(frame, wc)
|
||||
self.assertEqual(len(targets), 1)
|
||||
self.assertEqual(targets[0].velocity_confidence, "AMBIGUOUS")
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Test: PR-Q.6 — workers route through extract_targets_from_frame_crt
|
||||
# RadarDataWorker._run_host_dsp + ReplayWorker._extract_targets must use the
|
||||
@@ -1651,6 +1761,11 @@ class TestOpcodeEnumFillIn(unittest.TestCase):
|
||||
from radar_protocol import Opcode
|
||||
self.assertEqual(Opcode.ADC_FORMAT.value, 0x33)
|
||||
|
||||
def test_subframe_enable_opcode(self):
|
||||
"""PR-U / M-8: 0x19 sets host_subframe_enable mask."""
|
||||
from radar_protocol import Opcode
|
||||
self.assertEqual(Opcode.SUBFRAME_ENABLE.value, 0x19)
|
||||
|
||||
def test_no_duplicate_opcodes(self):
|
||||
"""All Opcode values are unique (catches accidental collisions)."""
|
||||
from radar_protocol import Opcode
|
||||
@@ -1674,6 +1789,21 @@ class TestSoftwareFpgaCfarAlphaSoft(unittest.TestCase):
|
||||
self.assertEqual(fpga.cfar_alpha_soft, 0x34)
|
||||
|
||||
|
||||
class TestSoftwareFpgaSubframeEnable(unittest.TestCase):
|
||||
"""PR-U / M-8: SoftwareFPGA mirrors host_subframe_enable, masks to 3 bits."""
|
||||
|
||||
def test_default(self):
|
||||
from v7.software_fpga import SoftwareFPGA
|
||||
fpga = SoftwareFPGA()
|
||||
self.assertEqual(fpga.subframe_enable, 0b111) # RP_DEF_SUBFRAME_ENABLE
|
||||
|
||||
def test_setter_masks_to_3_bits(self):
|
||||
from v7.software_fpga import SoftwareFPGA
|
||||
fpga = SoftwareFPGA()
|
||||
fpga.set_subframe_enable(0xFE)
|
||||
self.assertEqual(fpga.subframe_enable, 0b110)
|
||||
|
||||
|
||||
@unittest.skipUnless(_pyqt6_available(), "PyQt6 not installed")
|
||||
class TestReplayOpcodeDispatch(unittest.TestCase):
|
||||
"""M-6: replay dispatch routes 0x2D to SoftwareFPGA + acknowledges inert opcodes."""
|
||||
@@ -1695,6 +1825,12 @@ class TestReplayOpcodeDispatch(unittest.TestCase):
|
||||
dispatch(fake, 0x2D, 42)
|
||||
self.assertEqual(fake._software_fpga.cfar_alpha_soft, 42)
|
||||
|
||||
def test_0x19_routed_to_set_subframe_enable(self):
|
||||
"""PR-U / M-8: 0x19 lands on SoftwareFPGA.set_subframe_enable."""
|
||||
dispatch, fake = self._dashboard_with_replay()
|
||||
dispatch(fake, 0x19, 0b101)
|
||||
self.assertEqual(fake._software_fpga.subframe_enable, 0b101)
|
||||
|
||||
def test_inert_opcode_does_not_raise(self):
|
||||
"""Inert opcodes (e.g. 0x32 ADC_PWDN) accepted without exception."""
|
||||
dispatch, fake = self._dashboard_with_replay()
|
||||
|
||||
Reference in New Issue
Block a user