Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • DM/dm-docs
  • hammonds/dm-docs
  • hparraga/dm-docs
3 results
Show changes
Showing
with 826 additions and 0 deletions
#!/usr/bin/env python
import threading
import os.path
import sqlalchemy
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import mapper
from sqlalchemy.orm import relationship
from dm.common.exceptions.commandFailed import CommandFailed
from dm.common.exceptions.configurationError import ConfigurationError
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dmMongoClient import DmMongoClient
class MongoDbManager:
""" Singleton class for mongo db management. """
CONFIG_SECTION_NAME = 'MongoDbManager'
MONGO_DB_NAME_KEY = 'mongoDbName'
MONGO_DB_USER_KEY = 'mongoDbUser'
MONGO_DB_URI_KEY = 'mongoDbUri'
MONGO_DB_PASSWORD_FILE_KEY = 'mongoDbPasswordFile'
CONFIG_OPTION_NAME_LIST = [ 'dbName', 'dbUser', 'dbPasswordFile' ]
# Singleton.
__lock = threading.RLock()
__instance = None
@classmethod
def getInstance(cls):
from dm.common.mongodb.impl.mongoDbManager import MongoDbManager
try:
mgr = MongoDbManager()
except MongoDbManager, ex:
mgr = ex
return mgr
def __init__(self):
MongoDbManager.__lock.acquire()
try:
if MongoDbManager.__instance is not None:
raise MongoDbManager.__instance
MongoDbManager.__instance = self
self.lock = threading.RLock()
self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
cm = ConfigurationManager.getInstance()
self.dbName = cm.getConfigOption(MongoDbManager.CONFIG_SECTION_NAME, MongoDbManager.MONGO_DB_NAME_KEY)
self.logger.debug('Mongo DB name: %s' % self.dbName)
self.dbUri = cm.getConfigOption(MongoDbManager.CONFIG_SECTION_NAME, MongoDbManager.MONGO_DB_URI_KEY)
self.logger.debug('Mongo DB URI: %s' % self.dbUri)
self.dbUser = cm.getConfigOption(MongoDbManager.CONFIG_SECTION_NAME, MongoDbManager.MONGO_DB_USER_KEY)
self.logger.debug('Mongo DB user: %s' % self.dbUser)
self.dbPasswordFile = cm.getConfigOption(MongoDbManager.CONFIG_SECTION_NAME, MongoDbManager.MONGO_DB_PASSWORD_FILE_KEY)
self.logger.debug('Mongo DB password file: %s' % self.dbPasswordFile)
#dbPassword = open(dbPasswordFile, 'r').readline().strip()
finally:
MongoDbManager.__lock.release()
def getLogger(self):
return self.logger
def getDbClient(self):
return DmMongoClient(self.dbName, self.dbUri)
#######################################################################
# Testing.
if __name__ == '__main__':
ConfigurationManager.getInstance().setConsoleLogLevel('debug')
mgr = MongoDbManager.getInstance()
#!/usr/bin/env python
from dmObject import DmObject
class AuthorizationPrincipal(DmObject):
def __init__(self, dict={}, name=None, token=None, userInfo={}):
DmObject.__init__(self, dict)
if name is not None:
self['name'] = name
if token is not None:
self['token'] = token
if userInfo is not None and len(userInfo):
self['userInfo'] = userInfo
def getName(self):
return self.get('name')
def getAuthenticationToken(self):
return self.get('token')
def getToken(self):
return self.get('token')
def setRole(self, role):
self['role'] = role
def getRole(self):
return self.get('role')
def setUserInfo(self, userInfo):
self['userInfo'] = userInfo
def getUserInfo(self):
return self.get('userInfo')
#!/usr/bin/env python
import copy
from dmObject import DmObject
from dm.common.utility.dictUtility import DictUtility
class DaqInfo(DmObject):
DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'nFiles', 'startTimestamp', 'endTimestamp' ]
def __init__(self, dict={}):
DmObject.__init__(self, dict)
self.originalKeys = dict.keys()
self['fileDict'] = self.get('fileDict', {})
def updateStatus(self):
daqStatus = self.get('status', 'running')
if daqStatus == 'done':
return
fileDict = self.get('fileDict')
nFiles = len(fileDict)
nProcessedFiles = 0
for (filePath,uploadFileInfo) in fileDict.items():
if uploadFileInfo.get('processed'):
nProcessedFiles += 1
self['nProcessedFiles'] = '%s' % (nProcessedFiles)
self['nFiles'] = '%s' % (nFiles)
# need to handle 'failed' uploads
if self.get('endTime'):
daqStatus = 'done'
self['status'] = daqStatus
def toDictWithOriginalKeys(self):
dict = copy.deepcopy(self.data)
for key in dict.keys():
if key not in self.originalKeys:
del dict[key]
return dict
def scrub(self):
# Remove redundant information
daqInfo2 = DictUtility.deepCopy(self.data, excludeKeys='fileDict')
fileDict = self.get('fileDict', {})
fileDict2 = {}
for (filePath,fileInfo) in fileDict.items():
fileInfo2 = {}
for key in ['processed', 'lastUpdateTime']:
if fileInfo.has_key(key):
fileInfo2[key] = fileInfo[key]
fileDict2[filePath] = fileInfo2
daqInfo2['fileDict'] = fileDict2
return DaqInfo(daqInfo2)
#!/usr/bin/env python
from dmObject import DmObject
class DatasetMetadata(DmObject):
DEFAULT_KEY_LIST = [ 'id', 'datasetName', 'experimentName' ]
def __init__(self, dict):
DmObject.__init__(self, dict)
#!/usr/bin/env python
#
# DM Object class.
#
#######################################################################
import UserDict
import UserList
import types
import json
import datetime
from dm.common.exceptions.invalidArgument import InvalidArgument
from dm.common.exceptions.objectNotFound import ObjectNotFound
from dm.common.utility import loggingManager
class DmObject(UserDict.UserDict):
""" Base dm object class. """
ALL_KEYS = '__all__'
DEFAULT_KEYS = '__default__'
DICT_DISPLAY_FORMAT = 'dict'
TEXT_DISPLAY_FORMAT = 'text'
JSON_DISPLAY_FORMAT = 'json'
DEFAULT_KEY_LIST = [ 'id', 'name' ]
def __init__(self, dict={}):
if isinstance(dict, types.DictType):
UserDict.UserDict.__init__(self, dict)
elif isinstance(dict, UserDict.UserDict):
UserDict.UserDict.__init__(self, dict.data)
else:
raise InvalidArgument('DmObject instance must be initialized using dictionary.')
self.logger = None
def getLogger(self):
if not self.logger:
self.logger = loggingManager.getLogger(self._class__.__name__)
return self.logger
def getRequiredKeyValue(self, key):
value = self.get(key)
if value is None:
errorMsg = 'Required dictionary key %s is missing.' % key
raise ObjectNotFound(errorMsg)
@classmethod
def getFromDict(cls, dict):
inst = cls()
for key in dict.keys():
inst[key] = dict[key]
return inst
def getRepKeyList(self, keyList):
if keyList is None:
return self.DEFAULT_KEY_LIST
elif type(keyList) == types.ListType:
if not len(keyList):
return self.DEFAULT_KEY_LIST
else:
return keyList
elif type(keyList) == types.StringType:
if keyList == DmObject.ALL_KEYS:
return self.data.keys()
elif keyList == DmObject.DEFAULT_KEYS:
return self.DEFAULT_KEY_LIST
else:
# Assume keys are separated by comma
return keyList.split(',')
else:
# Unknown key list parameter.
raise InvalidArgument('Key list parameter must be one of: None, string "%s", string "%s", string containing comma-separated keys, or list of strings.' (DmObject.ALL_KEYS, DmObject.DEFAULT_KEYS))
def getDictRep(self, keyList=None):
# Dict representation is dict
dictRep = {}
displayKeyList = self.getRepKeyList(keyList)
for key in displayKeyList:
value = self.get(key)
if isinstance(value, DmObject):
dictRep[key] = value.getDictRep('__all__')
elif type(value) == types.ListType:
itemList = []
for item in value:
if isinstance(item, DmObject):
itemList.append(item.getDictRep('__all__'))
else:
itemList.append(item)
dictRep[key] = itemList
else:
if value is not None:
if isinstance(value, datetime.datetime):
dictRep[key] = str(value)
else:
dictRep[key] = value
return dictRep
def getTextRep(self, keyList=None):
display = ''
displayKeyList = self.getRepKeyList(keyList)
for key in displayKeyList:
value = self.get(key)
if isinstance(value, DmObject):
display = display + '%s={ %s} ' % (key, value.getTextRep())
elif isinstance(value, types.ListType):
display = display + '%s=[ ' % key
for item in value:
if isinstance(item, DmObject):
display = display + '{ %s}, ' % (item)
else:
display = display + ' %s, ' % (item)
display = display + '] '
else:
if value is not None:
display = display + '%s=%s ' % (key, value)
return display
def getJsonRep(self, keyList=None):
dictRep = self.getDictRep(keyList)
return json.dumps(dictRep)
def getFullJsonRep(self):
dictRep = self.getDictRep(DmObject.ALL_KEYS)
return json.dumps(dictRep)
@classmethod
def fromJsonString(cls, jsonString):
return cls.getFromDict(json.loads(jsonString))
def getDisplayString(self, displayKeyList=[], displayFormat=TEXT_DISPLAY_FORMAT):
""" Get display string. """
if displayFormat == DmObject.DICT_DISPLAY_FORMAT:
return self.getDictRep(displayKeyList)
elif displayFormat == DmObject.TEXT_DISPLAY_FORMAT:
return self.getTextRep(displayKeyList)
elif displayFormat == DmObject.JSON_DISPLAY_FORMAT:
return self.getJsonRep(displayKeyList)
raise InvalidArgument('Unrecognized display displayFormat: %s.' (displayFormat))
#######################################################################
# Testing.
if __name__ == '__main__':
x = {'name' : 'XYZ', 'one':1, 'two':2 }
o = DmObject(x)
print 'DM Object: ', o
print 'Type of DM object: ', type(o)
print 'JSON Rep: ', o.getJsonRep()
print 'Type of JSON rep: ', type(o.getJsonRep())
j = '{"name" : "XYZ", "one":1, "two":2 }'
print 'String: ', j
x2 = DmObject.fromJsonString(j)
print 'DM Object 2: ', x2
print 'Type of DM object 2: ', type(x2)
print x2.getDisplayString(displayKeyList='__all__')
#!/usr/bin/env python
#
# Base object manager class.
#
#######################################################################
import threading
from dm.common.utility.loggingManager import LoggingManager
#######################################################################
class DmObjectManager:
""" Base object manager class. """
def __init__(self):
self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
self.lock = threading.RLock()
def getLogger(self):
return self.logger
def acquireLock(self):
self.lock.acquire()
def releaseLock(self):
self.lock.release()
#!/usr/bin/env python
from dmObject import DmObject
class DmSession(DmObject):
def __init__(self, dict={}, sessionId=None, updateTime=None):
DmObject.__init__(self, dict)
if sessionId is not None:
self['sessionId'] = sessionId
if updateTime is not None:
self['updateTime '] = updateTime
def setSessionId(self, sessionId):
self['sessionId'] = sessionId
def getSessionId(self):
return self.get('sessionId')
def setUpdateTime(self, updateTime):
self['updateTime'] = updateTime
def getUpdateTime(self):
return self.get('updateTime')
#!/usr/bin/env python
import time
from dmObject import DmObject
class Experiment(DmObject):
DEFAULT_KEY_LIST = [ 'id', 'name', 'dataDirectory', 'startDate', 'endDate', 'daqStartTime', 'daqEndTime' ]
def __init__(self, dict={}):
DmObject.__init__(self, dict)
####################################################################
# Testing
if __name__ == '__main__':
pass
#!/usr/bin/env python
import time
from dmObject import DmObject
class ExperimentType(DmObject):
DEFAULT_KEY_LIST = [ 'id', 'name', 'description`', 'rootDataPath' ]
def __init__(self, dict={}):
DmObject.__init__(self, dict)
####################################################################
# Testing
if __name__ == '__main__':
pass
#!/usr/bin/env python
from dmObject import DmObject
class FileMetadata(DmObject):
DEFAULT_KEY_LIST = [ 'id', 'fileName', 'experimentName', 'locationList', 'filePath', 'fileSize', 'md5Sum', 'experimentFilePath' ]
def __init__(self, dict):
DmObject.__init__(self, dict)
#!/usr/bin/env python
import time
import os
from dmObject import DmObject
import urlparse
class ObservedFile(DmObject):
DEFAULT_KEY_LIST = [ 'filePath', 'lastUpdateTime' ]
def __init__(self, dict={}, filePath=None, dataDirectory=None, experiment=None):
DmObject.__init__(self, dict)
if filePath:
self['filePath'] = filePath
if dataDirectory:
self['dataDirectory'] = dataDirectory
if filePath:
parseResult = urlparse.urlparse(dataDirectory)
self['experimentFilePath'] = os.path.relpath(filePath, parseResult.path)
if experiment:
self['experimentName'] = experiment.get('name')
self['storageHost'] = experiment.get('storageHost')
self['storageDirectory'] = experiment.get('storageDirectory')
def setLastUpdateTimeToNow(self):
self['lastUpdateTime'] = time.time()
def getLastUpdateTime(self):
self.get('lastUpdateTime')
def getFilePath(self):
return self.get('filePath')
def getDataDirectory(self):
return self.get('dataDirectory')
####################################################################
# Testing
if __name__ == '__main__':
of = ObservedFile(filePath='tmp/xyz')
print of
of.setLastUpdateTimeToNow()
print of
#!/usr/bin/env python
from dmObject import DmObject
class RoleType(DmObject):
DEFAULT_KEY_LIST = [ 'id', 'name', 'description' ]
def __init__(self, dict):
DmObject.__init__(self, dict)
#!/usr/bin/env python
from dmObject import DmObject
from dm.common.utility.dictUtility import DictUtility
class UploadInfo(DmObject):
DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'nFiles', 'percentageComplete', 'startTimestamp', 'endTimestamp' ]
def __init__(self, dict={}):
DmObject.__init__(self, dict)
self['fileDict'] = self.get('fileDict', {})
def updateStatus(self):
uploadStatus = self.get('status', 'running')
if uploadStatus == 'done':
return
fileDict = self.get('fileDict')
nFiles = len(fileDict)
nProcessedFiles = 0
for (filePath,uploadFileInfo) in fileDict.items():
if uploadFileInfo.get('processed'):
nProcessedFiles += 1
# need to handle 'failed' uploads
if nProcessedFiles == nFiles:
uploadStatus = 'done'
self['status'] = uploadStatus
self['nProcessedFiles'] = '%s' % (nProcessedFiles)
self['nFiles'] = '%s' % (nFiles)
percentageComplete = 100.0
if nFiles > 0:
percentageComplete = float(nProcessedFiles)/float(nFiles)*100.0
self['percentageComplete'] = '%.2f' % percentageComplete
def scrub(self):
# Remove redundant information
uploadInfo2 = DictUtility.deepCopy(self.data, excludeKeys='fileDict')
fileDict = self.get('fileDict', {})
fileDict2 = {}
for (filePath,fileInfo) in fileDict.items():
fileInfo2 = {}
for key in ['processed', 'lastUpdateTime']:
if fileInfo.has_key(key):
fileInfo2[key] = fileInfo[key]
fileDict2[filePath] = fileInfo2
uploadInfo2['fileDict'] = fileDict2
return UploadInfo(uploadInfo2)
#!/usr/bin/env python
from dmObject import DmObject
class UserExperimentRole(DmObject):
DEFAULT_KEY_LIST = [ 'user_id', 'experiment_id', 'role_type_id' ]
def __init__(self, dict):
DmObject.__init__(self, dict)
#!/usr/bin/env python
from dmObject import DmObject
class UserInfo(DmObject):
DEFAULT_KEY_LIST = [ 'id', 'username', 'firstName', 'lastName', 'middleName', 'email', 'description' ]
def __init__(self, dict):
DmObject.__init__(self, dict)
#!/usr/bin/env python
from dmObject import DmObject
class UserSystemRole(DmObject):
DEFAULT_KEY_LIST = [ 'user_id', 'role_type_id' ]
def __init__(self, dict):
DmObject.__init__(self, dict)
#!/usr/bin/env python
import threading
import time
import os
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.utility.objectUtility import ObjectUtility
from dm.common.utility.valueUtility import ValueUtility
from dm.common.utility.timeBasedProcessingQueue import TimeBasedProcessingQueue
from dm.common.utility.singleton import Singleton
from fileProcessingThread import FileProcessingThread
class FileProcessingManager(threading.Thread,Singleton):
CONFIG_SECTION_NAME = 'FileProcessingManager'
NUMBER_OF_PROCESSING_THREADS_KEY = 'numberofprocessingthreads'
DEFAULT_NUMBER_OF_RETRIES_KEY = 'defaultnumberofretries'
DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY = 'defaultretrywaitperiodinseconds'
FILE_PROCESSOR_KEY = 'fileprocessor'
# Singleton.
__instanceLock = threading.RLock()
__instance = None
def __init__(self):
FileProcessingManager.__instanceLock.acquire()
try:
if FileProcessingManager.__instance:
return
FileProcessingManager.__instance = self
self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
self.logger.debug('Initializing')
self.lock = threading.RLock()
self.fileProcessingThreadDict = {}
self.eventFlag = threading.Event()
self.fileProcessorDict = {}
self.fileProcessorKeyList = []
self.fileProcessingQueue = TimeBasedProcessingQueue()
self.processedFileDict = {}
self.unprocessedFileDict = {}
self.__configure()
self.logger.debug('Initialization complete')
finally:
FileProcessingManager.__instanceLock.release()
def __configure(self):
cm = ConfigurationManager.getInstance()
configItems = cm.getConfigItems(FileProcessingManager.CONFIG_SECTION_NAME)
self.logger.debug('Got config items: %s' % configItems)
self.nProcessingThreads = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.NUMBER_OF_PROCESSING_THREADS_KEY))
self.defaultNumberOfRetries = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.DEFAULT_NUMBER_OF_RETRIES_KEY))
self.defaultRetryWaitPeriodInSeconds = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY))
# Create processors
for (key,value) in configItems:
if key.startswith(FileProcessingManager.FILE_PROCESSOR_KEY):
(moduleName,className,constructor) = cm.getModuleClassConstructorTuple(value)
self.logger.debug('Creating file processor instance of class %s' % className)
fileProcessor = ObjectUtility.createObjectInstance(moduleName, className, constructor)
self.logger.debug('Configuring file processor %s' % fileProcessor)
fileProcessor.setNumberOfRetries(self.defaultNumberOfRetries)
fileProcessor.setRetryWaitPeriodInSeconds(self.defaultRetryWaitPeriodInSeconds)
fileProcessor.configure()
self.fileProcessorDict[key] = fileProcessor
self.fileProcessorKeyList = self.fileProcessorDict.keys()
self.fileProcessorKeyList.sort()
self.logger.debug('File processor keys: %s' % self.fileProcessorKeyList)
# Remove hidden files from dictionary of files to be processed
def removeHiddenFilesFromProcessing(self, filePathsDict, uploadInfo):
if ValueUtility.toBoolean(uploadInfo.get('processHiddenFiles')):
del uploadInfo['processHiddenFiles']
return filePathsDict
for filePath in filePathsDict.keys():
fileName = os.path.basename(filePath)
if fileName.startswith('.'):
self.logger.debug('File path %s is hidden file, will not process it' % filePath)
del filePathsDict[filePath]
return filePathsDict
# Each plugin calculates list of files that need to be processed
# Final result is union of all plugins
def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
if ValueUtility.toBoolean(uploadInfo.get('processAllFiles')):
del uploadInfo['processAllFiles']
return filePathsDict
checkedFilePathsDict = {}
for processorKey in self.fileProcessorKeyList:
processor = self.fileProcessorDict.get(processorKey)
# Processor will return list of files it must process
pluginFilePathsDict = processor.checkUploadFilesForProcessing(filePathsDict, uploadInfo)
if len(pluginFilePathsDict):
checkedFilePathsDict.update(pluginFilePathsDict)
return checkedFilePathsDict
def processFile(self, fileInfo):
self.fileProcessingQueue.push(fileInfo)
self.eventFlag.set()
def appendFileProcessor(self, fileProcessor):
key = fileProcessor.__class__.__name__
self.logger.debug('Adding file processor: %s' % key)
self.fileProcessorDict[key] = fileProcessor
self.fileProcessorKeyList.append(key)
self.logger.debug('File processor keys: %s' % self.fileProcessorKeyList)
def start(self):
self.lock.acquire()
try:
self.logger.debug('Starting file processing threads')
for i in range(0, self.nProcessingThreads):
tName = 'FileProcessingThread-%s' % i
t = FileProcessingThread(tName, self, self.fileProcessorDict, self.fileProcessorKeyList, self.fileProcessingQueue, self.processedFileDict, self.unprocessedFileDict)
t.start()
self.fileProcessingThreadDict[tName] = t
finally:
self.lock.release()
def stop(self):
self.logger.debug('Stopping file processing threads')
for (tName, t) in self.fileProcessingThreadDict.items():
t.stop()
self.lock.acquire()
try:
self.eventFlag.set()
finally:
self.lock.release()
for (tName, t) in self.fileProcessingThreadDict.items():
t.join()
def setEvent(self):
self.lock.acquire()
try:
self.eventFlag.set()
finally:
self.lock.release()
def clearEvent(self):
self.lock.acquire()
try:
self.eventFlag.clear()
finally:
self.lock.release()
def waitOnEvent(self, timeoutInSeconds=None):
self.eventFlag.wait(timeoutInSeconds)
####################################################################
# Testing
if __name__ == '__main__':
fp = FileProcessingManager.getInstance()
print fp
#fp.start()
#time.sleep(30)
#fp.stop()
#!/usr/bin/env python
import threading
from dm.common.utility.loggingManager import LoggingManager
class FileProcessingThread(threading.Thread):
THREAD_EVENT_TIMEOUT_IN_SECONDS = 10.0
def __init__ (self, name, fileProcessingManager, fileProcessorDict, fileProcessorKeyList, fileProcessingQueue, processedFileDict, unprocessedFileDict):
threading.Thread.__init__(self)
self.setName(name)
self.exitFlag = False
self.fileProcessingManager = fileProcessingManager
self.fileProcessorDict = fileProcessorDict
self.fileProcessorKeyList = fileProcessorKeyList
self.fileProcessingQueue = fileProcessingQueue
self.processedFileDict = processedFileDict
self.unprocessedFileDict = unprocessedFileDict
self.logger = LoggingManager.getInstance().getLogger(name)
def run(self):
self.logger.debug('Starting thread: %s' % self.getName())
while True:
self.fileProcessingManager.clearEvent()
if self.exitFlag:
self.logger.debug('Exit flag is set')
break
while True:
fileInfo = self.fileProcessingQueue.pop()
if fileInfo is None:
break
filePath = fileInfo.get('filePath')
try:
for processorKey in self.fileProcessorKeyList:
processor = self.fileProcessorDict.get(processorKey)
processorName = processor.__class__.__name__
fileProcessedByDict = fileInfo.get('processedByDict', {})
fileInfo['processedByDict'] = fileProcessedByDict
if fileProcessedByDict.has_key(processorName):
self.logger.debug('%s has already been processed by %s ' % (fileInfo, processorName))
continue
self.logger.debug('%s is about to process file %s ' % (processorName, fileInfo))
try:
processor.processFile(fileInfo)
fileProcessedByDict[processorName] = True
self.logger.debug('%s processed file at path %s ' % (processorName, filePath))
except Exception, ex:
self.logger.exception(ex)
self.logger.debug('%s processing failed for file at path %s ' % (processorName, filePath))
fileProcessingDict = fileInfo.get('processingDict', {})
fileInfo['processingDict'] = fileProcessingDict
processorDict = fileProcessingDict.get(processorName, {})
fileProcessingDict[processorName] = processorDict
processorDict['lastError'] = str(ex)
nRetriesLeft = processorDict.get('numberOfRetriesLeft', processor.getNumberOfRetries())
self.logger.debug('Number of %s retries left for file %s: %s' % (processorName, filePath, nRetriesLeft))
processorDict['numberOfRetriesLeft'] = nRetriesLeft - 1
if nRetriesLeft <= 0:
self.logger.debug('No more %s retries left for file %s' % (processorName, fileInfo))
self.unprocessedFileDict[filePath] = fileInfo
else:
retryWaitPeriod = processor.getRetryWaitPeriodInSeconds()
self.logger.debug('%s will retry processing file %s in %s seconds' % (processorName, filePath, retryWaitPeriod))
self.fileProcessingQueue.push(fileInfo, retryWaitPeriod)
# Do not process this file further until
# this plugin is done
break
except Exception, ex:
self.logger.exception(ex)
self.fileProcessingManager.waitOnEvent(self.THREAD_EVENT_TIMEOUT_IN_SECONDS)
self.logger.debug('%s is done' % self.getName())
def stop(self):
self.exitFlag = True
####################################################################
# Testing
if __name__ == '__main__':
pass