start turning this into a class

This commit is contained in:
Brendan Haines 2024-12-02 19:29:23 -07:00
parent 2c9e9b0eb2
commit 167a0b7aef

View File

@ -1,7 +1,7 @@
# %% imports # %% imports
import copy import copy
from pathlib import Path from pathlib import Path
from typing import Optional from typing import Any, Dict, Optional
import adi import adi
import iio import iio
@ -17,99 +17,128 @@ from util import HAM_BANDS, db20, net2s, s2net
dir_ = Path(__file__).parent dir_ = Path(__file__).parent
# https://wiki.analog.com/resources/tools-software/linux-drivers/iio-transceiver/ad9361-customization
# %% helper functions
def get_config(sdr: adi.ad9361):
config = dict()
config["rx_lo"] = sdr.rx_lo
config["rx_rf_bandwidth"] = sdr.rx_rf_bandwidth
config["rx_enabled_channels"] = sdr.rx_enabled_channels
for chan in config["rx_enabled_channels"]:
config[f"rx_hardwaregain_chan{chan}"] = getattr(sdr, f"rx_hardwaregain_chan{chan}")
config[f"gain_control_mode_chan{chan}"] = getattr(sdr, f"gain_control_mode_chan{chan}")
config["tx_lo"] = sdr.tx_lo
config["tx_rf_bandwidth"] = sdr.tx_rf_bandwidth
config["tx_cyclic_buffer"] = sdr.tx_cyclic_buffer
config["tx_enabled_channels"] = sdr.tx_enabled_channels
for chan in config["tx_enabled_channels"]:
config[f"tx_hardwaregain_chan{chan}"] = getattr(sdr, f"tx_hardwaregain_chan{chan}")
config["filter"] = sdr.filter
config["sample_rate"] = sdr.sample_rate
config["loopback"] = sdr.loopback
return config
def generate_tone(f: float, N: int = 1024, fs: Optional[float] = None):
if fs is None:
fs = sdr.sample_rate
fs = int(fs)
fc = int(f / (fs / N)) * (fs / N)
ts = 1 / float(fs)
t = np.arange(0, N * ts, ts)
i = np.cos(2 * np.pi * t * fc) * 2**14
q = np.sin(2 * np.pi * t * fc) * 2**14
iq = i + 1j * q
return iq
# %% connection # %% connection
sdr = adi.ad9361(uri="ip:192.168.3.1") class Charon:
def __init__(
self,
uri: str,
frequency: npt.ArrayLike = np.linspace(1e9, 2e9, 3),
):
# everything RF
self.sdr = adi.ad9361(uri=uri)
for attr, expected in [
("adi,2rx-2tx-mode-enable", True),
("adi,gpo-manual-mode-enable", True),
]:
# available configuration options:
# https://wiki.analog.com/resources/tools-software/linux-drivers/iio-transceiver/ad9361-customization
if bool(self.sdr._get_iio_debug_attr(attr)) != expected:
raise ValueError(
f"'{attr}' is not set in pluto. "
"See README.md for instructions for one time configuration instructions"
)
# TODO: it might be possible to change this on the fly.
# I think we'll actually just fail in __init__ of ad9361 if 2rx-2tx is wrong
# %% verify device configuration self.sdr.rx_lo = int(frequency[0])
mode_2r2t = bool(sdr._get_iio_debug_attr("adi,2rx-2tx-mode-enable")) self.sdr.tx_lo = int(frequency[0])
if not mode_2r2t: self.sdr.sample_rate = 30e6
raise ValueError("'adi,2rx-2tx-mode-enable' is not set in pluto. See README.md for instructions for changing this") self.sdr.rx_rf_bandwidth = int(4e6)
# TODO: it might be possible to change this on the fly. I think we'll actually just fail in __init__ for sdr self.sdr.tx_rf_bandwidth = int(4e6)
self.sdr.rx_destroy_buffer()
self.sdr.tx_destroy_buffer()
self.sdr.rx_enabled_channels = [0, 1]
self.sdr.tx_enabled_channels = [0]
self.sdr.loopback = 0
self.sdr.gain_control_mode_chan0 = "manual"
self.sdr.gain_control_mode_chan1 = "manual"
self.sdr.rx_hardwaregain_chan0 = 40
self.sdr.rx_hardwaregain_chan1 = 40
self.sdr.tx_hardwaregain_chan0 = -10
# %% switch control outputs # switch control
# NOTE: this doesn't appear to work ctx = iio.Context(uri)
sdr._set_iio_debug_attr_str("adi,gpo-manual-mode-enable", "1") self.ctrl = ctx.find_device("ad9361-phy")
sdr._get_iio_debug_attr_str("adi,gpo-manual-mode-enable-mask") # raw ad9361 register accesss:
# but direct register access does # https://ez.analog.com/linux-software-drivers/f/q-a/120853/control-fmcomms3-s-gpo-with-python
# https://ez.analog.com/linux-software-drivers/f/q-a/120853/control-fmcomms3-s-gpo-with-python # https://www.analog.com/media/cn/technical-documentation/user-guides/ad9364_register_map_reference_manual_ug-672.pdf
ctx = iio.Context("ip:192.168.3.1") self.ctrl.reg_write(0x26, 0x90) # bit 7: AuxDAC Manual, bit 4: GPO Manual
ctrl = ctx.find_device("ad9361-phy") self._set_gpo(0)
# https://www.analog.com/media/cn/technical-documentation/user-guides/ad9364_register_map_reference_manual_ug-672.pdf # TODO: init AuxDAC
ctrl.reg_write(0x26, 0x90) # bit 7: AuxDAC Manual, bit 4: GPO Manual
ctrl.reg_write(0x27, 0x10) # bits 7-4: GPO3-0 def get_config(self) -> Dict[str, Any]:
config = dict()
config["rx_lo"] = self.sdr.rx_lo
config["rx_rf_bandwidth"] = self.sdr.rx_rf_bandwidth
config["rx_enabled_channels"] = self.sdr.rx_enabled_channels
for chan in config["rx_enabled_channels"]:
config[f"rx_hardwaregain_chan{chan}"] = getattr(self.sdr, f"rx_hardwaregain_chan{chan}")
config[f"gain_control_mode_chan{chan}"] = getattr(self.sdr, f"gain_control_mode_chan{chan}")
config["tx_lo"] = self.sdr.tx_lo
config["tx_rf_bandwidth"] = self.sdr.tx_rf_bandwidth
config["tx_cyclic_buffer"] = self.sdr.tx_cyclic_buffer
config["tx_enabled_channels"] = self.sdr.tx_enabled_channels
for chan in config["tx_enabled_channels"]:
config[f"tx_hardwaregain_chan{chan}"] = getattr(self.sdr, f"tx_hardwaregain_chan{chan}")
config["filter"] = self.sdr.filter
config["sample_rate"] = self.sdr.sample_rate
config["loopback"] = self.sdr.loopback
return config
def _set_gpo(self, value: int) -> None:
self.ctrl.reg_write(0x27, (value & 0x0F) << 4) # bits 7-4: GPO3-0
def set_output_power(self, power: float):
if power == -5:
# FIXME: this is a hack because I don't want to go through re-calibration
tx_gain = -8
else:
raise NotImplementedError()
# # TODO: correct over frequency
# tx_gain_idx = np.abs(pout.sel(tx_channel=0) - power).argmin(dim="tx_gain")
# tx_gain = pout.coords["tx_gain"][tx_gain_idx]
self.sdr.tx_hardwaregain_chan0 = float(tx_gain)
def set_output(self, frequency: float, power: float, offset_frequency: float = 1e6):
# TODO: switch to DDS in Pluto
def generate_tone(f: float, N: int = 1024, fs: Optional[float] = None):
if fs is None:
fs = sdr.sample_rate
fs = int(fs)
fc = int(f / (fs / N)) * (fs / N)
ts = 1 / float(fs)
t = np.arange(0, N * ts, ts)
i = np.cos(2 * np.pi * t * fc) * 2**14
q = np.sin(2 * np.pi * t * fc) * 2**14
iq = i + 1j * q
return iq
self.sdr.tx_destroy_buffer()
self.set_output_power(power)
self.sdr.tx_lo = int(frequency - offset_frequency)
offset_frequency = frequency - self.sdr.tx_lo
self.sdr.tx_cyclic_buffer = True
self.sdr.tx(generate_tone(offset_frequency))
def _rx(self) -> npt.ArrayLike:
self.sdr.rx_destroy_buffer()
return np.array(self.sdr.rx())
# %%
sdr = Charon("ip:192.168.3.1")
# %% initialization # %% initialization
sdr.rx_lo = int(2.0e9) config = sdr.get_config()
sdr.tx_lo = int(2.0e9)
sdr.sample_rate = 30e6
sdr.rx_rf_bandwidth = int(4e6)
sdr.tx_rf_bandwidth = int(4e6)
sdr.rx_destroy_buffer()
sdr.tx_destroy_buffer()
sdr.rx_enabled_channels = [0, 1]
sdr.tx_enabled_channels = [0]
sdr.loopback = 0
sdr.gain_control_mode_chan0 = "manual"
sdr.gain_control_mode_chan1 = "manual"
sdr.rx_hardwaregain_chan0 = 40
sdr.rx_hardwaregain_chan1 = 40
sdr.tx_hardwaregain_chan0 = -10
config = get_config(sdr)
config config
# %%
sdr.tx_destroy_buffer() # must destroy buffer before changing cyclicity
sdr.tx_cyclic_buffer = True
sdr.tx(generate_tone(1e6))
# %%
sdr.rx_destroy_buffer()
data = sdr.rx()
# %% Plot in time # %% Plot in time
data = sdr._rx()
fig, axs = plt.subplots(2, 1, sharex=True, tight_layout=True) fig, axs = plt.subplots(2, 1, sharex=True, tight_layout=True)
axs[0].plot(np.real(data).T) axs[0].plot(np.real(data).T)
axs[1].plot(np.imag(data).T) axs[1].plot(np.imag(data).T)
@ -131,28 +160,6 @@ plt.grid(True)
plt.show() plt.show()
# %% TX helper functions
def set_output_power(power: float):
if power == -5:
# FIXME: this is a hack because I don't want to go through re-calibration
tx_gain = -8
else:
raise NotImplementedError()
# # TODO: correct over frequency
# tx_gain_idx = np.abs(pout.sel(tx_channel=0) - power).argmin(dim="tx_gain")
# tx_gain = pout.coords["tx_gain"][tx_gain_idx]
sdr.tx_hardwaregain_chan0 = float(tx_gain)
def set_output(frequency: float, power: float, offset_frequency: float = 1e6):
sdr.tx_destroy_buffer()
set_output_power(power)
sdr.tx_lo = int(frequency - offset_frequency)
offset_frequency = frequency - sdr.tx_lo
sdr.tx_cyclic_buffer = True
sdr.tx(generate_tone(offset_frequency))
# %% # %%
def vna_capture(frequency: npt.ArrayLike): def vna_capture(frequency: npt.ArrayLike):
s = xr.DataArray( s = xr.DataArray(