chore(tests): retire v1 cross-layer iverilog cosim tier

The v1-era tb_cross_layer_ft2232h.v cosim TB no longer matches
production after the protocol-v2 / opcode dispatch rework (PR-G).
Equivalent v2 coverage now lives in the FPGA regression's
tb_usb_protocol_v2.v and tb_system_opcodes.v.

Removed:
  - tb_cross_layer_ft2232h.v (716 lines)
  - Tier 2 (Verilog cosimulation) from test_cross_layer_contract.py
  - iverilog/vvp tool detection and CI install step in ci-tests.yml

Tier 1 (static parser) and Tier 3 (C stub execution) remain. CI
no longer needs apt-get install iverilog.

contract_parser.py updated to reflect the slimmer two-tier model.
This commit is contained in:
Jason
2026-05-04 21:06:34 +05:45
parent b84aa6a6f3
commit 416601d1d0
4 changed files with 196 additions and 978 deletions
+4 -4
View File
@@ -106,7 +106,10 @@ jobs:
# =========================================================================== # ===========================================================================
# Cross-Layer Contract Tests (Python ↔ Verilog ↔ C) # Cross-Layer Contract Tests (Python ↔ Verilog ↔ C)
# Validates opcode maps, bit widths, packet layouts, and round-trip # Validates opcode maps, bit widths, packet layouts, and round-trip
# correctness across FPGA RTL, Python GUI, and STM32 firmware. # correctness across FPGA RTL, Python GUI, and STM32 firmware. Runs
# entirely as Python static parsers + the Tier 3 C stub; the v1-era
# iverilog cosim TB was retired post-PR-G (equivalent v2 coverage now
# lives in the FPGA regression's tb_usb_protocol_v2 / tb_system_opcodes).
# =========================================================================== # ===========================================================================
cross-layer-tests: cross-layer-tests:
name: Cross-Layer Contract Tests name: Cross-Layer Contract Tests
@@ -124,9 +127,6 @@ jobs:
- name: Install dependencies - name: Install dependencies
run: uv sync --group dev run: uv sync --group dev
- name: Install Icarus Verilog
run: sudo apt-get update && sudo apt-get install -y iverilog
- name: Run cross-layer contract tests - name: Run cross-layer contract tests
run: > run: >
uv run pytest uv run pytest
+122 -36
View File
@@ -128,7 +128,16 @@ def parse_python_opcodes(filepath: Path | None = None) -> dict[int, OpcodeEntry]
def parse_python_packet_constants(filepath: Path | None = None) -> dict[str, PacketConstants]: def parse_python_packet_constants(filepath: Path | None = None) -> dict[str, PacketConstants]:
"""Extract HEADER_BYTE, FOOTER_BYTE, STATUS_HEADER_BYTE, packet sizes.""" """
Extract HEADER_BYTE, FOOTER_BYTE, STATUS_HEADER_BYTE, STATUS_PACKET_SIZE.
Note on the data packet: PR-G replaced the fixed 11-byte v1 frame with a
variable-length bulk frame (header + optional sections + footer), so a
single ``DATA_PACKET_SIZE`` constant no longer characterizes the data
layer. Python keeps ``DATA_PACKET_SIZE = 11`` as a back-compat alias for
legacy FT601 log files; we deliberately do NOT cross-check it against
the FPGA, which has no equivalent localparam in either USB module.
"""
if filepath is None: if filepath is None:
filepath = GUI_DIR / "radar_protocol.py" filepath = GUI_DIR / "radar_protocol.py"
text = filepath.read_text() text = filepath.read_text()
@@ -140,14 +149,11 @@ def parse_python_packet_constants(filepath: Path | None = None) -> dict[str, Pac
val = m.group(1) val = m.group(1)
return int(val, 16) if val.startswith("0x") else int(val) return int(val, 16) if val.startswith("0x") else int(val)
header = _find(r'HEADER_BYTE\s*=\s*(0x[0-9a-fA-F]+|\d+)')
footer = _find(r'FOOTER_BYTE\s*=\s*(0x[0-9a-fA-F]+|\d+)') footer = _find(r'FOOTER_BYTE\s*=\s*(0x[0-9a-fA-F]+|\d+)')
status_header = _find(r'STATUS_HEADER_BYTE\s*=\s*(0x[0-9a-fA-F]+|\d+)') status_header = _find(r'STATUS_HEADER_BYTE\s*=\s*(0x[0-9a-fA-F]+|\d+)')
data_size = _find(r'DATA_PACKET_SIZE\s*=\s*(\d+)')
status_size = _find(r'STATUS_PACKET_SIZE\s*=\s*(\d+)') status_size = _find(r'STATUS_PACKET_SIZE\s*=\s*(\d+)')
return { return {
"data": PacketConstants(header=header, footer=footer, size=data_size),
"status": PacketConstants(header=status_header, footer=footer, size=status_size), "status": PacketConstants(header=status_header, footer=footer, size=status_size),
} }
@@ -372,39 +378,106 @@ def parse_verilog_opcodes(filepath: Path | None = None) -> dict[int, OpcodeEntry
return opcodes return opcodes
def _resolve_verilog_value(rhs: str, macros: dict[str, int]) -> int | None:
"""
Resolve a Verilog RHS expression to an integer. Supports:
* Plain decimal: ``1234``
* Verilog literal: ``16'd1234``, ``8'hAA``, ``2'b01``, ``6'b000_111``
* Macro reference: ```MACRO_NAME`` (looked up in *macros*)
* Width-prefixed macro: ``16'd`MACRO_NAME``
Returns None when the RHS can't be resolved (e.g. RHS is a wire name,
concatenation, or an undefined macro).
"""
rhs = rhs.strip()
# Width-prefixed form: "16'd...", "8'h...", "2'b...", "4'o..."
width_match = re.match(r"^(\d+)'([bdho])(.*)$", rhs)
if width_match:
base_char = width_match.group(2)
rest = width_match.group(3).strip()
# Width-prefixed macro reference: 16'd`RP_DEF_FOO
if rest.startswith("`"):
return macros.get(rest[1:].strip())
digits = rest.replace("_", "")
if not re.fullmatch(r"[0-9a-fA-F]+", digits):
return None
base = {"b": 2, "d": 10, "h": 16, "o": 8}[base_char]
try:
return int(digits, base)
except ValueError:
return None
# Bare macro reference: `MACRO_NAME
if rhs.startswith("`"):
return macros.get(rhs[1:].strip())
# Plain decimal
if rhs.isdigit():
return int(rhs)
return None
def parse_radar_params_macros(filepath: Path | None = None) -> dict[str, int]:
"""
Parse `define directives in radar_params.vh into a name → integer map.
Resolves up to two macro→macro indirections (none expected today, kept
for forward compatibility).
"""
if filepath is None:
filepath = FPGA_DIR / "radar_params.vh"
text = filepath.read_text()
raw: dict[str, str] = {}
for line in text.splitlines():
m = re.match(r"^\s*`define\s+(\w+)\s+(\S.*?)(?:\s*//.*)?\s*$", line)
if m:
raw[m.group(1)] = m.group(2).strip()
macros: dict[str, int] = {}
for _ in range(3): # bounded fixed-point — file is small, runs in microseconds
progressed = False
for name, rhs in raw.items():
if name in macros:
continue
val = _resolve_verilog_value(rhs, macros)
if val is not None:
macros[name] = val
progressed = True
if not progressed:
break
return macros
def parse_verilog_reset_defaults(filepath: Path | None = None) -> dict[str, int]: def parse_verilog_reset_defaults(filepath: Path | None = None) -> dict[str, int]:
""" """
Parse the reset block from radar_system_top.v. Parse the reset block from radar_system_top.v.
Returns {register_name: reset_value}. Returns {register_name: reset_value}.
Expands ```RP_DEF_*`` style macro references against radar_params.vh so
that fields like ``host_long_chirp_cycles <= 16'd`RP_DEF_LONG_CHIRP_CYCLES``
resolve to integers instead of being silently dropped.
""" """
if filepath is None: if filepath is None:
filepath = FPGA_DIR / "radar_system_top.v" filepath = FPGA_DIR / "radar_system_top.v"
text = filepath.read_text() text = filepath.read_text()
macros = parse_radar_params_macros()
defaults: dict[str, int] = {} defaults: dict[str, int] = {}
# Match patterns like: host_radar_mode <= 2'b01; # Capture every "host_X <= <expr>;" assignment, regardless of RHS form.
# Also: host_detect_threshold <= 16'd10000; # Resolution to an integer happens via _resolve_verilog_value, which
for m in re.finditer( # rejects (returns None for) RHSes that aren't statically known
r'(host_\w+)\s*<=\s*(\d+\'[bdho][0-9a-fA-F_]+|\d+)\s*;', # constants (e.g. concatenations or wire names from the opcode decode
text # block — those land below the reset block, and we keep only the first
): # occurrence anyway).
for m in re.finditer(r'(host_\w+)\s*<=\s*([^;]+?)\s*;', text):
reg = m.group(1) reg = m.group(1)
val_str = m.group(2) if reg in defaults:
continue # reset block precedes opcode block; first wins
# Parse Verilog literal val = _resolve_verilog_value(m.group(2), macros)
if "'" in val_str: if val is not None:
base_char = val_str.split("'")[1][0].lower() defaults[reg] = val
digits = val_str.split("'")[1][1:].replace("_", "")
base = {"b": 2, "d": 10, "h": 16, "o": 8}[base_char]
value = int(digits, base)
else:
value = int(val_str)
# Only keep first occurrence (the reset block comes before the
# opcode decode which also has <= assignments)
if reg not in defaults:
defaults[reg] = value
return defaults return defaults
@@ -435,7 +508,15 @@ def parse_verilog_register_widths(filepath: Path | None = None) -> dict[str, int
def parse_verilog_packet_constants( def parse_verilog_packet_constants(
filepath: Path | None = None, filepath: Path | None = None,
) -> dict[str, PacketConstants]: ) -> dict[str, PacketConstants]:
"""Extract HEADER, FOOTER, STATUS_HEADER, packet size localparams.""" """
Extract HEADER, FOOTER, STATUS_HEADER, STATUS_PKT_LEN localparams.
Note: ``DATA_PKT_LEN`` was retired in PR-G when the data path moved to a
variable-length bulk frame (9-byte header + optional sections + 1-byte
footer). There is no equivalent constant in the v2 module; the data
layer is exercised separately by `parse_verilog_data_mux()` against the
fixed 9-byte header section.
"""
if filepath is None: if filepath is None:
filepath = FPGA_DIR / "usb_data_interface_ft2232h.v" filepath = FPGA_DIR / "usb_data_interface_ft2232h.v"
text = filepath.read_text() text = filepath.read_text()
@@ -454,15 +535,11 @@ def parse_verilog_packet_constants(
return int(vlog_m.group(1)) return int(vlog_m.group(1))
return int(val, 16) if val.startswith("0x") else int(val) return int(val, 16) if val.startswith("0x") else int(val)
header_val = _find(r"localparam\s+HEADER\s*=\s*(\d+'h[0-9a-fA-F]+)")
footer_val = _find(r"localparam\s+FOOTER\s*=\s*(\d+'h[0-9a-fA-F]+)") footer_val = _find(r"localparam\s+FOOTER\s*=\s*(\d+'h[0-9a-fA-F]+)")
status_hdr = _find(r"localparam\s+STATUS_HEADER\s*=\s*(\d+'h[0-9a-fA-F]+)") status_hdr = _find(r"localparam\s+STATUS_HEADER\s*=\s*(\d+'h[0-9a-fA-F]+)")
data_size = _find(r"DATA_PKT_LEN\s*=\s*(\d+'d\d+)")
status_size = _find(r"STATUS_PKT_LEN\s*=\s*(\d+'d\d+)") status_size = _find(r"STATUS_PKT_LEN\s*=\s*(\d+'d\d+)")
return { return {
"data": PacketConstants(header=header_val, footer=footer_val, size=data_size),
"status": PacketConstants(header=status_hdr, footer=footer_val, size=status_size), "status": PacketConstants(header=status_hdr, footer=footer_val, size=status_size),
} }
@@ -582,26 +659,35 @@ def parse_verilog_data_mux(
filepath: Path | None = None, filepath: Path | None = None,
) -> list[DataPacketField]: ) -> list[DataPacketField]:
""" """
Parse the data_pkt_byte mux from usb_data_interface_ft2232h.v. Parse the v2 data-frame 9-byte fixed header mux from
Returns fields with byte positions and signal names. usb_data_interface_ft2232h.v. Returns fields with byte positions and
signal names.
PR-G replaced the v1 11-byte fixed data packet (combinational
``always @(*) begin case (wr_byte_idx) ... data_pkt_byte = ...``) with
a clocked FSM that emits a fixed 9-byte header followed by optional
variable-length sections. This parser walks the WR_FRAME_HEADER
``case (wr_byte_idx[3:0]) ... ft_data_out <= ...`` block.
""" """
if filepath is None: if filepath is None:
filepath = FPGA_DIR / "usb_data_interface_ft2232h.v" filepath = FPGA_DIR / "usb_data_interface_ft2232h.v"
text = filepath.read_text() text = filepath.read_text()
# Find the data mux case block # Find the WR_FRAME_HEADER mux: the case block that drives ft_data_out
# from the low 4 bits of the byte index, one assignment per fixed-header
# byte (4'd0..4'd8).
match = re.search( match = re.search(
r'always\s+@\(\*\)\s+begin\s+case\s*\(wr_byte_idx\)(.*?)endcase', r'case\s*\(\s*wr_byte_idx\s*\[\s*3\s*:\s*0\s*\]\s*\)(.*?)endcase',
text, re.DOTALL text, re.DOTALL
) )
if not match: if not match:
raise ValueError("Could not find data_pkt_byte mux") raise ValueError("Could not find v2 data-frame header mux")
mux_body = match.group(1) mux_body = match.group(1)
entries: list[tuple[int, str]] = [] entries: list[tuple[int, str]] = []
for m in re.finditer( for m in re.finditer(
r"5'd(\d+)\s*:\s*data_pkt_byte\s*=\s*(.+?);", r"4'd(\d+)\s*:\s*ft_data_out\s*<=\s*(.+?);",
mux_body, re.DOTALL mux_body, re.DOTALL
): ):
idx = int(m.group(1)) idx = int(m.group(1))
@@ -1,716 +0,0 @@
`timescale 1ns / 1ps
/**
* tb_cross_layer_ft2232h.v
*
* Cross-layer contract testbench for the FT2232H USB interface.
* Exercises three packet types with known distinctive values and dumps
* captured bytes to text files that the Python orchestrator can parse.
*
* Exercise A: Command round-trip (Host -> FPGA)
* - Send every opcode through the 4-byte read FSM
* - Dump cmd_opcode, cmd_addr, cmd_value to cmd_results.txt
*
* Exercise B: Data packet generation (FPGA -> Host)
* - Inject known range/doppler/cfar values
* - Capture all 11 output bytes
* - Dump to data_packet.txt
*
* Exercise C: Status packet generation (FPGA -> Host)
* - Set all status inputs to known non-zero values
* - Trigger status request
* - Capture all 26 output bytes
* - Dump to status_packet.txt
*/
module tb_cross_layer_ft2232h;
// Clock periods
localparam CLK_PERIOD = 10.0; // 100 MHz system clock
localparam FT_CLK_PERIOD = 16.67; // 60 MHz FT2232H clock
// ---- Signals ----
reg clk;
reg reset_n;
reg ft_reset_n;
// Radar data inputs
reg [31:0] range_profile;
reg range_valid;
reg [15:0] doppler_real;
reg [15:0] doppler_imag;
reg doppler_valid;
reg cfar_detection;
reg cfar_valid;
// FT2232H physical interface
wire [7:0] ft_data;
reg ft_rxf_n;
reg ft_txe_n;
wire ft_rd_n;
wire ft_wr_n;
wire ft_oe_n;
wire ft_siwu;
reg ft_clk;
// Host-side bus driver (for command injection)
reg [7:0] host_data_drive;
reg host_data_drive_en;
assign ft_data = host_data_drive_en ? host_data_drive : 8'hZZ;
// Pulldown to avoid X during idle
pulldown pd[7:0] (ft_data);
// DUT command outputs
wire [31:0] cmd_data;
wire cmd_valid;
wire [7:0] cmd_opcode;
wire [7:0] cmd_addr;
wire [15:0] cmd_value;
// Stream control
reg [2:0] stream_control;
// Status inputs
reg status_request;
reg [15:0] status_cfar_threshold;
reg [2:0] status_stream_ctrl;
reg [1:0] status_radar_mode;
reg [15:0] status_long_chirp;
reg [15:0] status_long_listen;
reg [15:0] status_guard;
reg [15:0] status_short_chirp;
reg [15:0] status_short_listen;
reg [5:0] status_chirps_per_elev;
reg [1:0] status_range_mode;
reg [4:0] status_self_test_flags;
reg [7:0] status_self_test_detail;
reg status_self_test_busy;
reg [3:0] status_agc_current_gain;
reg [7:0] status_agc_peak_magnitude;
reg [7:0] status_agc_saturation_count;
reg status_agc_enable;
// ---- Clock generators ----
always #(CLK_PERIOD / 2) clk = ~clk;
always #(FT_CLK_PERIOD / 2) ft_clk = ~ft_clk;
// ---- DUT instantiation ----
usb_data_interface_ft2232h uut (
.clk (clk),
.reset_n (reset_n),
.ft_reset_n (ft_reset_n),
.range_profile (range_profile),
.range_valid (range_valid),
.doppler_real (doppler_real),
.doppler_imag (doppler_imag),
.doppler_valid (doppler_valid),
.cfar_detection (cfar_detection),
.cfar_valid (cfar_valid),
.ft_data (ft_data),
.ft_rxf_n (ft_rxf_n),
.ft_txe_n (ft_txe_n),
.ft_rd_n (ft_rd_n),
.ft_wr_n (ft_wr_n),
.ft_oe_n (ft_oe_n),
.ft_siwu (ft_siwu),
.ft_clk (ft_clk),
.cmd_data (cmd_data),
.cmd_valid (cmd_valid),
.cmd_opcode (cmd_opcode),
.cmd_addr (cmd_addr),
.cmd_value (cmd_value),
.stream_control (stream_control),
.status_request (status_request),
.status_cfar_threshold (status_cfar_threshold),
.status_stream_ctrl (status_stream_ctrl),
.status_radar_mode (status_radar_mode),
.status_long_chirp (status_long_chirp),
.status_long_listen (status_long_listen),
.status_guard (status_guard),
.status_short_chirp (status_short_chirp),
.status_short_listen (status_short_listen),
.status_chirps_per_elev (status_chirps_per_elev),
.status_range_mode (status_range_mode),
.status_self_test_flags (status_self_test_flags),
.status_self_test_detail(status_self_test_detail),
.status_self_test_busy (status_self_test_busy),
.status_agc_current_gain (status_agc_current_gain),
.status_agc_peak_magnitude (status_agc_peak_magnitude),
.status_agc_saturation_count(status_agc_saturation_count),
.status_agc_enable (status_agc_enable)
);
// ---- Test bookkeeping ----
integer pass_count;
integer fail_count;
integer test_num;
integer cmd_file;
integer data_file;
integer status_file;
// ---- Check task ----
task check;
input cond;
input [511:0] label;
begin
test_num = test_num + 1;
if (cond) begin
$display("[PASS] Test %0d: %0s", test_num, label);
pass_count = pass_count + 1;
end else begin
$display("[FAIL] Test %0d: %0s", test_num, label);
fail_count = fail_count + 1;
end
end
endtask
// ---- Helper: apply reset ----
task apply_reset;
begin
reset_n = 0;
ft_reset_n = 0;
range_profile = 32'h0;
range_valid = 0;
doppler_real = 16'h0;
doppler_imag = 16'h0;
doppler_valid = 0;
cfar_detection = 0;
cfar_valid = 0;
ft_rxf_n = 1; // No host data available
ft_txe_n = 0; // TX FIFO ready
host_data_drive = 8'h0;
host_data_drive_en = 0;
stream_control = 3'b111;
status_request = 0;
status_cfar_threshold = 16'd0;
status_stream_ctrl = 3'b000;
status_radar_mode = 2'b00;
status_long_chirp = 16'd0;
status_long_listen = 16'd0;
status_guard = 16'd0;
status_short_chirp = 16'd0;
status_short_listen = 16'd0;
status_chirps_per_elev = 6'd0;
status_range_mode = 2'b00;
status_self_test_flags = 5'b00000;
status_self_test_detail = 8'd0;
status_self_test_busy = 1'b0;
status_agc_current_gain = 4'd0;
status_agc_peak_magnitude = 8'd0;
status_agc_saturation_count = 8'd0;
status_agc_enable = 1'b0;
repeat (6) @(posedge ft_clk);
reset_n = 1;
ft_reset_n = 1;
// Wait for stream_control CDC to propagate
repeat (8) @(posedge ft_clk);
end
endtask
// ---- Helper: send one 4-byte command via FT2232H read path ----
//
// FT2232H read FSM cycle-by-cycle:
// Cycle 0 (RD_IDLE): sees !ft_rxf_n → ft_oe_n<=0, → RD_OE_ASSERT
// Cycle 1 (RD_OE_ASSERT): sees !ft_rxf_n → ft_rd_n<=0, → RD_READING
// Cycle 2 (RD_READING): samples ft_data=byte0, cnt 0→1
// Cycle 3 (RD_READING): samples ft_data=byte1, cnt 1→2
// Cycle 4 (RD_READING): samples ft_data=byte2, cnt 2→3
// Cycle 5 (RD_READING): samples ft_data=byte3, cnt=3→0, → RD_DEASSERT
// Cycle 6 (RD_DEASSERT): ft_oe_n<=1, → RD_PROCESS
// Cycle 7 (RD_PROCESS): cmd_valid<=1, decode, → RD_IDLE
//
// Data must be stable BEFORE the sampling posedge. We use #1 after
// posedge to change data in the "delta after" region.
task send_command_ft2232h;
input [7:0] byte0; // opcode
input [7:0] byte1; // addr
input [7:0] byte2; // value_hi
input [7:0] byte3; // value_lo
begin
// Pre-drive byte0 and signal data available
@(posedge ft_clk); #1;
host_data_drive = byte0;
host_data_drive_en = 1;
ft_rxf_n = 0;
// Cycle 0: RD_IDLE sees !ft_rxf_n, goes to OE_ASSERT
@(posedge ft_clk); #1;
// Cycle 1: RD_OE_ASSERT, ft_rd_n goes low, goes to RD_READING
@(posedge ft_clk); #1;
// Cycle 2: RD_READING, byte0 is sampled, cnt 0→1
// Now change to byte1 for next sample
@(posedge ft_clk); #1;
host_data_drive = byte1;
// Cycle 3: RD_READING, byte1 is sampled, cnt 1→2
@(posedge ft_clk); #1;
host_data_drive = byte2;
// Cycle 4: RD_READING, byte2 is sampled, cnt 2→3
@(posedge ft_clk); #1;
host_data_drive = byte3;
// Cycle 5: RD_READING, byte3 is sampled, cnt=3, → RD_DEASSERT
@(posedge ft_clk); #1;
// Cycle 6: RD_DEASSERT, ft_oe_n←1, → RD_PROCESS
@(posedge ft_clk); #1;
// Cycle 7: RD_PROCESS, cmd decoded, cmd_valid←1, → RD_IDLE
@(posedge ft_clk); #1;
// cmd_valid was asserted at cycle 7's posedge. cmd_opcode/addr/value
// are now valid (registered outputs hold until next RD_PROCESS).
// Release bus
host_data_drive_en = 0;
host_data_drive = 8'h0;
ft_rxf_n = 1;
// Settle
repeat (2) @(posedge ft_clk);
end
endtask
// ---- Helper: capture N write bytes from the DUT ----
// Monitors ft_wr_n and ft_data_out, captures bytes into array.
// Used for data packets (11 bytes) and status packets (26 bytes).
reg [7:0] captured_bytes [0:31];
integer capture_count;
task capture_write_bytes;
input integer expected_count;
integer timeout;
begin
capture_count = 0;
timeout = 0;
while (capture_count < expected_count && timeout < 2000) begin
@(posedge ft_clk); #1;
timeout = timeout + 1;
// DUT drives byte when ft_wr_n=0 and ft_data_oe=1
// Sample AFTER posedge so registered outputs are settled
if (!ft_wr_n && uut.ft_data_oe) begin
captured_bytes[capture_count] = uut.ft_data_out;
capture_count = capture_count + 1;
end
end
end
endtask
// ---- Helper: pulse range_valid with CDC wait ----
// Toggle CDC needs 3 sync stages + edge detect = 4+ ft_clk cycles.
// Use 12 for safety margin.
task assert_range_valid;
input [31:0] data;
begin
@(posedge clk); #1;
range_profile = data;
range_valid = 1;
@(posedge clk); #1;
range_valid = 0;
// Wait for toggle CDC propagation
repeat (12) @(posedge ft_clk);
end
endtask
// ---- Helper: pulse doppler_valid ----
task pulse_doppler;
input [15:0] dr;
input [15:0] di;
begin
@(posedge clk); #1;
doppler_real = dr;
doppler_imag = di;
doppler_valid = 1;
@(posedge clk); #1;
doppler_valid = 0;
repeat (12) @(posedge ft_clk);
end
endtask
// ---- Helper: pulse cfar_valid ----
task pulse_cfar;
input det;
begin
@(posedge clk); #1;
cfar_detection = det;
cfar_valid = 1;
@(posedge clk); #1;
cfar_valid = 0;
repeat (12) @(posedge ft_clk);
end
endtask
// ---- Helper: pulse status_request ----
task pulse_status_request;
begin
@(posedge clk); #1;
status_request = 1;
@(posedge clk); #1;
status_request = 0;
// Wait for toggle CDC propagation
repeat (12) @(posedge ft_clk);
end
endtask
// ================================================================
// Main stimulus
// ================================================================
integer i;
initial begin
$dumpfile("tb_cross_layer_ft2232h.vcd");
$dumpvars(0, tb_cross_layer_ft2232h);
clk = 0;
ft_clk = 0;
pass_count = 0;
fail_count = 0;
test_num = 0;
// ============================================================
// EXERCISE A: Command Round-Trip
// Send commands with known opcode/addr/value, verify decoding.
// Dump results to cmd_results.txt for Python validation.
// ============================================================
$display("\n=== EXERCISE A: Command Round-Trip ===");
apply_reset;
cmd_file = $fopen("cmd_results.txt", "w");
$fwrite(cmd_file, "# opcode_sent addr_sent value_sent opcode_got addr_got value_got\n");
// Test all real opcodes from radar_system_top.v
// Format: opcode, addr=0x00, value
// Basic control
send_command_ft2232h(8'h01, 8'h00, 8'h00, 8'h02); // RADAR_MODE=2
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h01, 8'h00, 16'h0002, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h01 && cmd_value === 16'h0002,
"Cmd 0x01: RADAR_MODE=2");
send_command_ft2232h(8'h02, 8'h00, 8'h00, 8'h01); // TRIGGER_PULSE
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h02, 8'h00, 16'h0001, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h02 && cmd_value === 16'h0001,
"Cmd 0x02: TRIGGER_PULSE");
send_command_ft2232h(8'h03, 8'h00, 8'h27, 8'h10); // DETECT_THRESHOLD=10000
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h03, 8'h00, 16'h2710, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h03 && cmd_value === 16'h2710,
"Cmd 0x03: DETECT_THRESHOLD=10000");
send_command_ft2232h(8'h04, 8'h00, 8'h00, 8'h07); // STREAM_CONTROL=7
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h04, 8'h00, 16'h0007, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h04 && cmd_value === 16'h0007,
"Cmd 0x04: STREAM_CONTROL=7");
// Chirp timing
send_command_ft2232h(8'h10, 8'h00, 8'h0B, 8'hB8); // LONG_CHIRP=3000
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h10, 8'h00, 16'h0BB8, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h10 && cmd_value === 16'h0BB8,
"Cmd 0x10: LONG_CHIRP=3000");
send_command_ft2232h(8'h11, 8'h00, 8'h35, 8'h84); // LONG_LISTEN=13700
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h11, 8'h00, 16'h3584, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h11 && cmd_value === 16'h3584,
"Cmd 0x11: LONG_LISTEN=13700");
send_command_ft2232h(8'h12, 8'h00, 8'h44, 8'h84); // GUARD=17540
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h12, 8'h00, 16'h4484, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h12 && cmd_value === 16'h4484,
"Cmd 0x12: GUARD=17540");
send_command_ft2232h(8'h13, 8'h00, 8'h00, 8'h32); // SHORT_CHIRP=50
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h13, 8'h00, 16'h0032, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h13 && cmd_value === 16'h0032,
"Cmd 0x13: SHORT_CHIRP=50");
send_command_ft2232h(8'h14, 8'h00, 8'h44, 8'h2A); // SHORT_LISTEN=17450
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h14, 8'h00, 16'h442A, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h14 && cmd_value === 16'h442A,
"Cmd 0x14: SHORT_LISTEN=17450");
send_command_ft2232h(8'h15, 8'h00, 8'h00, 8'h20); // CHIRPS_PER_ELEV=32
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h15, 8'h00, 16'h0020, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h15 && cmd_value === 16'h0020,
"Cmd 0x15: CHIRPS_PER_ELEV=32");
// Digital gain
send_command_ft2232h(8'h16, 8'h00, 8'h00, 8'h05); // GAIN_SHIFT=5
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h16, 8'h00, 16'h0005, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h16 && cmd_value === 16'h0005,
"Cmd 0x16: GAIN_SHIFT=5");
// Signal processing
send_command_ft2232h(8'h20, 8'h00, 8'h00, 8'h01); // RANGE_MODE=1
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h20, 8'h00, 16'h0001, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h20 && cmd_value === 16'h0001,
"Cmd 0x20: RANGE_MODE=1");
send_command_ft2232h(8'h21, 8'h00, 8'h00, 8'h03); // CFAR_GUARD=3
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h21, 8'h00, 16'h0003, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h21 && cmd_value === 16'h0003,
"Cmd 0x21: CFAR_GUARD=3");
send_command_ft2232h(8'h22, 8'h00, 8'h00, 8'h0C); // CFAR_TRAIN=12
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h22, 8'h00, 16'h000C, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h22 && cmd_value === 16'h000C,
"Cmd 0x22: CFAR_TRAIN=12");
send_command_ft2232h(8'h23, 8'h00, 8'h00, 8'h30); // CFAR_ALPHA=0x30
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h23, 8'h00, 16'h0030, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h23 && cmd_value === 16'h0030,
"Cmd 0x23: CFAR_ALPHA=0x30");
send_command_ft2232h(8'h24, 8'h00, 8'h00, 8'h01); // CFAR_MODE=1 (GO)
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h24, 8'h00, 16'h0001, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h24 && cmd_value === 16'h0001,
"Cmd 0x24: CFAR_MODE=1");
send_command_ft2232h(8'h25, 8'h00, 8'h00, 8'h01); // CFAR_ENABLE=1
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h25, 8'h00, 16'h0001, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h25 && cmd_value === 16'h0001,
"Cmd 0x25: CFAR_ENABLE=1");
send_command_ft2232h(8'h26, 8'h00, 8'h00, 8'h01); // MTI_ENABLE=1
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h26, 8'h00, 16'h0001, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h26 && cmd_value === 16'h0001,
"Cmd 0x26: MTI_ENABLE=1");
send_command_ft2232h(8'h27, 8'h00, 8'h00, 8'h03); // DC_NOTCH_WIDTH=3
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h27, 8'h00, 16'h0003, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h27 && cmd_value === 16'h0003,
"Cmd 0x27: DC_NOTCH_WIDTH=3");
// AGC registers (0x28-0x2C)
send_command_ft2232h(8'h28, 8'h00, 8'h00, 8'h01); // AGC_ENABLE=1
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h28, 8'h00, 16'h0001, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h28 && cmd_value === 16'h0001,
"Cmd 0x28: AGC_ENABLE=1");
send_command_ft2232h(8'h29, 8'h00, 8'h00, 8'hC8); // AGC_TARGET=200
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h29, 8'h00, 16'h00C8, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h29 && cmd_value === 16'h00C8,
"Cmd 0x29: AGC_TARGET=200");
send_command_ft2232h(8'h2A, 8'h00, 8'h00, 8'h02); // AGC_ATTACK=2
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h2A, 8'h00, 16'h0002, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h2A && cmd_value === 16'h0002,
"Cmd 0x2A: AGC_ATTACK=2");
send_command_ft2232h(8'h2B, 8'h00, 8'h00, 8'h03); // AGC_DECAY=3
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h2B, 8'h00, 16'h0003, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h2B && cmd_value === 16'h0003,
"Cmd 0x2B: AGC_DECAY=3");
send_command_ft2232h(8'h2C, 8'h00, 8'h00, 8'h06); // AGC_HOLDOFF=6
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h2C, 8'h00, 16'h0006, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h2C && cmd_value === 16'h0006,
"Cmd 0x2C: AGC_HOLDOFF=6");
// Self-test / status
send_command_ft2232h(8'h30, 8'h00, 8'h00, 8'h01); // SELF_TEST_TRIGGER
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h30, 8'h00, 16'h0001, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h30 && cmd_value === 16'h0001,
"Cmd 0x30: SELF_TEST_TRIGGER");
send_command_ft2232h(8'h31, 8'h00, 8'h00, 8'h01); // SELF_TEST_STATUS
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h31, 8'h00, 16'h0001, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h31 && cmd_value === 16'h0001,
"Cmd 0x31: SELF_TEST_STATUS");
send_command_ft2232h(8'hFF, 8'h00, 8'h00, 8'h00); // STATUS_REQUEST
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'hFF, 8'h00, 16'h0000, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'hFF && cmd_value === 16'h0000,
"Cmd 0xFF: STATUS_REQUEST");
// Non-zero addr test
send_command_ft2232h(8'h01, 8'hAB, 8'hCD, 8'hEF); // addr=0xAB, value=0xCDEF
$fwrite(cmd_file, "%02x %02x %04x %02x %02x %04x\n",
8'h01, 8'hAB, 16'hCDEF, cmd_opcode, cmd_addr, cmd_value);
check(cmd_opcode === 8'h01 && cmd_addr === 8'hAB && cmd_value === 16'hCDEF,
"Cmd 0x01 with addr=0xAB, value=0xCDEF");
$fclose(cmd_file);
// ============================================================
// EXERCISE B: Data Packet Generation
// Inject known values, capture 11-byte output.
// ============================================================
$display("\n=== EXERCISE B: Data Packet Generation ===");
apply_reset;
ft_txe_n = 0; // TX FIFO ready
// Use distinctive values that make truncation/swap bugs obvious
// range_profile = {Q[15:0], I[15:0]} = {0xCAFE, 0xBEEF}
// doppler_real = 0x1234, doppler_imag = 0x5678
// cfar_detection = 1
// First inject doppler and cfar so pending flags are set
pulse_doppler(16'h1234, 16'h5678);
pulse_cfar(1'b1);
// Now inject range_valid which triggers the write FSM.
// CRITICAL: Must capture bytes IN PARALLEL with the trigger,
// because the write FSM starts sending bytes ~3-4 ft_clk cycles
// after the toggle CDC propagates. If we wait for CDC propagation
// first, capture_write_bytes misses the early bytes.
fork
assert_range_valid(32'hCAFE_BEEF);
capture_write_bytes(11);
join
check(capture_count === 11,
"Data packet: captured 11 bytes");
// Dump captured bytes to file
data_file = $fopen("data_packet.txt", "w");
$fwrite(data_file, "# byte_index hex_value\n");
for (i = 0; i < capture_count; i = i + 1) begin
$fwrite(data_file, "%0d %02x\n", i, captured_bytes[i]);
end
$fclose(data_file);
// Verify locally too
check(captured_bytes[0] === 8'hAA,
"Data pkt: byte 0 = 0xAA (header)");
check(captured_bytes[1] === 8'hCA,
"Data pkt: byte 1 = 0xCA (range MSB = Q high)");
check(captured_bytes[2] === 8'hFE,
"Data pkt: byte 2 = 0xFE (range Q low)");
check(captured_bytes[3] === 8'hBE,
"Data pkt: byte 3 = 0xBE (range I high)");
check(captured_bytes[4] === 8'hEF,
"Data pkt: byte 4 = 0xEF (range I low)");
check(captured_bytes[5] === 8'h12,
"Data pkt: byte 5 = 0x12 (doppler_real MSB)");
check(captured_bytes[6] === 8'h34,
"Data pkt: byte 6 = 0x34 (doppler_real LSB)");
check(captured_bytes[7] === 8'h56,
"Data pkt: byte 7 = 0x56 (doppler_imag MSB)");
check(captured_bytes[8] === 8'h78,
"Data pkt: byte 8 = 0x78 (doppler_imag LSB)");
// Byte 9 = {frame_start, 6'b0, cfar_detection}
// After reset sample_counter==0, so frame_start=1 → 0x81
check(captured_bytes[9] === 8'h81,
"Data pkt: byte 9 = 0x81 (frame_start=1, cfar_detection=1)");
check(captured_bytes[10] === 8'h55,
"Data pkt: byte 10 = 0x55 (footer)");
// ============================================================
// EXERCISE C: Status Packet Generation
// Set known status values, trigger readback, capture 26 bytes.
// Uses distinctive non-zero values to detect truncation/swap.
// ============================================================
$display("\n=== EXERCISE C: Status Packet Generation ===");
apply_reset;
ft_txe_n = 0;
// Set known distinctive status values
status_cfar_threshold = 16'hABCD;
status_stream_ctrl = 3'b101;
status_radar_mode = 2'b11; // Use 0b11 to test both bits
status_long_chirp = 16'h1234;
status_long_listen = 16'h5678;
status_guard = 16'h9ABC;
status_short_chirp = 16'hDEF0;
status_short_listen = 16'hFACE;
status_chirps_per_elev = 6'd42;
status_range_mode = 2'b10;
status_self_test_flags = 5'b10101;
status_self_test_detail = 8'hA5;
status_self_test_busy = 1'b1;
status_agc_current_gain = 4'd7;
status_agc_peak_magnitude = 8'd200;
status_agc_saturation_count = 8'd15;
status_agc_enable = 1'b1;
// Pulse status_request and capture bytes IN PARALLEL
// (same reason as Exercise B — write FSM starts before CDC wait ends)
fork
pulse_status_request;
capture_write_bytes(26);
join
check(capture_count === 26,
"Status packet: captured 26 bytes");
// Dump captured bytes to file
status_file = $fopen("status_packet.txt", "w");
$fwrite(status_file, "# byte_index hex_value\n");
for (i = 0; i < capture_count; i = i + 1) begin
$fwrite(status_file, "%0d %02x\n", i, captured_bytes[i]);
end
// Also dump the raw status_words for debugging
$fwrite(status_file, "# status_words (internal):\n");
for (i = 0; i < 6; i = i + 1) begin
$fwrite(status_file, "# word[%0d] = %08x\n", i, uut.status_words[i]);
end
$fclose(status_file);
// Verify header/footer locally
check(captured_bytes[0] === 8'hBB,
"Status pkt: byte 0 = 0xBB (status header)");
check(captured_bytes[25] === 8'h55,
"Status pkt: byte 25 = 0x55 (footer)");
// Verify status_words[1] = {long_chirp, long_listen} = {0x1234, 0x5678}
check(captured_bytes[5] === 8'h12 && captured_bytes[6] === 8'h34 &&
captured_bytes[7] === 8'h56 && captured_bytes[8] === 8'h78,
"Status pkt: word1 = {long_chirp=0x1234, long_listen=0x5678}");
// Verify status_words[2] = {guard, short_chirp} = {0x9ABC, 0xDEF0}
check(captured_bytes[9] === 8'h9A && captured_bytes[10] === 8'hBC &&
captured_bytes[11] === 8'hDE && captured_bytes[12] === 8'hF0,
"Status pkt: word2 = {guard=0x9ABC, short_chirp=0xDEF0}");
// ============================================================
// Summary
// ============================================================
$display("");
$display("========================================");
$display(" CROSS-LAYER FT2232H TB RESULTS");
$display(" PASSED: %0d / %0d", pass_count, test_num);
$display(" FAILED: %0d / %0d", fail_count, test_num);
if (fail_count == 0)
$display(" ** ALL TESTS PASSED **");
else
$display(" ** SOME TESTS FAILED **");
$display("========================================");
#100;
$finish;
end
endmodule
@@ -1,24 +1,22 @@
""" """
Cross-Layer Contract Tests Cross-Layer Contract Tests
========================== ==========================
Single pytest file orchestrating three tiers of verification: Single pytest file orchestrating two tiers of verification:
Tier 1 Static Contract Parsing: Tier 1 Static Contract Parsing:
Compares Python, Verilog, and C source code at parse-time to catch Compares Python, Verilog, and C source code at parse-time to catch
opcode mismatches, bit-width errors, packet constant drift, and opcode mismatches, bit-width errors, packet constant drift, and
layout bugs like the status_words[0] 37-bit truncation. layout bugs like the status_words[0] 37-bit truncation.
Tier 2 Verilog Cosimulation (iverilog):
Compiles and runs tb_cross_layer_ft2232h.v, then parses its output
files (cmd_results.txt, data_packet.txt, status_packet.txt) and
runs Python parsers on the captured bytes to verify round-trip
correctness.
Tier 3 C Stub Execution: Tier 3 C Stub Execution:
Compiles stm32_settings_stub.cpp, generates a binary settings Compiles stm32_settings_stub.cpp, generates a binary settings
packet from Python, runs the stub, and verifies all parsed field packet from Python, runs the stub, and verifies all parsed field
values match. values match.
(Tier 2 Verilog cosimulation was retired post-PR-G; the v1 TB no
longer matched production. Equivalent v2 coverage lives in the FPGA
regression's tb_usb_protocol_v2 and tb_system_opcodes.)
The goal is to find UNKNOWN bugs by testing each layer against The goal is to find UNKNOWN bugs by testing each layer against
independently-derived ground truth not just checking that two independently-derived ground truth not just checking that two
layers agree (because both could be wrong). layers agree (because both could be wrong).
@@ -53,31 +51,20 @@ sys.path.insert(0, str(cp.GUI_DIR))
# Helpers # Helpers
# =================================================================== # ===================================================================
IVERILOG = os.environ.get("IVERILOG", "iverilog")
VVP = os.environ.get("VVP", "vvp")
CXX = os.environ.get("CXX", "c++") CXX = os.environ.get("CXX", "c++")
# Check tool availability for conditional skipping # Check tool availability for conditional skipping
_has_iverilog = Path(IVERILOG).exists() if "/" in IVERILOG else bool(
subprocess.run(["which", IVERILOG], capture_output=True).returncode == 0
)
_has_cxx = subprocess.run( _has_cxx = subprocess.run(
[CXX, "--version"], capture_output=True [CXX, "--version"], capture_output=True
).returncode == 0 ).returncode == 0
# In CI, missing tools must be a hard failure — never silently skip. # In CI, missing tools must be a hard failure — never silently skip.
_in_ci = os.environ.get("GITHUB_ACTIONS") == "true" _in_ci = os.environ.get("GITHUB_ACTIONS") == "true"
if _in_ci: if _in_ci and not _has_cxx:
if not _has_iverilog: raise RuntimeError(
raise RuntimeError( "C++ compiler is required in CI but was not found. "
"iverilog is required in CI but was not found. " "Ensure build-essential is installed."
"Ensure 'apt-get install iverilog' ran and IVERILOG/VVP are on PATH." )
)
if not _has_cxx:
raise RuntimeError(
"C++ compiler is required in CI but was not found. "
"Ensure build-essential is installed."
)
def _strip_cxx_comments_and_strings(src: str) -> str: def _strip_cxx_comments_and_strings(src: str) -> str:
@@ -182,6 +169,9 @@ GROUND_TRUTH_OPCODES = {
0x14: ("host_short_listen_cycles", 16), 0x14: ("host_short_listen_cycles", 16),
0x15: ("host_chirps_per_elev", 6), 0x15: ("host_chirps_per_elev", 6),
0x16: ("host_gain_shift", 4), 0x16: ("host_gain_shift", 4),
0x17: ("host_medium_chirp_cycles", 16), # PR-G G2.1
0x18: ("host_medium_listen_cycles", 16), # PR-G G2.1
0x19: ("host_subframe_enable", 3), # PR-U M-8
0x20: ("host_range_mode", 2), 0x20: ("host_range_mode", 2),
0x21: ("host_cfar_guard", 4), 0x21: ("host_cfar_guard", 4),
0x22: ("host_cfar_train", 5), 0x22: ("host_cfar_train", 5),
@@ -195,8 +185,11 @@ GROUND_TRUTH_OPCODES = {
0x2A: ("host_agc_attack", 4), 0x2A: ("host_agc_attack", 4),
0x2B: ("host_agc_decay", 4), 0x2B: ("host_agc_decay", 4),
0x2C: ("host_agc_holdoff", 4), 0x2C: ("host_agc_holdoff", 4),
0x2D: ("host_cfar_alpha_soft", 8), # PR-G G1
0x30: ("host_self_test_trigger", 1), # pulse 0x30: ("host_self_test_trigger", 1), # pulse
0x31: ("host_status_request", 1), # pulse 0x31: ("host_status_request", 1), # pulse
0x32: ("host_adc_pwdn", 1), # PR-R M-3
0x33: ("host_adc_format", 2), # PR-R M-4
0xFF: ("host_status_request", 1), # alias, pulse 0xFF: ("host_status_request", 1), # alias, pulse
} }
@@ -207,14 +200,22 @@ GROUND_TRUTH_RESET_DEFAULTS = {
"host_long_chirp_cycles": 3000, "host_long_chirp_cycles": 3000,
"host_long_listen_cycles": 13700, "host_long_listen_cycles": 13700,
"host_guard_cycles": 17540, "host_guard_cycles": 17540,
"host_short_chirp_cycles": 50, # PR-E V2: 1 us chirp / SHORT PRI 175 us (was 0.5 us legacy macros).
"host_short_listen_cycles": 17450, "host_short_chirp_cycles": 100,
"host_chirps_per_elev": 32, "host_short_listen_cycles": 17400,
# PR-G G2.1 + PR-Q stagger: MEDIUM PRI 161 us (5 us chirp + 156 us listen).
"host_medium_chirp_cycles": 500,
"host_medium_listen_cycles": 15600,
# PR-U M-8: 3'b111 (SHORT|MEDIUM|LONG all on by default).
"host_subframe_enable": 7,
# m-6 (PR-S): bumped from 32 in PR-F.
"host_chirps_per_elev": 48,
"host_gain_shift": 0, "host_gain_shift": 0,
"host_range_mode": 0, "host_range_mode": 0,
"host_cfar_guard": 2, "host_cfar_guard": 2,
"host_cfar_train": 8, "host_cfar_train": 8,
"host_cfar_alpha": 0x30, "host_cfar_alpha": 0x30,
"host_cfar_alpha_soft": 0x18, # PR-F — 1.5 in Q4.4
"host_cfar_mode": 0, "host_cfar_mode": 0,
"host_cfar_enable": 0, "host_cfar_enable": 0,
"host_mti_enable": 0, "host_mti_enable": 0,
@@ -224,11 +225,16 @@ GROUND_TRUTH_RESET_DEFAULTS = {
"host_agc_attack": 1, "host_agc_attack": 1,
"host_agc_decay": 1, "host_agc_decay": 1,
"host_agc_holdoff": 4, "host_agc_holdoff": 4,
"host_adc_pwdn": 0, # PR-R M-3 — 1'b0 (ADC powered)
"host_adc_format": 0, # PR-R M-4 — 2'b00 (offset binary)
} }
GROUND_TRUTH_PACKET_CONSTANTS = { GROUND_TRUTH_PACKET_CONSTANTS = {
"data": {"header": 0xAA, "footer": 0x55, "size": 11}, # Data packet retired as a fixed-size construct in PR-G (variable-length
"status": {"header": 0xBB, "footer": 0x55, "size": 26}, # bulk frame). Only the status packet still has a single canonical size,
# so the cross-layer constants check now scopes to status only. The data
# layer is exercised via the 9-byte fixed header in TestTier1DataPacketLayout.
"status": {"header": 0xBB, "footer": 0x55, "size": 30}, # PR-G — was 26 in v1
} }
@@ -425,7 +431,10 @@ class TestTier1PacketConstants:
"""Python and Verilog packet constants must match each other.""" """Python and Verilog packet constants must match each other."""
py = cp.parse_python_packet_constants() py = cp.parse_python_packet_constants()
v = cp.parse_verilog_packet_constants() v = cp.parse_verilog_packet_constants()
for ptype in ("data", "status"): # Iterate over whatever the parsers expose. Post-PR-G that is just
# the status packet; if a future protocol re-introduces a fixed-size
# data packet, both parsers will surface it here automatically.
for ptype in py.keys() & v.keys():
assert py[ptype].header == v[ptype].header assert py[ptype].header == v[ptype].header
assert py[ptype].footer == v[ptype].footer assert py[ptype].footer == v[ptype].footer
assert py[ptype].size == v[ptype].size assert py[ptype].size == v[ptype].size
@@ -1045,23 +1054,32 @@ class TestTier1DataPacketLayout:
"""Verify data packet byte layout matches between Python and Verilog.""" """Verify data packet byte layout matches between Python and Verilog."""
def test_verilog_data_mux_field_positions(self): def test_verilog_data_mux_field_positions(self):
"""Verilog data_pkt_byte mux must have correct byte positions.""" """
v2 frame header (PR-G): bytes 0-8 are a fixed prefix written by the
WR_FRAME_HEADER state. Bytes 0-2 are constants/flags (HEADER,
protocol version, flags byte) and the parser skips them. Bytes 3-8
carry three 16-bit fields the host needs before it can size the
variable sections that follow:
bytes 3-4 frame_number_snapshot
bytes 5-6 NUM_RANGE_BINS (= RP_NUM_RANGE_BINS)
bytes 7-8 NUM_DOPPLER_BINS (= RP_NUM_DOPPLER_BINS)
"""
v_fields = cp.parse_verilog_data_mux() v_fields = cp.parse_verilog_data_mux()
# Expected: range_profile at bytes 1-4 (32-bit), doppler_real 5-6,
# doppler_imag 7-8, cfar 9
field_map = {f.name: f for f in v_fields} field_map = {f.name: f for f in v_fields}
assert "range_profile" in field_map assert "frame_number_snapshot" in field_map, (
rp = field_map["range_profile"] f"v2 header byte 3-4 must carry frame_number_snapshot; got {list(field_map)}"
assert rp.byte_start == 1 and rp.byte_end == 4 and rp.width_bits == 32 )
fn = field_map["frame_number_snapshot"]
assert fn.byte_start == 3 and fn.byte_end == 4 and fn.width_bits == 16
assert "doppler_real" in field_map assert "NUM_RANGE_BINS" in field_map
dr = field_map["doppler_real"] nr = field_map["NUM_RANGE_BINS"]
assert dr.byte_start == 5 and dr.byte_end == 6 and dr.width_bits == 16 assert nr.byte_start == 5 and nr.byte_end == 6 and nr.width_bits == 16
assert "doppler_imag" in field_map assert "NUM_DOPPLER_BINS" in field_map
di = field_map["doppler_imag"] nd = field_map["NUM_DOPPLER_BINS"]
assert di.byte_start == 7 and di.byte_end == 8 and di.width_bits == 16 assert nd.byte_start == 7 and nd.byte_end == 8 and nd.width_bits == 16
def test_python_data_packet_byte_positions(self): def test_python_data_packet_byte_positions(self):
"""Python parse_data_packet byte offsets must be correct.""" """Python parse_data_packet byte offsets must be correct."""
@@ -1352,187 +1370,17 @@ class TestTier2Adar1000VmTableGroundTruth:
) )
# =================================================================== # Tier 2 Verilog cosimulation (was tb_cross_layer_ft2232h.v) was retired
# TIER 2: Verilog Cosimulation # after PR-G v2: the v1 11-byte fixed data packet, 26-byte status packet,
# =================================================================== # and `cfar_detection` 1-bit input it exercised no longer exist on the
# production module. Equivalent and stronger v2 coverage now lives in the
@pytest.mark.skipif(not _has_iverilog, reason="iverilog not available") # FPGA regression: `tb_usb_protocol_v2` (PR-G frame header v2, 30-byte
class TestTier2VerilogCosim: # status with soft tier, FSM length consistency, MEDIUM ladder opcodes
"""Compile and run the FT2232H TB, validate output against Python parsers.""" # 0x17/0x18 round-trip, opcode 0x2D byte-order) and `tb_system_opcodes`
# (every host opcode dispatched through the FT2232H 4-cycle read FSM at
@pytest.fixture(scope="class") # the radar_system_top integration level). Per-byte v2 header layout is
def tb_results(self, tmp_path_factory): # checked statically here by TestTier1DataPacketLayout against the actual
"""Compile and run TB once, return output file contents.""" # `case (wr_byte_idx[3:0])` in usb_data_interface_ft2232h.v.
workdir = tmp_path_factory.mktemp("verilog_cosim")
tb_path = THIS_DIR / "tb_cross_layer_ft2232h.v"
rtl_path = cp.FPGA_DIR / "usb_data_interface_ft2232h.v"
out_bin = workdir / "tb_cross_layer_ft2232h"
# Compile
result = subprocess.run(
[IVERILOG, "-o", str(out_bin), "-I", str(cp.FPGA_DIR),
str(tb_path), str(rtl_path)],
capture_output=True, text=True, timeout=30,
)
assert result.returncode == 0, f"iverilog compile failed:\n{result.stderr}"
# Run
result = subprocess.run(
[VVP, str(out_bin)],
capture_output=True, text=True, timeout=60,
cwd=str(workdir),
)
assert result.returncode == 0, f"vvp failed:\n{result.stderr}"
# Parse output
return {
"stdout": result.stdout,
"cmd_results": (workdir / "cmd_results.txt").read_text(),
"data_packet": (workdir / "data_packet.txt").read_text(),
"status_packet": (workdir / "status_packet.txt").read_text(),
}
def test_all_tb_tests_pass(self, tb_results):
"""All Verilog TB internal checks must pass."""
stdout = tb_results["stdout"]
assert "ALL TESTS PASSED" in stdout, f"TB had failures:\n{stdout}"
def test_command_round_trip(self, tb_results):
"""Verify every command decoded correctly by matching sent vs received."""
rows = _parse_hex_results(tb_results["cmd_results"])
assert len(rows) >= 20, f"Expected >= 20 command results, got {len(rows)}"
for row in rows:
assert len(row) == 6, f"Bad row format: {row}"
sent_op, sent_addr, sent_val = row[0], row[1], row[2]
got_op, got_addr, got_val = row[3], row[4], row[5]
assert sent_op == got_op, (
f"Opcode mismatch: sent 0x{sent_op} got 0x{got_op}"
)
assert sent_addr == got_addr, (
f"Addr mismatch: sent 0x{sent_addr} got 0x{got_addr}"
)
assert sent_val == got_val, (
f"Value mismatch: sent 0x{sent_val} got 0x{got_val}"
)
def test_data_packet_python_round_trip(self, tb_results):
"""
Take the 11 bytes captured by the Verilog TB, run Python's
parse_data_packet() on them, verify the parsed values match
what was injected into the TB.
"""
from radar_protocol import RadarProtocol
rows = _parse_hex_results(tb_results["data_packet"])
assert len(rows) == 11, f"Expected 11 data packet bytes, got {len(rows)}"
# Reconstruct raw bytes
raw = bytes(int(row[1], 16) for row in rows)
assert len(raw) == 11
parsed = RadarProtocol.parse_data_packet(raw)
assert parsed is not None, "parse_data_packet returned None"
# The TB injected: range_profile = 0xCAFE_BEEF = {Q=0xCAFE, I=0xBEEF}
# doppler_real = 0x1234, doppler_imag = 0x5678
# cfar_detection = 1
#
# range_q = 0xCAFE → signed = 0xCAFE - 0x10000 = -13570
# range_i = 0xBEEF → signed = 0xBEEF - 0x10000 = -16657
# doppler_i = 0x1234 → signed = 4660
# doppler_q = 0x5678 → signed = 22136
assert parsed["range_q"] == (0xCAFE - 0x10000), (
f"range_q: {parsed['range_q']} != {0xCAFE - 0x10000}"
)
assert parsed["range_i"] == (0xBEEF - 0x10000), (
f"range_i: {parsed['range_i']} != {0xBEEF - 0x10000}"
)
assert parsed["doppler_i"] == 0x1234, (
f"doppler_i: {parsed['doppler_i']} != {0x1234}"
)
assert parsed["doppler_q"] == 0x5678, (
f"doppler_q: {parsed['doppler_q']} != {0x5678}"
)
assert parsed["detection"] == 1, (
f"detection: {parsed['detection']} != 1"
)
def test_status_packet_python_round_trip(self, tb_results):
"""
Take the 26 bytes captured by the Verilog TB, run Python's
parse_status_packet() on them, verify against injected values.
"""
from radar_protocol import RadarProtocol
lines = tb_results["status_packet"].strip().splitlines()
# Filter out comments and status_words debug lines
rows = []
for line in lines:
line = line.strip()
if not line or line.startswith("#"):
continue
rows.append(line.split())
assert len(rows) == 26, f"Expected 26 status bytes, got {len(rows)}"
raw = bytes(int(row[1], 16) for row in rows)
assert len(raw) == 26
sr = RadarProtocol.parse_status_packet(raw)
assert sr is not None, "parse_status_packet returned None"
# Injected values (from TB):
# status_cfar_threshold = 0xABCD
# status_stream_ctrl = 3'b101 = 5
# status_radar_mode = 2'b11 = 3
# status_long_chirp = 0x1234
# status_long_listen = 0x5678
# status_guard = 0x9ABC
# status_short_chirp = 0xDEF0
# status_short_listen = 0xFACE
# status_chirps_per_elev = 42
# status_range_mode = 2'b10 = 2
# status_self_test_flags = 5'b10101 = 21
# status_self_test_detail = 0xA5
# status_self_test_busy = 1
# status_agc_current_gain = 7
# status_agc_peak_magnitude = 200
# status_agc_saturation_count = 15
# status_agc_enable = 1
# Words 1-5 should be correct (no truncation bug)
assert sr.cfar_threshold == 0xABCD, f"cfar_threshold: 0x{sr.cfar_threshold:04X}"
assert sr.long_chirp == 0x1234, f"long_chirp: 0x{sr.long_chirp:04X}"
assert sr.long_listen == 0x5678, f"long_listen: 0x{sr.long_listen:04X}"
assert sr.guard == 0x9ABC, f"guard: 0x{sr.guard:04X}"
assert sr.short_chirp == 0xDEF0, f"short_chirp: 0x{sr.short_chirp:04X}"
assert sr.short_listen == 0xFACE, f"short_listen: 0x{sr.short_listen:04X}"
assert sr.chirps_per_elev == 42, f"chirps_per_elev: {sr.chirps_per_elev}"
assert sr.range_mode == 2, f"range_mode: {sr.range_mode}"
assert sr.self_test_flags == 21, f"self_test_flags: {sr.self_test_flags}"
assert sr.self_test_detail == 0xA5, f"self_test_detail: 0x{sr.self_test_detail:02X}"
assert sr.self_test_busy == 1, f"self_test_busy: {sr.self_test_busy}"
# AGC fields (word 4)
assert sr.agc_current_gain == 7, f"agc_current_gain: {sr.agc_current_gain}"
assert sr.agc_peak_magnitude == 200, f"agc_peak_magnitude: {sr.agc_peak_magnitude}"
assert sr.agc_saturation_count == 15, f"agc_saturation_count: {sr.agc_saturation_count}"
assert sr.agc_enable == 1, f"agc_enable: {sr.agc_enable}"
# Word 0: stream_ctrl should be 5 (3'b101)
assert sr.stream_ctrl == 5, (
f"stream_ctrl: {sr.stream_ctrl} != 5. "
f"Check status_words[0] bit positions."
)
# radar_mode should be 3 (2'b11)
assert sr.radar_mode == 3, (
f"radar_mode={sr.radar_mode} != 3. "
f"Check status_words[0] bit positions."
)
# =================================================================== # ===================================================================