diff --git a/src/python/dm/common/objects/daqInfo.py b/src/python/dm/common/objects/daqInfo.py index 0e3e2a9e77cf4dfa1fae0d7592f5532aa4f0f3ee..a657e033152ee862f1b4488f6b408156bc62c347 100755 --- a/src/python/dm/common/objects/daqInfo.py +++ b/src/python/dm/common/objects/daqInfo.py @@ -1,6 +1,7 @@ #!/usr/bin/env python import copy +import time from dmObject import DmObject from dm.common.utility.dictUtility import DictUtility @@ -15,6 +16,7 @@ class DaqInfo(DmObject): self['fileDict'] = self.get('fileDict', {}) def updateStatus(self): + now = time.time() daqStatus = self.get('status', 'running') if daqStatus == 'done': return @@ -49,6 +51,9 @@ class DaqInfo(DmObject): self['percentageProcessed'] = '%.2f' % percentageProcessed self['percentageProcessingErrors'] = '%.2f' % percentageProcessingErrors + runTime = now - self.get('startTime') + self['runTime'] = runTime + if self.get('endTime'): daqStatus = 'done' self['runTime'] = self.get('endTime') - self.get('startTime') diff --git a/src/python/dm/common/objects/uploadInfo.py b/src/python/dm/common/objects/uploadInfo.py index 98088420f88b27108979ae7e09c631c4b17e2321..1c54ca9ca8dcefc8bdf15863ee22cebcb8677b75 100755 --- a/src/python/dm/common/objects/uploadInfo.py +++ b/src/python/dm/common/objects/uploadInfo.py @@ -14,6 +14,7 @@ class UploadInfo(DmObject): self['fileDict'] = self.get('fileDict', {}) def updateStatus(self): + now = time.time() uploadStatus = self.get('status', 'running') if uploadStatus == 'done': return @@ -36,15 +37,19 @@ class UploadInfo(DmObject): if len(processingErrors): self['processingErrors'] = processingErrors + startTime = self.get('startTime') + if startTime: + runTime = now - startTime + self['runTime'] = runTime + # need to handle 'failed' uploads nCompletedFiles = nProcessedFiles+nProcessingErrors if nCompletedFiles == nFiles: uploadStatus = 'done' if not endTime: - endTime = time.time() + endTime = now self['endTime'] = endTime self['endTimestamp'] = TimeUtility.formatLocalTimeStamp(endTime) - startTime = self.get('startTime') if startTime: runTime = endTime - startTime self['runTime'] = runTime diff --git a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py index cddc1cec89d8c770a74790a1158b82f13a6b116f..cc91ed87452a544ea7629918db821127951d69fc 100755 --- a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py +++ b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py @@ -61,6 +61,8 @@ class ExperimentSessionControllerImpl(DmObjectManager): def upload(self, experimentName, dataDirectory, daqInfo, includeFileDetails=False): experiment = self.dsExperimentApi.getExperimentByName(experimentName) + UploadTracker.getInstance().checkForActiveUpload(experiment, dataDirectory) + experiment['daqInfo'] = daqInfo storageDirectory = experiment.get('storageDirectory') if storageDirectory is None: @@ -94,7 +96,8 @@ class ExperimentSessionControllerImpl(DmObjectManager): raise InvalidRequest('There are no new files for upload in directory %s.' % dataDirectory) fileDict = {} - UploadTracker.getInstance().put(uploadId, uploadInfo) + #UploadTracker.getInstance().put(uploadId, uploadInfo) + UploadTracker.getInstance().startUpload(uploadId, uploadInfo) uploadInfo['fileDict'] = fileDict uploadInfo['nFiles'] = len(filePathsDict) timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUpload, args=[uploadInfo, daqInfo, experiment, filePathsDict]) diff --git a/src/python/dm/daq_web_service/service/impl/uploadTracker.py b/src/python/dm/daq_web_service/service/impl/uploadTracker.py index 990fae1dbda5934a5a670068a91c055e0bd8b620..3e9d385427f092c02b1038ed78220f4f99246442 100755 --- a/src/python/dm/daq_web_service/service/impl/uploadTracker.py +++ b/src/python/dm/daq_web_service/service/impl/uploadTracker.py @@ -1,7 +1,9 @@ #!/usr/bin/env python +import os from dm.common.objects.uploadInfo import UploadInfo from dm.common.utility.objectTracker import ObjectTracker +from dm.common.exceptions.objectAlreadyExists import ObjectAlreadyExists class UploadTracker(ObjectTracker): @@ -9,6 +11,30 @@ class UploadTracker(ObjectTracker): objectClass = UploadInfo cacheSize = 100 + def __init__(self, *args, **kwargs): + ObjectTracker.__init__(self, args, kwargs) + self.activeUploadDict = {} + + def checkForActiveUpload(self, experiment, dataDirectory): + experimentName = experiment.get('name') + dataDir = os.path.normpath(dataDirectory) + activeUploadKey = experimentName + dataDir + uploadId = self.activeUploadDict.get(activeUploadKey) + if uploadId: + uploadInfo = self.get(uploadId) + if uploadInfo is not None: + if uploadInfo.get('status') == 'running': + raise ObjectAlreadyExists('Upload is already active for experiment %s in data directory %s.' % (experimentName, dataDir)) + del self.activeUploadDict[activeUploadKey] + + def startUpload(self, uploadId, uploadInfo): + experimentName = uploadInfo.get('experimentName') + dataDirectory = uploadInfo.get('dataDirectory') + dataDir = os.path.normpath(dataDirectory) + activeUploadKey = experimentName + dataDir + self.activeUploadDict[activeUploadKey] = uploadId + self.put(uploadId, uploadInfo) + #################################################################### # Testing