From f8a58662e0feb67f33794da8c6a72db1e465a249 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli <sveseli@aps.anl.gov> Date: Tue, 2 Feb 2016 07:28:38 +0000 Subject: [PATCH] performance improvements for upload: introduced timer for preparing upload files --- .../plugins/rsyncFileTransferPlugin.py | 6 ++-- src/python/dm/common/utility/dictUtility.py | 16 ++++++++++ .../impl/experimentSessionControllerImpl.py | 29 +++++++++++-------- 3 files changed, 36 insertions(+), 15 deletions(-) diff --git a/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py b/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py index aa5c4d31..99fa7179 100755 --- a/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py +++ b/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py @@ -31,13 +31,13 @@ class RsyncFileTransferPlugin(FileTransferPlugin): lines = subprocess.getStdOut().split('\n') pluginFilePathsDict = {} pathBase = dataDirectory - filePaths = filePathsDict.keys() for line in lines: if line.endswith(os.sep): continue filePath = os.path.join(pathBase, line) - if filePath in filePaths: - pluginFilePathsDict[filePath] = filePathsDict.get(filePath) + filePathDict = filePathsDict.get(filePath) + if filePathDict: + pluginFilePathsDict[filePath] = filePathDict self.logger.debug('Number of original files: %s, number of plugin files: %s', len(filePathsDict), len(pluginFilePathsDict)) return pluginFilePathsDict diff --git a/src/python/dm/common/utility/dictUtility.py b/src/python/dm/common/utility/dictUtility.py index 3e357d6d..9dc224db 100755 --- a/src/python/dm/common/utility/dictUtility.py +++ b/src/python/dm/common/utility/dictUtility.py @@ -6,6 +6,22 @@ class DictUtility: @classmethod def deepCopy(cls, dict, includeKeys=[], excludeKeys=[]): + dict2 = {} + if len(includeKeys): + for key in includeKeys: + value = dict.get(key) + if value is not None: + dict2[key] = copy.deepcopy(value) + elif len(excludeKeys): + for key in dict.keys(): + if key not in excludeKeys: + dict2[key] = copy.deepcopy(dict[key]) + else: + dict2 = copy.deepcopy(dict) + return dict2 + + @classmethod + def deepCopy2(cls, dict, includeKeys=[], excludeKeys=[]): dict2 = copy.deepcopy(dict) if len(includeKeys): for key in dict2.keys(): diff --git a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py index eca65213..a0695f40 100755 --- a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py +++ b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py @@ -8,6 +8,7 @@ import os import time import uuid import copy +import threading from dm.common.objects.experiment import Experiment from dm.common.objects.dmObjectManager import DmObjectManager @@ -28,6 +29,8 @@ from fileSystemObserver import FileSystemObserver class ExperimentSessionControllerImpl(DmObjectManager): """ Experiment session controller implementation class. """ + UPLOAD_DELAY_IN_SECONDS = 1.0 + def __init__(self): DmObjectManager.__init__(self) self.dsExperimentApi = DsRestApiFactory.getExperimentRestApi() @@ -91,24 +94,26 @@ class ExperimentSessionControllerImpl(DmObjectManager): raise InvalidRequest('There are no new files for upload in directory %s.' % dataDirectory) fileDict = {} + UploadTracker.getInstance().put(uploadId, uploadInfo) + uploadInfo['fileDict'] = fileDict + uploadInfo['nFiles'] = len(filePathsDict) + timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUpload, args=[uploadInfo, daqInfo, experiment, filePathsDict]) + timer.start() + return uploadInfo + + def prepareUpload(self, uploadInfo, daqInfo, experiment, filePathsDict): + uploadId = uploadInfo.get('id') + self.logger.debug('Preparing upload id: %s', uploadId) + fileDict = uploadInfo.get('fileDict') + dataDirectory = uploadInfo.get('dataDirectory') for filePath in filePathsDict.keys(): fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment) fileInfo['daqInfo'] = daqInfo fileInfo['uploadId'] = uploadId fileUploadInfo = { 'processed' : False } - # Stat should be done by agent, not by observer. - try: - FileUtility.statFile(filePath, fileUploadInfo) - except: - # Ok, may be remote file - pass fileDict[filePath] = fileUploadInfo - fileProcessingManager.processFile(fileInfo) - - uploadInfo['fileDict'] = fileDict - #self.logger.debug('Upload info %s' % uploadInfo) - UploadTracker.getInstance().put(uploadId, uploadInfo) - return uploadInfo.scrub(includeFileDetails) + #fileProcessingManager.processFile(fileInfo) + self.logger.debug('Done preparing upload id: %s', uploadId) def getUploadInfo(self, id, includeFileDetails=False): uploadInfo = UploadTracker.getInstance().get(id) -- GitLab