chore(repo): cosim_dir replay revival + ruff lint cleanup

cosim_dir revival:
- gen_realdata_hex.py: also emit decimated_range_{i,q}.npy (48x512)
  and doppler_map_{i,q}.npy (512x48) at production dimensions; the
  same Python pipeline that produces the RTL .hex stimuli now writes
  the .npy intermediates v7.replay COSIM_DIR loads. Replaces the
  workflow lost when golden_reference.py was deleted in e8b495c
- test_v7.py: update test_get_frame_cosim shape from pre-PR-O.6
  (64,32) to (NUM_RANGE_BINS, NUM_DOPPLER_BINS)
- check in 4 .npy reference files (~400 KB, deterministic SCENE_SEED=42)

Ruff lint cleanup (was 66 errors; now 0):
- pyproject.toml: ignore T20 in tb/cosim/**.py (CLI tools)
- compare_independent.py: drop redundant int() casts (RUF046),
  swap try/except scipy import for importlib.util.find_spec,
  remove dead duplicate np import, ASCII-ize comment unicode,
  wrap E501 format strings
- fpga_reference.py: drop unused fs arg from nco_reference,
  collapse if/else to ternary, mark _out_im unused
- v7/processing.py: ASCII-ize x in docstring, collapse if-branches
- {dashboard,software_fpga,workers,radar_protocol}.py: wrap E501
- test_v7.py: ASCII-ize comment unicode, _alias renames where unused

Result: test_v7 100/100 (0 skips on radar_venv, was 9 graceful
skips); 5 cosim_dir orphan tests now active and passing.
This commit is contained in:
Jason
2026-05-02 15:45:56 +05:45
parent 5a7e8b8689
commit 3d2ffc3f2c
14 changed files with 107 additions and 62 deletions
@@ -38,14 +38,14 @@ from pathlib import Path
# Required: numpy + scipy. If either is missing, exit code 2 with a [SKIP] # Required: numpy + scipy. If either is missing, exit code 2 with a [SKIP]
# marker so the regression can distinguish missing-deps from real failures # marker so the regression can distinguish missing-deps from real failures
# (see run_regression.sh "Independent Reference Drift (T-6)" block). # (see run_regression.sh "Independent Reference Drift (T-6)" block).
import importlib.util
_MISSING = [] _MISSING = []
try: try:
import numpy as np # noqa: F401 import numpy as np
except ImportError: except ImportError:
_MISSING.append("numpy") _MISSING.append("numpy")
try: if importlib.util.find_spec("scipy.signal") is None:
import scipy.signal.windows # noqa: F401
except ImportError:
_MISSING.append("scipy") _MISSING.append("scipy")
if _MISSING: if _MISSING:
print( print(
@@ -56,8 +56,6 @@ if _MISSING:
) )
sys.exit(2) sys.exit(2)
import numpy as np # re-import to get module binding now that we know it's there
# Make local imports work when invoked from anywhere # Make local imports work when invoked from anywhere
THIS_DIR = Path(__file__).resolve().parent THIS_DIR = Path(__file__).resolve().parent
sys.path.insert(0, str(THIS_DIR)) sys.path.insert(0, str(THIS_DIR))
@@ -80,9 +78,9 @@ from fpga_model import ( # noqa: E402
TOL_NCO_LUT_LSB = 1 # NCO_SINE_LUT: tightest possible TOL_NCO_LUT_LSB = 1 # NCO_SINE_LUT: tightest possible
TOL_TWIDDLE_LSB = 1 # twiddle ROMs: same — quarter-wave Q15 cosine TOL_TWIDDLE_LSB = 1 # twiddle ROMs: same — quarter-wave Q15 cosine
TOL_WINDOW_LSB = 4 # 4 LSB 1.2e-4 rounding budget on Q15 round TOL_WINDOW_LSB = 4 # 4 LSB ~= 1.2e-4 rounding budget on Q15 round
TOL_NCO_MAG_REL = 0.04 # quarter-wave LUT artifact at quadrant edges TOL_NCO_MAG_REL = 0.04 # quarter-wave LUT artifact at quadrant edges
TOL_FFT_ROUNDTRIP_LSB = 60 # 11 stages × Q15 noise on 2048-pt; empirical TOL_FFT_ROUNDTRIP_LSB = 60 # 11 stages * Q15 noise on 2048-pt; empirical
# ============================================================================= # =============================================================================
@@ -169,7 +167,10 @@ def check_twiddle_rom(result: CheckResult, n: int, mem_filename: str):
bad.append((k, cos_rom[k], ideal, dev)) bad.append((k, cos_rom[k], ideal, dev))
result.check( result.check(
max_dev <= TOL_TWIDDLE_LSB, max_dev <= TOL_TWIDDLE_LSB,
f"{mem_filename}: all {expected_entries} entries match cos(2pi*k/{n}) Q15 (tol {TOL_TWIDDLE_LSB} LSB)", (
f"{mem_filename}: all {expected_entries} entries match "
f"cos(2pi*k/{n}) Q15 (tol {TOL_TWIDDLE_LSB} LSB)"
),
f"max |ROM - ideal| = {max_dev} LSB" + ( f"max |ROM - ideal| = {max_dev} LSB" + (
f"; {len(bad)} bad, e.g. k={bad[0][0]}: ROM={bad[0][1]}, ideal={bad[0][2]}" f"; {len(bad)} bad, e.g. k={bad[0][0]}: ROM={bad[0][1]}, ideal={bad[0][2]}"
if bad else "" if bad else ""
@@ -186,7 +187,10 @@ def check_doppler_window_lut(result: CheckResult):
worst_idx = int(np.argmax(diff)) worst_idx = int(np.argmax(diff))
result.check( result.check(
max_dev <= TOL_WINDOW_LSB, max_dev <= TOL_WINDOW_LSB,
f"DOPPLER_WINDOW_COEFF: all 16 entries match Dolph-Chebyshev 60 dB Q15 (tol {TOL_WINDOW_LSB} LSB)", (
f"DOPPLER_WINDOW_COEFF: all 16 entries match "
f"Dolph-Chebyshev 60 dB Q15 (tol {TOL_WINDOW_LSB} LSB)"
),
f"max |LUT - ideal| = {max_dev} LSB at n={worst_idx} " f"max |LUT - ideal| = {max_dev} LSB at n={worst_idx} "
f"(LUT={int(win_lut[worst_idx])}, ideal={int(win_ref[worst_idx])})" f"(LUT={int(win_lut[worst_idx])}, ideal={int(win_ref[worst_idx])})"
) )
@@ -233,7 +237,7 @@ def check_nco_invariants(result: CheckResult):
z = cos_arr + 1j * sin_arr z = cos_arr + 1j * sin_arr
Z = np.fft.fft(z) Z = np.fft.fft(z)
peak_bin = int(np.argmax(np.abs(Z))) peak_bin = int(np.argmax(np.abs(Z)))
expected_bin = int(round(ftw / (1 << 32) * n_capture)) expected_bin = round(ftw / (1 << 32) * n_capture)
result.check( result.check(
abs(peak_bin - expected_bin) <= 1, abs(peak_bin - expected_bin) <= 1,
f"NCO dominant frequency at FTW = {ftw:08X} (expected bin {expected_bin})", f"NCO dominant frequency at FTW = {ftw:08X} (expected bin {expected_bin})",
@@ -264,8 +268,8 @@ def check_fft_invariants(result: CheckResult):
# peak = amp*N stays below Q15 saturation (32767). # peak = amp*N stays below Q15 saturation (32767).
bin_k = 137 bin_k = 137
amp = 15 amp = 15
in_re = [int(round(amp * math.cos(2 * math.pi * bin_k * i / n))) for i in range(n)] in_re = [round(amp * math.cos(2 * math.pi * bin_k * i / n)) for i in range(n)]
in_im = [int(round(amp * math.sin(2 * math.pi * bin_k * i / n))) for i in range(n)] in_im = [round(amp * math.sin(2 * math.pi * bin_k * i / n)) for i in range(n)]
twin_re, twin_im = fft.compute(in_re, in_im, inverse=False) twin_re, twin_im = fft.compute(in_re, in_im, inverse=False)
ref_re, ref_im = ref.fft_reference(in_re, in_im, n=n) ref_re, ref_im = ref.fft_reference(in_re, in_im, n=n)
twin_mag2 = np.array(twin_re) ** 2 + np.array(twin_im) ** 2 twin_mag2 = np.array(twin_re) ** 2 + np.array(twin_im) ** 2
@@ -280,7 +284,7 @@ def check_fft_invariants(result: CheckResult):
# Roundtrip — small amplitude (peak = amp*N/2 ≤ 32767 → amp ≤ 32) so the # Roundtrip — small amplitude (peak = amp*N/2 ≤ 32767 → amp ≤ 32) so the
# forward FFT does not saturate, then IFFT should recover input within # forward FFT does not saturate, then IFFT should recover input within
# 11×Q15 butterfly noise. # 11*Q15 butterfly noise.
rt_amp = 30 rt_amp = 30
in_re = [int(rt_amp * math.sin(2 * math.pi * 73 * i / n)) for i in range(n)] in_re = [int(rt_amp * math.sin(2 * math.pi * 73 * i / n)) for i in range(n)]
in_im = [0] * n in_im = [0] * n
@@ -289,7 +293,10 @@ def check_fft_invariants(result: CheckResult):
rt_max_err = max(abs(rt_re[i] - in_re[i]) for i in range(n)) rt_max_err = max(abs(rt_re[i] - in_re[i]) for i in range(n))
result.check( result.check(
rt_max_err <= TOL_FFT_ROUNDTRIP_LSB, rt_max_err <= TOL_FFT_ROUNDTRIP_LSB,
f"FFT-2048(roundtrip, amp={rt_amp}): FFT->IFFT recovers input within {TOL_FFT_ROUNDTRIP_LSB} LSB", (
f"FFT-2048(roundtrip, amp={rt_amp}): FFT->IFFT recovers input "
f"within {TOL_FFT_ROUNDTRIP_LSB} LSB"
),
f"max |rt - in| = {rt_max_err}" f"max |rt - in| = {rt_max_err}"
) )
@@ -309,8 +316,8 @@ def check_mf_invariants(result: CheckResult):
ref_im_in = [0] * n ref_im_in = [0] * n
pulse_len = 256 pulse_len = 256
for i in range(pulse_len): for i in range(pulse_len):
ref_re_in[i] = int(round(amp * math.cos(2 * math.pi * bin_k * i / pulse_len))) ref_re_in[i] = round(amp * math.cos(2 * math.pi * bin_k * i / pulse_len))
ref_im_in[i] = int(round(amp * math.sin(2 * math.pi * bin_k * i / pulse_len))) ref_im_in[i] = round(amp * math.sin(2 * math.pi * bin_k * i / pulse_len))
sig_re[i + delay] = ref_re_in[i] sig_re[i + delay] = ref_re_in[i]
sig_im[i + delay] = ref_im_in[i] sig_im[i + delay] = ref_im_in[i]
@@ -328,7 +335,7 @@ def check_mf_invariants(result: CheckResult):
f"twin={twin_peak}, ref={ref_peak}" f"twin={twin_peak}, ref={ref_peak}"
) )
# Sidelobe behaviour: peak should be N×stronger than median. # Sidelobe behaviour: peak should be N*stronger than median.
twin_peak_val = float(twin_mag[delay]) twin_peak_val = float(twin_mag[delay])
twin_median = float(np.median(twin_mag)) twin_median = float(np.median(twin_mag))
pk_ratio = twin_peak_val / max(twin_median, 1.0) pk_ratio = twin_peak_val / max(twin_median, 1.0)
@@ -356,8 +363,8 @@ def check_doppler_invariants(result: CheckResult):
for c in range(chirps_per_subframe): for c in range(chirps_per_subframe):
chirp_idx = sf * chirps_per_subframe + c chirp_idx = sf * chirps_per_subframe + c
phase = 2 * math.pi * dop_bin * c / chirps_per_subframe phase = 2 * math.pi * dop_bin * c / chirps_per_subframe
chirp_i[chirp_idx, target_rbin] = int(round(amp * math.cos(phase))) chirp_i[chirp_idx, target_rbin] = round(amp * math.cos(phase))
chirp_q[chirp_idx, target_rbin] = int(round(amp * math.sin(phase))) chirp_q[chirp_idx, target_rbin] = round(amp * math.sin(phase))
dop = DopplerProcessor(num_subframes=num_subframes, dop = DopplerProcessor(num_subframes=num_subframes,
chirps_per_frame=chirps_per_frame) chirps_per_frame=chirps_per_frame)
@@ -44,7 +44,7 @@ import numpy as np
# NCO reference — ideal complex sinusoid # NCO reference — ideal complex sinusoid
# ============================================================================= # =============================================================================
def nco_reference(num_samples: int, ftw: int, fs: float = 400e6, def nco_reference(num_samples: int, ftw: int,
phase_offset_deg: float = 0.0): phase_offset_deg: float = 0.0):
"""Ideal floating-point NCO output, scaled to match Q15 fpga_model. """Ideal floating-point NCO output, scaled to match Q15 fpga_model.
@@ -101,10 +101,7 @@ def fft_reference(in_re, in_im, n: int = 2048, inverse: bool = False):
if len(re) != n or len(im) != n: if len(re) != n or len(im) != n:
raise ValueError(f"input length {len(re)} != N={n}") raise ValueError(f"input length {len(re)} != N={n}")
x = re + 1j * im x = re + 1j * im
if inverse: y = np.fft.ifft(x) if inverse else np.fft.fft(x) / n
y = np.fft.ifft(x)
else:
y = np.fft.fft(x) / n
return y.real.copy(), y.imag.copy() return y.real.copy(), y.imag.copy()
@@ -221,7 +218,7 @@ def doppler_reference(chirp_data_i, chirp_data_q,
def _self_test(): def _self_test():
"""Quick sanity checks.""" """Quick sanity checks."""
# NCO: at FTW = 0x4CCCCCCD, frequency = 0.3 * fs = 120 MHz at 400 MSPS. # NCO: at FTW = 0x4CCCCCCD, frequency = 0.3 * fs = 120 MHz at 400 MSPS.
cos_q15, sin_q15 = nco_reference(8, 0x4CCCCCCD, fs=400e6) cos_q15, sin_q15 = nco_reference(8, 0x4CCCCCCD)
# First sample should be cos(0)=1, sin(0)=0 in Q15 # First sample should be cos(0)=1, sin(0)=0 in Q15
assert abs(cos_q15[0] - 32767.0) < 1.0, f"NCO[0].cos = {cos_q15[0]}" assert abs(cos_q15[0] - 32767.0) < 1.0, f"NCO[0].cos = {cos_q15[0]}"
assert abs(sin_q15[0]) < 1.0, f"NCO[0].sin = {sin_q15[0]}" assert abs(sin_q15[0]) < 1.0, f"NCO[0].sin = {sin_q15[0]}"
@@ -229,7 +226,7 @@ def _self_test():
# FFT: impulse -> all bins = amplitude/N (scaled-mode schedule) # FFT: impulse -> all bins = amplitude/N (scaled-mode schedule)
in_re = [1000] + [0] * 15 in_re = [1000] + [0] * 15
in_im = [0] * 16 in_im = [0] * 16
out_re, out_im = fft_reference(in_re, in_im, n=16) out_re, _out_im = fft_reference(in_re, in_im, n=16)
for k in range(16): for k in range(16):
# AUDIT-C10/C-8: FWD FFT now applies /N (=/16), so each bin = 1000/16 # AUDIT-C10/C-8: FWD FFT now applies /N (=/16), so each bin = 1000/16
assert abs(out_re[k] - 1000.0 / 16.0) < 1e-9, \ assert abs(out_re[k] - 1000.0 / 16.0) < 1e-9, \
@@ -8,12 +8,18 @@ Replaces the legacy ADI CN0566 hardware captures (32-chirp / 2-subframe /
(48-chirp / 3-subframe / 48-bin Doppler) so the regression no longer (48-chirp / 3-subframe / 48-bin Doppler) so the regression no longer
depends on out-of-tree .npy files. depends on out-of-tree .npy files.
Outputs (six files, all under tb/cosim/real_data/hex/): Outputs (all under tb/cosim/real_data/hex/):
doppler_input_realdata.hex 48 chirps x 512 range bins, packed {Q,I}
doppler_ref_i.hex / _q.hex 512 range bins x 48 Doppler bins (signed 16-bit) RTL stimuli + goldens (.hex):
fullchain_range_input.hex 48 chirps x 2048 range bins, packed {Q,I} doppler_input_realdata.hex 48 chirps x 512 range bins, packed {Q,I}
fullchain_doppler_ref_i.hex doppler_ref_i.hex / _q.hex 512 range bins x 48 Doppler bins (signed 16-bit)
fullchain_doppler_ref_q.hex same shape as doppler_ref_* fullchain_range_input.hex 48 chirps x 2048 range bins, packed {Q,I}
fullchain_doppler_ref_i.hex
fullchain_doppler_ref_q.hex same shape as doppler_ref_*
GUI replay intermediates (.npy, COSIM_DIR ReplayFormat in v7.replay):
decimated_range_i.npy / _q.npy (48, 512) — post range_bin_decimator
doppler_map_i.npy / _q.npy (512, 48) — post doppler_processor
Dimensions match production (radar_params.vh: RP_FFT_SIZE=2048, Dimensions match production (radar_params.vh: RP_FFT_SIZE=2048,
RP_DECIMATION_FACTOR=4, RP_NUM_RANGE_BINS=512, RP_NUM_DOPPLER_BINS=48). RP_DECIMATION_FACTOR=4, RP_NUM_RANGE_BINS=512, RP_NUM_DOPPLER_BINS=48).
@@ -29,6 +35,8 @@ Usage: python3 gen_realdata_hex.py
import os import os
import sys import sys
import numpy as np
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from fpga_model import DopplerProcessor, RangeBinDecimator from fpga_model import DopplerProcessor, RangeBinDecimator
@@ -106,10 +114,11 @@ def gen_doppler_realdata():
seed=SCENE_SEED, seed=SCENE_SEED,
) )
stim = [] stim = [
for c in range(CHIRPS_PER_FRAME): (frame_i[c][rb], frame_q[c][rb])
for rb in range(DOPPLER_RANGE_BINS): for c in range(CHIRPS_PER_FRAME)
stim.append((frame_i[c][rb], frame_q[c][rb])) for rb in range(DOPPLER_RANGE_BINS)
]
write_hex_32(os.path.join(OUT_DIR, "doppler_input_realdata.hex"), stim) write_hex_32(os.path.join(OUT_DIR, "doppler_input_realdata.hex"), stim)
dp = make_doppler_processor() dp = make_doppler_processor()
@@ -118,7 +127,8 @@ def gen_doppler_realdata():
write_hex_16(os.path.join(OUT_DIR, "doppler_ref_i.hex"), flat_i) write_hex_16(os.path.join(OUT_DIR, "doppler_ref_i.hex"), flat_i)
write_hex_16(os.path.join(OUT_DIR, "doppler_ref_q.hex"), flat_q) write_hex_16(os.path.join(OUT_DIR, "doppler_ref_q.hex"), flat_q)
print(f" stimulus: {len(stim)} packed lines (expected {CHIRPS_PER_FRAME * DOPPLER_RANGE_BINS})") expected_stim = CHIRPS_PER_FRAME * DOPPLER_RANGE_BINS
print(f" stimulus: {len(stim)} packed lines (expected {expected_stim})")
print(f" golden: {len(flat_i)} lines i / {len(flat_q)} lines q " print(f" golden: {len(flat_i)} lines i / {len(flat_q)} lines q "
f"(expected {DOPPLER_RANGE_BINS * DOPPLER_TOTAL_BINS})") f"(expected {DOPPLER_RANGE_BINS * DOPPLER_TOTAL_BINS})")
@@ -133,10 +143,11 @@ def gen_fullchain_realdata():
seed=SCENE_SEED, seed=SCENE_SEED,
) )
stim = [] stim = [
for c in range(CHIRPS_PER_FRAME): (frame_i[c][rb], frame_q[c][rb])
for rb in range(FULLCHAIN_INPUT_BINS): for c in range(CHIRPS_PER_FRAME)
stim.append((frame_i[c][rb], frame_q[c][rb])) for rb in range(FULLCHAIN_INPUT_BINS)
]
write_hex_32(os.path.join(OUT_DIR, "fullchain_range_input.hex"), stim) write_hex_32(os.path.join(OUT_DIR, "fullchain_range_input.hex"), stim)
# fpga_model.RangeBinDecimator is hard-coded to 2048->512, DECIM=4 — production. # fpga_model.RangeBinDecimator is hard-coded to 2048->512, DECIM=4 — production.
@@ -152,6 +163,16 @@ def gen_fullchain_realdata():
write_hex_16(os.path.join(OUT_DIR, "fullchain_doppler_ref_i.hex"), flat_i) write_hex_16(os.path.join(OUT_DIR, "fullchain_doppler_ref_i.hex"), flat_i)
write_hex_16(os.path.join(OUT_DIR, "fullchain_doppler_ref_q.hex"), flat_q) write_hex_16(os.path.join(OUT_DIR, "fullchain_doppler_ref_q.hex"), flat_q)
# Same arrays serialized for v7.replay COSIM_DIR format (GUI replay).
np.save(os.path.join(OUT_DIR, "decimated_range_i.npy"),
np.asarray(decim_i_2d, dtype=np.int32))
np.save(os.path.join(OUT_DIR, "decimated_range_q.npy"),
np.asarray(decim_q_2d, dtype=np.int32))
np.save(os.path.join(OUT_DIR, "doppler_map_i.npy"),
np.asarray(doppler_i, dtype=np.int32))
np.save(os.path.join(OUT_DIR, "doppler_map_q.npy"),
np.asarray(doppler_q, dtype=np.int32))
print(f" stimulus: {len(stim)} packed lines " print(f" stimulus: {len(stim)} packed lines "
f"(expected {CHIRPS_PER_FRAME * FULLCHAIN_INPUT_BINS})") f"(expected {CHIRPS_PER_FRAME * FULLCHAIN_INPUT_BINS})")
print(f" golden: {len(flat_i)} lines i / {len(flat_q)} lines q " print(f" golden: {len(flat_i)} lines i / {len(flat_q)} lines q "
@@ -164,19 +185,31 @@ def main():
gen_fullchain_realdata() gen_fullchain_realdata()
print("\nGenerated files:") print("\nGenerated files:")
for f in ( hex_files = (
"doppler_input_realdata.hex", "doppler_input_realdata.hex",
"doppler_ref_i.hex", "doppler_ref_i.hex",
"doppler_ref_q.hex", "doppler_ref_q.hex",
"fullchain_range_input.hex", "fullchain_range_input.hex",
"fullchain_doppler_ref_i.hex", "fullchain_doppler_ref_i.hex",
"fullchain_doppler_ref_q.hex", "fullchain_doppler_ref_q.hex",
): )
for f in hex_files:
path = os.path.join(OUT_DIR, f) path = os.path.join(OUT_DIR, f)
with open(path) as fp: with open(path) as fp:
n_lines = sum(1 for _ in fp) n_lines = sum(1 for _ in fp)
print(f" {f:40s} {n_lines:7d} lines ({os.path.getsize(path):7d} bytes)") print(f" {f:40s} {n_lines:7d} lines ({os.path.getsize(path):7d} bytes)")
npy_files = (
"decimated_range_i.npy",
"decimated_range_q.npy",
"doppler_map_i.npy",
"doppler_map_q.npy",
)
for f in npy_files:
path = os.path.join(OUT_DIR, f)
arr = np.load(path)
print(f" {f:40s} shape={arr.shape!s:>12s} ({os.path.getsize(path):7d} bytes)")
if __name__ == '__main__': if __name__ == '__main__':
main() main()
+1 -1
View File
@@ -68,7 +68,7 @@ DATA_PACKET_SIZE = 11 # 1 + 4 + 2 + 2 + 1 + 1
STATUS_PACKET_SIZE = 26 # 1 + 24 + 1 STATUS_PACKET_SIZE = 26 # 1 + 24 + 1
NUM_RANGE_BINS = 512 NUM_RANGE_BINS = 512
NUM_DOPPLER_BINS = 48 # PR-F/PR-Q: 3 sub-frames * 16 (matches FPGA RP_NUM_DOPPLER_BINS) NUM_DOPPLER_BINS = 48 # PR-F/PR-Q: 3 sub-frames * 16 (= FPGA RP_NUM_DOPPLER_BINS)
NUM_CELLS = NUM_RANGE_BINS * NUM_DOPPLER_BINS # 24576 NUM_CELLS = NUM_RANGE_BINS * NUM_DOPPLER_BINS # 24576
WATERFALL_DEPTH = 64 WATERFALL_DEPTH = 64
+10 -10
View File
@@ -483,7 +483,7 @@ class TestWaveformConfig(unittest.TestCase):
from v7.models import WaveformConfig from v7.models import WaveformConfig
wc = WaveformConfig() wc = WaveformConfig()
# MEDIUM has the largest per-subframe v_unamb (smallest PRI). # MEDIUM has the largest per-subframe v_unamb (smallest PRI).
# K=6 default ~266 m/s; well above UAS speeds 5080 m/s. # K=6 default -> ~266 m/s; well above UAS speeds 50-80 m/s.
v6 = wc.extended_max_velocity_mps_crt() v6 = wc.extended_max_velocity_mps_crt()
self.assertAlmostEqual(v6, wc.max_velocity_medium_mps * 6, places=2) self.assertAlmostEqual(v6, wc.max_velocity_medium_mps * 6, places=2)
# K=3 should give half of K=6. # K=3 should give half of K=6.
@@ -669,7 +669,7 @@ class TestSoftwareFPGASignalChain(unittest.TestCase):
from v7.software_fpga import SoftwareFPGA from v7.software_fpga import SoftwareFPGA
from radar_protocol import NUM_RANGE_BINS, NUM_DOPPLER_BINS from radar_protocol import NUM_RANGE_BINS, NUM_DOPPLER_BINS
# Production dimensions: 48 chirps × 2048 samples. # Production dimensions: 48 chirps x 2048 samples.
iq_i = np.zeros((NUM_DOPPLER_BINS, 2048), dtype=np.int64) iq_i = np.zeros((NUM_DOPPLER_BINS, 2048), dtype=np.int64)
iq_q = np.zeros((NUM_DOPPLER_BINS, 2048), dtype=np.int64) iq_q = np.zeros((NUM_DOPPLER_BINS, 2048), dtype=np.int64)
# Inject a single strong tone in bin 10 of every chirp. # Inject a single strong tone in bin 10 of every chirp.
@@ -803,12 +803,12 @@ class TestReplayEngineCosim(unittest.TestCase):
if not self._available(): if not self._available():
self.skipTest("co-sim data not found") self.skipTest("co-sim data not found")
from v7.replay import ReplayEngine from v7.replay import ReplayEngine
from radar_protocol import RadarFrame from radar_protocol import RadarFrame, NUM_RANGE_BINS, NUM_DOPPLER_BINS
engine = ReplayEngine(self.COSIM_DIR) engine = ReplayEngine(self.COSIM_DIR)
frame = engine.get_frame(0) frame = engine.get_frame(0)
self.assertIsInstance(frame, RadarFrame) self.assertIsInstance(frame, RadarFrame)
self.assertEqual(frame.range_doppler_i.shape, (64, 32)) self.assertEqual(frame.range_doppler_i.shape, (NUM_RANGE_BINS, NUM_DOPPLER_BINS))
self.assertEqual(frame.magnitude.shape, (64, 32)) self.assertEqual(frame.magnitude.shape, (NUM_RANGE_BINS, NUM_DOPPLER_BINS))
def test_get_frame_out_of_range(self): def test_get_frame_out_of_range(self):
if not self._available(): if not self._available():
@@ -849,7 +849,7 @@ class TestReplayEngineRawIQ(unittest.TestCase):
from v7.software_fpga import SoftwareFPGA from v7.software_fpga import SoftwareFPGA
from radar_protocol import RadarFrame, NUM_RANGE_BINS, NUM_DOPPLER_BINS from radar_protocol import RadarFrame, NUM_RANGE_BINS, NUM_DOPPLER_BINS
# Production dimensions: 48 chirps × 2048 samples per frame. # Production dimensions: 48 chirps x 2048 samples per frame.
raw = (np.random.randn(2, NUM_DOPPLER_BINS, 2048) raw = (np.random.randn(2, NUM_DOPPLER_BINS, 2048)
+ 1j * np.random.randn(2, NUM_DOPPLER_BINS, 2048)) + 1j * np.random.randn(2, NUM_DOPPLER_BINS, 2048))
with tempfile.NamedTemporaryFile(suffix=".npy", delete=False) as f: with tempfile.NamedTemporaryFile(suffix=".npy", delete=False) as f:
@@ -1082,7 +1082,7 @@ class TestUnfoldVelocityCRT(unittest.TestCase):
v_unamb, v_res = self._vu_vr() v_unamb, v_res = self._vu_vr()
v_true = -75.0 v_true = -75.0
v_meas = [_fold_v(v_true, vu) for vu in v_unamb] v_meas = [_fold_v(v_true, vu) for vu in v_unamb]
v_est, conf, alias = unfold_velocity_crt(v_meas, v_unamb, v_res) v_est, conf, _alias = unfold_velocity_crt(v_meas, v_unamb, v_res)
self.assertAlmostEqual(v_est, v_true, places=1) self.assertAlmostEqual(v_est, v_true, places=1)
self.assertEqual(conf, "CONFIRMED") self.assertEqual(conf, "CONFIRMED")
@@ -1105,7 +1105,7 @@ class TestUnfoldVelocityCRT(unittest.TestCase):
v_true = 25.0 v_true = 25.0
# SHORT + MEDIUM only (LONG dropped out, e.g. clutter). # SHORT + MEDIUM only (LONG dropped out, e.g. clutter).
v_meas = [_fold_v(v_true, v_unamb[0]), _fold_v(v_true, v_unamb[1])] v_meas = [_fold_v(v_true, v_unamb[0]), _fold_v(v_true, v_unamb[1])]
v_est, conf, alias = unfold_velocity_crt( v_est, conf, _alias = unfold_velocity_crt(
v_meas, [v_unamb[0], v_unamb[1]], [v_res[0], v_res[1]], v_meas, [v_unamb[0], v_unamb[1]], [v_res[0], v_res[1]],
) )
self.assertAlmostEqual(v_est, v_true, places=1) self.assertAlmostEqual(v_est, v_true, places=1)
@@ -1117,7 +1117,7 @@ class TestUnfoldVelocityCRT(unittest.TestCase):
v_unamb, v_res = self._vu_vr() v_unamb, v_res = self._vu_vr()
# Random per-PRI values that do not correspond to any v_true. # Random per-PRI values that do not correspond to any v_true.
v_meas = [10.0, -30.0, 35.0] v_meas = [10.0, -30.0, 35.0]
v_est, conf, alias = unfold_velocity_crt(v_meas, v_unamb, v_res) v_est, conf, _alias = unfold_velocity_crt(v_meas, v_unamb, v_res)
self.assertEqual(conf, "AMBIGUOUS") self.assertEqual(conf, "AMBIGUOUS")
self.assertAlmostEqual(v_est, 10.0, places=2) # PRI-0 fallback self.assertAlmostEqual(v_est, 10.0, places=2) # PRI-0 fallback
@@ -1130,7 +1130,7 @@ class TestUnfoldVelocityCRT(unittest.TestCase):
# Pick v_true near the advertised CRT ceiling. # Pick v_true near the advertised CRT ceiling.
v_true = wc.extended_max_velocity_mps_crt(max_alias_k=6) - 5.0 # ~261 m/s v_true = wc.extended_max_velocity_mps_crt(max_alias_k=6) - 5.0 # ~261 m/s
v_meas = [_fold_v(v_true, vu) for vu in v_unamb] v_meas = [_fold_v(v_true, vu) for vu in v_unamb]
v_est, conf, alias = unfold_velocity_crt(v_meas, v_unamb, v_res, max_alias_k=6) v_est, conf, _alias = unfold_velocity_crt(v_meas, v_unamb, v_res, max_alias_k=6)
self.assertAlmostEqual(v_est, v_true, places=0) # within 1 m/s self.assertAlmostEqual(v_est, v_true, places=0) # within 1 m/s
# Should still be CONFIRMED for a real velocity at this scale. # Should still be CONFIRMED for a real velocity at this scale.
self.assertIn(conf, ("CONFIRMED", "LIKELY")) self.assertIn(conf, ("CONFIRMED", "LIKELY"))
+1 -1
View File
@@ -94,7 +94,7 @@ def _make_dspin() -> QDoubleSpinBox:
# ============================================================================= # =============================================================================
class RangeDopplerCanvas(FigureCanvasQTAgg): class RangeDopplerCanvas(FigureCanvasQTAgg):
"""Matplotlib canvas showing the Range-Doppler map (NUM_RANGE_BINS x NUM_DOPPLER_BINS) with dark theme.""" """Matplotlib canvas showing the Range-Doppler map (NUM_RANGE_BINS x NUM_DOPPLER_BINS) with dark theme.""" # noqa: E501
def __init__(self, _parent=None): def __init__(self, _parent=None):
fig = Figure(figsize=(10, 6), facecolor=DARK_BG) fig = Figure(figsize=(10, 6), facecolor=DARK_BG)
+2 -4
View File
@@ -573,7 +573,7 @@ def unfold_velocity_crt(
alias depth k_0 [-K, K] generates candidates alias depth k_0 [-K, K] generates candidates
``v_true = v_meas_0 + k_0 · 2 · v_unamb_0``. A candidate is ``v_true = v_meas_0 + k_0 · 2 · v_unamb_0``. A candidate is
*valid* when it folds back into all other active PRIs to within *valid* when it folds back into all other active PRIs to within
``tol_factor × max(v_res)``. ``tol_factor * max(v_res)``.
Args: Args:
v_meas_per_sf: signed velocity measurement per active sub-frame v_meas_per_sf: signed velocity measurement per active sub-frame
@@ -652,9 +652,7 @@ def unfold_velocity_crt(
confidence = "AMBIGUOUS" confidence = "AMBIGUOUS"
elif n_sf == 3 and n_cands == 1: elif n_sf == 3 and n_cands == 1:
confidence = "CONFIRMED" confidence = "CONFIRMED"
elif n_sf == 3 and n_cands == 2: elif (n_sf == 3 and n_cands == 2) or (n_sf == 2 and n_cands == 1):
confidence = "LIKELY"
elif n_sf == 2 and n_cands == 1:
confidence = "LIKELY" confidence = "LIKELY"
else: # n_sf == 2 and n_cands == 2 else: # n_sf == 2 and n_cands == 2
confidence = "AMBIGUOUS" confidence = "AMBIGUOUS"
+5 -1
View File
@@ -192,7 +192,11 @@ class SoftwareFPGA:
# to math-generated twiddles otherwise). # to math-generated twiddles otherwise).
range_i = np.zeros((n_chirps, n_samples), dtype=np.int64) range_i = np.zeros((n_chirps, n_samples), dtype=np.int64)
range_q = np.zeros((n_chirps, n_samples), dtype=np.int64) range_q = np.zeros((n_chirps, n_samples), dtype=np.int64)
twiddle_path = TWIDDLE_2048 if (n_samples == 2048 and os.path.exists(TWIDDLE_2048)) else None twiddle_path = (
TWIDDLE_2048
if (n_samples == 2048 and os.path.exists(TWIDDLE_2048))
else None
)
for c in range(n_chirps): for c in range(n_chirps):
range_i[c], range_q[c] = run_range_fft( range_i[c], range_q[c] = run_range_fft(
iq_i[c].astype(np.int64), iq_i[c].astype(np.int64),
+5 -1
View File
@@ -196,7 +196,11 @@ class RadarDataWorker(QThread):
# for SHORT/MEDIUM sub-frame bins until PR-Q.5 replaces this path # for SHORT/MEDIUM sub-frame bins until PR-Q.5 replaces this path
# with extract_targets_from_frame_crt. # with extract_targets_from_frame_crt.
v_res = self._waveform.velocity_resolution_long_mps v_res = self._waveform.velocity_resolution_long_mps
n_doppler = frame.detections.shape[1] if frame.detections.ndim == 2 else self._waveform.n_doppler_bins n_doppler = (
frame.detections.shape[1]
if frame.detections.ndim == 2
else self._waveform.n_doppler_bins
)
doppler_center = n_doppler // 2 doppler_center = n_doppler // 2
for idx in det_indices: for idx in det_indices:
+2
View File
@@ -55,3 +55,5 @@ select = [
"test_*.py" = ["ARG", "T20", "ERA"] "test_*.py" = ["ARG", "T20", "ERA"]
# Re-export modules: unused imports are intentional # Re-export modules: unused imports are intentional
"v7/hardware.py" = ["F401"] "v7/hardware.py" = ["F401"]
# FPGA cosim scripts: CLI tools — print() is the intended output channel
"9_Firmware/9_2_FPGA/tb/cosim/**.py" = ["T20"]