# -*- coding: utf-8 -*-
"""
Contains code for the VideoMode tools
"""
# %%
import time
import datetime
import threading
import numpy as np
import logging
import numbers
import pyqtgraph
from qcodes.instrument.parameter import Parameter
from qcodes.utils.validators import Numbers
import qtt
from qtt.gui.live_plotting import livePlot
from qtt.utilities.tools import connect_slot
import qtpy.QtWidgets as QtWidgets
import qtpy.QtCore
from qtt.measurements.videomode_processor import VideomodeSawtoothMeasurement
# %%
[docs]def add_sawtooth_videomode_processor(videomode, sweepparams, sweepranges, resolution, sample_rate, minstrument):
""" Add all required variables to the VideoMode for the VideomodeSawtoothMeasurement """
videomode.resolution = resolution
videomode.sample_rate = sample_rate
videomode.minstrumenthandle = minstrument[0]
channels = minstrument[1]
if isinstance(channels, int):
channels = [channels]
videomode.channels = channels
videomode.videomode_processor = VideomodeSawtoothMeasurement(videomode.station)
sampling_frequency = videomode.videomode_processor.parse_instrument(videomode.minstrumenthandle, sample_rate)
videomode.videomode_processor.set_scan_parameters(
{'sweepparams': sweepparams, 'sweepranges': sweepranges, 'minstrument': minstrument,
'resolution': videomode.resolution, 'sampling_frequency': sampling_frequency, 'channels': channels})
# %%
[docs]class VideoMode:
""" Controls the videomode tool.
The VideoMode tools allows for fast plotting of measurement results. The basic operation of the VideoMode consists
of the following stages:
i) Initialize. For example start a periodic waveform on the AWG
ii) Start the readout. This starts a loop with the following steps:
- Measure data
- Post-process data
- Plot data
The loop continues running in the background untill the user aborts the loop.
iii) Stop the readout. This stops the measure-process-plot loop
iv) Stop. This stops all activity (e.g. both the readout loop and and activity on the AWG)
Attributes:
station (qcodes station): contains all the information about the set-up
videomode_processor (VideoModeProcessor): class performing the measurements and post-processing
Naverage (Parameter): the number of times the raw measurement data should be averaged
"""
videomode_class_index = 0
def __init__(self, station, sweepparams=None, sweepranges=None, minstrument=None, nplots=None, Naverage=10,
resolution=(96, 96), sample_rate='default', diff_dir=None, verbose=1,
dorun=True, show_controls=True, add_ppt=True, crosshair=False, averaging=True, name=None,
mouse_click_callback=None, videomode_processor=None):
""" Tool for fast acquisition of charge stability diagram
The acquisition and post-processing of data is performed by a VideoModeProcessor.
The class assumes that the station has a station.digitizer and a station.awg.
Args:
station (QCoDeS station): reference for the instruments
sweepparams (list or dict or str): parameters to sweep
sweepranges (list): list of sweep ranges
minstrument (object): specification of the measurement instrument
crosshair (bool): Enable crosshair in plots
videomode_processor (None or VideoModeProcessor): Processor to use for measurement and post-processing
mouse_click_callback (None or function): if None then update scan position with callback
"""
if VideoMode.videomode_class_index == 0:
# create instance of QApplication
_ = pyqtgraph.mkQApp()
VideoMode.videomode_class_index += 1
self.videomode_index = VideoMode.videomode_class_index
self.station = station
self.verbose = verbose
self.sweepparams = sweepparams
if isinstance(sweepranges, numbers.Number):
sweepranges = [sweepranges]
self.sweepranges = sweepranges
self.virtual_awg = getattr(station, ' virtual_awg', None)
self.Naverage = Parameter('Naverage', get_cmd=self._get_Naverage,
set_cmd=self._set_Naverage, vals=Numbers(1, 1023))
# set core VideoMode properties
self.diff_dir = diff_dir
self.smoothing = 0
self.datalock = threading.Lock()
self.datafunction_result = None
self.update_sleep = 1e-5
self.diffsigma = 1
self.diff = True
self.laplace = False
self.idx = 0
self.name = ''
self.maxidx = None
self.datafunction = None
self.alldata = None
self._averaging_enabled = None
self.fps = qtt.pgeometry.fps_t(nn=6)
if videomode_processor is None:
add_sawtooth_videomode_processor(self, sweepparams, sweepranges, resolution, sample_rate, minstrument)
# Setup the GUI
if nplots is None:
nplots = len(self.channels)
else:
self.videomode_processor = videomode_processor
self.set_videomode_name(name)
self.nplots = nplots
self.window_title = "%s: nplots %d" % (
self.name, self.nplots)
self._create_gui(nplots, show_controls, add_ppt, crosshair, Naverage, averaging)
self.Naverage(Naverage)
if mouse_click_callback is None:
mouse_click_callback = self._update_position
for p in self.lp:
p.sigMouseClicked.connect(mouse_click_callback)
if dorun:
self.run()
[docs] def set_videomode_name(self, name):
""" Set the name for this instance of the tool """
if name is None:
name = 'VideoMode %d' % self.videomode_index
name = self.videomode_processor.extend_videomode_name(name)
self.name = name
def _create_gui(self, number_of_plots, show_controls, add_ppt, crosshair, Naverage, averaging):
win = QtWidgets.QWidget()
win.setWindowTitle(self.window_title)
self.mainwin = win
vertLayout = QtWidgets.QVBoxLayout()
self.topLayout = None
if show_controls:
topLayout = QtWidgets.QHBoxLayout()
win.start_button = QtWidgets.QPushButton('Start')
win.stop_button = QtWidgets.QPushButton('Stop')
topLayout.addWidget(win.start_button)
topLayout.addWidget(win.stop_button)
if add_ppt:
win.ppt_button = QtWidgets.QPushButton('Copy to PPT')
win.ppt_button.clicked.connect(connect_slot(self.addPPT))
topLayout.addWidget(win.ppt_button)
win.averaging_box = QtWidgets.QCheckBox('Averaging')
for b in [win.start_button, win.stop_button]:
b.setMaximumHeight(24)
topLayout.addWidget(win.averaging_box)
vertLayout.addLayout(topLayout)
self.topLayout = topLayout
win.setLayout(vertLayout)
self.plotLayout = QtWidgets.QHBoxLayout()
vertLayout.addLayout(self.plotLayout)
self.lp = []
plot_dimension = self.videomode_processor.scan_dimension()
for ii in range(number_of_plots):
lp = livePlot(self.videomode_processor, self.station.gates,
self.sweepparams, self.sweepranges, show_controls=False,
plot_title=self.videomode_processor.plot_title(ii), plot_dimension=plot_dimension)
self.lp.append(lp)
self.plotLayout.addWidget(self.lp[ii].win)
if show_controls:
self.mainwin.averaging_box.clicked.connect(
connect_slot(self.enable_averaging_slot))
self.mainwin.start_button.clicked.connect(connect_slot(self.run))
self.mainwin.stop_button.clicked.connect(connect_slot(self.stop))
self.box = self._create_naverage_box(Naverage=Naverage)
self.topLayout.addWidget(self.box)
self.setGeometry = self.mainwin.setGeometry
self.mainwin.resize(800, 600)
self.mainwin.show()
self.timer = qtpy.QtCore.QTimer()
self.timer.timeout.connect(self.updatebg)
self.crosshair(show=crosshair)
self.enable_averaging_slot(averaging=averaging)
def __repr__(self):
s = '%s: %s at 0x%x' % (self.__class__.__name__, self.name, id(self))
return s
def _update_position(self, position, verbose=1):
""" Update position of gates with selected point
Args:
position (array): point with new coordinates
verbose (int): verbosity level
"""
if isinstance(self.videomode_processor, VideomodeSawtoothMeasurement):
self.videomode_processor.update_position(position, verbose=verbose)
[docs] def enable_averaging_slot(self, averaging=None, *args, **kwargs):
""" Update the averaging mode of the widget """
if averaging is None:
self._averaging_enabled = self.mainwin.averaging_box.checkState()
else:
self._averaging_enabled = averaging
self.mainwin.averaging_box.setChecked(self._averaging_enabled)
for l in self.lp:
l.enable_averaging(self._averaging_enabled)
[docs] def addPPT(self):
""" Copy image of videomode window to PPT """
isrunning = self.is_running()
if isrunning:
self.stopreadout() # prevent multi-threading issues
time.sleep(0.2)
setting_notes = 'moving averaging enabled: %d\n' % self._averaging_enabled
setting_notes += 'number of averages %d\n' % self.Naverage()
qtt.utilities.tools.addPPTslide(fig=self, title='VideoMode %s' % self.name,
notes=self.station,
extranotes='date: %s' % (
qtt.data.dateString(),) + '\n' + 'videomode_processor: ' + str(
self.videomode_processor.ppt_notes()) + '\n\n' + setting_notes)
if isrunning:
self.startreadout()
[docs] def updatebg(self):
""" Update function for the tool
Calls the videomode_processor.measure() and videomode_processor.process() and updates the GUI
"""
if self.idx % 10 == 0:
logging.debug('%s: updatebg %d' %
(self.__class__.__name__, self.idx))
self.idx = self.idx + 1
self.fps.addtime(time.time())
try:
measured_data = self.videomode_processor.measure(self)
datafunction_result = self.videomode_processor.process(measured_data, self)
self.datafunction_result = datafunction_result
for ii, d in enumerate(datafunction_result):
self.lp[ii].update(data=d, processevents=False)
pyqtgraph.mkQApp().processEvents()
except Exception as ex:
logging.exception(ex)
print('%s: Exception in updatebg, stopping readout' %
self.__class__.__name__)
self.stopreadout()
self.mainwin.setWindowTitle(
self.window_title + ' %.1f [fps]' % self.fps.framerate())
if self.fps.framerate() < 10:
time.sleep(0.1)
time.sleep(self.update_sleep)
[docs] def is_running(self):
""" Return True if the readout loop is running """
return self.timer.isActive()
[docs] def startreadout(self, callback=None, rate=30, maxidx=None):
""" Start the readout loop
Args:
rate (float): sample rate in ms
"""
if maxidx is not None:
self.maxidx = maxidx
if callback is not None:
self.datafunction = callback
self.timer.start(int(1000 * (1. / rate)))
if self.verbose:
print('%s: start readout' % (self.__class__.__name__,))
[docs] def stopreadout(self):
""" Stop the readout loop """
if self.verbose:
print('%s: stop readout' % (self.__class__.__name__,))
self.timer.stop()
self.mainwin.setWindowTitle(self.window_title + ' stopped')
def _create_naverage_box(self, Naverage=1):
box = QtWidgets.QSpinBox()
box.setButtonSymbols(QtWidgets.QAbstractSpinBox.NoButtons)
# do not emit signals when still editing
box.setKeyboardTracking(False)
box.setMinimum(1)
box.setMaximum(1023)
box.setPrefix('Naverage: ')
box.setValue(Naverage)
box.setMaximumWidth(120)
box.valueChanged.connect(self.Naverage.set)
return box
[docs] def close(self):
""" Stop the videomode and close the GUI"""
self.stop()
if self.verbose >= 2:
print(f'{self.__class__}: close')
for liveplot in self.lp:
liveplot.stopreadout()
liveplot.deleteLater()
if self.verbose >= 2:
print(f'{self.__class__}: call mainwin.close')
self.mainwin.close()
[docs] def get_dataset(self):
""" Return latest recorded dataset
Returns:
alldata (dataset or list of datasets)
"""
with self.datalock:
data = self.datafunction_result
if data is not None:
data = np.array(data)
self.alldata = self._makeDataset(data, Naverage=None)
else:
self.alldata = None
return self.alldata
[docs] def initialize(self):
""" Initialize the videomode tool for the readout loop """
self.videomode_processor.initialize(self)
[docs] def run(self, start_readout=True):
""" Initialize the tool and start the readout loop """
if self.verbose:
print('%s: run ' % (self.__class__.__name__,))
self.initialize()
if start_readout:
if self.verbose:
print('%s: run: startreadout' % (self.__class__.__name__,))
self.startreadout()
[docs] def stop(self):
""" Stops the readout loop and the input signals """
self.stopreadout()
self.stop_videomode()
[docs] def stop_videomode(self):
if self.verbose:
print('%s: stop_videomode ' % (self.__class__.__name__,))
self.videomode_processor.stop()
[docs] def single(self):
""" Do a single scan with a lot averaging.
Note: this does not yet support the usage of linear combinations of gates (a.k.a. virtual gates).
"""
raise Exception('not implemented')
[docs] def crosshair(self, *args, **kwargs):
""" Enable or disable a crosshair in the plotting windows """
for l in self.lp:
l.crosshair(*args, **kwargs)
def _makeDataset(self, data, Naverage=None):
""" Create datasets from the processed data """
metadata = {'scantime': str(datetime.datetime.now()), 'station': self.station.snapshot(),
'allgatevalues': self.station.gates.allvalues(), 'Naverage': Naverage}
alldata = self.videomode_processor.create_dataset(data, metadata)
return alldata
def _get_Naverage(self):
return self._Naverage_val
def _set_Naverage(self, value):
self._Naverage_val = value
self.box.setValue(value)
[docs] @staticmethod
def all_instances(verbose=1):
""" Return all VideoMode instances """
lst = qtt.pgeometry.list_objects(VideoMode, verbose=verbose)
return lst
[docs] @staticmethod
def get_instance(idx):
""" Return instance by index """
lst = VideoMode.all_instances(verbose=0)
for l in lst:
if l.videomode_index == idx:
return l
return None
[docs] @staticmethod
def stop_all_instances():
""" Stop readout on all all VideoMode instances """
lst = qtt.pgeometry.list_objects(VideoMode)
for v in lst:
v.stopreadout()
[docs] @staticmethod
def destruct():
""" Stop all VideoMode instances and cleanup """
lst = qtt.pgeometry.list_objects(VideoMode)
for v in lst:
v.stopreadout()
v.stop()
v.close()
pyqtgraph.cleanup()
VideoMode.videomode_class_index = 0
# %% Testing
if __name__ == '__main__':
import qtt.simulation.virtual_dot_array
import qtt.instrument_drivers.simulation_instruments
from importlib import reload
reload(qtt.instrument_drivers.simulation_instruments)
reload(qtt.measurements.scans)
from qtt.instrument_drivers.simulation_instruments import SimulationDigitizer
from qtt.instrument_drivers.simulation_instruments import SimulationAWG
from qtt.measurements.videomode_processor import DummyVideoModeProcessor
station = qtt.simulation.virtual_dot_array.initialize()
gates = station.gates
pv = qtt.createParameterWidget([gates]) # type: ignore
verbose = 1
multiprocess = False
digitizer = SimulationDigitizer(
qtt.measurements.scans.instrumentName('sdigitizer'), model=station.model)
station.add_component(digitizer)
station.awg = SimulationAWG(qtt.measurements.scans.instrumentName('vawg'))
station.add_component(station.awg)
dummy_processor = DummyVideoModeProcessor(station)
vm = VideoMode(station, Naverage=25, diff_dir=None, verbose=1,
nplots=1, dorun=False, videomode_processor=dummy_processor)
vm.updatebg()
self = vm
if 0:
sweepparams = [{'P1': 1}, {'P2': 1}]
minstrument = (digitizer.name, [0])
vm = VideoMode(station, sweepparams, sweepranges=[120] * 2,
minstrument=minstrument, resolution=[12] * 2, Naverage=2)
self = vm
if 1:
sweepparams = ['P1', 'P2']
sweepranges = [160, 80]
resolution = [80, 48]
minstrument = (digitizer.name, [0, 1])
else:
sweepparams = 'B0'
sweepranges = 160
resolution = [60]
minstrument = (digitizer.name, [0])
station.model.sdnoise = .1
vm = VideoMode(station, sweepparams, sweepranges, minstrument, Naverage=25,
resolution=resolution, sample_rate='default', diff_dir=None,
verbose=1, nplots=None, dorun=False)
self = vm
self.initialize()
measured_data = self.videomode_processor.measure(vm)
dd = self.videomode_processor.process(measured_data, vm)
vm.updatebg()
vm.run()
vm.get_dataset()
vm.setGeometry(1310, 100, 800, 800)
self = vm
vm.stopreadout()
vm.updatebg()
vm.stop()