Source code for qtt.algorithms.random_telegraph_signal

""" Functionality to analyse random telegraph signals

Created on Wed Feb 28 10:20:46 2018

@author: riggelenfv /eendebakpt
"""

import operator
import warnings
from typing import Optional, Tuple, Union

import matplotlib.pyplot as plt
import numpy as np
import qcodes

from qtt.algorithms.fitting import fit_double_gaussian, refit_double_gaussian
from qtt.algorithms.functions import double_gaussian, exp_function, fit_exp_decay, gaussian
from qtt.algorithms.markov_chain import ContinuousTimeMarkovModel
from qtt.utilities.tools import addPPTslide
from qtt.utilities.visualization import get_axis, plot_double_gaussian_fit, plot_vertical_line


[docs]def rts2tunnel_ratio(binary_signal: np.ndarray) -> float: """ Calculate ratio between tunnelrate down and up From the mean and standard deviation of the RTS data we can determine the ratio between the two tunnel rates. See equations on https://en.wikipedia.org/wiki/Telegraph_process Args: binary_signal: RTS signal with two levels 0 and 1 Returns: Ratio of tunnelrate up to down (l2) and down to up (l1) """ binary_signal = np.asarray(binary_signal) c1 = binary_signal.min() c2 = binary_signal.max() number_of_transitions = np.abs(np.diff(binary_signal)).sum() if number_of_transitions < 40: warnings.warn(f'number of transitions {number_of_transitions} is low, estimate can be inaccurate') if c1 == c2: raise ValueError(f'binary signal contains only a single value {c1}') if c1 != 0 or c2 != 1: raise ValueError('signal must only contain 0 and 1') m = binary_signal.mean() var = binary_signal.var() ratio_l2_over_l1 = var/m**2 return ratio_l2_over_l1
[docs]def transitions_durations(data: np.ndarray, split: float, add_start: bool = False, add_end: bool = False) -> Tuple[np.ndarray, np.ndarray]: """ For data of a two level system (up and down) determine durations of segments This function determines which datapoints belong to which level and finds the transitions, in order to determines how long the system stays in these levels. Args: data : data from the two level system split: value that separates the up and down level add_start: If True, then include the segments at the start of the data add_end:: If True, then include the segments at the end of the data Returns: duration_dn: array of the durations (unit: data points) in the down level duration_up: array of durations (unit: data points) in the up level """ size = len(data) if size == 0: return np.array([], dtype=int), np.array([], dtype=int) # split the data and find the index of the transitions, transitions from # up to down are marked with -1 and from down to up with 1 b = np.asarray(data) > split d = np.diff(b.astype(int)) transitions_down_to_up = (d == 1).nonzero()[0] transitions_up_to_down = (d == -1).nonzero()[0] # durations are calculated by taking the difference in data points between # the transitions endpoints_dn = [] endpoints_up = [] if data[0] <= split and data[-1] <= split: duration_up = transitions_up_to_down - transitions_down_to_up duration_dn = transitions_down_to_up[1:] - transitions_up_to_down[:-1] if len(transitions_up_to_down) == 0: if add_start or add_end: endpoints_dn.append(size) else: if add_start: endpoints_dn.append(transitions_down_to_up[0]+1) if add_end: endpoints_dn.append(size - transitions_up_to_down[-1]-1) elif data[0] <= split < data[-1]: duration_up = transitions_up_to_down - transitions_down_to_up[:-1] duration_dn = transitions_down_to_up[1:]-transitions_up_to_down if add_start: endpoints_dn.append(transitions_down_to_up[0]+1) if add_end: endpoints_up.append(size-transitions_down_to_up[-1]-1) elif data[0] > split >= data[-1]: duration_up = transitions_up_to_down[1:] - transitions_down_to_up duration_dn = transitions_down_to_up - transitions_up_to_down[:-1] if add_start: endpoints_up.append(transitions_up_to_down[0]+1) if add_end: endpoints_dn.append(size-transitions_up_to_down[-1]-1) else: # case: data[0] > split and data[-1] > split: duration_up = transitions_up_to_down[1:] - transitions_down_to_up[:-1] duration_dn = transitions_down_to_up - transitions_up_to_down if len(transitions_up_to_down) == 0: if add_start or add_end: endpoints_up.append(size) else: if add_start: endpoints_up.append(transitions_up_to_down[0]+1) if add_end: endpoints_up.append(size-transitions_down_to_up[-1]-1) duration_dn = np.concatenate((duration_dn, np.asarray(endpoints_dn, dtype=int)), dtype=int) duration_up = np.concatenate((duration_up, np.asarray(endpoints_up, dtype=int)), dtype=int) return duration_dn, duration_up
[docs]class FittingException(Exception): """ Fitting exception in RTS code """ pass
def _plot_rts_histogram(data, num_bins, double_gaussian_fit, split, figure_title): _, bins, _ = plt.hist(data, bins=num_bins) bincentres = np.array([(bins[i] + bins[i + 1]) / 2 for i in range(0, len(bins) - 1)]) get_left_mean_std_amplitude = operator.itemgetter(4, 2, 0) get_right_mean_std_amplitude = operator.itemgetter(5, 3, 1) left_gaussian = list(get_left_mean_std_amplitude(double_gaussian_fit)) right_gaussian = list(get_right_mean_std_amplitude(double_gaussian_fit)) plt.plot(bincentres, double_gaussian(bincentres, double_gaussian_fit), '-m', label='Fitted double gaussian') plt.plot(bincentres, gaussian(bincentres, *left_gaussian), 'g', label='Left Gaussian', alpha=.85, linewidth=.75) plt.plot(bincentres, gaussian(bincentres, *right_gaussian), 'r', label='Right Gaussian', alpha=.85, linewidth=.75) plt.plot(split, double_gaussian(split, double_gaussian_fit), 'ro', markersize=8, label='split: %.3f' % split) plt.xlabel('Measured value (a.u.)') plt.ylabel('Data points per bin') plt.legend() plt.title(figure_title)
[docs]def two_level_threshold(data: np.ndarray, number_of_bins: int = 40) -> dict: """ Determine threshold for separation of two-level signal Typical examples of such a signal are an RTS signal or Elzerman readout. Args: data: Two dimensional array with single traces number_of_bins: Number of bins to use for calculation of double histogram Returns: Dictionary with results. The key readout_threshold contains the calculated threshold """ counts, bins = np.histogram(data, bins=number_of_bins) bin_centres = np.array([(bins[i] + bins[i + 1]) / 2 for i in range(0, len(bins) - 1)]) _, result_dict = fit_double_gaussian(bin_centres, counts) result_dict = refit_double_gaussian(result_dict, bin_centres, counts) result = {'signal_threshold': result_dict['split'], 'double_gaussian_fit': result_dict, 'separation': result_dict['separation'], 'histogram': {'counts': counts, 'bins': bins, 'bin_centres': bin_centres}} return result
[docs]def plot_two_level_threshold(results: dict, fig: int = 100, plot_initial_estimate: bool = False): separation = results['separation'] threshold = results['signal_threshold'] ax = get_axis(fig) bin_centres = results['histogram']['bin_centres'] counts = results['histogram']['counts'] ax.bar(bin_centres, counts, width=bin_centres[1] - bin_centres[0], label='histogram') ax.set_ylabel('Counts') ax.set_xlabel('Signal [a.u.]') plot_vertical_line(threshold, label='threshold') plot_double_gaussian_fit(results['double_gaussian_fit'], bin_centres) ax.set_title(f'Two-level signal: separation {separation:.3f}, threshold {threshold:.3g}') if plot_initial_estimate: xdata = np.linspace(bin_centres[0], bin_centres[-1], 300) initial_estimate = results['double_gaussian_fit']['parameters initial guess'] left0 = initial_estimate[::2][::-1] right0 = initial_estimate[1::2][::-1] ax.plot(xdata, gaussian(xdata, *left0), ':g', label='initial estimate left') ax.plot(xdata, gaussian(xdata, *right0), ':r', label='initial estimate right')
def _create_integer_histogram(durations): """ Calculate number of bins, bin edges and histogram for durations This method works if the data is sampled at integer durations. """ numbins = int(np.sqrt(len(durations))) if numbins == 0: raise Exception('cannot create histogram with zero bins') bin_size = int(np.ceil((durations.max() - (durations.min() - .5)) / numbins)) # choose bins carefully, since our data is sampled only at discrete times bins = np.arange(durations.min() - .5, durations.max() + bin_size, bin_size) counts, bin_edges = np.histogram(durations, bins=bins) return counts, bin_edges, bin_size
[docs]def tunnelrates_RTS(data: Union[np.ndarray, qcodes.data.data_set.DataSet], samplerate: Optional[float] = None, min_sep: float = 2.0, max_sep: float = 7.0, min_duration: int = 5, num_bins: Optional[int] = None, fig: Optional[int] = None, ppt=None, verbose: int = 0, offset_parameter: Optional[float] = None) -> Tuple[Optional[float], Optional[float], dict]: """ This function takes an RTS dataset, fits a double gaussian, finds the split between the two levels, determines the durations in these two levels, fits a decaying exponential on two arrays of durations, which gives the tunneling frequency for both the levels. If the number of datapoints is too low to get enough points per bin for the exponential fit (for either the up or the down level), this analysis step is passed over. tunnelrate_dn and tunnelrate_up are returned as None, but similar information can be substracted from parameters['down_segments'] and parameters['up_segments']. Args: data: qcodes DataSet (or 1d data array) with the RTS data samplerate: sampling rate of the acquisition device, optional if given in the metadata of the measured data min_sep: if the separation found for the fit of the double gaussian is less then this value, the fit probably failed and a FittingException is raised max_sep: if the separation found for the fit of the double gaussian is more then this value, the fit probably failed and a FittingException is raised min_duration: minimal number of datapoints a duration should last to be taking into account for the analysis num_bins: number of bins for the histogram of signal values. If None, then determine based on the size of the data fig: shows figures and sends them to the ppt when is not None ppt: determines if the figures are send to a powerpoint presentation verbose: prints info to the console when > 0 offset_parameter: Offset parameter for fitting of exponential decay Returns: tunnelrate_dn: tunneling rate of the down level to the up level (kHz) or None in case of not enough datapoints tunnelrate_up: tunneling rate of the up level to the down level (kHz) or None in case of not enough datapoints parameters: dictionary with relevent (fit) parameters. this includes: tunnelrate_down (float): tunnel rate in Hz tunnelrate_up (float): tunnel rate up in Hz """ if isinstance(data, qcodes.data.data_set.DataSet): if samplerate is None: samplerate = data.metadata.get('samplerate', None) data = np.array(data.default_parameter_array()) if samplerate is None: raise ValueError('samplerate should be set to the data samplerate in Hz') # plotting a 2d histogram of the RTS if fig is not None: max_num_bins_time_domain = 1200 max_num_bins_signal_domain = 800 xdata = np.arange(len(data)) / (samplerate / 1_000) ny = min(int(np.sqrt(len(data))/2), max_num_bins_signal_domain) nx = min(int(np.sqrt(len(xdata))/2), max_num_bins_time_domain) Z, xedges, yedges = np.histogram2d(xdata, data, bins=[nx, ny]) title = '2d histogram RTS' Fig = plt.figure(fig) plt.clf() plt.pcolormesh(xedges, yedges, Z.T) cb = plt.colorbar() cb.set_label('Data points per bin') plt.xlabel('Time (ms)') plt.ylabel('Signal sensing dot (a.u.)') plt.title(title) if ppt: addPPTslide(title=title, fig=Fig) # binning the data and determining the bincentres if num_bins is None: max_num_bins_fitting = 1200 num_bins = min(int(np.sqrt(len(data))), max_num_bins_fitting) fit_results = two_level_threshold(data, number_of_bins=num_bins) separation = fit_results['separation'] split = fit_results['signal_threshold'] double_gaussian_fit_parameters = fit_results['double_gaussian_fit']['parameters'] if verbose: print('Fit parameters double gaussian:\n mean down: %.3f counts' % double_gaussian_fit_parameters[4] + ', mean up: %.3f counts' % double_gaussian_fit_parameters[ 5] + ', std down: %.3f counts' % double_gaussian_fit_parameters[2] + ', std up:%.3f counts' % double_gaussian_fit_parameters[3]) print('Separation between peaks gaussians: %.3f std' % separation) print('Split between two levels: %.3f' % split) # plotting the data in a histogram, the fitted two gaussian model and the split if fig: figure_title = 'Histogram of two level signal' + f'\nseparation: {separation:.1f} [std]' Fig = plt.figure(fig + 1) plt.clf() _plot_rts_histogram(data, num_bins, double_gaussian_fit_parameters, split, figure_title) if ppt: addPPTslide(title=title, fig=Fig, notes='Fit parameters double gaussian:\n mean down: %.3f counts' % double_gaussian_fit_parameters[4] + ', mean up:%.3f counts' % double_gaussian_fit_parameters[5] + ', std down: %.3f counts' % double_gaussian_fit_parameters[2] + ', std up:%.3f counts' % double_gaussian_fit_parameters[ 3] + '.Separation between peaks gaussians: %.3f std' % separation + '. Split between two levels: %.3f' % split) if separation < min_sep: raise FittingException( 'Separation between the peaks of the gaussian %.1f is less then %.1f std, indicating that the fit was not succesfull.' % ( separation, min_sep)) if separation > max_sep: raise FittingException( 'Separation between the peaks of the gaussian %.1f is more then %.1f std, indicating that the fit was not succesfull.' % ( separation, max_sep)) thresholded_data = data > split fraction_up = np.sum(thresholded_data) / data.size fraction_down = 1 - fraction_up # count the number of transitions and their duration durations_dn_idx, durations_up_idx = transitions_durations(data, split) # throwing away the durations with less data points then min_duration durations_up_min_duration = durations_up_idx >= min_duration durations_up = durations_up_idx[durations_up_min_duration] durations_dn_min_duration = durations_dn_idx >= min_duration durations_dn = durations_dn_idx[durations_dn_min_duration] if len(durations_up) < 1: raise FittingException('All durations_up are shorter than the minimal duration.') if len(durations_dn) < 1: raise FittingException('All durations_dn are shorter than the minimal duration.') # calculating the number of bins and counts for down level counts_dn, bins_dn, _ = _create_integer_histogram(durations_dn) # calculating the number of bins and counts for up level counts_up, bins_up, _ = _create_integer_histogram(durations_up) if verbose >= 2: print(f' _create_integer_histogram: up/down: number of bins {len(bins_up)}/{len(bins_dn)}') bins_dn = bins_dn / samplerate bins_up = bins_up / samplerate if verbose >= 2: print('counts_dn %d, counts_up %d' % (counts_dn[0], counts_up[0])) tunnelrate_dn = None tunnelrate_up = None minimal_count_number = 50 if counts_dn[0] < minimal_count_number: warnings.warn( f'Number of down datapoints {counts_dn[0]} is not enough (minimal_count_number {minimal_count_number})' f' to make an accurate fit of the exponential decay for level down. ' + 'Look therefore at the mean value of the measurement segments') if counts_up[0] < minimal_count_number: warnings.warn( f'Number of up datapoints {counts_up[0]} is not enough (minimal_count_number {minimal_count_number}) to make an acurate fit of the exponential decay for level up. ' + 'Look therefore at the mean value of the measurement segments') parameters = {'sampling rate': samplerate, 'fit parameters double gaussian': double_gaussian_fit_parameters, 'separations between peaks gaussians': separation, 'split between the two levels': split} parameters['down_segments'] = {'number': len(durations_dn_idx), 'mean': np.mean(durations_dn_idx) / samplerate, 'p50': np.percentile( durations_dn_idx, 50) / samplerate, 'number_filtered': len(durations_dn), 'mean_filtered': np.mean(durations_dn)} parameters['up_segments'] = {'number': len(durations_up_idx), 'mean': np.mean(durations_up_idx) / samplerate, 'p50': np.percentile( durations_up_idx, 50) / samplerate, 'number_filtered': len(durations_up), 'mean_filtered': np.mean(durations_up)} parameters['tunnelrate_down_to_up'] = 1. / parameters['down_segments']['mean'] parameters['tunnelrate_up_to_down'] = 1. / parameters['up_segments']['mean'] parameters['fraction_down'] = fraction_down parameters['fraction_up'] = fraction_up parameters['bins_dn'] = {'number': len(bins_dn), 'size': np.diff(bins_dn).mean(), 'start': bins_dn[0]} parameters['bins_up'] = {'number': len(bins_up), 'size': np.diff(bins_up).mean(), 'start': bins_up[0]} parameters['tunnelrate_ratio'] = rts2tunnel_ratio(thresholded_data) if (counts_dn[0] > minimal_count_number) and (counts_up[0] > minimal_count_number): def _fit_and_plot_decay(bincentres, counts, label, fig_label): """ Fitting and plotting of exponential decay for level """ A_fit, B_fit, gamma_fit = fit_exp_decay(bincentres, counts, offset_parameter=offset_parameter) tunnelrate = gamma_fit / 1000 other_label = 'up' if label == 'down' else 'down' if verbose: print(f'Tunnel rate {label} to {other_label}: %.1f kHz' % tunnelrate) time_scaling = 1e3 if fig_label: title = f'Fitted exponential decay, level {label}' Fig = plt.figure(fig_label) plt.clf() plt.plot(time_scaling * bincentres, counts, 'o', label=f'Counts {label}') plt.plot(time_scaling * bincentres, exp_function(bincentres, A_fit, B_fit, gamma_fit), 'r', label=r'Fitted exponential decay $\Gamma_{\mathrm{%s\ to\ %s}}$: %.1f kHz' % (label, other_label, tunnelrate)) plt.xlabel('Lifetime (ms)') plt.ylabel('Counts per bin') plt.legend() plt.title(title) if ppt: addPPTslide(title=title, fig=Fig) fit_parameters = [A_fit, B_fit, gamma_fit] return tunnelrate, fit_parameters bincentres_dn = (bins_dn[:-1]+bins_dn[1:])/2 fig_label = None if fig is None else fig + 2 tunnelrate_dn, fit_parameters_down = _fit_and_plot_decay( bincentres_dn, counts_dn, label='down', fig_label=fig_label) bincentres_up = (bins_up[:-1]+bins_up[1:])/2 fig_label = None if fig is None else fig + 3 tunnelrate_up, fit_parameters_up = _fit_and_plot_decay( bincentres_up, counts_up, label='up', fig_label=fig_label) parameters['fit parameters exp. decay down'] = fit_parameters_down parameters['fit parameters exp. decay up'] = fit_parameters_up parameters['tunnelrate_down_exponential_fit'] = tunnelrate_dn parameters['tunnelrate_up_exponential_fit'] = tunnelrate_up else: parameters['tunnelrate_down_exponential_fit'] = None parameters['tunnelrate_up_exponential_fit'] = None return tunnelrate_dn, tunnelrate_up, parameters
[docs]def generate_RTS_signal(number_of_samples: int = 100000, std_gaussian_noise: float = 0.1, uniform_noise: float = 0.05, rate_up: float = 10e3, rate_down: float = 15e3, samplerate: float = 1e6) -> np.ndarray: """ Generate a RTS signal Args: number_of_samples: Length the the trace to be generated std_normal_noise: std of Gaussian noise added to the signal uniform_noise: uniform noise in the range +- uniform_noise/2 is added to the signal rate_up: rate from down to up rate_down: rate from up to down samplerate: The samplerate of the signal to be generated Returns: Array with generated signal (0 is down, 1 is up) """ rts_model = ContinuousTimeMarkovModel(['down', 'up'], [rate_up / samplerate, rate_down / samplerate], np.array([[0., 1], [1, 0]])) data = rts_model.generate_sequence(number_of_samples, delta_time=1) if uniform_noise != 0: data = data + uniform_noise * (np.random.rand(data.size, ) - .5) if std_gaussian_noise != 0: data = data + np.random.normal(0, std_gaussian_noise, data.size) return data