Skip to content
Snippets Groups Projects
Commit ac5689a7 authored by sveseli's avatar sveseli
Browse files

refactored apis; added ability to skip optional plugins on error

parent b84356e3
No related branches found
No related tags found
No related merge requests found
Showing
with 140 additions and 50 deletions
- make
- autoconf
- expect
- gcc
- g++
- subversion
- zlib-devel
- openssl-devel
- libffi-devel
- openldap-devel
- readline-devel
- ncurses-devel
- qt-x11
- qt-postgresql
- qt-devel
- gtk2-devel
RHEL7 Packages
===============
make
autoconf
expect
gcc
g++
subversion
zlib-devel
openssl-devel
libffi-devel
openldap-devel
readline-devel
ncurses-devel
qt-x11
qt-postgresql
qt-devel
gtk2-devel
Globus Packages
===============
globus-openssl-module
globus-ftp-client
globus-gsi-proxy
globus-gsi-openssl
globus-ftp-control
globus-gass-copy
globus-common-16.8
globus-gass-transfer
globus-io-11.8
globus-gss-assist
globus-gsi-cert
globus-xio-popen
globus-xio-gsi
globus-gsi-credential
globus-callout-3.15
globus-gsi-sysconfig
globus-gsi-callback
globus-xio-5.14
globus-gssapi-gsi
globus-gass-copy
globus-gsi-proxy
globus-gssapi-error
......@@ -54,7 +54,7 @@ Description:
try:
if self.options.useDmRestApi:
self.logger.debug('Using DM REST API')
dmUserApi = DsRestApiFactory.getUserRestApi()
dmUserApi = DsRestApiFactory.getUserDsApi()
else:
self.logger.debug('Using DM DB API')
dmUserApi = UserDbApi()
......
......@@ -36,10 +36,10 @@ class CatRestApiFactory:
return (cls.__username, cls.__password, cls.__host, cls.__port, cls.__protocol)
@classmethod
def getFileRestApi(cls):
from userRestApi import FileRestApi
def getFileCatApi(cls):
from userCatApi import FileCatApi
(username, password, host, port, protocol) = cls.__getConfiguration()
api = FileRestApi(username, password, host, port, protocol)
api = FileCatApi(username, password, host, port, protocol)
return api
####################################################################
......
......@@ -11,7 +11,7 @@ from dm.common.objects.datasetMetadata import DatasetMetadata
from dm.common.objects.fileMetadata import FileMetadata
from catRestApi import CatRestApi
class DatasetRestApi(CatRestApi):
class DatasetCatApi(CatRestApi):
def __init__(self, username=None, password=None, host=None, port=None, protocol=None):
CatRestApi.__init__(self, username, password, host, port, protocol)
......@@ -100,7 +100,7 @@ class DatasetRestApi(CatRestApi):
# Testing.
if __name__ == '__main__':
api = DatasetRestApi()
api = DatasetCatApi()
print api.getDatasets()
print api.getDatasetById('556de0059e058b0ef4c4413b')
print api.getDatasetByName('xyz-001')
......
......@@ -10,7 +10,7 @@ from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.objects.fileMetadata import FileMetadata
from catRestApi import CatRestApi
class FileRestApi(CatRestApi):
class FileCatApi(CatRestApi):
def __init__(self, username=None, password=None, host=None, port=None, protocol=None):
CatRestApi.__init__(self, username, password, host, port, protocol)
......@@ -87,7 +87,7 @@ class FileRestApi(CatRestApi):
# Testing.
if __name__ == '__main__':
api = FileRestApi()
api = FileCatApi()
import time
t = long(time.time())
......
#!/usr/bin/env python
from dm.cat_web_service.api.datasetRestApi import DatasetRestApi
from dm.cat_web_service.api.datasetCatApi import DatasetCatApi
from dm.common.exceptions.invalidRequest import InvalidRequest
from catWebServiceSessionCli import CatWebServiceSessionCli
......@@ -32,7 +32,7 @@ Description:
pairs are interpreted as dataset metadata.
""")
self.checkArgs()
api = DatasetRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
api = DatasetCatApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
datasetInfo = self.splitArgsIntoDict()
datasetInfo['experimentName'] = self.getExperimentName()
datasetInfo['datasetName'] = self.getDatasetName()
......
#!/usr/bin/env python
from dm.cat_web_service.api.fileRestApi import FileRestApi
from dm.cat_web_service.api.fileCatApi import FileCatApi
from dm.common.exceptions.invalidRequest import InvalidRequest
from catWebServiceSessionCli import CatWebServiceSessionCli
......@@ -32,7 +32,7 @@ Description:
are interpreted as file metadata.
""")
self.checkArgs()
api = FileRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
api = FileCatApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
fileInfo = self.splitArgsIntoDict()
fileInfo['experimentName'] = self.getExperimentName()
fileInfo['experimentFilePath'] = self.getExperimentFilePath()
......
#!/usr/bin/env python
from dm.cat_web_service.api.datasetRestApi import DatasetRestApi
from dm.cat_web_service.api.datasetCatApi import DatasetCatApi
from dm.common.exceptions.invalidRequest import InvalidRequest
from catWebServiceSessionCli import CatWebServiceSessionCli
......@@ -23,7 +23,7 @@ Description:
returned.
""")
self.checkArgs()
api = DatasetRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
api = DatasetCatApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
queryDict = self.splitArgsIntoDict()
datasetMetadataList = api.getDatasets(queryDict)
for datasetMetadata in datasetMetadataList:
......
#!/usr/bin/env python
from dm.cat_web_service.api.datasetRestApi import DatasetRestApi
from dm.cat_web_service.api.datasetCatApi import DatasetCatApi
from dm.common.exceptions.invalidRequest import InvalidRequest
from catWebServiceSessionCli import CatWebServiceSessionCli
......@@ -30,7 +30,7 @@ Description:
Retrieve experiment dataset metadata from the catalog.
""")
self.checkArgs()
api = DatasetRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
api = DatasetCatApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
datasetMetadata = api.getExperimentDataset(self.getExperimentName(), self.getDatasetName())
print datasetMetadata.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
......
#!/usr/bin/env python
from dm.cat_web_service.api.datasetRestApi import DatasetRestApi
from dm.cat_web_service.api.datasetCatApi import DatasetCatApi
from dm.common.exceptions.invalidRequest import InvalidRequest
from catWebServiceSessionCli import CatWebServiceSessionCli
......@@ -31,7 +31,7 @@ Description:
Retrieve experiment dataset files from the catalog.
""")
self.checkArgs()
api = DatasetRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
api = DatasetCatApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
fileMetadataList = api.getExperimentDatasetFiles(self.getExperimentName(), self.getDatasetName())
for fileMetadata in fileMetadataList:
print fileMetadata.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
......
#!/usr/bin/env python
from dm.cat_web_service.api.datasetRestApi import DatasetRestApi
from dm.cat_web_service.api.datasetCatApi import DatasetCatApi
from dm.common.exceptions.invalidRequest import InvalidRequest
from catWebServiceSessionCli import CatWebServiceSessionCli
......@@ -28,7 +28,7 @@ Description:
will be returned.
""")
self.checkArgs()
api = DatasetRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
api = DatasetCatApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
queryDict = self.splitArgsIntoDict()
datasetMetadataList = api.getExperimentDatasets(self.getExperimentName(), queryDict)
for datasetMetadata in datasetMetadataList:
......
#!/usr/bin/env python
from dm.cat_web_service.api.fileRestApi import FileRestApi
from dm.cat_web_service.api.fileCatApi import FileCatApi
from dm.common.exceptions.invalidRequest import InvalidRequest
from catWebServiceSessionCli import CatWebServiceSessionCli
......@@ -30,7 +30,7 @@ Description:
Retrieve experiment file metadata from the catalog.
""")
self.checkArgs()
api = FileRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
api = FileCatApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
fileMetadata = api.getExperimentFile(self.getExperimentName(), self.getExperimentFilePath())
print fileMetadata.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
......
#!/usr/bin/env python
from dm.cat_web_service.api.fileRestApi import FileRestApi
from dm.cat_web_service.api.fileCatApi import FileCatApi
from dm.common.exceptions.invalidRequest import InvalidRequest
from catWebServiceSessionCli import CatWebServiceSessionCli
......@@ -27,7 +27,7 @@ Description:
metadata key/values are requested, all experiment files will be returned.
""")
self.checkArgs()
api = FileRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
api = FileCatApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
queryDict = self.splitArgsIntoDict()
fileMetadataList = api.getExperimentFiles(self.getExperimentName(), queryDict)
for fileMetadata in fileMetadataList:
......
#!/usr/bin/env python
from dm.cat_web_service.api.datasetRestApi import DatasetRestApi
from dm.cat_web_service.api.datasetCatApi import DatasetCatApi
from dm.common.exceptions.invalidRequest import InvalidRequest
from catWebServiceSessionCli import CatWebServiceSessionCli
......@@ -35,7 +35,7 @@ Description:
new keys.
""")
self.checkArgs()
api = DatasetRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
api = DatasetCatApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
datasetInfo = self.splitArgsIntoDict()
datasetInfo['experimentName'] = self.getExperimentName()
datasetInfo['datasetName'] = self.getDatasetName()
......
#!/usr/bin/env python
from dm.cat_web_service.api.fileRestApi import FileRestApi
from dm.cat_web_service.api.fileCatApi import FileCatApi
from dm.common.exceptions.invalidRequest import InvalidRequest
from catWebServiceSessionCli import CatWebServiceSessionCli
......@@ -34,7 +34,7 @@ Description:
new keys.
""")
self.checkArgs()
api = FileRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
api = FileCatApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
fileInfo = self.splitArgsIntoDict()
fileInfo['experimentName'] = self.getExperimentName()
fileInfo['experimentFilePath'] = self.getExperimentFilePath()
......
......@@ -51,6 +51,26 @@ class DaqInfo(DmObject):
finally:
self.lock.release()
def fileProcessingSkipped(self, processorName, filePath, processingError, processingEndTime):
self.lock.acquire()
try:
pluginStatsDict = self.get('pluginStats', {})
self['pluginStats'] = pluginStatsDict
pluginStats = pluginStatsDict.get(processorName, {})
pluginStatsDict[processorName] = pluginStats
pluginStats['nSkippedFiles'] = pluginStats.get('nSkippedFiles', 0) + 1
if processingError:
processingErrors = pluginStats.get('processingErrors', {})
processingErrors[filePath] = processingError
pluginStats['processingErrors'] = processingErrors
lastFileSkippedTime = pluginStats.get('lastFileSkippedTime', 0)
if processingEndTime is not None and processingEndTime > lastFileSkippedTime:
pluginStats['lastFileSkippedTime'] = processingEndTime
finally:
self.lock.release()
def updateStatus(self):
now = time.time()
daqStatus = self.get('status', dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING)
......
......@@ -40,6 +40,26 @@ class UploadInfo(DmObject):
finally:
self.lock.release()
def fileProcessingSkipped(self, processorName, filePath, processingError, processingEndTime):
self.lock.acquire()
try:
pluginStatsDict = self.get('pluginStats', {})
self['pluginStats'] = pluginStatsDict
pluginStats = pluginStatsDict.get(processorName, {})
pluginStatsDict[processorName] = pluginStats
pluginStats['nSkippedFiles'] = pluginStats.get('nSkippedFiles', 0) + 1
if processingError:
processingErrors = pluginStats.get('processingErrors', {})
processingErrors[filePath] = processingError
pluginStats['processingErrors'] = processingErrors
lastFileSkippedTime = pluginStats.get('lastFileSkippedTime', 0)
if processingEndTime is not None and processingEndTime > lastFileSkippedTime:
pluginStats['lastFileSkippedTime'] = processingEndTime
finally:
self.lock.release()
def fileProcessingCancelled(self, filePath, processingEndTime):
self.lock.acquire()
try:
......
......@@ -63,8 +63,8 @@ class FileProcessingManager(threading.Thread,Singleton):
self.logger.debug('Creating file processor instance of class %s' % className)
fileProcessor = ObjectUtility.createObjectInstance(moduleName, className, constructor)
self.logger.debug('Configuring file processor %s' % fileProcessor)
fileProcessor.setNumberOfRetries(self.defaultNumberOfRetries)
fileProcessor.setRetryWaitPeriodInSeconds(self.defaultRetryWaitPeriodInSeconds)
fileProcessor.setNumberOfRetriesIfNotSet(self.defaultNumberOfRetries)
fileProcessor.setRetryWaitPeriodInSecondsIfNotSet(self.defaultRetryWaitPeriodInSeconds)
statUtilityObject = None
if statUtility:
statUtilityObject = ObjectUtility.createObjectInstance(statUtilityModuleName, statUtilityClassName, statUtilityConstructor)
......
......@@ -89,10 +89,16 @@ class FileProcessingThread(threading.Thread):
processorDict['numberOfRetriesLeft'] = nRetriesLeft - 1
if nRetriesLeft <= 0:
endProcessingTime = time.time()
if statusMonitor:
statusMonitor.fileProcessingError(filePath, processingError, endProcessingTime)
statusMonitor.updateStatus()
self.logger.debug('No more %s retries left for file %s, remaining plugins will not process it' % (processorName, filePath))
if not processor.getSkipOnFailure():
if statusMonitor:
statusMonitor.fileProcessingError(filePath, processingError, endProcessingTime)
statusMonitor.updateStatus()
self.logger.debug('No more %s retries left for file %s, remaining plugins will not process it' % (processorName, filePath))
else:
if statusMonitor:
statusMonitor.fileProcessingSkipped(processorName, filePath, processingError, endProcessingTime)
statusMonitor.updateStatus()
self.logger.debug('No more %s retries left for file %s, skipping it' % (processorName, filePath))
return
else:
retryWaitPeriod = processor.getRetryWaitPeriodInSeconds()
......
......@@ -38,15 +38,29 @@ class FileProcessor:
def getConfigKeyValue(self, key):
return self.configDict.get(key)
def setNumberOfRetriesIfNotSet(self, nRetries):
if not self.configDict.has_key('numberOfRetries'):
self.configDict['numberOfRetries'] = nRetries
def setNumberOfRetries(self, nRetries):
self.configDict['numberOfRetries'] = nRetries
def getNumberOfRetries(self):
return self.configDict.get('numberOfRetries', self.DEFAULT_NUMBER_OF_RETRIES)
def setRetryWaitPeriodInSecondsIfNotSet(self, waitPeriod):
if not self.configDict.has_key('retryWaitPeriodInSeconds'):
self.configDict['retryWaitPeriodInSeconds'] = waitPeriod
def setRetryWaitPeriodInSeconds(self, waitPeriod):
self.configDict['retryWaitPeriodInSeconds'] = waitPeriod
def getRetryWaitPeriodInSeconds(self):
return self.configDict.get('retryWaitPeriodInSeconds', self.DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS)
def setSkipOnFailure(self, skipOnFailure):
self.configDict['skipOnFailure'] = skipOnFailure
def getSkipOnFailure(self, skipOnFailure):
return self.configDict.get('skipOnFailure', False)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment