Commit 6987b8db authored by sveseli's avatar sveseli
Browse files

Merge branch '0.9.1' into release/0.9

parents 902801c1 a5ba277e
#!/bin/sh
# Run command
if [ -z $DM_ROOT_DIR ]; then
cd `dirname $0` && myDir=`pwd`
setupFile=$myDir/../setup.sh
if [ ! -f $setupFile ]; then
echo "Cannot find setup file: $setupFile"
exit 1
fi
source $setupFile > /dev/null
fi
$DM_ROOT_DIR/src/python/dm/daq_web_service/cli/listDaqsCli.py $@
#!/bin/sh
# Run command
if [ -z $DM_ROOT_DIR ]; then
cd `dirname $0` && myDir=`pwd`
setupFile=$myDir/../setup.sh
if [ ! -f $setupFile ]; then
echo "Cannot find setup file: $setupFile"
exit 1
fi
source $setupFile > /dev/null
fi
$DM_ROOT_DIR/src/python/dm/daq_web_service/cli/listUploadsCli.py $@
Release 0.10 (03/11/2016)
=============================
- Added dm-list-daqs and dm-list-uploads commands
- Resolved issue with newly created directories treated as files for
real-time data acquisitions
Release 0.9 (02/25/2016)
=============================
......
......@@ -15,5 +15,5 @@ DM_DAQ_WEB_SERVICE_HOST=DM_HOSTNAME
DM_DAQ_WEB_SERVICE_PORT=33336
DM_CAT_WEB_SERVICE_HOST=DM_HOSTNAME
DM_CAT_WEB_SERVICE_PORT=44436
DM_SOFTWARE_VERSION="0.9 (DM_DATE)"
DM_SOFTWARE_VERSION="0.10 (DM_DATE)"
#!/bin/sh
#
# Script used for backing up DM database + web app
# Deployment configuration can be set in etc/$DM_DM_DB_NAME.deploy.conf file
#
# Usage:
#
# $0 [DM_DB_NAME [DM_BACKUP_DIR]]
#
DM_DB_NAME=dm
DM_DB_HOST=127.0.0.1
DM_DB_PORT=11136
DM_DB_ADMIN_USER=postgres
DM_DB_ADMIN_PASSWORD=
CURRENT_DIR=`pwd`
MY_DIR=`dirname $0` && cd $MY_DIR && MY_DIR=`pwd`
cd $CURRENT_DIR
if [ -z "${DM_ROOT_DIR}" ]; then
DM_ROOT_DIR=$MY_DIR/..
fi
DM_SQL_DIR=$DM_ROOT_DIR/db/sql/dm
DM_ENV_FILE=${DM_ROOT_DIR}/setup.sh
if [ ! -f ${DM_ENV_FILE} ]; then
echo "Environment file ${DM_ENV_FILE} does not exist."
exit 2
fi
. ${DM_ENV_FILE} > /dev/null
# Use first argument as db name, if provided
if [ ! -z "$1" ]; then
DM_DB_NAME=$1
fi
echo "Backing up $DM_DB_NAME"
# Look for deployment file in etc directory, and use it to override
# default entries
deployConfigFile=$DM_ROOT_DIR/etc/${DM_DB_NAME}.deploy.conf
if [ -f $deployConfigFile ]; then
echo "Using deployment config file: $deployConfigFile"
. $deployConfigFile
else
echo "Deployment config file $deployConfigFile not found, using defaults"
fi
# Determine run directory
if [ -z "${DM_INSTALL_DIR}" ]; then
DM_INSTALL_DIR=$DM_ROOT_DIR/..
fi
# Second argument overrides directory with db population scripts
#timestamp=`date +%Y%m%d.%H%M%S`
timestamp=`date +%Y%m%d`
DM_BACKUP_DIR=$2
if [ -z $DM_BACKUP_DIR ]; then
DM_BACKUP_DIR=$DM_INSTALL_DIR/backup/$DM_DB_NAME/$timestamp
fi
backupFile=${DM_DB_NAME}.backup.$timestamp.sql
fullBackupFilePath=$DM_BACKUP_DIR/$backupFile
# Read password
sttyOrig=`stty -g`
stty -echo
read -p "Enter $DM_DB_NAME user password: " DM_DB_USER_PASSWORD
stty $sttyOrig
echo
DM_DB_USER_PASSWORD_FILE=/tmp/${DM_DB_NAME}.${DM_DB_USER}.passwd
echo $DM_DB_HOST:$DM_DB_PORT:$DM_DB_NAME:$DM_DB_USER:$DM_DB_USER_PASSWORD > $DM_DB_USER_PASSWORD_FILE && chmod 600 $DM_DB_USER_PASSWORD_FILE || exit 1
pgDumpCmd="PGPASSFILE=$DM_DB_USER_PASSWORD_FILE pg_dump -C -c -w --column-inserts -p $DM_DB_PORT -h $DM_DB_HOST -U $DM_DB_USER -d $DM_DB_NAME"
cleanup() {
rm -f $DM_DB_USER_PASSWORD
}
execute() {
eval "$@"
}
echo
echo "Using DB backup directory: $DM_BACKUP_DIR"
mkdir -p $DM_BACKUP_DIR
eval $pgDumpCmd > $fullBackupFilePath || ( cleanup && exit 1 )
nTables=`grep -n "Data for Name" $fullBackupFilePath | grep TABLE | wc -l`
echo "Processing $nTables tables"
tableCnt=0
processingFile=$DM_BACKUP_DIR/process.txt
while [ $tableCnt -lt $nTables ]; do
tableCnt=`expr $tableCnt + 1`
headLine=$tableCnt
tailLine=2
echo "Working on table #: $tableCnt"
grep -n "TABLE DATA" $fullBackupFilePath | head -$headLine | tail -$tailLine > $processingFile
dbTable=`cat $processingFile | head -1 | awk '{print $5}' | sed 's?;??g'`
echo "Creating sql script for $dbTable"
targetFile=$DM_BACKUP_DIR/populate_$dbTable.sql
pgDumpCmd="PGPASSFILE=$DM_DB_USER_PASSWORD_FILE pg_dump -C -a -t $dbTable -w --column-inserts -p $DM_DB_PORT -h $DM_DB_HOST -U $DM_DB_USER -d $DM_DB_NAME"
eval $pgDumpCmd > $targetFile || ( cleanup && exit 1 )
done
rm -f $processingFile
# Backup web app
echo "Backing up $DM_DB_NAME web app"
rsync -arlvP $DM_SUPPORT_DIR/glassfish/linux-x86_64/glassfish/domains/domain1/autodeploy/$DM_DB_NAME.war $DM_BACKUP_DIR
cleanup
echo "Backup of $DM_DB_NAME is done."
__version__ = "0.9 (2016.02.24)"
__version__ = "0.10 (2016.03.21)"
......@@ -159,7 +159,7 @@ class SessionManager:
parsedUrl = urlparse.urlparse(url)
protocol = parsedUrl[0]
path = parsedUrl[2]
self.logger.debug('Sending request: %s' % url)
#self.logger.debug('Sending request: %s' % url)
encodedData = ''
if data is not None:
if type(data) == types.DictType and len(data):
......
#!/usr/bin/env python
#######################################################################
DM_PROCESSING_STATUS_ANY = 'any'
DM_PROCESSING_STATUS_PENDING = 'pending'
DM_PROCESSING_STATUS_RUNNING = 'running'
DM_PROCESSING_STATUS_FINALIZING = 'finalizing'
DM_PROCESSING_STATUS_DONE = 'done'
DM_PROCESSING_STATUS_FAILED = 'failed'
DM_PROCESSING_STATUS_SKIPPED = 'skipped'
DM_PROCESSING_STATUS_ABORTING = 'aborting'
DM_PROCESSING_STATUS_ABORTED = 'aborted'
DM_ALLOWED_PROCESSING_STATUS_LIST = [
DM_PROCESSING_STATUS_ANY,
DM_PROCESSING_STATUS_PENDING,
DM_PROCESSING_STATUS_RUNNING,
DM_PROCESSING_STATUS_FINALIZING,
DM_PROCESSING_STATUS_DONE,
DM_PROCESSING_STATUS_FAILED,
DM_PROCESSING_STATUS_SKIPPED,
DM_PROCESSING_STATUS_ABORTING,
DM_PROCESSING_STATUS_ABORTED
]
DM_INACTIVE_PROCESSING_STATUS_LIST = [
DM_PROCESSING_STATUS_DONE,
DM_PROCESSING_STATUS_FAILED,
DM_PROCESSING_STATUS_SKIPPED,
DM_PROCESSING_STATUS_ABORTED
]
DM_ACTIVE_PROCESSING_STATUS_LIST = [
DM_PROCESSING_STATUS_PENDING,
DM_PROCESSING_STATUS_RUNNING,
DM_PROCESSING_STATUS_ABORTING
]
......@@ -5,6 +5,7 @@ import time
import threading
from dmObject import DmObject
from dm.common.constants import dmProcessingStatus
from dm.common.utility.dictUtility import DictUtility
from dm.common.utility.timeUtility import TimeUtility
......@@ -51,8 +52,8 @@ class DaqInfo(DmObject):
def updateStatus(self):
now = time.time()
daqStatus = self.get('status', 'running')
if daqStatus in ['done', 'failed']:
daqStatus = self.get('status', dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING)
if daqStatus in dmProcessingStatus.DM_INACTIVE_PROCESSING_STATUS_LIST:
return
nFiles = self.get('nFiles', 0)
nProcessedFiles = self.get('nProcessedFiles', 0)
......@@ -75,10 +76,12 @@ class DaqInfo(DmObject):
self['percentageProcessed'] = '%.2f' % percentageProcessed
self['percentageProcessingErrors'] = '%.2f' % percentageProcessingErrors
if self.get('endTime') and nCompletedFiles == nFiles:
daqStatus = 'done'
if nProcessingErrors:
daqStatus = 'failed'
if self.get('endTime'):
daqStatus = dmProcessingStatus.DM_PROCESSING_STATUS_FINALIZING
if nCompletedFiles == nFiles:
daqStatus = dmProcessingStatus.DM_PROCESSING_STATUS_DONE
if nProcessingErrors:
daqStatus = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED
lastFileProcessingErrorTime = self.get('lastFileProcessingErrorTime')
lastFileProcessedTime = self.get('lastFileProcessedTime')
endTime = lastFileProcessedTime
......
......@@ -4,6 +4,7 @@ import time
import threading
from dmObject import DmObject
from dm.common.constants import dmProcessingStatus
from dm.common.utility.dictUtility import DictUtility
from dm.common.utility.timeUtility import TimeUtility
......@@ -17,8 +18,8 @@ class DirectoryUploadInfo(DmObject):
def updateStatus(self):
now = time.time()
uploadStatus = self.get('status', 'running')
if uploadStatus in ['done', 'aborted', 'failed']:
uploadStatus = self.get('status', dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING)
if uploadStatus in dmProcessingStatus.DM_INACTIVE_PROCESSING_STATUS_LIST:
return
startTime = self.get('startTime', now)
......@@ -30,10 +31,10 @@ class DirectoryUploadInfo(DmObject):
for processorName in processingInfo.keys():
processingEndTime = processingInfo[processorName].get('processingEndTime')
status = processingInfo[processorName].get('status')
if status in ['aborted', 'failed']:
if status in [dmProcessingStatus.DM_PROCESSING_STATUS_ABORTED, dmProcessingStatus.DM_PROCESSING_STATUS_FAILED]:
uploadStatus = status
if not processingEndTime and status != 'skipped':
if not processingEndTime and status != dmProcessingStatus.DM_PROCESSING_STATUS_SKIPPED:
endTime = None
break
......
......@@ -4,6 +4,7 @@ import time
import threading
from dmObject import DmObject
from dm.common.constants import dmProcessingStatus
from dm.common.utility.dictUtility import DictUtility
from dm.common.utility.timeUtility import TimeUtility
......@@ -58,8 +59,8 @@ class UploadInfo(DmObject):
def updateStatus(self):
now = time.time()
uploadStatus = self.get('status', 'running')
if uploadStatus in ['done', 'aborted', 'failed']:
uploadStatus = self.get('status', dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING)
if uploadStatus in dmProcessingStatus.DM_INACTIVE_PROCESSING_STATUS_LIST:
return
nFiles = self.get('nFiles', 0)
nProcessedFiles = self.get('nProcessedFiles', 0)
......
......@@ -2,6 +2,7 @@
import threading
import time
from dm.common.constants import dmProcessingStatus
from dm.common.utility.loggingManager import LoggingManager
......@@ -31,10 +32,11 @@ class FileProcessingThread(threading.Thread):
try:
statusMonitor = fileInfo.get('statusMonitor')
if statusMonitor and statusMonitor.get('status') == 'aborting':
if statusMonitor and statusMonitor.get('status') == dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING:
self.logger.debug('File %s processing is cancelled' % (filePath))
endProcessingTime = time.time()
statusMonitor.fileProcessingCancelled(filePath, endProcessingTime)
statusMonitor.updateStatus()
return
self.logger.debug('Starting to process file %s' % filePath)
startProcessingTime = fileInfo.get('startProcessingTime', time.time())
......@@ -66,6 +68,7 @@ class FileProcessingThread(threading.Thread):
endProcessingTime = time.time()
if statusMonitor:
statusMonitor.fileProcessed(filePath, endProcessingTime)
statusMonitor.updateStatus()
except Exception, ex:
self.logger.exception(ex)
processingError = '%s processing error: %s' % (processorName, str(ex))
......@@ -83,11 +86,12 @@ class FileProcessingThread(threading.Thread):
endProcessingTime = time.time()
if statusMonitor:
statusMonitor.fileProcessingError(filePath, processingError, endProcessingTime)
statusMonitor.updateStatus()
self.logger.debug('No more %s retries left for file %s, remaining plugins will not process it' % (processorName, filePath))
return
else:
retryWaitPeriod = processor.getRetryWaitPeriodInSeconds()
self.logger.debug('%s will retry processing file %s in %s seconds' % (processorName, filePath, retryWaitPeriod))
self.logger.debug('%s may retry processing file %s after at least %s seconds' % (processorName, filePath, retryWaitPeriod))
self.fileProcessingQueue.push(fileInfo, retryWaitPeriod)
# Do not process this file further until
# this plugin is done
......
......@@ -34,15 +34,14 @@ class FileTransferPlugin(FileProcessor):
self.start(src=srcUrl, dest=destUrl, templateInfo=fileInfo)
def getSrcUrl(self, filePath, dataDirectory):
# Use relative path with respect to data directory as a source
srcUrl = os.path.relpath(filePath, dataDirectory)
srcUrl = '%s/./%s' % (dataDirectory, os.path.relpath(filePath, dataDirectory))
return srcUrl
def getDestUrl(self, filePath, dataDirectory, storageHost, storageDirectory):
if self.dest:
destUrl = '%s' % (self.dest)
destUrl = '%s/' % (self.dest)
else:
destUrl = '%s:%s' % (storageHost, storageDirectory)
destUrl = '%s:%s/' % (storageHost, storageDirectory)
return destUrl
def getSrcDirUrl(self, dataDirectory):
......
......@@ -18,7 +18,7 @@ class MongoDbFileCatalogPlugin(FileProcessor):
def processFile(self, fileInfo):
experimentFilePath = fileInfo.get('experimentFilePath')
experimentName = fileInfo.get('experimentName')
self.logger.debug('Processing file "%s" for experiment %s: %s' % (experimentFilePath, experimentName, fileInfo))
self.logger.debug('Processing file "%s" for experiment %s' % (experimentFilePath, experimentName))
daqInfo = fileInfo.get('daqInfo')
storageDirectory = daqInfo.get('storageDirectory')
......
......@@ -11,9 +11,9 @@ from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
class RsyncFileTransferPlugin(FileTransferPlugin):
DEFAULT_COMMAND = 'rsync -arvlPR'
DIRECTORY_TRANSFER_COMMAND = 'rsync -arvlP'
DRY_RUN_COMMAND = 'rsync -arvlP'
DEFAULT_COMMAND = 'rsync -arvlPR --'
DIRECTORY_TRANSFER_COMMAND = 'rsync -arvlP --'
DRY_RUN_COMMAND = 'rsync -arvlP --dry-run --'
def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True, dependsOn=[]):
FileTransferPlugin.__init__(self, command, src, dest, dependsOn=dependsOn)
......@@ -29,7 +29,7 @@ class RsyncFileTransferPlugin(FileTransferPlugin):
storageDirectory = uploadInfo['storageDirectory']
storageHost = uploadInfo['storageHost']
dataDirectory = uploadInfo['dataDirectory']
dryRunCommand = '%s --dry-run %s/ %s:%s' % (self.DRY_RUN_COMMAND, dataDirectory, storageHost, storageDirectory)
dryRunCommand = '%s %s/ %s:%s' % (self.DRY_RUN_COMMAND, dataDirectory, storageHost, storageDirectory)
subprocess = DmSubprocess.getSubprocess(dryRunCommand)
subprocess.run()
lines = subprocess.getStdOut().split('\n')
......@@ -64,7 +64,7 @@ class RsyncFileTransferPlugin(FileTransferPlugin):
FileUtility.getMd5Sum(filePath, fileInfo)
# Transfer file
self.logger.debug('Starting transfer: %s' % fileInfo)
self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl))
self.start(src=srcUrl, dest=destUrl, templateInfo=fileInfo, cwd=dataDirectory)
# Get remote checksum
......
......@@ -87,6 +87,10 @@ class ObjectCache:
id, item, updateTime, expirationTime = itemTuple
return item
def getAll(self):
# Item tuple: id, item, updateTime, expirationTime = itemTuple
return map(lambda itemTuple:itemTuple[1], self.objectMap.values())
def getItemTuple(self, id):
itemTuple = self.objectMap.get(id)
if itemTuple is None:
......
......@@ -37,6 +37,9 @@ class ObjectTracker(Singleton):
def get(self, id):
return self.objectCache.get(id)
def getAll(self):
return self.objectCache.getAll()
def remove(self, id):
return self.objectCache.remove(id)
......
......@@ -4,6 +4,7 @@ import os
import json
import urllib
from dm.common.constants import dmProcessingStatus
from dm.common.utility.encoder import Encoder
from dm.common.exceptions.dmException import DmException
from dm.common.objects.experiment import Experiment
......@@ -46,6 +47,14 @@ class ExperimentRestApi(DaqRestApi):
responseDict = self.sendSessionRequest(url=url, method='GET')
return DaqInfo(responseDict)
@DaqRestApi.execute
def listDaqs(self, status=None):
if not status:
status = dmProcessingStatus.DM_PROCESSING_STATUS_ANY
url = '%s/experimentDaqsByStatus/%s' % (self.getContextRoot(),status)
responseData = self.sendSessionRequest(url=url, method='GET')
return self.toDmObjectList(responseData, DaqInfo)
@DaqRestApi.execute
def upload(self, experimentName, dataDirectory, daqInfo={}):
if not experimentName:
......@@ -65,6 +74,14 @@ class ExperimentRestApi(DaqRestApi):
responseDict = self.sendSessionRequest(url=url, method='GET')
return UploadInfo(responseDict)
@DaqRestApi.execute
def listUploads(self, status=None):
if not status:
status = dmProcessingStatus.DM_PROCESSING_STATUS_ANY
url = '%s/experimentUploadsByStatus/%s' % (self.getContextRoot(),status)
responseData = self.sendSessionRequest(url=url, method='GET')
return self.toDmObjectList(responseData, UploadInfo)
@DaqRestApi.execute
def stopUpload(self, id):
url = '%s/experimentUploads/stopUpload/%s' % (self.getContextRoot(),id)
......
#!/usr/bin/env python
from daqWebServiceSessionCli import DaqWebServiceSessionCli
from dm.common.constants import dmProcessingStatus
from dm.daq_web_service.api.experimentRestApi import ExperimentRestApi
from dm.common.exceptions.invalidRequest import InvalidRequest
class ListDaqsCli(DaqWebServiceSessionCli):
def __init__(self):
DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
self.addOption('', '--status', dest='status', default=dmProcessingStatus.DM_PROCESSING_STATUS_ANY, help='Processing status, must be one of %s (default: %s).' % (dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST,dmProcessingStatus.DM_PROCESSING_STATUS_ANY))
def checkArgs(self):
if self.options.status not in dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST:
raise InvalidRequest('Processing status must be one of %s.' % dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST)
def getStatus(self):
return self.options.status
def runCommand(self):
self.parseArgs(usage="""
dm-list-daqs [--status=STATUS]
Description:
Retrieves all known daqs.
""")
self.checkArgs()
api = ExperimentRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
daqs = api.listDaqs(self.getStatus())
for daq in daqs:
print daq.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = ListDaqsCli()
cli.run()
#!/usr/bin/env python
from daqWebServiceSessionCli import DaqWebServiceSessionCli
from dm.common.constants import dmProcessingStatus
from dm.daq_web_service.api.experimentRestApi import ExperimentRestApi
from dm.common.exceptions.invalidRequest import InvalidRequest
class ListUploadsCli(DaqWebServiceSessionCli):
def __init__(self):
DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
self.addOption('', '--status', dest='status', default=dmProcessingStatus.DM_PROCESSING_STATUS_ANY, help='Processing status, must be one of %s (default: %s).' % (dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST,dmProcessingStatus.DM_PROCESSING_STATUS_ANY))
def checkArgs(self):
if self.options.status not in dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST:
raise InvalidRequest('Processing status must be one of %s.' % dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST)
def getStatus(self):
return self.options.status
def runCommand(self):
self.parseArgs(usage="""
dm-list-uploads [--status=STATUS]
Description:
Retrieves all known uploads.
""")
self.checkArgs()
api = ExperimentRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
uploads = api.listUploads(self.getStatus())
for upload in uploads:
print upload.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = ListUploadsCli()
cli.run()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment