Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • DM/dm-docs
  • hammonds/dm-docs
  • hparraga/dm-docs
3 results
Show changes
Showing
with 632 additions and 0 deletions
#!/usr/bin/env python
from dmObject import DmObject
class ExperimentRoleType(DmObject):
    """Value object describing an experiment role type record."""

    # Keys exposed by default when this object is rendered/serialized.
    DEFAULT_KEY_LIST = [ 'id', 'name', 'description' ]

    def __init__(self, dict):
        """Initialize from the given dictionary of key/value pairs."""
        DmObject.__init__(self, dict)
#!/usr/bin/env python
import time
from dmObject import DmObject
class ExperimentStation(DmObject):
    """Value object describing an experiment station record."""

    # Keys exposed by default when this object is rendered/serialized.
    DEFAULT_KEY_LIST = [ 'id', 'name', 'description' ]

    def __init__(self, dict=None):
        """Initialize from *dict*; a fresh empty dict is used when omitted.

        BUG FIX: the original signature used the shared mutable default
        ``dict={}`` — all instances created without an argument would share
        (and could mutate) the same dictionary object.
        """
        if dict is None:
            dict = {}
        DmObject.__init__(self, dict)
####################################################################
# Testing
if __name__ == '__main__':
    # No standalone test for this module.
    pass
#!/usr/bin/env python
import time
from dmObject import DmObject
class ExperimentType(DmObject):
    """Value object describing an experiment type record."""

    # Keys exposed by default when this object is rendered/serialized.
    DEFAULT_KEY_LIST = [ 'id', 'name', 'description' ]

    def __init__(self, dict=None):
        """Initialize from *dict*; a fresh empty dict is used when omitted.

        BUG FIX: the original signature used the shared mutable default
        ``dict={}``.
        """
        if dict is None:
            dict = {}
        DmObject.__init__(self, dict)
####################################################################
# Testing
if __name__ == '__main__':
    # No standalone test for this module.
    pass
#!/usr/bin/env python
from dmObject import DmObject
class FileMetadata(DmObject):
    """Metadata record for a single experiment file (location, size, checksum)."""

    # Keys exposed by default when this object is rendered/serialized.
    DEFAULT_KEY_LIST = [
        'id', 'fileName', 'experimentName', 'locationList',
        'filePath', 'fileSize', 'md5Sum', 'experimentFilePath',
    ]

    def __init__(self, dict):
        """Initialize from the given dictionary of key/value pairs."""
        DmObject.__init__(self, dict)
#!/usr/bin/env python
from dmObject import DmObject
class LdapUserInfo(DmObject):
    """Wrapper around an LDAP user entry: distinguished name plus attributes."""

    # Keys exposed by default when this object is rendered/serialized.
    DEFAULT_KEY_LIST = [ 'userDn', 'userAttrs' ]

    def __init__(self, dict):
        """Initialize from the given dictionary of key/value pairs."""
        DmObject.__init__(self, dict)

    def getUserPassword(self):
        """Return the first value of the 'userPassword' LDAP attribute.

        Raises KeyError (or IndexError) if the attribute map or the
        password attribute is missing/empty.
        """
        userAttrs = self['userAttrs']
        return userAttrs['userPassword'][0]
#!/usr/bin/env python
import time
import os
from dmObject import DmObject
import urlparse
class ObservedFile(DmObject):
    """Represents a file observed in an experiment data directory.

    Optionally derives the experiment-relative file path from the data
    directory URL and records storage information from the experiment.
    """

    # Keys exposed by default when this object is rendered/serialized.
    DEFAULT_KEY_LIST = [ 'filePath', 'lastUpdateTime' ]

    def __init__(self, dict=None, filePath=None, dataDirectory=None, experiment=None, destDirectory=None):
        """Initialize the observed file.

        BUG FIX: the original signature used the shared mutable default
        ``dict={}``; a fresh dict is now created per instance.
        """
        if dict is None:
            dict = {}
        DmObject.__init__(self, dict)
        if filePath:
            self['filePath'] = filePath
        if dataDirectory:
            self['dataDirectory'] = dataDirectory
            if filePath:
                # dataDirectory may be given as a URL; only its path
                # component is used to compute the relative path.
                parseResult = urlparse.urlparse(dataDirectory)
                self['experimentFilePath'] = os.path.relpath(filePath, parseResult.path)
                if destDirectory:
                    self['destDirectory'] = destDirectory
                    self['experimentFilePath'] = '%s/%s' % (destDirectory, self['experimentFilePath'])
        if experiment:
            self['experimentName'] = experiment.get('name')
            self['storageHost'] = experiment.get('storageHost')
            self['storageDirectory'] = experiment.get('storageDirectory')

    def setLastUpdateTimeToNow(self):
        """Stamp 'lastUpdateTime' with the current epoch time."""
        self['lastUpdateTime'] = time.time()

    def getLastUpdateTime(self):
        """Return the last update timestamp, or None if never set.

        BUG FIX: the original body called ``self.get('lastUpdateTime')``
        without returning the value, so it always returned None.
        """
        return self.get('lastUpdateTime')

    def getFilePath(self):
        """Return the observed file path, or None if not set."""
        return self.get('filePath')

    def getDataDirectory(self):
        """Return the data directory, or None if not set."""
        return self.get('dataDirectory')
####################################################################
# Testing
if __name__ == '__main__':
    # Smoke test: create an observed file and display it before and after
    # stamping the last-update time (Python 2 print statements).
    of = ObservedFile(filePath='tmp/xyz')
    print of
    of.setLastUpdateTimeToNow()
    print of
#!/usr/bin/env python
from dmObject import DmObject
class PluginInfo(DmObject):
    """Value object describing a processing plugin and its dependencies."""

    # Keys exposed by default when this object is rendered/serialized.
    DEFAULT_KEY_LIST = [ 'id', 'name', 'dependsOn' ]

    def __init__(self, dict):
        """Initialize from the given dictionary of key/value pairs."""
        DmObject.__init__(self, dict)
#!/usr/bin/env python
import threading
import time
from dm.common.constants import dmProcessingStatus
from dm.common.utility.threadingUtility import ThreadingUtility
from dmObject import DmObject
class ProcessingJob(DmObject):
    """Tracks a processing job and per-workflow-stage child process counters.

    Child process state transitions (queued -> started -> completed/failed)
    update the counters stored under self['workflow']['stages'][stageId].
    All transition methods are serialized via ThreadingUtility.synchronize.
    """

    # Keys exposed by default when this object is rendered/serialized.
    DEFAULT_KEY_LIST = [ 'id', 'name', 'owner', 'experimentName', 'status', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp', 'errorMessage' ]

    def __init__(self, dict):
        """Initialize from *dict* and set up synchronization primitives."""
        DmObject.__init__(self, dict)
        self.lock = threading.RLock()
        # Optional event signalled whenever a child process completes/fails.
        self.childProcessEvent = None

    @ThreadingUtility.synchronize
    def childProcessQueued(self, childProcess):
        """Mark *childProcess* pending and register it with its stage."""
        childProcess['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_PENDING
        childProcess['submitTime'] = time.time()
        stageId = childProcess.get('stageId')
        childProcessNumber = childProcess.get('childProcessNumber', 0)
        workflowStage = self['workflow']['stages'][stageId]
        workflowStage['nQueuedChildProcesses'] = workflowStage.get('nQueuedChildProcesses', 0) + 1
        workflowStage['childProcesses'][childProcessNumber] = childProcess

    @ThreadingUtility.synchronize
    def childProcessStarted(self, childProcess):
        """Mark *childProcess* running and move its stage counter queued->running."""
        childProcess['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING
        childProcess['startTime'] = time.time()
        stageId = childProcess.get('stageId')
        workflowStage = self['workflow']['stages'][stageId]
        workflowStage['nQueuedChildProcesses'] = workflowStage.get('nQueuedChildProcesses', 0) - 1
        workflowStage['nRunningChildProcesses'] = workflowStage.get('nRunningChildProcesses', 0) + 1

    @ThreadingUtility.synchronize
    def childProcessCompleted(self, childProcess, processInfo):
        """Mark *childProcess* done, record its outputs and update stage counters."""
        childProcess['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_DONE
        childProcess['endTime'] = time.time()
        # BUG FIX: runTime was computed as startTime - endTime, which
        # always yields a non-positive value.
        childProcess['runTime'] = childProcess['endTime'] - childProcess['startTime']
        for key in ['exitStatus', 'stdOut', 'stdErr']:
            # 'in' replaces the Python-2-only dict.has_key() (same behavior).
            if key in processInfo:
                childProcess[key] = processInfo.get(key)
        stageId = childProcess.get('stageId')
        workflowStage = self['workflow']['stages'][stageId]
        workflowStage['nRunningChildProcesses'] = workflowStage.get('nRunningChildProcesses', 0) - 1
        workflowStage['nCompletedChildProcesses'] = workflowStage.get('nCompletedChildProcesses', 0) + 1
        if self.childProcessEvent:
            self.childProcessEvent.set()

    @ThreadingUtility.synchronize
    def childProcessFailed(self, childProcess, processInfo=None, errorMessage=None):
        """Mark *childProcess* (and the job) failed; update stage counters.

        BUG FIX: runTime sign corrected and the shared mutable default
        ``processInfo={}`` replaced with None.
        """
        if processInfo is None:
            processInfo = {}
        self['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED
        childProcess['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED
        childProcess['endTime'] = time.time()
        childProcess['runTime'] = childProcess['endTime'] - childProcess['startTime']
        for key in ['exitStatus', 'stdOut', 'stdErr']:
            if key in processInfo:
                childProcess[key] = processInfo.get(key)
        if errorMessage:
            childProcess['errorMessage'] = errorMessage
        stageId = childProcess.get('stageId')
        workflowStage = self['workflow']['stages'][stageId]
        workflowStage['nRunningChildProcesses'] = workflowStage.get('nRunningChildProcesses', 0) - 1
        workflowStage['nFailedChildProcesses'] = workflowStage.get('nFailedChildProcesses', 0) + 1
        if self.childProcessEvent:
            self.childProcessEvent.set()
#!/usr/bin/env python
from dmObject import DmObject
class ProposalInfo(DmObject):
    """Value object describing a beamline proposal (title, id, experimenters)."""

    # Keys exposed by default when this object is rendered/serialized.
    DEFAULT_KEY_LIST = [ 'title', 'id', 'experimenters' ]

    def __init__(self, dict):
        """Initialize from the given dictionary of key/value pairs."""
        DmObject.__init__(self, dict)
#!/usr/bin/env python
from dmObject import DmObject
class RoleType(DmObject):
    """Value object describing a role type record."""

    # Keys exposed by default when this object is rendered/serialized.
    DEFAULT_KEY_LIST = [ 'id', 'name', 'description' ]

    def __init__(self, dict):
        """Initialize from the given dictionary of key/value pairs."""
        DmObject.__init__(self, dict)
#!/usr/bin/env python
from dmObject import DmObject
class RunInfo(DmObject):
    """Value object describing a run (name plus start/end times)."""

    # Keys exposed by default when this object is rendered/serialized.
    DEFAULT_KEY_LIST = [ 'name', 'startTime', 'endTime' ]

    def __init__(self, dict):
        """Initialize from the given dictionary of key/value pairs."""
        DmObject.__init__(self, dict)
#!/usr/bin/env python
import time
from dmObject import DmObject
class Storage(DmObject):
    """Value object describing a storage resource."""

    # Keys exposed by default when this object is rendered/serialized.
    DEFAULT_KEY_LIST = [ 'id', 'name', 'description', 'defaultScheme' ]

    def __init__(self, dict=None):
        """Initialize from *dict*; a fresh empty dict is used when omitted.

        BUG FIX: the original signature used the shared mutable default
        ``dict={}``.
        """
        if dict is None:
            dict = {}
        DmObject.__init__(self, dict)
####################################################################
# Testing
if __name__ == '__main__':
    # No standalone test for this module.
    pass
#!/usr/bin/env python
from dmObject import DmObject
class SystemRoleType(DmObject):
    """Value object describing a system role type record."""

    # Keys exposed by default when this object is rendered/serialized.
    DEFAULT_KEY_LIST = [ 'id', 'name', 'description' ]

    def __init__(self, dict):
        """Initialize from the given dictionary of key/value pairs."""
        DmObject.__init__(self, dict)
#!/usr/bin/env python
import time
import threading
from dmObject import DmObject
from dm.common.constants import dmProcessingStatus
from dm.common.utility.dictUtility import DictUtility
from dm.common.utility.timeUtility import TimeUtility
class UploadInfo(DmObject):
    """Tracks the progress of an experiment upload.

    Counters (processed / errors / cancelled / skipped) are updated by the
    fileProcessing* callbacks under an internal RLock; updateStatus() derives
    the aggregate status, timing and percentage fields from those counters.
    """

    # Keys exposed by default when this object is rendered/serialized.
    DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'nProcessingErrors', 'nCancelledFiles', 'nCompletedFiles', 'nWaitingFiles', 'nFiles', 'percentageComplete', 'percentageProcessed', 'percentageProcessingErrors', 'percentageCancelled', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp', 'errorMessage' ]

    def __init__(self, dict=None):
        """Initialize from *dict*; a fresh empty dict is used when omitted.

        BUG FIX: the original signature used the shared mutable default
        ``dict={}``.
        """
        if dict is None:
            dict = {}
        DmObject.__init__(self, dict)
        self.lock = threading.RLock()

    def fileProcessed(self, filePath, processingEndTime):
        """Record a successfully processed file and track the latest one."""
        # RLock used as a context manager replaces acquire/try/finally.
        with self.lock:
            self['nProcessedFiles'] = self.get('nProcessedFiles', 0) + 1
            lastFileProcessedTime = self.get('lastFileProcessedTime', 0)
            if processingEndTime is not None and processingEndTime > lastFileProcessedTime:
                self['lastFileProcessed'] = filePath
                self['lastFileProcessedTime'] = processingEndTime

    def fileProcessingError(self, filePath, processingError, processingEndTime):
        """Record a processing error for *filePath*."""
        with self.lock:
            self['nProcessingErrors'] = self.get('nProcessingErrors', 0) + 1
            processingErrors = self.get('processingErrors', {})
            processingErrors[filePath] = processingError
            self['processingErrors'] = processingErrors
            lastFileProcessingErrorTime = self.get('lastFileProcessingErrorTime', 0)
            if processingEndTime is not None and processingEndTime > lastFileProcessingErrorTime:
                self['lastFileProcessingErrorTime'] = processingEndTime

    def fileProcessingSkipped(self, processorName, filePath, processingError, processingEndTime):
        """Record that *processorName* skipped *filePath*, with optional error."""
        with self.lock:
            # Per-plugin statistics live under self['pluginStats'][processorName].
            pluginStatsDict = self.get('pluginStats', {})
            self['pluginStats'] = pluginStatsDict
            pluginStats = pluginStatsDict.get(processorName, {})
            pluginStatsDict[processorName] = pluginStats
            pluginStats['nSkippedFiles'] = pluginStats.get('nSkippedFiles', 0) + 1
            if processingError:
                processingErrors = pluginStats.get('processingErrors', {})
                processingErrors[filePath] = processingError
                pluginStats['processingErrors'] = processingErrors
            lastFileSkippedTime = pluginStats.get('lastFileSkippedTime', 0)
            if processingEndTime is not None and processingEndTime > lastFileSkippedTime:
                pluginStats['lastFileSkippedTime'] = processingEndTime

    def fileProcessingCancelled(self, filePath, processingEndTime):
        """Record a cancelled file."""
        with self.lock:
            self['nCancelledFiles'] = self.get('nCancelledFiles', 0) + 1
            lastFileProcessingCancelledTime = self.get('lastFileProcessingCancelledTime', 0)
            if processingEndTime is not None and processingEndTime > lastFileProcessingCancelledTime:
                self['lastFileProcessingCancelledTime'] = processingEndTime

    def uploadAborted(self, nCancelledFiles):
        """Add *nCancelledFiles* to the cancelled-file counter after an abort."""
        with self.lock:
            self['nCancelledFiles'] = self.get('nCancelledFiles', 0) + nCancelledFiles

    def updateStatus(self):
        """Recompute aggregate status, timing and percentage fields.

        No-op once the upload has reached an inactive (terminal) status.
        NOTE(review): this method does not take self.lock; callers appear
        responsible for any required serialization — confirm against callers.
        """
        now = time.time()
        uploadStatus = self.get('status', dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING)
        if uploadStatus in dmProcessingStatus.DM_INACTIVE_PROCESSING_STATUS_LIST:
            return
        nFiles = self.get('nFiles', 0)
        nProcessedFiles = self.get('nProcessedFiles', 0)
        nCancelledFiles = self.get('nCancelledFiles', 0)
        nProcessingErrors = self.get('nProcessingErrors', 0)
        # Completed = processed successfully + failed with errors.
        nCompletedFiles = nProcessedFiles+nProcessingErrors
        self['nCompletedFiles'] = nCompletedFiles
        nWaitingFiles = nFiles-nCompletedFiles-nCancelledFiles
        self['nWaitingFiles'] = nWaitingFiles
        startTime = self.get('startTime', now)
        runTime = now - startTime
        endTime = None
        if nCompletedFiles == nFiles and uploadStatus != dmProcessingStatus.DM_PROCESSING_STATUS_PENDING:
            # NOTE(review): literal statuses 'done'/'failed' kept as-is;
            # they may correspond to dmProcessingStatus constants — confirm.
            uploadStatus = 'done'
            if nProcessingErrors:
                uploadStatus = 'failed'
            lastFileProcessingErrorTime = self.get('lastFileProcessingErrorTime')
            lastFileProcessedTime = self.get('lastFileProcessedTime')
            # End time is the later of last-processed and last-error times.
            endTime = lastFileProcessedTime
            if not endTime or lastFileProcessingErrorTime > endTime:
                endTime = lastFileProcessingErrorTime
        if nCancelledFiles > 0 and nCancelledFiles+nCompletedFiles == nFiles:
            uploadStatus = dmProcessingStatus.DM_PROCESSING_STATUS_ABORTED
            endTime = self.get('lastFileProcessingCancelledTime', now)
        if endTime:
            runTime = endTime - startTime
            self['endTime'] = endTime
            self['endTimestamp'] = TimeUtility.formatLocalTimestamp(endTime)
        self['runTime'] = runTime
        self['status'] = uploadStatus
        percentageComplete = 100.0
        percentageProcessed = 100.0
        percentageCancelled = 0.0
        percentageProcessingErrors = 0.0
        if nFiles > 0:
            percentageComplete = float(nCompletedFiles)/float(nFiles)*100.0
            percentageProcessed = float(nProcessedFiles)/float(nFiles)*100.0
            percentageCancelled = float(nCancelledFiles)/float(nFiles)*100.0
            percentageProcessingErrors = float(nProcessingErrors)/float(nFiles)*100.0
        # Percentages are stored as formatted strings with two decimals.
        self['percentageComplete'] = '%.2f' % percentageComplete
        self['percentageProcessed'] = '%.2f' % percentageProcessed
        self['percentageProcessingErrors'] = '%.2f' % percentageProcessingErrors
        if nCancelledFiles > 0:
            self['percentageCancelled'] = '%.2f' % percentageCancelled
#!/usr/bin/env python
from dmObject import DmObject
class UserExperimentRole(DmObject):
    """Association object linking a user, an experiment, and a role type."""

    # Keys exposed by default when this object is rendered/serialized.
    DEFAULT_KEY_LIST = [ 'userId', 'experimentId', 'roleTypeId' ]

    def __init__(self, dict):
        """Initialize from the given dictionary of key/value pairs."""
        DmObject.__init__(self, dict)
#!/usr/bin/env python
from dmObject import DmObject
class UserInfo(DmObject):
    """Value object describing a user account record."""

    # Keys exposed by default when this object is rendered/serialized.
    DEFAULT_KEY_LIST = [
        'id', 'username', 'firstName', 'lastName',
        'middleName', 'email', 'description',
    ]

    def __init__(self, dict):
        """Initialize from the given dictionary of key/value pairs."""
        DmObject.__init__(self, dict)
#!/usr/bin/env python
from dmObject import DmObject
class UserSystemRole(DmObject):
    """Association object linking a user, an experiment station, and a role type."""

    # Keys exposed by default when this object is rendered/serialized.
    DEFAULT_KEY_LIST = [ 'userId', 'experimentStationId', 'roleTypeId' ]

    def __init__(self, dict):
        """Initialize from the given dictionary of key/value pairs."""
        DmObject.__init__(self, dict)
#!/usr/bin/env python
from dmObject import DmObject
class Workflow(DmObject):
    """Value object describing a processing workflow record."""

    # Keys exposed by default when this object is rendered/serialized.
    DEFAULT_KEY_LIST = [ 'id', 'name', 'owner' ]

    def __init__(self, dict):
        """Initialize from the given dictionary of key/value pairs."""
        DmObject.__init__(self, dict)
#!/usr/bin/env python
import threading
import time
import os
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.utility.objectUtility import ObjectUtility
from dm.common.utility.valueUtility import ValueUtility
from dm.common.utility.timeBasedProcessingQueue import TimeBasedProcessingQueue
from dm.common.utility.singleton import Singleton
from fileProcessingThread import FileProcessingThread
class FileProcessingManager(threading.Thread,Singleton):
    """Singleton that manages a pool of file-processing threads.

    Configuration (thread count, retry policy, processor plugins) is read
    from the 'FileProcessingManager' section via ConfigurationManager.
    Files are queued with processFile() and consumed by FileProcessingThread
    workers created in start().

    NOTE(review): threading.Thread.__init__ is never invoked and start() is
    overridden, so the Thread base appears unused — confirm before removing.
    """

    CONFIG_SECTION_NAME = 'FileProcessingManager'
    NUMBER_OF_PROCESSING_THREADS_KEY = 'numberofprocessingthreads'
    DEFAULT_NUMBER_OF_RETRIES_KEY = 'defaultnumberofretries'
    DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY = 'defaultretrywaitperiodinseconds'
    STAT_UTILITY_KEY = 'statutility'
    FILE_PROCESSOR_KEY = 'fileprocessor'

    # Singleton state.
    __instanceLock = threading.RLock()
    __instance = None

    def __init__(self):
        """Initialize the singleton instance; later calls return immediately."""
        with FileProcessingManager.__instanceLock:
            if FileProcessingManager.__instance:
                return
            FileProcessingManager.__instance = self
            self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
            self.logger.debug('Initializing')
            self.lock = threading.RLock()
            self.fileProcessingThreadDict = {}
            self.eventFlag = threading.Event()
            self.fileProcessorDict = {}
            self.fileProcessorKeyList = []
            self.fileProcessingQueue = TimeBasedProcessingQueue()
            self.__configure()
            self.logger.debug('Initialization complete')

    def __configure(self):
        """Read config, instantiate processor plugins, and wire dependencies."""
        cm = ConfigurationManager.getInstance()
        configItems = cm.getConfigItems(FileProcessingManager.CONFIG_SECTION_NAME)
        self.logger.debug('Got config items: %s' % configItems)
        self.nProcessingThreads = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.NUMBER_OF_PROCESSING_THREADS_KEY))
        self.defaultNumberOfRetries = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.DEFAULT_NUMBER_OF_RETRIES_KEY))
        self.defaultRetryWaitPeriodInSeconds = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY))
        statUtility = cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.STAT_UTILITY_KEY)
        if statUtility:
            (statUtilityModuleName,statUtilityClassName,statUtilityConstructor) = cm.getModuleClassConstructorTuple(statUtility)
        # Create one processor instance per 'fileprocessor*' config key.
        for (key,value) in configItems:
            if key.startswith(FileProcessingManager.FILE_PROCESSOR_KEY):
                (moduleName,className,constructor) = cm.getModuleClassConstructorTuple(value)
                self.logger.debug('Creating file processor instance of class %s' % className)
                fileProcessor = ObjectUtility.createObjectInstance(moduleName, className, constructor)
                self.logger.debug('Configuring file processor %s' % fileProcessor)
                fileProcessor.setNumberOfRetriesIfNotSet(self.defaultNumberOfRetries)
                fileProcessor.setRetryWaitPeriodInSecondsIfNotSet(self.defaultRetryWaitPeriodInSeconds)
                # Each processor gets its own stat utility instance, if configured.
                statUtilityObject = None
                if statUtility:
                    statUtilityObject = ObjectUtility.createObjectInstance(statUtilityModuleName, statUtilityClassName, statUtilityConstructor)
                    self.logger.debug('Using stat utility object %s' % str(statUtilityObject))
                fileProcessor.statUtility = statUtilityObject
                fileProcessor.configure()
                self.fileProcessorDict[key] = fileProcessor
        # sorted() instead of keys()+sort(): identical result in Python 2,
        # and also correct in Python 3 where keys() returns a view.
        self.fileProcessorKeyList = sorted(self.fileProcessorDict.keys())
        self.logger.debug('File processor keys: %s' % self.fileProcessorKeyList)
        # Assign processor names (ClassName-<ordinal> in key order).
        processorNumber = 0
        for processorKey in self.fileProcessorKeyList:
            processorNumber += 1
            processor = self.fileProcessorDict.get(processorKey)
            processorName = '%s-%s' % (processor.__class__.__name__,processorNumber)
            processor.name = processorName
        # Resolve processor dependencies: map configured keys to assigned names.
        for processorKey in self.fileProcessorKeyList:
            self.logger.debug('Determining dependencies for processor %s' % (processorKey))
            processor = self.fileProcessorDict.get(processorKey)
            dependsOn = []
            for depProcessorKey in processor.dependsOn:
                depProcessor = self.fileProcessorDict.get(depProcessorKey.lower())
                if depProcessor:
                    dependsOn.append(depProcessor.name)
            processor.dependsOn = dependsOn
            self.logger.debug('Processor %s depends on: %s' % (processor.name, processor.dependsOn))

    def removeHiddenFilesFromProcessing(self, filePathsDict, uploadInfo):
        """Drop dot-files from *filePathsDict* unless processHiddenFiles is set.

        Mutates and returns *filePathsDict*.
        """
        if ValueUtility.toBoolean(uploadInfo.get('processHiddenFiles')):
            del uploadInfo['processHiddenFiles']
            return filePathsDict
        self.logger.debug('Checking for hidden files')
        nRemoved = 0
        # Iterate over a snapshot of the keys so deletion during the loop is
        # safe (required in Python 3; harmless in Python 2).
        for filePath in list(filePathsDict.keys()):
            fileName = os.path.basename(filePath)
            if fileName.startswith('.'):
                self.logger.debug('File path %s is hidden file, will not process it' % filePath)
                del filePathsDict[filePath]
                nRemoved += 1
        self.logger.debug('Removed %s hidden files, %s candidates remaining' % (nRemoved, len(filePathsDict)))
        return filePathsDict

    def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
        """Ask each plugin which files it must process; return the union.

        If 'reprocessFiles' is set, all files are candidates and the flag
        is consumed from *uploadInfo*.
        """
        if ValueUtility.toBoolean(uploadInfo.get('reprocessFiles')):
            del uploadInfo['reprocessFiles']
            return filePathsDict
        self.logger.debug('Checking files with processor plugins')
        checkedFilePathsDict = {}
        for processorKey in self.fileProcessorKeyList:
            processor = self.fileProcessorDict.get(processorKey)
            # Each processor returns the subset of files it must process.
            pluginFilePathsDict = processor.checkUploadFilesForProcessing(filePathsDict, uploadInfo)
            if len(pluginFilePathsDict):
                checkedFilePathsDict.update(pluginFilePathsDict)
        # NOTE(review): this logs the size of the input dict; the returned
        # candidate set is checkedFilePathsDict — confirm which was intended.
        self.logger.debug('There are %s processing candidates remaining' % (len(filePathsDict)))
        return checkedFilePathsDict

    def processFile(self, fileInfo):
        """Queue *fileInfo* for processing and wake the worker threads."""
        self.fileProcessingQueue.push(fileInfo)
        self.eventFlag.set()

    def appendFileProcessor(self, fileProcessor):
        """Register an additional processor instance at runtime."""
        key = fileProcessor.__class__.__name__
        self.logger.debug('Adding file processor: %s' % key)
        self.fileProcessorDict[key] = fileProcessor
        self.fileProcessorKeyList.append(key)
        self.logger.debug('File processor keys: %s' % self.fileProcessorKeyList)

    def start(self):
        """Create and start the configured number of processing threads."""
        with self.lock:
            self.logger.debug('Starting file processing threads')
            for i in range(0, self.nProcessingThreads):
                tName = 'FileProcessingThread-%s' % i
                t = FileProcessingThread(tName, self, self.fileProcessorDict, self.fileProcessorKeyList, self.fileProcessingQueue)
                t.start()
                self.fileProcessingThreadDict[tName] = t

    def stop(self):
        """Signal all processing threads to stop and wait for them to finish."""
        self.logger.debug('Stopping file processing threads')
        for (tName, t) in self.fileProcessingThreadDict.items():
            t.stop()
        with self.lock:
            self.eventFlag.set()
        for (tName, t) in self.fileProcessingThreadDict.items():
            t.join()

    def setEvent(self):
        """Set the shared event flag (wakes waiting threads)."""
        with self.lock:
            self.eventFlag.set()

    def clearEvent(self):
        """Clear the shared event flag."""
        with self.lock:
            self.eventFlag.clear()

    def waitOnEvent(self, timeoutInSeconds=None):
        """Block until the event flag is set or the optional timeout expires."""
        self.eventFlag.wait(timeoutInSeconds)
####################################################################
# Testing
if __name__ == '__main__':
    # Smoke test: instantiate the singleton and display it (Python 2 print).
    fp = FileProcessingManager.getInstance()
    print fp
    #fp.start()
    #time.sleep(30)
    #fp.stop()