Skip to content
Snippets Groups Projects
Commit 32a72a07 authored by sveseli's avatar sveseli
Browse files

merged fixes from 0.8

parents 4c37150b 116fc336
No related branches found
No related tags found
No related merge requests found
......@@ -25,12 +25,12 @@ class DaqInfo(DmObject):
nProcessedFiles = 0
nProcessingErrors = 0
processingErrors = {}
for (filePath,uploadFileInfo) in fileDict.items():
if uploadFileInfo.get('processed'):
for (filePath,fileProcessingInfo) in fileDict.items():
if fileProcessingInfo.get('processed'):
nProcessedFiles += 1
elif uploadFileInfo.get('processingError'):
elif fileProcessingInfo.get('processingError'):
nProcessingErrors += 1
processingErrors[filePath] = uploadFileInfo.get('processingError')
processingErrors[filePath] = fileProcessingInfo.get('processingError')
if len(processingErrors):
self['processingErrors'] = processingErrors
......
......@@ -24,14 +24,14 @@ class UploadInfo(DmObject):
nProcessingErrors = 0
processingErrors = {}
endTime = 0
for (filePath,uploadFileInfo) in fileDict.items():
if uploadFileInfo.get('processed'):
for (filePath,fileProcessingInfo) in fileDict.items():
if fileProcessingInfo.get('processed'):
nProcessedFiles += 1
elif uploadFileInfo.get('processingError'):
elif fileProcessingInfo.get('processingError'):
nProcessingErrors += 1
processingErrors[filePath] = uploadFileInfo.get('processingError')
processingErrors[filePath] = fileProcessingInfo.get('processingError')
endProcessingTime = uploadFileInfo.get('endProcessingTime')
endProcessingTime = fileProcessingInfo.get('endProcessingTime')
if endProcessingTime is not None and endProcessingTime > endTime:
endTime = endProcessingTime
if len(processingErrors):
......
......@@ -38,6 +38,8 @@ class FileProcessingThread(threading.Thread):
try:
fileInfo['startProcessingTime'] = time.time()
processingInfo = fileInfo.get('processingInfo')
processingInfo['startProcessingTime'] = fileInfo['startProcessingTime']
processorNumber = 0
nProcessors = len(self.fileProcessorKeyList)
for processorKey in self.fileProcessorKeyList:
......@@ -59,6 +61,8 @@ class FileProcessingThread(threading.Thread):
if processorNumber == nProcessors:
self.logger.debug('File %s processing is complete' % (filePath))
fileInfo['endProcessingTime'] = time.time()
processingInfo['endProcessingTime'] = fileInfo['endProcessingTime']
processingInfo['processed'] = True
except Exception, ex:
self.logger.exception(ex)
errorMsg = '%s processing error: %s' % (processorName, str(ex))
......@@ -73,10 +77,11 @@ class FileProcessingThread(threading.Thread):
self.logger.debug('Number of %s retries left for file %s: %s' % (processorName, filePath, nRetriesLeft))
processorDict['numberOfRetriesLeft'] = nRetriesLeft - 1
if nRetriesLeft <= 0:
self.logger.debug('No more %s retries left for file %s' % (processorName, fileInfo))
self.unprocessedFileDict[filePath] = fileInfo
fileInfo['processingError'] = errorMsg
fileInfo['endProcessingTime'] = time.time()
processingInfo['endProcessingTime'] = fileInfo['endProcessingTime']
processingInfo['processingError'] = errorMsg
self.logger.debug('No more %s retries left for file %s' % (processorName, fileInfo))
else:
retryWaitPeriod = processor.getRetryWaitPeriodInSeconds()
self.logger.debug('%s will retry processing file %s in %s seconds' % (processorName, filePath, retryWaitPeriod))
......
......@@ -114,8 +114,9 @@ class ExperimentSessionControllerImpl(DmObjectManager):
fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
fileInfo['daqInfo'] = daqInfo
fileInfo['uploadId'] = uploadId
fileUploadInfo = { 'processed' : False }
fileDict[filePath] = fileUploadInfo
fileProcessingInfo = { 'processed' : False }
fileDict[filePath] = fileProcessingInfo
fileInfo['processingInfo'] = fileProcessingInfo
try:
fileProcessingManager.processFile(fileInfo)
except Exception, ex:
......
......@@ -96,6 +96,8 @@ class FileSystemObserver(threading.Thread,Singleton):
return
observedFile = self.observedFileMap.get(filePath, ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment))
fileProcessingInfo = { 'processed' : False }
observedFile['processingInfo'] = fileProcessingInfo
observedFile.setLastUpdateTimeToNow()
if daqInfo:
daqFileDict = daqInfo['fileDict']
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment