From 22a58e0e72e718bfdf45f8670b857882001b7b0f Mon Sep 17 00:00:00 2001 From: Sinisa Veseli <sveseli@aps.anl.gov> Date: Tue, 26 Jan 2016 03:58:17 +0000 Subject: [PATCH] modifications to eliminate processing of hidden files and of unchanged files that were processed already; checksum is now done by default --- .../processing/fileProcessingManager.py | 29 ++++++ .../processing/plugins/fileProcessor.py | 5 +- .../processing/plugins/fileTransferPlugin.py | 3 + .../plugins/gridftpFileTransferPlugin.py | 84 ++++++++++++++++- .../plugins/rsyncFileTransferPlugin.py | 89 +++++++++++++++++-- ...WithChecksumAndDeleteFileTransferPlugin.py | 4 +- 6 files changed, 202 insertions(+), 12 deletions(-) diff --git a/src/python/dm/common/processing/fileProcessingManager.py b/src/python/dm/common/processing/fileProcessingManager.py index 23638d8c..74dc397d 100755 --- a/src/python/dm/common/processing/fileProcessingManager.py +++ b/src/python/dm/common/processing/fileProcessingManager.py @@ -2,10 +2,12 @@ import threading import time +import os from dm.common.utility.loggingManager import LoggingManager from dm.common.utility.configurationManager import ConfigurationManager from dm.common.utility.objectUtility import ObjectUtility +from dm.common.utility.valueUtility import ValueUtility from dm.common.utility.timeBasedProcessingQueue import TimeBasedProcessingQueue from dm.common.utility.singleton import Singleton from fileProcessingThread import FileProcessingThread @@ -67,6 +69,33 @@ class FileProcessingManager(threading.Thread,Singleton): self.fileProcessorKeyList.sort() self.logger.debug('File processor keys: %s' % self.fileProcessorKeyList) + # Remove hidden files from dictionary of files to be processed + def removeHiddenFilesFromProcessing(self, filePathsDict, uploadInfo): + if ValueUtility.toBoolean(uploadInfo.get('processHiddenFiles')): + del uploadInfo['processHiddenFiles'] + return filePathsDict + for filePath in filePathsDict.keys(): + fileName = os.path.basename(filePath) + if fileName.startswith('.'): + self.logger.debug('File path %s is hidden file, will not process it' % filePath) + del filePathsDict[filePath] + return filePathsDict + + # Each plugin calculates list of files that need to be processed + # Final result is union of all plugins + def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo): + if ValueUtility.toBoolean(uploadInfo.get('processAllFiles')): + del uploadInfo['processAllFiles'] + return filePathsDict + checkedFilePathsDict = {} + for processorKey in self.fileProcessorKeyList: + processor = self.fileProcessorDict.get(processorKey) + # Processor will return list of files it must process + pluginFilePathsDict = processor.checkUploadFilesForProcessing(filePathsDict, uploadInfo) + if len(pluginFilePathsDict): + checkedFilePathsDict.update(pluginFilePathsDict) + return checkedFilePathsDict + def processFile(self, fileInfo): self.fileProcessingQueue.push(fileInfo) self.eventFlag.set() diff --git a/src/python/dm/common/processing/plugins/fileProcessor.py b/src/python/dm/common/processing/plugins/fileProcessor.py index 823ddadb..c13fceda 100755 --- a/src/python/dm/common/processing/plugins/fileProcessor.py +++ b/src/python/dm/common/processing/plugins/fileProcessor.py @@ -12,7 +12,10 @@ class FileProcessor: @abc.abstractmethod def processFile(self, fileInfo): return NotImplemented - + + def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo): + return {} + def configure(self): # Use this method for processor configuration pass diff --git a/src/python/dm/common/processing/plugins/fileTransferPlugin.py b/src/python/dm/common/processing/plugins/fileTransferPlugin.py index b1e94f24..ea5e579a 100755 --- a/src/python/dm/common/processing/plugins/fileTransferPlugin.py +++ b/src/python/dm/common/processing/plugins/fileTransferPlugin.py @@ -19,6 +19,9 @@ class FileTransferPlugin(FileProcessor): self.command = command self.subprocess = None + def checkUploadFilesForProcessing(self, filePaths, uploadInfo): + pass + def processFile(self, fileInfo): filePath = fileInfo.get('filePath') dataDirectory = fileInfo.get('dataDirectory') diff --git a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py index 594ebd66..89ebabc3 100755 --- a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py +++ b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py @@ -2,7 +2,11 @@ import os from fileTransferPlugin import FileTransferPlugin +from dm.common.utility.fileUtility import FileUtility from dm.common.utility.ftpUtility import FtpUtility +from dm.common.exceptions.fileProcessingError import FileProcessingError +from dm.common.utility.dmSubprocess import DmSubprocess +from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory class GridftpFileTransferPlugin(FileTransferPlugin): @@ -10,8 +14,12 @@ class GridftpFileTransferPlugin(FileTransferPlugin): DEFAULT_PORT = 2811 - def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND): + def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False): FileTransferPlugin.__init__(self, command, src, dest) + self.dsFileApi = DsRestApiFactory.getFileRestApi() + self.localMd5Sum = localMd5Sum + self.remoteMd5Sum = remoteMd5Sum + self.deleteOriginal = deleteOriginal def getSrcUrl(self, filePath, dataDirectory): (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT) @@ -30,6 +38,80 @@ class GridftpFileTransferPlugin(FileTransferPlugin): destUrl = 'sshftp://%s/%s/%s/%s' % (storageHost, storageDirectory, dirName, fileName) return destUrl + def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo): + storageHost = uploadInfo['storageHost'] + storageDirectory = uploadInfo['storageDirectory'] + dataDirectory = uploadInfo['dataDirectory'] + #(scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(storageUrl, defaultPort=self.DEFAULT_PORT) + ftpUtility = FtpUtility(storageHost, self.DEFAULT_PORT) + storageFilePathsDict = ftpUtility.getFiles(storageDirectory, {}) + pluginFilePathsDict = {} + filePaths = filePathsDict.keys() + for filePath in filePaths: + filePathDict = filePathsDict.get(filePath) + experimentFilePath = os.path.relpath(filePath, dataDirectory) + storageFilePath = os.path.join(storageDirectory, experimentFilePath) + storageFilePathDict = storageFilePathsDict.get(storageFilePath) + if not storageFilePathDict: + # remote directory does not have the file + pluginFilePathsDict[filePath] = filePathDict + else: + fSize = filePathDict.get('Size') + sfSize = storageFilePathDict.get('Size') + # check size + if not fSize or not sfSize or fSize != sfSize: + pluginFilePathsDict[filePath] = filePathDict + else: + # sizes are the same, check modify time + mTime = filePathDict.get('Modify') + sfTime = storageFilePathDict.get('Modify') + if not mTime or not sfTime or mTime > sfTime: + pluginFilePathsDict[filePath] = filePathDict + + self.logger.debug('Number of original files: %s, number of plugin files: %s', len(filePathsDict), len(pluginFilePathsDict)) + return pluginFilePathsDict + + def processFile(self, fileInfo): + filePath = fileInfo.get('filePath') + dataDirectory = fileInfo.get('dataDirectory') + experimentFilePath = fileInfo.get('experimentFilePath') + experimentName = fileInfo.get('experimentName') + storageHost = fileInfo.get('storageHost') + storageDirectory = fileInfo.get('storageDirectory') + + destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory) + srcUrl = self.getSrcUrl(filePath, dataDirectory) + + # Calculate checksum + if self.localMd5Sum: + FileUtility.statFile(filePath, fileInfo) + FileUtility.getMd5Sum(filePath, fileInfo) + + # Transfer file + self.logger.debug('Starting transfer: %s' % fileInfo) + self.start(srcUrl, destUrl) + + # Get remote checksum + if self.remoteMd5Sum: + fileInfo2 = {} + fileInfo2['experimentFilePath'] = experimentFilePath + fileInfo2['experimentName'] = experimentName + fileMetadata = self.dsFileApi.statFile(experimentFilePath, experimentName, fileInfo2) + # If we have not done md5 locally, update file info + if not self.md5Sum: + fileInfo['md5Sum'] = fileMetadata.get('md5Sum') + + # Verify checksum + if fileMetadata.get('md5Sum') != fileInfo.get('md5Sum'): + self.logger.error('Checksum mismatch for file: %s' % filePath) + raise FileProcessingError('Checksum mismatch detected for file: %s' % filePath) + self.logger.debug('Checksum test passed for file %s' % filePath) + + # Remove file + if self.deleteOriginal: + self.logger.debug('Deleting file %s' % filePath) + OsUtility.removeFile(srcUrl) + ####################################################################### # Testing. if __name__ == '__main__': diff --git a/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py b/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py index 3550d0cf..f4d1b8a0 100755 --- a/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py +++ b/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py @@ -1,18 +1,91 @@ #!/usr/bin/env python +import os + from fileTransferPlugin import FileTransferPlugin +from dm.common.utility.osUtility import OsUtility +from dm.common.utility.fileUtility import FileUtility +from dm.common.exceptions.fileProcessingError import FileProcessingError +from dm.common.utility.dmSubprocess import DmSubprocess +from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory + class RsyncFileTransferPlugin(FileTransferPlugin): - COMMAND = 'rsync -arvlPR' + DEFAULT_COMMAND = 'rsync -arvlPR' + DRY_RUN_COMMAND = 'rsync -arvlP' + + def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False): + FileTransferPlugin.__init__(self, command, src, dest) + self.dsFileApi = DsRestApiFactory.getFileRestApi() + self.localMd5Sum = localMd5Sum + self.remoteMd5Sum = remoteMd5Sum + self.deleteOriginal = deleteOriginal + + def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo): + storageDirectory = uploadInfo['storageDirectory'] + storageHost = uploadInfo['storageHost'] + dataDirectory = uploadInfo['dataDirectory'] + dryRunCommand = '%s --dry-run %s/ %s:%s' % (self.DRY_RUN_COMMAND, dataDirectory, storageHost, storageDirectory) + subprocess = DmSubprocess.getSubprocess(dryRunCommand) + subprocess.run() + lines = subprocess.getStdOut().split('\n') + pluginFilePathsDict = {} + pathBase = dataDirectory + filePaths = filePathsDict.keys() + for line in lines: + if line.endswith(os.sep): + continue + filePath = os.path.join(pathBase, line) + if filePath in filePaths: + pluginFilePathsDict[filePath] = filePathsDict.get(filePath) + self.logger.debug('Number of original files: %s, number of plugin files: %s', len(filePathsDict), len(pluginFilePathsDict)) + return pluginFilePathsDict + + def processFile(self, fileInfo): + filePath = fileInfo.get('filePath') + dataDirectory = fileInfo.get('dataDirectory') + experimentFilePath = fileInfo.get('experimentFilePath') + experimentName = fileInfo.get('experimentName') + storageHost = fileInfo.get('storageHost') + storageDirectory = fileInfo.get('storageDirectory') + destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory) + # Use relative path with respect to data directory as a source + os.chdir(dataDirectory) + srcUrl = self.getSrcUrl(filePath, dataDirectory) - def __init__(self, src=None, dest=None): - FileTransferPlugin.__init__(self, self.COMMAND, src, dest) + # Calculate checksum + if self.localMd5Sum: + FileUtility.statFile(filePath, fileInfo) + FileUtility.getMd5Sum(filePath, fileInfo) + + # Transfer file + self.logger.debug('Starting transfer: %s' % fileInfo) + self.start(srcUrl, destUrl) + + # Get remote checksum + if self.remoteMd5Sum: + fileInfo2 = {} + fileInfo2['experimentFilePath'] = experimentFilePath + fileInfo2['experimentName'] = experimentName + fileMetadata = self.dsFileApi.statFile(experimentFilePath, experimentName, fileInfo2) + # If we have not done md5 locally, update file info + if not self.md5Sum: + fileInfo['md5Sum'] = fileMetadata.get('md5Sum') + + # Verify checksum + if fileMetadata.get('md5Sum') != fileInfo.get('md5Sum'): + self.logger.error('Checksum mismatch for file: %s' % filePath) + raise FileProcessingError('Checksum mismatch detected for file: %s' % filePath) + self.logger.debug('Checksum test passed for file %s' % filePath) + + # Remove file + if self.deleteOriginal: + self.logger.debug('Deleting file %s' % filePath) + OsUtility.removeFile(srcUrl) ####################################################################### # Testing. if __name__ == '__main__': - ft = RsyncFileTransferPlugin('/tmp/xyz', '/tmp/xyz2') - ft.start() - print 'StdOut: ', ft.getStdOut() - print 'StdErr: ', ft.getStdErr() - print 'Exit Status: ', ft.getExitStatus() + ft = RsyncFileTransferPlugin(command='rsync -arvlPR --exclude "*.svn" --exclude "*.pyc"') + ft.checkUploadFilesForProcessing(['/home/sveseli/Work/DM/dev/src/python/dm/common/processing/plugins/fileProcessor.py'], {'storageDirectory' : '/opt/DM/dev', 'storageHost' : 'dm@dmstorage', 'dataDirectory' : '/home/sveseli/Work/DM/dev/src'}) + diff --git a/src/python/dm/common/processing/plugins/rsyncWithChecksumAndDeleteFileTransferPlugin.py b/src/python/dm/common/processing/plugins/rsyncWithChecksumAndDeleteFileTransferPlugin.py index 87a5125e..f821f5e7 100755 --- a/src/python/dm/common/processing/plugins/rsyncWithChecksumAndDeleteFileTransferPlugin.py +++ b/src/python/dm/common/processing/plugins/rsyncWithChecksumAndDeleteFileTransferPlugin.py @@ -20,8 +20,8 @@ class RsyncWithChecksumAndDeleteFileTransferPlugin(FileTransferPlugin): def processFile(self, fileInfo): filePath = fileInfo.get('filePath') dataDirectory = fileInfo.get('dataDirectory') - experimentFilePath = fileInfo.get('experimentFilePath') - experimentName = fileInfo.get('experimentName') + experimentFilePath = fileInfo.get('experimentFilePath') + experimentName = fileInfo.get('experimentName') storageHost = fileInfo.get('storageHost') storageDirectory = fileInfo.get('storageDirectory') destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory) -- GitLab