import logging
import functools
import time
from qcodes.instrument.visa import VisaInstrument
from qcodes.instrument.visa import visa
[docs]class FPGA_ave(VisaInstrument):
'''
This is the python driver for the FPGA averaging, it communicates with the FPGA to get an average of a pulse
Usage:
Initialize with
<name> = instruments.create('name', 'FPGA_AVE', address='<COM PORT>')
<COM PORT> = COM5 e.g.
'''
def __init__(self, name, address, mirrorfactors=[1, 1], verbose=1, **kwargs):
logging.debug(__name__ + ' : Initializing instrument')
super().__init__(name, address, **kwargs)
self._address = address
self._values = {}
self.verbose = verbose
self._total_cycle_num = 100
self._sampling_frequency = 1000e3
self.visa_handle.baud_rate = 57600
self.set_sampling_frequency(self._sampling_frequency)
self.mirrorfactors = mirrorfactors
# Add parameters
self.add_parameter('mode',
set_cmd=functools.partial(self.write_to_serial, 130))
self.add_parameter('ch1_cycle_num',
get_cmd=functools.partial(self.ask_from_serial, 1))
self.add_parameter('ch2_cycle_num',
get_cmd=functools.partial(self.ask_from_serial, 5))
self.add_parameter('measurement_done',
get_cmd=self.get_measurement_done)
self.add_parameter('total_cycle_num',
get_cmd=self.get_total_cycle_num,
set_cmd=self.set_total_cycle_num)
self.add_parameter('ch1_datapoint_num',
get_cmd=self.get_ch1_datapoint_num)
self.add_parameter('ch2_datapoint_num',
get_cmd=self.get_ch2_datapoint_num)
self.add_parameter('data',
get_cmd=self.get_data)
self.add_parameter('ch1_data',
get_cmd=self.get_ch1_data)
self.add_parameter('ch2_data',
get_cmd=self.get_ch2_data)
self.add_parameter('sampling_frequency',
get_cmd=self.get_sampling_frequency,
set_cmd=self.set_sampling_frequency)
v = self.visa_handle
# make sure term characters are ignored
logging.debug(__name__ + ' : set termchar settings')
v.set_visa_attribute(visa.constants.VI_ATTR_TERMCHAR_EN, 0)
v.set_visa_attribute(visa.constants.VI_ATTR_ASRL_END_IN, 0)
logging.debug(__name__ + ' : completed initialization')
[docs] def get_idn(self):
logging.debug(__name__ + ' : FPGA_ave: get_idn')
IDN = {'vendor': None, 'model': 'FPGA',
'serial': None, 'firmware': None}
return IDN
# def get_all(self):
# for cnt in self.get_parameter_names():
# self.get(cnt)
[docs] def serial(self, here):
self.visa_handle.write_raw(here)
[docs] def start(self):
self.write_to_serial(129, 1)
[docs] def write_to_serial(self, address, value):
# register=chr(address)
# first_byte=chr(value>>8)
# last_byte=chr(value&255)
self.visa_handle.write_raw(bytes([address]))
self.visa_handle.write_raw(bytes([value >> 8]))
self.visa_handle.write_raw(bytes([value & 255]))
[docs] def ask_from_serial(self, address, register=255):
self.visa_handle.write_raw(bytes([register]))
self.visa_handle.write_raw(bytes([0]))
self.visa_handle.write_raw(bytes([address]))
readresult = self.read_raw_bytes(size=3)
result = (int(readresult[1]) << 8) | int(readresult[2])
return result
[docs] def ask_from_serial_signed(self, address, register=255):
self.visa_handle.write_raw(chr(register))
self.visa_handle.write_raw(chr(0))
self.visa_handle.write_raw(chr(address))
readresult = self.read_raw_bytes(size=3)
if int(readresult[0]) == address:
unsigned = (int(readresult[1]) << 8) | int(readresult[2])
signed = unsigned - 256 if unsigned > 127 else unsigned
return signed
else:
raise Exception('Wrong address returned')
[docs] def read_raw_bytes(self, size=None):
'''
Returns the values that are in the FPGA buffer. Replacement for
read_raw in messagebased.py, also see:
https://github.com/hgrecco/pyvisa/issues/93
https://github.com/hgrecco/pyvisa/issues/190
'''
size = self.visa_handle.chunk_size if size is None else size
with self.visa_handle.ignore_warning(visa.constants.VI_SUCCESS_MAX_CNT):
chunk, status = self.visa_handle.visalib.read(self.visa_handle.session, size)
if not len(chunk) == size:
raise Exception('Visa received incorrect number of bytes')
return chunk
[docs] def set_(self, value):
'''
Set the number of traces over which pulse events will be counted
'''
self.write_to_serial(129, value)
[docs] def get_total_cycle_num(self):
'''
Get the total number of cycles which are averaged in FPGA
'''
return self._total_cycle_num
[docs] def set_total_cycle_num(self, value):
'''
Set the total number of cycles which are averaged in FPGA
'''
self._total_cycle_num = value
self.write_to_serial(131, value)
[docs] def get_ch1_datapoint_num(self):
'''
Get the total number of cycles which are averaged in FPGA
'''
return self.ask_from_serial(3) - 1
[docs] def get_ch2_datapoint_num(self):
'''
Get the total number of cycles which are averaged in FPGA
'''
return self.ask_from_serial(7) - 1
[docs] def get_measurement_done(self, ch=[1, 2]):
'''
Returns one if the measurement is done, else returns zero
'''
meas_done = True
for ch_name in ch:
meas_tmp = self.get('ch%i_cycle_num' % ch_name)
meas_done = meas_done and (meas_tmp == self._total_cycle_num)
return meas_done
[docs] def get_data(self, address=2):
'''
Read data ch1, unsigned
'''
if not self.get_measurement_done():
return False
self.visa_handle.write_raw(chr(255))
self.visa_handle.write_raw(chr(0))
self.visa_handle.write_raw(chr(address))
result = []
for x in range(0, 1000, 1):
readresult = self.read_raw_bytes(size=3)
result.append((int(readresult[1]) << 8) | int(readresult[2]))
self._data = result
return result
[docs] def get_ch1_data(self, address=2, checkdone=True, buf=True):
'''
Reads signed data out of the FPGA
'''
Npoint = self.get_ch1_datapoint_num()
if checkdone:
if not self.get_measurement_done(ch=[1]):
return False
self.visa_handle.write_raw(bytes([255]))
self.visa_handle.write_raw(bytes([0]))
self.visa_handle.write_raw(bytes([address]))
signed = []
if Npoint == 0:
raise ValueError('There is no fpga output, the number of data points recorded for ch1 is 0 ')
if buf:
readresultbuf = self.read_raw_bytes(3 * Npoint)
if len(readresultbuf) != 3 * Npoint:
print('Npoint %d' % Npoint)
raise Exception('get_ch1_data: error reading data')
for x in range(0, Npoint, 1):
readresult = readresultbuf[3 * x:3 * (x + 1)]
unsigned = (int(readresult[0]) << 16) | (int(readresult[1]) << 8) | int(readresult[2])
signed_temp = unsigned - 16777215 if unsigned > 8388607 else unsigned
if x < Npoint:
signed.append(signed_temp)
else:
for x in range(0, Npoint, 1):
readresult = self.read_raw_bytes(size=3)
unsigned = (int(readresult[0]) << 16) | (int(readresult[1]) << 8) | int(readresult[2])
signed_temp = unsigned - 16777215 if unsigned > 8388607 else unsigned
if x < Npoint:
signed.append(signed_temp)
self._data_signed = signed
self.visa_handle.flush(16)
signed = [x * self.mirrorfactors[0] for x in signed]
return signed
[docs] def get_ch2_data(self, address=6, checkdone=True):
'''
Reads signed data out of the FPGA
'''
Npoint = self.get_ch2_datapoint_num()
if checkdone:
if not self.get_measurement_done(ch=[2]):
return False
self.visa_handle.write_raw(bytes([255]))
self.visa_handle.write_raw(bytes([0]))
self.visa_handle.write_raw(bytes([address]))
signed = []
if Npoint == 0:
raise ValueError('There is no fpga output, the number of data points recorded for ch2 is 0 ')
readresultbuf = self.read_raw_bytes(3 * Npoint)
for x in range(0, Npoint, 1):
readresult = readresultbuf[3 * x:3 * (x + 1)]
unsigned = (int(readresult[0]) << 16) | (int(readresult[1]) << 8) | int(readresult[2])
signed_temp = unsigned - 16777215 if unsigned > 8388607 else unsigned
if x < Npoint:
signed.append(signed_temp)
self._data_signed = signed
self.visa_handle.flush(16)
signed = [x * self.mirrorfactors[1] for x in signed]
return signed
[docs] def set_sampling_frequency(self, value):
'''
Set the total number of cycles which are averaged in FPGA, maximum samp freq=1 MHz and minimum is freq=763.
'''
if value > 1e6:
raise ValueError('The sampling frequency can not be set higher than 1 MHz.')
internal_clock = 50e6
Ndivision = round(internal_clock * 1.0 / value)
fs = internal_clock / Ndivision
self._sampling_frequency = fs
if self.verbose:
print('FPGA internal clock is 50MHz, dividing it by %d, yields samp. freq. is %d Hz' %
(Ndivision, self._sampling_frequency))
return self.write_to_serial(132, int(Ndivision))
[docs] def get_sampling_frequency(self):
'''
Get the total number of cycles which are averaged in FPGA
'''
return self._sampling_frequency
[docs] def get_sampling_frequency_ratio(self):
'''
Get the total number of cycles which are averaged in FPGA
'''
return self.ask_from_serial(132)
[docs] def readFPGA(self, FPGA_mode=0, ReadDevice=['FPGA_ch1', 'FPGA_ch2'], Naverage=1, verbose=1, waittime=0):
'''
Basic function to read the data from the FPGA memory.
'''
t0 = time.clock()
self.set('mode', FPGA_mode)
self.set('total_cycle_num', Naverage)
self.start()
if verbose >= 2:
print(' readFPGA: dt %.3f [ms], after start' % (1e3 * (time.clock() - t0)))
time.sleep(waittime)
if verbose >= 2:
print(' readFPGA: dt %.3f [ms], after wait' % (1e3 * (time.clock() - t0)))
cc = []
if 'FPGA_ch1' in ReadDevice:
cc += [1]
if 'FPGA_ch2' in ReadDevice:
cc += [2]
for c in cc:
loop = 0
while not self.get_measurement_done(ch=[c]):
if verbose >= 2:
print('readFPGA: waiting for FPGA for 7.5 ms longer time (channel %d)' % c)
time.sleep(0.0075)
loop = loop + 1
if loop * .0075 > 1:
pass
raise Exception('readFPGA: error')
if verbose >= 2:
print(' readFPGA: dt %.3f [ms], after dynamic wait' % (1e3 * (time.clock() - t0)))
DataRead_ch1 = []
if 'FPGA_ch1' in ReadDevice:
DataRead_ch1 = self.get_ch1_data(checkdone=False, buf=True)
DataRead_ch2 = []
if 'FPGA_ch2' in ReadDevice:
DataRead_ch2 = self.get_ch2_data(checkdone=False)
if verbose >= 2:
print(' readFPGA: dt %.3f [ms]' % (1e3 * (time.clock() - t0)))
if verbose >= 2:
print(' readFPGA: DataRead_ch1 %d, DataRead_ch2 %d' % (len(DataRead_ch1), len(DataRead_ch2)))
totalpoints = max(len(DataRead_ch1), len(DataRead_ch2))
return totalpoints, DataRead_ch1, DataRead_ch2