import numpy as np
from qcodes import Parameter
from qcodes.instrument_drivers.tektronix.AWG5014 import Tektronix_AWG5014
from qcodes.utils.validators import Numbers
from qtt.instrument_drivers.virtualAwg.awgs.common import AwgCommon, AwgCommonError
from typing import Dict, List, Optional, Tuple
class Tektronix5014C_AWG(AwgCommon):
    """ The VirtualAWG backend for the Tektronix 5014C AWG.

    Wraps a Tektronix_AWG5014 driver instance and keeps a small set of local settings
    (marker_low, marker_high, amplitudes and offset) that are written into the channel
    configuration of the AWG file when waveforms are uploaded.
    """

    def __init__(self, awg: Tektronix_AWG5014) -> None:
        """ The VirtualAWG backend for the Tektronix 5014C AWG.

        Args:
            awg: The Tektronix 5014C AWG instance.

        Raises:
            ValueError: If the class name of the provided AWG is not Tektronix_AWG5014.
        """
        super().__init__('Tektronix_AWG5014', channel_numbers=[1, 2, 3, 4], marker_numbers=[1, 2])
        # Compare the names by value. An identity test (`is not`) only gives the right answer
        # when both strings happen to be the same interned object.
        if type(awg).__name__ != self._awg_name:
            raise ValueError(f'The AWG type does not correspond with {self._awg_name}')
        # Local settings; set_cmd=None means the values are only stored on the parameter.
        # They are read back in _upload_waveforms to fill the channel configuration.
        self.__settings = [
            Parameter(name='marker_low', unit='V', initial_value=0.0,
                      vals=Numbers(-1.0, 2.6), set_cmd=None),
            Parameter(name='marker_high', unit='V', initial_value=1.0,
                      vals=Numbers(-0.9, 2.7), set_cmd=None),
            Parameter(name='amplitudes', unit='V', initial_value=1.0,
                      vals=Numbers(0.02, 4.5), set_cmd=None),
            Parameter(name='offset', unit='V', initial_value=0,
                      vals=Numbers(-2.25, 2.25), set_cmd=None),
        ]
        self.__awg = awg

    @property
    def fetch_awg(self) -> Tektronix_AWG5014:
        """ Return the AWG instance."""
        return self.__awg

    def run(self) -> None:
        """ Enable the AWG outputs.

        This function equals enabling Run button on the AWG.
        """
        self.__awg.run()

    def stop(self) -> None:
        """ Disables the AWG outputs.

        This function equals disabling the Run button on the AWG.
        """
        self.__awg.stop()

    def reset(self) -> None:
        """ Resets the AWG to its default settings."""
        self.__awg.reset()

    def __set_output_state(self, channels: Optional[List[int]], state: int) -> None:
        """ Writes the same output state to the 'ch<N>_state' parameter of each given channel.

        Args:
            channels: A list with the channel numbers. All channels are used, if None is given.
            state: The value written to each channel state parameter (1 or 0 in this class).

        Raises:
            ValueError: If channels contains an invalid channel number.
        """
        if channels is None:
            channels = self._channel_numbers
        # Validate everything up front, so nothing is written when one channel is invalid.
        if not all(ch in self._channel_numbers for ch in channels):
            raise ValueError(f'Invalid channel numbers {channels}')
        for channel in channels:
            self.__awg.set(f'ch{channel}_state', state)

    def enable_outputs(self, channels: Optional[List[int]] = None) -> None:
        """ Enables the outputs for the given channels.

        This function equals enabling the CH1, .. CH4 buttons on the AWG.

        Args:
            channels: A list with the channel numbers. All channels are enabled, if no value is given.

        Raises:
            ValueError: If channels contains an invalid channel number.
        """
        self.__set_output_state(channels, 1)

    def disable_outputs(self, channels: Optional[List[int]] = None) -> None:
        """ Disables the outputs for the given channels.

        This function equals disabling the CH1, .. CH4 buttons on the AWG.

        Args:
            channels: A list with the channel numbers. All channels are disabled, if no value is given.

        Raises:
            ValueError: If channels contains an invalid channel number.
        """
        self.__set_output_state(channels, 0)

    def __find_setting(self, name: str) -> Parameter:
        """ Looks up one of the local settings by name.

        Args:
            name: The name of the setting.

        Returns:
            The first parameter in the settings list with the given name.

        Raises:
            ValueError: If there is no setting with the given name.
        """
        for setting in self.__settings:
            if setting.name == name:
                return setting
        # Raise an explicit error; a bare next() would leak a StopIteration to the caller,
        # which can silently end an enclosing iteration.
        valid_names = ', '.join(setting.name for setting in self.__settings)
        raise ValueError(f'Invalid setting name {name}. Valid names are: {valid_names}')

    def change_setting(self, name: str, value: float) -> None:
        """ Sets a setting on the AWG. The changeable settings are:
            marker_low, marker_high, amplitudes and offset.

        The value is stored on the local parameter and is used at the next waveform upload.

        Args:
            name: The name of the setting.
            value: The value the setting should get.

        Raises:
            ValueError: If there is no setting with the given name.
        """
        self.__find_setting(name).set(value)

    def retrieve_setting(self, name: str) -> float:
        """ Gets a setting from the AWG. The gettable settings are:
            marker_low, marker_high, amplitudes and offset.

        Args:
            name: The name of the setting.

        Returns:
            The value currently stored for the setting.

        Raises:
            ValueError: If there is no setting with the given name.
        """
        return self.__find_setting(name).get()

    def update_running_mode(self, mode: str) -> None:
        """ Sets the running mode. The possible modes are the
            continuous (CONT) and sequential (SEQ).

        Args:
            mode: Either 'CONT' (continuous) or 'SEQ' (sequential).

        Raises:
            ValueError: If the given mode is not 'CONT' (continuous) or 'SEQ' (sequential).
        """
        modes = ['CONT', 'SEQ']
        if mode not in modes:
            raise ValueError(f'Invalid AWG mode ({mode})')
        self.__awg.set('run_mode', mode)

    def retrieve_running_mode(self) -> str:
        """ Gets the running mode. The possible modes are the
            continuous (CONT) and sequential (SEQ).

        Returns:
            The value of the 'run_mode' parameter of the AWG; 'CONT' or 'SEQ' when the
            mode was set using update_running_mode.
        """
        return self.__awg.get('run_mode')

    def update_sampling_rate(self, sampling_rate: int) -> None:
        """ Sets the sampling rate of the AWG.

        Args:
            sampling_rate: The number of samples the AWG outputs per second.
        """
        self.__awg.set('clock_freq', sampling_rate)

    def retrieve_sampling_rate(self) -> int:
        """ Gets the sample rate of the AWG.

        Returns:
            The sample rate of the AWG in Samples/second.
        """
        return self.__awg.get('clock_freq')

    def update_gain(self, gain: float) -> None:
        """ Sets the amplitude of the channel outputs.

        The amplitude for all channels are set to the same value using this function.

        Args:
            gain: The amplitude of the output channels.
        """
        for channel in self._channel_numbers:
            self.__awg.set(f'ch{channel}_amp', gain)

    def retrieve_gain(self) -> float:
        """ Gets the amplitude for all the output channels.

        Returns:
            The amplitude for all output channels.

        Raises:
            AwgCommonError: If not all channel amplitudes have the same value. Then the settings in
                            the AWG are off and needs to be reset first.
        """
        gains = [self.__awg.get(f'ch{ch}_amp') for ch in self._channel_numbers]
        if not all(gain == gains[0] for gain in gains):
            raise AwgCommonError('Not all channel amplitudes are equal. Please reset!')
        return gains[0]

    def _set_sequence(self, channels: List[int], sequence: List[List[str]]) -> None:
        """ Sets the sequence on the AWG using the user defined waveforms.

        Args:
            channels: A list with channel numbers that should output the waveform on the AWG.
            sequence: A list containing lists with the waveform names for each channel.
                      The outer list determines the number of rows the sequences has.

        Raises:
            ValueError: If the number of channels does not match the element count in the sequence or
                        when the number of waveforms for each channel do not match.
        """
        if len(sequence) != len(channels):
            raise ValueError('Invalid sequence and channel count!')
        if not all(len(idx) == len(sequence[0]) for idx in sequence):
            raise ValueError('Invalid sequence list lengths!')

        request_rows = len(sequence[0])
        current_rows = self.get_sequence_length()
        # Only resize the sequence on the AWG when the row count actually changes.
        if request_rows != current_rows:
            self.set_sequence_length(request_rows)

        for row_index in range(request_rows):
            # The driver is addressed with row_index + 1 (rows are presumably 1-based; confirm).
            row_number = row_index + 1
            for channel in self._channel_numbers:
                if channel in channels:
                    wave_name = sequence[channels.index(channel)][row_index]
                else:
                    # Channels that are not requested get an empty waveform name for this row.
                    wave_name = ""
                self.__awg.set_sqel_waveform(wave_name, channel, row_number)

        # Set the goto state of the last row to 1.
        # NOTE(review): presumably this makes the sequence loop - confirm against the driver.
        self.__awg.set_sqel_goto_state(request_rows, 1)

    def _upload_waveforms(self, names: List[str], waveforms: List[List[np.ndarray]],
                          file_name: str = 'default.awg') -> None:
        """ Upload the waveforms to the AWG. Creates an AWG file and then uploads it to the AWG.

        Args:
            names: A list with the waveform names.
            waveforms: A list containing the waveform data. Each item is unpacked into a
                       waveform and two marker arrays (in that order).
            file_name: The name of the AWG file.
        """
        # Transpose the per-name [waveform, marker1, marker2] items into three parallel sequences.
        wfs, m1s, m2s = (list(column) for column in zip(*waveforms))
        packed_waveforms = {}
        for index, name in enumerate(names):
            packed_waveforms[name] = self.__awg.pack_waveform(wfs[index], m1s[index], m2s[index])

        offset = self.retrieve_setting('offset')
        amplitude = self.retrieve_setting('amplitudes')
        marker_low = self.retrieve_setting('marker_low')
        marker_high = self.retrieve_setting('marker_high')

        # The same configuration is written for every channel registered in the constructor.
        # The numeric *_METHOD_* and CHANNEL_STATE_* values are passed on unchanged to
        # generate_awg_file; see the AWG file format for their meaning.
        channel_cfg: Dict[str, float] = {}
        for channel in self._channel_numbers:
            channel_cfg.update({
                f'ANALOG_METHOD_{channel}': 1,
                f'CHANNEL_STATE_{channel}': 1,
                f'ANALOG_AMPLITUDE_{channel}': amplitude,
                f'MARKER1_METHOD_{channel}': 2,
                f'MARKER1_LOW_{channel}': marker_low,
                f'MARKER1_HIGH_{channel}': marker_high,
                f'MARKER2_METHOD_{channel}': 2,
                f'MARKER2_LOW_{channel}': marker_low,
                f'MARKER2_HIGH_{channel}': marker_high,
                f'ANALOG_OFFSET_{channel}': offset,
            })

        # Select the directory on the AWG before the file is sent.
        self.__awg.visa_handle.write('MMEMory:CDIRectory "C:\\Users\\OEM\\Documents"')
        awg_file = self.__awg.generate_awg_file(packed_waveforms, np.array([]), [], [], [], [], channel_cfg)
        self.__awg.send_awg_file(file_name, awg_file)

        # Build the path to load from the directory the AWG reports back. The quotes are
        # dropped and each newline becomes a backslash (presumably the reply ends with a
        # newline, which then acts as the separator before the file name - confirm).
        current_dir = self.__awg.visa_handle.query('MMEMory:CDIRectory?')
        current_dir = current_dir.replace('"', '')
        current_dir = current_dir.replace('\n', '\\')
        self.__awg.load_awg_file(f'{current_dir}{file_name}')

    def delete_sequence(self) -> None:
        """ Clears the sequence from the AWG by setting the sequence length to zero."""
        self.set_sequence_length(0)

    def set_sequence_length(self, row_count: int) -> None:
        """ Sets the number of rows in the sequence.

        Args:
            row_count: The number of rows in the sequence.
        """
        self.__awg.write(f'SEQuence:LENGth {row_count}')

    def get_sequence_length(self) -> int:
        """ Gets the number of rows in the sequence.

        Returns:
            The number of rows in the sequence.
        """
        row_count = self.__awg.ask('SEQuence:LENGth?')
        return int(row_count)