From 62d0324047daec2031921b61dbe572bf554c84d3 Mon Sep 17 00:00:00 2001
From: Sinisa Veseli <sveseli@aps.anl.gov>
Date: Tue, 23 Jun 2015 20:16:37 +0000
Subject: [PATCH] move processing classes under common area

---
 src/python/dm/common/processing/__init__.py   |  0
 .../processing}/fileProcessingManager.py      | 18 +++-
 .../processing}/fileProcessingThread.py       | 24 ++++-
 .../dm/common/processing/plugins/__init__.py  |  0
 .../processing/plugins/fileProcessor.py       | 37 ++++++++
 .../processing/plugins/fileTransferPlugin.py  | 92 +++++++++++++++++++
 .../plugins/rsyncFileTransferPlugin.py        | 18 ++++
 .../service/impl/fileSystemObserver.py        |  2 +-
 8 files changed, 183 insertions(+), 8 deletions(-)
 create mode 100644 src/python/dm/common/processing/__init__.py
 rename src/python/dm/{daq_web_service/service/impl => common/processing}/fileProcessingManager.py (77%)
 rename src/python/dm/{daq_web_service/service/impl => common/processing}/fileProcessingThread.py (58%)
 create mode 100644 src/python/dm/common/processing/plugins/__init__.py
 create mode 100755 src/python/dm/common/processing/plugins/fileProcessor.py
 create mode 100755 src/python/dm/common/processing/plugins/fileTransferPlugin.py
 create mode 100755 src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py

diff --git a/src/python/dm/common/processing/__init__.py b/src/python/dm/common/processing/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/src/python/dm/daq_web_service/service/impl/fileProcessingManager.py b/src/python/dm/common/processing/fileProcessingManager.py
similarity index 77%
rename from src/python/dm/daq_web_service/service/impl/fileProcessingManager.py
rename to src/python/dm/common/processing/fileProcessingManager.py
index 7e24dfa7..173b8e5c 100755
--- a/src/python/dm/daq_web_service/service/impl/fileProcessingManager.py
+++ b/src/python/dm/common/processing/fileProcessingManager.py
@@ -6,14 +6,16 @@ import time
 from dm.common.utility.loggingManager import LoggingManager
 from dm.common.utility.configurationManager import ConfigurationManager
 from dm.common.utility.objectUtility import ObjectUtility
-from dm.common.utility.threadSafeQueue import ThreadSafeQueue 
+from dm.common.utility.timeBasedProcessingQueue import TimeBasedProcessingQueue
 from dm.common.utility.singleton import Singleton
-from dm.daq_web_service.service.impl.fileProcessingThread import FileProcessingThread
+from fileProcessingThread import FileProcessingThread
 
 class FileProcessingManager(threading.Thread,Singleton):
 
     CONFIG_SECTION_NAME = 'FileProcessingManager'
-    N_PROCESSING_THREADS_KEY = 'nprocessingthreads'
+    NUMBER_OF_PROCESSING_THREADS_KEY = 'numberofprocessingthreads'
+    DEFAULT_NUMBER_OF_RETRIES_KEY = 'defaultnumberofretries'
+    DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY = 'defaultretrywaitperiodinseconds'
     FILE_PROCESSOR_KEY = 'fileprocessor'
 
     # Singleton.
@@ -34,7 +36,7 @@ class FileProcessingManager(threading.Thread,Singleton):
             self.eventFlag = threading.Event()
             self.fileProcessorDict = {}
             self.fileProcessorKeyList = []
-            self.fileProcessingQueue = ThreadSafeQueue()
+            self.fileProcessingQueue = TimeBasedProcessingQueue()
             self.processedFileDict = {}
             self.unprocessedFileDict = {}
             self.__configure()
@@ -46,7 +48,9 @@ class FileProcessingManager(threading.Thread,Singleton):
         cm = ConfigurationManager.getInstance()
         configItems = cm.getConfigItems(FileProcessingManager.CONFIG_SECTION_NAME)
         self.logger.debug('Got config items: %s' % configItems)
-        self.nProcessingThreads = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.N_PROCESSING_THREADS_KEY))
+        self.nProcessingThreads = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.NUMBER_OF_PROCESSING_THREADS_KEY))
+        self.defaultNumberOfRetries = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.DEFAULT_NUMBER_OF_RETRIES_KEY))
+        self.defaultRetryWaitPeriodInSeconds = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY))
 
         # Create processors  
         for (key,value) in configItems:
@@ -54,6 +58,10 @@ class FileProcessingManager(threading.Thread,Singleton):
                 (moduleName,className,constructor) = cm.getModuleClassConstructorTuple(value)
                 self.logger.debug('Creating file processor instance of class %s' % className)
                 fileProcessor = ObjectUtility.createObjectInstance(moduleName, className, constructor)
+                self.logger.debug('Configring file processor %s' % fileProcessor)
+                fileProcessor.setNumberOfRetries(self.defaultNumberOfRetries)
+                fileProcessor.setRetryWaitPeriodInSeconds(self.defaultRetryWaitPeriodInSeconds)
+                fileProcessor.configure()
                 self.fileProcessorDict[key] = fileProcessor
         self.fileProcessorKeyList = self.fileProcessorDict.keys()
         self.fileProcessorKeyList.sort()
diff --git a/src/python/dm/daq_web_service/service/impl/fileProcessingThread.py b/src/python/dm/common/processing/fileProcessingThread.py
similarity index 58%
rename from src/python/dm/daq_web_service/service/impl/fileProcessingThread.py
rename to src/python/dm/common/processing/fileProcessingThread.py
index fda96165..0f548ac8 100755
--- a/src/python/dm/daq_web_service/service/impl/fileProcessingThread.py
+++ b/src/python/dm/common/processing/fileProcessingThread.py
@@ -33,19 +33,39 @@ class FileProcessingThread(threading.Thread):
                 filePath = file.getFilePath()
                 daqPath = file.getDaqPath()
                 experiment = file.getExperiment()
+                
                 try:
                     for processorKey in self.fileProcessorKeyList: 
                         processor = self.fileProcessorDict.get(processorKey)
                         processorName = processor.__class__.__name__
+                        fileProcessedByDict = file.get('processedByDict', {})
+                        if fileProcessedByDict.has_key(processorName):
+                            self.logger.debug('%s has already been processed by %s ' % (file, processorName))
+                            continue
+
                         self.logger.debug('%s is about to process file %s ' % (processorName, file))
                         try:
                             processor.processFile(filePath, daqPath, experiment)
+                            fileProcessedByDict[processorName] = True
+                            file['processedByDict'] = fileProcessedByDict
                             self.logger.debug('%s processed file %s ' % (processorName, file))
                         except Exception, ex:
                             self.logger.exception(ex)
                             self.logger.debug('%s processing failed for file %s ' % (processorName, file))
-                            file[processorName] = {'error' : ex}
-                            self.unprocessedFileDict[filePath] = file
+                            fileProcessingDict = file.get('processingDict', {})
+                            file['processingDict'] = fileProcessingDict
+                            processorDict = fileProcessingDict.get(processorName, {}) 
+                            processorDict['lastError'] = ex
+                            nRetriesLeft = processorDict.get('numberOfRetriesLeft', processor.getNumberOfRetries()) 
+                            self.logger.debug('Number of %s retries left for file %s: %s' % (processorName, file, nRetriesLeft))
+                            processorDict['numberOfRetriesLeft'] = nRetriesLeft - 1
+                            if nRetriesLeft <= 0:
+                                self.logger.debug('No more %s retries left for file %s' % (processorName, file))
+                                self.unprocessedFileDict[filePath] = file
+                            else:
+                                retryWaitPeriod = processor.getRetryWaitPeriodInSeconds()
+                                self.logger.debug('%s will retry processing file %s in %s seconds' % (processorName, file, retryWaitPeriod))
+                                self.fileProcessingQueue.push(file, retryWaitPeriod)
 
                 except Exception, ex:
                     self.logger.exception(ex)
diff --git a/src/python/dm/common/processing/plugins/__init__.py b/src/python/dm/common/processing/plugins/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/src/python/dm/common/processing/plugins/fileProcessor.py b/src/python/dm/common/processing/plugins/fileProcessor.py
new file mode 100755
index 00000000..63745e6f
--- /dev/null
+++ b/src/python/dm/common/processing/plugins/fileProcessor.py
@@ -0,0 +1,37 @@
+#!/usr/bin/env python
+
+import abc
+class FileProcessor:
+
+    DEFAULT_NUMBER_OF_RETRIES = 0
+    DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS = 60
+
+    def __init__(self):
+        self.configDict = {}
+
+    @abc.abstractmethod
+    def processFile(self, filePath, daqPath, experiment):
+        return NotImplemented
+        
+    def configure(self):
+        # Use this method for processor configuration
+        pass
+
+    def setConfigKeyValue(self, key, value):
+        self.configDict[key] = value
+
+    def getConfigKeyValue(self, key):
+        return self.configDict.get(key)
+
+    def setNumberOfRetries(self, nRetries):
+        self.configDict['numberOfRetries'] = nRetries
+
+    def getNumberOfRetries(self):
+        self.configDict.get('numberOfRetries', self.DEFAULT_NUMBER_OF_RETRIES)
+
+    def setRetryWaitPeriodInSeconds(self, waitPeriod):
+        self.configDict['retryWaitPeriodInSeconds'] = waitPeriod
+
+    def getRetryWaitPeriodInSeconds(self):
+        self.configDict.get('retryWaitPeriodInSeconds', DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS)
+
diff --git a/src/python/dm/common/processing/plugins/fileTransferPlugin.py b/src/python/dm/common/processing/plugins/fileTransferPlugin.py
new file mode 100755
index 00000000..8aafaf52
--- /dev/null
+++ b/src/python/dm/common/processing/plugins/fileTransferPlugin.py
@@ -0,0 +1,92 @@
+#!/usr/bin/env python
+
+import os
+from dm.common.utility.loggingManager import LoggingManager
+from dm.common.utility.dmSubprocess import DmSubprocess
+from dm.common.exceptions.invalidArgument import InvalidArgument
+from dm.common.exceptions.invalidRequest import InvalidRequest
+from fileProcessor import FileProcessor
+
+class FileTransferPlugin(FileProcessor):
+
+    def __init__(self, command, src=None, dest=None):
+        FileProcessor.__init__(self)
+        self.src = src
+        self.dest = dest
+        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
+        if command is None or not len(command):
+            raise InvalidArgument('File transfer command must be non-empty string.')
+        self.command = command
+        self.subprocess = None
+
+    def processFile(self, filePath, daqPath, experiment):
+        storageHost = experiment.get('storageHost')
+        storageDirectory = experiment.get('storageDirectory')
+        dest = '%s:%s' % (storageHost, storageDirectory)
+        # Use relative path with respect to daq directory as a source
+        os.chdir(daqPath)
+        src = os.path.relpath(filePath, daqPath)
+        self.start(src, dest)
+
+    def getFullCommand(self, src, dest):
+        return '%s %s %s' % (self.command, src, dest)
+
+    def setSrc(self, src):
+        self.src = src
+
+    def setDest(self, dest):
+        self.dest = dest
+
+    def start(self, src=None, dest=None):
+        # Use preconfigured source if provided source is None
+        fileSrc = src
+        if src is None:
+            fileSrc = self.src
+        # Use provided destination only if preconfigured destination is None
+        # Plugins may have desired destination preconfigured for all files
+        fileDest = self.dest
+        if self.dest is None:
+            fileDest = dest
+
+        if not fileSrc or not fileDest:
+            raise InvalidRequest('Both source and destination must be non-empty strings.')
+        self.subprocess = DmSubprocess.getSubprocess(self.getFullCommand(fileSrc, fileDest))
+        return self.subprocess.run()
+
+    def wait(self):
+        if self.subprocess:
+            return self.subprocess.wait()
+        return None
+
+    def poll(self):
+        if self.subprocess:
+            return self.subprocess.poll()
+        return None
+
+    def getStdOut(self):
+        if self.subprocess:
+            return self.subprocess.getStdOut()
+        return None
+
+
+    def getStdErr(self):
+        if self.subprocess:
+            return self.subprocess.getStdErr()
+        return None
+
+    def getExitStatus(self):
+        if self.subprocess:
+            return self.subprocess.getExitStatus()
+        return None
+
+    def reset(self):
+        self.subprocess = None
+
+#######################################################################
+# Testing.
+if __name__ == '__main__':
+    ft = FileTransfer('rsync -arv', '/tmp/xyz', '/tmp/xyz2')
+    ft.start()
+    print 'StdOut: ', ft.getStdOut()
+    print 'StdErr: ', ft.getStdErr()
+    print 'Exit Status: ', ft.getExitStatus()
diff --git a/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py b/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py
new file mode 100755
index 00000000..3550d0cf
--- /dev/null
+++ b/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py
@@ -0,0 +1,18 @@
+#!/usr/bin/env python
+
+from fileTransferPlugin import FileTransferPlugin
+class RsyncFileTransferPlugin(FileTransferPlugin):
+
+    COMMAND = 'rsync -arvlPR'
+
+    def __init__(self, src=None, dest=None):
+        FileTransferPlugin.__init__(self, self.COMMAND, src, dest)
+
+#######################################################################
+# Testing.
+if __name__ == '__main__':
+    ft = RsyncFileTransferPlugin('/tmp/xyz', '/tmp/xyz2')
+    ft.start()
+    print 'StdOut: ', ft.getStdOut()
+    print 'StdErr: ', ft.getStdErr()
+    print 'Exit Status: ', ft.getExitStatus()
diff --git a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py
index be8245f6..27dad6d0 100755
--- a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py
+++ b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py
@@ -10,9 +10,9 @@ from dm.common.utility.configurationManager import ConfigurationManager
 from dm.common.objects.observedFile import ObservedFile
 from dm.common.utility.singleton import Singleton
 from dm.common.utility.threadingUtility import ThreadingUtility
+from dm.common.processing.fileProcessingManager import FileProcessingManager
 
 from dmFileSystemEventHandler import DmFileSystemEventHandler
-from fileProcessingManager import FileProcessingManager
 
 class FileSystemObserver(threading.Thread,Singleton):
 
-- 
GitLab