Skip to content
Snippets Groups Projects
Commit 9860103e authored by sveseli's avatar sveseli
Browse files

fix directory upload issue

parent 1eb392b2
No related branches found
No related tags found
No related merge requests found
...@@ -10,7 +10,7 @@ from dm.common.utility.timeUtility import TimeUtility ...@@ -10,7 +10,7 @@ from dm.common.utility.timeUtility import TimeUtility
class DirectoryUploadInfo(DmObject): class DirectoryUploadInfo(DmObject):
DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nFiles', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp' ] DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nFiles', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp', 'errorMessage' ]
def __init__(self, dict={}): def __init__(self, dict={}):
DmObject.__init__(self, dict) DmObject.__init__(self, dict)
...@@ -27,7 +27,7 @@ class DirectoryUploadInfo(DmObject): ...@@ -27,7 +27,7 @@ class DirectoryUploadInfo(DmObject):
processingInfo = self.get('processingInfo') processingInfo = self.get('processingInfo')
endTime = 0 endTime = 0
uploadStatus = 'done' uploadStatus = dmProcessingStatus.DM_PROCESSING_STATUS_DONE
for processorName in processingInfo.keys(): for processorName in processingInfo.keys():
processingEndTime = processingInfo[processorName].get('processingEndTime') processingEndTime = processingInfo[processorName].get('processingEndTime')
status = processingInfo[processorName].get('status') status = processingInfo[processorName].get('status')
......
...@@ -290,6 +290,19 @@ class ExperimentSessionControllerImpl(DmObjectManager): ...@@ -290,6 +290,19 @@ class ExperimentSessionControllerImpl(DmObjectManager):
UploadTracker.getInstance().startUpload(uploadId, uploadInfo) UploadTracker.getInstance().startUpload(uploadId, uploadInfo)
uploadInfo['nFiles'] = 0 uploadInfo['nFiles'] = 0
uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_PENDING uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_PENDING
fileProcessingManager = FileProcessingManager.getInstance()
processingInfo = {}
uploadInfo['processingInfo'] = processingInfo
for processorKey in fileProcessingManager.fileProcessorKeyList:
processor = fileProcessingManager.fileProcessorDict.get(processorKey)
processorName = processor.name
if processorName in skipPlugins:
processingInfo[processorName] = {'status' : dmProcessingStatus.DM_PROCESSING_STATUS_SKIPPED}
else:
processingInfo[processorName] = {'status' : dmProcessingStatus.DM_PROCESSING_STATUS_PENDING}
self.logger.debug('Starting upload directory %s timer for experiment %s' % (dataDirectory, experimentName)) self.logger.debug('Starting upload directory %s timer for experiment %s' % (dataDirectory, experimentName))
timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUploadDirectory, args=[uploadInfo, daqInfo, experiment]) timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUploadDirectory, args=[uploadInfo, daqInfo, experiment])
timer.start() timer.start()
...@@ -314,18 +327,13 @@ class ExperimentSessionControllerImpl(DmObjectManager): ...@@ -314,18 +327,13 @@ class ExperimentSessionControllerImpl(DmObjectManager):
return return
fileProcessingManager = FileProcessingManager.getInstance() fileProcessingManager = FileProcessingManager.getInstance()
processingInfo = {}
uploadInfo['processingInfo'] = processingInfo
self.logger.debug('Preparing plugin timers for directory %s upload (experiment %s)' % (dataDirectory, experimentName)) self.logger.debug('Preparing plugin timers for directory %s upload (experiment %s)' % (dataDirectory, experimentName))
for processorKey in fileProcessingManager.fileProcessorKeyList: for processorKey in fileProcessingManager.fileProcessorKeyList:
processor = fileProcessingManager.fileProcessorDict.get(processorKey) processor = fileProcessingManager.fileProcessorDict.get(processorKey)
processorName = processor.name processorName = processor.name
if processorName in skipPlugins: if not processorName in skipPlugins:
processingInfo[processorName] = {'status' : dmProcessingStatus.DM_PROCESSING_STATUS_SKIPPED}
else:
self.logger.debug('Starting %s processing timer for directory %s upload' % (processorName, dataDirectory)) self.logger.debug('Starting %s processing timer for directory %s upload' % (processorName, dataDirectory))
timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.processUploadDirectory, args=[processor, uploadInfo, daqInfo, experiment, filePathsDict]) timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.processUploadDirectory, args=[processor, uploadInfo, daqInfo, experiment, filePathsDict])
processingInfo[processorName] = {'status' : dmProcessingStatus.DM_PROCESSING_STATUS_PENDING}
timer.start() timer.start()
uploadInfo['nFiles'] = len(filePathsDict) uploadInfo['nFiles'] = len(filePathsDict)
...@@ -351,7 +359,10 @@ class ExperimentSessionControllerImpl(DmObjectManager): ...@@ -351,7 +359,10 @@ class ExperimentSessionControllerImpl(DmObjectManager):
canProcess = True canProcess = True
for depProcessorName in dependsOn: for depProcessorName in dependsOn:
depProcessorStatus = processingInfo.get(depProcessorName).get('status') depProcessorStatus = processingInfo.get(depProcessorName).get('status')
if depProcessorStatus in ['skipped', 'aborted', 'failed']: if depProcessorStatus in [
dmProcessingStatus.DM_PROCESSING_STATUS_SKIPPED,
dmProcessingStatus.DM_PROCESSING_STATUS_ABORTED,
dmProcessingStatus.DM_PROCESSING_STATUS_FAILED]:
# We must skip processing # We must skip processing
self.logger.debug('Skipping %s processing for upload %s due to %s status of %s' % (processorName, uploadId, depProcessorName, depProcessorStatus)) self.logger.debug('Skipping %s processing for upload %s due to %s status of %s' % (processorName, uploadId, depProcessorName, depProcessorStatus))
processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_SKIPPED processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_SKIPPED
...@@ -359,7 +370,7 @@ class ExperimentSessionControllerImpl(DmObjectManager): ...@@ -359,7 +370,7 @@ class ExperimentSessionControllerImpl(DmObjectManager):
elif depProcessorStatus in [dmProcessingStatus.DM_PROCESSING_STATUS_PENDING, dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING]: elif depProcessorStatus in [dmProcessingStatus.DM_PROCESSING_STATUS_PENDING, dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING]:
# Do nothing # Do nothing
pass pass
elif depProcessorStatus == 'done': elif depProcessorStatus == dmProcessingStatus.DM_PROCESSING_STATUS_DONE:
# We can proceed # We can proceed
canProcess = True canProcess = True
else: else:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment