Compare commits
6 Commits
505b374e8f
...
776c9cc491
Author | SHA1 | Date | |
---|---|---|---|
776c9cc491 | |||
994080e574 | |||
b6cb1ecde7 | |||
383fe3ceea | |||
9912e318a8 | |||
d8b1a56c99 |
@@ -1,4 +1,8 @@
|
||||
# Charon VNA
|
||||
<!--  -->
|
||||
<!--  -->
|
||||
<!--  -->
|
||||
|
||||
|
||||
Named after [Pluto's moon](https://en.wikipedia.org/wiki/Charon_(moon)), Charon uses the [ADI Pluto SDR](https://www.analog.com/en/resources/evaluation-hardware-and-software/evaluation-boards-kits/adalm-pluto.html) as a vector network analyzer. The basic usage is as a 1 port VNA but this can be extended to arbitrarily many ports with the addition of a couple RF switches.
|
||||
|
||||
|
@@ -3,14 +3,14 @@ import subprocess
|
||||
|
||||
import numpy as np
|
||||
|
||||
from charon_vna.gui import DEFAULT_CONFIG
|
||||
from charon_vna.gui import PATH_CONFIG_DEFAULT
|
||||
|
||||
config = dict(
|
||||
frequency=np.linspace(80e6, 500e6, 500).tolist(),
|
||||
power=-5,
|
||||
)
|
||||
|
||||
with open(DEFAULT_CONFIG, "w") as f:
|
||||
with open(PATH_CONFIG_DEFAULT, "w") as f:
|
||||
json.dump(config, f)
|
||||
|
||||
# autoformat
|
||||
@@ -19,7 +19,7 @@ subprocess.run(
|
||||
"python",
|
||||
"-m",
|
||||
"json.tool",
|
||||
DEFAULT_CONFIG.resolve().as_posix(),
|
||||
DEFAULT_CONFIG.resolve().as_posix(),
|
||||
PATH_CONFIG_DEFAULT.resolve().as_posix(),
|
||||
PATH_CONFIG_DEFAULT.resolve().as_posix(),
|
||||
]
|
||||
)
|
||||
|
@@ -3,6 +3,7 @@ import json
|
||||
import pickle
|
||||
import re
|
||||
import sys
|
||||
import webbrowser
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
|
||||
@@ -14,7 +15,6 @@ from numpy import typing as npt
|
||||
from PySide6.QtGui import QAction, QKeySequence
|
||||
from PySide6.QtWidgets import (
|
||||
QApplication,
|
||||
QDialogButtonBox,
|
||||
QFileDialog,
|
||||
QInputDialog,
|
||||
QLineEdit,
|
||||
@@ -31,7 +31,7 @@ from charon_vna.util import net2s, s2net
|
||||
from charon_vna.vna import Charon
|
||||
|
||||
# %%
|
||||
DEFAULT_CONFIG = Path(__file__).parent / "config_default.json"
|
||||
PATH_CONFIG_DEFAULT = Path(__file__).parent / "config_default.json"
|
||||
CONFIG_SUFFIX = ".json"
|
||||
|
||||
|
||||
@@ -44,7 +44,7 @@ class MainWindow(QMainWindow):
|
||||
def __init__(self, ip: str | None = None):
|
||||
super().__init__()
|
||||
|
||||
self.config_path = DEFAULT_CONFIG
|
||||
self.config_path = PATH_CONFIG_DEFAULT
|
||||
with open(self.config_path, "r") as f:
|
||||
config = json.load(f)
|
||||
self._frequency = config["frequency"]
|
||||
@@ -97,6 +97,19 @@ class MainWindow(QMainWindow):
|
||||
action_cal_solt.triggered.connect(self.calibrate_solt)
|
||||
menu_calibration.addAction(action_cal_solt)
|
||||
|
||||
menu_help = QMenu("&Help")
|
||||
menubar.addMenu(menu_help)
|
||||
action_open_homepage = QAction("&Documentation", self)
|
||||
action_open_homepage.triggered.connect(
|
||||
lambda: webbrowser.open("https://git.brendanhaines.com/brendanhaines/charon_vna")
|
||||
)
|
||||
menu_help.addAction(action_open_homepage)
|
||||
action_report_issue = QAction("&Report an Issue", self)
|
||||
action_report_issue.triggered.connect(
|
||||
lambda: webbrowser.open("https://git.brendanhaines.com/brendanhaines/charon_vna/issues")
|
||||
)
|
||||
menu_help.addAction(action_report_issue)
|
||||
|
||||
# Content
|
||||
window_layout = QVBoxLayout()
|
||||
|
||||
@@ -129,6 +142,7 @@ class MainWindow(QMainWindow):
|
||||
def saveas_config(self) -> None:
|
||||
print("Prompting for save path...")
|
||||
dialog = QFileDialog(self)
|
||||
dialog.setNameFilter(f"*{CONFIG_SUFFIX}")
|
||||
dialog.setDefaultSuffix(CONFIG_SUFFIX)
|
||||
dialog.setAcceptMode(QFileDialog.AcceptMode.AcceptSave)
|
||||
if dialog.exec():
|
||||
@@ -137,8 +151,8 @@ class MainWindow(QMainWindow):
|
||||
raise ValueError(
|
||||
f"{config_path.name} is not a valid configuration file. Must have extension {CONFIG_SUFFIX}"
|
||||
)
|
||||
if config_path == DEFAULT_CONFIG:
|
||||
raise ValueError(f"Cannot overwrite default configuration file at {DEFAULT_CONFIG}")
|
||||
if config_path == PATH_CONFIG_DEFAULT:
|
||||
raise ValueError(f"Cannot overwrite default configuration file at {PATH_CONFIG_DEFAULT}")
|
||||
self.config_path = config_path
|
||||
print(f"Config path is now {self.config_path.resolve()}")
|
||||
|
||||
@@ -162,7 +176,7 @@ class MainWindow(QMainWindow):
|
||||
self.load_config(self.config_path)
|
||||
|
||||
def save_config(self) -> None:
|
||||
if self.config_path == DEFAULT_CONFIG:
|
||||
if self.config_path == PATH_CONFIG_DEFAULT:
|
||||
self.saveas_config()
|
||||
else:
|
||||
print(f"Saving config to {self.config_path.resolve()}")
|
||||
|
@@ -1,11 +1,94 @@
|
||||
import json
|
||||
import shutil
|
||||
import zipfile
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
from typing import List
|
||||
|
||||
import skrf as rf
|
||||
from util import net2s
|
||||
|
||||
|
||||
# scikit-rf has no way to save files aside from touchstone and pickle
|
||||
def cal2zarr(cal: rf.calibration.Calibration, outpath: Path):
|
||||
ideals = [net2s(net) for net in cal.ideals]
|
||||
measured = [net2s(net) for net in cal.measured]
|
||||
# s.to_zarr(outpath)
|
||||
# scikit-rf has no way to save Calibration objects aside from pickle
|
||||
def cal2zip(cal: rf.calibration.Calibration, path: Path | str) -> None:
|
||||
path = Path(path)
|
||||
cal_type = cal.__class__
|
||||
measured: List[rf.network.Network] = cal.measured
|
||||
ideals: List[rf.network.Network] = cal.ideals
|
||||
|
||||
if cal_type not in [rf.calibration.OnePort]:
|
||||
raise NotImplementedError(f"Calibration {cal_type.__name__} serialization not implemented")
|
||||
|
||||
assert len(ideals) == len(measured) # this should have already been asserted when cal was instantiated
|
||||
|
||||
with TemporaryDirectory() as temp:
|
||||
dir_temp = Path(temp)
|
||||
|
||||
with zipfile.ZipFile(dir_temp / "archive.zip", "w") as archive:
|
||||
# create a configuration file
|
||||
filename_config = dir_temp / "config.json"
|
||||
with open(filename_config, "w") as f:
|
||||
json.dump(
|
||||
dict(
|
||||
cal_type=cal_type.__name__,
|
||||
num_standards=len(measured),
|
||||
),
|
||||
f,
|
||||
)
|
||||
archive.write(filename_config, str(filename_config.relative_to(dir_temp)))
|
||||
|
||||
# add standard data
|
||||
dir_ideals = dir_temp / "ideals"
|
||||
dir_ideals.mkdir()
|
||||
for ii, ideal in enumerate(ideals):
|
||||
filename = dir_ideals / f"{ii}.s2p"
|
||||
ideal.write_touchstone(filename)
|
||||
archive.write(filename, str(filename.relative_to(dir_temp)))
|
||||
|
||||
# add test data
|
||||
dir_measured = dir_temp / "measured"
|
||||
dir_measured.mkdir()
|
||||
for ii, meas in enumerate(measured):
|
||||
filename = dir_measured / f"{ii}.s2p"
|
||||
meas.write_touchstone(filename)
|
||||
archive.write(filename, str(filename.relative_to(dir_temp)))
|
||||
|
||||
print("Wrote calibration to file")
|
||||
archive.printdir()
|
||||
|
||||
shutil.move(dir_temp / "archive.zip", path)
|
||||
|
||||
|
||||
def zip2cal(path: Path | str):
|
||||
path = Path(path)
|
||||
if not path.exists():
|
||||
raise FileNotFoundError(f"Calibration file {path} does not exist")
|
||||
|
||||
with zipfile.ZipFile(path) as archive:
|
||||
archive.printdir()
|
||||
|
||||
config = json.loads(archive.read("config.json"))
|
||||
print(config)
|
||||
|
||||
ideals = list()
|
||||
measured = list()
|
||||
|
||||
with TemporaryDirectory() as temp:
|
||||
dir_temp = Path(temp)
|
||||
|
||||
for ii in range(config["num_standards"]):
|
||||
with open(dir_temp / f"{ii}.s2p", "wb") as f:
|
||||
f.write(archive.read(f"ideals/{ii}.s2p"))
|
||||
ideals.append(rf.network.Network(dir_temp / f"{ii}.s2p"))
|
||||
|
||||
with open(dir_temp / f"{ii}.s2p", "wb") as f:
|
||||
f.write(archive.read(f"measured/{ii}.s2p"))
|
||||
measured.append(rf.network.Network(dir_temp / f"{ii}.s2p"))
|
||||
|
||||
cal_type = config["cal_type"]
|
||||
CalClass = getattr(rf.calibration, cal_type)
|
||||
if not issubclass(CalClass, rf.calibration.Calibration):
|
||||
raise ValueError()
|
||||
|
||||
calibration = CalClass(measured=measured, ideals=ideals)
|
||||
|
||||
return calibration
|
||||
|
@@ -66,14 +66,29 @@ class AD9361Register(IntEnum):
|
||||
AUXDAC2_TX_DELAY = 0x033
|
||||
|
||||
|
||||
@unique
|
||||
class AD9361DacVref(IntEnum):
|
||||
VREF_1V0 = 0b00
|
||||
VREF_1V5 = 0b01
|
||||
VREF_2V0 = 0b10
|
||||
VREF_2V5 = 0b11
|
||||
|
||||
|
||||
@unique
|
||||
class AD9361DacStepFactor(IntEnum):
|
||||
FACTOR_2 = 0b0
|
||||
FACTOR_1 = 0b1
|
||||
|
||||
|
||||
class Charon:
|
||||
FREQUENCY_OFFSET = 1e6
|
||||
DEFAULT_IP = "192.168.2.1"
|
||||
|
||||
calibration: rf.calibration.Calibration | None = None
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
ip: str = "192.168.2.1",
|
||||
ip: str = DEFAULT_IP,
|
||||
frequency: npt.ArrayLike = np.linspace(1e9, 2e9, 3),
|
||||
ports: Tuple[int] = (1,),
|
||||
):
|
||||
@@ -122,8 +137,8 @@ class Charon:
|
||||
self.ctrl.reg_write(AD9361Register.EXTERNAL_LNA_CONTROL, 0x90) # bit 7: AuxDAC Manual, bit 4: GPO Manual
|
||||
self.ctrl.reg_write(AD9361Register.AUXDAC_ENABLE_CONTROL, 0x3F)
|
||||
self._set_gpo(0b0000)
|
||||
self._set_dac(value=0, channel=1)
|
||||
self._set_dac(value=0, channel=2)
|
||||
self._set_dac_code(value=0, channel=1)
|
||||
self._set_dac_code(value=0, channel=2)
|
||||
|
||||
def get_config(self) -> Dict[str, Any]:
|
||||
config = dict()
|
||||
@@ -153,24 +168,24 @@ class Charon:
|
||||
def _set_gpo(self, value: int) -> None:
|
||||
self.ctrl.reg_write(AD9361Register.GPO_FORCE_AND_INIT, (value & 0x0F) << 4) # bits 7-4: GPO3-0
|
||||
|
||||
def _set_dac(self, value: int, channel: Literal[1, 2]):
|
||||
def _set_dac_voltage(self, voltage: float, channel: Literal[1, 2]):
|
||||
raise NotImplementedError()
|
||||
|
||||
def _set_dac_code(
|
||||
self,
|
||||
value: int,
|
||||
channel: Literal[1, 2],
|
||||
vref: AD9361DacVref = AD9361DacVref.VREF_1V0,
|
||||
step_factor: AD9361DacStepFactor = AD9361DacStepFactor.FACTOR_2,
|
||||
) -> None:
|
||||
if channel not in [1, 2]:
|
||||
raise ValueError(f"Invalid channel {channel}. Must be 1 or 2")
|
||||
|
||||
if value > 0x3FF or value < 0:
|
||||
raise ValueError("Invalid value for 10 bit DAC. Must be between 0 and 0x3FF (inclusive)")
|
||||
|
||||
@unique
|
||||
class Vref(IntEnum):
|
||||
VREF_1V0 = 0b00
|
||||
VREF_1V5 = 0b01
|
||||
VREF_2V0 = 0b10
|
||||
VREF_2V5 = 0b11
|
||||
|
||||
@unique
|
||||
class StepFactor(IntEnum):
|
||||
FACTOR_2 = 0b0
|
||||
FACTOR_1 = 0b1
|
||||
vref = AD9361DacVref(vref)
|
||||
step_factor = AD9361DacStepFactor(step_factor)
|
||||
|
||||
# https://www.analog.com/media/cn/technical-documentation/user-guides/ad9364_register_map_reference_manual_ug-672.pdf
|
||||
# page 13
|
||||
@@ -185,26 +200,37 @@ class Charon:
|
||||
)
|
||||
self.ctrl.reg_write(
|
||||
AD9361Register.__getitem__(f"AUXDAC{channel}_CONFIG"),
|
||||
(value & 0x3) | (Vref.VREF_1V0.value << 2) | (StepFactor.FACTOR_2 << 4),
|
||||
(value & 0x3) | (vref.value << 2) | (step_factor << 4),
|
||||
)
|
||||
|
||||
def _get_dac_code(self, channel: Literal[1, 2]) -> Tuple[float, AD9361DacVref, AD9361DacStepFactor]:
|
||||
word = self.ctrl.reg_read(AD9361Register.__getitem__(f"AUXDAC{channel}_WORD"))
|
||||
config = self.ctrl.reg_read(AD9361Register.__getitem__(f"AUXDAC{channel}_CONFIG"))
|
||||
|
||||
value = (word << 2) + (config & 0x3)
|
||||
vref = AD9361DacVref((config >> 2) & 0x3)
|
||||
step_factor = AD9361DacStepFactor((config >> 4) & 0x1)
|
||||
|
||||
return (value, vref, step_factor)
|
||||
|
||||
def _get_dac_voltage(self) -> float:
|
||||
raise NotImplementedError()
|
||||
|
||||
def set_output_power(self, power: float):
|
||||
# FIXME: this is a hack because I don't want to go through re-calibration
|
||||
if power == 5:
|
||||
tx_gain = -1
|
||||
elif power == 0:
|
||||
tx_gain = -7
|
||||
elif power == -5:
|
||||
tx_gain = -12
|
||||
elif power == -10:
|
||||
tx_gain = -17
|
||||
elif power == -15:
|
||||
tx_gain = -22
|
||||
else:
|
||||
raise NotImplementedError()
|
||||
# # TODO: correct over frequency
|
||||
# tx_gain_idx = np.abs(pout.sel(tx_channel=0) - power).argmin(dim="tx_gain")
|
||||
# tx_gain = pout.coords["tx_gain"][tx_gain_idx]
|
||||
pout = xr.DataArray(
|
||||
[-15, -10, -5, 0, 5],
|
||||
dims=["tx_gain"],
|
||||
coords=dict(
|
||||
# TODO: correct over frequency
|
||||
frequency=1e9, # FIXME: I'm not sure at what frequency I generated this table
|
||||
tx_channel=0,
|
||||
tx_gain=[-22, -17, -12, -7, -1],
|
||||
),
|
||||
)
|
||||
|
||||
tx_gain_idx = np.abs(pout - power).argmin(dim="tx_gain")
|
||||
tx_gain = pout.coords["tx_gain"][tx_gain_idx]
|
||||
|
||||
self.sdr.tx_hardwaregain_chan0 = float(tx_gain)
|
||||
|
||||
def set_output(self, frequency: float, power: float):
|
||||
@@ -290,6 +316,7 @@ class Charon:
|
||||
)
|
||||
for ff, freq in enumerate(s.frequency.data):
|
||||
if callback is not None:
|
||||
# report progress during sweep
|
||||
callback(ff, len(s.frequency))
|
||||
self.set_output(frequency=freq, power=-5)
|
||||
self.sdr.rx_destroy_buffer()
|
||||
@@ -301,7 +328,9 @@ class Charon:
|
||||
self.sdr.rx_hardwaregain_chan1 = 40
|
||||
rx = self.sdr.rx()
|
||||
s.loc[dict(frequency=freq)] = np.mean(rx[1] / rx[0])
|
||||
|
||||
if callback is not None:
|
||||
# mark capture as complete
|
||||
callback(len(s.frequency), len(s.frequency))
|
||||
|
||||
return s
|
||||
|
Reference in New Issue
Block a user