diff --git a/src/python/dm/common/objects/daqInfo.py b/src/python/dm/common/objects/daqInfo.py index a657e033152ee862f1b4488f6b408156bc62c347..c355cd03207bbd2aa6c83e963337a2940172bb14 100755 --- a/src/python/dm/common/objects/daqInfo.py +++ b/src/python/dm/common/objects/daqInfo.py @@ -25,12 +25,12 @@ class DaqInfo(DmObject): nProcessedFiles = 0 nProcessingErrors = 0 processingErrors = {} - for (filePath,uploadFileInfo) in fileDict.items(): - if uploadFileInfo.get('processed'): + for (filePath,fileProcessingInfo) in fileDict.items(): + if fileProcessingInfo.get('processed'): nProcessedFiles += 1 - elif uploadFileInfo.get('processingError'): + elif fileProcessingInfo.get('processingError'): nProcessingErrors += 1 - processingErrors[filePath] = uploadFileInfo.get('processingError') + processingErrors[filePath] = fileProcessingInfo.get('processingError') if len(processingErrors): self['processingErrors'] = processingErrors diff --git a/src/python/dm/common/objects/uploadInfo.py b/src/python/dm/common/objects/uploadInfo.py index 1c54ca9ca8dcefc8bdf15863ee22cebcb8677b75..e00a44a6f56a240b25d767a4fa661ab0941fc7d1 100755 --- a/src/python/dm/common/objects/uploadInfo.py +++ b/src/python/dm/common/objects/uploadInfo.py @@ -24,14 +24,14 @@ class UploadInfo(DmObject): nProcessingErrors = 0 processingErrors = {} endTime = 0 - for (filePath,uploadFileInfo) in fileDict.items(): - if uploadFileInfo.get('processed'): + for (filePath,fileProcessingInfo) in fileDict.items(): + if fileProcessingInfo.get('processed'): nProcessedFiles += 1 - elif uploadFileInfo.get('processingError'): + elif fileProcessingInfo.get('processingError'): nProcessingErrors += 1 - processingErrors[filePath] = uploadFileInfo.get('processingError') + processingErrors[filePath] = fileProcessingInfo.get('processingError') - endProcessingTime = uploadFileInfo.get('endProcessingTime') + endProcessingTime = fileProcessingInfo.get('endProcessingTime') if endProcessingTime is not None and endProcessingTime > endTime: endTime = endProcessingTime if len(processingErrors): diff --git a/src/python/dm/common/processing/fileProcessingThread.py b/src/python/dm/common/processing/fileProcessingThread.py index e546cd9ebefbc7dae732ad4569dd2ccd2dce37a9..9a50ae9e108fb639249a302a5e61ea8f1ddd173d 100755 --- a/src/python/dm/common/processing/fileProcessingThread.py +++ b/src/python/dm/common/processing/fileProcessingThread.py @@ -38,6 +38,8 @@ class FileProcessingThread(threading.Thread): try: fileInfo['startProcessingTime'] = time.time() + processingInfo = fileInfo.get('processingInfo') + processingInfo['startProcessingTime'] = fileInfo['startProcessingTime'] processorNumber = 0 nProcessors = len(self.fileProcessorKeyList) for processorKey in self.fileProcessorKeyList: @@ -59,6 +61,8 @@ class FileProcessingThread(threading.Thread): if processorNumber == nProcessors: self.logger.debug('File %s processing is complete' % (filePath)) fileInfo['endProcessingTime'] = time.time() + processingInfo['endProcessingTime'] = fileInfo['endProcessingTime'] + processingInfo['processed'] = True except Exception, ex: self.logger.exception(ex) errorMsg = '%s processing error: %s' % (processorName, str(ex)) @@ -73,10 +77,11 @@ class FileProcessingThread(threading.Thread): self.logger.debug('Number of %s retries left for file %s: %s' % (processorName, filePath, nRetriesLeft)) processorDict['numberOfRetriesLeft'] = nRetriesLeft - 1 if nRetriesLeft <= 0: - self.logger.debug('No more %s retries left for file %s' % (processorName, fileInfo)) self.unprocessedFileDict[filePath] = fileInfo - fileInfo['processingError'] = errorMsg fileInfo['endProcessingTime'] = time.time() + processingInfo['endProcessingTime'] = fileInfo['endProcessingTime'] + processingInfo['processingError'] = errorMsg + self.logger.debug('No more %s retries left for file %s' % (processorName, fileInfo)) else: retryWaitPeriod = processor.getRetryWaitPeriodInSeconds() self.logger.debug('%s will retry processing file %s in %s seconds' % (processorName, filePath, retryWaitPeriod)) diff --git a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py index cc91ed87452a544ea7629918db821127951d69fc..0bf7ff67e5a4eb314aecdd08f2d3f936cc4b540b 100755 --- a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py +++ b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py @@ -114,8 +114,9 @@ class ExperimentSessionControllerImpl(DmObjectManager): fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment) fileInfo['daqInfo'] = daqInfo fileInfo['uploadId'] = uploadId - fileUploadInfo = { 'processed' : False } - fileDict[filePath] = fileUploadInfo + fileProcessingInfo = { 'processed' : False } + fileDict[filePath] = fileProcessingInfo + fileInfo['processingInfo'] = fileProcessingInfo try: fileProcessingManager.processFile(fileInfo) except Exception, ex: diff --git a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py index 89c8df9725a154a3c4304ce3cbf8f069733f978e..90b0e099a377ee62d23c5f749f0df0d91d837d2e 100755 --- a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py +++ b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py @@ -96,6 +96,8 @@ class FileSystemObserver(threading.Thread,Singleton): return observedFile = self.observedFileMap.get(filePath, ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)) + fileProcessingInfo = { 'processed' : False } + observedFile['processingInfo'] = fileProcessingInfo observedFile.setLastUpdateTimeToNow() if daqInfo: daqFileDict = daqInfo['fileDict']