From e370f2d44cce0b186e29d2f7dd7a650762702c6f Mon Sep 17 00:00:00 2001
From: Sinisa Veseli <sveseli@aps.anl.gov>
Date: Wed, 24 Feb 2016 16:47:44 +0000
Subject: [PATCH] fixes for upload status updates

---
 .../dm/common/objects/directoryUploadInfo.py  |  2 +-
 .../common/processing/fileProcessingThread.py | 11 +++++--
 .../dm/daq_web_service/cli/uploadCli.py       | 11 +++++++
 .../impl/experimentSessionControllerImpl.py   | 29 +++++++++++++++++--
 .../service/impl/uploadTracker.py             |  1 +
 5 files changed, 47 insertions(+), 7 deletions(-)

diff --git a/src/python/dm/common/objects/directoryUploadInfo.py b/src/python/dm/common/objects/directoryUploadInfo.py
index b39e551d..6c8f16a8 100755
--- a/src/python/dm/common/objects/directoryUploadInfo.py
+++ b/src/python/dm/common/objects/directoryUploadInfo.py
@@ -33,7 +33,7 @@ class DirectoryUploadInfo(DmObject):
             if status in ['aborted', 'failed']:
                 uploadStatus = status
 
-            if not processingEndTime:
+            if not processingEndTime and status != 'skipped':
                 endTime = None
                 break
 
diff --git a/src/python/dm/common/processing/fileProcessingThread.py b/src/python/dm/common/processing/fileProcessingThread.py
index 9c58a16a..76bcdd79 100755
--- a/src/python/dm/common/processing/fileProcessingThread.py
+++ b/src/python/dm/common/processing/fileProcessingThread.py
@@ -36,19 +36,24 @@ class FileProcessingThread(threading.Thread):
                 endProcessingTime = time.time() 
                 statusMonitor.fileProcessingCancelled(filePath, endProcessingTime) 
                 return 
-            self.logger.debug('Starting processing file %s' % filePath)
-            fileInfo['startProcessingTime'] = time.time() 
+            self.logger.debug('Starting to process file %s' % filePath)
+            startProcessingTime = fileInfo.get('startProcessingTime', time.time())
+            fileInfo['startProcessingTime'] = startProcessingTime
             nProcessors = len(self.fileProcessorKeyList)
             processedByDict = fileInfo.get('processedByDict', {})
             fileInfo['processedByDict'] = processedByDict
+            skipPlugins = fileInfo.get('skipPlugins', [])
             processorNumber = 0
             for processorKey in self.fileProcessorKeyList: 
                 processorNumber += 1
                 processor = self.fileProcessorDict.get(processorKey)
                 processorName = processor.name
+                if processorName in skipPlugins:
+                    self.logger.debug('%s will be skipped by %s ' % (filePath, processorName))
+                    continue
 
                 if processedByDict.has_key(processorName):
-                    self.logger.debug('%s has already been processed by %s ' % (fileInfo, processorName))
+                    self.logger.debug('%s has already been processed by %s ' % (filePath, processorName))
                     continue
 
                 self.logger.debug('%s is about to process file %s ' % (processorName, filePath))
diff --git a/src/python/dm/daq_web_service/cli/uploadCli.py b/src/python/dm/daq_web_service/cli/uploadCli.py
index dae08440..f578309d 100755
--- a/src/python/dm/daq_web_service/cli/uploadCli.py
+++ b/src/python/dm/daq_web_service/cli/uploadCli.py
@@ -3,6 +3,7 @@
 from daqWebServiceSessionCli import DaqWebServiceSessionCli
 from dm.daq_web_service.api.experimentRestApi import ExperimentRestApi
 from dm.common.exceptions.invalidRequest import InvalidRequest
+from dm.common.constants import dmProcessingMode
 
 class UploadCli(DaqWebServiceSessionCli):
     def __init__(self):
@@ -11,24 +12,34 @@ class UploadCli(DaqWebServiceSessionCli):
         self.addOption('', '--data-directory', dest='dataDirectory', help='Experiment data directory. If specified string does not already contain file server URL, value of the %s environment variable will be prepended to it.' % self.DM_FILE_SERVER_URL_ENV_VAR)
         self.addOption('', '--reprocess', dest='reprocess', action='store_true', default=False, help='Reprocess source files that are already in storage, even if they have not been modified.')
         self.addOption('', '--process-hidden', dest='processHidden', action='store_true', default=False, help='Process hidden source files.')
+        self.addOption('', '--processing-mode', dest='processingMode', default=dmProcessingMode.DM_PROCESSING_MODE_FILES, help='Processing mode can be one of %s (default: %s). In the "%s" mode files are processed individually, while in the "%s" mode processing plugins work on directories (if possible).' % (dmProcessingMode.DM_ALLOWED_PROCESSING_MODE_LIST, dmProcessingMode.DM_PROCESSING_MODE_FILES, dmProcessingMode.DM_PROCESSING_MODE_FILES, dmProcessingMode.DM_PROCESSING_MODE_DIRECTORY))
+        self.addOption('', '--skip-plugins', dest='skipPlugins', help='Comma-separated list of plugins which should not process the given directory.')
 
     def checkArgs(self):
         if self.options.experimentName is None:
             raise InvalidRequest('Experiment name must be provided.')
         if self.options.dataDirectory is None:
             raise InvalidRequest('Experiment data directory must be provided.')
+        if self.options.processingMode not in dmProcessingMode.DM_ALLOWED_PROCESSING_MODE_LIST:
+            raise InvalidRequest('Processing mode must be one of %s.' % dmProcessingMode.DM_ALLOWED_PROCESSING_MODE_LIST)
+
 
     def updateDaqInfoFromOptions(self, daqInfo):
         if self.options.reprocess:
             daqInfo['reprocessFiles'] = True
         if self.options.processHidden:
             daqInfo['processHiddenFiles'] = True
+        if self.options.skipPlugins:
+            daqInfo['skipPlugins'] = self.options.skipPlugins
+        daqInfo['processingMode'] = self.options.processingMode
         
     def runCommand(self):
         self.parseArgs(usage="""
     dm-upload --experiment=EXPERIMENTNAME --data-directory=DATADIRECTORY
         [--reprocess]
         [--process-hidden]
+        [--processing-mode=PROCESSINGMODE]
+        [--skip-plugins=SKIPPLUGINS]
         [key1:value1, key2:value2, ...]
 
 Description:
diff --git a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py
index c05d1f5d..edeeaaa5 100755
--- a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py
+++ b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py
@@ -94,6 +94,24 @@ class ExperimentSessionControllerImpl(DmObjectManager):
         daqInfo['dataDirectory'] = dataDirectory
         daqInfo['uploadId'] = uploadId
 
+        skipPlugins = DictUtility.getAndRemoveKey(daqInfo, 'skipPlugins', '') 
+        if len(skipPlugins):
+            skipPlugins = skipPlugins.split(',')
+            uploadInfo['skipPlugins'] = skipPlugins
+        else:
+            skipPlugins = []
+
+        # Check that there is at least one processor that can process files
+        processorList = []
+        for processorKey in fileProcessingManager.fileProcessorKeyList:
+            processor = fileProcessingManager.fileProcessorDict.get(processorKey)
+            processorName = processor.name
+            if processorName not in skipPlugins:
+                processorList.append(processor)
+
+        if not len(processorList):
+            raise InvalidRequest('There are no plugins that can process files for upload in directory %s.' % dataDirectory)
+
         # Remove hidden files
         self.logger.debug('Checking %s processing candidates' % len(filePathsDict))
         filePathsDict = fileProcessingManager.removeHiddenFilesFromProcessing(filePathsDict, uploadInfo)
@@ -123,6 +141,7 @@ class ExperimentSessionControllerImpl(DmObjectManager):
             fileInfo['daqInfo'] = daqInfo
             fileInfo['uploadId'] = uploadId
             fileInfo['statusMonitor'] = uploadInfo
+            fileInfo['skipPlugins'] = uploadInfo.get('skipPlugins', [])
             try: 
                 if uploadInfo.get('status') != 'aborting':
                     fileProcessingManager.processFile(fileInfo)
@@ -180,15 +199,19 @@ class ExperimentSessionControllerImpl(DmObjectManager):
         daqInfo['dataDirectory'] = dataDirectory
         daqInfo['uploadId'] = uploadId
 
-        skipProcessing = DictUtility.getAndRemoveKey(daqInfo, 'skipProcessing', '')         
-        skipProcessingList = skipProcessing.split(',')
+        skipPlugins = DictUtility.getAndRemoveKey(daqInfo, 'skipPlugins', '') 
+        if len(skipPlugins):
+            skipPlugins = skipPlugins.split(',')
+            uploadInfo['skipPlugins'] = skipPlugins
+        else:
+            skipPlugins = []
         fileProcessingManager = FileProcessingManager.getInstance()
         processingInfo = {}
         uploadInfo['processingInfo'] = processingInfo
         for processorKey in fileProcessingManager.fileProcessorKeyList:
             processor = fileProcessingManager.fileProcessorDict.get(processorKey)
             processorName = processor.name
-            if processorName in skipProcessingList:
+            if processorName in skipPlugins:
                 processingInfo[processorName] = {'status' : 'skipped'}
             else:
                 self.logger.debug('Starting %s processing timer for directory %s upload' % (processorName, dataDirectory))
diff --git a/src/python/dm/daq_web_service/service/impl/uploadTracker.py b/src/python/dm/daq_web_service/service/impl/uploadTracker.py
index 7c751eac..8639f59d 100755
--- a/src/python/dm/daq_web_service/service/impl/uploadTracker.py
+++ b/src/python/dm/daq_web_service/service/impl/uploadTracker.py
@@ -23,6 +23,7 @@ class UploadTracker(ObjectTracker):
         if uploadId:
             uploadInfo = self.get(uploadId)
             if uploadInfo is not None:
+                uploadInfo.updateStatus()
                 if uploadInfo.get('status') == 'running':
                     raise ObjectAlreadyExists('Upload id %s is already active for experiment %s in data directory %s.' % (uploadId, experimentName, dataDir))
                 del self.activeUploadDict[activeUploadKey]
-- 
GitLab