import numpy as np import os # remove the following when attenuation function completed import random import xraylib as xrl THICK_MACRO = 'THICK' MAT_MACRO = 'MAT' TESTING = False #Using local density definitions until package/library found with longer list #in g/cm^3 DENSITY = {'Si': 2.33, 'TiO2': 4.23, 'InSb': 5.78} def binary_representation_rev(num, bits): # need to reverse order so 1st bit in list is for first filter return format(num, '0'+str(bits)+'b')[::-1] def get_densities(materials): densities = dict.fromkeys(materials) for material in list(densities): try: matdb = xrl.CompoundParser(material) except ValueError as err: print(f"{material} not found in xraylib.") else: if matdb['nAtomsAll'] == 1: density = xrl.ElementDensity(matdb['Elements'][0]) else: if material in list(DENSITY): density = DENSITY[material] else: raise ValueError(f"{material} not found in DENSITY keys.") densities[material]=density return densities class filterBlock(): def __init__(self): ''' ''' #create necessary inputs/output; initialize to zero/empty print(80*'#') print('Initializing pyDevice filter block object') self.materials = None self.thicknesses = None self.densities = None self.lookupTable = [] self.culledTable = [] self.sorted_index = [] self.filter_attenuations = [] self.outMask = 0 self.inMask = 0 self.config = 0 self.attenuation = 1 self.attenuation_actual = 1 self.culledSize = 0 self.configIndex = 0 self.culledIndex = 0 self.configIndexSorted = 0 self.culledIndexSorted = 0 self.energy = 0 #in keV self.verbose = False self.num_filters = 0 print(80*'#') def calc_lookup_table(self): ''' ''' if TESTING: # For testing, using a shuffled version of the reciprocal of array whose values equal their index position + 1 print('Using test array') self.lookupTable = np.reciprocal((np.arange(2**self.num_filters)+1).astype(float)) np.random.shuffle(self.lookupTable) else: print('Using calculated attenuations') self.filter_attenuations = np.empty(self.num_filters) num_configs = 2**self.num_filters att_log = np.empty(num_configs) self.lookupTable = np.empty(num_configs) for i in range(self.num_filters): mass_thickness = self.densities[i]*(self.thicknesses[i]*1e-4) print(f'Getting attenuation for {self.materials[i]} at energy of {self.energy} of keV') self.filter_attenuations[i] = xrl.CS_Total_CP(self.materials[i], self.energy)*mass_thickness print(f'Filter: {i+1}, Mass_thickness: {mass_thickness}, log(attenuation): {self.filter_attenuations[i]}') for j in range(num_configs): binary_representation = binary_representation_rev(j, self.num_filters) choice = [int(bit) for bit in binary_representation] att_log[j] = sum(self.filter_attenuations*choice) self.lookupTable = np.exp(att_log) self.cull_lookup_table() def cull_lookup_table(self): ''' Culls the lookup table based on filters that are locked and or disabled ''' self.culledSize = 2**(self.num_filters - (self.outMask | self.inMask).bit_count()) if self.verbose: print(f'Operating spaced now at {self.culledSize} configurations') if self.verbose: print(f'Culling table with in mask {self.inMask} and out mask {self.outMask}') self.culledConfigs = np.empty(self.culledSize, dtype=int) self.culledTable = np.empty(self.culledSize) j = 0 for i in range(2**self.num_filters): if ((i & self.outMask == 0) and (i & self.inMask == self.inMask)): self.culledConfigs[j]=i self.culledTable[j] = self.lookupTable[i] j += 1 self.sort_lookup_table() def sort_lookup_table(self): ''' ''' if self.verbose: print(f'Sorting culled lookup table of length {len(self.culledTable)}') self.sorted_index = np.argsort(self.culledTable) def calc_config_atten(self, filter_config, energy): ''' Given filter set and incoming energy, calculate the attenuation ''' att_log_total = 0 binary_representation = binary_representation_rev(filter_config, self.num_filters) choice = [int(bit) for bit in binary_representation] for i in range(self.num_filters): if choice[i]: mass_thickness = self.densities[i]*(self.thicknesses[i]*1e-4) att_log_total += xrl.CS_Total_CP(self.materials[i], energy)*mass_thickness return np.exp(att_log_total) def setupLookupTable(self, subs_file, n_filters, energy = 8.0): ''' lookup table created after IOC startup (after filter materials and thicknesses are set ''' print(80*'#') print('Setting up filter control...') self.num_filters = n_filters self.energy = energy #read in substitutions file try: subsFile = open(subs_file,"r") except: raise RuntimeError(f"Substiution file ({subsFile}) not found.") subsFileContent = subsFile.readlines() subsFile.close() macros = subsFileContent[2].replace('{','').replace('}','').replace(',','').split() filter_properties = {key: [] for key in macros} # dictionary of lists for i in range(self.num_filters): try: xx = subsFileContent[3+i].replace('{','').replace('}','').replace(',','').replace('"','').split() filter_properties[macros[0]].append(xx[0]) filter_properties[macros[1]].append(xx[1]) filter_properties[macros[2]].append(xx[2]) filter_properties[macros[3]].append(xx[3]) except: raise RuntimeError(f"Number of filters ({self.num_filters}) doesn't match substitution file") self.materials = [] self.thicknesses = [] # get materials from filter properties dictionary-list print('Getting filter materials...') if MAT_MACRO in macros: self.materials = filter_properties[MAT_MACRO] print('Filter material read in.\n') else: raise RuntimeError(f"Material macro ({MAT_MACRO}) not found in substituion file") # get densities from local definition (for compounds) or from xraylib (for elements) densities = get_densities(self.materials) self.densities = [densities[material] for material in self.materials] # get thicknesses from filter properties dictionary-list print('Getting filter thicknesses...') if THICK_MACRO in macros: self.thicknesses = [float(i) for i in filter_properties[THICK_MACRO]] print('Filter thicknesses read in.\n') else: raise RuntimeError(f"Thickness macro ({THICK_MACRO}) not found in substituion file") print('Calculating lookup table...') self.calc_lookup_table() print('Lookup table calculation complete.\n') print('Filter control setup complete.') print(80*'#') def updateEnergy(self, energy): ''' Energy variable sent from IOC as a string ''' self.energy = float(energy) if self.materials is None or self.thicknesses is None: raise RuntimeError("Substitution file hasn't been loaded. Can't calc lookup table") else: self.calc_lookup_table() pydev.iointr('new_energy', self.energy) self.setAttenActual() self.updateAttenRBVs() def updateAtten(self, attenuation): ''' Attenuation variable sent from IOC as a string ''' self.attenuation = float(attenuation) if self.verbose: print(f'Updating attenuation to {self.attenuation}') self.findFilterConfig() def findFilterConfig(self): ''' User selects attenuation, this function finds nearest acheivable attenuation ''' # Code to search lookup table for nearest attenuation to desired if self.verbose: print(f'Searching for config closest to {self.attenuation}') self.culledIndex = np.argmin(np.abs(self.culledTable - self.attenuation)) if self.verbose: print(f'Config index found at {self.culledIndex}') self.culledIndexSorted = self.sorted_index.tolist().index(self.culledIndex) if self.verbose: print(f'Sorted config index found at {self.culledIndexSorted}') # Update PVs self.setAttenActual() self.updateFilterConfigPV() self.updateFilterRBV() self.updateAttenRBVs() def getPreviewAtten(self, sortedIndex): ''' ''' attenuation_preview = self.culledTable[self.sorted_index[int(sortedIndex)]] if self.verbose: print(f'Preview attenuation for {sortedIndex} is {attenuation_preview}') pydev.iointr('new_preview', attenuation_preview) def updateIndex(self, sortedIndex): ''' User has updated desired sorted index ''' self.culledIndexSorted = int(sortedIndex) self.culledIndex = self.sorted_index[self.culledIndexSorted] self.setAttenActual() self.updateFilterConfigPV() self.updateFilterRBV() self.updateAttenRBVs() def updateConfig(self, config_BW): ''' When user manually changes filters, this gets attenuation and displays it along with updated RBVs but it doesn't set the config PV ''' self.configIndex = int(config_BW) self.configIndexSorted = self.sorted_index.tolist().index(self.configIndex) self.setAttenActual() self.updateFilterRBV() self.updateAttenRBVs() def setInMask(self, inMask): ''' update mask for filters that are locked in ''' self.inMask = int(inMask) self.cull_lookup_table() if self.verbose: print(f'Converting culled index via in Mask') self.convertCulledIndex() if self.verbose: print(f'Updating filter RBV via in Mask') self.updateFilterRBV() if self.verbose: print(f'Setting in mask RBV to {self.inMask}') pydev.iointr('new_inMask', int(self.inMask)) def setOutMask(self, outMask): ''' update mask for filters that must remain out (either disabled or locked) ''' self.outMask = int(outMask) self.cull_lookup_table() if self.verbose: print(f'Converting culled index via out Mask') self.convertCulledIndex() if self.verbose: print(f'Updating filter RBV via out Mask') self.updateFilterRBV() if self.verbose: print(f'Setting out mask RBV to {self.outMask}') pydev.iointr('new_outMask', int(self.outMask)) def convertCulledIndex(self): ''' When available configs change, need to update index so that tweaks continue to work ''' if self.verbose: print('Converting ...') self.culledIndex = (np.where(self.culledConfigs == self.config))[0][0] if self.verbose: print(f'Culled index is {self.culledIndex}') self.culledIndexSorted = self.sorted_index.tolist().index(self.culledIndex) if self.verbose: print(f'Sorted culled index is {self.culledIndexSorted}') def setAttenActual(self): ''' ''' self.attenuation_actual = self.culledTable[self.culledIndex] def updateFilterConfigPV(self): ''' ''' self.config = self.culledConfigs[self.culledIndex] pydev.iointr('new_filters', int(self.config)) def updateFilterRBV(self): ''' ''' pydev.iointr('new_index', int(self.culledIndexSorted)) def updateAttenRBVs(self): ''' ''' pydev.iointr('new_atten', self.attenuation_actual) self.attenuation_2E_actual = self.calc_config_atten(self.culledConfigs[self.culledIndex], self.energy*2) pydev.iointr('new_atten_2E', self.attenuation_2E_actual) self.attenuation_3E_actual = self.calc_config_atten(self.culledConfigs[self.culledIndex], self.energy*3) pydev.iointr('new_atten_3E', self.attenuation_3E_actual) def updateVerbosity(self, verbosity): ''' Turn on minor printing ''' print(f'Verbosity set to {verbosity}') self.verbose = verbosity