From b2d6f5e6e5028a79782c75a2e86f6925c7c309e3 Mon Sep 17 00:00:00 2001
From: Sinisa Veseli <sveseli@aps.anl.gov>
Date: Wed, 8 Jun 2016 14:08:12 +0000
Subject: [PATCH] latest round of fixes for the sftp utility garbage packet
 problem

---
 .../processing/fileProcessingManager.py       | 10 ++-
 .../plugins/gridftpFileTransferPlugin.py      |  1 +
 src/python/dm/common/utility/sftpUtility.py   | 78 +++++++++++++++----
 .../impl/ftpFileSystemObserverAgent.py        |  2 +-
 .../impl/pollingFileSystemObserverAgent.py    |  2 +-
 .../impl/sftpFileSystemObserverAgent.py       |  2 +-
 6 files changed, 72 insertions(+), 23 deletions(-)

diff --git a/src/python/dm/common/processing/fileProcessingManager.py b/src/python/dm/common/processing/fileProcessingManager.py
index 76aebefb..97745477 100755
--- a/src/python/dm/common/processing/fileProcessingManager.py
+++ b/src/python/dm/common/processing/fileProcessingManager.py
@@ -54,9 +54,7 @@ class FileProcessingManager(threading.Thread,Singleton):
         self.defaultRetryWaitPeriodInSeconds = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY))
         statUtility = cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.STAT_UTILITY_KEY)
         if statUtility:
-            (moduleName,className,constructor) = cm.getModuleClassConstructorTuple(statUtility)
-            self.logger.debug('Creating stat utility of class %s' % className)
-            statUtility = ObjectUtility.createObjectInstance(moduleName, className, constructor)
+            (statUtilityModuleName,statUtilityClassName,statUtilityConstructor) = cm.getModuleClassConstructorTuple(statUtility)
 
         # Create processors  
         for (key,value) in configItems:
@@ -67,7 +65,11 @@ class FileProcessingManager(threading.Thread,Singleton):
                 self.logger.debug('Configuring file processor %s' % fileProcessor)
                 fileProcessor.setNumberOfRetries(self.defaultNumberOfRetries)
                 fileProcessor.setRetryWaitPeriodInSeconds(self.defaultRetryWaitPeriodInSeconds)
-                fileProcessor.statUtility = statUtility
+                statUtilityObject = None
+                if statUtility:
+                    statUtilityObject = ObjectUtility.createObjectInstance(statUtilityModuleName, statUtilityClassName, statUtilityConstructor)
+                    self.logger.debug('Using stat utility object %s' % str(statUtilityObject))
+                fileProcessor.statUtility = statUtilityObject
                 fileProcessor.configure()
                 self.fileProcessorDict[key] = fileProcessor
         self.fileProcessorKeyList = self.fileProcessorDict.keys()
diff --git a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py
index 8c2d1062..acd76c7b 100755
--- a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py
+++ b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py
@@ -57,6 +57,7 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
         self.logger.debug('Upload info: %s' % uploadInfo)
         # Original data directory may contain host/port
         (scheme, host, port, replacementDirPath) = FtpUtility.parseFtpUrl(dataDirectory)
+        self.logger.debug('Replacement dir path: %s' % replacementDirPath)
         self.logger.debug('Number of original files: %s' % len(filePathsDict))
         self.logger.debug('Looking for existing files in %s' % storageDirectory)
         ftpUtility = SftpUtility(storageHost)
diff --git a/src/python/dm/common/utility/sftpUtility.py b/src/python/dm/common/utility/sftpUtility.py
index 2b4ef868..233db3de 100755
--- a/src/python/dm/common/utility/sftpUtility.py
+++ b/src/python/dm/common/utility/sftpUtility.py
@@ -1,5 +1,6 @@
 #!/usr/bin/env python
 
+import threading
 import copy
 import stat
 import pysftp
@@ -18,6 +19,7 @@ class SftpUtility:
         self.password = password
         self.privateKey = privateKey
         self.sftpClient = None
+        self.lock = threading.RLock()
 
     @classmethod
     def parseFtpUrl(cls, url, defaultHost=None, defaultPort=None):
@@ -53,11 +55,21 @@ class SftpUtility:
         return outputDict
         
     def getFiles(self, dirPath, fileDict={}, replacementDirPath=None):
-        if not self.sftpClient:
-            self.sftpClient = self.getSftpClient(self.host, self.port, self.username, self.password, self.privateKey)
-        if not replacementDirPath:
-            replacementDirPath = dirPath
-        attrs = self.sftpClient.listdir_attr(dirPath)
+        self.lock.acquire()
+        try:
+            if not self.sftpClient:
+                self.sftpClient = self.getSftpClient(self.host, self.port, self.username, self.password, self.privateKey)
+            if not replacementDirPath:
+                replacementDirPath = dirPath
+            try:
+                attrs = self.sftpClient.listdir_attr(dirPath)
+            except Exception, ex:
+                self.getLogger().error('Could not retrieve files from %s: %s' % (dirPath,ex))
+                self.closeConnection()
+                raise
+        finally:
+            self.lock.release()
+
         for attr in attrs:
             fileName = attr.filename
             mode = attr.st_mode
@@ -73,19 +85,51 @@ class SftpUtility:
         return fileDict
 
     def getMd5Sum(self, filePath, fileInfo={}):
-        if not self.sftpClient:
-            self.sftpClient = self.getSftpClient(self.host, self.port, self.username, self.password, self.privateKey)
-        md5Sum = self.sftpClient.execute('md5sum %s' % filePath)[0].split()[0]
-        fileInfo['md5Sum'] = md5Sum
-        return md5Sum
+        self.lock.acquire()
+        try:
+            if not self.sftpClient:
+                self.sftpClient = self.getSftpClient(self.host, self.port, self.username, self.password, self.privateKey)
+            try:
+                md5Sum = self.sftpClient.execute('md5sum %s' % filePath)[0].split()[0]
+                fileInfo['md5Sum'] = md5Sum
+            except Exception, ex:
+                self.getLogger().error('Could not get md5sum for file %s: %s' % (filePath,ex))
+                self.closeConnection()
+                raise
+            return md5Sum
+        finally:
+            self.lock.release()
 
     def statFile(self, filePath, fileInfo={}):
-        if not self.sftpClient:
-            self.sftpClient = self.getSftpClient(self.host, self.port, self.username, self.password, self.privateKey)
-        attr = self.sftpClient.stat(filePath)
-        fileInfo['fileSize'] = attr.st_size 
-        fileInfo['fileModificationTime'] = attr.st_mtime
-        return fileInfo
+        self.lock.acquire()
+        try:
+            if not self.sftpClient:
+                self.sftpClient = self.getSftpClient(self.host, self.port, self.username, self.password, self.privateKey)
+            try:
+                attr = self.sftpClient.stat(filePath)
+                fileInfo['fileSize'] = attr.st_size 
+                fileInfo['fileModificationTime'] = attr.st_mtime
+            except Exception, ex:
+                self.getLogger().error('Could not get stat file %s: %s' % (filePath,ex))
+                self.closeConnection()
+                raise
+            return fileInfo
+        finally:
+            self.lock.release()
+
+    def closeConnection(self):
+        logger = self.getLogger()
+        self.lock.acquire()
+        try:
+            try:
+                if self.sftpClient:
+                    logger.warn('Closing SFTP connection to host %s' % self.host)
+                    self.sftpClient.close()
+            except Exception, ex:
+                logger.error('Could not close SFTP connection to host %s: %s' % (self.host, ex))
+            self.sftpClient = None
+        finally:
+            self.lock.release()
 
 #######################################################################
 # Testing.
@@ -96,4 +140,6 @@ if __name__ == '__main__':
     files = sftpUtility.getFiles('/export/dm/test')
     print files
     print sftpUtility.getMd5Sum('/export/dm/test/testfile01')
+    print 'Closing connection'
+    sftpUtility.closeConnection()
     print sftpUtility.statFile('/export/dm/test/testfile01')
diff --git a/src/python/dm/daq_web_service/service/impl/ftpFileSystemObserverAgent.py b/src/python/dm/daq_web_service/service/impl/ftpFileSystemObserverAgent.py
index 5203ef92..b6f81cbe 100755
--- a/src/python/dm/daq_web_service/service/impl/ftpFileSystemObserverAgent.py
+++ b/src/python/dm/daq_web_service/service/impl/ftpFileSystemObserverAgent.py
@@ -6,7 +6,7 @@ from dm.common.utility.ftpUtility import FtpUtility
 
 class FtpFileSystemObserverAgent(PollingFileSystemObserverAgent):
 
-    DEFAULT_POLLING_PERIOD_IN_SECONDS = 5
+    DEFAULT_POLLING_PERIOD_IN_SECONDS = 15
 
     def __init__(self, host, port, username=None, password=None, pollingPeriod=DEFAULT_POLLING_PERIOD_IN_SECONDS):
         PollingFileSystemObserverAgent.__init__(self, pollingPeriod)
diff --git a/src/python/dm/daq_web_service/service/impl/pollingFileSystemObserverAgent.py b/src/python/dm/daq_web_service/service/impl/pollingFileSystemObserverAgent.py
index 53c15630..9d8b2834 100755
--- a/src/python/dm/daq_web_service/service/impl/pollingFileSystemObserverAgent.py
+++ b/src/python/dm/daq_web_service/service/impl/pollingFileSystemObserverAgent.py
@@ -6,7 +6,7 @@ from dm.common.utility.osUtility import OsUtility
 
 class PollingFileSystemObserverAgent(FileSystemObserverAgent):
 
-    DEFAULT_POLLING_PERIOD_IN_SECONDS = 5
+    DEFAULT_POLLING_PERIOD_IN_SECONDS = 15
     DEFAULT_RETRY_PERIOD_IN_SECONDS = 60
 
     def __init__(self, pollingPeriod=DEFAULT_POLLING_PERIOD_IN_SECONDS):
diff --git a/src/python/dm/daq_web_service/service/impl/sftpFileSystemObserverAgent.py b/src/python/dm/daq_web_service/service/impl/sftpFileSystemObserverAgent.py
index 33b406d1..3a6a693e 100755
--- a/src/python/dm/daq_web_service/service/impl/sftpFileSystemObserverAgent.py
+++ b/src/python/dm/daq_web_service/service/impl/sftpFileSystemObserverAgent.py
@@ -6,7 +6,7 @@ from dm.common.utility.sftpUtility import SftpUtility
 
 class SftpFileSystemObserverAgent(PollingFileSystemObserverAgent):
 
-    DEFAULT_POLLING_PERIOD_IN_SECONDS = 5
+    DEFAULT_POLLING_PERIOD_IN_SECONDS = 15
     DEFAULT_PORT = 22
 
     def __init__(self, host, port=DEFAULT_PORT, username=None, password=None, privateKey=None, pollingPeriod=DEFAULT_POLLING_PERIOD_IN_SECONDS):
-- 
GitLab