Source code for qtt.gui.live_plotting

# %% Load packages
import time
import logging
import numpy as np
from functools import partial
import qtpy.QtWidgets as QtWidgets
import qtpy.QtCore as QtCore
from qtpy.QtCore import Signal
import pyqtgraph as pg
import pyqtgraph
import pyqtgraph.multiprocess as mp
import qcodes
import qtt
from qtt import pgeometry
import qtt.algorithms.generic

# %% Static variables

liveplotwindow = None  # global handle for live plotting


[docs]def getLivePlotWindow(): """ Return handle to live plotting window Returns: None or object: handle to live plotting window """ global liveplotwindow if liveplotwindow is not None: return liveplotwindow return None
# %% Communication try: import redis except BaseException: pass
[docs]class rda_t: def __init__(self): """ Class for simple real-time data access Every object has a `get` and `set` method to access simple parameters globally (e.g. across different python sessions). """ # we use redis as backend now self.r = redis.Redis(host='127.0.0.1', port=6379) # , password='test') try: self.set('dummy_rda_t', -3141592) v = self.get_float('dummy_rda_t') if not v == -3141592: raise Exception('set not equal to get') except redis.exceptions.ConnectionError as e: print('rda_t: check whether redis is installed and the server is running') raise e
[docs] def get_float(self, key, default_value=None): """ Get value by key and convert to a float """ v = self.get(key, default_value) if v is None: return v return float(v)
[docs] def get_int(self, key, default_value=None): """ Get value by key and convert to an int Returns: value (int) """ v = self.get(key, default_value) if v is None: return v return int(float(v))
[docs] def get(self, key, default_value=None): """ Get a value Args: key (str): value to be retrieved default_value (Anything): value to return if the key is not present Returns: value (str) """ value = self.r.get(key) if value is None: return default_value else: return value
[docs] def set(self, key, value): """ Set a value Args: key (str): key value (str): the value to be set """ self.r.set(key, value) pass
[docs]class MeasurementControl(QtWidgets.QMainWindow): """ Simple control for running measurements """ def __init__(self, name='Measurement Control', rda_variable='qtt_abort_running_measurement', text_vars=[], **kwargs): """ Simple control for running measurements Args: name (str): used as window title rda_variable (str): text_vars (list): """ super().__init__(**kwargs) w = self w.setWindowTitle(name) vbox = QtWidgets.QVBoxLayout() self.verbose = 0 self.name = name self.rda_variable = rda_variable self.rda = rda_t() self.text_vars = text_vars if len(text_vars) > 0: self.vLabels = {} self.vEdits = {} self.vButtons = {} self.vActions = [] for tv in text_vars: self.vLabels[tv] = QtWidgets.QLabel() self.vLabels[tv].setText('%s:' % tv) self.vEdits[tv] = (QtWidgets.QTextEdit()) try: self.vEdits[tv].setText( self.rda.get(tv, b'').decode('utf-8')) except Exception as Ex: print('could not retrieve value %s: %s' % (tv, str(Ex))) self.vButtons[tv] = QtWidgets.QPushButton() self.vButtons[tv].setText('Send') self.vButtons[tv].setStyleSheet( "background-color: rgb(255,150,100);") self.vButtons[tv].clicked.connect(partial(self.sendVal, tv)) vbox.addWidget(self.vLabels[tv]) vbox.addWidget(self.vEdits[tv]) vbox.addWidget(self.vButtons[tv]) self.text = QtWidgets.QLabel() self.updateStatus() vbox.addWidget(self.text) self.abortbutton = QtWidgets.QPushButton() self.abortbutton.setText('Abort measurement') self.abortbutton.setStyleSheet("background-color: rgb(255,150,100);") self.abortbutton.clicked.connect(self.abort_measurements) vbox.addWidget(self.abortbutton) self.enable_button = QtWidgets.QPushButton() self.enable_button.setText('Enable measurements') self.enable_button.setStyleSheet("background-color: rgb(255,150,100);") self.enable_button.clicked.connect(self.enable_measurements) vbox.addWidget(self.enable_button) widget = QtWidgets.QWidget() widget.setLayout(vbox) self.setCentralWidget(widget) menuBar = self.menuBar() menuDict = { '&Edit': {'&Get all Values': self.getAllValues}, '&Help': {'&Info': self.showHelpBox} } for (k, menu) in menuDict.items(): mb = menuBar.addMenu(k) for (kk, action) in menu.items(): act = QtWidgets.QAction(kk, self) mb.addAction(act) act.triggered.connect(action) w.resize(300, 300) self.timer = QtCore.QTimer() self.timer.timeout.connect(self.updateStatus) # this also works self.timer.start(1000) self.show()
[docs] def updateStatus(self): if self.verbose >= 2: print('updateStatus...') value = int(self.rda.get(self.rda_variable, 0)) self.text.setText('%s: %d' % (self.rda_variable, value))
[docs] def enable_measurements(self): """ Enable measurements """ if self.verbose: print('%s: setting %s to 0' % (self.name, self.rda_variable)) self.rda.set(self.rda_variable, 0) self.updateStatus()
[docs] def abort_measurements(self): """ Abort the current measurement """ if self.verbose: print('%s: setting %s to 1' % (self.name, self.rda_variable)) self.rda.set(self.rda_variable, 1) self.updateStatus()
[docs] def sendVal(self, tv): """ send text value """ print('sending value %s' % tv) self.rda.set(tv, self.vEdits[tv].toPlainText())
[docs] def getVal(self, tv): """ get text value """ value = self.rda.get(tv, b'').decode('utf-8') self.vEdits[tv].setText(value)
[docs] def showHelpBox(self): """ Show help dialog """ self.infotext = "This widget is used for live control of your measurement via inter-process communication.\ <br/><br/>To add additional variables (str) to the control use the text_vars argmument. \ To access values, use the <code>qtt.redisvalue</code> method." QtWidgets.QMessageBox.information(self, 'qtt measurement control info', self.infotext)
[docs] def getAllValues(self): """ get all string values """ for tv in self.text_vars: self.getVal(tv)
[docs]def start_measurement_control(doexec=False): """ Start measurement control GUI Args: doexec(bool): if True run the event loop """ _ = pyqtgraph.mkQApp() # import warnings # from pyqtgraph.multiprocess.remoteproxy import RemoteExceptionWarning # warnings.simplefilter('ignore', RemoteExceptionWarning) proc = mp.QtProcess() lp = proc._import('qtt.gui.live_plotting') mc = lp.MeasurementControl() app = pyqtgraph.mkQApp() if doexec: app.exec()
try: import qtt.gui.parameterviewer import qtt.gui from qtt.utilities.tools import monitorSizes from qcodes.plots.pyqtgraph import QtPlot
[docs] def setupMeasurementWindows(station=None, create_parameter_widget=True, ilist=None): """ Create liveplot window and parameter widget (optional) Args: station (QCoDeS station): station with instruments create_parameter_widget (bool): if True create ParameterWidget ilist (None or list): list of instruments to add to ParameterWidget Returns: dict: created gui objects """ windows = {} ms = monitorSizes() vv = ms[-1] if create_parameter_widget and any([station, ilist]): if ilist is None: ilist = [station.gates] w = qtt.createParameterWidget(ilist) w.setGeometry(vv[0] + vv[2] - 400 - 300, vv[1], 300, 600) windows['parameterviewer'] = w plotQ = QtPlot(window_title='Live plot', interval=.5) plotQ.setGeometry(vv[0] + vv[2] - 600, vv[1] + vv[3] - 400, 600, 400) plotQ.update() qtt.gui.live_plotting.liveplotwindow = plotQ windows['plotwindow'] = plotQ app = QtWidgets.QApplication.instance() app.processEvents() return windows
except Exception as ex: logging.exception(ex) print('failed to add setupMeasurementWindows!') pass # %%
[docs]class RdaControl(QtWidgets.QMainWindow): def __init__(self, name='LivePlot Control', boxes=[ 'xrange', 'yrange', 'nx', 'ny'], **kwargs): """ Simple control for real-time data parameters """ super().__init__(**kwargs) w = self w.setGeometry(1700, 50, 300, 600) w.setWindowTitle(name) vbox = QtWidgets.QVBoxLayout() self.verbose = 0 self.rda = rda_t() self.boxes = boxes self.widgets = {} for _, b in enumerate(self.boxes): self.widgets[b] = {} hbox = QtWidgets.QHBoxLayout() self.widgets[b]['hbox'] = hbox tbox = QtWidgets.QLabel(b) self.widgets[b]['tbox'] = tbox dbox = QtWidgets.QDoubleSpinBox() # do not emit signals when still editing dbox.setKeyboardTracking(False) self.widgets[b]['dbox'] = dbox val = self.rda.get_float(b, 100) dbox.setMinimum(-10000) dbox.setMaximum(10000) dbox.setSingleStep(10) dbox.setValue(val) dbox.setValue(100) dbox.valueChanged.connect(partial(self.valueChanged, b)) hbox.addWidget(tbox) hbox.addWidget(dbox) vbox.addLayout(hbox) widget = QtWidgets.QWidget() widget.setLayout(vbox) self.setCentralWidget(widget) self.update_values() self.show()
[docs] def update_values(self): for _, b in enumerate(self.boxes): val = self.rda.get_float(b) if val is None: # default... val = 100 dbox = self.widgets[b]['dbox'] # oldstate = dbox.blockSignals(True) dbox.setValue(val)
# dbox.blockSignals(oldstate)
[docs] def valueChanged(self, name, value): if self.verbose: print('valueChanged: %s %s' % (name, value)) self.rda.set(name, value)
# legacy name LivePlotControl = RdaControl # %% Liveplot object
[docs]class livePlot(QtCore.QObject): """ Class to enable live plotting of data. Attributes: datafunction: the function to call for data acquisition sweepInstrument: the instrument to which sweepparams belong sweepparams: the parameter(s) being swept sweepranges (list): the ranges over which sweepparams are being swept verbose (int): output level of logging information show_controls (bool): show gui elements for control of the live plotting alpha (float): parameter (value between 0 and 1) which determines the weight given in averaging to the latest measurement result (alpha) and the previous measurement result (1-alpha), default value 0.3 """ sigMouseClicked = Signal(object) def __init__( self, datafunction=None, sweepInstrument=None, sweepparams=None, sweepranges=None, alpha=.3, verbose=1, show_controls=True, window_title='live view', plot_dimension=None, plot_title=None, is1dscan=None, **kwargs): """Return a new livePlot object.""" super().__init__(**kwargs) self.window_title = window_title win = QtWidgets.QWidget() win.resize(800, 600) win.setWindowTitle(self.window_title) vertLayout = QtWidgets.QVBoxLayout() self._averaging_enabled = 2 if show_controls: topLayout = QtWidgets.QHBoxLayout() win.start_button = QtWidgets.QPushButton('Start') win.stop_button = QtWidgets.QPushButton('Stop') win.averaging_box = QtWidgets.QCheckBox('Averaging') win.averaging_box.setChecked(self._averaging_enabled) for b in [win.start_button, win.stop_button]: b.setMaximumHeight(24) topLayout.addWidget(win.start_button) topLayout.addWidget(win.stop_button) topLayout.addWidget(win.averaging_box) vertLayout.addLayout(topLayout) plotwin = pg.GraphicsWindow(title="Live view") vertLayout.addWidget(plotwin) win.setLayout(vertLayout) self.setGeometry = win.setGeometry self.win = win self.plotwin = plotwin self.verbose = verbose self.idx = 0 self.maxidx = 1e9 self.data = None self.data_avg = None self.sweepInstrument = sweepInstrument self.sweepparams = sweepparams self.sweepranges = sweepranges self.fps = pgeometry.fps_t(nn=6) self.datafunction = datafunction self.datafunction_result = None self.plot_dimension=plot_dimension self.alpha = alpha if is1dscan is None: is1dscan = ( isinstance( self.sweepparams, str) or ( isinstance( self.sweepparams, (list, dict)) and len( self.sweepparams) == 1)) if isinstance(self.sweepparams, dict): if 'gates_horz' not in self.sweepparams: is1dscan = True if verbose: print('live_plotting: is1dscan %s' % is1dscan) if self.sweepparams is None: p1 = plotwin.addPlot(title=plot_title) p1.setLabel('left', 'param2') p1.setLabel('bottom', 'param1') if plot_dimension == 1: dd = np.zeros((0,)) plot = p1.plot(dd, pen='b') self.plot = plot else: self.plot = pg.ImageItem() p1.addItem(self.plot) self._crosshair = [] elif is1dscan: p1 = plotwin.addPlot(title=plot_title) p1.setLabel('left', 'Value') p1.setLabel('bottom', self.sweepparams, units='mV') dd = np.zeros((0,)) plot = p1.plot(dd, pen='b') self.plot = plot vpen = pg.QtGui.QPen(pg.QtGui.QColor( 130, 130, 175, 60), 0, pg.QtCore.Qt.SolidLine) gv = pg.InfiniteLine([0, 0], angle=90, pen=vpen) gv.setZValue(0) p1.addItem(gv) self._crosshair = [gv] self.crosshair(show=False) elif isinstance(self.sweepparams, (list, dict)): # 2D scan p1 = plotwin.addPlot(title=plot_title) if type(self.sweepparams) is dict: [xlabel, ylabel] = ['sweepparam_v', 'stepparam_v'] else: [xlabel, ylabel] = self.sweepparams p1.setLabel('bottom', xlabel, units='mV') p1.setLabel('left', ylabel, units='mV') self.plot = pg.ImageItem() p1.addItem(self.plot) vpen = pg.QtGui.QPen(pg.QtGui.QColor( 0, 130, 235, 60), 0, pg.QtCore.Qt.SolidLine) gh = pg.InfiniteLine([0, 0], angle=90, pen=vpen) gv = pg.InfiniteLine([0, 0], angle=0, pen=vpen) gh.setZValue(0) gv.setZValue(0) p1.addItem(gh) p1.addItem(gv) self._crosshair = [gh, gv] self.crosshair(show=False) else: raise Exception( 'The number of sweep parameters should be either None, 1 or 2.') self.plothandle = p1 self.timer = QtCore.QTimer() self.timer.timeout.connect(self.updatebg) self.win.show() def connect_slot(target): """ Create a slot by dropping signal arguments """ def signal_drop_arguments(*args, **kwargs): target() return signal_drop_arguments if show_controls: win.start_button.clicked.connect(connect_slot(self.startreadout)) win.stop_button.clicked.connect(connect_slot(self.stopreadout)) win.averaging_box.clicked.connect( connect_slot(self.enable_averaging_slot)) self.datafunction_result = None self.plotwin.scene().sigMouseClicked.connect(self._onClick) def __del__(self): self.stopreadout() pyqtgraph.mkQApp().processEvents() self.close() parent = super() if hasattr(parent, '__del__'): parent.__del__() def _onClick(self, event): image_pt = self.plot.mapFromScene(event.scenePos()) tr = self.plot.transform() pt = tr.map(image_pt.x(), image_pt.y()) if self.verbose >= 2: print('pt %s' % (pt,)) self.sigMouseClicked.emit(pt)
[docs] def close(self): if self.verbose: print('LivePlot.close()') self.stopreadout() pyqtgraph.mkQApp().processEvents() self.win.close()
[docs] @qtt.utilities.tools.rdeprecated(txt='method not used any more', expire='1 Dec 2019') def resetdata(self): self.idx = 0 self.data = None
[docs] def crosshair(self, show=None, pos=None): """ Enable or disable crosshair Args: show (None, True or False) pos (None or position) """ for x in self._crosshair: if show is not None: if show: x.show() else: x.hide() if pos is not None: x.setPos(pos)
[docs] def update(self, data=None, processevents=True): self.win.setWindowTitle('%s, fps: %.2f' % (self.window_title, self.fps.framerate())) if self.verbose >= 2: print('livePlot: update: idx %d ' % self.idx) if data is not None: self.data = np.array(data) if self.data_avg is None: self.data_avg = self.data # depending on value of self.averaging_enabled either do or # don't do the averaging if self._averaging_enabled: self.data_avg = self.alpha * self.data + \ (1 - self.alpha) * self.data_avg else: self.data_avg = self.data if self.data.ndim == 1: if None in (self.sweepInstrument, self.sweepparams, self.sweepranges): sweepvalues = np.arange(0, self.data_avg.size) self.plot.setData(sweepvalues, self.data_avg) else: if type(self.sweepparams) is dict: paramval = 0 else: sweep_param = getattr( self.sweepInstrument, self.sweepparams) paramval = sweep_param.get_latest() sweepvalues = np.linspace( paramval - self.sweepranges[0] / 2, self.sweepranges[0] / 2 + paramval, len(data)) self.plot.setData(sweepvalues, self.data_avg) self._sweepvalues = [sweepvalues] self.crosshair(show=None, pos=[paramval, 0]) elif self.data.ndim == 2: self.plot.setImage(self.data_avg.T) if None not in (self.sweepInstrument, self.sweepparams, self.sweepranges): if isinstance(self.sweepparams, dict): value_x = 0 value_y = 0 else: if isinstance(self.sweepparams[0], dict): value_x = 0 value_y = 0 else: value_x = self.sweepInstrument.get( self.sweepparams[0]) value_y = self.sweepInstrument.get( self.sweepparams[1]) self.horz_low = value_x - self.sweepranges[0] / 2 self.horz_range = self.sweepranges[0] self.vert_low = value_y - self.sweepranges[1] / 2 self.vert_range = self.sweepranges[1] self.rect = QtCore.QRect( int(self.horz_low), int(self.vert_low), int(self.horz_range), int(self.vert_range)) self.plot.setRect(self.rect) self.crosshair(show=None, pos=[value_x, value_y]) self._sweepvalues = [ np.linspace( self.horz_low, self.horz_low + self.horz_range, self.data.shape[1]), np.linspace( self.vert_low, self.vert_low + self.vert_range, self.data.shape[0])] else: raise Exception('ndim %d not supported' % self.data.ndim) else: pass self.idx = self.idx + 1 if self.idx > self.maxidx: self.idx = 0 self.timer.stop() if processevents: QtWidgets.QApplication.processEvents()
[docs] def updatebg(self): """ Update function for the widget Calls the datafunction() and update() function """ if self.idx % 10 == 0: logging.debug('livePlot: updatebg %d' % self.idx) self.idx = self.idx + 1 self.fps.addtime(time.time()) if self.datafunction is not None: try: dd = self.datafunction() self.datafunction_result = dd self.update(data=dd) except Exception as e: logging.exception(e) print('livePlot: Exception in updatebg, stopping readout') self.stopreadout() else: self.stopreadout() dd = None if self.fps.framerate() < 10: time.sleep(0.1) time.sleep(0.00001)
[docs] def enable_averaging(self, value): """ Enabling rolling average """ self._averaging_enabled = value if self.verbose >= 1: if self._averaging_enabled == 2: if self.verbose: print('enable_averaging called, alpha = ' + str(self.alpha)) elif self._averaging_enabled == 0: if self.verbose: print('enable_averaging called, averaging turned off') else: if self.verbose: print('enable_averaging called, undefined: value %s' % (self._averaging_enabled,))
[docs] def enable_averaging_slot(self, *args, **kwargs): """ Update the averaging mode of the widget """ self._averaging_enabled = self.win.averaging_box.checkState() self.enable_averaging(self._averaging_enabled)
[docs] def startreadout(self, callback=None, rate=30, maxidx=None): """ Args: callback (None or method): Method to call on update rate (float): sample rate in ms maxidx (None or int): Stop reading if the index is larger than the maxidx """ if maxidx is not None: self.maxidx = maxidx if callback is not None: self.datafunction = callback self.timer.start( int(1000 * (1. / rate)) ) if self.verbose: print('live_plotting: start readout: rate %.1f Hz' % rate)
[docs] def stopreadout(self): """ Stop the readout loop """ if self.verbose: print('live_plotting: stop readout') self.timer.stop() self.win.setWindowTitle('Live view stopped')
# %% Some default callbacks
[docs]class MockCallback_2d(qcodes.Instrument): def __init__(self, name, nx=80, **kwargs): super().__init__(name, **kwargs) self.nx = nx self.add_parameter('p', parameter_class=qcodes.ManualParameter, initial_value=-20) self.add_parameter('q', parameter_class=qcodes.ManualParameter, initial_value=30) def __call__(self): import qtt.utilities.imagetools as lt data = np.random.rand(self.nx * self.nx) data_reshaped = data.reshape(self.nx, self.nx) lt.semiLine(data_reshaped, [self.nx / 2, self.nx / 2], np.deg2rad(self.p()), w=2, l=self.nx / 3, H=2) lt.semiLine(data_reshaped, [self.nx / 2, self.nx / 2], np.deg2rad(self.q()), w=2, l=self.nx / 4, H=3) return data_reshaped