15 Commits

Author SHA1 Message Date
2a5f1657f4 add some analysis of generated signal
All checks were successful
Publish Python 🐍 distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Successful in -49s
Publish Python 🐍 distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been skipped
2025-07-26 19:14:38 -06:00
7644cbe0ad fix path to version file
All checks were successful
Publish Python 🐍 distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Successful in -58s
Publish Python 🐍 distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been skipped
2025-07-21 21:40:21 -06:00
92c5876b23 switch to src-layout to avoid issues with multiple top level packages
Some checks failed
Publish Python 🐍 distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Failing after -52s
Publish Python 🐍 distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been skipped
2025-07-21 21:39:31 -06:00
3b12c21e20 typing + flake8
Some checks failed
Publish Python 🐍 distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Failing after -53s
Publish Python 🐍 distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been skipped
2025-07-16 22:00:07 -06:00
1170da8b04 plots 2025-07-16 21:56:30 -06:00
c5dc320989 2-port SOLT cal
Some checks failed
Publish Python 🐍 distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Failing after -56s
Publish Python 🐍 distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been skipped
2025-07-08 00:02:03 -06:00
452dddc19c functional 1 port calibration 2025-07-07 23:22:36 -06:00
3c02a4b388 allow downselecting measurements 2025-07-07 22:49:49 -06:00
339dbe255e smash some functions together
Some checks failed
Publish Python 🐍 distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Failing after -1m4s
Publish Python 🐍 distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been skipped
2025-07-07 22:28:21 -06:00
81143a72c4 bettah port handling
Some checks failed
Publish Python 🐍 distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Failing after -58s
Publish Python 🐍 distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been skipped
2025-07-07 22:15:48 -06:00
f021780971 working 2-port capture 2025-07-07 20:13:59 -06:00
6f947a28fa add really basic usage file for developing vna class outside of gui 2025-07-07 20:04:54 -06:00
581131f1e0 document and rearrange some stuff 2025-07-07 20:04:16 -06:00
adf6e40752 change default IP to what I have configured on my pluto (revert later) 2025-07-07 19:43:15 -06:00
411f96dd87 only define default ip once 2025-07-07 19:39:16 -06:00
14 changed files with 725 additions and 60 deletions

File diff suppressed because one or more lines are too long

View File

@@ -9,7 +9,7 @@ description = "RF Network Analyzer based on the Pluto SDR"
readme = "README.md" readme = "README.md"
requires-python = ">=3" requires-python = ">=3"
# keywords = ["one", "two"] # keywords = ["one", "two"]
license = { text = "MIT License" } license = "MIT"
classifiers = ["Programming Language :: Python :: 3"] classifiers = ["Programming Language :: Python :: 3"]
dependencies = [ dependencies = [
"numpy", "numpy",
@@ -34,7 +34,7 @@ charon-cli = "charon_vna.cli:main"
charon-gui = "charon_vna.gui:main" charon-gui = "charon_vna.gui:main"
[tool.setuptools_scm] [tool.setuptools_scm]
version_file = "charon_vna/_version.py" version_file = "src/charon_vna/_version.py"
[tool.black] [tool.black]
line-length = 120 line-length = 120

View File

@@ -274,7 +274,7 @@ def main() -> None:
"Pluto IP Address", "Pluto IP Address",
"Enter Pluto IP Address", "Enter Pluto IP Address",
QLineEdit.Normal, QLineEdit.Normal,
"192.168.2.1", Charon.DEFAULT_IP,
) )
match = re.match(r"(\d{1,3}\.){3}\d{1,3}", text) match = re.match(r"(\d{1,3}\.){3}\d{1,3}", text)
if not match: if not match:

View File

@@ -1,8 +1,9 @@
# %% imports # %% imports
import copy import copy
import pickle
from enum import IntEnum, unique from enum import IntEnum, unique
from pathlib import Path from pathlib import Path
from typing import Any, Callable, Dict, Literal, Tuple from typing import Any, Callable, Dict, List, Literal, Tuple
import adi import adi
import iio import iio
@@ -82,7 +83,7 @@ class AD9361DacStepFactor(IntEnum):
class Charon: class Charon:
FREQUENCY_OFFSET = 1e6 FREQUENCY_OFFSET = 1e6
DEFAULT_IP = "192.168.2.1" DEFAULT_IP = "192.168.3.1"
calibration: rf.calibration.Calibration | None = None calibration: rf.calibration.Calibration | None = None
@@ -90,8 +91,11 @@ class Charon:
self, self,
ip: str = DEFAULT_IP, ip: str = DEFAULT_IP,
frequency: npt.ArrayLike = np.linspace(1e9, 2e9, 3), frequency: npt.ArrayLike = np.linspace(1e9, 2e9, 3),
ports: Tuple[int] = (1,), ports: Tuple[int] | int = 1,
): ):
if isinstance(ports, int):
ports = (np.arange(ports) + 1).tolist()
ports = tuple(ports)
self.ports = ports self.ports = ports
self.frequency = frequency self.frequency = frequency
@@ -128,19 +132,22 @@ class Charon:
self.sdr.rx_hardwaregain_chan1 = 10 self.sdr.rx_hardwaregain_chan1 = 10
self.sdr.tx_hardwaregain_chan0 = -10 self.sdr.tx_hardwaregain_chan0 = -10
# # switch control # switch control
ctx = iio.Context(uri) ctx = iio.Context(uri)
self.ctrl = ctx.find_device("ad9361-phy") self.ctrl = ctx.find_device("ad9361-phy")
# raw ad9361 register accesss: # raw ad9361 register accesss:
# https://ez.analog.com/linux-software-drivers/f/q-a/120853/control-fmcomms3-s-gpo-with-python # https://ez.analog.com/linux-software-drivers/f/q-a/120853/control-fmcomms3-s-gpo-with-python
# https://www.analog.com/media/cn/technical-documentation/user-guides/ad9364_register_map_reference_manual_ug-672.pdf # noqa: E501 # https://www.analog.com/media/cn/technical-documentation/user-guides/ad9364_register_map_reference_manual_ug-672.pdf
self.ctrl.reg_write(AD9361Register.EXTERNAL_LNA_CONTROL, 0x90) # bit 7: AuxDAC Manual, bit 4: GPO Manual self.ctrl.reg_write(AD9361Register.EXTERNAL_LNA_CONTROL, 0x90) # bit 7: AuxDAC Manual, bit 4: GPO Manual
self.ctrl.reg_write(AD9361Register.AUXDAC_ENABLE_CONTROL, 0x3F) self.ctrl.reg_write(AD9361Register.AUXDAC_ENABLE_CONTROL, 0x3F)
# initialize switch control outputs
self._set_gpo(0b0000) self._set_gpo(0b0000)
self._set_dac_code(value=0, channel=1) self._set_dac_code(value=0, channel=1)
self._set_dac_code(value=0, channel=2) self._set_dac_code(value=0, channel=2)
self.set_switches(a=0, b=0) # set default switch state
self.set_switches(a=self.ports[0] - 1, b=self.ports[0] - 1)
def get_config(self) -> Dict[str, Any]: def get_config(self) -> Dict[str, Any]:
config = dict() config = dict()
@@ -170,20 +177,15 @@ class Charon:
def _set_gpo(self, value: int) -> None: def _set_gpo(self, value: int) -> None:
self.ctrl.reg_write(AD9361Register.GPO_FORCE_AND_INIT, (value & 0x0F) << 4) # bits 7-4: GPO3-0 self.ctrl.reg_write(AD9361Register.GPO_FORCE_AND_INIT, (value & 0x0F) << 4) # bits 7-4: GPO3-0
def _set_dac_voltage(self, voltage: float, channel: Literal[1, 2]): def _get_dac_code(self, channel: Literal[1, 2]) -> Tuple[float, AD9361DacVref, AD9361DacStepFactor]:
raise NotImplementedError() word = self.ctrl.reg_read(AD9361Register.__getitem__(f"AUXDAC{channel}_WORD"))
config = self.ctrl.reg_read(AD9361Register.__getitem__(f"AUXDAC{channel}_CONFIG"))
def set_switches(self, a: int, b: int, excitation: int | None = None): value = (word << 2) + (config & 0x3)
if excitation is None: vref = AD9361DacVref((config >> 2) & 0x3)
excitation = a step_factor = AD9361DacStepFactor((config >> 4) & 0x1)
val = 0 return (value, vref, step_factor)
val |= int(bool(excitation)) << 0 # exc = GPO0
val |= int(bool(a)) << 2 # a = GPO2
val |= int(bool(b)) << 1 # b = GPO1
self._set_gpo(val)
def _set_dac_code( def _set_dac_code(
self, self,
@@ -203,7 +205,12 @@ class Charon:
# https://www.analog.com/media/cn/technical-documentation/user-guides/ad9364_register_map_reference_manual_ug-672.pdf # https://www.analog.com/media/cn/technical-documentation/user-guides/ad9364_register_map_reference_manual_ug-672.pdf
# page 13 # page 13
# vout = 0.97 * vref + (0.000738 + 9e-6 * (vref * 1.6 - 2)) * auxdac_word[9:0] * step_factor - 0.3572 * step_factor + 0.05 # vout = (
# 0.97 * vref
# + (0.000738 + 9e-6 * (vref * 1.6 - 2)) * auxdac_word[9:0] * step_factor
# - 0.3572 * step_factor
# + 0.05
# )
# vout ~= (vref - 0.3572 * step_factor) + 0.000738 * auxdac_word[9:0] * step_factor # vout ~= (vref - 0.3572 * step_factor) + 0.000738 * auxdac_word[9:0] * step_factor
# which gives a 1.5V swing with step_factor == 2 and 0.75V swing with step_factor == 1 # which gives a 1.5V swing with step_factor == 2 and 0.75V swing with step_factor == 1
# vref basically just changes the minimum voltage with negligible impact on output scaling # vref basically just changes the minimum voltage with negligible impact on output scaling
@@ -217,18 +224,17 @@ class Charon:
(value & 0x3) | (vref.value << 2) | (step_factor << 4), (value & 0x3) | (vref.value << 2) | (step_factor << 4),
) )
def _get_dac_code(self, channel: Literal[1, 2]) -> Tuple[float, AD9361DacVref, AD9361DacStepFactor]: def set_switches(self, b: int, a: int, excitation: int | None = None):
word = self.ctrl.reg_read(AD9361Register.__getitem__(f"AUXDAC{channel}_WORD")) if excitation is None:
config = self.ctrl.reg_read(AD9361Register.__getitem__(f"AUXDAC{channel}_CONFIG")) excitation = a
value = (word << 2) + (config & 0x3) val = 0
vref = AD9361DacVref((config >> 2) & 0x3)
step_factor = AD9361DacStepFactor((config >> 4) & 0x1)
return (value, vref, step_factor) val |= int(bool(excitation)) << 0 # exc = GPO0
val |= int(bool(a)) << 2 # a = GPO2
val |= int(bool(b)) << 1 # b = GPO1
def _get_dac_voltage(self) -> float: self._set_gpo(val)
raise NotImplementedError()
def set_output_power(self, power: float): def set_output_power(self, power: float):
pout = xr.DataArray( pout = xr.DataArray(
@@ -305,50 +311,147 @@ class Charon:
return np.mean(data[1] / data[0]) return np.mean(data[1] / data[0])
def sweep_b_over_a(self): def capture(
self,
callback: Callable[[int, int], None] | None = None,
*,
measurements: List[Tuple[int, int]] = None,
):
if measurements is None:
measurements = [(m, n) for n in self.ports for m in self.ports]
measurements = list(measurements)
s = xr.DataArray( s = xr.DataArray(
np.zeros( np.zeros(
len(self.frequency), [len(self.frequency), len(self.ports), len(self.ports)],
dtype=np.complex128, dtype=np.complex128,
), ),
dims=["frequency"], dims=["frequency", "m", "n"],
coords=dict( coords=dict(
frequency=self.frequency, frequency=self.frequency,
m=list(self.ports),
n=list(self.ports),
), ),
) )
for frequency in self.frequency:
s.loc[dict(frequency=frequency)] = self.get_b_over_a(frequency=frequency)
return s
def vna_capture(self, frequency: npt.ArrayLike, callback: Callable[int, int] | None): total_count = len(measurements) * len(s.frequency)
s = xr.DataArray( count = 0
np.empty(len(frequency), dtype=np.complex128),
dims=["frequency"], for m in s.m.data:
coords=dict( for n in s.n.data:
frequency=frequency, if (m, n) in measurements:
), self.set_switches(b=m - 1, a=n - 1)
)
for ff, freq in enumerate(s.frequency.data): for ff, freq in enumerate(s.frequency.data):
if callback is not None: if callback is not None:
# report progress during sweep # report progress during sweep
callback(ff, len(s.frequency)) callback(count, total_count)
self.set_output(frequency=freq, power=-5)
self.sdr.rx_destroy_buffer() self.set_output(frequency=freq, power=-5)
self.sdr.rx_lo = int(freq) self.sdr.rx_destroy_buffer()
self.sdr.rx_enabled_channels = [0, 1] self.sdr.rx_lo = int(freq)
self.sdr.gain_control_mode_chan0 = "manual" self.sdr.rx_enabled_channels = [0, 1]
self.sdr.gain_control_mode_chan1 = "manual" self.sdr.gain_control_mode_chan0 = "manual"
self.sdr.rx_hardwaregain_chan0 = 40 self.sdr.gain_control_mode_chan1 = "manual"
self.sdr.rx_hardwaregain_chan1 = 40 self.sdr.rx_hardwaregain_chan0 = 40
rx = self.sdr.rx() self.sdr.rx_hardwaregain_chan1 = 40
s.loc[dict(frequency=freq)] = np.mean(rx[1] / rx[0]) rx = self.sdr.rx()
s.loc[dict(frequency=freq, m=m, n=n)] = np.mean(rx[1] / rx[0])
count += 1
if callback is not None: if callback is not None:
# mark capture as complete # mark capture as complete
callback(len(s.frequency), len(s.frequency)) callback(total_count, total_count)
return s return s
def calibrate_sol(self, prompt: Callable[[str], None] | None = None, **kwargs) -> None:
if len(self.ports) != 1:
raise ValueError(
f"SOL calibration needs only one port but {len(self.ports)} ports are enabled. "
"Did you mean to use SOLT?"
)
if prompt is None:
prompt = lambda s: input(f"{s}\nENTER to continue...")
ideal = rf.media.DefinedGammaZ0(frequency=rf.media.Frequency.from_f(self.frequency, unit="Hz"))
ideals = [ideal.short(), ideal.open(), ideal.load(0)]
names = ["short", "open", "load"]
measured = list()
for name in names:
prompt(f"Connect standard {name} to port {self.ports[0]}")
measured.append(self.capture(**kwargs))
cal = rf.OnePort(measured=[s2net(m) for m in measured], ideals=ideals)
self.calibration = cal
def calibrate_solt(self, prompt: Callable[[str], None] | None = None, **kwargs) -> None:
if len(self.ports) < 2:
raise ValueError(
f"SOLT calibration needs at least two ports but {len(self.ports)} ports are enabled. "
"Did you mean to use SOL?"
)
if len(self.ports) > 2:
raise NotImplementedError("SOLT calibration with more than two ports not yet supported")
if prompt is None:
prompt = lambda s: input(f"{s}\nENTER to continue...")
ideal = rf.media.DefinedGammaZ0(frequency=rf.media.Frequency.from_f(self.frequency, unit="Hz"))
ideals = [ideal.short(), ideal.open(), ideal.load(0)]
ideals = [rf.two_port_reflect(id, id) for id in ideals]
thru = np.zeros((len(self.frequency), 2, 2), dtype=np.complex128)
thru[:, 0, 1] = 1
thru[:, 1, 0] = 1
thru = rf.Network(frequency=self.frequency, f_unit="Hz", s=thru)
ideals.append(thru)
names_1p = ["short", "open", "load"]
names_2p = ["thru"]
measured = list()
for name in names_1p:
measured_param = list()
for port in self.ports:
prompt(f"Connect standard {name} to port {port}")
measured_param.append(self.capture(measurements=[(port, port)], **kwargs).sel(m=port, n=port))
measured.append(rf.two_port_reflect(*[s2net(m) for m in measured_param]))
for name in names_2p:
prompt(f"Connect standard {name} between ports {self.ports[0]} and {self.ports[1]}")
measured.append(s2net(self.capture(**kwargs)))
cal = rf.SOLT(measured=measured, ideals=ideals)
self.calibration = cal
def save_calibration(self, path: Path | str):
path = Path(path)
if path.suffix.lower() == ".pkl":
with open(str(path), "wb") as f:
pickle.dump(self.calibration, f)
else:
raise NotImplementedError(f"Unknown calibration file extension: {path.suffix}")
def load_calibration(self, path: Path | str):
path = Path(path)
if path.suffix.lower() == ".pkl":
with open(str(path), "rb") as f:
cal = pickle.load(f)
if not isinstance(cal, rf.calibration.Calibration):
raise ValueError(f"Expected {rf.calibration.Calibration}, got {type(cal)}")
self.calibration = cal
else:
raise NotImplementedError(f"Unknown calibration file extension: {path.suffix}")
# %% # %%
if __name__ == "__main__": if __name__ == "__main__":

81
src/charon_vna/vna_dev.py Normal file
View File

@@ -0,0 +1,81 @@
# %% imports
import numpy as np
from matplotlib import pyplot as plt
from matplotlib.ticker import EngFormatter
from charon_vna.util import db20, net2s, s2net
from charon_vna.vna import Charon
# %%
frequency = np.linspace(80e6, 280e6, 301)
# %%
vna = Charon(frequency=frequency, ports=2)
# %%
s = vna.capture()
# %%
for m in s.m.data:
for n in s.n.data:
plt.plot(s.frequency, db20(s.sel(m=m, n=n)), label="$S_{" + str(m) + str(n) + "}$")
plt.grid(True)
plt.legend()
plt.show()
# %%
vna.calibrate_sol()
# %%
vna.calibrate_solt()
# %%
vna.save_calibration("./calibration.pkl")
# %%
vna.load_calibration("./calibration.pkl")
# %%
s2 = net2s(vna.calibration.apply_cal(s2net(s)))
# s2.coords["m"] = s.m
# s2.coords["n"] = s.n
for m in s.m.data:
for n in s.n.data:
plt.plot(s.frequency, db20(s.sel(m=m, n=n)), label="$S_{" + str(m) + str(n) + "}$ (uncalibrated)")
plt.plot(s2.frequency, db20(s2.sel(m=m, n=n)), label="$S_{" + str(m) + str(n) + "}$ (calibrated)")
plt.grid(True)
plt.legend()
plt.xlabel("Frequency [Hz]")
plt.ylabel("Magnitude [dB]")
# plt.ylim(-30, 5)
plt.ylim(-25, 5)
plt.xlim(s.frequency[0], s.frequency[-1])
plt.gca().xaxis.set_major_formatter(EngFormatter())
plt.tight_layout()
plt.show()
for m in s.m.data:
for n in s.n.data:
if m != n:
plt.plot(
s.frequency,
np.angle(s.sel(m=m, n=n), deg=True),
label="$S_{" + str(m) + str(n) + "}$ (uncalibrated)",
)
plt.plot(
s2.frequency,
np.angle(s2.sel(m=m, n=n), deg=True),
label="$S_{" + str(m) + str(n) + "}$ (calibrated)",
)
plt.grid(True)
plt.legend()
plt.ylabel("Phase [deg]")
plt.xlabel("Frequency [Hz]")
plt.xlim(s.frequency[0], s.frequency[-1])
plt.gca().xaxis.set_major_formatter(EngFormatter())
plt.tight_layout()
plt.show()
# %%