Newer
Older
mwyman
committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
import numpy as np
import os
# remove the following when attenuation function completed
import random
import xraylib as xrl
THICK_MACRO = 'THICK'
MAT_MACRO = 'MAT'
TESTING = False
#Using local density definitions until package/library found with longer list
#in g/cm^3
DENSITY = {'Si': 2.33, 'TiO2': 4.23, 'InSb': 5.78}
def binary_representation_rev(num, bits):
# need to reverse order so 1st bit in list is for first filter
return format(num, '0'+str(bits)+'b')[::-1]
def get_densities(materials):
densities = dict.fromkeys(materials)
for material in list(densities):
try:
matdb = xrl.CompoundParser(material)
except ValueError as err:
print(f"{material} not found in xraylib.")
else:
if matdb['nAtomsAll'] == 1:
density = xrl.ElementDensity(matdb['Elements'][0])
else:
if material in list(DENSITY):
density = DENSITY[material]
else:
raise ValueError(f"{material} not found in DENSITY keys.")
densities[material]=density
return densities
class filterBlock():
def __init__(self):
'''
'''
#create necessary inputs/output; initialize to zero/empty
print(80*'#')
print('Initializing pyDevice filter block object')
self.materials = None
self.thicknesses = None
self.densities = None
self.lookupTable = []
self.culledTable = []
self.sorted_index = []
self.filter_attenuations = []
self.outMask = 0
self.inMask = 0
self.config = 0
self.attenuation = 1
self.attenuation_actual = 1
self.culledSize = 0
self.configIndex = 0
self.culledIndex = 0
self.configIndexSorted = 0
self.culledIndexSorted = 0
self.energy = 0 #in keV
self.verbose = False
self.num_filters = 0
print(80*'#')
def calc_lookup_table(self):
'''
'''
if TESTING:
# For testing, using a shuffled version of the reciprocal of array whose values equal their index position + 1
print('Using test array')
self.lookupTable = np.reciprocal((np.arange(2**self.num_filters)+1).astype(float))
np.random.shuffle(self.lookupTable)
else:
print('Using calculated attenuations')
self.filter_attenuations = np.empty(self.num_filters)
num_configs = 2**self.num_filters
att_log = np.empty(num_configs)
self.lookupTable = np.empty(num_configs)
for i in range(self.num_filters):
mass_thickness = self.densities[i]*(self.thicknesses[i]*1e-4)
print(f'Getting attenuation for {self.materials[i]} at energy of {self.energy} of keV')
self.filter_attenuations[i] = xrl.CS_Total_CP(self.materials[i], self.energy)*mass_thickness
print(f'Filter: {i+1}, Mass_thickness: {mass_thickness}, log(attenuation): {self.filter_attenuations[i]}')
for j in range(num_configs):
binary_representation = binary_representation_rev(j, self.num_filters)
choice = [int(bit) for bit in binary_representation]
att_log[j] = sum(self.filter_attenuations*choice)
self.lookupTable = np.exp(att_log)
self.cull_lookup_table()
def cull_lookup_table(self):
'''
Culls the lookup table based on filters that are locked and or disabled
'''
self.culledSize = 2**(self.num_filters - (self.outMask | self.inMask).bit_count())
if self.verbose: print(f'Operating spaced now at {self.culledSize} configurations')
if self.verbose: print(f'Culling table with in mask {self.inMask} and out mask {self.outMask}')
self.culledConfigs = np.empty(self.culledSize, dtype=int)
self.culledTable = np.empty(self.culledSize)
j = 0
for i in range(2**self.num_filters):
if ((i & self.outMask == 0) and (i & self.inMask == self.inMask)):
self.culledConfigs[j]=i
self.culledTable[j] = self.lookupTable[i]
j += 1
self.sort_lookup_table()
def sort_lookup_table(self):
'''
'''
if self.verbose: print(f'Sorting culled lookup table of length {len(self.culledTable)}')
self.sorted_index = np.argsort(self.culledTable)
def calc_config_atten(self, filter_config, energy):
'''
Given filter set and incoming energy, calculate the attenuation
'''
att_log_total = 0
binary_representation = binary_representation_rev(filter_config, self.num_filters)
choice = [int(bit) for bit in binary_representation]
for i in range(self.num_filters):
if choice[i]:
mass_thickness = self.densities[i]*(self.thicknesses[i]*1e-4)
att_log_total += xrl.CS_Total_CP(self.materials[i], energy)*mass_thickness
return np.exp(att_log_total)
def setupLookupTable(self, subs_file, n_filters, energy = 8.0):
'''
lookup table created after IOC startup (after filter materials and
thicknesses are set
'''
print(80*'#')
print('Setting up filter control...')
self.num_filters = n_filters
self.energy = energy
#read in substitutions file
try:
subsFile = open(subs_file,"r")
except:
raise RuntimeError(f"Substiution file ({subsFile}) not found.")
subsFileContent = subsFile.readlines()
subsFile.close()
macros = subsFileContent[2].replace('{','').replace('}','').replace(',','').split()
filter_properties = {key: [] for key in macros} # dictionary of lists
for i in range(self.num_filters):
try:
xx = subsFileContent[3+i].replace('{','').replace('}','').replace(',','').replace('"','').split()
filter_properties[macros[0]].append(xx[0])
filter_properties[macros[1]].append(xx[1])
filter_properties[macros[2]].append(xx[2])
filter_properties[macros[3]].append(xx[3])
except:
raise RuntimeError(f"Number of filters ({self.num_filters}) doesn't match substitution file")
self.materials = []
self.thicknesses = []
# get materials from filter properties dictionary-list
print('Getting filter materials...')
if MAT_MACRO in macros:
self.materials = filter_properties[MAT_MACRO]
print('Filter material read in.\n')
else:
raise RuntimeError(f"Material macro ({MAT_MACRO}) not found in substituion file")
# get densities from local definition (for compounds) or from xraylib (for elements)
densities = get_densities(self.materials)
self.densities = [densities[material] for material in self.materials]
# get thicknesses from filter properties dictionary-list
print('Getting filter thicknesses...')
if THICK_MACRO in macros:
self.thicknesses = [float(i) for i in filter_properties[THICK_MACRO]]
print('Filter thicknesses read in.\n')
else:
raise RuntimeError(f"Thickness macro ({THICK_MACRO}) not found in substituion file")
print('Calculating lookup table...')
self.calc_lookup_table()
print('Lookup table calculation complete.\n')
print('Filter control setup complete.')
print(80*'#')
def updateEnergy(self, energy):
'''
Energy variable sent from IOC as a string
'''
self.energy = float(energy)
if self.materials is None or self.thicknesses is None:
raise RuntimeError("Substitution file hasn't been loaded. Can't calc lookup table")
else:
self.calc_lookup_table()
pydev.iointr('new_energy', self.energy)
self.setAttenActual()
self.updateAttenRBVs()
def updateAtten(self, attenuation):
'''
Attenuation variable sent from IOC as a string
'''
self.attenuation = float(attenuation)
if self.verbose: print(f'Updating attenuation to {self.attenuation}')
self.findFilterConfig()
def findFilterConfig(self):
'''
User selects attenuation, this function finds nearest acheivable attenuation
'''
# Code to search lookup table for nearest attenuation to desired
if self.verbose: print(f'Searching for config closest to {self.attenuation}')
self.culledIndex = np.argmin(np.abs(self.culledTable - self.attenuation))
if self.verbose: print(f'Config index found at {self.culledIndex}')
self.culledIndexSorted = self.sorted_index.tolist().index(self.culledIndex)
if self.verbose: print(f'Sorted config index found at {self.culledIndexSorted}')
# Update PVs
self.setAttenActual()
self.updateFilterConfigPV()
self.updateFilterRBV()
self.updateAttenRBVs()
def getPreviewAtten(self, sortedIndex):
'''
'''
attenuation_preview = self.culledTable[self.sorted_index[int(sortedIndex)]]
if self.verbose: print(f'Preview attenuation for {sortedIndex} is {attenuation_preview}')
pydev.iointr('new_preview', attenuation_preview)
def updateIndex(self, sortedIndex):
'''
User has updated desired sorted index
'''
self.culledIndexSorted = int(sortedIndex)
self.culledIndex = self.sorted_index[self.culledIndexSorted]
self.setAttenActual()
self.updateFilterConfigPV()
self.updateFilterRBV()
self.updateAttenRBVs()
def updateConfig(self, config_BW):
'''
When user manually changes filters, this gets attenuation and displays it
along with updated RBVs but it doesn't set the config PV
'''
self.configIndex = int(config_BW)
self.configIndexSorted = self.sorted_index.tolist().index(self.configIndex)
self.setAttenActual()
self.updateFilterRBV()
self.updateAttenRBVs()
def setInMask(self, inMask):
'''
update mask for filters that are locked in
'''
self.inMask = int(inMask)
self.cull_lookup_table()
if self.verbose: print(f'Converting culled index via in Mask')
self.convertCulledIndex()
if self.verbose: print(f'Updating filter RBV via in Mask')
self.updateFilterRBV()
if self.verbose: print(f'Setting in mask RBV to {self.inMask}')
pydev.iointr('new_inMask', int(self.inMask))
def setOutMask(self, outMask):
'''
update mask for filters that must remain out (either disabled or locked)
'''
self.outMask = int(outMask)
self.cull_lookup_table()
if self.verbose: print(f'Converting culled index via out Mask')
self.convertCulledIndex()
if self.verbose: print(f'Updating filter RBV via out Mask')
self.updateFilterRBV()
if self.verbose: print(f'Setting out mask RBV to {self.outMask}')
pydev.iointr('new_outMask', int(self.outMask))
def convertCulledIndex(self):
'''
When available configs change, need to update index so that tweaks
continue to work
'''
if self.verbose: print('Converting ...')
self.culledIndex = (np.where(self.culledConfigs == self.config))[0][0]
if self.verbose: print(f'Culled index is {self.culledIndex}')
self.culledIndexSorted = self.sorted_index.tolist().index(self.culledIndex)
if self.verbose: print(f'Sorted culled index is {self.culledIndexSorted}')
def setAttenActual(self):
'''
'''
self.attenuation_actual = self.culledTable[self.culledIndex]
def updateFilterConfigPV(self):
'''
'''
self.config = self.culledConfigs[self.culledIndex]
pydev.iointr('new_filters', int(self.config))
def updateFilterRBV(self):
'''
'''
pydev.iointr('new_index', int(self.culledIndexSorted))
def updateAttenRBVs(self):
'''
'''
pydev.iointr('new_atten', self.attenuation_actual)
self.attenuation_2E_actual = self.calc_config_atten(self.culledConfigs[self.culledIndex], self.energy*2)
pydev.iointr('new_atten_2E', self.attenuation_2E_actual)
self.attenuation_3E_actual = self.calc_config_atten(self.culledConfigs[self.culledIndex], self.energy*3)
pydev.iointr('new_atten_3E', self.attenuation_3E_actual)
def updateVerbosity(self, verbosity):
'''
Turn on minor printing
'''
print(f'Verbosity set to {verbosity}')
self.verbose = verbosity