Compare commits
15 Commits
b4e4b689ea
...
main
Author | SHA1 | Date | |
---|---|---|---|
2a5f1657f4 | |||
7644cbe0ad | |||
92c5876b23 | |||
3b12c21e20 | |||
1170da8b04 | |||
c5dc320989 | |||
452dddc19c | |||
3c02a4b388 | |||
339dbe255e | |||
81143a72c4 | |||
f021780971 | |||
6f947a28fa | |||
581131f1e0 | |||
adf6e40752 | |||
411f96dd87 |
481
examples/spectral_purity.ipynb
Normal file
481
examples/spectral_purity.ipynb
Normal file
File diff suppressed because one or more lines are too long
@@ -9,7 +9,7 @@ description = "RF Network Analyzer based on the Pluto SDR"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3"
|
||||
# keywords = ["one", "two"]
|
||||
license = { text = "MIT License" }
|
||||
license = "MIT"
|
||||
classifiers = ["Programming Language :: Python :: 3"]
|
||||
dependencies = [
|
||||
"numpy",
|
||||
@@ -34,7 +34,7 @@ charon-cli = "charon_vna.cli:main"
|
||||
charon-gui = "charon_vna.gui:main"
|
||||
|
||||
[tool.setuptools_scm]
|
||||
version_file = "charon_vna/_version.py"
|
||||
version_file = "src/charon_vna/_version.py"
|
||||
|
||||
[tool.black]
|
||||
line-length = 120
|
||||
|
@@ -274,7 +274,7 @@ def main() -> None:
|
||||
"Pluto IP Address",
|
||||
"Enter Pluto IP Address",
|
||||
QLineEdit.Normal,
|
||||
"192.168.2.1",
|
||||
Charon.DEFAULT_IP,
|
||||
)
|
||||
match = re.match(r"(\d{1,3}\.){3}\d{1,3}", text)
|
||||
if not match:
|
@@ -1,8 +1,9 @@
|
||||
# %% imports
|
||||
import copy
|
||||
import pickle
|
||||
from enum import IntEnum, unique
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable, Dict, Literal, Tuple
|
||||
from typing import Any, Callable, Dict, List, Literal, Tuple
|
||||
|
||||
import adi
|
||||
import iio
|
||||
@@ -82,7 +83,7 @@ class AD9361DacStepFactor(IntEnum):
|
||||
|
||||
class Charon:
|
||||
FREQUENCY_OFFSET = 1e6
|
||||
DEFAULT_IP = "192.168.2.1"
|
||||
DEFAULT_IP = "192.168.3.1"
|
||||
|
||||
calibration: rf.calibration.Calibration | None = None
|
||||
|
||||
@@ -90,8 +91,11 @@ class Charon:
|
||||
self,
|
||||
ip: str = DEFAULT_IP,
|
||||
frequency: npt.ArrayLike = np.linspace(1e9, 2e9, 3),
|
||||
ports: Tuple[int] = (1,),
|
||||
ports: Tuple[int] | int = 1,
|
||||
):
|
||||
if isinstance(ports, int):
|
||||
ports = (np.arange(ports) + 1).tolist()
|
||||
ports = tuple(ports)
|
||||
self.ports = ports
|
||||
self.frequency = frequency
|
||||
|
||||
@@ -128,19 +132,22 @@ class Charon:
|
||||
self.sdr.rx_hardwaregain_chan1 = 10
|
||||
self.sdr.tx_hardwaregain_chan0 = -10
|
||||
|
||||
# # switch control
|
||||
# switch control
|
||||
ctx = iio.Context(uri)
|
||||
self.ctrl = ctx.find_device("ad9361-phy")
|
||||
# raw ad9361 register accesss:
|
||||
# https://ez.analog.com/linux-software-drivers/f/q-a/120853/control-fmcomms3-s-gpo-with-python
|
||||
# https://www.analog.com/media/cn/technical-documentation/user-guides/ad9364_register_map_reference_manual_ug-672.pdf # noqa: E501
|
||||
# https://www.analog.com/media/cn/technical-documentation/user-guides/ad9364_register_map_reference_manual_ug-672.pdf
|
||||
self.ctrl.reg_write(AD9361Register.EXTERNAL_LNA_CONTROL, 0x90) # bit 7: AuxDAC Manual, bit 4: GPO Manual
|
||||
self.ctrl.reg_write(AD9361Register.AUXDAC_ENABLE_CONTROL, 0x3F)
|
||||
|
||||
# initialize switch control outputs
|
||||
self._set_gpo(0b0000)
|
||||
self._set_dac_code(value=0, channel=1)
|
||||
self._set_dac_code(value=0, channel=2)
|
||||
|
||||
self.set_switches(a=0, b=0)
|
||||
# set default switch state
|
||||
self.set_switches(a=self.ports[0] - 1, b=self.ports[0] - 1)
|
||||
|
||||
def get_config(self) -> Dict[str, Any]:
|
||||
config = dict()
|
||||
@@ -170,20 +177,15 @@ class Charon:
|
||||
def _set_gpo(self, value: int) -> None:
|
||||
self.ctrl.reg_write(AD9361Register.GPO_FORCE_AND_INIT, (value & 0x0F) << 4) # bits 7-4: GPO3-0
|
||||
|
||||
def _set_dac_voltage(self, voltage: float, channel: Literal[1, 2]):
|
||||
raise NotImplementedError()
|
||||
def _get_dac_code(self, channel: Literal[1, 2]) -> Tuple[float, AD9361DacVref, AD9361DacStepFactor]:
|
||||
word = self.ctrl.reg_read(AD9361Register.__getitem__(f"AUXDAC{channel}_WORD"))
|
||||
config = self.ctrl.reg_read(AD9361Register.__getitem__(f"AUXDAC{channel}_CONFIG"))
|
||||
|
||||
def set_switches(self, a: int, b: int, excitation: int | None = None):
|
||||
if excitation is None:
|
||||
excitation = a
|
||||
value = (word << 2) + (config & 0x3)
|
||||
vref = AD9361DacVref((config >> 2) & 0x3)
|
||||
step_factor = AD9361DacStepFactor((config >> 4) & 0x1)
|
||||
|
||||
val = 0
|
||||
|
||||
val |= int(bool(excitation)) << 0 # exc = GPO0
|
||||
val |= int(bool(a)) << 2 # a = GPO2
|
||||
val |= int(bool(b)) << 1 # b = GPO1
|
||||
|
||||
self._set_gpo(val)
|
||||
return (value, vref, step_factor)
|
||||
|
||||
def _set_dac_code(
|
||||
self,
|
||||
@@ -203,7 +205,12 @@ class Charon:
|
||||
|
||||
# https://www.analog.com/media/cn/technical-documentation/user-guides/ad9364_register_map_reference_manual_ug-672.pdf
|
||||
# page 13
|
||||
# vout = 0.97 * vref + (0.000738 + 9e-6 * (vref * 1.6 - 2)) * auxdac_word[9:0] * step_factor - 0.3572 * step_factor + 0.05
|
||||
# vout = (
|
||||
# 0.97 * vref
|
||||
# + (0.000738 + 9e-6 * (vref * 1.6 - 2)) * auxdac_word[9:0] * step_factor
|
||||
# - 0.3572 * step_factor
|
||||
# + 0.05
|
||||
# )
|
||||
# vout ~= (vref - 0.3572 * step_factor) + 0.000738 * auxdac_word[9:0] * step_factor
|
||||
# which gives a 1.5V swing with step_factor == 2 and 0.75V swing with step_factor == 1
|
||||
# vref basically just changes the minimum voltage with negligible impact on output scaling
|
||||
@@ -217,18 +224,17 @@ class Charon:
|
||||
(value & 0x3) | (vref.value << 2) | (step_factor << 4),
|
||||
)
|
||||
|
||||
def _get_dac_code(self, channel: Literal[1, 2]) -> Tuple[float, AD9361DacVref, AD9361DacStepFactor]:
|
||||
word = self.ctrl.reg_read(AD9361Register.__getitem__(f"AUXDAC{channel}_WORD"))
|
||||
config = self.ctrl.reg_read(AD9361Register.__getitem__(f"AUXDAC{channel}_CONFIG"))
|
||||
def set_switches(self, b: int, a: int, excitation: int | None = None):
|
||||
if excitation is None:
|
||||
excitation = a
|
||||
|
||||
value = (word << 2) + (config & 0x3)
|
||||
vref = AD9361DacVref((config >> 2) & 0x3)
|
||||
step_factor = AD9361DacStepFactor((config >> 4) & 0x1)
|
||||
val = 0
|
||||
|
||||
return (value, vref, step_factor)
|
||||
val |= int(bool(excitation)) << 0 # exc = GPO0
|
||||
val |= int(bool(a)) << 2 # a = GPO2
|
||||
val |= int(bool(b)) << 1 # b = GPO1
|
||||
|
||||
def _get_dac_voltage(self) -> float:
|
||||
raise NotImplementedError()
|
||||
self._set_gpo(val)
|
||||
|
||||
def set_output_power(self, power: float):
|
||||
pout = xr.DataArray(
|
||||
@@ -305,33 +311,42 @@ class Charon:
|
||||
|
||||
return np.mean(data[1] / data[0])
|
||||
|
||||
def sweep_b_over_a(self):
|
||||
def capture(
|
||||
self,
|
||||
callback: Callable[[int, int], None] | None = None,
|
||||
*,
|
||||
measurements: List[Tuple[int, int]] = None,
|
||||
):
|
||||
if measurements is None:
|
||||
measurements = [(m, n) for n in self.ports for m in self.ports]
|
||||
measurements = list(measurements)
|
||||
|
||||
s = xr.DataArray(
|
||||
np.zeros(
|
||||
len(self.frequency),
|
||||
[len(self.frequency), len(self.ports), len(self.ports)],
|
||||
dtype=np.complex128,
|
||||
),
|
||||
dims=["frequency"],
|
||||
dims=["frequency", "m", "n"],
|
||||
coords=dict(
|
||||
frequency=self.frequency,
|
||||
m=list(self.ports),
|
||||
n=list(self.ports),
|
||||
),
|
||||
)
|
||||
for frequency in self.frequency:
|
||||
s.loc[dict(frequency=frequency)] = self.get_b_over_a(frequency=frequency)
|
||||
return s
|
||||
|
||||
def vna_capture(self, frequency: npt.ArrayLike, callback: Callable[int, int] | None):
|
||||
s = xr.DataArray(
|
||||
np.empty(len(frequency), dtype=np.complex128),
|
||||
dims=["frequency"],
|
||||
coords=dict(
|
||||
frequency=frequency,
|
||||
),
|
||||
)
|
||||
total_count = len(measurements) * len(s.frequency)
|
||||
count = 0
|
||||
|
||||
for m in s.m.data:
|
||||
for n in s.n.data:
|
||||
if (m, n) in measurements:
|
||||
self.set_switches(b=m - 1, a=n - 1)
|
||||
|
||||
for ff, freq in enumerate(s.frequency.data):
|
||||
if callback is not None:
|
||||
# report progress during sweep
|
||||
callback(ff, len(s.frequency))
|
||||
callback(count, total_count)
|
||||
|
||||
self.set_output(frequency=freq, power=-5)
|
||||
self.sdr.rx_destroy_buffer()
|
||||
self.sdr.rx_lo = int(freq)
|
||||
@@ -341,14 +356,102 @@ class Charon:
|
||||
self.sdr.rx_hardwaregain_chan0 = 40
|
||||
self.sdr.rx_hardwaregain_chan1 = 40
|
||||
rx = self.sdr.rx()
|
||||
s.loc[dict(frequency=freq)] = np.mean(rx[1] / rx[0])
|
||||
s.loc[dict(frequency=freq, m=m, n=n)] = np.mean(rx[1] / rx[0])
|
||||
|
||||
count += 1
|
||||
|
||||
if callback is not None:
|
||||
# mark capture as complete
|
||||
callback(len(s.frequency), len(s.frequency))
|
||||
callback(total_count, total_count)
|
||||
|
||||
return s
|
||||
|
||||
def calibrate_sol(self, prompt: Callable[[str], None] | None = None, **kwargs) -> None:
|
||||
if len(self.ports) != 1:
|
||||
raise ValueError(
|
||||
f"SOL calibration needs only one port but {len(self.ports)} ports are enabled. "
|
||||
"Did you mean to use SOLT?"
|
||||
)
|
||||
|
||||
if prompt is None:
|
||||
prompt = lambda s: input(f"{s}\nENTER to continue...")
|
||||
|
||||
ideal = rf.media.DefinedGammaZ0(frequency=rf.media.Frequency.from_f(self.frequency, unit="Hz"))
|
||||
ideals = [ideal.short(), ideal.open(), ideal.load(0)]
|
||||
|
||||
names = ["short", "open", "load"]
|
||||
|
||||
measured = list()
|
||||
for name in names:
|
||||
prompt(f"Connect standard {name} to port {self.ports[0]}")
|
||||
measured.append(self.capture(**kwargs))
|
||||
|
||||
cal = rf.OnePort(measured=[s2net(m) for m in measured], ideals=ideals)
|
||||
|
||||
self.calibration = cal
|
||||
|
||||
def calibrate_solt(self, prompt: Callable[[str], None] | None = None, **kwargs) -> None:
|
||||
if len(self.ports) < 2:
|
||||
raise ValueError(
|
||||
f"SOLT calibration needs at least two ports but {len(self.ports)} ports are enabled. "
|
||||
"Did you mean to use SOL?"
|
||||
)
|
||||
|
||||
if len(self.ports) > 2:
|
||||
raise NotImplementedError("SOLT calibration with more than two ports not yet supported")
|
||||
|
||||
if prompt is None:
|
||||
prompt = lambda s: input(f"{s}\nENTER to continue...")
|
||||
|
||||
ideal = rf.media.DefinedGammaZ0(frequency=rf.media.Frequency.from_f(self.frequency, unit="Hz"))
|
||||
ideals = [ideal.short(), ideal.open(), ideal.load(0)]
|
||||
ideals = [rf.two_port_reflect(id, id) for id in ideals]
|
||||
|
||||
thru = np.zeros((len(self.frequency), 2, 2), dtype=np.complex128)
|
||||
thru[:, 0, 1] = 1
|
||||
thru[:, 1, 0] = 1
|
||||
thru = rf.Network(frequency=self.frequency, f_unit="Hz", s=thru)
|
||||
|
||||
ideals.append(thru)
|
||||
|
||||
names_1p = ["short", "open", "load"]
|
||||
names_2p = ["thru"]
|
||||
|
||||
measured = list()
|
||||
for name in names_1p:
|
||||
measured_param = list()
|
||||
for port in self.ports:
|
||||
prompt(f"Connect standard {name} to port {port}")
|
||||
measured_param.append(self.capture(measurements=[(port, port)], **kwargs).sel(m=port, n=port))
|
||||
measured.append(rf.two_port_reflect(*[s2net(m) for m in measured_param]))
|
||||
|
||||
for name in names_2p:
|
||||
prompt(f"Connect standard {name} between ports {self.ports[0]} and {self.ports[1]}")
|
||||
measured.append(s2net(self.capture(**kwargs)))
|
||||
|
||||
cal = rf.SOLT(measured=measured, ideals=ideals)
|
||||
|
||||
self.calibration = cal
|
||||
|
||||
def save_calibration(self, path: Path | str):
|
||||
path = Path(path)
|
||||
if path.suffix.lower() == ".pkl":
|
||||
with open(str(path), "wb") as f:
|
||||
pickle.dump(self.calibration, f)
|
||||
else:
|
||||
raise NotImplementedError(f"Unknown calibration file extension: {path.suffix}")
|
||||
|
||||
def load_calibration(self, path: Path | str):
|
||||
path = Path(path)
|
||||
if path.suffix.lower() == ".pkl":
|
||||
with open(str(path), "rb") as f:
|
||||
cal = pickle.load(f)
|
||||
if not isinstance(cal, rf.calibration.Calibration):
|
||||
raise ValueError(f"Expected {rf.calibration.Calibration}, got {type(cal)}")
|
||||
self.calibration = cal
|
||||
else:
|
||||
raise NotImplementedError(f"Unknown calibration file extension: {path.suffix}")
|
||||
|
||||
|
||||
# %%
|
||||
if __name__ == "__main__":
|
81
src/charon_vna/vna_dev.py
Normal file
81
src/charon_vna/vna_dev.py
Normal file
@@ -0,0 +1,81 @@
|
||||
# %% imports
|
||||
import numpy as np
|
||||
from matplotlib import pyplot as plt
|
||||
from matplotlib.ticker import EngFormatter
|
||||
|
||||
from charon_vna.util import db20, net2s, s2net
|
||||
from charon_vna.vna import Charon
|
||||
|
||||
# %%
|
||||
frequency = np.linspace(80e6, 280e6, 301)
|
||||
|
||||
# %%
|
||||
vna = Charon(frequency=frequency, ports=2)
|
||||
|
||||
# %%
|
||||
s = vna.capture()
|
||||
|
||||
# %%
|
||||
for m in s.m.data:
|
||||
for n in s.n.data:
|
||||
plt.plot(s.frequency, db20(s.sel(m=m, n=n)), label="$S_{" + str(m) + str(n) + "}$")
|
||||
plt.grid(True)
|
||||
plt.legend()
|
||||
plt.show()
|
||||
|
||||
|
||||
# %%
|
||||
vna.calibrate_sol()
|
||||
|
||||
# %%
|
||||
vna.calibrate_solt()
|
||||
|
||||
# %%
|
||||
vna.save_calibration("./calibration.pkl")
|
||||
|
||||
# %%
|
||||
vna.load_calibration("./calibration.pkl")
|
||||
|
||||
# %%
|
||||
s2 = net2s(vna.calibration.apply_cal(s2net(s)))
|
||||
# s2.coords["m"] = s.m
|
||||
# s2.coords["n"] = s.n
|
||||
|
||||
for m in s.m.data:
|
||||
for n in s.n.data:
|
||||
plt.plot(s.frequency, db20(s.sel(m=m, n=n)), label="$S_{" + str(m) + str(n) + "}$ (uncalibrated)")
|
||||
plt.plot(s2.frequency, db20(s2.sel(m=m, n=n)), label="$S_{" + str(m) + str(n) + "}$ (calibrated)")
|
||||
plt.grid(True)
|
||||
plt.legend()
|
||||
plt.xlabel("Frequency [Hz]")
|
||||
plt.ylabel("Magnitude [dB]")
|
||||
# plt.ylim(-30, 5)
|
||||
plt.ylim(-25, 5)
|
||||
plt.xlim(s.frequency[0], s.frequency[-1])
|
||||
plt.gca().xaxis.set_major_formatter(EngFormatter())
|
||||
plt.tight_layout()
|
||||
plt.show()
|
||||
|
||||
for m in s.m.data:
|
||||
for n in s.n.data:
|
||||
if m != n:
|
||||
plt.plot(
|
||||
s.frequency,
|
||||
np.angle(s.sel(m=m, n=n), deg=True),
|
||||
label="$S_{" + str(m) + str(n) + "}$ (uncalibrated)",
|
||||
)
|
||||
plt.plot(
|
||||
s2.frequency,
|
||||
np.angle(s2.sel(m=m, n=n), deg=True),
|
||||
label="$S_{" + str(m) + str(n) + "}$ (calibrated)",
|
||||
)
|
||||
plt.grid(True)
|
||||
plt.legend()
|
||||
plt.ylabel("Phase [deg]")
|
||||
plt.xlabel("Frequency [Hz]")
|
||||
plt.xlim(s.frequency[0], s.frequency[-1])
|
||||
plt.gca().xaxis.set_major_formatter(EngFormatter())
|
||||
plt.tight_layout()
|
||||
plt.show()
|
||||
|
||||
# %%
|
Reference in New Issue
Block a user