Newer
Older
mwyman
committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import numpy as np
import os
# remove the following when attenuation function completed
import random
import xraylib as xrl
THICK_MACRO = 'THICK'
MAT_MACRO = 'MAT'
TESTING = False
#Using local density definitions until package/library found with longer list
#in g/cm^3
DENSITY = {'Si': 2.33, 'TiO2': 4.23, 'InSb': 5.78}
def binary_representation_rev(num, bits):
# need to reverse order so 1st bit in list is for first filter
return format(num, '0'+str(bits)+'b')[::-1]
def get_densities(materials):
densities = dict.fromkeys(materials)
for material in list(densities):
try:
matdb = xrl.CompoundParser(material)
except ValueError as err:
print(f"{material} not found in xraylib.")
else:
if matdb['nAtomsAll'] == 1:
density = xrl.ElementDensity(matdb['Elements'][0])
else:
if material in list(DENSITY):
density = DENSITY[material]
else:
raise ValueError(f"{material} not found in DENSITY keys.")
densities[material]=density
return densities
def find_nearest_above(my_array, target):
diff = my_array - target
mask = np.ma.less(diff, 0)
# We need to mask the negative differences and zero
# since we are looking for values above
if np.all(mask):
return -1 # returns None if target is greater than any value
masked_diff = np.ma.masked_array(diff, mask)
return masked_diff.argmin()
mwyman
committed
class filterBlock():
mwyman
committed
def __init__(self, filter_id = 'FL-1'):
mwyman
committed
'''
'''
#create necessary inputs/output; initialize to zero/empty
print(80*'#')
print('Initializing pyDevice filter block object')
self.materials = None
self.thicknesses = None
self.densities = None
self.lookupTable = []
self.culledTable = []
self.sorted_index = []
self.filter_attenuations = []
mwyman
committed
self.filter_id = filter_id
print(f'Filter ID: {self.filter_id}')
mwyman
committed
self.outMask = 0
self.inMask = 0
self.config = 0
self.attenuation = 1
self.attenuation_actual = 1
self.culledSize = 0
self.configIndex = 0
self.culledIndex = 0
self.configIndexSorted = 0
self.culledIndexSorted = 0
self.energy = 0 #in keV
self.verbose = False
self.num_filters = 0
print(80*'#')
mwyman
committed
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def setupLookupTable(self, subs_file, n_filters, energy = 8.0):
'''
lookup table created after IOC startup (after filter materials and
thicknesses are set
'''
print(80*'#')
print('Setting up filter control...')
self.num_filters = n_filters
self.energy = energy
#read in substitutions file
try:
subsFile = open(subs_file,"r")
except:
raise RuntimeError(f"Substiution file ({subsFile}) not found.")
subsFileContent = subsFile.readlines()
subsFile.close()
macros = subsFileContent[2].replace('{','').replace('}','').replace(',','').split()
filter_properties = {key: [] for key in macros} # dictionary of lists
for i in range(self.num_filters):
try:
xx = subsFileContent[3+i].replace('{','').replace('}','').replace(',','').replace('"','').split()
filter_properties[macros[0]].append(xx[0])
filter_properties[macros[1]].append(xx[1])
filter_properties[macros[2]].append(xx[2])
filter_properties[macros[3]].append(xx[3])
except:
raise RuntimeError(f"Number of filters ({self.num_filters}) doesn't match substitution file")
self.materials = []
self.thicknesses = []
# get materials from filter properties dictionary-list
print('Getting filter materials...')
if MAT_MACRO in macros:
self.materials = filter_properties[MAT_MACRO]
print('Filter material read in.\n')
else:
raise RuntimeError(f"Material macro ({MAT_MACRO}) not found in substituion file")
# get densities from local definition (for compounds) or from xraylib (for elements)
densities = get_densities(self.materials)
self.densities = [densities[material] for material in self.materials]
# get thicknesses from filter properties dictionary-list
print('Getting filter thicknesses...')
if THICK_MACRO in macros:
self.thicknesses = [float(i) for i in filter_properties[THICK_MACRO]]
print('Filter thicknesses read in.\n')
else:
raise RuntimeError(f"Thickness macro ({THICK_MACRO}) not found in substituion file")
print('Calculating lookup table...')
self.calc_lookup_table()
print('Lookup table calculation complete.\n')
print('Filter control setup complete.')
print(80*'#')
mwyman
committed
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
def calc_lookup_table(self):
'''
'''
if TESTING:
# For testing, using a shuffled version of the reciprocal of array whose values equal their index position + 1
print('Using test array')
self.lookupTable = np.reciprocal((np.arange(2**self.num_filters)+1).astype(float))
np.random.shuffle(self.lookupTable)
else:
print('Using calculated attenuations')
self.filter_attenuations = np.empty(self.num_filters)
num_configs = 2**self.num_filters
att_log = np.empty(num_configs)
self.lookupTable = np.empty(num_configs)
for i in range(self.num_filters):
mass_thickness = self.densities[i]*(self.thicknesses[i]*1e-4)
print(f'Getting attenuation for {self.materials[i]} at energy of {self.energy} of keV')
self.filter_attenuations[i] = xrl.CS_Total_CP(self.materials[i], self.energy)*mass_thickness
print(f'Filter: {i+1}, Mass_thickness: {mass_thickness}, log(attenuation): {self.filter_attenuations[i]}')
for j in range(num_configs):
binary_representation = binary_representation_rev(j, self.num_filters)
choice = [int(bit) for bit in binary_representation]
att_log[j] = sum(self.filter_attenuations*choice)
self.lookupTable = np.exp(att_log)
self.cull_lookup_table()
def cull_lookup_table(self):
'''
Culls the lookup table based on filters that are locked and or disabled
'''
self.culledSize = 2**(self.num_filters - (self.outMask | self.inMask).bit_count())
if self.verbose: print(f'Operating spaced now at {self.culledSize} configurations')
if self.verbose: print(f'Culling table with in mask {self.inMask} and out mask {self.outMask}')
self.culledConfigs = np.empty(self.culledSize, dtype=int)
self.culledTable = np.empty(self.culledSize)
j = 0
for i in range(2**self.num_filters):
if ((i & self.outMask == 0) and (i & self.inMask == self.inMask)):
self.culledConfigs[j]=i
self.culledTable[j] = self.lookupTable[i]
j += 1
self.sort_lookup_table()
def sort_lookup_table(self):
'''
'''
if self.verbose: print(f'Sorting culled lookup table of length {len(self.culledTable)}')
self.sorted_index = np.argsort(self.culledTable)
def calc_config_atten(self, filter_config, energy):
'''
Given filter set and incoming energy, calculate the attenuation
'''
att_log_total = 0
binary_representation = binary_representation_rev(filter_config, self.num_filters)
choice = [int(bit) for bit in binary_representation]
for i in range(self.num_filters):
if choice[i]:
mass_thickness = self.densities[i]*(self.thicknesses[i]*1e-4)
att_log_total += xrl.CS_Total_CP(self.materials[i], energy)*mass_thickness
return np.exp(att_log_total)
def updateEnergy(self, energy):
'''
Energy variable sent from IOC as a string
'''
self.energy = float(energy)
if self.materials is None or self.thicknesses is None:
raise RuntimeError("Substitution file hasn't been loaded. Can't calc lookup table")
else:
self.calc_lookup_table()
mwyman
committed
pydev.iointr(self.filter_id+'_new_energy', self.energy)
mwyman
committed
self.setAttenActual()
self.updateAttenRBVs()
def updateAtten(self, attenuation):
'''
Attenuation variable sent from IOC as a string
'''
self.attenuation = float(attenuation)
if self.verbose: print(f'Updating attenuation to {self.attenuation}')
self.findFilterConfig()
def findFilterConfig(self):
'''
User selects attenuation, this function finds nearest acheivable attenuation
'''
# Code to search lookup table for nearest attenuation to desired
if self.verbose: print(f'Searching for config closest to {self.attenuation}')
nearest_above_idx = find_nearest_above(self.culledTable, self.attenuation)
mwyman
committed
if self.verbose: print(f'Nearest above index: {nearest_above_idx}')
if nearest_above_idx == -1:
# Desired attenuation is larger than possible at current energy and filter set
self.culledIndex = np.argmax(self.culledTable)
mwyman
committed
if self.verbose: print(f'Desired attenuation greater than maximum possible; setting to {self.culledIndex}')
else:
self.culledIndex = nearest_above_idx
#self.culledIndex = np.argmin(np.abs(self.culledTable - self.attenuation))
mwyman
committed
if self.verbose: print(f'Config index found at {self.culledIndex}')
self.culledIndexSorted = self.sorted_index.tolist().index(self.culledIndex)
if self.verbose: print(f'Sorted config index found at {self.culledIndexSorted}')
# Update PVs
self.setAttenActual()
self.updateFilterConfigPV()
self.updateFilterRBV()
self.updateAttenRBVs()
def getPreviewAtten(self, sortedIndex):
'''
'''
attenuation_preview = self.culledTable[self.sorted_index[int(sortedIndex)]]
if self.verbose: print(f'Preview attenuation for {sortedIndex} is {attenuation_preview}')
mwyman
committed
pydev.iointr(self.filter_id+'_new_preview', attenuation_preview)
mwyman
committed
def updateIndex(self, sortedIndex):
'''
User has updated desired sorted index
'''
self.culledIndexSorted = int(sortedIndex)
self.culledIndex = self.sorted_index[self.culledIndexSorted]
self.setAttenActual()
self.updateFilterConfigPV()
self.updateFilterRBV()
self.updateAttenRBVs()
def updateConfig(self, config_BW):
'''
When user manually changes filters, this gets attenuation and displays it
along with updated RBVs but it doesn't set the config PV
'''
self.config = int(config_BW)
if self.verbose: print(f'User has set a filter, new base-2 config: {self.config}')
self.culledIndex = (np.where(self.culledConfigs == self.config))[0][0]
if self.verbose: print(f'User has set a filter, new culled index: {self.culledIndex}')
self.culledIndexSorted = self.sorted_index.tolist().index(self.culledIndex)
if self.verbose: print(f'User has set a filter, new sorted culled index: {self.culledIndexSorted}')
mwyman
committed
self.setAttenActual()
self.updateFilterRBV()
self.updateAttenRBVs()
def setInMask(self, inMask):
'''
update mask for filters that are locked in
'''
self.inMask = int(inMask)
self.cull_lookup_table()
if self.verbose: print(f'Converting culled index via in Mask')
self.convertCulledIndex()
if self.verbose: print(f'Updating filter RBV via in Mask')
self.updateFilterRBV()
if self.verbose: print(f'Setting in mask RBV to {self.inMask}')
mwyman
committed
pydev.iointr(self.filter_id+'_new_inMask', int(self.inMask))
mwyman
committed
def setOutMask(self, outMask):
'''
update mask for filters that must remain out (either disabled or locked)
'''
self.outMask = int(outMask)
self.cull_lookup_table()
if self.verbose: print(f'Converting culled index via out Mask')
self.convertCulledIndex()
if self.verbose: print(f'Updating filter RBV via out Mask')
self.updateFilterRBV()
if self.verbose: print(f'Setting out mask RBV to {self.outMask}')
mwyman
committed
pydev.iointr(self.filter_id+'_new_outMask', int(self.outMask))
mwyman
committed
def convertCulledIndex(self):
'''
When available configs change, need to update index so that tweaks
continue to work
'''
if self.verbose: print('Converting ...')
self.culledIndex = (np.where(self.culledConfigs == self.config))[0][0]
if self.verbose: print(f'Culled index is {self.culledIndex}')
self.culledIndexSorted = self.sorted_index.tolist().index(self.culledIndex)
if self.verbose: print(f'Sorted culled index is {self.culledIndexSorted}')
def setAttenActual(self):
'''
Find the RBVs for the actual attenuation and the values of the next
up/down as a preview for the tweak buttons
mwyman
committed
'''
self.attenuation_actual = self.culledTable[self.culledIndex]
next_up = find_nearest_above(self.culledTable, self.culledTable[self.culledIndex])
culledTable_sortedIndices = np.argsort(self.culledTable)
sorted_position = np.where(culledTable_sortedIndices == self.culledIndex)[0][0]
if self.verbose: print(f'Sorted position is {sorted_position}')
mwyman
committed
self.next_up_atten = self.culledTable[culledTable_sortedIndices[min(sorted_position + 1,len(culledTable_sortedIndices)-1)]]
self.next_dn_atten = self.culledTable[culledTable_sortedIndices[max(0,sorted_position - 1)]]
if self.verbose: print(f'Next up: {self.next_up_atten} and next down: {self.next_dn_atten}')
mwyman
committed
def updateFilterConfigPV(self):
'''
Update the filter config PV with the configuration associated
with the desired attenuation
mwyman
committed
'''
self.config = self.culledConfigs[self.culledIndex]
mwyman
committed
pydev.iointr(self.filter_id+'_new_filters', int(self.config))
mwyman
committed
def updateFilterRBV(self):
'''
Update the culled (masked) index RBV
mwyman
committed
'''
mwyman
committed
pydev.iointr(self.filter_id+'_new_index', int(self.culledIndexSorted))
mwyman
committed
def updateAttenRBVs(self):
'''
Updated RBVs for various attenuation-related PVs
mwyman
committed
'''
mwyman
committed
pydev.iointr(self.filter_id+'_new_atten', self.attenuation_actual)
pydev.iointr(self.filter_id+'_atten_up', self.next_up_atten)
pydev.iointr(self.filter_id+'_atten_dn', self.next_dn_atten)
mwyman
committed
self.attenuation_2E_actual = self.calc_config_atten(self.culledConfigs[self.culledIndex], self.energy*2)
mwyman
committed
pydev.iointr(self.filter_id+'_new_atten_2E', self.attenuation_2E_actual)
mwyman
committed
self.attenuation_3E_actual = self.calc_config_atten(self.culledConfigs[self.culledIndex], self.energy*3)
mwyman
committed
pydev.iointr(self.filter_id+'_new_atten_3E', self.attenuation_3E_actual)