Forked from
DM / dm-docs
261 commits behind, 659 commits ahead of the upstream repository.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
fileProcessingManager.py 7.98 KiB
#!/usr/bin/env python
import threading
import time
import os
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.utility.objectUtility import ObjectUtility
from dm.common.utility.valueUtility import ValueUtility
from dm.common.utility.timeBasedProcessingQueue import TimeBasedProcessingQueue
from dm.common.utility.singleton import Singleton
from fileProcessingThread import FileProcessingThread
class FileProcessingManager(threading.Thread, Singleton):
    """
    Singleton service that feeds uploaded files to a set of configured
    file-processor plugins via a pool of FileProcessingThread workers.

    NOTE(review): although this class inherits threading.Thread, it never
    calls Thread.__init__ and overrides start()/stop() with its own thread
    pool management, so the Thread machinery itself is unused — presumably
    intentional; confirm before refactoring the base classes.
    """

    CONFIG_SECTION_NAME = 'FileProcessingManager'
    NUMBER_OF_PROCESSING_THREADS_KEY = 'numberofprocessingthreads'
    DEFAULT_NUMBER_OF_RETRIES_KEY = 'defaultnumberofretries'
    DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY = 'defaultretrywaitperiodinseconds'
    FILE_PROCESSOR_KEY = 'fileprocessor'

    # Singleton.
    __instanceLock = threading.RLock()
    __instance = None

    def __init__(self):
        """Initialize the singleton instance; re-invocations are no-ops."""
        FileProcessingManager.__instanceLock.acquire()
        try:
            # Second and subsequent constructions return immediately,
            # leaving the original instance untouched.
            if FileProcessingManager.__instance:
                return
            FileProcessingManager.__instance = self
            self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
            self.logger.debug('Initializing')
            self.lock = threading.RLock()
            self.fileProcessingThreadDict = {}   # thread name -> FileProcessingThread
            self.eventFlag = threading.Event()   # wakes idle processing threads
            self.fileProcessorDict = {}          # config key -> processor instance
            self.fileProcessorKeyList = []       # sorted processor keys (processing order)
            self.fileProcessingQueue = TimeBasedProcessingQueue()
            self.__configure()
            self.logger.debug('Initialization complete')
        finally:
            FileProcessingManager.__instanceLock.release()

    def __configure(self):
        """
        Read the FileProcessingManager config section: thread pool size,
        retry defaults, and one processor instance per 'fileprocessor*' key.
        """
        cm = ConfigurationManager.getInstance()
        configItems = cm.getConfigItems(FileProcessingManager.CONFIG_SECTION_NAME)
        self.logger.debug('Got config items: %s' % configItems)
        self.nProcessingThreads = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.NUMBER_OF_PROCESSING_THREADS_KEY))
        self.defaultNumberOfRetries = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.DEFAULT_NUMBER_OF_RETRIES_KEY))
        self.defaultRetryWaitPeriodInSeconds = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY))
        # Create processors
        for (key, value) in configItems:
            if key.startswith(FileProcessingManager.FILE_PROCESSOR_KEY):
                (moduleName, className, constructor) = cm.getModuleClassConstructorTuple(value)
                self.logger.debug('Creating file processor instance of class %s' % className)
                fileProcessor = ObjectUtility.createObjectInstance(moduleName, className, constructor)
                self.logger.debug('Configuring file processor %s' % fileProcessor)
                fileProcessor.setNumberOfRetries(self.defaultNumberOfRetries)
                fileProcessor.setRetryWaitPeriodInSeconds(self.defaultRetryWaitPeriodInSeconds)
                fileProcessor.configure()
                self.fileProcessorDict[key] = fileProcessor
        # sorted() works in both Python 2 and 3 (dict.keys() is not a
        # list in Python 3, so keys().sort() would fail there).
        self.fileProcessorKeyList = sorted(self.fileProcessorDict.keys())
        self.logger.debug('File processor keys: %s' % self.fileProcessorKeyList)

        # Assign processor names
        processorNumber = 0
        for processorKey in self.fileProcessorKeyList:
            processorNumber += 1
            processor = self.fileProcessorDict.get(processorKey)
            processorName = '%s-%s' % (processor.__class__.__name__, processorNumber)
            processor.name = processorName

        # Resolve processor dependencies: translate configured dependency
        # keys into the generated processor names; unknown keys are dropped.
        for processorKey in self.fileProcessorKeyList:
            self.logger.debug('Determining dependencies for processor %s' % (processorKey))
            processor = self.fileProcessorDict.get(processorKey)
            dependsOn = []
            for depProcessorKey in processor.dependsOn:
                depProcessor = self.fileProcessorDict.get(depProcessorKey.lower())
                if depProcessor:
                    dependsOn.append(depProcessor.name)
            processor.dependsOn = dependsOn
            self.logger.debug('Processor %s depends on: %s' % (processor.name, processor.dependsOn))

    # Remove hidden files from dictionary of files to be processed
    def removeHiddenFilesFromProcessing(self, filePathsDict, uploadInfo):
        """
        Drop entries whose basename starts with '.' unless the upload
        requests hidden-file processing. Mutates and returns filePathsDict.
        """
        if ValueUtility.toBoolean(uploadInfo.get('processHiddenFiles')):
            # Consume the flag so downstream code does not see it.
            del uploadInfo['processHiddenFiles']
            return filePathsDict
        self.logger.debug('Checking for hidden files')
        nRemoved = 0
        # Iterate over a snapshot: deleting from a dict while iterating
        # its live items() view is unsafe (and raises in Python 3).
        for (filePath, filePathDict) in list(filePathsDict.items()):
            fileName = os.path.basename(filePath)
            if fileName.startswith('.'):
                self.logger.debug('File path %s is hidden file, will not process it' % filePath)
                del filePathsDict[filePath]
                nRemoved += 1
        self.logger.debug('Removed %s hidden files, %s candidates remaining' % (nRemoved, len(filePathsDict)))
        return filePathsDict

    # Each plugin calculates list of files that need to be processed
    # Final result is union of all plugins
    def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
        """
        Ask every processor which files it must process and return the
        union. A 'reprocessFiles' flag short-circuits to all files.
        """
        if ValueUtility.toBoolean(uploadInfo.get('reprocessFiles')):
            del uploadInfo['reprocessFiles']
            return filePathsDict
        self.logger.debug('Checking files with processor plugins')
        checkedFilePathsDict = {}
        for processorKey in self.fileProcessorKeyList:
            processor = self.fileProcessorDict.get(processorKey)
            # Processor will return list of files it must process
            pluginFilePathsDict = processor.checkUploadFilesForProcessing(filePathsDict, uploadInfo)
            if len(pluginFilePathsDict):
                checkedFilePathsDict.update(pluginFilePathsDict)
        # Fixed: report the size of the union actually returned, not the
        # size of the unfiltered input dictionary.
        self.logger.debug('There are %s processing candidates remaining' % (len(checkedFilePathsDict)))
        return checkedFilePathsDict

    def processFile(self, fileInfo):
        """Queue fileInfo for processing and wake the worker threads."""
        self.fileProcessingQueue.push(fileInfo)
        self.eventFlag.set()

    def appendFileProcessor(self, fileProcessor):
        """Register an additional processor, keyed by its class name."""
        key = fileProcessor.__class__.__name__
        self.logger.debug('Adding file processor: %s' % key)
        self.fileProcessorDict[key] = fileProcessor
        self.fileProcessorKeyList.append(key)
        self.logger.debug('File processor keys: %s' % self.fileProcessorKeyList)

    def start(self):
        """Spawn the configured number of file processing threads."""
        self.lock.acquire()
        try:
            self.logger.debug('Starting file processing threads')
            for i in range(0, self.nProcessingThreads):
                tName = 'FileProcessingThread-%s' % i
                t = FileProcessingThread(tName, self, self.fileProcessorDict, self.fileProcessorKeyList, self.fileProcessingQueue)
                t.start()
                self.fileProcessingThreadDict[tName] = t
        finally:
            self.lock.release()

    def stop(self):
        """Signal all processing threads to stop and wait for them to exit."""
        self.logger.debug('Stopping file processing threads')
        # Keys are unused here; iterate values directly.
        for t in self.fileProcessingThreadDict.values():
            t.stop()
        self.lock.acquire()
        try:
            # Wake any thread blocked in waitOnEvent() so it notices the stop.
            self.eventFlag.set()
        finally:
            self.lock.release()
        for t in self.fileProcessingThreadDict.values():
            t.join()

    def setEvent(self):
        """Set the shared event flag (wakes waiting worker threads)."""
        self.lock.acquire()
        try:
            self.eventFlag.set()
        finally:
            self.lock.release()

    def clearEvent(self):
        """Clear the shared event flag."""
        self.lock.acquire()
        try:
            self.eventFlag.clear()
        finally:
            self.lock.release()

    def waitOnEvent(self, timeoutInSeconds=None):
        """Block until the event flag is set, or the timeout elapses."""
        self.eventFlag.wait(timeoutInSeconds)
####################################################################
# Testing
if __name__ == '__main__':
    # Smoke test: construct the singleton and show it. print() with a
    # single argument behaves identically under Python 2 and 3, unlike
    # the Python-2-only 'print fp' statement.
    fp = FileProcessingManager.getInstance()
    print(fp)
    #fp.start()
    #time.sleep(30)
    #fp.stop()