Code owners
Assign users and groups as approvers for specific file changes. Learn more.
pyDevFilters.py 15.58 KiB
import numpy as np
import os
# remove the following when attenuation function completed
import random
import xraylib as xrl
THICK_MACRO = 'THICK'
MAT_MACRO = 'MAT'
TESTING = False
#Using local density definitions until package/library found with longer list
#in g/cm^3
DENSITY = {'Si': 2.33, 'TiO2': 4.23, 'InSb': 5.78}
def binary_representation_rev(num, bits):
# need to reverse order so 1st bit in list is for first filter
return format(num, '0'+str(bits)+'b')[::-1]
def get_densities(materials):
densities = dict.fromkeys(materials)
for material in list(densities):
try:
matdb = xrl.CompoundParser(material)
except ValueError as err:
print(f"{material} not found in xraylib.")
else:
if matdb['nAtomsAll'] == 1:
density = xrl.ElementDensity(matdb['Elements'][0])
else:
if material in list(DENSITY):
density = DENSITY[material]
else:
raise ValueError(f"{material} not found in DENSITY keys.")
densities[material]=density
return densities
def find_nearest_above(my_array, target):
diff = my_array - target
mask = np.ma.less_equal(diff, 0)
# We need to mask the negative differences and zero
# since we are looking for values above
if np.all(mask):
return -1 # returns None if target is greater than any value
masked_diff = np.ma.masked_array(diff, mask)
return masked_diff.argmin()
class filterBlock():
def __init__(self, filter_id = 'FL-1'):
'''
'''
#create necessary inputs/output; initialize to zero/empty
print(80*'#')
print('Initializing pyDevice filter block object')
self.materials = None
self.thicknesses = None
self.densities = None
self.lookupTable = []
self.culledTable = []
self.sorted_index = []
self.filter_attenuations = []
self.filter_id = filter_id
print(f'Filter ID: {self.filter_id}')
self.outMask = 0
self.inMask = 0
self.config = 0
self.attenuation = 1
self.attenuation_actual = 1
self.culledSize = 0
self.configIndex = 0
self.culledIndex = 0
self.configIndexSorted = 0
self.culledIndexSorted = 0
self.energy = 0 #in keV
self.verbose = False
self.num_filters = 0
print(80*'#')
def setupLookupTable(self, subs_file, n_filters, energy = 8.0):
'''
lookup table created after IOC startup (after filter materials and
thicknesses are set
'''
print(80*'#')
print('Setting up filter control...')
self.num_filters = n_filters
self.energy = energy
#read in substitutions file
try:
subsFile = open(subs_file,"r")
except:
raise RuntimeError(f"Substiution file ({subsFile}) not found.")
subsFileContent = subsFile.readlines()
subsFile.close()
macros = subsFileContent[2].replace('{','').replace('}','').replace(',','').split()
filter_properties = {key: [] for key in macros} # dictionary of lists
for i in range(self.num_filters):
try:
xx = subsFileContent[3+i].replace('{','').replace('}','').replace(',','').replace('"','').split()
filter_properties[macros[0]].append(xx[0])
filter_properties[macros[1]].append(xx[1])
filter_properties[macros[2]].append(xx[2])
filter_properties[macros[3]].append(xx[3])
except:
raise RuntimeError(f"Number of filters ({self.num_filters}) doesn't match substitution file")
self.materials = []
self.thicknesses = []
# get materials from filter properties dictionary-list
print('Getting filter materials...')
if MAT_MACRO in macros:
self.materials = filter_properties[MAT_MACRO]
print('Filter material read in.\n')
else:
raise RuntimeError(f"Material macro ({MAT_MACRO}) not found in substituion file")
# get densities from local definition (for compounds) or from xraylib (for elements)
densities = get_densities(self.materials)
self.densities = [densities[material] for material in self.materials]
# get thicknesses from filter properties dictionary-list
print('Getting filter thicknesses...')
if THICK_MACRO in macros:
self.thicknesses = [float(i) for i in filter_properties[THICK_MACRO]]
print('Filter thicknesses read in.\n')
else:
raise RuntimeError(f"Thickness macro ({THICK_MACRO}) not found in substituion file")
print('Calculating lookup table...')
self.calc_lookup_table()
print('Lookup table calculation complete.\n')
print('Filter control setup complete.')
print(80*'#')
def calc_lookup_table(self):
'''
'''
if TESTING:
# For testing, using a shuffled version of the reciprocal of array whose values equal their index position + 1
print('Using test array')
self.lookupTable = np.reciprocal((np.arange(2**self.num_filters)+1).astype(float))
np.random.shuffle(self.lookupTable)
else:
print('Using calculated attenuations')
self.filter_attenuations = np.empty(self.num_filters)
num_configs = 2**self.num_filters
att_log = np.empty(num_configs)
self.lookupTable = np.empty(num_configs)
for i in range(self.num_filters):
mass_thickness = self.densities[i]*(self.thicknesses[i]*1e-4)
print(f'Getting attenuation for {self.materials[i]} at energy of {self.energy} of keV')
self.filter_attenuations[i] = xrl.CS_Total_CP(self.materials[i], self.energy)*mass_thickness
print(f'Filter: {i+1}, Mass_thickness: {mass_thickness}, log(attenuation): {self.filter_attenuations[i]}')
for j in range(num_configs):
binary_representation = binary_representation_rev(j, self.num_filters)
choice = [int(bit) for bit in binary_representation]
att_log[j] = sum(self.filter_attenuations*choice)
self.lookupTable = np.exp(att_log)
self.cull_lookup_table()
def cull_lookup_table(self):
'''
Culls the lookup table based on filters that are locked and or disabled
'''
self.culledSize = 2**(self.num_filters - (self.outMask | self.inMask).bit_count())
if self.verbose: print(f'Operating spaced now at {self.culledSize} configurations')
if self.verbose: print(f'Culling table with in mask {self.inMask} and out mask {self.outMask}')
self.culledConfigs = np.empty(self.culledSize, dtype=int)
self.culledTable = np.empty(self.culledSize)
j = 0
for i in range(2**self.num_filters):
if ((i & self.outMask == 0) and (i & self.inMask == self.inMask)):
self.culledConfigs[j]=i
self.culledTable[j] = self.lookupTable[i]
j += 1
self.sort_lookup_table()
def sort_lookup_table(self):
'''
'''
if self.verbose: print(f'Sorting culled lookup table of length {len(self.culledTable)}')
self.sorted_index = np.argsort(self.culledTable)
def calc_config_atten(self, filter_config, energy):
'''
Given filter set and incoming energy, calculate the attenuation
'''
att_log_total = 0
binary_representation = binary_representation_rev(filter_config, self.num_filters)
choice = [int(bit) for bit in binary_representation]
for i in range(self.num_filters):
if choice[i]:
mass_thickness = self.densities[i]*(self.thicknesses[i]*1e-4)
att_log_total += xrl.CS_Total_CP(self.materials[i], energy)*mass_thickness
return np.exp(att_log_total)
def updateEnergy(self, energy):
'''
Energy variable sent from IOC as a string
'''
self.energy = float(energy)
if self.materials is None or self.thicknesses is None:
raise RuntimeError("Substitution file hasn't been loaded. Can't calc lookup table")
else:
self.calc_lookup_table()
pydev.iointr(self.filter_id+'_new_energy', self.energy)
self.setAttenActual()
self.updateAttenRBVs()
def updateAtten(self, attenuation):
'''
Attenuation variable sent from IOC as a string
'''
self.attenuation = float(attenuation)
if self.verbose: print(f'Updating attenuation to {self.attenuation}')
self.findFilterConfig()
def findFilterConfig(self):
'''
User selects attenuation, this function finds nearest acheivable attenuation
'''
# Code to search lookup table for nearest attenuation to desired
if self.verbose: print(f'Searching for config closest to {self.attenuation}')
nearest_above_idx = find_nearest_above(self.culledTable, self.attenuation)
if self.verbose: print(f'Nearest above index: {nearest_above_idx}')
if nearest_above_idx == -1:
# Desired attenuation is larger than possible at current energy and filter set
self.culledIndex = np.argmax(self.culledTable)
if self.verbose: print(f'Desired attenuation greater than maximum possible; setting to {self.culledIndex}')
else:
self.culledIndex = nearest_above_idx
#self.culledIndex = np.argmin(np.abs(self.culledTable - self.attenuation))
if self.verbose: print(f'Config index found at {self.culledIndex}')
self.culledIndexSorted = self.sorted_index.tolist().index(self.culledIndex)
if self.verbose: print(f'Sorted config index found at {self.culledIndexSorted}')
# Update PVs
self.setAttenActual()
self.updateFilterConfigPV()
self.updateFilterRBV()
self.updateAttenRBVs()
def getPreviewAtten(self, sortedIndex):
'''
'''
attenuation_preview = self.culledTable[self.sorted_index[int(sortedIndex)]]
if self.verbose: print(f'Preview attenuation for {sortedIndex} is {attenuation_preview}')
pydev.iointr(self.filter_id+'_new_preview', attenuation_preview)
def updateIndex(self, sortedIndex):
'''
User has updated desired sorted index
'''
self.culledIndexSorted = int(sortedIndex)
self.culledIndex = self.sorted_index[self.culledIndexSorted]
self.setAttenActual()
self.updateFilterConfigPV()
self.updateFilterRBV()
self.updateAttenRBVs()
def updateConfig(self, config_BW):
'''
When user manually changes filters, this gets attenuation and displays it
along with updated RBVs but it doesn't set the config PV
'''
self.config = int(config_BW)
if self.verbose: print(f'User has set a filter, new base-2 config: {self.config}')
self.culledIndex = (np.where(self.culledConfigs == self.config))[0][0]
if self.verbose: print(f'User has set a filter, new culled index: {self.culledIndex}')
self.culledIndexSorted = self.sorted_index.tolist().index(self.culledIndex)
if self.verbose: print(f'User has set a filter, new sorted culled index: {self.culledIndexSorted}')
self.setAttenActual()
self.updateFilterRBV()
self.updateAttenRBVs()
def setInMask(self, inMask):
'''
update mask for filters that are locked in
'''
self.inMask = int(inMask)
self.cull_lookup_table()
if self.verbose: print(f'Converting culled index via in Mask')
self.convertCulledIndex()
if self.verbose: print(f'Updating filter RBV via in Mask')
self.updateFilterRBV()
if self.verbose: print(f'Setting in mask RBV to {self.inMask}')
pydev.iointr(self.filter_id+'_new_inMask', int(self.inMask))
def setOutMask(self, outMask):
'''
update mask for filters that must remain out (either disabled or locked)
'''
self.outMask = int(outMask)
self.cull_lookup_table()
if self.verbose: print(f'Converting culled index via out Mask')
self.convertCulledIndex()
if self.verbose: print(f'Updating filter RBV via out Mask')
self.updateFilterRBV()
if self.verbose: print(f'Setting out mask RBV to {self.outMask}')
pydev.iointr(self.filter_id+'_new_outMask', int(self.outMask))
def convertCulledIndex(self):
'''
When available configs change, need to update index so that tweaks
continue to work
'''
if self.verbose: print('Converting ...')
self.culledIndex = (np.where(self.culledConfigs == self.config))[0][0]
if self.verbose: print(f'Culled index is {self.culledIndex}')
self.culledIndexSorted = self.sorted_index.tolist().index(self.culledIndex)
if self.verbose: print(f'Sorted culled index is {self.culledIndexSorted}')
def setAttenActual(self):
'''
Find the RBVs for the actual attenuation and the values of the next
up/down as a preview for the tweak buttons
'''
self.attenuation_actual = self.culledTable[self.culledIndex]
next_up = find_nearest_above(self.culledTable, self.culledTable[self.culledIndex])
culledTable_sortedIndices = np.argsort(self.culledTable)
sorted_position = np.where(culledTable_sortedIndices == self.culledIndex)[0][0]
if self.verbose: print(f'Sorted position is {sorted_position}')
self.next_up_atten = self.culledTable[culledTable_sortedIndices[min(sorted_position + 1,len(culledTable_sortedIndices)-1)]]
self.next_dn_atten = self.culledTable[culledTable_sortedIndices[max(0,sorted_position - 1)]]
if self.verbose: print(f'Next up: {self.next_up_atten} and next down: {self.next_dn_atten}')
def updateFilterConfigPV(self):
'''
Update the filter config PV with the configuration associated
with the desired attenuation
'''
self.config = self.culledConfigs[self.culledIndex]
pydev.iointr(self.filter_id+'_new_filters', int(self.config))
def updateFilterRBV(self):
'''
Update the culled (masked) index RBV
'''
pydev.iointr(self.filter_id+'_new_index', int(self.culledIndexSorted))
def updateAttenRBVs(self):
'''
Updated RBVs for various attenuation-related PVs
'''
pydev.iointr(self.filter_id+'_new_atten', self.attenuation_actual)
pydev.iointr(self.filter_id+'_atten_up', self.next_up_atten)
pydev.iointr(self.filter_id+'_atten_dn', self.next_dn_atten)
self.attenuation_2E_actual = self.calc_config_atten(self.culledConfigs[self.culledIndex], self.energy*2)
pydev.iointr(self.filter_id+'_new_atten_2E', self.attenuation_2E_actual)
self.attenuation_3E_actual = self.calc_config_atten(self.culledConfigs[self.culledIndex], self.energy*3)
pydev.iointr(self.filter_id+'_new_atten_3E', self.attenuation_3E_actual)
def updateVerbosity(self, verbosity):
'''
Turn on minor printing
'''
print(f'Verbosity set to {verbosity}')
self.verbose = verbosity