diff --git a/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py b/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py index aa5c4d31bcbae30c5b27040fcaf8c77aeb8cab8c..99fa71797e704b9b3a18bdb8f46efd3e8c70cdd9 100755 --- a/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py +++ b/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py @@ -31,13 +31,13 @@ class RsyncFileTransferPlugin(FileTransferPlugin): lines = subprocess.getStdOut().split('\n') pluginFilePathsDict = {} pathBase = dataDirectory - filePaths = filePathsDict.keys() for line in lines: if line.endswith(os.sep): continue filePath = os.path.join(pathBase, line) - if filePath in filePaths: - pluginFilePathsDict[filePath] = filePathsDict.get(filePath) + filePathDict = filePathsDict.get(filePath) + if filePathDict: + pluginFilePathsDict[filePath] = filePathDict self.logger.debug('Number of original files: %s, number of plugin files: %s', len(filePathsDict), len(pluginFilePathsDict)) return pluginFilePathsDict diff --git a/src/python/dm/common/utility/dictUtility.py b/src/python/dm/common/utility/dictUtility.py index 3e357d6d58bff6996e058d33f6b048a2a19ea78f..9dc224db069b0e9e9a889778223b7d3b069219bb 100755 --- a/src/python/dm/common/utility/dictUtility.py +++ b/src/python/dm/common/utility/dictUtility.py @@ -6,6 +6,22 @@ class DictUtility: @classmethod def deepCopy(cls, dict, includeKeys=[], excludeKeys=[]): + dict2 = {} + if len(includeKeys): + for key in includeKeys: + value = dict.get(key) + if value is not None: + dict2[key] = copy.deepcopy(value) + elif len(excludeKeys): + for key in dict.keys(): + if key not in excludeKeys: + dict2[key] = copy.deepcopy(dict[key]) + else: + dict2 = copy.deepcopy(dict) + return dict2 + + @classmethod + def deepCopy2(cls, dict, includeKeys=[], excludeKeys=[]): dict2 = copy.deepcopy(dict) if len(includeKeys): for key in dict2.keys(): diff --git a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py index eca652131d1a86738d6b29fb4b92e9fd466a5341..a0695f409a226244ad119f6ba8df82f38c247266 100755 --- a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py +++ b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py @@ -8,6 +8,7 @@ import os import time import uuid import copy +import threading from dm.common.objects.experiment import Experiment from dm.common.objects.dmObjectManager import DmObjectManager @@ -28,6 +29,8 @@ from fileSystemObserver import FileSystemObserver class ExperimentSessionControllerImpl(DmObjectManager): """ Experiment session controller implementation class. """ + UPLOAD_DELAY_IN_SECONDS = 1.0 + def __init__(self): DmObjectManager.__init__(self) self.dsExperimentApi = DsRestApiFactory.getExperimentRestApi() @@ -91,24 +94,26 @@ class ExperimentSessionControllerImpl(DmObjectManager): raise InvalidRequest('There are no new files for upload in directory %s.' % dataDirectory) fileDict = {} + UploadTracker.getInstance().put(uploadId, uploadInfo) + uploadInfo['fileDict'] = fileDict + uploadInfo['nFiles'] = len(filePathsDict) + timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUpload, args=[uploadInfo, daqInfo, experiment, filePathsDict]) + timer.start() + return uploadInfo + + def prepareUpload(self, uploadInfo, daqInfo, experiment, filePathsDict): + uploadId = uploadInfo.get('id') + self.logger.debug('Preparing upload id: %s', uploadId) + fileDict = uploadInfo.get('fileDict') + dataDirectory = uploadInfo.get('dataDirectory') for filePath in filePathsDict.keys(): fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment) fileInfo['daqInfo'] = daqInfo fileInfo['uploadId'] = uploadId fileUploadInfo = { 'processed' : False } - # Stat should be done by agent, not by observer. - try: - FileUtility.statFile(filePath, fileUploadInfo) - except: - # Ok, may be remote file - pass fileDict[filePath] = fileUploadInfo - fileProcessingManager.processFile(fileInfo) - - uploadInfo['fileDict'] = fileDict - #self.logger.debug('Upload info %s' % uploadInfo) - UploadTracker.getInstance().put(uploadId, uploadInfo) - return uploadInfo.scrub(includeFileDetails) + #fileProcessingManager.processFile(fileInfo) + self.logger.debug('Done preparing upload id: %s', uploadId) def getUploadInfo(self, id, includeFileDetails=False): uploadInfo = UploadTracker.getInstance().get(id)