Compare commits

...

4 Commits

Author SHA1 Message Date
958d1f96d1 filtering stuff
All checks were successful
Publish Python 🐍 distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Successful in -15s
Publish Python 🐍 distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been skipped
2024-12-02 22:55:39 -07:00
48d559f084 I'm not sure why exp creates such a bad tone 2024-12-02 22:38:56 -07:00
167a0b7aef start turning this into a class 2024-12-02 19:29:23 -07:00
2c9e9b0eb2 move some helpers to util.py 2024-12-02 18:56:43 -07:00
3 changed files with 313 additions and 173 deletions

12
charon_vna/io_.py Normal file
View File

@ -0,0 +1,12 @@
from pathlib import Path
import skrf as rf
import xarray as xr
from util import net2s
# scikit-rf has no way to save files aside from touchstone and pickle
def cal2zarr(cal: rf.calibration.Calibration, outpath: Path):
ideals = [net2s(net) for net in cal.ideals]
measured = [net2s(net) for net in cal.measured]
# s.to_zarr(outpath)

79
charon_vna/util.py Normal file
View File

@ -0,0 +1,79 @@
import numpy as np
import skrf as rf
import xarray as xr
HAM_BANDS = [
[135.7e3, 137.8e3],
[472e3, 479e3],
[1.8e6, 2e6],
[3.5e6, 4e6],
[5332e3, 5405e3],
[7e6, 7.3e6],
[10.1e6, 10.15e6],
[14e6, 14.35e6],
[18.068e6, 18.168e6],
[21e6, 21.45e6],
[24.89e6, 24.99e6],
[28e6, 29.7e6],
[50e6, 54e6],
[144e6, 148e6],
[219e6, 220e6],
[222e6, 225e6],
[420e6, 450e6],
[902e6, 928e6],
[1240e6, 1300e6],
[2300e6, 2310e6],
[2390e6, 2450e6],
[3400e6, 3450e6],
[5650e6, 5925e6],
[10e9, 10.5e9],
[24e9, 24.25e9],
[47e9, 47.2e9],
[76e9, 81e9],
[122.25e9, 123e9],
[134e9, 141e9],
[241e9, 250e9],
[275e9, np.inf],
]
def db10(p):
return 10 * np.log10(np.abs(p))
def db20(p):
return 20 * np.log10(np.abs(p))
def minmax(x):
return (np.min(x), np.max(x))
def s2net(s: xr.DataArray) -> rf.Network:
net = rf.Network(frequency=s.frequency, f_unit="Hz", s=s)
return net
def net2s(net: rf.Network) -> xr.DataArray:
port_tuples = net.port_tuples
m = list(set(t[0] for t in port_tuples))
m.sort()
m = np.array(m)
m += 1 # skrf uses 0-indexed ports
n = list(set(t[0] for t in port_tuples))
n.sort()
n = np.array(n)
n += 1 # skrf uses 0-indexed ports
s = xr.DataArray(
net.s,
dims=["frequency", "m", "n"],
coords=dict(
frequency=net.f,
m=m,
n=n,
),
)
return s

View File

@ -1,67 +1,116 @@
# %% imports # %% imports
import copy import copy
import time
from pathlib import Path from pathlib import Path
from typing import Optional from typing import Any, Dict, Optional, Tuple
import adi import adi
import iio import iio
import matplotlib as mpl
import numpy as np import numpy as np
import skrf as rf import skrf as rf
import xarray as xr import xarray as xr
from matplotlib import pyplot as plt from matplotlib import pyplot as plt
from matplotlib.gridspec import GridSpec
from matplotlib.patches import Circle
from matplotlib.ticker import EngFormatter from matplotlib.ticker import EngFormatter
from numpy import typing as npt from numpy import typing as npt
from scipy import signal from scipy import signal
from util import HAM_BANDS, db20, net2s, s2net
dir_ = Path(__file__).parent dir_ = Path(__file__).parent
# %% connection
class Charon:
FREQUENCY_OFFSET = 1e6
def __init__(
self,
uri: str,
frequency: npt.ArrayLike = np.linspace(1e9, 2e9, 3),
ports: Tuple[int] = (1,),
):
self.ports = ports
self.frequency = frequency
# everything RF
self.sdr = adi.ad9361(uri=uri)
for attr, expected in [
("adi,2rx-2tx-mode-enable", True),
# ("adi,gpo-manual-mode-enable", True),
]:
# available configuration options:
# https://wiki.analog.com/resources/tools-software/linux-drivers/iio-transceiver/ad9361-customization # https://wiki.analog.com/resources/tools-software/linux-drivers/iio-transceiver/ad9361-customization
if bool(self.sdr._get_iio_debug_attr(attr)) != expected:
raise ValueError(
f"'{attr}' is not set in pluto. "
"See README.md for instructions for one time configuration instructions"
)
# TODO: it might be possible to change this on the fly.
# I think we'll actually just fail in __init__ of ad9361 if 2rx-2tx is wrong
self.sdr.rx_lo = int(self.frequency[0])
self.sdr.tx_lo = int(self.frequency[0])
self.sdr.sample_rate = 30e6
self.sdr.rx_rf_bandwidth = int(4e6)
self.sdr.tx_rf_bandwidth = int(4e6)
self.sdr.rx_destroy_buffer()
self.sdr.tx_destroy_buffer()
self.sdr.rx_enabled_channels = [0, 1]
self.sdr.tx_enabled_channels = [0]
self.sdr.loopback = 0
self.sdr.gain_control_mode_chan0 = "manual"
self.sdr.gain_control_mode_chan1 = "manual"
self.sdr.rx_hardwaregain_chan0 = 40
self.sdr.rx_hardwaregain_chan1 = 40
self.sdr.tx_hardwaregain_chan0 = -10
# %% helper functions # switch control
def get_config(sdr: adi.ad9361): ctx = iio.Context(uri)
self.ctrl = ctx.find_device("ad9361-phy")
# raw ad9361 register accesss:
# https://ez.analog.com/linux-software-drivers/f/q-a/120853/control-fmcomms3-s-gpo-with-python
# https://www.analog.com/media/cn/technical-documentation/user-guides/ad9364_register_map_reference_manual_ug-672.pdf
self.ctrl.reg_write(0x26, 0x90) # bit 7: AuxDAC Manual, bit 4: GPO Manual
self._set_gpo(self.ports[0] - 1)
# TODO: init AuxDAC
def get_config(self) -> Dict[str, Any]:
config = dict() config = dict()
config["rx_lo"] = sdr.rx_lo config["rx_lo"] = self.sdr.rx_lo
config["rx_rf_bandwidth"] = sdr.rx_rf_bandwidth config["rx_rf_bandwidth"] = self.sdr.rx_rf_bandwidth
config["rx_enabled_channels"] = sdr.rx_enabled_channels config["rx_enabled_channels"] = self.sdr.rx_enabled_channels
for chan in config["rx_enabled_channels"]: for chan in config["rx_enabled_channels"]:
config[f"rx_hardwaregain_chan{chan}"] = getattr(sdr, f"rx_hardwaregain_chan{chan}") config[f"rx_hardwaregain_chan{chan}"] = getattr(self.sdr, f"rx_hardwaregain_chan{chan}")
config[f"gain_control_mode_chan{chan}"] = getattr(sdr, f"gain_control_mode_chan{chan}") config[f"gain_control_mode_chan{chan}"] = getattr(self.sdr, f"gain_control_mode_chan{chan}")
config["tx_lo"] = sdr.tx_lo config["tx_lo"] = self.sdr.tx_lo
config["tx_rf_bandwidth"] = sdr.tx_rf_bandwidth config["tx_rf_bandwidth"] = self.sdr.tx_rf_bandwidth
config["tx_cyclic_buffer"] = sdr.tx_cyclic_buffer config["tx_cyclic_buffer"] = self.sdr.tx_cyclic_buffer
config["tx_enabled_channels"] = sdr.tx_enabled_channels config["tx_enabled_channels"] = self.sdr.tx_enabled_channels
for chan in config["tx_enabled_channels"]: for chan in config["tx_enabled_channels"]:
config[f"tx_hardwaregain_chan{chan}"] = getattr(sdr, f"tx_hardwaregain_chan{chan}") config[f"tx_hardwaregain_chan{chan}"] = getattr(self.sdr, f"tx_hardwaregain_chan{chan}")
config["filter"] = sdr.filter config["filter"] = self.sdr.filter
config["sample_rate"] = sdr.sample_rate config["sample_rate"] = self.sdr.sample_rate
config["loopback"] = sdr.loopback config["loopback"] = self.sdr.loopback
return config return config
def _set_gpo(self, value: int) -> None:
self.ctrl.reg_write(0x27, (value & 0x0F) << 4) # bits 7-4: GPO3-0
def db10(p): def set_output_power(self, power: float):
return 10 * np.log10(np.abs(p)) if power == -5:
# FIXME: this is a hack because I don't want to go through re-calibration
tx_gain = -8
else:
raise NotImplementedError()
# # TODO: correct over frequency
# tx_gain_idx = np.abs(pout.sel(tx_channel=0) - power).argmin(dim="tx_gain")
# tx_gain = pout.coords["tx_gain"][tx_gain_idx]
self.sdr.tx_hardwaregain_chan0 = float(tx_gain)
def generate_tone(self, f: float, N: int = 1024, fs: Optional[float] = None):
def db20(p):
return 20 * np.log10(np.abs(p))
def minmax(x):
return (np.min(x), np.max(x))
def generate_tone(f: float, N: int = 1024, fs: Optional[float] = None):
if fs is None: if fs is None:
fs = sdr.sample_rate fs = self.sdr.sample_rate
fs = int(fs) fs = int(fs)
fc = int(f / (fs / N)) * (fs / N) fc = int(f / (fs / N)) * (fs / N)
ts = 1 / float(fs) ts = 1 / float(fs)
@ -72,57 +121,80 @@ def generate_tone(f: float, N: int = 1024, fs: Optional[float] = None):
return iq return iq
def set_output(self, frequency: float, power: float):
# TODO: switch to DDS in Pluto
# %% connection self.sdr.tx_destroy_buffer()
sdr = adi.ad9361(uri="ip:192.168.3.1") self.set_output_power(power)
self.sdr.tx_lo = int(frequency - self.FREQUENCY_OFFSET)
self.sdr.tx_cyclic_buffer = True
self.sdr.tx(self.generate_tone(self.FREQUENCY_OFFSET))
# %% verify device configuration def _rx(self, count: int = 1) -> npt.ArrayLike:
mode_2r2t = bool(sdr._get_iio_debug_attr("adi,2rx-2tx-mode-enable")) if count < 1:
if not mode_2r2t: raise ValueError
raise ValueError("'adi,2rx-2tx-mode-enable' is not set in pluto. See README.md for instructions for changing this") self.sdr.rx_destroy_buffer()
# TODO: it might be possible to change this on the fly. I think we'll actually just fail in __init__ for sdr return np.concatenate([np.array(self.sdr.rx()) for _ in range(count)], axis=-1)
# %% switch control outputs
# NOTE: this doesn't appear to work
sdr._set_iio_debug_attr_str("adi,gpo-manual-mode-enable", "1")
sdr._get_iio_debug_attr_str("adi,gpo-manual-mode-enable-mask")
# but direct register access does
# https://ez.analog.com/linux-software-drivers/f/q-a/120853/control-fmcomms3-s-gpo-with-python
ctx = iio.Context("ip:192.168.3.1")
ctrl = ctx.find_device("ad9361-phy")
# https://www.analog.com/media/cn/technical-documentation/user-guides/ad9364_register_map_reference_manual_ug-672.pdf
ctrl.reg_write(0x26, 0x90) # bit 7: AuxDAC Manual, bit 4: GPO Manual
ctrl.reg_write(0x27, 0x10) # bits 7-4: GPO3-0
# %%
sdr = Charon("ip:192.168.3.1")
# %% initialization # %% initialization
sdr.rx_lo = int(2.0e9) config = sdr.get_config()
sdr.tx_lo = int(2.0e9)
sdr.sample_rate = 30e6
sdr.rx_rf_bandwidth = int(4e6)
sdr.tx_rf_bandwidth = int(4e6)
sdr.rx_destroy_buffer()
sdr.tx_destroy_buffer()
sdr.rx_enabled_channels = [0, 1]
sdr.tx_enabled_channels = [0]
sdr.loopback = 0
sdr.gain_control_mode_chan0 = "manual"
sdr.gain_control_mode_chan1 = "manual"
sdr.rx_hardwaregain_chan0 = 40
sdr.rx_hardwaregain_chan1 = 40
sdr.tx_hardwaregain_chan0 = -10
config = get_config(sdr)
config config
# %% # %% generate tone
sdr.tx_destroy_buffer() # must destroy buffer before changing cyclicity sdr.set_output(frequency=1e9 + sdr.FREQUENCY_OFFSET, power=-5)
sdr.tx_cyclic_buffer = True
sdr.tx(generate_tone(1e6)) # %% capture data
data = sdr._rx(1)
# %% # %%
sdr.rx_destroy_buffer() fig, axs = plt.subplots(2, 2, sharex=True, tight_layout=True)
data = sdr.rx() # ddc_tone = np.exp(
# -1j * 2 * np.pi * (sdr.FREQUENCY_OFFSET / sdr.sdr.sample_rate) * np.arange(data.shape[-1]), dtype=np.complex128
# )
ddc_tone = sdr.generate_tone(-sdr.FREQUENCY_OFFSET) * 2**-14
ddc_data = data * ddc_tone
axs[0][0].plot(np.real(ddc_data).T, label="DDC")
axs[1][0].plot(np.imag(ddc_data).T, label="DDC")
ddc_rel = ddc_data[1] / ddc_data[0]
axs[0][1].plot(np.real(ddc_rel).T, label="DDC")
axs[1][1].plot(np.imag(ddc_rel).T, label="DDC")
n, wn = signal.buttord(
wp=0.3 * sdr.FREQUENCY_OFFSET,
ws=0.8 * sdr.FREQUENCY_OFFSET,
gpass=1,
gstop=40,
analog=False,
fs=sdr.sdr.sample_rate,
)
sos = signal.butter(n, wn, "lowpass", analog=False, output="sos", fs=sdr.sdr.sample_rate)
filt_data = signal.sosfiltfilt(sos, ddc_data, axis=-1)
axs[0][0].plot(np.real(filt_data).T, label="FILT")
axs[1][0].plot(np.imag(filt_data).T, label="FILT")
filt_rel = filt_data[1] / filt_data[0]
axs[0][1].plot(np.real(filt_rel).T, label="FILT")
axs[1][1].plot(np.imag(filt_rel).T, label="FILT")
s = np.mean(filt_rel)
axs[0][1].axhline(np.real(s), color="k")
axs[1][1].axhline(np.imag(s), color="k")
axs[0][0].grid(True)
axs[1][0].grid(True)
axs[0][1].grid(True)
axs[1][1].grid(True)
axs[0][0].legend(loc="lower right")
axs[1][0].legend(loc="lower right")
axs[0][1].legend(loc="lower right")
axs[1][1].legend(loc="lower right")
axs[0][0].set_ylabel("Real")
axs[1][0].set_ylabel("Imag")
axs[0][0].set_title("By Channel")
axs[0][1].set_title("Relative")
# %% Plot in time # %% Plot in time
fig, axs = plt.subplots(2, 1, sharex=True, tight_layout=True) fig, axs = plt.subplots(2, 1, sharex=True, tight_layout=True)
@ -130,44 +202,63 @@ axs[0].plot(np.real(data).T)
axs[1].plot(np.imag(data).T) axs[1].plot(np.imag(data).T)
axs[0].set_ylabel("Real") axs[0].set_ylabel("Real")
axs[1].set_ylabel("Imag") axs[1].set_ylabel("Imag")
axs[-1].set_xlabel("Time") axs[0].grid(True)
axs[1].grid(True)
axs[-1].set_xlabel("Sample")
axs[-1].set_xlim(0, data.shape[-1])
fig.show()
# %%
fig, ax = plt.subplots(1, 1, tight_layout=True)
ax.plot(np.real(data).T, np.imag(data).T)
ax.grid(True)
ax.set_aspect("equal")
ax.set_xlabel("Real")
ax.set_ylabel("Imag")
ax.set_xlim(np.array([-1, 1]) * (2 ** (12 - 1) - 1))
ax.set_ylim(ax.get_xlim())
fig.show() fig.show()
# %% Plot in frequency # %% Plot in frequency
f, Pxx_den = signal.periodogram(data, sdr.sample_rate, axis=-1, return_onesided=False) f = np.fft.fftfreq(data.shape[-1], 1 / sdr.sdr.sample_rate)
RX_BITS = 10
Pxx_den = np.fft.fft(data, axis=-1) / (len(data) * 2 ** (2 * RX_BITS))
Pxx_den_ddc = np.fft.fft(ddc_data, axis=-1) / (len(ddc_data) * 2 ** (2 * RX_BITS))
Pxx_den_filt = np.fft.fft(filt_data, axis=-1) / (len(filt_data) * 2 ** (2 * RX_BITS))
fft_ddc_tone = np.fft.fft(ddc_tone, axis=-1) / (len(ddc_tone))
plt.figure() plt.figure()
for cc, chan in enumerate(sdr.rx_enabled_channels): for cc, chan in enumerate(sdr.sdr.rx_enabled_channels):
plt.semilogy(f, Pxx_den[cc], label=f"Channel {chan}") # plt.plot(
# np.fft.fftshift(f),
# db20(np.fft.fftshift(Pxx_den[cc])),
# label=f"Channel {chan}",
# )
plt.plot(
np.fft.fftshift(f),
db20(np.fft.fftshift(Pxx_den_ddc[cc])),
label=f"Channel {chan}",
)
plt.plot(
np.fft.fftshift(f),
db20(np.fft.fftshift(Pxx_den_filt[cc])),
label=f"Channel {chan}",
)
# plt.plot(
# np.fft.fftshift(f),
# db20(np.fft.fftshift(fft_ddc_tone)),
# label="DDC Tone",
# )
plt.legend() plt.legend()
plt.ylim([1e-7, 1e2]) # plt.ylim(1e-7, 1e2)
plt.xlabel("frequency [Hz]") plt.ylim(-100, 0)
plt.ylabel("PSD [V**2/Hz]") plt.xlabel("Frequency [Hz]")
plt.ylabel("Power [dBfs]")
plt.title(f"Fc = {sdr.sdr.rx_lo / 1e9} GHz")
plt.gca().xaxis.set_major_formatter(EngFormatter())
plt.grid(True) plt.grid(True)
plt.show() plt.show()
# %% TX helper functions
def set_output_power(power: float):
if power == -5:
# FIXME: this is a hack because I don't want to go through re-calibration
tx_gain = -8
else:
raise NotImplementedError()
# # TODO: correct over frequency
# tx_gain_idx = np.abs(pout.sel(tx_channel=0) - power).argmin(dim="tx_gain")
# tx_gain = pout.coords["tx_gain"][tx_gain_idx]
sdr.tx_hardwaregain_chan0 = float(tx_gain)
def set_output(frequency: float, power: float, offset_frequency: float = 1e6):
sdr.tx_destroy_buffer()
set_output_power(power)
sdr.tx_lo = int(frequency - offset_frequency)
offset_frequency = frequency - sdr.tx_lo
sdr.tx_cyclic_buffer = True
sdr.tx(generate_tone(offset_frequency))
# %% # %%
def vna_capture(frequency: npt.ArrayLike): def vna_capture(frequency: npt.ArrayLike):
s = xr.DataArray( s = xr.DataArray(
@ -217,7 +308,7 @@ reference_sparams = None
reference_sparams = dir_ / "RBP-135+_Plus25degC.s2p" reference_sparams = dir_ / "RBP-135+_Plus25degC.s2p"
if reference_sparams is not None: if reference_sparams is not None:
ref = rf.Network(reference_sparams) ref = rf.Network(reference_sparams)
rbp135 = xr.DataArray(ref.s, dims=["frequency", "m", "n"], coords=dict(frequency=ref.f, m=[1, 2], n=[1, 2])) rbp135 = net2s(ref)
axs[0].plot(rbp135.frequency, db20(rbp135.sel(m=1, n=1)), label="Datasheet") axs[0].plot(rbp135.frequency, db20(rbp135.sel(m=1, n=1)), label="Datasheet")
axs[1].plot(rbp135.frequency, np.rad2deg(np.angle(rbp135.sel(m=2, n=1))), label="Datasheet") axs[1].plot(rbp135.frequency, np.rad2deg(np.angle(rbp135.sel(m=2, n=1))), label="Datasheet")
@ -227,15 +318,8 @@ if reference_sparams is not None:
plt.show() plt.show()
# %%
def s2net(s: xr.DataArray) -> rf.Network:
net = rf.Network(frequency=s.frequency)
net.s = s.data
return net
# %% SOL calibration # %% SOL calibration
cal_frequency = np.linspace(70e6, 600e6, 2001) cal_frequency = np.linspace(70e6, 600e6, 101)
ideal_cal_frequency = rf.Frequency(np.min(cal_frequency), np.max(cal_frequency), len(cal_frequency)) ideal_cal_frequency = rf.Frequency(np.min(cal_frequency), np.max(cal_frequency), len(cal_frequency))
input("Connect SHORT and press ENTER...") input("Connect SHORT and press ENTER...")
short = vna_capture(frequency=cal_frequency) short = vna_capture(frequency=cal_frequency)
@ -258,41 +342,6 @@ calibration = rf.calibration.OnePort(
# %% # %%
s = vna_capture(frequency=cal_frequency) s = vna_capture(frequency=cal_frequency)
# %%
ham_bands = [
[135.7e3, 137.8e3],
[472e3, 479e3],
[1.8e6, 2e6],
[3.5e6, 4e6],
[5332e3, 5405e3],
[7e6, 7.3e6],
[10.1e6, 10.15e6],
[14e6, 14.35e6],
[18.068e6, 18.168e6],
[21e6, 21.45e6],
[24.89e6, 24.99e6],
[28e6, 29.7e6],
[50e6, 54e6],
[144e6, 148e6],
[219e6, 220e6],
[222e6, 225e6],
[420e6, 450e6],
[902e6, 928e6],
[1240e6, 1300e6],
[2300e6, 2310e6],
[2390e6, 2450e6],
[3400e6, 3450e6],
[5650e6, 5925e6],
[10e9, 10.5e9],
[24e9, 24.25e9],
[47e9, 47.2e9],
[76e9, 81e9],
[122.25e9, 123e9],
[134e9, 141e9],
[241e9, 250e9],
[275e9, np.inf],
]
# %% # %%
s_calibrated = calibration.apply_cal(s2net(s)) s_calibrated = calibration.apply_cal(s2net(s))
@ -302,7 +351,7 @@ s_calibrated.plot_s_smith()
plt.show() plt.show()
plt.figure() plt.figure()
for start, stop in ham_bands: for start, stop in HAM_BANDS:
plt.axvspan(start, stop, alpha=0.1, color="k") plt.axvspan(start, stop, alpha=0.1, color="k")
s_calibrated.plot_s_db() s_calibrated.plot_s_db()
# ref.plot_s_db(m=1, n=1) # ref.plot_s_db(m=1, n=1)
@ -312,7 +361,7 @@ plt.xlim(s_calibrated.f[0], s_calibrated.f[-1])
plt.show() plt.show()
plt.figure() plt.figure()
for start, stop in ham_bands: for start, stop in HAM_BANDS:
plt.axvspan(start, stop, alpha=0.1, color="k") plt.axvspan(start, stop, alpha=0.1, color="k")
# s_calibrated.plot_s_vswr() # s_calibrated.plot_s_vswr()
# drop invalid points # drop invalid points