#!/bin/sh

# Launcher for the getUploadInfoCli.py command line tool.
#
# When DM_ROOT_DIR is not set in the environment, ../setup.sh (relative to the
# directory holding this script) is sourced first; the script exits with
# status 1 if that file cannot be found. All command line arguments are
# passed through to the Python CLI unchanged.

if [ -z "$DM_ROOT_DIR" ]; then
    # Quote every expansion so that paths containing whitespace stay intact.
    cd "$(dirname "$0")" && myDir=$(pwd)
    setupFile="$myDir/../setup.sh"
    if [ ! -f "$setupFile" ]; then
        echo "Cannot find setup file: $setupFile"
        exit 1
    fi
    # "." is the POSIX spelling; "source" is not provided by every /bin/sh.
    . "$setupFile" > /dev/null
fi

# "$@" keeps each argument as a separate word even if it contains spaces.
"$DM_ROOT_DIR/src/python/dm/daq_web_service/cli/getUploadInfoCli.py" "$@"
class GetUploadInfoCli(DaqWebServiceSessionCli):
    """Command line interface that prints information about a data upload."""

    def __init__(self):
        DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
        self.addOption('', '--id', dest='id', help='Upload id.')

    def checkArgs(self):
        """Raise InvalidRequest unless the --id option was supplied."""
        if self.options.id is None:
            raise InvalidRequest('Upload id must be provided.')

    def getId(self):
        """Return the value given for the --id option."""
        return self.options.id

    def runCommand(self):
        """
        Parse the command line, fetch the upload info through
        ExperimentRestApi.getUploadInfo() and print its display string.

        Raises InvalidRequest (via checkArgs) when no upload id was given.
        """
        # The usage text must name the wrapper script that launches this
        # module, which is bin/dm-get-upload-info.
        self.parseArgs(usage="""
    dm-get-upload-info --id=ID

Description:
    Retrieves detailed information for a given data upload.
        """)
        self.checkArgs()
        api = ExperimentRestApi(
            self.getLoginUsername(),
            self.getLoginPassword(),
            self.getServiceHost(),
            self.getServicePort(),
            self.getServiceProtocol())
        uploadInfo = api.getUploadInfo(self.getId())
        # Single-argument parenthesised form prints identically under the
        # Python 2 print statement and the Python 3 print function.
        print(uploadInfo.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat()))
@cherrypy.expose
@DmSessionController.require(DmSessionController.isAdministrator())
@DmSessionController.execute
def getUploadInfo(self, id, **kwargs):
    """
    Return the full JSON representation of the upload with the given id.

    The lookup is delegated to experimentSessionControllerImpl; any extra
    keyword arguments are accepted and not used. Access is guarded by the
    DmSessionController.require(isAdministrator()) decorator.
    """
    uploadInfo = self.experimentSessionControllerImpl.getUploadInfo(id)
    jsonRep = uploadInfo.getFullJsonRep()
    self.logger.debug('Returning: %s' % jsonRep)
    return jsonRep
class DaqProcessingCompleteNotificationPlugin(FileProcessor):
    """
    File processor that flags a file as processed in the UploadTracker entry
    of the upload it belongs to.
    """

    def __init__(self):
        FileProcessor.__init__(self)
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)

    def processFile(self, fileInfo):
        """
        Mark the file described by fileInfo as processed.

        fileInfo is read through .get() for the keys 'uploadId' and
        'filePath'. A file without an upload id is ignored. When the upload
        id is unknown to the tracker an error is logged and nothing is
        changed.
        """
        uploadId = fileInfo.get('uploadId')
        if uploadId is None:
            # File is not part of a tracked upload.
            return
        filePath = fileInfo.get('filePath')
        self.logger.debug('Upload id for file %s: %s' % (filePath, uploadId))

        uploadInfo = UploadTracker.getInstance().get(uploadId)
        if uploadInfo is None:
            self.logger.error('Upload tracker does not have upload id %s' % (uploadId))
            return

        uploadFileInfo = uploadInfo.get('fileDict', {}).get(filePath)
        if uploadFileInfo:
            uploadFileInfo['processed'] = True
        else:
            # Make the otherwise silent case visible: the upload is tracked
            # but has no entry for this file.
            self.logger.debug('File %s is not tracked for upload id %s' % (filePath, uploadId))
def getUploadInfo(self, id):
    """
    Return the tracked upload with the given id, refreshing its progress.

    For an upload not yet marked 'complete', the files flagged as processed
    are recounted and the keys 'status', 'nProcessedFiles' (as
    'processed/total') and 'percentageComplete' (two decimals, as a string)
    are stored on the tracked object before it is returned.

    Raises ObjectNotFound if the upload tracker has no entry for id.
    """
    uploadInfo = UploadTracker.getInstance().get(id)
    if not uploadInfo:
        raise ObjectNotFound('Upload id %s not found.' % id)
    uploadStatus = uploadInfo.get('status', 'running')
    if uploadStatus == 'complete':
        # Nothing left to recompute once an upload has completed.
        return uploadInfo

    # Tolerate an entry without a file dictionary instead of failing in len().
    fileDict = uploadInfo.get('fileDict') or {}
    nFiles = len(fileDict)
    nProcessedFiles = sum(
        1 for uploadFileInfo in fileDict.values() if uploadFileInfo.get('processed'))

    # need to handle 'failed' uploads
    if nProcessedFiles == nFiles:
        uploadStatus = 'complete'
    uploadInfo['status'] = uploadStatus
    uploadInfo['nProcessedFiles'] = '%s/%s' % (nProcessedFiles, nFiles)

    # An upload without any files is reported as 100% complete.
    percentageComplete = 100.0
    if nFiles > 0:
        percentageComplete = float(nProcessedFiles) / float(nFiles) * 100.0
    uploadInfo['percentageComplete'] = '%.2f' % percentageComplete
    return uploadInfo
fileInfo.get('filePath') - dataDirectory = fileInfo.get('dataDirectory') - experiment = fileInfo.get('experiment') - experimentName = experiment.get('name') - experimentFilePath = fileInfo.get('experimentFilePath') - storageHost = experiment.get('storageHost') - storageDirectory = experiment.get('storageDirectory') - destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory) - # Use relative path with respect to data directory as a source - os.chdir(dataDirectory) - srcUrl = self.getSrcUrl(filePath, dataDirectory) - - # Calculate checksum - FileUtility.statFile(filePath, fileInfo) - FileUtility.getMd5Sum(filePath, fileInfo) - self.logger.debug('File info before transfer: %s' % fileInfo) - - # Transfer file - self.start(srcUrl, destUrl) - - # Get remote checksum - fileInfo2 = {} - fileInfo2['experimentFilePath'] = experimentFilePath - fileInfo2['experimentName'] = experimentName - fileMetadata = self.dsFileApi.statFile(experimentFilePath, experimentName, fileInfo2) - self.logger.debug('File stat after transfer: %s' % fileMetadata) - - # Verify checksum - if fileMetadata.get('md5Sum') != fileInfo.get('md5Sum'): - self.logger.error('Checksum mismatch for file: %s' % filePath) - raise FileProcessingError('Checksum mismatch detected for file: %s' % filePath) - - # Remove file - self.logger.debug('Checksum test passed, deleting file %s' % filePath) - OsUtility.removeFile(srcUrl) - -####################################################################### -# Testing. 
class UploadTracker(ObjectTracker):
    """ObjectTracker subclass configured for UploadInfo objects."""

    # Cache configuration: the class associated with this tracker
    # (presumably consumed by ObjectTracker -- confirm against its source).
    objectClass = UploadInfo