diff --git a/src/python/dm/common/processing/fileProcessingManager.py b/src/python/dm/common/processing/fileProcessingManager.py
index 8a5ea3db991ec5d050d90535ff3323530677b28a..e320d673ed89dff946133fd30b772d7fe6f65708 100755
--- a/src/python/dm/common/processing/fileProcessingManager.py
+++ b/src/python/dm/common/processing/fileProcessingManager.py
@@ -74,11 +74,15 @@ class FileProcessingManager(threading.Thread,Singleton):
         if ValueUtility.toBoolean(uploadInfo.get('processHiddenFiles')):
             del uploadInfo['processHiddenFiles']
             return filePathsDict
-        for filePath in filePathsDict.keys():
+        self.logger.debug('Checking for hidden files')
+        nRemoved = 0
+        for (filePath,filePathDict) in filePathsDict.items():
             fileName = os.path.basename(filePath)
             if fileName.startswith('.'):
                 self.logger.debug('File path %s is hidden file, will not process it' % filePath)
                 del filePathsDict[filePath]
+                nRemoved += 1
+        self.logger.debug('Removed %s hidden files, %s candidates remaining' % (nRemoved, len(filePathsDict)))
         return filePathsDict
 
     # Each plugin calculates list of files that need to be processed
@@ -87,6 +91,7 @@ class FileProcessingManager(threading.Thread,Singleton):
         if ValueUtility.toBoolean(uploadInfo.get('reprocessFiles')):
             del uploadInfo['reprocessFiles']
             return filePathsDict
+        self.logger.debug('Checking files with processor plugins')
         checkedFilePathsDict = {}
         for processorKey in self.fileProcessorKeyList:
             processor = self.fileProcessorDict.get(processorKey)
@@ -94,6 +99,7 @@ class FileProcessingManager(threading.Thread,Singleton):
             pluginFilePathsDict = processor.checkUploadFilesForProcessing(filePathsDict, uploadInfo)
             if len(pluginFilePathsDict):
                 checkedFilePathsDict.update(pluginFilePathsDict)
+        self.logger.debug('There are %s processing candidates remaining' % (len(filePathsDict)))
         return checkedFilePathsDict
 
     def processFile(self, fileInfo):
diff --git a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py
index 7993a59bd67d9e28df391bddc48358d8a15a9ba2..0e69071992b09e78fc5322b51dbf23941a9f12ba 100755
--- a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py
+++ b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py
@@ -16,12 +16,14 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
 
     DEFAULT_PORT = 2811
 
-    def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False):
+    def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True,
+                 remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True):
         FileTransferPlugin.__init__(self, command, src, dest)
         self.dsFileApi = DsRestApiFactory.getFileRestApi()
         self.localMd5Sum = localMd5Sum
         self.remoteMd5Sum = remoteMd5Sum
         self.deleteOriginal = deleteOriginal
+        self.pluginMustProcessFiles = pluginMustProcessFiles
 
     def getSrcUrl(self, filePath, dataDirectory):
         (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
@@ -44,15 +46,19 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
         return destUrl
 
     def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
+        if not self.pluginMustProcessFiles:
+            return {}
         storageHost = uploadInfo['storageHost']
         storageDirectory = uploadInfo['storageDirectory']
         dataDirectory = uploadInfo['dataDirectory']
         self.logger.debug('Upload info: %s', uploadInfo)
         # Original data directory may contain host/port
         (scheme, host, port, replacementDirPath) = FtpUtility.parseFtpUrl(dataDirectory)
+        self.logger.debug('Number of original files: %s', len(filePathsDict))
+        self.logger.debug('Looking for existing files in %s', storageDirectory)
         ftpUtility = SftpUtility(storageHost)
         storageFilePathsDict = ftpUtility.getFiles(storageDirectory, {}, replacementDirPath)
-        pluginFilePathsDict = copy.copy(filePathsDict)
+        self.logger.debug('There are %s files in %s', len(storageFilePathsDict), storageDirectory)
         # Remove file from plugin dict if we do not need to transfer it
         for (filePath,storageFilePathDict) in storageFilePathsDict.items():
             filePathDict = filePathsDict.get(filePath)
@@ -76,10 +82,10 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
                 continue
 
             # No need to transfer file
-            del pluginFilePathsDict[filePath]
+            del filePathsDict[filePath]
 
-        self.logger.debug('Number of original files: %s, number of plugin files: %s', len(filePathsDict), len(pluginFilePathsDict))
-        return pluginFilePathsDict
+        self.logger.debug('Number of files that require processing: %s', len(filePathsDict))
+        return filePathsDict
 
     def processFile(self, fileInfo):
         filePath = fileInfo.get('filePath')
diff --git a/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py b/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py
index 659e9849db79902eebf873ddb07f69528b237e78..9cf0837f92f1cb793298db102a285971da071517 100755
--- a/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py
+++ b/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py
@@ -14,14 +14,18 @@ class RsyncFileTransferPlugin(FileTransferPlugin):
     DEFAULT_COMMAND = 'rsync -arvlPR'
     DRY_RUN_COMMAND = 'rsync -arvlP'
 
-    def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False):
+    def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True,
+                 remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True):
         FileTransferPlugin.__init__(self, command, src, dest)
         self.dsFileApi = DsRestApiFactory.getFileRestApi()
         self.localMd5Sum = localMd5Sum
         self.remoteMd5Sum = remoteMd5Sum
         self.deleteOriginal = deleteOriginal
+        self.pluginMustProcessFiles = pluginMustProcessFiles
 
     def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
+        if not self.pluginMustProcessFiles:
+            return {}
         storageDirectory = uploadInfo['storageDirectory']
         storageHost = uploadInfo['storageHost']
         dataDirectory = uploadInfo['dataDirectory']
diff --git a/src/python/dm/common/utility/sftpUtility.py b/src/python/dm/common/utility/sftpUtility.py
index 4e69f676f72455ea339f6d79e0ead5a91caa6b0e..9662a35b3c76d78c81007e2d95bce9ebf75af563 100755
--- a/src/python/dm/common/utility/sftpUtility.py
+++ b/src/python/dm/common/utility/sftpUtility.py
@@ -57,7 +57,6 @@ class SftpUtility:
         if not replacementDirPath:
             replacementDirPath = dirPath
         attrs = self.sftpClient.listdir_attr(dirPath)
-        mode = attrs[0].st_mode
         for attr in attrs:
             fileName = attr.filename
             mode = attr.st_mode
diff --git a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py
index 0bf7ff67e5a4eb314aecdd08f2d3f936cc4b540b..0f6b285fd5a2a22b15e7b4b86e9ba48425108284 100755
--- a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py
+++ b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py
@@ -89,6 +89,7 @@ class ExperimentSessionControllerImpl(DmObjectManager):
         daqInfo['uploadId'] = uploadId
 
         # Remove hidden files
+        self.logger.debug('Checking %s processing candidates' % len(filePathsDict))
         filePathsDict = fileProcessingManager.removeHiddenFilesFromProcessing(filePathsDict, uploadInfo)
         # Check which files need to be processed
         filePathsDict = fileProcessingManager.checkUploadFilesForProcessing(filePathsDict, uploadInfo)
@@ -96,10 +97,10 @@ class ExperimentSessionControllerImpl(DmObjectManager):
             raise InvalidRequest('There are no new files for upload in directory %s.' % dataDirectory)
 
         fileDict = {}
-        #UploadTracker.getInstance().put(uploadId, uploadInfo)
         UploadTracker.getInstance().startUpload(uploadId, uploadInfo)
         uploadInfo['fileDict'] = fileDict
         uploadInfo['nFiles'] = len(filePathsDict)
+        self.logger.debug('Will prepare upload of %s files' % len(filePathsDict))
         timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUpload, args=[uploadInfo, daqInfo, experiment, filePathsDict])
         timer.start()
         return uploadInfo