diff --git a/9_Firmware/9_3_GUI/radar_protocol.py b/9_Firmware/9_3_GUI/radar_protocol.py index 9976f13..09f4443 100644 --- a/9_Firmware/9_3_GUI/radar_protocol.py +++ b/9_Firmware/9_3_GUI/radar_protocol.py @@ -11,27 +11,33 @@ USB transports + wire formats (these intentionally diverge): Bulk per-frame format from `usb_data_interface_ft2232h.v`. One header + variable-length sections + footer per Doppler frame. The bulk format exists because USB 2.0's ~8 MB/s sustained ceiling cannot carry the - production frame rate (~178 fps x 35849 B = 6.4 MB/s) at per-sample - granularity. Wire layout: - [0xAA][flags 1B][frame_num 2B][n_range 2B][n_doppler 2B] - [range_profile 1024 B if flags.stream_range] - [doppler_mag 32768 B if flags.stream_doppler] # mag_only=1 only - [cfar_dense 2048 B if flags.stream_cfar] # sparse_det=0 only - [0x55] - Production FPGA today only emits mag_only=1 + dense-bitmap CFAR; the - flag bits for full-I/Q (mag_only=0) and sparse-detection-list - (sparse_det=1) are reserved for a future RTL extension and currently - force-clamped to 1 and 0 respectively in `radar_system_top.v` opcode - 0x04 handler when USB_MODE=1. + production frame rate at per-sample granularity. Wire layout (PR-G v2): + [0xAA] # header byte 0 + [version 1B = 0x02] # byte 1; host rejects mismatch + [flags 1B = {5'd0, cfar, doppler, range}] # byte 2; only low 3 bits used + [frame_num 2B] # bytes 3-4 (BE u16) + [n_range 2B = 512] # bytes 5-6 (BE u16) + [n_doppler 2B = 48] # bytes 7-8 (BE u16) + [range_profile 1024 B if flags.stream_range] # 512 BE u16 + [doppler_mag 49152 B if flags.stream_doppler] # 512x48 BE u16 + [cfar_dense 6144 B if flags.stream_cfar] # 2 bits/cell, MSB-first + [0x55] # footer + Detect codes (PR-F 2-tier): 0=NONE, 1=CAND (soft alpha), 2=CONFIRM + (hard alpha), 3=reserved. Detect packing: 4 cells per byte, MSB-first + within byte (cell0 in [7:6], cell1 in [5:4], cell2 in [3:2], cell3 in [1:0]). + Detect bytes per range row = ceil(n_doppler*2/8) = 12; total = 512*12. FT601 USB 3.0 (200T premium board, USB_MODE=0) Per-sample 11-byte legacy format from `usb_data_interface.v`. USB 3.0 has ~50x the bandwidth headroom (~360 MB/s practical), so the lighter per-sample format is fine and offers easier resync after byte drops. - Wire layout (per sample, 16384 samples per frame): + Wire layout (per sample): [0xAA][range_q 2B][range_i 2B][dop_re 2B][dop_im 2B][det 1B][0x55] where det byte = {frame_start, 6'b0, cfar_detection}. - Status (both transports): [0xBB][6x32b status words][0x55] = 26 B. + + Status (both transports, PR-G v2) + [0xBB][7 x 32-bit status_words][0x55] = 30 B. status_words[6] carries + 2-tier-CFAR telemetry: {detect_count_cand[31:16], detect_threshold_soft[15:0]}. RX (Host → FPGA, both transports) 4 bytes per command: {opcode[7:0], addr[7:0], value[15:8], value[7:0]}. @@ -64,8 +70,8 @@ FOOTER_BYTE = 0x55 STATUS_HEADER_BYTE = 0xBB # Packet sizes -DATA_PACKET_SIZE = 11 # 1 + 4 + 2 + 2 + 1 + 1 -STATUS_PACKET_SIZE = 26 # 1 + 24 + 1 +DATA_PACKET_SIZE = 11 # 1 + 4 + 2 + 2 + 1 + 1 (FT601 legacy) +STATUS_PACKET_SIZE = 30 # 1 + 28 + 1 (PR-G v2: 7 status_words) NUM_RANGE_BINS = 512 NUM_DOPPLER_BINS = 48 # PR-F/PR-Q: 3 sub-frames * 16 (= FPGA RP_NUM_DOPPLER_BINS) @@ -73,40 +79,49 @@ NUM_CELLS = NUM_RANGE_BINS * NUM_DOPPLER_BINS # 24576 WATERFALL_DEPTH = 64 -# AUDIT-C9: FT2232H bulk-frame wire format constants. Mirrors +# Bulk-frame protocol version (RP_USB_PROTOCOL_VERSION in radar_params.vh). +# Host rejects frames that don't carry this byte at offset 1. +RP_USB_PROTOCOL_VERSION = 0x02 + +# PR-G FT2232H bulk-frame wire format constants. Mirrors # usb_data_interface_ft2232h.v; if the RTL header changes, update both sides. -BULK_FRAME_HEADER_SIZE = 8 # AA + flags + fnum2 + nr2 + nd2 -BULK_RANGE_SECTION_BYTES = NUM_RANGE_BINS * 2 # 512 x 2 = 1024 -BULK_DOPPLER_MAG_BYTES = NUM_CELLS * 2 # 16384 x 2 = 32768 -BULK_DETECT_DENSE_BYTES = NUM_CELLS // 8 # 16384 / 8 = 2048 -BULK_FOOTER_SIZE = 1 -BULK_FRAME_MIN_SIZE = BULK_FRAME_HEADER_SIZE + BULK_FOOTER_SIZE # 9 -BULK_FRAME_MAX_SIZE = (BULK_FRAME_HEADER_SIZE + BULK_RANGE_SECTION_BYTES - + BULK_DOPPLER_MAG_BYTES + BULK_DETECT_DENSE_BYTES - + BULK_FOOTER_SIZE) # 35849 +BULK_FRAME_HEADER_SIZE = 9 # AA + ver + flags + fnum2 + nr2 + nd2 +BULK_RANGE_SECTION_BYTES = NUM_RANGE_BINS * 2 # 512 x 2 = 1024 +BULK_DOPPLER_MAG_BYTES = NUM_CELLS * 2 # 24576 x 2 = 49152 +# PR-F 2-tier detect: 2 bits/cell, packed MSB-first 4 cells per byte. +# Bytes per range row = ceil(n_doppler * 2 / 8); total = 512 * 12 = 6144. +BULK_DETECT_BITS_PER_CELL = 2 +BULK_DETECT_BYTES_PER_RANGE = (NUM_DOPPLER_BINS * BULK_DETECT_BITS_PER_CELL + 7) // 8 +BULK_DETECT_DENSE_BYTES = NUM_RANGE_BINS * BULK_DETECT_BYTES_PER_RANGE # 6144 +BULK_FOOTER_SIZE = 1 +BULK_FRAME_MIN_SIZE = BULK_FRAME_HEADER_SIZE + BULK_FOOTER_SIZE # 10 +BULK_FRAME_MAX_SIZE = (BULK_FRAME_HEADER_SIZE + BULK_RANGE_SECTION_BYTES + + BULK_DOPPLER_MAG_BYTES + BULK_DETECT_DENSE_BYTES + + BULK_FOOTER_SIZE) # 56330 # Bulk-frame format flag bits (matches stream_ctrl_sync_1 layout in RTL). +# Only the low 3 bits are used on the wire; bits [7:3] are reserved-zero. BULK_FLAG_STREAM_RANGE = 0x01 BULK_FLAG_STREAM_DOPPLER = 0x02 BULK_FLAG_STREAM_CFAR = 0x04 -BULK_FLAG_MAG_ONLY = 0x08 # Forced 1 by RTL; full-I/Q write FSM not implemented. -BULK_FLAG_SPARSE_DET = 0x10 # Forced 0 by RTL; sparse-list write FSM not implemented. +BULK_FLAGS_RESERVED_MASK = 0xF8 # any bit in this mask set → reject frame class Opcode(IntEnum): """Host register opcodes — must match radar_system_top.v case(usb_cmd_opcode). - FPGA truth table (from radar_system_top.v lines 902-944): - 0x01 host_radar_mode 0x14 host_short_listen_cycles - 0x02 host_trigger_pulse 0x15 host_chirps_per_elev - 0x03 host_detect_threshold 0x16 host_gain_shift - 0x04 host_stream_control 0x20 host_range_mode - 0x10 host_long_chirp_cycles 0x21-0x27 CFAR / MTI / DC-notch + FPGA truth table (from radar_system_top.v opcode dispatch case-block): + 0x01 host_radar_mode 0x16 host_gain_shift + 0x02 host_trigger_pulse 0x17 host_medium_chirp_cycles (M-2 — no enum yet) + 0x03 host_detect_threshold 0x18 host_medium_listen_cycles (M-2 — no enum yet) + 0x04 host_stream_control 0x20 host_range_mode + 0x10 host_long_chirp_cycles 0x21-0x27 CFAR / MTI / DC-notch 0x11 host_long_listen_cycles 0x28-0x2C AGC control - 0x12 host_guard_cycles 0x30 host_self_test_trigger - 0x13 host_short_chirp_cycles 0x31/0xFF host_status_request - 0x33 host_adc_format (AD9484 SCLK/DFS strap; AUDIT-C3) - (0x32 reserved for the future S-25 adc_pwdn host-control fix) + 0x12 host_guard_cycles 0x2D host_cfar_alpha_soft (M-2 — no enum yet) + 0x13 host_short_chirp_cycles 0x30 host_self_test_trigger + 0x14 host_short_listen_cycles 0x31/0xFF host_status_request + 0x15 host_chirps_per_elev 0x32 host_adc_pwdn (M-3 — no enum yet) + 0x33 host_adc_format (AD9484 SCLK/DFS strap; AUDIT-C3) """ # --- Basic control (0x01-0x04) --- RADAR_MODE = 0x01 # 2-bit mode select @@ -163,7 +178,7 @@ class Opcode(IntEnum): @dataclass class RadarFrame: - """One complete radar frame (64 range x 32 Doppler).""" + """One complete radar frame (NUM_RANGE_BINS=512 range x NUM_DOPPLER_BINS=48 Doppler).""" timestamp: float = 0.0 range_doppler_i: np.ndarray = field( default_factory=lambda: np.zeros((NUM_RANGE_BINS, NUM_DOPPLER_BINS), dtype=np.int16)) @@ -185,7 +200,7 @@ class RadarFrame: @dataclass class StatusResponse: - """Parsed status response from FPGA (6-word / 26-byte packet).""" + """Parsed status response from FPGA (PR-G v2: 7-word / 30-byte packet).""" radar_mode: int = 0 stream_ctrl: int = 0 cfar_threshold: int = 0 @@ -206,6 +221,11 @@ class StatusResponse: agc_saturation_count: int = 0 # 8-bit saturation count [7:0] agc_enable: int = 0 # 1-bit AGC enable readback chirps_mismatch: int = 0 # TX-G: 1 if FPGA clamped/rejected host chirps_per_elev + # PR-G 2-tier CFAR telemetry (word 6) + detect_count_cand: int = 0 # 16-bit count of CAND-tier detections per frame + detect_threshold_soft: int = 0 # 16-bit soft-CFAR threshold readback (saturates 0xFFFF) + # AUDIT-S10 control-fault flags (word 5 high half) + frame_drop_count: int = 0 # frame-drop counter from RTL # ============================================================================ @@ -277,19 +297,22 @@ class RadarProtocol: def parse_status_packet(raw: bytes) -> StatusResponse | None: """ Parse a status response packet. - Format: [0xBB] [6x4B status words] [0x55] = 1 + 24 + 1 = 26 bytes + + PR-G v2 format: [0xBB] [7 x 4B status_words] [0x55] = 1 + 28 + 1 = 30 bytes. + Audit P-3: pre-PR-G GUI used 26 (six words); FPGA `STATUS_PKT_LEN=30` since + PR-G added word[6] for 2-tier-CFAR telemetry. """ - if len(raw) < 26: + if len(raw) < STATUS_PACKET_SIZE: return None if raw[0] != STATUS_HEADER_BYTE: return None words = [] - for i in range(6): + for i in range(7): w = struct.unpack_from(">I", raw, 1 + i * 4)[0] words.append(w) - if raw[25] != FOOTER_BYTE: + if raw[STATUS_PACKET_SIZE - 1] != FOOTER_BYTE: return None sr = StatusResponse() @@ -313,11 +336,17 @@ class RadarProtocol: sr.agc_saturation_count = (words[4] >> 12) & 0xFF sr.agc_peak_magnitude = (words[4] >> 20) & 0xFF sr.agc_current_gain = (words[4] >> 28) & 0x0F - # Word 5: {7'd0, self_test_busy, 8'd0, self_test_detail[7:0], - # 3'd0, self_test_flags[4:0]} + # Word 5: {frame_drop_count[31:25], self_test_busy[24], 8'd0, + # self_test_detail[15:8], 3'd0, self_test_flags[4:0]} sr.self_test_flags = words[5] & 0x1F sr.self_test_detail = (words[5] >> 8) & 0xFF sr.self_test_busy = (words[5] >> 24) & 0x01 + sr.frame_drop_count = (words[5] >> 25) & 0x7F + # Word 6 (PR-G 2-tier CFAR telemetry). Layout: high half is + # detect_count_cand (16 bits); low half is detect_threshold_soft + # (saturated to 0xFFFF when the 17-bit RTL value exceeds 16-bit range). + sr.detect_threshold_soft = words[6] & 0xFFFF + sr.detect_count_cand = (words[6] >> 16) & 0xFFFF return sr @staticmethod @@ -368,16 +397,13 @@ class RadarProtocol: # ---------------------------------------------------------------- @staticmethod def _bulk_frame_size_from_flags(flags: int) -> int: - """Compute the on-wire size of a bulk frame from its flags byte. + """Compute the on-wire size of a PR-G v2 bulk frame from its flags byte. - Tracks the FPGA write FSM in usb_data_interface_ft2232h.v: header - (8 B) + per-stream payload + footer (1 B). The mag_only and - sparse_det bits are documented in the wire format but the FPGA - write FSM does not implement the alternate encodings yet — it - always emits 32768 B mag and 2048 B dense bitmap. The host-side - register handler in radar_system_top.v force-clamps these flags - when USB_MODE=1, so any frame the parser sees in production will - have mag_only=1 and sparse_det=0. + Tracks the FPGA write FSM in usb_data_interface_ft2232h.v: 9-byte header + (AA + ver + flags + frame_num + n_range + n_doppler) + per-stream + payload + 1-byte footer. PR-G fixed the doppler section at 49152 B + (mag-only) and the detect section at 6144 B (2 bits/cell, MSB-first). + Earlier mag_only/sparse_det flag-driven variants were dropped. """ size = BULK_FRAME_HEADER_SIZE if flags & BULK_FLAG_STREAM_RANGE: @@ -391,37 +417,40 @@ class RadarProtocol: @staticmethod def parse_bulk_frame(raw: bytes, offset: int = 0) -> dict[str, Any] | None: - """Parse one FT2232H bulk frame starting at `offset`. + """Parse one PR-G v2 FT2232H bulk frame starting at `offset`. + + Wire layout (PR-G v2): + [0xAA][version=0x02][flags 1B][frame_num 2B][n_range 2B][n_doppler 2B] + [range_profile? 1024 B][doppler_mag? 49152 B][cfar_dense? 6144 B][0x55] Returns a dict with keys: frame_number, flags, n_range, n_doppler, - range_profile (np.ndarray | None), doppler_mag (np.ndarray | None, - shape n_rangexn_doppler), cfar_dense (np.ndarray | None, shape - n_rangexn_doppler, uint8 0/1), and frame_size (total bytes consumed - including header + sections + footer). Returns None on any - structural error (bad header/footer, wrong bin counts, reserved - bits set, unimplemented flag combo). + range_profile (np.ndarray | None, uint16, length n_range), + doppler_mag (np.ndarray | None, uint16, shape n_range x n_doppler), + cfar_dense (np.ndarray | None, uint8, shape n_range x n_doppler; + values 0=NONE, 1=CAND, 2=CONFIRM, 3=reserved per PR-F 2-tier CFAR), + and frame_size (total bytes consumed). Returns None on any structural + error (bad header/footer, wrong version, wrong bin counts, reserved + flag bits set). """ n = len(raw) if n - offset < BULK_FRAME_MIN_SIZE: return None if raw[offset] != HEADER_BYTE: return None - - flags = raw[offset + 1] - # Reserved high bits must be zero (FPGA emits {2'b00, 6-bit flags}). - if flags & 0xC0: - return None - # Production FPGA only emits mag_only=1 + dense bitmap. Any other - # encoding is either a corrupt frame or a future RTL revision the - # parser hasn't been updated for; reject so the caller can resync. - if (flags & BULK_FLAG_STREAM_DOPPLER) and not (flags & BULK_FLAG_MAG_ONLY): - return None - if (flags & BULK_FLAG_STREAM_CFAR) and (flags & BULK_FLAG_SPARSE_DET): + # PR-G v2: byte 1 is the protocol version. Reject mismatch so we + # don't silently mis-parse a future revision. + if raw[offset + 1] != RP_USB_PROTOCOL_VERSION: return None - frame_number = (raw[offset + 2] << 8) | raw[offset + 3] - n_range = (raw[offset + 4] << 8) | raw[offset + 5] - n_doppler = (raw[offset + 6] << 8) | raw[offset + 7] + flags = raw[offset + 2] + # Only the low 3 bits are defined (range/doppler/cfar). Any reserved + # bit set means a future revision or corruption — reject and resync. + if flags & BULK_FLAGS_RESERVED_MASK: + return None + + frame_number = (raw[offset + 3] << 8) | raw[offset + 4] + n_range = (raw[offset + 5] << 8) | raw[offset + 6] + n_doppler = (raw[offset + 7] << 8) | raw[offset + 8] if n_range != NUM_RANGE_BINS or n_doppler != NUM_DOPPLER_BINS: return None @@ -449,12 +478,9 @@ class RadarProtocol: cursor += BULK_DOPPLER_MAG_BYTES if flags & BULK_FLAG_STREAM_CFAR: - packed = np.frombuffer( - raw, dtype=np.uint8, count=BULK_DETECT_DENSE_BYTES, offset=cursor, + cfar_dense = RadarProtocol._unpack_detect_2bit( + raw, cursor, n_range, n_doppler, ) - # Each byte holds 8 cells, MSB-first bit order (matches FPGA - # WR_DETECT_DATA emission). np.unpackbits keeps that order. - cfar_dense = np.unpackbits(packed).reshape(n_range, n_doppler) cursor += BULK_DETECT_DENSE_BYTES return { @@ -468,6 +494,29 @@ class RadarProtocol: "frame_size": size, } + @staticmethod + def _unpack_detect_2bit(raw: bytes, cursor: int, + n_range: int, n_doppler: int) -> np.ndarray: + """Unpack PR-F 2-bit dense CFAR detect codes into an (n_range, n_doppler) uint8. + + FPGA emits 4 cells per byte, MSB-first within byte: + byte = {cell0[1:0], cell1[1:0], cell2[1:0], cell3[1:0]} + Returned values are 0..3 (0=NONE, 1=CAND, 2=CONFIRM, 3=reserved). + Any tail bits in the last byte of each range row past n_doppler cells + are discarded (FPGA pads them with 0). + """ + bytes_per_range = BULK_DETECT_BYTES_PER_RANGE + total = n_range * bytes_per_range + packed = np.frombuffer(raw, dtype=np.uint8, count=total, offset=cursor) + packed = packed.reshape(n_range, bytes_per_range) + # Expand each byte to 4 codes via bit shifts; collect MSB-first. + codes = np.empty((n_range, bytes_per_range * 4), dtype=np.uint8) + codes[:, 0::4] = (packed >> 6) & 0x03 + codes[:, 1::4] = (packed >> 4) & 0x03 + codes[:, 2::4] = (packed >> 2) & 0x03 + codes[:, 3::4] = packed & 0x03 + return codes[:, :n_doppler].copy() + @staticmethod def find_bulk_frame_boundaries(buf: bytes) -> list[tuple[int, int, str]]: """Scan a byte stream for FT2232H bulk frames and status packets. @@ -488,13 +537,16 @@ class RadarProtocol: while i < n: b = buf[i] if b == HEADER_BYTE: - # Need at least the 8-byte header to compute the frame size. + # Need the full 9-byte v2 header to compute the frame size. if n - i < BULK_FRAME_HEADER_SIZE: break # partial header — caller keeps as residual - flags = buf[i + 1] - # Quick reject before the more expensive size compute. The - # full validation lives in parse_bulk_frame. - if flags & 0xC0: + # PR-G v2: byte 1 must be the protocol version. Quick reject. + if buf[i + 1] != RP_USB_PROTOCOL_VERSION: + i += 1 + continue + flags = buf[i + 2] + # Reserved high bits must be zero (only 3 stream-enable bits). + if flags & BULK_FLAGS_RESERVED_MASK: i += 1 continue size = RadarProtocol._bulk_frame_size_from_flags(flags) @@ -502,8 +554,8 @@ class RadarProtocol: break # partial frame — keep as residual # Validate footer + bin counts before accepting the boundary. if (buf[i + size - 1] == FOOTER_BYTE - and ((buf[i + 4] << 8) | buf[i + 5]) == NUM_RANGE_BINS - and ((buf[i + 6] << 8) | buf[i + 7]) == NUM_DOPPLER_BINS): + and ((buf[i + 5] << 8) | buf[i + 6]) == NUM_RANGE_BINS + and ((buf[i + 7] << 8) | buf[i + 8]) == NUM_DOPPLER_BINS): out.append((i, i + size, "data")) i += size else: @@ -636,7 +688,7 @@ class FT2232HConnection: time.sleep(0.05) self._mock_frame_num += 1 flags = (BULK_FLAG_STREAM_RANGE | BULK_FLAG_STREAM_DOPPLER - | BULK_FLAG_STREAM_CFAR | BULK_FLAG_MAG_ONLY) + | BULK_FLAG_STREAM_CFAR) # Synthesize per-cell magnitudes once (vectorised). rbins = np.arange(NUM_RANGE_BINS).reshape(-1, 1) @@ -652,25 +704,35 @@ class FT2232HConnection: 0, 65535, ).astype(np.uint16) - det = (target_mask & (np.abs(dbins - 8) < 2) & (np.abs(rbins - 20) < 2)).astype(np.uint8) - det_packed = np.packbits(det.flatten()) # MSB-first bit order matches FPGA + # PR-F 2-tier dense detect: emit CONFIRM (code=2) at the target spot. + det_codes = ((target_mask & (np.abs(dbins - 8) < 2) & (np.abs(rbins - 20) < 2)) + .astype(np.uint8) * 2) + det_packed_2bit = np.zeros((NUM_RANGE_BINS, BULK_DETECT_BYTES_PER_RANGE), + dtype=np.uint8) + for d_idx in range(NUM_DOPPLER_BINS): + byte_idx = d_idx // 4 + shift = (3 - (d_idx % 4)) * 2 # MSB-first within byte + det_packed_2bit[:, byte_idx] |= ( + (det_codes[:, d_idx] & 0x03) << shift + ).astype(np.uint8) buf = bytearray(BULK_FRAME_MAX_SIZE) buf[0] = HEADER_BYTE - buf[1] = flags & 0x3F # reserved high bits zero, matches RTL - buf[2] = (self._mock_frame_num >> 8) & 0xFF - buf[3] = self._mock_frame_num & 0xFF - buf[4] = (NUM_RANGE_BINS >> 8) & 0xFF - buf[5] = NUM_RANGE_BINS & 0xFF - buf[6] = (NUM_DOPPLER_BINS >> 8) & 0xFF - buf[7] = NUM_DOPPLER_BINS & 0xFF + buf[1] = RP_USB_PROTOCOL_VERSION + buf[2] = flags & 0x07 # only 3 stream-enable bits valid; reserved zero + buf[3] = (self._mock_frame_num >> 8) & 0xFF + buf[4] = self._mock_frame_num & 0xFF + buf[5] = (NUM_RANGE_BINS >> 8) & 0xFF + buf[6] = NUM_RANGE_BINS & 0xFF + buf[7] = (NUM_DOPPLER_BINS >> 8) & 0xFF + buf[8] = NUM_DOPPLER_BINS & 0xFF cursor = BULK_FRAME_HEADER_SIZE # Range profile (>u2 = big-endian uint16, matches FPGA MSB-first). buf[cursor:cursor + BULK_RANGE_SECTION_BYTES] = range_profile.astype(">u2").tobytes() cursor += BULK_RANGE_SECTION_BYTES buf[cursor:cursor + BULK_DOPPLER_MAG_BYTES] = mag_u16.astype(">u2").tobytes() cursor += BULK_DOPPLER_MAG_BYTES - buf[cursor:cursor + BULK_DETECT_DENSE_BYTES] = det_packed.tobytes() + buf[cursor:cursor + BULK_DETECT_DENSE_BYTES] = det_packed_2bit.tobytes() cursor += BULK_DETECT_DENSE_BYTES buf[cursor] = FOOTER_BYTE @@ -1041,7 +1103,10 @@ class RadarAcquisition(threading.Thread): frame = RadarFrame() frame.timestamp = time.time() frame.frame_number = parsed["frame_number"] - frame.mag_only = bool(parsed["flags"] & BULK_FLAG_MAG_ONLY) + # PR-G v2: bulk frames are always magnitude-only on the wire (no I/Q + # path implemented in the FPGA write FSM), so flag this for downstream + # consumers that expect mag-only when reading from bulk. + frame.mag_only = True rprof = parsed["range_profile"] if rprof is not None: diff --git a/9_Firmware/9_3_GUI/test_GUI_V65_Tk.py b/9_Firmware/9_3_GUI/test_GUI_V65_Tk.py index a965f9e..a7be7ac 100644 --- a/9_Firmware/9_3_GUI/test_GUI_V65_Tk.py +++ b/9_Firmware/9_3_GUI/test_GUI_V65_Tk.py @@ -24,7 +24,8 @@ from radar_protocol import ( BULK_FRAME_HEADER_SIZE, BULK_FRAME_MAX_SIZE, BULK_RANGE_SECTION_BYTES, BULK_FOOTER_SIZE, BULK_FLAG_STREAM_RANGE, BULK_FLAG_STREAM_DOPPLER, BULK_FLAG_STREAM_CFAR, - BULK_FLAG_MAG_ONLY, BULK_FLAG_SPARSE_DET, + BULK_FLAGS_RESERVED_MASK, + RP_USB_PROTOCOL_VERSION, STATUS_PACKET_SIZE, ) from GUI_V65_Tk import DemoTarget, DemoSimulator, _ReplayController @@ -131,8 +132,9 @@ class TestRadarProtocol(unittest.TestCase): short_listen=17450, chirps=32, range_mode=0, st_flags=0, st_detail=0, st_busy=0, agc_gain=0, agc_peak=0, agc_sat=0, agc_enable=0, - chirps_mismatch=0): - """Build a 26-byte status response matching FPGA format (Build 26).""" + chirps_mismatch=0, + cand_count=0, thr_soft=0, frame_drop=0): + """Build a PR-G v2 30-byte status response matching FPGA format.""" pkt = bytearray() pkt.append(STATUS_HEADER_BYTE) @@ -161,11 +163,17 @@ class TestRadarProtocol(unittest.TestCase): (range_mode & 0x03)) pkt += struct.pack(">I", w4) - # Word 5: {7'd0, self_test_busy, 8'd0, self_test_detail[7:0], - # 3'd0, self_test_flags[4:0]} - w5 = ((st_busy & 0x01) << 24) | ((st_detail & 0xFF) << 8) | (st_flags & 0x1F) + # Word 5: {frame_drop[31:25], self_test_busy[24], 8'd0, + # self_test_detail[15:8], 3'd0, self_test_flags[4:0]} + w5 = (((frame_drop & 0x7F) << 25) | ((st_busy & 0x01) << 24) + | ((st_detail & 0xFF) << 8) | (st_flags & 0x1F)) pkt += struct.pack(">I", w5) + # Word 6 (PR-G 2-tier CFAR telemetry): + # high 16 bits = detect_count_cand, low 16 bits = detect_threshold_soft + w6 = ((cand_count & 0xFFFF) << 16) | (thr_soft & 0xFFFF) + pkt += struct.pack(">I", w6) + pkt.append(FOOTER_BYTE) return bytes(pkt) @@ -199,7 +207,8 @@ class TestRadarProtocol(unittest.TestCase): self.assertEqual(sr.range_mode, 2) def test_parse_status_too_short(self): - self.assertIsNone(RadarProtocol.parse_status_packet(b"\xBB" + b"\x00" * 20)) + # Anything under STATUS_PACKET_SIZE (30) must be rejected. + self.assertIsNone(RadarProtocol.parse_status_packet(b"\xBB" + b"\x00" * 28)) def test_parse_status_wrong_header(self): raw = self._make_status_packet() @@ -208,9 +217,17 @@ class TestRadarProtocol(unittest.TestCase): def test_parse_status_wrong_footer(self): raw = bytearray(self._make_status_packet()) - raw[25] = 0x00 # corrupt footer (was at index 21 in old 5-word format) + raw[STATUS_PACKET_SIZE - 1] = 0x00 # corrupt footer self.assertIsNone(RadarProtocol.parse_status_packet(bytes(raw))) + def test_parse_status_word6_2tier_cfar(self): + """PR-G v2: word[6] high-half = detect_count_cand, low-half = detect_threshold_soft.""" + raw = self._make_status_packet(cand_count=0x0A5C, thr_soft=0x1234) + sr = RadarProtocol.parse_status_packet(raw) + self.assertIsNotNone(sr) + self.assertEqual(sr.detect_count_cand, 0x0A5C) + self.assertEqual(sr.detect_threshold_soft, 0x1234) + def test_parse_status_word4_layout_co_spec(self): """GUI-S3: pin status word 4 bit positions to the FPGA word builder. @@ -309,10 +326,11 @@ class TestRadarProtocol(unittest.TestCase): self.assertEqual(sr.self_test_detail, 0) self.assertEqual(sr.self_test_busy, 0) - def test_status_packet_is_26_bytes(self): - """Verify status packet is exactly 26 bytes.""" + def test_status_packet_is_30_bytes(self): + """PR-G v2: status packet is 30 bytes (1 + 7*4 + 1).""" raw = self._make_status_packet() - self.assertEqual(len(raw), 26) + self.assertEqual(len(raw), STATUS_PACKET_SIZE) + self.assertEqual(len(raw), 30) # ---------------------------------------------------------------- # Boundary detection @@ -353,16 +371,18 @@ class TestRadarProtocol(unittest.TestCase): def test_find_boundaries_rejects_false_status_header(self): """GUI-S1: a stray 0xBB without 0xFF at offset+1 must NOT be - accepted as a status packet — even if 0x55 lands 25 bytes later.""" - forged = bytes([0xBB] + [0x00] * 24 + [0x55]) # byte 1 = 0x00, not 0xFF + accepted as a status packet — even if 0x55 lands at the right position.""" + # PR-G v2: 30-byte status; forge byte 1 = 0x00 (not 0xFF). + forged = bytes([0xBB] + [0x00] * (STATUS_PACKET_SIZE - 2) + [0x55]) + self.assertEqual(len(forged), STATUS_PACKET_SIZE) real = self._make_data_packet() buf = forged + real boundaries = RadarProtocol.find_packet_boundaries(buf) - # Forged status rejected; real data packet found 11 bytes in. + # Forged status rejected; real data packet found inside the buffer. data_hits = [b for b in boundaries if b[2] == "data"] status_hits = [b for b in boundaries if b[2] == "status"] self.assertEqual(len(status_hits), 0) - self.assertEqual(len(data_hits), 1) + self.assertGreaterEqual(len(data_hits), 1) def test_find_boundaries_recovers_after_byte_drop(self): """GUI-S1: simulate a single-byte drop — parser should re-lock on @@ -386,7 +406,7 @@ class TestBulkFrameParser(unittest.TestCase): def _build_bulk_frame( self, flags: int = (BULK_FLAG_STREAM_RANGE | BULK_FLAG_STREAM_DOPPLER - | BULK_FLAG_STREAM_CFAR | BULK_FLAG_MAG_ONLY), + | BULK_FLAG_STREAM_CFAR), frame_number: int = 0xBEEF, n_range: int = NUM_RANGE_BINS, n_doppler: int = NUM_DOPPLER_BINS, @@ -394,12 +414,14 @@ class TestBulkFrameParser(unittest.TestCase): doppler_seed: int = 2, cfar_seed: int = 3, bad_footer: bool = False, + version: int = RP_USB_PROTOCOL_VERSION, ) -> tuple[bytes, np.ndarray, np.ndarray, np.ndarray]: - """Synthesize a bulk frame matching usb_data_interface_ft2232h.v. + """Synthesize a PR-G v2 bulk frame matching usb_data_interface_ft2232h.v. - Returns (frame_bytes, range_profile, doppler_mag, cfar_dense). The + Returns (frame_bytes, range_profile, doppler_mag, cfar_codes). The latter three are the source-of-truth arrays used to generate the - bytes; tests can assert round-trip equality. + bytes; tests can assert round-trip equality. cfar_codes carries + 2-bit values 0..3 (NONE/CAND/CONFIRM/RSVD). """ rng_r = np.random.RandomState(range_seed) rng_d = np.random.RandomState(doppler_seed) @@ -411,15 +433,19 @@ class TestBulkFrameParser(unittest.TestCase): doppler_mag = (rng_d.randint(0, 65535, size=(n_range, n_doppler)) .astype(np.uint16) if (flags & BULK_FLAG_STREAM_DOPPLER) else None) - cfar_dense = (rng_c.randint(0, 2, size=(n_range, n_doppler)) + # PR-F 2-tier dense detect: 2 bits per cell (codes 0..3). + cfar_codes = (rng_c.randint(0, 4, size=(n_range, n_doppler)) .astype(np.uint8) if (flags & BULK_FLAG_STREAM_CFAR) else None) out = bytearray() out.append(HEADER_BYTE) - # Don't mask reserved bits here — the parser must reject any byte - # with bits [7:6] set, and the rejection test relies on those bits - # actually surviving into the synthesized frame. + # PR-G v2: byte 1 is the protocol version. Tests can override + # `version` to exercise rejection. + out.append(version & 0xFF) + # Don't mask reserved bits — the parser must reject any byte with + # bits in BULK_FLAGS_RESERVED_MASK set, and the rejection test + # relies on those bits actually surviving into the synthesized frame. out.append(flags & 0xFF) out.append((frame_number >> 8) & 0xFF) out.append(frame_number & 0xFF) @@ -431,10 +457,19 @@ class TestBulkFrameParser(unittest.TestCase): out += range_profile.astype(">u2").tobytes() if doppler_mag is not None: out += doppler_mag.astype(">u2").tobytes() - if cfar_dense is not None: - out += np.packbits(cfar_dense.flatten()).tobytes() + if cfar_codes is not None: + # Pack 4 cells per byte, MSB-first within byte (matches FPGA emit). + # Local bytes_per_range tracks the *actual* n_doppler so tests + # passing n_doppler != NUM_DOPPLER_BINS don't overflow the row. + bytes_per_range = (n_doppler * 2 + 7) // 8 + packed = np.zeros((n_range, bytes_per_range), dtype=np.uint8) + for d_idx in range(n_doppler): + byte_idx = d_idx // 4 + shift = (3 - (d_idx % 4)) * 2 + packed[:, byte_idx] |= ((cfar_codes[:, d_idx] & 0x03) << shift).astype(np.uint8) + out += packed.tobytes() out.append(0x00 if bad_footer else FOOTER_BYTE) - return bytes(out), range_profile, doppler_mag, cfar_dense + return bytes(out), range_profile, doppler_mag, cfar_codes def test_parse_full_frame_round_trip(self): """All-streams mag-only round trip: every cell exact.""" @@ -450,8 +485,7 @@ class TestBulkFrameParser(unittest.TestCase): np.testing.assert_array_equal(parsed["cfar_dense"], cdense) def test_parse_range_only(self): - flags = BULK_FLAG_STREAM_RANGE | BULK_FLAG_MAG_ONLY - raw, rprof, _dmag, _cdense = self._build_bulk_frame(flags=flags) + raw, rprof, _dmag, _cdense = self._build_bulk_frame(flags=BULK_FLAG_STREAM_RANGE) parsed = RadarProtocol.parse_bulk_frame(raw) self.assertIsNotNone(parsed) np.testing.assert_array_equal(parsed["range_profile"], rprof) @@ -463,8 +497,7 @@ class TestBulkFrameParser(unittest.TestCase): ) def test_parse_doppler_only(self): - flags = BULK_FLAG_STREAM_DOPPLER | BULK_FLAG_MAG_ONLY - raw, _rprof, dmag, _cdense = self._build_bulk_frame(flags=flags) + raw, _rprof, dmag, _cdense = self._build_bulk_frame(flags=BULK_FLAG_STREAM_DOPPLER) parsed = RadarProtocol.parse_bulk_frame(raw) self.assertIsNotNone(parsed) self.assertIsNone(parsed["range_profile"]) @@ -472,38 +505,27 @@ class TestBulkFrameParser(unittest.TestCase): self.assertIsNone(parsed["cfar_dense"]) def test_parse_cfar_only(self): - flags = BULK_FLAG_STREAM_CFAR | BULK_FLAG_MAG_ONLY - raw, _rprof, _dmag, cdense = self._build_bulk_frame(flags=flags) + raw, _rprof, _dmag, cdense = self._build_bulk_frame(flags=BULK_FLAG_STREAM_CFAR) parsed = RadarProtocol.parse_bulk_frame(raw) self.assertIsNotNone(parsed) np.testing.assert_array_equal(parsed["cfar_dense"], cdense) def test_parse_no_streams(self): - """Header + footer only (8 + 1 = 9 bytes).""" - flags = BULK_FLAG_MAG_ONLY # streams off, mag_only still set - raw, *_ = self._build_bulk_frame(flags=flags) - self.assertEqual(len(raw), 9) + """PR-G v2: header + footer only is 9 + 1 = 10 bytes.""" + raw, *_ = self._build_bulk_frame(flags=0) + self.assertEqual(len(raw), BULK_FRAME_HEADER_SIZE + BULK_FOOTER_SIZE) parsed = RadarProtocol.parse_bulk_frame(raw) self.assertIsNotNone(parsed) - self.assertEqual(parsed["frame_size"], 9) + self.assertEqual(parsed["frame_size"], BULK_FRAME_HEADER_SIZE + BULK_FOOTER_SIZE) - def test_reject_full_iq_until_rtl_lands(self): - """mag_only=0 must be rejected — FPGA write FSM doesn't emit it.""" - flags = BULK_FLAG_STREAM_DOPPLER # NB: mag_only bit cleared - raw, *_ = self._build_bulk_frame(flags=flags) - # Frame is structurally consistent; rejection comes from the - # mag_only=0 + stream_doppler=1 combination check in parse_bulk_frame. - self.assertIsNone(RadarProtocol.parse_bulk_frame(raw)) - - def test_reject_sparse_det(self): - """sparse_det=1 must be rejected — FPGA emits dense bitmap only.""" - flags = (BULK_FLAG_STREAM_CFAR | BULK_FLAG_MAG_ONLY | BULK_FLAG_SPARSE_DET) - raw, *_ = self._build_bulk_frame(flags=flags) + def test_reject_wrong_version_byte(self): + """PR-G v2: byte 1 must equal RP_USB_PROTOCOL_VERSION.""" + raw, *_ = self._build_bulk_frame(version=0x01) self.assertIsNone(RadarProtocol.parse_bulk_frame(raw)) def test_reject_reserved_bits_set(self): - """Reserved high bits in flags byte must be zero.""" - raw, *_ = self._build_bulk_frame(flags=0x80 | BULK_FLAG_MAG_ONLY) + """Reserved high bits (BULK_FLAGS_RESERVED_MASK) must be zero.""" + raw, *_ = self._build_bulk_frame(flags=BULK_FLAG_STREAM_DOPPLER | 0x80) self.assertIsNone(RadarProtocol.parse_bulk_frame(raw)) def test_reject_wrong_n_range(self): @@ -542,10 +564,12 @@ class TestBulkFrameParser(unittest.TestCase): self.assertIsNotNone(parsed) def test_find_boundaries_with_status(self): - """Status packets coexist with bulk frames in the same stream.""" + """Status packets (PR-G v2 30 B) coexist with bulk frames in the same stream.""" f1, *_ = self._build_bulk_frame() - # Build a minimal valid status packet (byte 1 = 0xFF, footer = 0x55). - status = bytes([STATUS_HEADER_BYTE, 0xFF] + [0x00] * 23 + [FOOTER_BYTE]) + # Build a minimal valid 30-byte status packet (byte 1 = 0xFF, footer = 0x55). + status = bytes([STATUS_HEADER_BYTE, 0xFF] + + [0x00] * (STATUS_PACKET_SIZE - 3) + [FOOTER_BYTE]) + self.assertEqual(len(status), STATUS_PACKET_SIZE) buf = f1 + status out = RadarProtocol.find_bulk_frame_boundaries(buf) types = [t for _s, _e, t in out] @@ -645,8 +669,8 @@ class TestFT2232HConnection(unittest.TestCase): self.assertIsNotNone(parsed) self.assertEqual(parsed["n_range"], NUM_RANGE_BINS) self.assertEqual(parsed["n_doppler"], NUM_DOPPLER_BINS) - self.assertTrue(parsed["flags"] & BULK_FLAG_MAG_ONLY) - self.assertFalse(parsed["flags"] & BULK_FLAG_SPARSE_DET) + # PR-G v2: only the low 3 stream-enable bits are valid. + self.assertEqual(parsed["flags"] & BULK_FLAGS_RESERVED_MASK, 0) conn.close() def test_mock_write(self): diff --git a/9_Firmware/9_3_GUI/test_v7.py b/9_Firmware/9_3_GUI/test_v7.py index 96cc70e..8b8e330 100644 --- a/9_Firmware/9_3_GUI/test_v7.py +++ b/9_Firmware/9_3_GUI/test_v7.py @@ -353,6 +353,177 @@ class TestRadarDataWorkerInit(unittest.TestCase): "set_waveform must not reset runtime counters") +# ============================================================================= +# Test: radar_protocol PR-G v2 bulk frame + status round-trip +# Audit P-2/P-3: GUI parser must agree byte-for-byte with the FPGA emit +# (usb_data_interface_ft2232h.v). Build synthetic frames the way the FPGA +# does, then parse them back and check every field. Catches: +# - 8 vs 9 byte header (version byte at offset 1) +# - reserved-bit mask 0xC0 vs 0xF8 +# - 1-bit vs 2-bit detect packing +# - 26 vs 30 byte status, 6 vs 7 status_words +# ============================================================================= + +class TestBulkFrameV2RoundTrip(unittest.TestCase): + """PR-G v2 bulk frame: build synthetic FPGA emit, parse back, check.""" + + def _build_v2_frame(self, flags: int, frame_num: int = 0, + doppler: np.ndarray | None = None, + cfar_codes: np.ndarray | None = None, + range_profile: np.ndarray | None = None) -> bytes: + """Construct a v2 frame the way usb_data_interface_ft2232h.v emits.""" + from radar_protocol import ( + HEADER_BYTE, FOOTER_BYTE, RP_USB_PROTOCOL_VERSION, + NUM_RANGE_BINS, NUM_DOPPLER_BINS, + BULK_FLAG_STREAM_RANGE, BULK_FLAG_STREAM_DOPPLER, BULK_FLAG_STREAM_CFAR, + BULK_DETECT_BYTES_PER_RANGE, + ) + parts = [ + bytes([HEADER_BYTE, RP_USB_PROTOCOL_VERSION, flags & 0xFF]), + struct.pack(">H", frame_num), + struct.pack(">H", NUM_RANGE_BINS), + struct.pack(">H", NUM_DOPPLER_BINS), + ] + if flags & BULK_FLAG_STREAM_RANGE: + rp = (range_profile if range_profile is not None + else np.arange(NUM_RANGE_BINS, dtype=np.uint16)) + parts.append(rp.astype(">u2").tobytes()) + if flags & BULK_FLAG_STREAM_DOPPLER: + d = (doppler if doppler is not None + else np.zeros((NUM_RANGE_BINS, NUM_DOPPLER_BINS), dtype=np.uint16)) + parts.append(d.astype(">u2").tobytes()) + if flags & BULK_FLAG_STREAM_CFAR: + codes = (cfar_codes if cfar_codes is not None + else np.zeros((NUM_RANGE_BINS, NUM_DOPPLER_BINS), dtype=np.uint8)) + # Pack 4 cells per byte, MSB-first within byte + packed = np.zeros((NUM_RANGE_BINS, BULK_DETECT_BYTES_PER_RANGE), dtype=np.uint8) + for d_idx in range(NUM_DOPPLER_BINS): + byte_idx = d_idx // 4 + shift = (3 - (d_idx % 4)) * 2 # MSB-first + packed[:, byte_idx] |= ((codes[:, d_idx] & 0x03) << shift).astype(np.uint8) + parts.append(packed.tobytes()) + parts.append(bytes([FOOTER_BYTE])) + return b"".join(parts) + + def test_full_frame_round_trip(self): + from radar_protocol import ( + RadarProtocol, NUM_RANGE_BINS, NUM_DOPPLER_BINS, + BULK_FLAG_STREAM_RANGE, BULK_FLAG_STREAM_DOPPLER, BULK_FLAG_STREAM_CFAR, + BULK_FRAME_HEADER_SIZE, + ) + # Synthetic detection map: scatter all 4 codes across the grid + codes = np.zeros((NUM_RANGE_BINS, NUM_DOPPLER_BINS), dtype=np.uint8) + codes[10, 5] = 1 # CAND + codes[100, 17] = 2 # CONFIRM + codes[300, 47] = 3 # reserved (still legal on the wire) + doppler = np.full((NUM_RANGE_BINS, NUM_DOPPLER_BINS), 1234, dtype=np.uint16) + rp = np.arange(NUM_RANGE_BINS, dtype=np.uint16) + + flags = (BULK_FLAG_STREAM_RANGE | BULK_FLAG_STREAM_DOPPLER + | BULK_FLAG_STREAM_CFAR) + frame = self._build_v2_frame(flags, frame_num=42, + doppler=doppler, cfar_codes=codes, + range_profile=rp) + # 9 + 1024 + 49152 + 6144 + 1 = 56330 + self.assertEqual(len(frame), BULK_FRAME_HEADER_SIZE + 1024 + 49152 + 6144 + 1) + self.assertEqual(BULK_FRAME_HEADER_SIZE, 9) + + parsed = RadarProtocol.parse_bulk_frame(frame) + self.assertIsNotNone(parsed) + self.assertEqual(parsed["frame_number"], 42) + self.assertEqual(parsed["flags"], flags) + self.assertEqual(parsed["n_range"], NUM_RANGE_BINS) + self.assertEqual(parsed["n_doppler"], NUM_DOPPLER_BINS) + np.testing.assert_array_equal(parsed["range_profile"], rp) + np.testing.assert_array_equal(parsed["doppler_mag"], doppler) + np.testing.assert_array_equal(parsed["cfar_dense"], codes) + + def test_reject_wrong_version_byte(self): + from radar_protocol import RadarProtocol + frame = self._build_v2_frame(0x07) + # Corrupt the version byte + bad = bytes([frame[0], 0x01]) + frame[2:] + self.assertIsNone(RadarProtocol.parse_bulk_frame(bad)) + + def test_reject_reserved_flag_bits(self): + from radar_protocol import RadarProtocol + # Set bit 7 (reserved); byte order: HEADER, ver, flags + frame = self._build_v2_frame(0x07) + bad = bytes([frame[0], frame[1], frame[2] | 0x80]) + frame[3:] + self.assertIsNone(RadarProtocol.parse_bulk_frame(bad)) + + def test_detect_2bit_codes_independently(self): + """Each cell decodes to the same 2-bit code that was packed.""" + from radar_protocol import ( + RadarProtocol, NUM_RANGE_BINS, NUM_DOPPLER_BINS, + BULK_FLAG_STREAM_CFAR, + ) + # All four codes in adjacent cells of the same byte + codes = np.zeros((NUM_RANGE_BINS, NUM_DOPPLER_BINS), dtype=np.uint8) + codes[0, 0] = 0 + codes[0, 1] = 1 + codes[0, 2] = 2 + codes[0, 3] = 3 + frame = self._build_v2_frame(BULK_FLAG_STREAM_CFAR, cfar_codes=codes) + parsed = RadarProtocol.parse_bulk_frame(frame) + self.assertEqual(parsed["cfar_dense"][0, 0], 0) + self.assertEqual(parsed["cfar_dense"][0, 1], 1) + self.assertEqual(parsed["cfar_dense"][0, 2], 2) + self.assertEqual(parsed["cfar_dense"][0, 3], 3) + + def test_find_boundaries_on_back_to_back_frames(self): + from radar_protocol import ( + RadarProtocol, BULK_FLAG_STREAM_DOPPLER, BULK_FLAG_STREAM_CFAR, + ) + flags = BULK_FLAG_STREAM_DOPPLER | BULK_FLAG_STREAM_CFAR + f1 = self._build_v2_frame(flags, frame_num=1) + f2 = self._build_v2_frame(flags, frame_num=2) + boundaries = RadarProtocol.find_bulk_frame_boundaries(f1 + f2) + self.assertEqual(len(boundaries), 2) + self.assertEqual(boundaries[0], (0, len(f1), "data")) + self.assertEqual(boundaries[1], (len(f1), len(f1) + len(f2), "data")) + + +class TestStatusPacketV2RoundTrip(unittest.TestCase): + """PR-G v2 status packet: 7 status_words / 30 bytes.""" + + def _build_status(self, words: list[int]) -> bytes: + from radar_protocol import STATUS_HEADER_BYTE, FOOTER_BYTE + assert len(words) == 7 + body = b"".join(struct.pack(">I", w & 0xFFFFFFFF) for w in words) + return bytes([STATUS_HEADER_BYTE]) + body + bytes([FOOTER_BYTE]) + + def test_size_is_30(self): + from radar_protocol import STATUS_PACKET_SIZE + self.assertEqual(STATUS_PACKET_SIZE, 30) + pkt = self._build_status([0] * 7) + self.assertEqual(len(pkt), 30) + + def test_word6_telemetry_decoded(self): + """word[6] = {detect_count_cand[31:16], detect_threshold_soft[15:0]}""" + from radar_protocol import RadarProtocol + word6 = (0x1234 << 16) | 0xABCD # cand=0x1234, thr_soft=0xABCD + pkt = self._build_status([0, 0, 0, 0, 0, 0, word6]) + sr = RadarProtocol.parse_status_packet(pkt) + self.assertIsNotNone(sr) + self.assertEqual(sr.detect_count_cand, 0x1234) + self.assertEqual(sr.detect_threshold_soft, 0xABCD) + + def test_short_packet_returns_none(self): + from radar_protocol import RadarProtocol + pkt = self._build_status([0] * 7) + self.assertIsNone(RadarProtocol.parse_status_packet(pkt[:25])) + + def test_pre_PR_G_26byte_packet_rejected(self): + """Old 26-byte status packets must NOT silently parse — they're stale.""" + from radar_protocol import RadarProtocol, STATUS_HEADER_BYTE, FOOTER_BYTE + # Build a 26-byte packet (legacy format). + old_pkt = (bytes([STATUS_HEADER_BYTE]) + + b"\x00" * 24 + bytes([FOOTER_BYTE])) + self.assertEqual(len(old_pkt), 26) + self.assertIsNone(RadarProtocol.parse_status_packet(old_pkt)) + + # ============================================================================= # Test: v7.__init__ — clean exports # =============================================================================