diff --git a/src/python/dm/common/objects/daqInfo.py b/src/python/dm/common/objects/daqInfo.py index 0e3e2a9e77cf4dfa1fae0d7592f5532aa4f0f3ee..a657e033152ee862f1b4488f6b408156bc62c347 100755 --- a/src/python/dm/common/objects/daqInfo.py +++ b/src/python/dm/common/objects/daqInfo.py @@ -1,6 +1,7 @@ #!/usr/bin/env python import copy +import time from dmObject import DmObject from dm.common.utility.dictUtility import DictUtility @@ -15,6 +16,7 @@ class DaqInfo(DmObject): self['fileDict'] = self.get('fileDict', {}) def updateStatus(self): + now = time.time() daqStatus = self.get('status', 'running') if daqStatus == 'done': return @@ -49,6 +51,9 @@ class DaqInfo(DmObject): self['percentageProcessed'] = '%.2f' % percentageProcessed self['percentageProcessingErrors'] = '%.2f' % percentageProcessingErrors + runTime = now - self.get('startTime') + self['runTime'] = runTime + if self.get('endTime'): daqStatus = 'done' self['runTime'] = self.get('endTime') - self.get('startTime') diff --git a/src/python/dm/common/objects/uploadInfo.py b/src/python/dm/common/objects/uploadInfo.py index 06469ab73042a1313063b3a519a19dfce8fac4e1..1c54ca9ca8dcefc8bdf15863ee22cebcb8677b75 100755 --- a/src/python/dm/common/objects/uploadInfo.py +++ b/src/python/dm/common/objects/uploadInfo.py @@ -14,11 +14,12 @@ class UploadInfo(DmObject): self['fileDict'] = self.get('fileDict', {}) def updateStatus(self): + now = time.time() uploadStatus = self.get('status', 'running') if uploadStatus == 'done': return fileDict = self.get('fileDict') - nFiles = len(fileDict) + nFiles = self.get('nFiles', len(fileDict)) nProcessedFiles = 0 nProcessingErrors = 0 processingErrors = {} @@ -36,22 +37,26 @@ class UploadInfo(DmObject): if len(processingErrors): self['processingErrors'] = processingErrors + startTime = self.get('startTime') + if startTime: + runTime = now - startTime + self['runTime'] = runTime + # need to handle 'failed' uploads nCompletedFiles = nProcessedFiles+nProcessingErrors if nCompletedFiles == nFiles: uploadStatus = 'done' if not endTime: - endTime = time.time() + endTime = now self['endTime'] = endTime self['endTimestamp'] = TimeUtility.formatLocalTimeStamp(endTime) - startTime = self.get('startTime') if startTime: runTime = endTime - startTime self['runTime'] = runTime self['status'] = uploadStatus self['nProcessedFiles'] = '%s' % (nProcessedFiles) self['nProcessingErrors'] = '%s' % (nProcessingErrors) - self['nFiles'] = '%s' % (nFiles) + #self['nFiles'] = '%s' % (nFiles) percentageComplete = 100.0 percentageProcessed = 100.0 diff --git a/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py b/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py index aa5c4d31bcbae30c5b27040fcaf8c77aeb8cab8c..99fa71797e704b9b3a18bdb8f46efd3e8c70cdd9 100755 --- a/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py +++ b/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py @@ -31,13 +31,13 @@ class RsyncFileTransferPlugin(FileTransferPlugin): lines = subprocess.getStdOut().split('\n') pluginFilePathsDict = {} pathBase = dataDirectory - filePaths = filePathsDict.keys() for line in lines: if line.endswith(os.sep): continue filePath = os.path.join(pathBase, line) - if filePath in filePaths: - pluginFilePathsDict[filePath] = filePathsDict.get(filePath) + filePathDict = filePathsDict.get(filePath) + if filePathDict: + pluginFilePathsDict[filePath] = filePathDict self.logger.debug('Number of original files: %s, number of plugin files: %s', len(filePathsDict), len(pluginFilePathsDict)) return pluginFilePathsDict diff --git a/src/python/dm/common/utility/dictUtility.py b/src/python/dm/common/utility/dictUtility.py index 3e357d6d58bff6996e058d33f6b048a2a19ea78f..9dc224db069b0e9e9a889778223b7d3b069219bb 100755 --- a/src/python/dm/common/utility/dictUtility.py +++ b/src/python/dm/common/utility/dictUtility.py @@ -6,6 +6,22 @@ class DictUtility: @classmethod def deepCopy(cls, dict, includeKeys=[], excludeKeys=[]): + dict2 = {} + if len(includeKeys): + for key in includeKeys: + value = dict.get(key) + if value is not None: + dict2[key] = copy.deepcopy(value) + elif len(excludeKeys): + for key in dict.keys(): + if key not in excludeKeys: + dict2[key] = copy.deepcopy(dict[key]) + else: + dict2 = copy.deepcopy(dict) + return dict2 + + @classmethod + def deepCopy2(cls, dict, includeKeys=[], excludeKeys=[]): dict2 = copy.deepcopy(dict) if len(includeKeys): for key in dict2.keys(): diff --git a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py index eca652131d1a86738d6b29fb4b92e9fd466a5341..cc91ed87452a544ea7629918db821127951d69fc 100755 --- a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py +++ b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py @@ -8,6 +8,7 @@ import os import time import uuid import copy +import threading from dm.common.objects.experiment import Experiment from dm.common.objects.dmObjectManager import DmObjectManager @@ -28,6 +29,8 @@ from fileSystemObserver import FileSystemObserver class ExperimentSessionControllerImpl(DmObjectManager): """ Experiment session controller implementation class. """ + UPLOAD_DELAY_IN_SECONDS = 1.0 + def __init__(self): DmObjectManager.__init__(self) self.dsExperimentApi = DsRestApiFactory.getExperimentRestApi() @@ -58,6 +61,8 @@ class ExperimentSessionControllerImpl(DmObjectManager): def upload(self, experimentName, dataDirectory, daqInfo, includeFileDetails=False): experiment = self.dsExperimentApi.getExperimentByName(experimentName) + UploadTracker.getInstance().checkForActiveUpload(experiment, dataDirectory) + experiment['daqInfo'] = daqInfo storageDirectory = experiment.get('storageDirectory') if storageDirectory is None: @@ -91,24 +96,31 @@ class ExperimentSessionControllerImpl(DmObjectManager): raise InvalidRequest('There are no new files for upload in directory %s.' % dataDirectory) fileDict = {} + #UploadTracker.getInstance().put(uploadId, uploadInfo) + UploadTracker.getInstance().startUpload(uploadId, uploadInfo) + uploadInfo['fileDict'] = fileDict + uploadInfo['nFiles'] = len(filePathsDict) + timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUpload, args=[uploadInfo, daqInfo, experiment, filePathsDict]) + timer.start() + return uploadInfo + + def prepareUpload(self, uploadInfo, daqInfo, experiment, filePathsDict): + uploadId = uploadInfo.get('id') + self.logger.debug('Preparing upload id: %s' % uploadId) + fileDict = uploadInfo.get('fileDict') + dataDirectory = uploadInfo.get('dataDirectory') + fileProcessingManager = FileProcessingManager.getInstance() for filePath in filePathsDict.keys(): fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment) fileInfo['daqInfo'] = daqInfo fileInfo['uploadId'] = uploadId fileUploadInfo = { 'processed' : False } - # Stat should be done by agent, not by observer. - try: - FileUtility.statFile(filePath, fileUploadInfo) - except: - # Ok, may be remote file - pass fileDict[filePath] = fileUploadInfo - fileProcessingManager.processFile(fileInfo) - - uploadInfo['fileDict'] = fileDict - #self.logger.debug('Upload info %s' % uploadInfo) - UploadTracker.getInstance().put(uploadId, uploadInfo) - return uploadInfo.scrub(includeFileDetails) + try: + fileProcessingManager.processFile(fileInfo) + except Exception, ex: + self.logger.error('Processing error: %s', ex) + self.logger.debug('Done preparing upload id: %s (total of %s files)' % (uploadId, len(filePathsDict))) def getUploadInfo(self, id, includeFileDetails=False): uploadInfo = UploadTracker.getInstance().get(id) diff --git a/src/python/dm/daq_web_service/service/impl/uploadTracker.py b/src/python/dm/daq_web_service/service/impl/uploadTracker.py index 990fae1dbda5934a5a670068a91c055e0bd8b620..7c751eac6a388ca46ec32684353f27595592d836 100755 --- a/src/python/dm/daq_web_service/service/impl/uploadTracker.py +++ b/src/python/dm/daq_web_service/service/impl/uploadTracker.py @@ -1,7 +1,9 @@ #!/usr/bin/env python +import os from dm.common.objects.uploadInfo import UploadInfo from dm.common.utility.objectTracker import ObjectTracker +from dm.common.exceptions.objectAlreadyExists import ObjectAlreadyExists class UploadTracker(ObjectTracker): @@ -9,6 +11,30 @@ class UploadTracker(ObjectTracker): objectClass = UploadInfo cacheSize = 100 + def __init__(self, *args, **kwargs): + ObjectTracker.__init__(self, args, kwargs) + self.activeUploadDict = {} + + def checkForActiveUpload(self, experiment, dataDirectory): + experimentName = experiment.get('name') + dataDir = os.path.normpath(dataDirectory) + activeUploadKey = experimentName + dataDir + uploadId = self.activeUploadDict.get(activeUploadKey) + if uploadId: + uploadInfo = self.get(uploadId) + if uploadInfo is not None: + if uploadInfo.get('status') == 'running': + raise ObjectAlreadyExists('Upload id %s is already active for experiment %s in data directory %s.' % (uploadId, experimentName, dataDir)) + del self.activeUploadDict[activeUploadKey] + + def startUpload(self, uploadId, uploadInfo): + experimentName = uploadInfo.get('experimentName') + dataDirectory = uploadInfo.get('dataDirectory') + dataDir = os.path.normpath(dataDirectory) + activeUploadKey = experimentName + dataDir + self.activeUploadDict[activeUploadKey] = uploadId + self.put(uploadId, uploadInfo) + #################################################################### # Testing