Skip to content
Snippets Groups Projects
Commit a6e57418 authored by sveseli's avatar sveseli
Browse files

introduced framework for tracking uploads

parent 5ba0f2ff
No related branches found
No related tags found
No related merge requests found
Showing
with 198 additions and 66 deletions
#!/bin/sh
# Wrapper: ensures the DM environment is configured (sources ../setup.sh
# relative to this script when DM_ROOT_DIR is unset), then runs the
# get-upload-info CLI, forwarding all command-line arguments.
if [ -z "$DM_ROOT_DIR" ]; then
    # Resolve the directory this script lives in.
    cd "$(dirname "$0")" && myDir=$(pwd)
    setupFile=$myDir/../setup.sh
    if [ ! -f "$setupFile" ]; then
        echo "Cannot find setup file: $setupFile"
        exit 1
    fi
    # '.' (POSIX) instead of 'source' (bash-only): this runs under /bin/sh.
    . "$setupFile" > /dev/null
fi
# Quote "$@" so arguments containing spaces survive intact.
"$DM_ROOT_DIR"/src/python/dm/daq_web_service/cli/getUploadInfoCli.py "$@"
Release 0.7 ()
=============================
- Introduced framework for tracking progress of file uploads in DAQ service
Release 0.6 (11/6/2015)
=============================
......
#!/usr/bin/env python
from dmObject import DmObject
class UploadInfo(DmObject):
    """Holds tracking information for a single data-upload session."""

    # Keys displayed by default for this object in CLI output.
    DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'percentageComplete' ]

    def __init__(self, dict=None):
        # The parameter keeps its original name 'dict' (shadowing the
        # builtin) for backward compatibility with keyword callers.
        # A None sentinel replaces the mutable {} default, which would be
        # shared across every instance constructed without arguments.
        DmObject.__init__(self, dict if dict is not None else {})
......@@ -7,6 +7,7 @@ import urllib
from dm.common.utility.encoder import Encoder
from dm.common.exceptions.dmException import DmException
from dm.common.objects.experiment import Experiment
from dm.common.objects.uploadInfo import UploadInfo
from daqRestApi import DaqRestApi
class ExperimentRestApi(DaqRestApi):
......@@ -51,7 +52,15 @@ class ExperimentRestApi(DaqRestApi):
url += '&dataDirectory=%s' % Encoder.encode(dataDirectory)
url += '&daqInfo=%s' % (Encoder.encode(json.dumps(daqInfo)))
responseDict = self.sendSessionRequest(url=url, method='POST')
return Experiment(responseDict)
return UploadInfo(responseDict)
@DaqRestApi.execute
def getUploadInfo(self, id):
    """
    Retrieve upload information for the given upload id.

    :param id: upload id returned when the upload was started
    :return: UploadInfo object built from the service response
    :raises InvalidRequest: if no id is provided
    """
    # Validate before building the URL so a missing id fails fast and we
    # never send a malformed request to the service.
    # NOTE(review): InvalidRequest must be imported in this module's
    # header (not visible here) — confirm.
    if not id:
        raise InvalidRequest('Upload id must be provided.')
    url = '%s/experiments/upload/%s' % (self.getContextRoot(),id)
    responseDict = self.sendSessionRequest(url=url, method='GET')
    return UploadInfo(responseDict)
#######################################################################
# Testing.
......
#!/usr/bin/env python
from daqWebServiceSessionCli import DaqWebServiceSessionCli
from dm.daq_web_service.api.experimentRestApi import ExperimentRestApi
from dm.common.exceptions.invalidRequest import InvalidRequest
class GetUploadInfoCli(DaqWebServiceSessionCli):
    """CLI command that retrieves detailed information for a single data upload."""

    def __init__(self):
        DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
        self.addOption('', '--id', dest='id', help='Upload id.')

    def checkArgs(self):
        # --id is mandatory; fail before contacting the service.
        if self.options.id is None:
            raise InvalidRequest('Upload id must be provided.')

    def getId(self):
        # Upload id supplied via --id (None if absent).
        return self.options.id

    def runCommand(self):
        self.parseArgs(usage="""
dm-get-upload-status --id=ID

Description:
    Retrieves detailed information for a given data upload.
""")
        self.checkArgs()
        # Credentials/host/port/protocol come from base-class session options.
        api = ExperimentRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
        uploadInfo = api.getUploadInfo(self.getId())
        print uploadInfo.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
    # Instantiate and run the command in one step.
    GetUploadInfoCli().run()
......@@ -8,6 +8,8 @@ from dm.common.service.dmRestWebServiceBase import DmRestWebServiceBase
from dm.common.utility.dmModuleManager import DmModuleManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.daq_web_service.service.impl.daqProcessingCompleteNotificationPlugin import DaqProcessingCompleteNotificationPlugin
from dm.daq_web_service.service.impl.fileSystemObserver import FileSystemObserver
from daqWebServiceRouteMapper import DaqWebServiceRouteMapper
......@@ -23,6 +25,10 @@ class DaqWebService(DmRestWebServiceBase):
moduleManager = DmModuleManager.getInstance()
moduleManager.addModule(FileSystemObserver.getInstance())
moduleManager.addModule(FileProcessingManager.getInstance())
# Required processing plugin
notificationPlugin = DaqProcessingCompleteNotificationPlugin()
FileProcessingManager.getInstance().appendFileProcessor(notificationPlugin)
self.logger.debug('Initialized dm modules')
def getDefaultServerHost(self):
......
......@@ -46,6 +46,14 @@ class ExperimentRouteDescriptor:
'method' : ['POST']
},
# Get upload info
{
'name' : 'getUploadInfo',
'path' : '%s/experiments/upload/:(id)' % contextRoot,
'controller' : experimentSessionController,
'action' : 'getUploadInfo',
'method' : ['GET']
},
]
return routes
......
......@@ -68,3 +68,11 @@ class ExperimentSessionController(DmSessionController):
self.logger.debug('Returning: %s' % response)
return response
@cherrypy.expose
@DmSessionController.require(DmSessionController.isAdministrator())
@DmSessionController.execute
def getUploadInfo(self, id, **kwargs):
    # Administrator-only endpoint: returns the full JSON representation
    # of the UploadInfo object for the given upload id.
    response = self.experimentSessionControllerImpl.getUploadInfo(id).getFullJsonRep()
    self.logger.debug('Returning: %s' % response)
    return response
#!/usr/bin/env python
import os
from dm.common.utility.loggingManager import LoggingManager
from dm.common.processing.plugins.fileProcessor import FileProcessor
from uploadTracker import UploadTracker
class DaqProcessingCompleteNotificationPlugin(FileProcessor):
    """
    File processor that marks a file as processed in the upload tracker
    after the processing chain has handled it.
    """

    def __init__(self):
        FileProcessor.__init__(self)
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)

    def processFile(self, fileInfo):
        """
        Flag fileInfo['filePath'] as processed in the tracked upload
        identified by fileInfo['uploadId'] (no-op when the file does not
        belong to a tracked upload).
        """
        uploadId = fileInfo.get('uploadId')
        filePath = fileInfo.get('filePath')
        # Identity comparison with None per PEP 8 (was '!= None').
        if uploadId is not None:
            self.logger.debug('Upload id for file %s: %s' %(filePath, uploadId))
            uploadInfo = UploadTracker.getInstance().get(uploadId)
            if uploadInfo is not None:
                fileDict = uploadInfo.get('fileDict', {})
                uploadFileInfo = fileDict.get(filePath)
                if uploadFileInfo:
                    uploadFileInfo['processed'] = True
            else:
                # Tracker entry missing: log but do not fail processing.
                self.logger.error('Upload tracker does not have upload id %s' %(uploadId))
#######################################################################
# Testing.
# No standalone test yet; the plugin is exercised via the DAQ service.
if __name__ == '__main__':
    pass
......@@ -6,14 +6,19 @@
import os
import time
import uuid
from dm.common.objects.experiment import Experiment
from dm.common.objects.dmObjectManager import DmObjectManager
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.exceptions.objectNotFound import ObjectNotFound
from dm.common.objects.observedFile import ObservedFile
from dm.common.objects.uploadInfo import UploadInfo
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.common.utility.fileUtility import FileUtility
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
from experimentTracker import ExperimentTracker
from uploadTracker import UploadTracker
from fileSystemObserver import FileSystemObserver
class ExperimentSessionControllerImpl(DmObjectManager):
......@@ -68,9 +73,49 @@ class ExperimentSessionControllerImpl(DmObjectManager):
dataDirectory = daqInfo.get('dataDirectory')
filePaths = FileSystemObserver.getInstance().getFiles(dataDirectory)
fileProcessingManager = FileProcessingManager.getInstance()
uploadId = str(uuid.uuid4())
self.logger.debug('Starting upload id %s' % uploadId)
uploadInfo = UploadInfo()
uploadInfo['id'] = uploadId
uploadInfo['experiment'] = experimentName
uploadInfo['dataDirectory'] = dataDirectory
fileDict = {}
for filePath in filePaths:
fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
fileInfo['uploadId'] = uploadId
fileUploadInfo = { 'processed' : False }
FileUtility.statFile(filePath, fileUploadInfo)
fileDict[filePath] = fileUploadInfo
fileProcessingManager.processFile(fileInfo)
uploadInfo['fileDict'] = fileDict
#self.logger.debug('Upload info %s' % uploadInfo)
ExperimentTracker.getInstance().put(experimentName, experiment)
return experiment
UploadTracker.getInstance().put(uploadId, uploadInfo)
return uploadInfo
def getUploadInfo(self, id):
    """
    Return the UploadInfo object for the given upload id, refreshing its
    status, processed-file count and completion percentage.

    :param id: upload id (parameter name kept for caller compatibility
        although it shadows the builtin)
    :raises ObjectNotFound: if the id is not known to the upload tracker
    """
    uploadInfo = UploadTracker.getInstance().get(id)
    if not uploadInfo:
        raise ObjectNotFound('Upload id %s not found.' % id)
    uploadStatus = uploadInfo.get('status', 'running')
    if uploadStatus == 'complete':
        # Completed uploads no longer change; skip the recount.
        return uploadInfo
    # Default to {} so a missing fileDict cannot crash len() below.
    fileDict = uploadInfo.get('fileDict', {})
    nFiles = len(fileDict)
    # Only the per-file info is needed for counting, so iterate values().
    nProcessedFiles = sum(1 for uploadFileInfo in fileDict.values() if uploadFileInfo.get('processed'))
    # TODO(review): 'failed' uploads are not yet handled.
    if nProcessedFiles == nFiles:
        uploadStatus = 'complete'
    uploadInfo['status'] = uploadStatus
    uploadInfo['nProcessedFiles'] = '%s/%s' % (nProcessedFiles,nFiles)
    # An upload with zero files reports 100% complete.
    percentageComplete = 100.0
    if nFiles > 0:
        percentageComplete = float(nProcessedFiles)/float(nFiles)*100.0
    uploadInfo['percentageComplete'] = '%.2f' % percentageComplete
    return uploadInfo
#!/usr/bin/env python
import os
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.osUtility import OsUtility
from dm.common.utility.fileUtility import FileUtility
from dm.common.exceptions.fileProcessingError import FileProcessingError
from dm.common.processing.plugins.fileTransferPlugin import FileTransferPlugin
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
class RsyncWithChecksumAndDeleteFileTransferPlugin(FileTransferPlugin):
    """
    Transfer plugin that rsyncs a file to experiment storage, verifies the
    transfer by comparing local and storage-side md5 checksums, and deletes
    the local source file on success.
    """

    # -R (--relative) makes rsync recreate the source's relative path
    # under the destination.
    COMMAND = 'rsync -arvlPR'

    def __init__(self, src=None, dest=None):
        FileTransferPlugin.__init__(self, self.COMMAND, src, dest)
        self.dsFileApi = DsRestApiFactory.getFileRestApi()
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)

    def processFile(self, fileInfo):
        """
        Transfer one observed file and remove the local copy after the
        storage-side checksum matches.

        :raises FileProcessingError: on checksum mismatch after transfer
        """
        filePath = fileInfo.get('filePath')
        dataDirectory = fileInfo.get('dataDirectory')
        experiment = fileInfo.get('experiment')
        experimentName = experiment.get('name')
        experimentFilePath = fileInfo.get('experimentFilePath')
        storageHost = experiment.get('storageHost')
        storageDirectory = experiment.get('storageDirectory')
        destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory)
        # Use relative path with respect to data directory as a source
        # NOTE(review): os.chdir changes the process-wide working directory
        # and is never restored — confirm this is safe if files are
        # processed concurrently.
        os.chdir(dataDirectory)
        srcUrl = self.getSrcUrl(filePath, dataDirectory)
        # Calculate checksum (statFile/getMd5Sum populate fileInfo in place)
        FileUtility.statFile(filePath, fileInfo)
        FileUtility.getMd5Sum(filePath, fileInfo)
        self.logger.debug('File info before transfer: %s' % fileInfo)
        # Transfer file
        self.start(srcUrl, destUrl)
        # Get remote checksum via the DS file service
        fileInfo2 = {}
        fileInfo2['experimentFilePath'] = experimentFilePath
        fileInfo2['experimentName'] = experimentName
        fileMetadata = self.dsFileApi.statFile(experimentFilePath, experimentName, fileInfo2)
        self.logger.debug('File stat after transfer: %s' % fileMetadata)
        # Verify checksum before the local copy may be removed
        if fileMetadata.get('md5Sum') != fileInfo.get('md5Sum'):
            self.logger.error('Checksum mismatch for file: %s' % filePath)
            raise FileProcessingError('Checksum mismatch detected for file: %s' % filePath)
        # Remove file only after the checksum test passed
        self.logger.debug('Checksum test passed, deleting file %s' % filePath)
        OsUtility.removeFile(srcUrl)
#######################################################################
# Testing.
# Ad-hoc manual test: transfers /tmp/xyz to /tmp/xyz2 and prints results.
if __name__ == '__main__':
    ft = RsyncWithChecksumAndDeleteFileTransferPlugin('/tmp/xyz', '/tmp/xyz2')
    ft.start()
    print 'StdOut: ', ft.getStdOut()
    print 'StdErr: ', ft.getStdErr()
    print 'Exit Status: ', ft.getExitStatus()
#!/usr/bin/env python
from dm.common.objects.uploadInfo import UploadInfo
from dm.common.utility.objectTracker import ObjectTracker
class UploadTracker(ObjectTracker):
    """Tracks UploadInfo objects by upload id (accessed via getInstance())."""

    # Cache configuration
    # Entries held by this tracker are UploadInfo objects.
    objectClass = UploadInfo
####################################################################
# Testing
# No standalone tests for the tracker yet.
if __name__ == '__main__':
    pass
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment