Skip to content
Snippets Groups Projects
Commit 7a10ec34 authored by sveseli's avatar sveseli
Browse files

found bug with empty destination directory;prevent checking for upload if...

found bug with empty destination directory;prevent checking for upload if plugin does not need to process files
parent 422489bc
No related branches found
No related tags found
No related merge requests found
......@@ -74,11 +74,15 @@ class FileProcessingManager(threading.Thread,Singleton):
if ValueUtility.toBoolean(uploadInfo.get('processHiddenFiles')):
del uploadInfo['processHiddenFiles']
return filePathsDict
for filePath in filePathsDict.keys():
self.logger.debug('Checking for hidden files')
nRemoved = 0
for (filePath,filePathDict) in filePathsDict.items():
fileName = os.path.basename(filePath)
if fileName.startswith('.'):
self.logger.debug('File path %s is hidden file, will not process it' % filePath)
del filePathsDict[filePath]
nRemoved += 1
self.logger.debug('Removed %s hidden files, %s candidates remaining' % (nRemoved, len(filePathsDict)))
return filePathsDict
# Each plugin calculates list of files that need to be processed
......@@ -87,6 +91,7 @@ class FileProcessingManager(threading.Thread,Singleton):
if ValueUtility.toBoolean(uploadInfo.get('reprocessFiles')):
del uploadInfo['reprocessFiles']
return filePathsDict
self.logger.debug('Checking files with processor plugins')
checkedFilePathsDict = {}
for processorKey in self.fileProcessorKeyList:
processor = self.fileProcessorDict.get(processorKey)
......@@ -94,6 +99,7 @@ class FileProcessingManager(threading.Thread,Singleton):
pluginFilePathsDict = processor.checkUploadFilesForProcessing(filePathsDict, uploadInfo)
if len(pluginFilePathsDict):
checkedFilePathsDict.update(pluginFilePathsDict)
self.logger.debug('There are %s processing candidates remaining' % (len(filePathsDict)))
return checkedFilePathsDict
def processFile(self, fileInfo):
......
......@@ -16,12 +16,14 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
DEFAULT_PORT = 2811
def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False):
def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True,
remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True):
FileTransferPlugin.__init__(self, command, src, dest)
self.dsFileApi = DsRestApiFactory.getFileRestApi()
self.localMd5Sum = localMd5Sum
self.remoteMd5Sum = remoteMd5Sum
self.deleteOriginal = deleteOriginal
self.pluginMustProcessFiles = pluginMustProcessFiles
def getSrcUrl(self, filePath, dataDirectory):
(scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
......@@ -44,15 +46,19 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
return destUrl
def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
if not self.pluginMustProcessFiles:
return {}
storageHost = uploadInfo['storageHost']
storageDirectory = uploadInfo['storageDirectory']
dataDirectory = uploadInfo['dataDirectory']
self.logger.debug('Upload info: %s', uploadInfo)
# Original data directory may contain host/port
(scheme, host, port, replacementDirPath) = FtpUtility.parseFtpUrl(dataDirectory)
self.logger.debug('Number of original files: %s', len(filePathsDict))
self.logger.debug('Looking for existing files in %s', storageDirectory)
ftpUtility = SftpUtility(storageHost)
storageFilePathsDict = ftpUtility.getFiles(storageDirectory, {}, replacementDirPath)
pluginFilePathsDict = copy.copy(filePathsDict)
self.logger.debug('There are %s files in %s', (len(storageFilePathsDict), storageDirectory))
# Remove file from plugin dict if we do not need to transfer it
for (filePath,storageFilePathDict) in storageFilePathsDict.items():
filePathDict = filePathsDict.get(filePath)
......@@ -76,10 +82,10 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
continue
# No need to transfer file
del pluginFilePathsDict[filePath]
del filePathsDict[filePath]
self.logger.debug('Number of original files: %s, number of plugin files: %s', len(filePathsDict), len(pluginFilePathsDict))
return pluginFilePathsDict
self.logger.debug('Number of files that require processing: %s', len(filePathsDict))
return filePathsDict
def processFile(self, fileInfo):
filePath = fileInfo.get('filePath')
......
......@@ -14,14 +14,18 @@ class RsyncFileTransferPlugin(FileTransferPlugin):
DEFAULT_COMMAND = 'rsync -arvlPR'
DRY_RUN_COMMAND = 'rsync -arvlP'
def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False):
def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True,
remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True):
FileTransferPlugin.__init__(self, command, src, dest)
self.dsFileApi = DsRestApiFactory.getFileRestApi()
self.localMd5Sum = localMd5Sum
self.remoteMd5Sum = remoteMd5Sum
self.deleteOriginal = deleteOriginal
self.pluginMustProcessFiles = pluginMustProcessFiles
def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
if not self.pluginMustProcessFiles:
return {}
storageDirectory = uploadInfo['storageDirectory']
storageHost = uploadInfo['storageHost']
dataDirectory = uploadInfo['dataDirectory']
......
......@@ -57,7 +57,6 @@ class SftpUtility:
if not replacementDirPath:
replacementDirPath = dirPath
attrs = self.sftpClient.listdir_attr(dirPath)
mode = attrs[0].st_mode
for attr in attrs:
fileName = attr.filename
mode = attr.st_mode
......
......@@ -89,6 +89,7 @@ class ExperimentSessionControllerImpl(DmObjectManager):
daqInfo['uploadId'] = uploadId
# Remove hidden files
self.logger.debug('Checking %s processing candidates' % len(filePathsDict))
filePathsDict = fileProcessingManager.removeHiddenFilesFromProcessing(filePathsDict, uploadInfo)
# Check which files need to be processed
filePathsDict = fileProcessingManager.checkUploadFilesForProcessing(filePathsDict, uploadInfo)
......@@ -96,10 +97,10 @@ class ExperimentSessionControllerImpl(DmObjectManager):
raise InvalidRequest('There are no new files for upload in directory %s.' % dataDirectory)
fileDict = {}
#UploadTracker.getInstance().put(uploadId, uploadInfo)
UploadTracker.getInstance().startUpload(uploadId, uploadInfo)
uploadInfo['fileDict'] = fileDict
uploadInfo['nFiles'] = len(filePathsDict)
self.logger.debug('Will prepare upload of %s files' % len(filePathsDict))
timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUpload, args=[uploadInfo, daqInfo, experiment, filePathsDict])
timer.start()
return uploadInfo
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment