Commit aaf92c8c authored by hammonds's avatar hammonds
Browse files

Merge branch 'master' of https://git.aps.anl.gov/hammonds/dm.git

parents bd7870b2 b68f0a12
......@@ -46,6 +46,8 @@ An example of setting up the Data Management system for a developer is described
> cd ../dev
- Note that some configuration can be changed before processing, as discussed below. There are two files, **dm_dev.deploy.conf** and **dm.deploy.conf**, which define some environment variables used in the install & configuration scripts. For the test deployment, **dm_dev.deploy.conf** is used.
- Execute the **dm_deploy_test_system.sh** file in the sbin directory
- Like installing the support tools, this script builds and installs several components of the DM system so it will take some time to complete.
- This deploy process will prompt for user input at several points in the process.
......
#!/usr/bin/env python
import sys
from dm import ExperimentDsApi
# This example assumes that the DM environment has been set by
# sourcing DM_INSTALL_ROOT/etc/dm.setup.sh:
# - DM_LOGIN_FILE should point to a file containing <username>|<password>
# - DM_DS_WEB_SERVICE_URL should point to the DM DS service
class ExperimentManager():
    """Thin convenience wrapper around the DM ExperimentDsApi.

    Credentials and the service URL are taken from the DM setup
    environment (DM_LOGIN_FILE, DM_DS_WEB_SERVICE_URL).
    """

    def __init__(self):
        # Data-storage (DS) web service client.
        self.experimentDsApi = ExperimentDsApi()

    def createExperiment(self, experimentName):
        """Create a new experiment of type 'TEST' and return the API result."""
        return self.experimentDsApi.addExperiment(name=experimentName, typeName='TEST')
if __name__ == '__main__':
    # Exactly one positional argument is required: the experiment name.
    if len(sys.argv) != 2:
        print('Usage: %s <experimentName>' % sys.argv[0])
        sys.exit(1)
    print(ExperimentManager().createExperiment(sys.argv[1]))
#!/usr/bin/env python
import sys
from dm import ExperimentDaqApi
# This example assumes that the DM environment has been set by
# sourcing DM_INSTALL_ROOT/etc/dm.setup.sh:
# - DM_LOGIN_FILE should point to a file containing <username>|<password>
# - DM_DAQ_WEB_SERVICE_URL should point to the DM DAQ service
class DataUploader():
    """Uploads the contents of a data directory for an existing experiment.

    Relies on the DM setup environment (DM_LOGIN_FILE,
    DM_DAQ_WEB_SERVICE_URL) for authentication and service location.
    """

    def __init__(self):
        # DAQ web service client.
        self.experimentDaqApi = ExperimentDaqApi()

    def uploadDirectory(self, experimentName, dataDirectory):
        """Upload all files under dataDirectory; the experiment must already exist."""
        return self.experimentDaqApi.upload(experimentName, dataDirectory)
if __name__ == '__main__':
    # Two positional arguments are required: experiment name and data directory.
    if len(sys.argv) != 3:
        print('Usage: %s <experimentName> <dataDirectory>' % sys.argv[0])
        sys.exit(1)
    uploader = DataUploader()
    print(uploader.uploadDirectory(sys.argv[1], sys.argv[2]))
#!/usr/bin/env python
import sys
from dm import ExperimentDaqApi
# This example assumes that the DM environment has been set by
# sourcing DM_INSTALL_ROOT/etc/dm.setup.sh:
# - DM_LOGIN_FILE should point to a file containing <username>|<password>
# - DM_DAQ_WEB_SERVICE_URL should point to the DM DAQ service
class DataUploader():
    """Uploads a single file belonging to an existing experiment.

    Relies on the DM setup environment (DM_LOGIN_FILE,
    DM_DAQ_WEB_SERVICE_URL) for authentication and service location.
    """

    def __init__(self):
        # DAQ web service client.
        self.experimentDaqApi = ExperimentDaqApi()

    def uploadFile(self, experimentName, dataDirectory, filePath):
        """Upload one file from dataDirectory; the experiment must already exist."""
        uploadOptions = {'filePath' : filePath}
        return self.experimentDaqApi.upload(experimentName, dataDirectory, uploadOptions)
if __name__ == '__main__':
    # Three positional arguments are required: experiment name, data
    # directory, and the path of the file to upload.
    if len(sys.argv) != 4:
        print('Usage: %s <experimentName> <dataDirectory> <filePath>' % sys.argv[0])
        sys.exit(1)
    uploader = DataUploader()
    print(uploader.uploadFile(sys.argv[1], sys.argv[2], sys.argv[3]))
###################################################################
# Workflows
#
# Workflows are defined using dictionaries; they serve as templates for
# running processing jobs.
#
# Workflow keys:
# - name (required)
# - owner (required)
# - stages (required; value must be dictionary with one or more
# stage definitions)
# - description (optional)
# - id (assigned by the workflow DB; cannot be modified)
#
# Stage dictionary keys can be anything; they will get sorted,
# and stages will get executed in the sorted order
#
# Stage keys:
# - command (required; may use $variable strings that would get
# their values at runtime, via <key>:<value> arguments)
# - workingDir (optional; denotes command working directory)
# - parallelExec (optional; default True; this flag is relevant
# only if we iterate over files via the $filePath variable)
# - outputVariableRegexList (optional; list of regular expressions
# that define output variables that may be used in
# subsequent workflow stages)
# - repeatPeriod, repeatUntil, maxRepeats (optional; must be
# used together):
# - repeatPeriod:
# * defines period in seconds after which the stage command
# will be repeated
# * command must succeed every time it executes;
# command failure indicates stage failure
# * command will repeat as long as the repeatUntil condition
# evaluates to true, or number of command repeats
# reaches maxRepeats
# - repeatUntil:
# * defines condition that signals successful
# stage completion
# - maxRepeats:
# * defines maximum number of repeats; if this
# number is reached, stage will fail
#
# Reserved keys that cannot be used in a stage definition:
# - workflow: processing job workflow specification
#
# Reserved keys that may be used in a stage definition as command input
# variables:
# - id: processing job id
# - status: processing job status
# - owner: processing job owner
#    - startTime: processing job start time
#    - endTime: processing job end time
#
# Reserved keys designated for specifying processing job input
# files that may be used in a stage definition as command input
# variables:
# - filePath: input file path
# * if filePath is specified as one of the stage command
# input variables, the workflow stage will iterate over
# all job input files
# - filePathList: list of all input file paths
# - filePathPattern: glob pattern for input file paths
# - fileQueryDict: metadata catalog query dictionary
# * not yet implemented (reserved for future use)
# - dataDir: directory containing data files
#
# Any keys that are not reserved may be used in a stage
# definition as command input or output variables. Stage output
# variables can be used as input for any of the subsequent stages.
#
# Example workflow definition:
# {
#     'name': 'example-01',
# 'owner': 'dmtest',
# 'stages': {
# '01-START' : {
# 'command' : '/bin/date +%Y%m%d%H%M%S',
# 'outputVariableRegexList' : ['(?P<timeStamp>.*)']
# },
# '02-MKDIR' : {
# 'command' : '/bin/mkdir -p /tmp/workflow.$timeStamp'
# },
# '03-ECHO' : {
# 'command' : '/bin/echo "START JOB ID: $id" > /tmp/workflow.$timeStamp/$id.out'
# },
# '04-MD5SUM' : {
# 'command' : '/bin/md5sum $filePath | cut -f1 -d" "',
# 'outputVariableRegexList' : ['(?P<md5Sum>.*)']
# },
# '05-ECHO' : {
# 'command' : 'echo "FILE $filePath MD5 SUM: $md5Sum" >> /tmp/workflow.$timeStamp/$id.out'
# },
# '06-DONE' : {
# 'command' : '/bin/echo "STOP JOB ID: $id" >> /tmp/workflow.$timeStamp/$id.out'
# },
# },
# 'description': 'Workflow Example 01'
# }
#
###################################################################
###################################################################
# Processing Jobs
#
# Processing job are based on workflows. Their input arguments are
# specified at runtime as <key>:<value> pairs, where key names
# are matched with variable names in workflow definitions.
# For example, assume that one of the workflow stage commands
# is defined as
#
# {'command': '/bin/echo myArg=$myArg'}
#
# Processing job input argument 'myArg:ExampleString' would
# result in the actual command '/bin/echo myArg=ExampleString'
# that would be executed at stage runtime.
#
# Reserved keys that cannot be passed as job input at runtime:
# - workflow: processing job workflow specification
# - id: processing job id
# - stage: processing job workflow stage
# - status: processing job status
# - owner: processing job owner
#    - startTime: processing job start time
#    - startTimestamp: processing job start timestamp
#    - endTime: processing job end time
#    - endTimeStamp: processing job end timestamp
#    - runTime: processing job run time in seconds
# - errorMessage: error message
# - maxActiveJobs: maximum number of active jobs
# - nActiveJobs: number of active jobs
# - nFiles: number of input files
# - nProcessedFiles: number of processed files
# - nFailedFiles: number of failed files
# - nSkippedFiles: number of skipped files
# - nAbortedFiles: number of aborted files
# - nCompletedFiles: number of completed input files
# * sum of processed, failed, skipped and aborted files
# - processedFiles: list of processed files
# - failedFiles: list of failed files
# - skippedFiles: list of skipped files
# - abortedFiles: list of aborted files
#
# Reserved keys designated for specifying processing job input
# files that may be passed as job input at runtime:
# - filePath: input file path
# * if filePath:<aPath> is specified as part of job input, the
# job input file list will consist of a single file
#         (i.e., [<aPath>])
# * filePath value will resolve into an actual file either as a
# full file path (starts with '/'), or as a path relative to
# the job data directory (does not start with '/' and
# dataDir is specified as well), or as a path relative to the
# command working directory (does not start with '/'
# and workingDir is specified)
# - filePathList: list of all input file paths
# * if filePathList:[<aPath1>, <aPath2>,...] is
# specified as part of job input, this list will be
# directly used as job input file list
# * paths in the list will resolve into actual files
# following the same rules as for the filePath above
# * filePathList is ignored if filePath is also specified
# as part of job input
# - filePathPattern: glob pattern for input file paths
# * if the pattern does not start with '/' and dataDir
# is specified, it will resolve into paths relative to
# dataDir
# * filePathPattern is ignored if either filePath or
# filePathList are specified as part of job input
# - fileQueryDict: metadata catalog query dictionary
# * not yet implemented (reserved for future use)
# - dataDir: directory containing data files
# * may be specified as part of job input together with
# any of filePath, filePathList, or filePathPattern
# to resolve relative paths into actual files
# * if none of filePath, filePathList, or filePathPattern
# are specified as part of job input, all files in
# this directory will be used as job input file list
#
# Other reserved keys that may be passed as job input at runtime:
# - workingDir: used to provide default value for job working
# directory (value provided as part of the stage command
# definition will override it)
# - jobOwner: used to specify job owner; default owner is
# the DM account under which job is submitted
#
# Any keys that are not reserved may be passed as part of job
# input at runtime.
#
###################################################################
# Example workflow definition dictionary. Stage keys sort lexically
# ('01-START' ... '12-DONE') and the stages run in that order.
# $variables in commands are substituted at runtime from job input and
# from outputVariableRegexList captures of earlier stages.
{
    'name': 'example-01',
    'owner': 'dmtest',
    'stages': {
        # Capture a timestamp to name this job's output directory.
        '01-START' : {'command': '/bin/date +%Y%m%d%H%M%S', 'outputVariableRegexList' : ['(?P<timeStamp>.*)']},
        '02-MKDIR' : {'command': '/bin/mkdir -p /tmp/workflow.$timeStamp'},
        '03-ECHO' : {'command': '/bin/echo "START JOB ID: $id" > /tmp/workflow.$timeStamp/$id.out'},
        # $filePath makes stages 04 and 05 iterate over all job input files.
        '04-MD5SUM' : {'command': '/bin/md5sum $filePath | cut -f1 -d" "', 'outputVariableRegexList' : ['(?P<md5Sum>.*)']},
        '05-ECHO' : {'command': 'echo "FILE $filePath MD5 SUM: $md5Sum" >> /tmp/workflow.$timeStamp/$id.out'},
        # Pick a random 1-10 value to drive the sleep and repeat demo below.
        '06-RANDOM' : {'command': 'echo $((1 + RANDOM % 10))', 'outputVariableRegexList' : ['(?P<randomNumber>.*)']},
        '07-ECHO' : {'command': 'echo "RANDOM NUMBER: $randomNumber" >> /tmp/workflow.$timeStamp/$id.out'},
        '08-SLEEP' : {'command': 'sleep $randomNumber'},
        '09-COUNT' : {'command': 'echo 0', 'outputVariableRegexList' : ['(?P<count>.*)']},
        # Re-run every 10s, incrementing $count, until it equals
        # $randomNumber or maxRepeats is reached (then the stage fails).
        '10-REPEAT' : {'command': 'echo "Count: `expr $count + 1`"', 'outputVariableRegexList' : ['Count: (?P<count>.*)'], 'repeatPeriod' : 10, 'repeatUntil' : '"$count" == "$randomNumber"', 'maxRepeats' : 10},
        '11-ECHO' : {'command': 'echo "REPEAT COUNT: $count" >> /tmp/workflow.$timeStamp/$id.out'},
        '12-DONE' : {'command': '/bin/echo "STOP JOB ID: $id" >> /tmp/workflow.$timeStamp/$id.out'},
    },
    'description': 'Workflow Example 01'
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment