Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • DM/dm-docs
  • hammonds/dm-docs
  • hparraga/dm-docs
3 results
Show changes
Showing
with 691 additions and 0 deletions
#!/usr/bin/env python
from dmObject import DmObject
class ChildProcess(DmObject):
    """ DM object holding child process attributes (command, output, status). """

    DEFAULT_KEY_LIST = [
        'command',
        'exitStatus',
        'stdErr',
        'stdOut',
        'workingDir',
        'childProcessNumber',
        'stageId',
        'status',
    ]

    def __init__(self, dict):
        """ Initialize the object from the given dictionary. """
        DmObject.__init__(self, dict)
#!/usr/bin/env python
import copy
import time
import threading
from dmObject import DmObject
from dm.common.constants import dmProcessingStatus
from dm.common.utility.dictUtility import DictUtility
from dm.common.utility.timeUtility import TimeUtility
class DaqInfo(DmObject):
    """
    DM object that keeps file counters, timing and status for a DAQ.

    The file* methods update counters while holding a re-entrant lock;
    updateStatus() derives percentages, run time and status from them.
    """

    DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'nProcessingErrors', 'nCompletedFiles', 'nWaitingFiles', 'nFiles', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp', 'errorMessage' ]

    def __init__(self, dict={}):
        """ Initialize from a dictionary and remember the keys it had. """
        DmObject.__init__(self, dict)
        self.lock = threading.RLock()
        # Snapshot of the keys supplied at construction time; consumed by
        # toDictWithOriginalKeys().
        self.originalKeys = list(dict.keys())

    def fileAdded(self, filePath):
        """ Increment the 'nFiles' counter (filePath itself is not stored). """
        with self.lock:
            self['nFiles'] = self.get('nFiles', 0) + 1

    def fileProcessed(self, filePath, processingEndTime):
        """
        Increment 'nProcessedFiles' and, if processingEndTime is newer than
        the recorded one, remember filePath as the last processed file.
        """
        with self.lock:
            self['nProcessedFiles'] = self.get('nProcessedFiles', 0) + 1
            lastFileProcessedTime = self.get('lastFileProcessedTime', 0)
            if processingEndTime is not None and processingEndTime > lastFileProcessedTime:
                self['lastFileProcessed'] = filePath
                self['lastFileProcessedTime'] = processingEndTime

    def fileProcessingError(self, filePath, processingError, processingEndTime):
        """
        Increment 'nProcessingErrors', record the error under filePath and
        advance 'lastFileProcessingErrorTime' if processingEndTime is newer.
        """
        with self.lock:
            # file can be processed multiple times, keep only the last error
            self['nProcessingErrors'] = self.get('nProcessingErrors', 0) + 1
            processingErrors = self.get('processingErrors', {})
            processingErrors[filePath] = processingError
            self['processingErrors'] = processingErrors
            lastFileProcessingErrorTime = self.get('lastFileProcessingErrorTime', 0)
            if processingEndTime is not None and processingEndTime > lastFileProcessingErrorTime:
                self['lastFileProcessingErrorTime'] = processingEndTime

    def fileProcessingSkipped(self, processorName, filePath, processingError, processingEndTime):
        """
        Update the per-processor statistics kept under 'pluginStats': count
        the skipped file, record processingError (if any) under filePath and
        advance 'lastFileSkippedTime' if processingEndTime is newer.
        """
        with self.lock:
            # Create the nested dictionaries on first use and store them back
            # so later updates are visible through self.
            pluginStatsDict = self.get('pluginStats', {})
            self['pluginStats'] = pluginStatsDict
            pluginStats = pluginStatsDict.get(processorName, {})
            pluginStatsDict[processorName] = pluginStats
            pluginStats['nSkippedFiles'] = pluginStats.get('nSkippedFiles', 0) + 1
            if processingError:
                processingErrors = pluginStats.get('processingErrors', {})
                processingErrors[filePath] = processingError
                pluginStats['processingErrors'] = processingErrors
            lastFileSkippedTime = pluginStats.get('lastFileSkippedTime', 0)
            if processingEndTime is not None and processingEndTime > lastFileSkippedTime:
                pluginStats['lastFileSkippedTime'] = processingEndTime

    def updateStatus(self):
        """
        Recompute derived counters, percentages, run time and status.

        Does nothing if the current status is one of the inactive statuses.
        """
        now = time.time()
        daqStatus = self.get('status', dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING)
        if daqStatus in dmProcessingStatus.DM_INACTIVE_PROCESSING_STATUS_LIST:
            return

        nFiles = self.get('nFiles', 0)
        nProcessedFiles = self.get('nProcessedFiles', 0)
        nProcessingErrors = self.get('nProcessingErrors', 0)
        nCompletedFiles = nProcessedFiles+nProcessingErrors
        self['nCompletedFiles'] = nCompletedFiles
        nWaitingFiles = nFiles-nCompletedFiles
        self['nWaitingFiles'] = nWaitingFiles

        startTime = self.get('startTime', now)
        runTime = now - startTime
        endTime = None

        # With no files at all everything counts as complete.
        percentageComplete = 100.0
        percentageProcessed = 100.0
        percentageProcessingErrors = 0.0
        if nFiles > 0:
            percentageComplete = float(nCompletedFiles)/float(nFiles)*100.0
            percentageProcessed = float(nProcessedFiles)/float(nFiles)*100.0
            percentageProcessingErrors = float(nProcessingErrors)/float(nFiles)*100.0
        self['percentageComplete'] = '%.2f' % percentageComplete
        self['percentageProcessed'] = '%.2f' % percentageProcessed
        self['percentageProcessingErrors'] = '%.2f' % percentageProcessingErrors

        daqEndTime = self.get('endTime')
        if daqEndTime:
            daqStatus = dmProcessingStatus.DM_PROCESSING_STATUS_FINALIZING
            if nCompletedFiles >= nFiles:
                daqStatus = dmProcessingStatus.DM_PROCESSING_STATUS_DONE
                if nProcessingErrors:
                    daqStatus = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED
                # Final end time is the latest of the recorded end time and
                # the last processed/error times. Unset values are filtered
                # out explicitly instead of relying on None-vs-number
                # ordering; daqEndTime is truthy here, so the sequence is
                # never empty.
                candidateTimes = [
                    daqEndTime,
                    self.get('lastFileProcessedTime'),
                    self.get('lastFileProcessingErrorTime'),
                ]
                endTime = max(t for t in candidateTimes if t)

        if endTime:
            runTime = endTime - startTime
            self['endTime'] = endTime
            self['endTimestamp'] = TimeUtility.formatLocalTimestamp(endTime)
        self['runTime'] = runTime
        self['status'] = daqStatus

    def toDictWithOriginalKeys(self):
        """ Return a plain dict restricted to the constructor-time keys still present. """
        return {key: self.get(key) for key in self.originalKeys if key in self}
#!/usr/bin/env python
import time
from dmObject import DmObject
class DataFolder(DmObject):
    """ DM object holding data folder attributes. """

    DEFAULT_KEY_LIST = [
        'id',
        'name',
        'description',
        'storageId',
        'experimentId',
        'dataPath',
    ]

    def __init__(self, dict={}):
        """ Initialize the object from an optional dictionary. """
        DmObject.__init__(self, dict)

####################################################################
# Testing

if __name__ == '__main__':
    pass
#!/usr/bin/env python
from dmObject import DmObject
class DatasetMetadata(DmObject):
    """ DM object holding dataset metadata attributes. """

    DEFAULT_KEY_LIST = [
        'id',
        'datasetName',
        'experimentName',
    ]

    def __init__(self, dict):
        """ Initialize the object from the given dictionary. """
        DmObject.__init__(self, dict)
#!/usr/bin/env python
from dmObject import DmObject
class DirectoryMetadata(DmObject):
    """ DM object holding directory metadata attributes. """

    DEFAULT_KEY_LIST = [
        'id',
        'directoryPath',
        'experimentDirectoryPath',
    ]

    def __init__(self, dict):
        """ Initialize the object from the given dictionary. """
        DmObject.__init__(self, dict)
#!/usr/bin/env python
import time
import threading
from dmObject import DmObject
from dm.common.constants import dmProcessingStatus
from dm.common.utility.dictUtility import DictUtility
from dm.common.utility.timeUtility import TimeUtility
class DirectoryUploadInfo(DmObject):
    """ DM object that keeps timing and status for a directory upload. """

    DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nFiles', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp', 'errorMessage' ]

    def __init__(self, dict={}):
        """ Initialize the object from an optional dictionary. """
        DmObject.__init__(self, dict)
        self.lock = threading.RLock()

    def updateStatus(self):
        """
        Recompute run time and, once every processor has finished, the end
        time and final status from the per-processor 'processingInfo' entries.

        Does nothing if the current status is one of the inactive statuses.
        """
        now = time.time()
        uploadStatus = self.get('status', dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING)
        if uploadStatus in dmProcessingStatus.DM_INACTIVE_PROCESSING_STATUS_LIST:
            return

        startTime = self.get('startTime', now)
        runTime = now - startTime
        # A missing 'processingInfo' entry is treated as "no processors yet"
        # rather than failing on None.keys().
        processingInfo = self.get('processingInfo') or {}
        endTime = 0
        # Assume success; downgraded below if any processor aborted/failed.
        uploadStatus = dmProcessingStatus.DM_PROCESSING_STATUS_DONE
        for processorName in processingInfo.keys():
            processorInfo = processingInfo[processorName]
            processingEndTime = processorInfo.get('processingEndTime')
            status = processorInfo.get('status')
            if status in [dmProcessingStatus.DM_PROCESSING_STATUS_ABORTED, dmProcessingStatus.DM_PROCESSING_STATUS_FAILED]:
                uploadStatus = status
            if not processingEndTime and status != dmProcessingStatus.DM_PROCESSING_STATUS_SKIPPED:
                # At least one processor is still working: upload not done.
                endTime = None
                break
            # Skipped processors may have no end time; compare only real values.
            if processingEndTime and processingEndTime > endTime:
                endTime = processingEndTime

        if endTime:
            runTime = endTime - startTime
            self['endTime'] = endTime
            self['endTimestamp'] = TimeUtility.formatLocalTimestamp(endTime)
            self['status'] = uploadStatus
        self['runTime'] = runTime
#!/usr/bin/env python
#
# DM Object class.
#
#######################################################################
import UserDict
import UserList
import types
import json
import datetime
from dm.common.exceptions.invalidArgument import InvalidArgument
from dm.common.exceptions.objectNotFound import ObjectNotFound
from dm.common.utility import loggingManager
class DmObject(UserDict.UserDict):
    """
    Base dm object class.

    A dictionary wrapper that can render itself as a plain dict, as text
    or as JSON, restricted to a selectable list of keys.
    """

    ALL_KEYS = 'ALL'
    DEFAULT_KEYS = 'DEFAULT'

    DICT_DISPLAY_FORMAT = 'dict'
    TEXT_DISPLAY_FORMAT = 'text'
    JSON_DISPLAY_FORMAT = 'json'

    DEFAULT_KEY_LIST = [ 'id', 'name' ]

    def __init__(self, dict={}):
        """
        Initialize from a plain dictionary or another UserDict instance.

        Raises InvalidArgument for any other argument type.
        """
        if isinstance(dict, types.DictType):
            UserDict.UserDict.__init__(self, dict)
        elif isinstance(dict, UserDict.UserDict):
            UserDict.UserDict.__init__(self, dict.data)
        else:
            raise InvalidArgument('DmObject instance must be initialized using dictionary.')
        self.logger = None

    def getLogger(self):
        """ Return the logger for this object's class, creating it on first use. """
        if not self.logger:
            self.logger = loggingManager.LoggingManager.getInstance().getLogger(self.__class__.__name__)
        return self.logger

    def getRequiredKeyValue(self, key):
        """
        Return the value stored under key.

        Raises ObjectNotFound if the key is missing or its value is None.
        """
        value = self.get(key)
        if value is None:
            errorMsg = 'Required dictionary key %s is missing.' % key
            raise ObjectNotFound(errorMsg)
        return value

    @classmethod
    def getFromDict(cls, dict):
        """
        Build an instance of cls and copy all items of dict into it.

        The instance is created with cls(), so the class constructor must
        be callable without arguments.
        """
        inst = cls()
        for key in dict.keys():
            inst[key] = dict[key]
        return inst

    def getRepKeyList(self, keyList):
        """
        Resolve a key list specification into a list of keys.

        Accepted values: None or an empty list (class default keys), a
        non-empty list (used as is), the string 'ALL' (all stored keys),
        the string 'DEFAULT' (class default keys), or a string of
        comma-separated keys. Anything else raises InvalidArgument.
        """
        if keyList is None:
            return self.DEFAULT_KEY_LIST
        if isinstance(keyList, types.ListType):
            if not len(keyList):
                return self.DEFAULT_KEY_LIST
            return keyList
        if isinstance(keyList, types.StringTypes):
            if keyList == DmObject.ALL_KEYS:
                return self.data.keys()
            if keyList == DmObject.DEFAULT_KEYS:
                return self.DEFAULT_KEY_LIST
            # Assume keys are separated by comma
            return keyList.split(',')
        # Unknown key list parameter.
        raise InvalidArgument('Key list parameter must be one of: None, string "%s", string "%s", string containing comma-separated keys, or list of strings.' % (DmObject.ALL_KEYS, DmObject.DEFAULT_KEYS))

    def getDictRep(self, keyList=None):
        """
        Return a plain dict with the selected keys.

        Nested DmObject values (directly or inside lists) are expanded with
        all of their keys, datetime values are converted to strings and
        keys whose value is None are omitted.
        """
        # Dict representation is dict
        dictRep = {}
        displayKeyList = self.getRepKeyList(keyList)
        for key in displayKeyList:
            value = self.get(key)
            if isinstance(value, DmObject):
                dictRep[key] = value.getDictRep('ALL')
            elif isinstance(value, types.ListType):
                itemList = []
                for item in value:
                    if isinstance(item, DmObject):
                        itemList.append(item.getDictRep('ALL'))
                    else:
                        itemList.append(item)
                dictRep[key] = itemList
            elif value is not None:
                if isinstance(value, datetime.datetime):
                    dictRep[key] = str(value)
                else:
                    dictRep[key] = value
        return dictRep

    def getTextRep(self, keyList=None):
        """ Return a 'key=value ' text rendering of the selected keys. """
        display = ''
        displayKeyList = self.getRepKeyList(keyList)
        for key in displayKeyList:
            value = self.get(key)
            if isinstance(value, DmObject):
                display = display + '%s={ %s} ' % (key, value.getTextRep())
            elif isinstance(value, types.ListType):
                display = display + '%s=[ ' % key
                for item in value:
                    # One-element tuples keep the formatting safe when the
                    # item itself happens to be a tuple.
                    if isinstance(item, DmObject):
                        display = display + '{ %s}, ' % (item,)
                    else:
                        display = display + ' %s, ' % (item,)
                display = display + '] '
            elif value is not None:
                display = display + '%s=%s ' % (key, value)
        return display

    def getJsonRep(self, keyList=None):
        """ Return the dict representation of the selected keys as a JSON string. """
        dictRep = self.getDictRep(keyList)
        return json.dumps(dictRep)

    def getFullJsonRep(self):
        """ Return the dict representation of all keys as a JSON string. """
        dictRep = self.getDictRep(DmObject.ALL_KEYS)
        return json.dumps(dictRep)

    @classmethod
    def fromJsonString(cls, jsonString):
        """ Build an instance of cls from a JSON object string. """
        return cls.getFromDict(json.loads(jsonString))

    def getDisplayString(self, displayKeyList=[], displayFormat=TEXT_DISPLAY_FORMAT):
        """
        Get display string.

        Returns the dict, text or JSON representation depending on
        displayFormat; raises InvalidArgument for an unknown format.
        """
        if displayFormat == DmObject.DICT_DISPLAY_FORMAT:
            return self.getDictRep(displayKeyList)
        elif displayFormat == DmObject.TEXT_DISPLAY_FORMAT:
            return self.getTextRep(displayKeyList)
        elif displayFormat == DmObject.JSON_DISPLAY_FORMAT:
            return self.getJsonRep(displayKeyList)
        raise InvalidArgument('Unrecognized display displayFormat: %s.' % (displayFormat,))
#######################################################################
# Testing.
if __name__ == '__main__':
x = {'name' : 'XYZ', 'one':1, 'two':2 }
o = DmObject(x)
print 'DM Object: ', o
print 'Type of DM object: ', type(o)
print 'JSON Rep: ', o.getJsonRep()
print 'Type of JSON rep: ', type(o.getJsonRep())
j = '{"name" : "XYZ", "one":1, "two":2 }'
print 'String: ', j
x2 = DmObject.fromJsonString(j)
print 'DM Object 2: ', x2
print 'Type of DM object 2: ', type(x2)
print x2.getDisplayString(displayKeyList='ALL')
#!/usr/bin/env python
#
# Base object manager class.
#
#######################################################################
import threading
from dm.common.utility.loggingManager import LoggingManager
#######################################################################
class DmObjectManager:
    """ Base object manager class. """

    def __init__(self):
        """ Create the re-entrant lock and the logger named after the class. """
        self.lock = threading.RLock()
        loggingManager = LoggingManager.getInstance()
        self.logger = loggingManager.getLogger(self.__class__.__name__)

    def getLogger(self):
        """ Return the logger created in the constructor. """
        return self.logger

    def acquireLock(self):
        """ Acquire the manager lock. """
        self.lock.acquire()

    def releaseLock(self):
        """ Release the manager lock. """
        self.lock.release()
#!/usr/bin/env python
from dmObject import DmObject
class DmSession(DmObject):
    """ DM object holding a session id and its last update time. """

    def __init__(self, dict={}, sessionId=None, updateTime=None):
        """
        Initialize from an optional dictionary; sessionId and updateTime,
        when given, override the corresponding dictionary entries.
        """
        DmObject.__init__(self, dict)
        if sessionId is not None:
            self['sessionId'] = sessionId
        if updateTime is not None:
            # Must be the same key that getUpdateTime()/setUpdateTime() use.
            self['updateTime'] = updateTime

    def setSessionId(self, sessionId):
        """ Store the session id. """
        self['sessionId'] = sessionId

    def getSessionId(self):
        """ Return the session id, or None if not set. """
        return self.get('sessionId')

    def setUpdateTime(self, updateTime):
        """ Store the update time. """
        self['updateTime'] = updateTime

    def getUpdateTime(self):
        """ Return the update time, or None if not set. """
        return self.get('updateTime')
#!/usr/bin/env python
import time
from dmObject import DmObject
class Endpoint(DmObject):
    """ DM object holding endpoint attributes. """

    DEFAULT_KEY_LIST = [
        'id',
        'name',
        'description',
        'storageId',
        'accessUrl',
    ]

    def __init__(self, dict={}):
        """ Initialize the object from an optional dictionary. """
        DmObject.__init__(self, dict)

####################################################################
# Testing

if __name__ == '__main__':
    pass
#!/usr/bin/env python
import time
from dmObject import DmObject
class Experiment(DmObject):
    """ DM object holding experiment attributes. """

    DEFAULT_KEY_LIST = [
        'id',
        'name',
        'experimentTypeId',
        'experimentStationId',
        'dataDirectory',
        'startDate',
        'endDate',
        'daqStartTime',
        'daqEndTime',
    ]

    def __init__(self, dict={}):
        """ Initialize the object from an optional dictionary. """
        DmObject.__init__(self, dict)

####################################################################
# Testing

if __name__ == '__main__':
    pass
#!/usr/bin/env python
from dmObject import DmObject
class ExperimentRoleType(DmObject):
    """ DM object holding experiment role type attributes. """

    DEFAULT_KEY_LIST = [
        'id',
        'name',
        'description',
    ]

    def __init__(self, dict):
        """ Initialize the object from the given dictionary. """
        DmObject.__init__(self, dict)
#!/usr/bin/env python
import time
from dmObject import DmObject
class ExperimentStation(DmObject):
    """ DM object holding experiment station attributes. """

    DEFAULT_KEY_LIST = [
        'id',
        'name',
        'description',
    ]

    def __init__(self, dict={}):
        """ Initialize the object from an optional dictionary. """
        DmObject.__init__(self, dict)

####################################################################
# Testing

if __name__ == '__main__':
    pass
#!/usr/bin/env python
import time
from dmObject import DmObject
class ExperimentType(DmObject):
    """ DM object holding experiment type attributes. """

    DEFAULT_KEY_LIST = [
        'id',
        'name',
        'description',
    ]

    def __init__(self, dict={}):
        """ Initialize the object from an optional dictionary. """
        DmObject.__init__(self, dict)

####################################################################
# Testing

if __name__ == '__main__':
    pass
#!/usr/bin/env python
from dmObject import DmObject
class FileMetadata(DmObject):
    """ DM object holding file metadata attributes. """

    DEFAULT_KEY_LIST = [
        'id',
        'fileName',
        'experimentName',
        'locationList',
        'filePath',
        'fileSize',
        'md5Sum',
        'experimentFilePath',
    ]

    def __init__(self, dict):
        """ Initialize the object from the given dictionary. """
        DmObject.__init__(self, dict)
#!/usr/bin/env python
from dmObject import DmObject
class LdapUserInfo(DmObject):
    """ DM object holding a user DN and the associated user attributes. """

    DEFAULT_KEY_LIST = [
        'userDn',
        'userAttrs',
    ]

    def __init__(self, dict):
        """ Initialize the object from the given dictionary. """
        DmObject.__init__(self, dict)

    def getUserPassword(self):
        """ Return the first entry stored under userAttrs['userPassword']. """
        userAttrs = self['userAttrs']
        passwordValues = userAttrs['userPassword']
        return passwordValues[0]
#!/usr/bin/env python
import time
import os
from dmObject import DmObject
import urlparse
class ObservedFile(DmObject):
DEFAULT_KEY_LIST = [ 'filePath', 'lastUpdateTime' ]
def __init__(self, dict={}, filePath=None, dataDirectory=None, experiment=None, destDirectory=None):
DmObject.__init__(self, dict)
if filePath:
self['filePath'] = filePath
if dataDirectory:
self['dataDirectory'] = dataDirectory
if filePath:
parseResult = urlparse.urlparse(dataDirectory)
self['experimentFilePath'] = os.path.relpath(filePath, parseResult.path)
if destDirectory:
self['destDirectory'] = destDirectory
self['experimentFilePath'] = '%s/%s' % (destDirectory, self['experimentFilePath'])
if experiment:
self['experimentName'] = experiment.get('name')
self['storageHost'] = experiment.get('storageHost')
self['storageDirectory'] = experiment.get('storageDirectory')
def setLastUpdateTimeToNow(self):
self['lastUpdateTime'] = time.time()
def getLastUpdateTime(self):
self.get('lastUpdateTime')
def getFilePath(self):
return self.get('filePath')
def getDataDirectory(self):
return self.get('dataDirectory')
####################################################################
# Testing
if __name__ == '__main__':
of = ObservedFile(filePath='tmp/xyz')
print of
of.setLastUpdateTimeToNow()
print of
#!/usr/bin/env python
from dmObject import DmObject
class PluginInfo(DmObject):
    """ DM object holding plugin attributes. """

    DEFAULT_KEY_LIST = [
        'id',
        'name',
        'dependsOn',
    ]

    def __init__(self, dict):
        """ Initialize the object from the given dictionary. """
        DmObject.__init__(self, dict)
#!/usr/bin/env python
import threading
import time
from dm.common.constants import dmProcessingStatus
from dm.common.utility.threadingUtility import ThreadingUtility
from dmObject import DmObject
class ProcessingJob(DmObject):
    """
    DM object describing a processing job.

    The childProcess* methods update a child process record and the
    per-stage counters kept under self['workflow']['stages'][stageId].
    """

    DEFAULT_KEY_LIST = [ 'id', 'name', 'owner', 'experimentName', 'status', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp', 'errorMessage' ]

    def __init__(self, dict):
        """ Initialize the object from the given dictionary. """
        DmObject.__init__(self, dict)
        self.lock = threading.RLock()
        # Optional event object; when set by the owner, it is signalled
        # every time a child process completes or fails.
        self.childProcessEvent = None

    @ThreadingUtility.synchronize
    def childProcessQueued(self, childProcess):
        """ Mark childProcess as pending and register it with its workflow stage. """
        childProcess['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_PENDING
        childProcess['submitTime'] = time.time()
        stageId = childProcess.get('stageId')
        childProcessNumber = childProcess.get('childProcessNumber', 0)
        workflowStage = self['workflow']['stages'][stageId]
        workflowStage['nQueuedChildProcesses'] = workflowStage.get('nQueuedChildProcesses', 0) + 1
        workflowStage['childProcesses'][childProcessNumber] = childProcess

    @ThreadingUtility.synchronize
    def childProcessStarted(self, childProcess):
        """ Mark childProcess as running and move it from queued to running. """
        childProcess['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING
        childProcess['startTime'] = time.time()
        stageId = childProcess.get('stageId')
        workflowStage = self['workflow']['stages'][stageId]
        workflowStage['nQueuedChildProcesses'] = workflowStage.get('nQueuedChildProcesses', 0) - 1
        workflowStage['nRunningChildProcesses'] = workflowStage.get('nRunningChildProcesses', 0) + 1

    @ThreadingUtility.synchronize
    def childProcessCompleted(self, childProcess, processInfo):
        """ Mark childProcess as done and move it from running to completed. """
        self._childProcessEnded(childProcess, processInfo, dmProcessingStatus.DM_PROCESSING_STATUS_DONE)
        stageId = childProcess.get('stageId')
        workflowStage = self['workflow']['stages'][stageId]
        workflowStage['nRunningChildProcesses'] = workflowStage.get('nRunningChildProcesses', 0) - 1
        workflowStage['nCompletedChildProcesses'] = workflowStage.get('nCompletedChildProcesses', 0) + 1
        if self.childProcessEvent:
            self.childProcessEvent.set()

    @ThreadingUtility.synchronize
    def childProcessFailed(self, childProcess, processInfo={}, errorMessage=None):
        """
        Mark both the job and childProcess as failed, record the optional
        error message and move the child from running to failed.
        """
        self['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED
        self._childProcessEnded(childProcess, processInfo, dmProcessingStatus.DM_PROCESSING_STATUS_FAILED)
        if errorMessage:
            childProcess['errorMessage'] = errorMessage
        stageId = childProcess.get('stageId')
        workflowStage = self['workflow']['stages'][stageId]
        workflowStage['nRunningChildProcesses'] = workflowStage.get('nRunningChildProcesses', 0) - 1
        workflowStage['nFailedChildProcesses'] = workflowStage.get('nFailedChildProcesses', 0) + 1
        if self.childProcessEvent:
            self.childProcessEvent.set()

    def _childProcessEnded(self, childProcess, processInfo, status):
        """ Record final status, end time, run time and process output on childProcess. """
        childProcess['status'] = status
        childProcess['endTime'] = time.time()
        # Elapsed time is end minus start, so it is non-negative.
        childProcess['runTime'] = childProcess['endTime'] - childProcess['startTime']
        for key in ['exitStatus', 'stdOut', 'stdErr']:
            if key in processInfo:
                childProcess[key] = processInfo.get(key)
#!/usr/bin/env python
from dmObject import DmObject
class ProposalInfo(DmObject):
    """ DM object holding proposal attributes. """

    DEFAULT_KEY_LIST = [
        'title',
        'id',
        'experimenters',
    ]

    def __init__(self, dict):
        """ Initialize the object from the given dictionary. """
        DmObject.__init__(self, dict)