Compare commits
6 Commits
505b374e8f
...
776c9cc491
Author | SHA1 | Date | |
---|---|---|---|
776c9cc491 | |||
994080e574 | |||
b6cb1ecde7 | |||
383fe3ceea | |||
9912e318a8 | |||
d8b1a56c99 |
@@ -1,4 +1,8 @@
|
|||||||
# Charon VNA
|
# Charon VNA
|
||||||
|
<!--  -->
|
||||||
|
<!--  -->
|
||||||
|
<!--  -->
|
||||||
|
|
||||||
|
|
||||||
Named after [Pluto's moon](https://en.wikipedia.org/wiki/Charon_(moon)), Charon uses the [ADI Pluto SDR](https://www.analog.com/en/resources/evaluation-hardware-and-software/evaluation-boards-kits/adalm-pluto.html) as a vector network analyzer. The basic usage is as a 1 port VNA but this can be extended to arbitrarily many ports with the addition of a couple RF switches.
|
Named after [Pluto's moon](https://en.wikipedia.org/wiki/Charon_(moon)), Charon uses the [ADI Pluto SDR](https://www.analog.com/en/resources/evaluation-hardware-and-software/evaluation-boards-kits/adalm-pluto.html) as a vector network analyzer. The basic usage is as a 1 port VNA but this can be extended to arbitrarily many ports with the addition of a couple RF switches.
|
||||||
|
|
||||||
|
@@ -3,14 +3,14 @@ import subprocess
|
|||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
|
||||||
from charon_vna.gui import DEFAULT_CONFIG
|
from charon_vna.gui import PATH_CONFIG_DEFAULT
|
||||||
|
|
||||||
config = dict(
|
config = dict(
|
||||||
frequency=np.linspace(80e6, 500e6, 500).tolist(),
|
frequency=np.linspace(80e6, 500e6, 500).tolist(),
|
||||||
power=-5,
|
power=-5,
|
||||||
)
|
)
|
||||||
|
|
||||||
with open(DEFAULT_CONFIG, "w") as f:
|
with open(PATH_CONFIG_DEFAULT, "w") as f:
|
||||||
json.dump(config, f)
|
json.dump(config, f)
|
||||||
|
|
||||||
# autoformat
|
# autoformat
|
||||||
@@ -19,7 +19,7 @@ subprocess.run(
|
|||||||
"python",
|
"python",
|
||||||
"-m",
|
"-m",
|
||||||
"json.tool",
|
"json.tool",
|
||||||
DEFAULT_CONFIG.resolve().as_posix(),
|
PATH_CONFIG_DEFAULT.resolve().as_posix(),
|
||||||
DEFAULT_CONFIG.resolve().as_posix(),
|
PATH_CONFIG_DEFAULT.resolve().as_posix(),
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
@@ -3,6 +3,7 @@ import json
|
|||||||
import pickle
|
import pickle
|
||||||
import re
|
import re
|
||||||
import sys
|
import sys
|
||||||
|
import webbrowser
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import List
|
from typing import List
|
||||||
|
|
||||||
@@ -14,7 +15,6 @@ from numpy import typing as npt
|
|||||||
from PySide6.QtGui import QAction, QKeySequence
|
from PySide6.QtGui import QAction, QKeySequence
|
||||||
from PySide6.QtWidgets import (
|
from PySide6.QtWidgets import (
|
||||||
QApplication,
|
QApplication,
|
||||||
QDialogButtonBox,
|
|
||||||
QFileDialog,
|
QFileDialog,
|
||||||
QInputDialog,
|
QInputDialog,
|
||||||
QLineEdit,
|
QLineEdit,
|
||||||
@@ -31,7 +31,7 @@ from charon_vna.util import net2s, s2net
|
|||||||
from charon_vna.vna import Charon
|
from charon_vna.vna import Charon
|
||||||
|
|
||||||
# %%
|
# %%
|
||||||
DEFAULT_CONFIG = Path(__file__).parent / "config_default.json"
|
PATH_CONFIG_DEFAULT = Path(__file__).parent / "config_default.json"
|
||||||
CONFIG_SUFFIX = ".json"
|
CONFIG_SUFFIX = ".json"
|
||||||
|
|
||||||
|
|
||||||
@@ -44,7 +44,7 @@ class MainWindow(QMainWindow):
|
|||||||
def __init__(self, ip: str | None = None):
|
def __init__(self, ip: str | None = None):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
|
|
||||||
self.config_path = DEFAULT_CONFIG
|
self.config_path = PATH_CONFIG_DEFAULT
|
||||||
with open(self.config_path, "r") as f:
|
with open(self.config_path, "r") as f:
|
||||||
config = json.load(f)
|
config = json.load(f)
|
||||||
self._frequency = config["frequency"]
|
self._frequency = config["frequency"]
|
||||||
@@ -97,6 +97,19 @@ class MainWindow(QMainWindow):
|
|||||||
action_cal_solt.triggered.connect(self.calibrate_solt)
|
action_cal_solt.triggered.connect(self.calibrate_solt)
|
||||||
menu_calibration.addAction(action_cal_solt)
|
menu_calibration.addAction(action_cal_solt)
|
||||||
|
|
||||||
|
menu_help = QMenu("&Help")
|
||||||
|
menubar.addMenu(menu_help)
|
||||||
|
action_open_homepage = QAction("&Documentation", self)
|
||||||
|
action_open_homepage.triggered.connect(
|
||||||
|
lambda: webbrowser.open("https://git.brendanhaines.com/brendanhaines/charon_vna")
|
||||||
|
)
|
||||||
|
menu_help.addAction(action_open_homepage)
|
||||||
|
action_report_issue = QAction("&Report an Issue", self)
|
||||||
|
action_report_issue.triggered.connect(
|
||||||
|
lambda: webbrowser.open("https://git.brendanhaines.com/brendanhaines/charon_vna/issues")
|
||||||
|
)
|
||||||
|
menu_help.addAction(action_report_issue)
|
||||||
|
|
||||||
# Content
|
# Content
|
||||||
window_layout = QVBoxLayout()
|
window_layout = QVBoxLayout()
|
||||||
|
|
||||||
@@ -129,6 +142,7 @@ class MainWindow(QMainWindow):
|
|||||||
def saveas_config(self) -> None:
|
def saveas_config(self) -> None:
|
||||||
print("Prompting for save path...")
|
print("Prompting for save path...")
|
||||||
dialog = QFileDialog(self)
|
dialog = QFileDialog(self)
|
||||||
|
dialog.setNameFilter(f"*{CONFIG_SUFFIX}")
|
||||||
dialog.setDefaultSuffix(CONFIG_SUFFIX)
|
dialog.setDefaultSuffix(CONFIG_SUFFIX)
|
||||||
dialog.setAcceptMode(QFileDialog.AcceptMode.AcceptSave)
|
dialog.setAcceptMode(QFileDialog.AcceptMode.AcceptSave)
|
||||||
if dialog.exec():
|
if dialog.exec():
|
||||||
@@ -137,8 +151,8 @@ class MainWindow(QMainWindow):
|
|||||||
raise ValueError(
|
raise ValueError(
|
||||||
f"{config_path.name} is not a valid configuration file. Must have extension {CONFIG_SUFFIX}"
|
f"{config_path.name} is not a valid configuration file. Must have extension {CONFIG_SUFFIX}"
|
||||||
)
|
)
|
||||||
if config_path == DEFAULT_CONFIG:
|
if config_path == PATH_CONFIG_DEFAULT:
|
||||||
raise ValueError(f"Cannot overwrite default configuration file at {DEFAULT_CONFIG}")
|
raise ValueError(f"Cannot overwrite default configuration file at {PATH_CONFIG_DEFAULT}")
|
||||||
self.config_path = config_path
|
self.config_path = config_path
|
||||||
print(f"Config path is now {self.config_path.resolve()}")
|
print(f"Config path is now {self.config_path.resolve()}")
|
||||||
|
|
||||||
@@ -162,7 +176,7 @@ class MainWindow(QMainWindow):
|
|||||||
self.load_config(self.config_path)
|
self.load_config(self.config_path)
|
||||||
|
|
||||||
def save_config(self) -> None:
|
def save_config(self) -> None:
|
||||||
if self.config_path == DEFAULT_CONFIG:
|
if self.config_path == PATH_CONFIG_DEFAULT:
|
||||||
self.saveas_config()
|
self.saveas_config()
|
||||||
else:
|
else:
|
||||||
print(f"Saving config to {self.config_path.resolve()}")
|
print(f"Saving config to {self.config_path.resolve()}")
|
||||||
|
@@ -1,11 +1,94 @@
|
|||||||
|
import json
|
||||||
|
import shutil
|
||||||
|
import zipfile
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
from tempfile import TemporaryDirectory
|
||||||
|
from typing import List
|
||||||
|
|
||||||
import skrf as rf
|
import skrf as rf
|
||||||
from util import net2s
|
|
||||||
|
|
||||||
|
|
||||||
# scikit-rf has no way to save files aside from touchstone and pickle
|
# scikit-rf has no way to save Calibration objects aside from pickle
|
||||||
def cal2zarr(cal: rf.calibration.Calibration, outpath: Path):
|
def cal2zip(cal: rf.calibration.Calibration, path: Path | str) -> None:
|
||||||
ideals = [net2s(net) for net in cal.ideals]
|
path = Path(path)
|
||||||
measured = [net2s(net) for net in cal.measured]
|
cal_type = cal.__class__
|
||||||
# s.to_zarr(outpath)
|
measured: List[rf.network.Network] = cal.measured
|
||||||
|
ideals: List[rf.network.Network] = cal.ideals
|
||||||
|
|
||||||
|
if cal_type not in [rf.calibration.OnePort]:
|
||||||
|
raise NotImplementedError(f"Calibration {cal_type.__name__} serialization not implemented")
|
||||||
|
|
||||||
|
assert len(ideals) == len(measured) # this should have already been asserted when cal was instantiated
|
||||||
|
|
||||||
|
with TemporaryDirectory() as temp:
|
||||||
|
dir_temp = Path(temp)
|
||||||
|
|
||||||
|
with zipfile.ZipFile(dir_temp / "archive.zip", "w") as archive:
|
||||||
|
# create a configuration file
|
||||||
|
filename_config = dir_temp / "config.json"
|
||||||
|
with open(filename_config, "w") as f:
|
||||||
|
json.dump(
|
||||||
|
dict(
|
||||||
|
cal_type=cal_type.__name__,
|
||||||
|
num_standards=len(measured),
|
||||||
|
),
|
||||||
|
f,
|
||||||
|
)
|
||||||
|
archive.write(filename_config, str(filename_config.relative_to(dir_temp)))
|
||||||
|
|
||||||
|
# add standard data
|
||||||
|
dir_ideals = dir_temp / "ideals"
|
||||||
|
dir_ideals.mkdir()
|
||||||
|
for ii, ideal in enumerate(ideals):
|
||||||
|
filename = dir_ideals / f"{ii}.s2p"
|
||||||
|
ideal.write_touchstone(filename)
|
||||||
|
archive.write(filename, str(filename.relative_to(dir_temp)))
|
||||||
|
|
||||||
|
# add test data
|
||||||
|
dir_measured = dir_temp / "measured"
|
||||||
|
dir_measured.mkdir()
|
||||||
|
for ii, meas in enumerate(measured):
|
||||||
|
filename = dir_measured / f"{ii}.s2p"
|
||||||
|
meas.write_touchstone(filename)
|
||||||
|
archive.write(filename, str(filename.relative_to(dir_temp)))
|
||||||
|
|
||||||
|
print("Wrote calibration to file")
|
||||||
|
archive.printdir()
|
||||||
|
|
||||||
|
shutil.move(dir_temp / "archive.zip", path)
|
||||||
|
|
||||||
|
|
||||||
|
def zip2cal(path: Path | str):
|
||||||
|
path = Path(path)
|
||||||
|
if not path.exists():
|
||||||
|
raise FileNotFoundError(f"Calibration file {path} does not exist")
|
||||||
|
|
||||||
|
with zipfile.ZipFile(path) as archive:
|
||||||
|
archive.printdir()
|
||||||
|
|
||||||
|
config = json.loads(archive.read("config.json"))
|
||||||
|
print(config)
|
||||||
|
|
||||||
|
ideals = list()
|
||||||
|
measured = list()
|
||||||
|
|
||||||
|
with TemporaryDirectory() as temp:
|
||||||
|
dir_temp = Path(temp)
|
||||||
|
|
||||||
|
for ii in range(config["num_standards"]):
|
||||||
|
with open(dir_temp / f"{ii}.s2p", "wb") as f:
|
||||||
|
f.write(archive.read(f"ideals/{ii}.s2p"))
|
||||||
|
ideals.append(rf.network.Network(dir_temp / f"{ii}.s2p"))
|
||||||
|
|
||||||
|
with open(dir_temp / f"{ii}.s2p", "wb") as f:
|
||||||
|
f.write(archive.read(f"measured/{ii}.s2p"))
|
||||||
|
measured.append(rf.network.Network(dir_temp / f"{ii}.s2p"))
|
||||||
|
|
||||||
|
cal_type = config["cal_type"]
|
||||||
|
CalClass = getattr(rf.calibration, cal_type)
|
||||||
|
if not issubclass(CalClass, rf.calibration.Calibration):
|
||||||
|
raise ValueError()
|
||||||
|
|
||||||
|
calibration = CalClass(measured=measured, ideals=ideals)
|
||||||
|
|
||||||
|
return calibration
|
||||||
|
@@ -66,14 +66,29 @@ class AD9361Register(IntEnum):
|
|||||||
AUXDAC2_TX_DELAY = 0x033
|
AUXDAC2_TX_DELAY = 0x033
|
||||||
|
|
||||||
|
|
||||||
|
@unique
|
||||||
|
class AD9361DacVref(IntEnum):
|
||||||
|
VREF_1V0 = 0b00
|
||||||
|
VREF_1V5 = 0b01
|
||||||
|
VREF_2V0 = 0b10
|
||||||
|
VREF_2V5 = 0b11
|
||||||
|
|
||||||
|
|
||||||
|
@unique
|
||||||
|
class AD9361DacStepFactor(IntEnum):
|
||||||
|
FACTOR_2 = 0b0
|
||||||
|
FACTOR_1 = 0b1
|
||||||
|
|
||||||
|
|
||||||
class Charon:
|
class Charon:
|
||||||
FREQUENCY_OFFSET = 1e6
|
FREQUENCY_OFFSET = 1e6
|
||||||
|
DEFAULT_IP = "192.168.2.1"
|
||||||
|
|
||||||
calibration: rf.calibration.Calibration | None = None
|
calibration: rf.calibration.Calibration | None = None
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
ip: str = "192.168.2.1",
|
ip: str = DEFAULT_IP,
|
||||||
frequency: npt.ArrayLike = np.linspace(1e9, 2e9, 3),
|
frequency: npt.ArrayLike = np.linspace(1e9, 2e9, 3),
|
||||||
ports: Tuple[int] = (1,),
|
ports: Tuple[int] = (1,),
|
||||||
):
|
):
|
||||||
@@ -122,8 +137,8 @@ class Charon:
|
|||||||
self.ctrl.reg_write(AD9361Register.EXTERNAL_LNA_CONTROL, 0x90) # bit 7: AuxDAC Manual, bit 4: GPO Manual
|
self.ctrl.reg_write(AD9361Register.EXTERNAL_LNA_CONTROL, 0x90) # bit 7: AuxDAC Manual, bit 4: GPO Manual
|
||||||
self.ctrl.reg_write(AD9361Register.AUXDAC_ENABLE_CONTROL, 0x3F)
|
self.ctrl.reg_write(AD9361Register.AUXDAC_ENABLE_CONTROL, 0x3F)
|
||||||
self._set_gpo(0b0000)
|
self._set_gpo(0b0000)
|
||||||
self._set_dac(value=0, channel=1)
|
self._set_dac_code(value=0, channel=1)
|
||||||
self._set_dac(value=0, channel=2)
|
self._set_dac_code(value=0, channel=2)
|
||||||
|
|
||||||
def get_config(self) -> Dict[str, Any]:
|
def get_config(self) -> Dict[str, Any]:
|
||||||
config = dict()
|
config = dict()
|
||||||
@@ -153,24 +168,24 @@ class Charon:
|
|||||||
def _set_gpo(self, value: int) -> None:
|
def _set_gpo(self, value: int) -> None:
|
||||||
self.ctrl.reg_write(AD9361Register.GPO_FORCE_AND_INIT, (value & 0x0F) << 4) # bits 7-4: GPO3-0
|
self.ctrl.reg_write(AD9361Register.GPO_FORCE_AND_INIT, (value & 0x0F) << 4) # bits 7-4: GPO3-0
|
||||||
|
|
||||||
def _set_dac(self, value: int, channel: Literal[1, 2]):
|
def _set_dac_voltage(self, voltage: float, channel: Literal[1, 2]):
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
def _set_dac_code(
|
||||||
|
self,
|
||||||
|
value: int,
|
||||||
|
channel: Literal[1, 2],
|
||||||
|
vref: AD9361DacVref = AD9361DacVref.VREF_1V0,
|
||||||
|
step_factor: AD9361DacStepFactor = AD9361DacStepFactor.FACTOR_2,
|
||||||
|
) -> None:
|
||||||
if channel not in [1, 2]:
|
if channel not in [1, 2]:
|
||||||
raise ValueError(f"Invalid channel {channel}. Must be 1 or 2")
|
raise ValueError(f"Invalid channel {channel}. Must be 1 or 2")
|
||||||
|
|
||||||
if value > 0x3FF or value < 0:
|
if value > 0x3FF or value < 0:
|
||||||
raise ValueError("Invalid value for 10 bit DAC. Must be between 0 and 0x3FF (inclusive)")
|
raise ValueError("Invalid value for 10 bit DAC. Must be between 0 and 0x3FF (inclusive)")
|
||||||
|
|
||||||
@unique
|
vref = AD9361DacVref(vref)
|
||||||
class Vref(IntEnum):
|
step_factor = AD9361DacStepFactor(step_factor)
|
||||||
VREF_1V0 = 0b00
|
|
||||||
VREF_1V5 = 0b01
|
|
||||||
VREF_2V0 = 0b10
|
|
||||||
VREF_2V5 = 0b11
|
|
||||||
|
|
||||||
@unique
|
|
||||||
class StepFactor(IntEnum):
|
|
||||||
FACTOR_2 = 0b0
|
|
||||||
FACTOR_1 = 0b1
|
|
||||||
|
|
||||||
# https://www.analog.com/media/cn/technical-documentation/user-guides/ad9364_register_map_reference_manual_ug-672.pdf
|
# https://www.analog.com/media/cn/technical-documentation/user-guides/ad9364_register_map_reference_manual_ug-672.pdf
|
||||||
# page 13
|
# page 13
|
||||||
@@ -185,26 +200,37 @@ class Charon:
|
|||||||
)
|
)
|
||||||
self.ctrl.reg_write(
|
self.ctrl.reg_write(
|
||||||
AD9361Register.__getitem__(f"AUXDAC{channel}_CONFIG"),
|
AD9361Register.__getitem__(f"AUXDAC{channel}_CONFIG"),
|
||||||
(value & 0x3) | (Vref.VREF_1V0.value << 2) | (StepFactor.FACTOR_2 << 4),
|
(value & 0x3) | (vref.value << 2) | (step_factor << 4),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def _get_dac_code(self, channel: Literal[1, 2]) -> Tuple[float, AD9361DacVref, AD9361DacStepFactor]:
|
||||||
|
word = self.ctrl.reg_read(AD9361Register.__getitem__(f"AUXDAC{channel}_WORD"))
|
||||||
|
config = self.ctrl.reg_read(AD9361Register.__getitem__(f"AUXDAC{channel}_CONFIG"))
|
||||||
|
|
||||||
|
value = (word << 2) + (config & 0x3)
|
||||||
|
vref = AD9361DacVref((config >> 2) & 0x3)
|
||||||
|
step_factor = AD9361DacStepFactor((config >> 4) & 0x1)
|
||||||
|
|
||||||
|
return (value, vref, step_factor)
|
||||||
|
|
||||||
|
def _get_dac_voltage(self) -> float:
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
def set_output_power(self, power: float):
|
def set_output_power(self, power: float):
|
||||||
# FIXME: this is a hack because I don't want to go through re-calibration
|
pout = xr.DataArray(
|
||||||
if power == 5:
|
[-15, -10, -5, 0, 5],
|
||||||
tx_gain = -1
|
dims=["tx_gain"],
|
||||||
elif power == 0:
|
coords=dict(
|
||||||
tx_gain = -7
|
# TODO: correct over frequency
|
||||||
elif power == -5:
|
frequency=1e9, # FIXME: I'm not sure at what frequency I generated this table
|
||||||
tx_gain = -12
|
tx_channel=0,
|
||||||
elif power == -10:
|
tx_gain=[-22, -17, -12, -7, -1],
|
||||||
tx_gain = -17
|
),
|
||||||
elif power == -15:
|
)
|
||||||
tx_gain = -22
|
|
||||||
else:
|
tx_gain_idx = np.abs(pout - power).argmin(dim="tx_gain")
|
||||||
raise NotImplementedError()
|
tx_gain = pout.coords["tx_gain"][tx_gain_idx]
|
||||||
# # TODO: correct over frequency
|
|
||||||
# tx_gain_idx = np.abs(pout.sel(tx_channel=0) - power).argmin(dim="tx_gain")
|
|
||||||
# tx_gain = pout.coords["tx_gain"][tx_gain_idx]
|
|
||||||
self.sdr.tx_hardwaregain_chan0 = float(tx_gain)
|
self.sdr.tx_hardwaregain_chan0 = float(tx_gain)
|
||||||
|
|
||||||
def set_output(self, frequency: float, power: float):
|
def set_output(self, frequency: float, power: float):
|
||||||
@@ -290,6 +316,7 @@ class Charon:
|
|||||||
)
|
)
|
||||||
for ff, freq in enumerate(s.frequency.data):
|
for ff, freq in enumerate(s.frequency.data):
|
||||||
if callback is not None:
|
if callback is not None:
|
||||||
|
# report progress during sweep
|
||||||
callback(ff, len(s.frequency))
|
callback(ff, len(s.frequency))
|
||||||
self.set_output(frequency=freq, power=-5)
|
self.set_output(frequency=freq, power=-5)
|
||||||
self.sdr.rx_destroy_buffer()
|
self.sdr.rx_destroy_buffer()
|
||||||
@@ -301,7 +328,9 @@ class Charon:
|
|||||||
self.sdr.rx_hardwaregain_chan1 = 40
|
self.sdr.rx_hardwaregain_chan1 = 40
|
||||||
rx = self.sdr.rx()
|
rx = self.sdr.rx()
|
||||||
s.loc[dict(frequency=freq)] = np.mean(rx[1] / rx[0])
|
s.loc[dict(frequency=freq)] = np.mean(rx[1] / rx[0])
|
||||||
|
|
||||||
if callback is not None:
|
if callback is not None:
|
||||||
|
# mark capture as complete
|
||||||
callback(len(s.frequency), len(s.frequency))
|
callback(len(s.frequency), len(s.frequency))
|
||||||
|
|
||||||
return s
|
return s
|
||||||
|
Reference in New Issue
Block a user