Skip to content
Snippets Groups Projects
Commit 116fc336 authored by sveseli's avatar sveseli
Browse files

fixes for monitorint errors

parent 4d37fa61
No related branches found
No related tags found
No related merge requests found
...@@ -25,12 +25,12 @@ class DaqInfo(DmObject): ...@@ -25,12 +25,12 @@ class DaqInfo(DmObject):
nProcessedFiles = 0 nProcessedFiles = 0
nProcessingErrors = 0 nProcessingErrors = 0
processingErrors = {} processingErrors = {}
for (filePath,uploadFileInfo) in fileDict.items(): for (filePath,fileProcessingInfo) in fileDict.items():
if uploadFileInfo.get('processed'): if fileProcessingInfo.get('processed'):
nProcessedFiles += 1 nProcessedFiles += 1
elif uploadFileInfo.get('processingError'): elif fileProcessingInfo.get('processingError'):
nProcessingErrors += 1 nProcessingErrors += 1
processingErrors[filePath] = uploadFileInfo.get('processingError') processingErrors[filePath] = fileProcessingInfo.get('processingError')
if len(processingErrors): if len(processingErrors):
self['processingErrors'] = processingErrors self['processingErrors'] = processingErrors
......
...@@ -24,14 +24,14 @@ class UploadInfo(DmObject): ...@@ -24,14 +24,14 @@ class UploadInfo(DmObject):
nProcessingErrors = 0 nProcessingErrors = 0
processingErrors = {} processingErrors = {}
endTime = 0 endTime = 0
for (filePath,uploadFileInfo) in fileDict.items(): for (filePath,fileProcessingInfo) in fileDict.items():
if uploadFileInfo.get('processed'): if fileProcessingInfo.get('processed'):
nProcessedFiles += 1 nProcessedFiles += 1
elif uploadFileInfo.get('processingError'): elif fileProcessingInfo.get('processingError'):
nProcessingErrors += 1 nProcessingErrors += 1
processingErrors[filePath] = uploadFileInfo.get('processingError') processingErrors[filePath] = fileProcessingInfo.get('processingError')
endProcessingTime = uploadFileInfo.get('endProcessingTime') endProcessingTime = fileProcessingInfo.get('endProcessingTime')
if endProcessingTime is not None and endProcessingTime > endTime: if endProcessingTime is not None and endProcessingTime > endTime:
endTime = endProcessingTime endTime = endProcessingTime
if len(processingErrors): if len(processingErrors):
......
...@@ -38,6 +38,8 @@ class FileProcessingThread(threading.Thread): ...@@ -38,6 +38,8 @@ class FileProcessingThread(threading.Thread):
try: try:
fileInfo['startProcessingTime'] = time.time() fileInfo['startProcessingTime'] = time.time()
processingInfo = fileInfo.get('processingInfo')
processingInfo['startProcessingTime'] = fileInfo['startProcessingTime']
processorNumber = 0 processorNumber = 0
nProcessors = len(self.fileProcessorKeyList) nProcessors = len(self.fileProcessorKeyList)
for processorKey in self.fileProcessorKeyList: for processorKey in self.fileProcessorKeyList:
...@@ -59,6 +61,8 @@ class FileProcessingThread(threading.Thread): ...@@ -59,6 +61,8 @@ class FileProcessingThread(threading.Thread):
if processorNumber == nProcessors: if processorNumber == nProcessors:
self.logger.debug('File %s processing is complete' % (filePath)) self.logger.debug('File %s processing is complete' % (filePath))
fileInfo['endProcessingTime'] = time.time() fileInfo['endProcessingTime'] = time.time()
processingInfo['endProcessingTime'] = fileInfo['endProcessingTime']
processingInfo['processed'] = True
except Exception, ex: except Exception, ex:
self.logger.exception(ex) self.logger.exception(ex)
errorMsg = '%s processing error: %s' % (processorName, str(ex)) errorMsg = '%s processing error: %s' % (processorName, str(ex))
...@@ -73,10 +77,11 @@ class FileProcessingThread(threading.Thread): ...@@ -73,10 +77,11 @@ class FileProcessingThread(threading.Thread):
self.logger.debug('Number of %s retries left for file %s: %s' % (processorName, filePath, nRetriesLeft)) self.logger.debug('Number of %s retries left for file %s: %s' % (processorName, filePath, nRetriesLeft))
processorDict['numberOfRetriesLeft'] = nRetriesLeft - 1 processorDict['numberOfRetriesLeft'] = nRetriesLeft - 1
if nRetriesLeft <= 0: if nRetriesLeft <= 0:
self.logger.debug('No more %s retries left for file %s' % (processorName, fileInfo))
self.unprocessedFileDict[filePath] = fileInfo self.unprocessedFileDict[filePath] = fileInfo
fileInfo['processingError'] = errorMsg
fileInfo['endProcessingTime'] = time.time() fileInfo['endProcessingTime'] = time.time()
processingInfo['endProcessingTime'] = fileInfo['endProcessingTime']
processingInfo['processingError'] = errorMsg
self.logger.debug('No more %s retries left for file %s' % (processorName, fileInfo))
else: else:
retryWaitPeriod = processor.getRetryWaitPeriodInSeconds() retryWaitPeriod = processor.getRetryWaitPeriodInSeconds()
self.logger.debug('%s will retry processing file %s in %s seconds' % (processorName, filePath, retryWaitPeriod)) self.logger.debug('%s will retry processing file %s in %s seconds' % (processorName, filePath, retryWaitPeriod))
......
...@@ -114,8 +114,9 @@ class ExperimentSessionControllerImpl(DmObjectManager): ...@@ -114,8 +114,9 @@ class ExperimentSessionControllerImpl(DmObjectManager):
fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment) fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
fileInfo['daqInfo'] = daqInfo fileInfo['daqInfo'] = daqInfo
fileInfo['uploadId'] = uploadId fileInfo['uploadId'] = uploadId
fileUploadInfo = { 'processed' : False } fileProcessingInfo = { 'processed' : False }
fileDict[filePath] = fileUploadInfo fileDict[filePath] = fileProcessingInfo
fileInfo['processingInfo'] = fileProcessingInfo
try: try:
fileProcessingManager.processFile(fileInfo) fileProcessingManager.processFile(fileInfo)
except Exception, ex: except Exception, ex:
......
...@@ -96,6 +96,8 @@ class FileSystemObserver(threading.Thread,Singleton): ...@@ -96,6 +96,8 @@ class FileSystemObserver(threading.Thread,Singleton):
return return
observedFile = self.observedFileMap.get(filePath, ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)) observedFile = self.observedFileMap.get(filePath, ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment))
fileProcessingInfo = { 'processed' : False }
observedFile['processingInfo'] = fileProcessingInfo
observedFile.setLastUpdateTimeToNow() observedFile.setLastUpdateTimeToNow()
if daqInfo: if daqInfo:
daqFileDict = daqInfo['fileDict'] daqFileDict = daqInfo['fileDict']
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment