diff --git a/src/python/dm/common/processing/fileProcessingManager.py b/src/python/dm/common/processing/fileProcessingManager.py index 76aebefb7becf2c27cda684b870186b8326d460b..97745477db34bf59fd7f7b20174820e778450035 100755 --- a/src/python/dm/common/processing/fileProcessingManager.py +++ b/src/python/dm/common/processing/fileProcessingManager.py @@ -54,9 +54,7 @@ class FileProcessingManager(threading.Thread,Singleton): self.defaultRetryWaitPeriodInSeconds = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY)) statUtility = cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.STAT_UTILITY_KEY) if statUtility: - (moduleName,className,constructor) = cm.getModuleClassConstructorTuple(statUtility) - self.logger.debug('Creating stat utility of class %s' % className) - statUtility = ObjectUtility.createObjectInstance(moduleName, className, constructor) + (statUtilityModuleName,statUtilityClassName,statUtilityConstructor) = cm.getModuleClassConstructorTuple(statUtility) # Create processors for (key,value) in configItems: @@ -67,7 +65,11 @@ class FileProcessingManager(threading.Thread,Singleton): self.logger.debug('Configuring file processor %s' % fileProcessor) fileProcessor.setNumberOfRetries(self.defaultNumberOfRetries) fileProcessor.setRetryWaitPeriodInSeconds(self.defaultRetryWaitPeriodInSeconds) - fileProcessor.statUtility = statUtility + statUtilityObject = None + if statUtility: + statUtilityObject = ObjectUtility.createObjectInstance(statUtilityModuleName, statUtilityClassName, statUtilityConstructor) + self.logger.debug('Using stat utility object %s' % str(statUtilityObject)) + fileProcessor.statUtility = statUtilityObject fileProcessor.configure() self.fileProcessorDict[key] = fileProcessor self.fileProcessorKeyList = self.fileProcessorDict.keys() diff --git a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py index 8c2d1062cfe43f61cf5e3610d5a32450ff7a587c..acd76c7ba5df5ab4cf6911764fc1cd1dade63132 100755 --- a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py +++ b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py @@ -57,6 +57,7 @@ class GridftpFileTransferPlugin(FileTransferPlugin): self.logger.debug('Upload info: %s' % uploadInfo) # Original data directory may contain host/port (scheme, host, port, replacementDirPath) = FtpUtility.parseFtpUrl(dataDirectory) + self.logger.debug('Replacement dir path: %s' % replacementDirPath) self.logger.debug('Number of original files: %s' % len(filePathsDict)) self.logger.debug('Looking for existing files in %s' % storageDirectory) ftpUtility = SftpUtility(storageHost) diff --git a/src/python/dm/common/utility/sftpUtility.py b/src/python/dm/common/utility/sftpUtility.py index 2b4ef86816126c0aec3bc27c180dd981ae6599ea..233db3dedbb3c20937b0c77a27ca0a29289c7b07 100755 --- a/src/python/dm/common/utility/sftpUtility.py +++ b/src/python/dm/common/utility/sftpUtility.py @@ -1,5 +1,6 @@ #!/usr/bin/env python +import threading import copy import stat import pysftp @@ -18,6 +19,7 @@ class SftpUtility: self.password = password self.privateKey = privateKey self.sftpClient = None + self.lock = threading.RLock() @classmethod def parseFtpUrl(cls, url, defaultHost=None, defaultPort=None): @@ -53,11 +55,21 @@ class SftpUtility: return outputDict def getFiles(self, dirPath, fileDict={}, replacementDirPath=None): - if not self.sftpClient: - self.sftpClient = self.getSftpClient(self.host, self.port, self.username, self.password, self.privateKey) - if not replacementDirPath: - replacementDirPath = dirPath - attrs = self.sftpClient.listdir_attr(dirPath) + self.lock.acquire() + try: + if not self.sftpClient: + self.sftpClient = self.getSftpClient(self.host, self.port, self.username, self.password, self.privateKey) + if not replacementDirPath: + replacementDirPath = dirPath + try: + attrs = self.sftpClient.listdir_attr(dirPath) + except Exception, ex: + self.getLogger().error('Could not retrieve files from %s: %s' % (dirPath,ex)) + self.closeConnection() + raise + finally: + self.lock.release() + for attr in attrs: fileName = attr.filename mode = attr.st_mode @@ -73,19 +85,51 @@ class SftpUtility: return fileDict def getMd5Sum(self, filePath, fileInfo={}): - if not self.sftpClient: - self.sftpClient = self.getSftpClient(self.host, self.port, self.username, self.password, self.privateKey) - md5Sum = self.sftpClient.execute('md5sum %s' % filePath)[0].split()[0] - fileInfo['md5Sum'] = md5Sum - return md5Sum + self.lock.acquire() + try: + if not self.sftpClient: + self.sftpClient = self.getSftpClient(self.host, self.port, self.username, self.password, self.privateKey) + try: + md5Sum = self.sftpClient.execute('md5sum %s' % filePath)[0].split()[0] + fileInfo['md5Sum'] = md5Sum + except Exception, ex: + self.getLogger().error('Could not get md5sum for file %s: %s' % (filePath,ex)) + self.closeConnection() + raise + return md5Sum + finally: + self.lock.release() def statFile(self, filePath, fileInfo={}): - if not self.sftpClient: - self.sftpClient = self.getSftpClient(self.host, self.port, self.username, self.password, self.privateKey) - attr = self.sftpClient.stat(filePath) - fileInfo['fileSize'] = attr.st_size - fileInfo['fileModificationTime'] = attr.st_mtime - return fileInfo + self.lock.acquire() + try: + if not self.sftpClient: + self.sftpClient = self.getSftpClient(self.host, self.port, self.username, self.password, self.privateKey) + try: + attr = self.sftpClient.stat(filePath) + fileInfo['fileSize'] = attr.st_size + fileInfo['fileModificationTime'] = attr.st_mtime + except Exception, ex: + self.getLogger().error('Could not get stat file %s: %s' % (filePath,ex)) + self.closeConnection() + raise + return fileInfo + finally: + self.lock.release() + + def closeConnection(self): + logger = self.getLogger() + self.lock.acquire() + try: + try: + if self.sftpClient: + logger.warn('Closing SFTP connection to host %s' % self.host) + self.sftpClient.close() + except Exception, ex: + logger.error('Could not close SFTP connection to host %s: %s' % (self.host, ex)) + self.sftpClient = None + finally: + self.lock.release() ####################################################################### # Testing. @@ -96,4 +140,6 @@ if __name__ == '__main__': files = sftpUtility.getFiles('/export/dm/test') print files print sftpUtility.getMd5Sum('/export/dm/test/testfile01') + print 'Closing connection' + sftpUtility.closeConnection() print sftpUtility.statFile('/export/dm/test/testfile01') diff --git a/src/python/dm/daq_web_service/service/impl/ftpFileSystemObserverAgent.py b/src/python/dm/daq_web_service/service/impl/ftpFileSystemObserverAgent.py index 5203ef92633deb0dc94850071a1f5486c6a7d509..b6f81cbe301c784a273de28d22f6d523b6457ee9 100755 --- a/src/python/dm/daq_web_service/service/impl/ftpFileSystemObserverAgent.py +++ b/src/python/dm/daq_web_service/service/impl/ftpFileSystemObserverAgent.py @@ -6,7 +6,7 @@ from dm.common.utility.ftpUtility import FtpUtility class FtpFileSystemObserverAgent(PollingFileSystemObserverAgent): - DEFAULT_POLLING_PERIOD_IN_SECONDS = 5 + DEFAULT_POLLING_PERIOD_IN_SECONDS = 15 def __init__(self, host, port, username=None, password=None, pollingPeriod=DEFAULT_POLLING_PERIOD_IN_SECONDS): PollingFileSystemObserverAgent.__init__(self, pollingPeriod) diff --git a/src/python/dm/daq_web_service/service/impl/pollingFileSystemObserverAgent.py b/src/python/dm/daq_web_service/service/impl/pollingFileSystemObserverAgent.py index 53c15630de396158ec277d63b08eaa333fbe5ec2..9d8b2834355db05e368a0bd214a6451b2ae432fb 100755 --- a/src/python/dm/daq_web_service/service/impl/pollingFileSystemObserverAgent.py +++ b/src/python/dm/daq_web_service/service/impl/pollingFileSystemObserverAgent.py @@ -6,7 +6,7 @@ from dm.common.utility.osUtility import OsUtility class PollingFileSystemObserverAgent(FileSystemObserverAgent): - DEFAULT_POLLING_PERIOD_IN_SECONDS = 5 + DEFAULT_POLLING_PERIOD_IN_SECONDS = 15 DEFAULT_RETRY_PERIOD_IN_SECONDS = 60 def __init__(self, pollingPeriod=DEFAULT_POLLING_PERIOD_IN_SECONDS): diff --git a/src/python/dm/daq_web_service/service/impl/sftpFileSystemObserverAgent.py b/src/python/dm/daq_web_service/service/impl/sftpFileSystemObserverAgent.py index 33b406d1631365f2ffd03737bb71eaffab76e154..3a6a693e062120cd0519b73ac07deedb9b5a5afd 100755 --- a/src/python/dm/daq_web_service/service/impl/sftpFileSystemObserverAgent.py +++ b/src/python/dm/daq_web_service/service/impl/sftpFileSystemObserverAgent.py @@ -6,7 +6,7 @@ from dm.common.utility.sftpUtility import SftpUtility class SftpFileSystemObserverAgent(PollingFileSystemObserverAgent): - DEFAULT_POLLING_PERIOD_IN_SECONDS = 5 + DEFAULT_POLLING_PERIOD_IN_SECONDS = 15 DEFAULT_PORT = 22 def __init__(self, host, port=DEFAULT_PORT, username=None, password=None, privateKey=None, pollingPeriod=DEFAULT_POLLING_PERIOD_IN_SECONDS):