Skip to content
Snippets Groups Projects
Commit f8a58662 authored by sveseli's avatar sveseli
Browse files

performance improvements for upload: introduced timer for preparing upload files

parent c103f7ad
No related branches found
No related tags found
No related merge requests found
...@@ -31,13 +31,13 @@ class RsyncFileTransferPlugin(FileTransferPlugin): ...@@ -31,13 +31,13 @@ class RsyncFileTransferPlugin(FileTransferPlugin):
lines = subprocess.getStdOut().split('\n') lines = subprocess.getStdOut().split('\n')
pluginFilePathsDict = {} pluginFilePathsDict = {}
pathBase = dataDirectory pathBase = dataDirectory
filePaths = filePathsDict.keys()
for line in lines: for line in lines:
if line.endswith(os.sep): if line.endswith(os.sep):
continue continue
filePath = os.path.join(pathBase, line) filePath = os.path.join(pathBase, line)
if filePath in filePaths: filePathDict = filePathsDict.get(filePath)
pluginFilePathsDict[filePath] = filePathsDict.get(filePath) if filePathDict:
pluginFilePathsDict[filePath] = filePathDict
self.logger.debug('Number of original files: %s, number of plugin files: %s', len(filePathsDict), len(pluginFilePathsDict)) self.logger.debug('Number of original files: %s, number of plugin files: %s', len(filePathsDict), len(pluginFilePathsDict))
return pluginFilePathsDict return pluginFilePathsDict
......
...@@ -6,6 +6,22 @@ class DictUtility: ...@@ -6,6 +6,22 @@ class DictUtility:
@classmethod @classmethod
def deepCopy(cls, dict, includeKeys=[], excludeKeys=[]): def deepCopy(cls, dict, includeKeys=[], excludeKeys=[]):
dict2 = {}
if len(includeKeys):
for key in includeKeys:
value = dict.get(key)
if value is not None:
dict2[key] = copy.deepcopy(value)
elif len(excludeKeys):
for key in dict.keys():
if key not in excludeKeys:
dict2[key] = copy.deepcopy(dict[key])
else:
dict2 = copy.deepcopy(dict)
return dict2
@classmethod
def deepCopy2(cls, dict, includeKeys=[], excludeKeys=[]):
dict2 = copy.deepcopy(dict) dict2 = copy.deepcopy(dict)
if len(includeKeys): if len(includeKeys):
for key in dict2.keys(): for key in dict2.keys():
......
...@@ -8,6 +8,7 @@ import os ...@@ -8,6 +8,7 @@ import os
import time import time
import uuid import uuid
import copy import copy
import threading
from dm.common.objects.experiment import Experiment from dm.common.objects.experiment import Experiment
from dm.common.objects.dmObjectManager import DmObjectManager from dm.common.objects.dmObjectManager import DmObjectManager
...@@ -28,6 +29,8 @@ from fileSystemObserver import FileSystemObserver ...@@ -28,6 +29,8 @@ from fileSystemObserver import FileSystemObserver
class ExperimentSessionControllerImpl(DmObjectManager): class ExperimentSessionControllerImpl(DmObjectManager):
""" Experiment session controller implementation class. """ """ Experiment session controller implementation class. """
UPLOAD_DELAY_IN_SECONDS = 1.0
def __init__(self): def __init__(self):
DmObjectManager.__init__(self) DmObjectManager.__init__(self)
self.dsExperimentApi = DsRestApiFactory.getExperimentRestApi() self.dsExperimentApi = DsRestApiFactory.getExperimentRestApi()
...@@ -91,24 +94,26 @@ class ExperimentSessionControllerImpl(DmObjectManager): ...@@ -91,24 +94,26 @@ class ExperimentSessionControllerImpl(DmObjectManager):
raise InvalidRequest('There are no new files for upload in directory %s.' % dataDirectory) raise InvalidRequest('There are no new files for upload in directory %s.' % dataDirectory)
fileDict = {} fileDict = {}
UploadTracker.getInstance().put(uploadId, uploadInfo)
uploadInfo['fileDict'] = fileDict
uploadInfo['nFiles'] = len(filePathsDict)
timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUpload, args=[uploadInfo, daqInfo, experiment, filePathsDict])
timer.start()
return uploadInfo
def prepareUpload(self, uploadInfo, daqInfo, experiment, filePathsDict):
uploadId = uploadInfo.get('id')
self.logger.debug('Preparing upload id: %s', uploadId)
fileDict = uploadInfo.get('fileDict')
dataDirectory = uploadInfo.get('dataDirectory')
for filePath in filePathsDict.keys(): for filePath in filePathsDict.keys():
fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment) fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
fileInfo['daqInfo'] = daqInfo fileInfo['daqInfo'] = daqInfo
fileInfo['uploadId'] = uploadId fileInfo['uploadId'] = uploadId
fileUploadInfo = { 'processed' : False } fileUploadInfo = { 'processed' : False }
# Stat should be done by agent, not by observer.
try:
FileUtility.statFile(filePath, fileUploadInfo)
except:
# Ok, may be remote file
pass
fileDict[filePath] = fileUploadInfo fileDict[filePath] = fileUploadInfo
fileProcessingManager.processFile(fileInfo) #fileProcessingManager.processFile(fileInfo)
self.logger.debug('Done preparing upload id: %s', uploadId)
uploadInfo['fileDict'] = fileDict
#self.logger.debug('Upload info %s' % uploadInfo)
UploadTracker.getInstance().put(uploadId, uploadInfo)
return uploadInfo.scrub(includeFileDetails)
def getUploadInfo(self, id, includeFileDetails=False): def getUploadInfo(self, id, includeFileDetails=False):
uploadInfo = UploadTracker.getInstance().get(id) uploadInfo = UploadTracker.getInstance().get(id)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment