Source code for qtt.measurements.videomode_processor

import numpy as np
import warnings
import logging
import copy
import numbers

from scipy import ndimage

import qcodes
import qtt
from qtt.measurements.scans import makeDataset_sweep, makeDataset_sweep_2D
from qtt.measurements.acquisition.interfaces import AcquisitionScopeInterface

from abc import ABC, abstractmethod


[docs]class VideoModeProcessor(ABC): """ Base class for VideoMode processing functionality """
[docs] @abstractmethod def initialize(self, videomode): pass
[docs] @abstractmethod def stop(self): pass
[docs] @abstractmethod def measure(self, videomode): return None
[docs] @abstractmethod def scan_dimension(self): """ Return the dimension of the data to be shown (1 or 2) """
[docs] @abstractmethod def process(self, measurement_data, videomode): return measurement_data
[docs] @classmethod def extend_videomode_name(self, name): """ String to append to VideoMode name """ return name
[docs] @classmethod def ppt_notes(self): """ Generate notes to be added in a powerpoint slide """ return ''
[docs] @classmethod def default_processing(self, measurement_data, videomode): diff_sigma = videomode.diffsigma dd = [] for ii, measurement_data_channel in enumerate(measurement_data): data_processed = np.array(measurement_data_channel) if videomode.diff_dir is not None: if isinstance(videomode.diff_dir, (list, tuple)): diff_dir = videomode.diff_dir[ii] else: diff_dir = videomode.diff_dir data_processed = qtt.utilities.tools.diffImageSmooth( data_processed, dy=diff_dir, sigma=diff_sigma) if videomode.smoothing: data_processed = qtt.algorithms.generic.smoothImage( data_processed) if videomode.laplace: data_processed = ndimage.laplace( data_processed, mode='nearest') dd.append(data_processed) return dd
[docs] @classmethod def plot_title(self, index): """ Return title for plot window """ plot_title = f'plot {index}' return plot_title
[docs] def create_dataset(self, processed_data, metadata): alldata = [None] * len(processed_data) for jj, data_block in enumerate(processed_data): if self.scan_dimension() == 1: xdata = np.arange(0., data_block.size) alldatax = qtt.data.makeDataSet1Dplain( xname='x', x=xdata, yname='signal', y=data_block, loc_record={'label': 'videomode_1d_single'}) else: xdata = np.arange(0., data_block.shape[1]) ydata = np.arange(0., data_block.shape[0]) alldatax = qtt.data.makeDataSet2Dplain( xname='x', x=xdata, yname='y', y=ydata, zname='signal', z=data_block, loc_record={'label': 'videomode_1d_single'}) alldatax.metadata = copy.copy(metadata) alldata[jj] = alldatax return alldata
[docs] def acquisition_device_type(self): """ Return type of acquisition device Returns: Device type as a string. Can be """ measurement_instrument_handle = self.minstrumenthandle if isinstance(measurement_instrument_handle, AcquisitionScopeInterface): return 'AcquisitionScopeInterface' elif measurement_instrument_handle.name in ['digitizer', 'm4i']: return 'm4i' elif measurement_instrument_handle.name in ['ZIUHFLI', 'ziuhfli']: return 'ziuhfli' else: return 'other'
[docs]class DummyVideoModeProcessor(VideoModeProcessor): def __init__(self, station, verbose=1): """ Dummy implementation of the VideoModeProcessor """ self.station = station
[docs] def ppt_notes(self): return f'dummy notes for {self}'
[docs] def initialize(self, videomode): pass
[docs] def stop(self): pass
[docs] def measure(self, videomode): data = [np.random.rand(100, )] return data
[docs] def process(self, measurement_data, videomode): dd = self.default_processing(measurement_data, videomode) return dd
[docs] def scan_dimension(self): return 1
[docs]class VideomodeSawtoothMeasurement(VideoModeProcessor): def __init__(self, station, verbose=1): self.station = station self.verbose = verbose self.scan_parameters = {} self.period_1d = qcodes.ManualParameter('period_1d', initial_value=1e-3)
[docs] def plot_title(self, index): plot_title = str(self.minstrumenthandle) + ' ' + str(self.channels[index]) return plot_title
[docs] def create_dataset(self, processed_data, metadata): alldata = [None] * len(processed_data) if processed_data.ndim == 2: for jj, data_block in enumerate(processed_data): dataset, _ = makeDataset_sweep(data_block, self.sweepparams, self.sweepranges[0], gates=self.station.gates, loc_record={'label': 'videomode_1d'}) dataset.metadata = copy.copy(metadata) alldata[jj] = dataset elif processed_data.ndim == 3: for jj, data_block in enumerate(processed_data): dataset, _ = makeDataset_sweep_2D(data_block, self.station.gates, self.sweepparams, self.sweepranges, loc_record={'label': 'videomode_2d'}) dataset.metadata = copy.copy(metadata) alldata[jj] = dataset else: raise Exception('makeDataset: data.ndim %d' % processed_data.ndim) return alldata
[docs] def set_properties(self, waveform, measurement_instrument, resolution=None): """ Create callback object for videmode data Args: station (QCoDeS station) waveform measurement_instrument (tuple): instrumentname, channel diff_dir (list or (int, str)): differentiation modes for the data """ self.waveform = waveform self.minstrument = measurement_instrument[0] self.channels = measurement_instrument[1] self.measuresegment_arguments = {'mV_range': 5000} if not isinstance(self.channels, list): self.channels = [self.channels] self.unique_channels = list(np.unique(self.channels)) self.resolution = resolution
[docs] def update_position(self, position, verbose=1): if verbose: print('# %s: update position: %s' % (self.__class__.__name__, position,)) station = self.station if self.scan_dimension() == 1: delta = position[0] if isinstance(self.scan_parameters['sweepparams'], str): param = getattr(station.gates, self.scan_parameters['sweepparams']) param.set(delta) if verbose > 2: print(' set %s to %s' % (param, delta)) return try: for ii, parameter in enumerate(self.scan_parameters['sweepparams']): delta = position[ii] if verbose: print('param %s: delta %.3f' % (parameter, delta)) if isinstance(parameter, str): if parameter == 'gates_horz' or parameter == 'gates_vert': d = self.scan_parameters['sweepparams'][parameter] for gate, factor in d.items(): if verbose >= 2: print(' %s: increment %s with %s' % (parameter, gate, factor * delta)) param = getattr(station.gates, gate) param.increment(factor * delta) else: if verbose > 2: print(' set %s to %s' % (parameter, delta)) param = getattr(station.gates, parameter) param.set(delta) else: raise Exception('_update_position with parameter type %s not supported' % (type(parameter),)) except Exception as ex: logging.exception(ex)
[docs] def ppt_notes(self): return str(self.scan_parameters)
[docs] def initialize(self, videomode): scan_dimensions = self.scan_dimension() virtual_awg = getattr(self.station, 'virtual_awg', None) awg = getattr(self.station, 'awg', None) if isinstance(self.minstrumenthandle, AcquisitionScopeInterface): if scan_dimensions == 1: self.minstrumenthandle.period = self.period_1d() elif scan_dimensions == 2: self.minstrumenthandle.number_of_samples = np.prod(self.resolution) scan_channels = self.scanparams['minstrument'][1] channel_attributes = self.scanparams['minstrument'][2] self.minstrumenthandle.enabled_channels = tuple(scan_channels) for channel, attribute in zip(scan_channels, channel_attributes): self.minstrumenthandle.set_input_signal(channel, attribute) self.minstrumenthandle.start_acquisition() if scan_dimensions == 1: self._run_1d_scan(awg, virtual_awg, period=self.period_1d()) elif scan_dimensions == 2: self._run_2d_scan(awg, virtual_awg) else: raise Exception(f'type of scan not supported scan_dimensions is {scan_dimensions}')
[docs] def stop(self): if self.station is not None: if hasattr(self.station, 'awg'): self.station.awg.stop() if hasattr(self.station, 'RF'): self.station.RF.off() if hasattr(self.station, 'virtual_awg'): self.station.virtual_awg.stop() self.station.virtual_awg.disable_outputs() if isinstance(self.minstrumenthandle, AcquisitionScopeInterface): self.minstrumenthandle.stop_acquisition()
[docs] def measure(self, videomode): if self.acquisition_device_type() == 'm4i': if self.scan_dimension() == 1: if (self.sampling_frequency() * self.period_1d()) * (1 - self.waveform['width']) / 2 > 40 + 2 * 16: trigger_re_arm_compensation = True else: trigger_re_arm_compensation = False else: trigger_re_arm_compensation = True trigger_re_arm_compensation = getattr(self, 'trigger_re_arm_compensation', trigger_re_arm_compensation) device_parameters = {'trigger_re_arm_compensation': trigger_re_arm_compensation} else: device_parameters = {} self._device_parameters = device_parameters data = qtt.measurements.scans.measuresegment( self.waveform, videomode.Naverage(), self.minstrumenthandle, self.unique_channels, **self.measuresegment_arguments, device_parameters=device_parameters) if np.all(data == 0): raise Exception('data returned contained only zeros, aborting') return data
[docs] def process(self, measurement_data, videomode): measurement_data_selected = [] for _, channel in enumerate(self.channels): unique_channel_idx = self.unique_channels.index(channel) data_processed = np.array(measurement_data[unique_channel_idx]) measurement_data_selected.append(data_processed) dd = self.default_processing(measurement_data_selected, videomode) return dd
[docs] def parse_instrument(self, measurement_instrument_handle, sample_rate): measurement_instrument_handle = qtt.measurements.scans.get_instrument(measurement_instrument_handle, self.station) self.minstrumenthandle = measurement_instrument_handle device_type = self.acquisition_device_type() if device_type == 'AcquisitionScopeInterface': if sample_rate != 'default': measurement_instrument_handle.sample_rate = sample_rate self.sampling_frequency = lambda: measurement_instrument_handle.sample_rate elif device_type == 'm4i': if sample_rate == 'default': self.sampling_frequency = measurement_instrument_handle.sample_rate else: measurement_instrument_handle.sample_rate(sample_rate) self.sampling_frequency = measurement_instrument_handle.sample_rate elif device_type == 'ziuhfli': self.sampling_frequency = measurement_instrument_handle.scope_samplingrate else: try: measurement_instrument_handle = qtt.measurements.scans.get_instrument( measurement_instrument_handle, self.station) self.sampling_frequency = measurement_instrument_handle.sample_rate except Exception as ex: raise Exception( f'no fpga or digitizer found minstrumenthandle is {measurement_instrument_handle}') from ex return self.sampling_frequency
def _run_1d_scan(self, awg, virtual_awg, period=1e-3): if virtual_awg: if not isinstance(self.sweepparams, (str, dict)): raise Exception('argument sweepparams of type %s not supported' % type(self.sweepparams)) sweep_range = self.sweepranges[0] gates = self.sweepparams if isinstance(self.sweepparams, dict) else {self.sweepparams: 1} waveform = virtual_awg.sweep_gates(gates, sweep_range, period) virtual_awg.enable_outputs(list(gates.keys())) virtual_awg.run() else: if isinstance(self.sweepparams, str): waveform, _ = awg.sweep_gate(self.sweepparams, self.sweepranges[0], period=period) elif isinstance(self.sweepparams, dict): waveform, _ = awg.sweep_gate_virt(self.sweepparams, self.sweepranges[0], period=period) else: raise Exception('argument sweepparams of type %s not supported' % type(self.sweepparams)) self.set_properties(waveform, measurement_instrument=(self.minstrumenthandle, self.scan_parameters['channels'])) def _run_2d_scan(self, awg, virtual_awg, period=None): if virtual_awg: sweep_ranges = [i for i in self.sweepranges] if isinstance(self.sweepparams[0], dict): gates = self.sweepparams else: # convert list of strings to dict format gates = self.sweepparams gates = [dict([(gate, 1)]) for gate in gates] if period is None: sampling_frequency = qtt.measurements.scans.get_sampling_frequency(self.minstrumenthandle) base_period = 1. / sampling_frequency total_period = base_period * np.prod(self.resolution) else: total_period = period warnings.warn('code untested') waveform = virtual_awg.sweep_gates_2d(gates, sweep_ranges, total_period, self.resolution) keys = qtt.utilities.tools.flatten([list(item.keys()) for item in gates]) virtual_awg.enable_outputs(keys) virtual_awg.run() else: if isinstance(self.sweepparams, list): waveform, _ = awg.sweep_2D(self.sampling_frequency.get(), self.sweepparams, self.sweepranges, self.resolution) elif isinstance(self.sweepparams, dict): waveform, _ = awg.sweep_2D_virt(self.sampling_frequency.get(), self.sweepparams[ 'gates_horz'], self.sweepparams['gates_vert'], self.sweepranges, self.resolution) else: raise Exception('argument sweepparams of type %s not supported' % type(self.sweepparams)) if self.verbose: print('%s: 2d scan, define callback ' % (self.__class__.__name__,)) self.set_properties(waveform, measurement_instrument=(self.minstrumenthandle, self.scan_parameters['channels']), resolution=self.resolution)
[docs] def set_scan_parameters(self, scan_parameters): self.scan_parameters = scan_parameters self.sweepranges = scan_parameters['sweepranges'] if isinstance(self.sweepranges, numbers.Number): self.sweepranges = [self.sweepranges] self.sweepparams = scan_parameters['sweepparams'] self.resolution = scan_parameters['resolution'] self.channels = scan_parameters['channels']
[docs] def scan_dimension(self): if isinstance(self.sweepranges, (float, int)): return 1 elif isinstance(self.sweepranges, list): if len(self.sweepranges) == 2: return 2 elif len(self.sweepranges) == 1: return 1 else: raise Exception('scan dimension not supported') else: return -1
[docs] def extend_videomode_name(self, name): """ String to append to VideoMode name """ if name is not None: if isinstance(self.sweepparams, (str, list)): name += ': %s' % str(self.sweepparams) return name