Skip to content
Snippets Groups Projects
Commit b2d6f5e6 authored by sveseli
Browse files

latest round of fixes for the sftp utility garbage packet problem

parent 9cc45d98
No related branches found
No related tags found
No related merge requests found
......@@ -54,9 +54,7 @@ class FileProcessingManager(threading.Thread,Singleton):
self.defaultRetryWaitPeriodInSeconds = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY))
statUtility = cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.STAT_UTILITY_KEY)
if statUtility:
(moduleName,className,constructor) = cm.getModuleClassConstructorTuple(statUtility)
self.logger.debug('Creating stat utility of class %s' % className)
statUtility = ObjectUtility.createObjectInstance(moduleName, className, constructor)
(statUtilityModuleName,statUtilityClassName,statUtilityConstructor) = cm.getModuleClassConstructorTuple(statUtility)
# Create processors
for (key,value) in configItems:
......@@ -67,7 +65,11 @@ class FileProcessingManager(threading.Thread,Singleton):
self.logger.debug('Configuring file processor %s' % fileProcessor)
fileProcessor.setNumberOfRetries(self.defaultNumberOfRetries)
fileProcessor.setRetryWaitPeriodInSeconds(self.defaultRetryWaitPeriodInSeconds)
fileProcessor.statUtility = statUtility
statUtilityObject = None
if statUtility:
statUtilityObject = ObjectUtility.createObjectInstance(statUtilityModuleName, statUtilityClassName, statUtilityConstructor)
self.logger.debug('Using stat utility object %s' % str(statUtilityObject))
fileProcessor.statUtility = statUtilityObject
fileProcessor.configure()
self.fileProcessorDict[key] = fileProcessor
self.fileProcessorKeyList = self.fileProcessorDict.keys()
......
......@@ -57,6 +57,7 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
self.logger.debug('Upload info: %s' % uploadInfo)
# Original data directory may contain host/port
(scheme, host, port, replacementDirPath) = FtpUtility.parseFtpUrl(dataDirectory)
self.logger.debug('Replacement dir path: %s' % replacementDirPath)
self.logger.debug('Number of original files: %s' % len(filePathsDict))
self.logger.debug('Looking for existing files in %s' % storageDirectory)
ftpUtility = SftpUtility(storageHost)
......
#!/usr/bin/env python
import threading
import copy
import stat
import pysftp
......@@ -18,6 +19,7 @@ class SftpUtility:
self.password = password
self.privateKey = privateKey
self.sftpClient = None
self.lock = threading.RLock()
@classmethod
def parseFtpUrl(cls, url, defaultHost=None, defaultPort=None):
......@@ -53,11 +55,21 @@ class SftpUtility:
return outputDict
def getFiles(self, dirPath, fileDict={}, replacementDirPath=None):
if not self.sftpClient:
self.sftpClient = self.getSftpClient(self.host, self.port, self.username, self.password, self.privateKey)
if not replacementDirPath:
replacementDirPath = dirPath
attrs = self.sftpClient.listdir_attr(dirPath)
self.lock.acquire()
try:
if not self.sftpClient:
self.sftpClient = self.getSftpClient(self.host, self.port, self.username, self.password, self.privateKey)
if not replacementDirPath:
replacementDirPath = dirPath
try:
attrs = self.sftpClient.listdir_attr(dirPath)
except Exception, ex:
self.getLogger().error('Could not retrieve files from %s: %s' % (dirPath,ex))
self.closeConnection()
raise
finally:
self.lock.release()
for attr in attrs:
fileName = attr.filename
mode = attr.st_mode
......@@ -73,19 +85,51 @@ class SftpUtility:
return fileDict
def getMd5Sum(self, filePath, fileInfo=None):
    """
    Return the MD5 checksum of a remote file.

    Runs ``md5sum`` on the remote side through ``self.sftpClient.execute``
    and takes the first whitespace-separated token of the first element of
    the result. The client is created on first use via
    ``self.getSftpClient``. The whole operation runs while holding
    ``self.lock``.

    :param filePath: path of the remote file; it is shell-quoted before
        being placed on the command line, so names with spaces or shell
        metacharacters are passed to md5sum as a single argument
    :param fileInfo: optional dictionary that receives the checksum under
        the 'md5Sum' key; a fresh dictionary is used when omitted
    :return: the checksum token
    :raises Exception: whatever the command execution or output parsing
        raised; the error is logged and ``self.closeConnection()`` is
        called before the exception is re-raised
    """
    # shlex.quote is the Python 3 name, pipes.quote the Python 2 one.
    try:
        from shlex import quote
    except ImportError:
        from pipes import quote

    # A None default avoids one dictionary being shared by all calls.
    if fileInfo is None:
        fileInfo = {}

    with self.lock:
        if not self.sftpClient:
            self.sftpClient = self.getSftpClient(self.host, self.port, self.username, self.password, self.privateKey)
        try:
            md5Sum = self.sftpClient.execute('md5sum %s' % quote(filePath))[0].split()[0]
            fileInfo['md5Sum'] = md5Sum
        except Exception as ex:
            self.getLogger().error('Could not get md5sum for file %s: %s' % (filePath, ex))
            self.closeConnection()
            raise
        return md5Sum
def statFile(self, filePath, fileInfo=None):
    """
    Retrieve size and modification time of a remote file.

    Calls ``self.sftpClient.stat`` and copies ``st_size`` and ``st_mtime``
    of the result into the file info dictionary under the 'fileSize' and
    'fileModificationTime' keys. The client is created on first use via
    ``self.getSftpClient``. The whole operation runs while holding
    ``self.lock``.

    :param filePath: path of the remote file
    :param fileInfo: optional dictionary to update in place; a fresh
        dictionary is created when omitted
    :return: the updated file info dictionary
    :raises Exception: whatever the stat call raised; the error is logged
        and ``self.closeConnection()`` is called before the exception is
        re-raised
    """
    # A None default avoids returning one shared dictionary to every
    # caller that omits fileInfo.
    if fileInfo is None:
        fileInfo = {}

    with self.lock:
        if not self.sftpClient:
            self.sftpClient = self.getSftpClient(self.host, self.port, self.username, self.password, self.privateKey)
        try:
            attr = self.sftpClient.stat(filePath)
            fileInfo['fileSize'] = attr.st_size
            fileInfo['fileModificationTime'] = attr.st_mtime
        except Exception as ex:
            self.getLogger().error('Could not get stat file %s: %s' % (filePath, ex))
            self.closeConnection()
            raise
        return fileInfo
def closeConnection(self):
    """
    Close the current SFTP client, if any, and forget it.

    Runs while holding ``self.lock``. A failure while closing is logged
    and not propagated. ``self.sftpClient`` is reset to None in every
    case, so methods that check ``if not self.sftpClient`` will create a
    new client on their next call.
    """
    logger = self.getLogger()
    with self.lock:
        try:
            if self.sftpClient:
                logger.warn('Closing SFTP connection to host %s' % self.host)
                self.sftpClient.close()
        except Exception as ex:
            logger.error('Could not close SFTP connection to host %s: %s' % (self.host, ex))
        finally:
            # Drop the reference even if reporting the failure itself
            # raised, so a broken client is never reused.
            self.sftpClient = None
#######################################################################
# Testing.
......@@ -96,4 +140,6 @@ if __name__ == '__main__':
files = sftpUtility.getFiles('/export/dm/test')
print files
print sftpUtility.getMd5Sum('/export/dm/test/testfile01')
print 'Closing connection'
sftpUtility.closeConnection()
print sftpUtility.statFile('/export/dm/test/testfile01')
......@@ -6,7 +6,7 @@ from dm.common.utility.ftpUtility import FtpUtility
class FtpFileSystemObserverAgent(PollingFileSystemObserverAgent):
DEFAULT_POLLING_PERIOD_IN_SECONDS = 5
DEFAULT_POLLING_PERIOD_IN_SECONDS = 15
def __init__(self, host, port, username=None, password=None, pollingPeriod=DEFAULT_POLLING_PERIOD_IN_SECONDS):
PollingFileSystemObserverAgent.__init__(self, pollingPeriod)
......
......@@ -6,7 +6,7 @@ from dm.common.utility.osUtility import OsUtility
class PollingFileSystemObserverAgent(FileSystemObserverAgent):
DEFAULT_POLLING_PERIOD_IN_SECONDS = 5
DEFAULT_POLLING_PERIOD_IN_SECONDS = 15
DEFAULT_RETRY_PERIOD_IN_SECONDS = 60
def __init__(self, pollingPeriod=DEFAULT_POLLING_PERIOD_IN_SECONDS):
......
......@@ -6,7 +6,7 @@ from dm.common.utility.sftpUtility import SftpUtility
class SftpFileSystemObserverAgent(PollingFileSystemObserverAgent):
DEFAULT_POLLING_PERIOD_IN_SECONDS = 5
DEFAULT_POLLING_PERIOD_IN_SECONDS = 15
DEFAULT_PORT = 22
def __init__(self, host, port=DEFAULT_PORT, username=None, password=None, privateKey=None, pollingPeriod=DEFAULT_POLLING_PERIOD_IN_SECONDS):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment