diff --git a/bin/dm-stop-upload b/bin/dm-stop-upload
new file mode 100755
index 0000000000000000000000000000000000000000..c018a7e5fded250297e66164e5877a5b86ecef8b
--- /dev/null
+++ b/bin/dm-stop-upload
@@ -0,0 +1,17 @@
+#!/bin/sh
+
+# Run command
+
+if [ -z $DM_ROOT_DIR ]; then
+    cd `dirname $0` && myDir=`pwd`
+    setupFile=$myDir/../setup.sh
+    if [ ! -f $setupFile ]; then
+        echo "Cannot find setup file: $setupFile"
+        exit 1
+    fi
+    source $setupFile > /dev/null
+fi
+
+$DM_ROOT_DIR/src/python/dm/daq_web_service/cli/stopUploadCli.py $@
+
+
diff --git a/bin/dm-upload b/bin/dm-upload
index 909be4e6be4f99af82c2b11a0bf830d57f5d1115..7c1c14d2e78ce0798c5159359bf63f1118c551a7 100755
--- a/bin/dm-upload
+++ b/bin/dm-upload
@@ -11,7 +11,8 @@ if [ -z $DM_ROOT_DIR ]; then
     fi
     source $setupFile > /dev/null
 fi
+source dm_command_setup.sh
 
-$DM_ROOT_DIR/src/python/dm/daq_web_service/cli/uploadCli.py $@
+eval "$DM_ROOT_DIR/src/python/dm/daq_web_service/cli/uploadCli.py $DM_COMMAND_ARGS"
 
diff --git a/doc/RELEASE_NOTES.txt b/doc/RELEASE_NOTES.txt
index b400025ab6d99e7856c68b95950017cf85547e43..ae985c9350f51dd249613b5cb98fd87b29e3daaa 100644
--- a/doc/RELEASE_NOTES.txt
+++ b/doc/RELEASE_NOTES.txt
@@ -5,6 +5,7 @@ Release 0.8 (01/26/2016)
   processed; for uploads system can detect files that had been processed
   already; improved handling and reporting of processing errors)
 - Source file checksum is calculated for rsync/gridftp plugins by default
+- Added dm-stop-upload command
 - Resolved globus online user authorization delay issue
 
 Release 0.7 (12/08/2015)
diff --git a/src/python/dm/common/objects/daqInfo.py b/src/python/dm/common/objects/daqInfo.py
index 2e127fff03eb54c01185b5cbf60861357bf1c0a1..2a2af92776d35d8d861fcfaa84399e5d0035b6c3 100755
--- a/src/python/dm/common/objects/daqInfo.py
+++ b/src/python/dm/common/objects/daqInfo.py
@@ -29,10 +29,10 @@ class DaqInfo(DmObject):
         self.lock.acquire()
         try:
             self['nProcessedFiles'] = self.get('nProcessedFiles', 0) + 1
-            lastFileProcessingEndTime = self.get('lastFileProcessingEndTime', 0)
-            if processingEndTime is not None and processingEndTime > lastFileProcessingEndTime:
+            lastFileProcessedTime = self.get('lastFileProcessedTime', 0)
+            if processingEndTime is not None and processingEndTime > lastFileProcessedTime:
                 self['lastFileProcessed'] = filePath
-                self['lastFileProcessingEndTime'] = processingEndTime
+                self['lastFileProcessedTime'] = processingEndTime
         finally:
             self.lock.release()
 
@@ -43,9 +43,9 @@ class DaqInfo(DmObject):
             processingErrors[filePath] = processingError
             self['processingErrors'] = processingErrors
             self['nProcessingErrors'] = len(processingErrors)
-            lastFileProcessingEndTime = self.get('lastFileProcessingEndTime', 0)
-            if processingEndTime is not None and processingEndTime > lastFileProcessingEndTime:
-                self['lastFileProcessingEndTime'] = processingEndTime
+            lastFileProcessingErrorTime = self.get('lastFileProcessingErrorTime', 0)
+            if processingEndTime is not None and processingEndTime > lastFileProcessingErrorTime:
+                self['lastFileProcessingErrorTime'] = processingEndTime
         finally:
             self.lock.release()
 
@@ -60,10 +60,9 @@ class DaqInfo(DmObject):
         processingErrors = self.get('processingErrors', {})
         nCompletedFiles = nProcessedFiles+nProcessingErrors
 
-        startTime = self.get('startTime')
-        if startTime:
-            runTime = now - startTime
-            self['runTime'] = runTime
+        startTime = self.get('startTime', now)
+        runTime = now - startTime
+        endTime = None
 
         percentageComplete = 100.0
         percentageProcessed = 100.0
@@ -76,18 +75,21 @@ class DaqInfo(DmObject):
         self['percentageProcessed'] = '%.2f' % percentageProcessed
         self['percentageProcessingErrors'] = '%.2f' % percentageProcessingErrors
 
-        runTime = now - self.get('startTime')
-        self['runTime'] = runTime
-
         if self.get('endTime') and nCompletedFiles == nFiles:
             daqStatus = 'done'
-            lastFileProcessingEndTime = self.get('lastFileProcessingEndTime')
-            if lastFileProcessingEndTime > self['endTime']:
-                self['endTime'] = lastFileProcessingEndTime
+            lastFileProcessingErrorTime = self.get('lastFileProcessingErrorTime')
+            lastFileProcessedTime = self.get('lastFileProcessedTime')
+            endTime = lastFileProcessedTime
+            if not endTime or lastFileProcessingErrorTime > endTime:
+                endTime = lastFileProcessingErrorTime
+            if self.get('endTime') > endTime:
+                endTime = self.get('endTime')
+
+        if endTime:
+            runTime = endTime - startTime
+            self['endTime'] = endTime
             self['endTimestamp'] = TimeUtility.formatLocalTimestamp(endTime)
-            if startTime:
-                runTime = endTime - startTime
-                self['runTime'] = runTime
+
+        self['runTime'] = runTime
         self['status'] = daqStatus
 
     def toDictWithOriginalKeys(self):
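
The net effect of the updateStatus() rework above: a finished DAQ's endTime becomes the latest of the last successful processing time, the last error time, and any externally recorded end time, and runTime is derived from that instead of from wall-clock "now". A standalone sketch of the selection with invented epoch values (Python 2, matching the codebase; not part of the patch):

    lastFileProcessedTime = 1453824000.0        # last file that processed cleanly
    lastFileProcessingErrorTime = 1453824120.0  # a failure that finished later
    recordedEndTime = None                      # daq-level endTime, if one was set

    endTime = lastFileProcessedTime
    if not endTime or lastFileProcessingErrorTime > endTime:
        endTime = lastFileProcessingErrorTime
    if recordedEndTime > endTime:               # None compares low in Python 2
        endTime = recordedEndTime
    print endTime                               # 1453824120.0: the latest event wins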
diff --git a/src/python/dm/common/objects/uploadInfo.py b/src/python/dm/common/objects/uploadInfo.py
index 21c9ba9198f9ebe70b6ba49b107a171f27681ea8..236d1a15079ff28ac16ac411c969ccca616ffeac 100755
--- a/src/python/dm/common/objects/uploadInfo.py
+++ b/src/python/dm/common/objects/uploadInfo.py
@@ -19,10 +19,10 @@ class UploadInfo(DmObject):
         self.lock.acquire()
         try:
             self['nProcessedFiles'] = self.get('nProcessedFiles', 0) + 1
-            lastFileProcessingEndTime = self.get('lastFileProcessingEndTime', 0)
-            if processingEndTime is not None and processingEndTime > lastFileProcessingEndTime:
+            lastFileProcessedTime = self.get('lastFileProcessedTime', 0)
+            if processingEndTime is not None and processingEndTime > lastFileProcessedTime:
                 self['lastFileProcessed'] = filePath
-                self['lastFileProcessingEndTime'] = processingEndTime
+                self['lastFileProcessedTime'] = processingEndTime
         finally:
             self.lock.release()
 
@@ -33,47 +33,76 @@ class UploadInfo(DmObject):
             processingErrors[filePath] = processingError
             self['processingErrors'] = processingErrors
             self['nProcessingErrors'] = len(processingErrors)
-            lastFileProcessingEndTime = self.get('lastFileProcessingEndTime', 0)
-            if processingEndTime is not None and processingEndTime > lastFileProcessingEndTime:
-                self['lastFileProcessingEndTime'] = processingEndTime
+            lastFileProcessingErrorTime = self.get('lastFileProcessingErrorTime', 0)
+            if processingEndTime is not None and processingEndTime > lastFileProcessingErrorTime:
+                self['lastFileProcessingErrorTime'] = processingEndTime
+        finally:
+            self.lock.release()
+
+    def fileProcessingCancelled(self, filePath, processingEndTime):
+        self.lock.acquire()
+        try:
+            self['nCancelledFiles'] = self.get('nCancelledFiles', 0) + 1
+            lastFileProcessingCancelledTime = self.get('lastFileProcessingCancelledTime', 0)
+            if processingEndTime is not None and processingEndTime > lastFileProcessingCancelledTime:
+                self['lastFileProcessingCancelledTime'] = processingEndTime
+        finally:
+            self.lock.release()
+
+    def uploadAborted(self, nCancelledFiles):
+        self.lock.acquire()
+        try:
+            self['nCancelledFiles'] = self.get('nCancelledFiles', 0) + nCancelledFiles
         finally:
             self.lock.release()
 
     def updateStatus(self):
         now = time.time()
         uploadStatus = self.get('status', 'running')
-        if uploadStatus == 'done':
+        if uploadStatus == 'done' or uploadStatus == 'aborted':
             return
         nFiles = self.get('nFiles', 0)
         nProcessedFiles = self.get('nProcessedFiles', 0)
+        nCancelledFiles = self.get('nCancelledFiles', 0)
         nProcessingErrors = self.get('nProcessingErrors', 0)
         processingErrors = self.get('processingErrors', {})
         nCompletedFiles = nProcessedFiles+nProcessingErrors
-        startTime = self.get('startTime')
-        if startTime:
-            runTime = now - startTime
-            self['runTime'] = runTime
-
+        startTime = self.get('startTime', now)
+        runTime = now - startTime
+        endTime = None
         if nCompletedFiles == nFiles:
             uploadStatus = 'done'
-            endTime = self.get('lastFileProcessingEndTime', now)
+            lastFileProcessingErrorTime = self.get('lastFileProcessingErrorTime')
+            lastFileProcessedTime = self.get('lastFileProcessedTime')
+            endTime = lastFileProcessedTime
+            if not endTime or lastFileProcessingErrorTime > endTime:
+                endTime = lastFileProcessingErrorTime
+
+        if nCancelledFiles > 0 and nCancelledFiles+nCompletedFiles == nFiles:
+            uploadStatus = 'aborted'
+            endTime = self.get('lastFileProcessingCancelledTime', now)
+
+        if endTime:
+            runTime = endTime - startTime
             self['endTime'] = endTime
             self['endTimestamp'] = TimeUtility.formatLocalTimestamp(endTime)
-            if startTime:
-                runTime = endTime - startTime
-                self['runTime'] = runTime
+
+        self['runTime'] = runTime
         self['status'] = uploadStatus
 
         percentageComplete = 100.0
         percentageProcessed = 100.0
+        percentageCancelled = 0.0
         percentageProcessingErrors = 0.0
         if nFiles > 0:
-            percentageComplete = float(nCompletedFiles)/float(nFiles)*100.0
-            percentageProcessed = float(nProcessedFiles)/float(nFiles)*100.0
-            percentageProcessingErrors = float(nProcessingErrors)/float(nFiles)*100.0
+            percentageComplete = float(nCompletedFiles)/float(nFiles)*100.0
+            percentageProcessed = float(nProcessedFiles)/float(nFiles)*100.0
+            percentageCancelled = float(nCancelledFiles)/float(nFiles)*100.0
+            percentageProcessingErrors = float(nProcessingErrors)/float(nFiles)*100.0
         self['percentageComplete'] = '%.2f' % percentageComplete
         self['percentageProcessed'] = '%.2f' % percentageProcessed
         self['percentageProcessingErrors'] = '%.2f' % percentageProcessingErrors
+        if nCancelledFiles > 0:
+            self['percentageCancelled'] = '%.2f' % percentageCancelled
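
UploadInfo now tracks two terminal states: 'done' when every file either processed or errored out, and 'aborted' when cancelled files account for the remainder. A small standalone illustration with invented counts (not part of the patch):

    nFiles = 10
    nProcessedFiles = 6
    nProcessingErrors = 1
    nCancelledFiles = 3

    nCompletedFiles = nProcessedFiles + nProcessingErrors
    uploadStatus = 'running'
    if nCompletedFiles == nFiles:
        uploadStatus = 'done'
    if nCancelledFiles > 0 and nCancelledFiles + nCompletedFiles == nFiles:
        uploadStatus = 'aborted'
    print uploadStatus   # 'aborted': 7 completed plus 3 cancelled covers all 10 files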
diff --git a/src/python/dm/common/processing/fileProcessingThread.py b/src/python/dm/common/processing/fileProcessingThread.py
index a132b50adff3f098db02667d12336c1a63fd5be7..999207d97fb120cd699f6a4fb09b6ab9b0c9ef85 100755
--- a/src/python/dm/common/processing/fileProcessingThread.py
+++ b/src/python/dm/common/processing/fileProcessingThread.py
@@ -20,6 +20,70 @@ class FileProcessingThread(threading.Thread):
         self.fileProcessingQueue = fileProcessingQueue
         self.logger = LoggingManager.getInstance().getLogger(name)
 
+    def processFile(self, fileInfo):
+        filePath = fileInfo.get('filePath')
+        try:
+            statusMonitor = fileInfo['statusMonitor']
+            if statusMonitor.get('status') == 'aborting':
+                self.logger.debug('File %s processing is cancelled' % (filePath))
+                endProcessingTime = time.time()
+                statusMonitor.fileProcessingCancelled(filePath, endProcessingTime)
+                fileInfo.clear()
+                return
+            self.logger.debug('Starting processing file %s' % filePath)
+            fileInfo['startProcessingTime'] = time.time()
+            nProcessors = len(self.fileProcessorKeyList)
+            processedByDict = fileInfo.get('processedByDict', {})
+            fileInfo['processedByDict'] = processedByDict
+            processorNumber = 0
+            for processorKey in self.fileProcessorKeyList:
+                processorNumber += 1
+                processor = self.fileProcessorDict.get(processorKey)
+                processorName = '%s-%s' % (processor.__class__.__name__,processorNumber)
+
+                if processedByDict.has_key(processorName):
+                    self.logger.debug('%s has already been processed by %s ' % (fileInfo, processorName))
+                    continue
+
+                self.logger.debug('%s is about to process file %s ' % (processorName, filePath))
+                try:
+                    processor.processFile(fileInfo)
+                    processedByDict[processorName] = True
+                    self.logger.debug('%s processed file at path %s ' % (processorName, filePath))
+                    if processorNumber == nProcessors:
+                        self.logger.debug('File %s processing is complete' % (filePath))
+                        endProcessingTime = time.time()
+                        statusMonitor.fileProcessed(filePath, endProcessingTime)
+                        fileInfo.clear()
+                except Exception, ex:
+                    self.logger.exception(ex)
+                    processingError = '%s processing error: %s' % (processorName, str(ex))
+                    self.logger.debug(processingError)
+                    processingDict = fileInfo.get('processingDict', {})
+                    fileInfo['processingDict'] = processingDict
+                    processorDict = processingDict.get(processorName, {})
+                    processingDict[processorName] = processorDict
+
+                    processorDict['lastError'] = str(ex)
+                    nRetriesLeft = processorDict.get('numberOfRetriesLeft', processor.getNumberOfRetries())
+                    self.logger.debug('Number of %s retries left for file %s: %s' % (processorName, filePath, nRetriesLeft))
+                    processorDict['numberOfRetriesLeft'] = nRetriesLeft - 1
+                    if nRetriesLeft <= 0:
+                        endProcessingTime = time.time()
+                        statusMonitor.fileProcessingError(filePath, processingError, endProcessingTime)
+                        fileInfo.clear()
+                        self.logger.debug('No more %s retries left for file %s' % (processorName, filePath))
+                    else:
+                        retryWaitPeriod = processor.getRetryWaitPeriodInSeconds()
+                        self.logger.debug('%s will retry processing file %s in %s seconds' % (processorName, filePath, retryWaitPeriod))
+                        self.fileProcessingQueue.push(fileInfo, retryWaitPeriod)
+                    # Do not process this file further until
+                    # this plugin is done
+                    break
+
+        except Exception, ex:
+            self.logger.exception(ex)
+
     def run(self):
         self.logger.debug('Starting thread: %s' % self.getName())
         while True:
@@ -29,63 +93,12 @@ class FileProcessingThread(threading.Thread):
                     break
 
                 while True:
-                    fileInfo = self.fileProcessingQueue.pop()
-                    if fileInfo is None:
-                        break
-                    filePath = fileInfo.get('filePath')
-                    try:
-                        statusMonitor = fileInfo['statusMonitor']
-                        fileInfo['startProcessingTime'] = time.time()
-                        processorNumber = 0
-                        nProcessors = len(self.fileProcessorKeyList)
-                        processedByDict = fileInfo.get('processedByDict', {})
-                        fileInfo['processedByDict'] = processedByDict
-                        for processorKey in self.fileProcessorKeyList:
-                            processorNumber += 1
-                            processor = self.fileProcessorDict.get(processorKey)
-                            processorName = '%s-%s' % (processor.__class__.__name__,processorNumber)
-
-                            if processedByDict.has_key(processorName):
-                                self.logger.debug('%s has already been processed by %s ' % (fileInfo, processorName))
-                                continue
-
-                            self.logger.debug('%s is about to process file %s ' % (processorName, filePath))
-                            try:
-                                processor.processFile(fileInfo)
-                                processedByDict[processorName] = True
-                                self.logger.debug('%s processed file at path %s ' % (processorName, filePath))
-                                if processorNumber == nProcessors:
-                                    self.logger.debug('File %s processing is complete' % (filePath))
-                                    endProcessingTime = time.time()
-                                    statusMonitor.fileProcessed(filePath, endProcessingTime)
-                                    fileInfo.clear()
-                            except Exception, ex:
-                                self.logger.exception(ex)
-                                processingError = '%s processing error: %s' % (processorName, str(ex))
-                                self.logger.debug(processingError)
-                                processingDict = fileInfo.get('processingDict', {})
-                                fileInfo['processingDict'] = processingDict
-                                processorDict = processingDict.get(processorName, {})
-                                processingDict[processorName] = processorDict
-
-                                processorDict['lastError'] = str(ex)
-                                nRetriesLeft = processorDict.get('numberOfRetriesLeft', processor.getNumberOfRetries())
-                                self.logger.debug('Number of %s retries left for file %s: %s' % (processorName, filePath, nRetriesLeft))
-                                processorDict['numberOfRetriesLeft'] = nRetriesLeft - 1
-                                if nRetriesLeft <= 0:
-                                    endProcessingTime = time.time()
-                                    statusMonitor.fileProcessingError(filePath, processingError, endProcessingTime)
-                                    fileInfo.clear()
-                                    self.logger.debug('No more %s retries left for file %s' % (processorName, filePath))
-                                else:
-                                    retryWaitPeriod = processor.getRetryWaitPeriodInSeconds()
-                                    self.logger.debug('%s will retry processing file %s in %s seconds' % (processorName, filePath, retryWaitPeriod))
-                                    self.fileProcessingQueue.push(fileInfo, retryWaitPeriod)
-                                # Do not process this file further until
-                                # this plugin is done
-                                break
-
+                    fileInfo = self.fileProcessingQueue.pop()
+                    if fileInfo is None:
+                        break
+                    self.logger.debug('Processing queue depth after pop: %s', self.fileProcessingQueue.getLength())
+                    self.processFile(fileInfo)
             except Exception, ex:
                 self.logger.exception(ex)
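
The extracted processFile() relies only on a dict-like status monitor: it reads the 'status' key and calls one of three callbacks. UploadInfo implements all three after this patch (DaqInfo implements the first two), and fileProcessingCancelled() is reached only when the monitor reports 'aborting'. A minimal stub showing the assumed contract (illustrative, not part of the patch):

    class StubStatusMonitor(dict):
        def fileProcessed(self, filePath, processingEndTime):
            self['nProcessedFiles'] = self.get('nProcessedFiles', 0) + 1

        def fileProcessingError(self, filePath, processingError, processingEndTime):
            self['nProcessingErrors'] = self.get('nProcessingErrors', 0) + 1

        def fileProcessingCancelled(self, filePath, processingEndTime):
            self['nCancelledFiles'] = self.get('nCancelledFiles', 0) + 1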
diff --git a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py
index 42f90fb1f30ab3058071e727aaee66e9e6ccd047..dc6c5740b33aca1b21f97e830e581bdeb71bd319 100755
--- a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py
+++ b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py
@@ -100,7 +100,8 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
         # Calculate checksum
         (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
         ftpUtility = FtpUtility(host, port)
-        ftpUtility.statFile(filePath, fileInfo)
+        if not fileInfo.get('fileSize'):
+            ftpUtility.statFile(filePath, fileInfo)
         if self.localMd5Sum:
             ftpUtility.getMd5Sum(filePath, fileInfo)
 
diff --git a/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py b/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py
index 7a80d0923e6b8dc0425a4505a17ec55a6c367dfb..6fd2bf9e0d62227098239d706f0d5bce24ae6d16 100755
--- a/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py
+++ b/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py
@@ -20,48 +20,44 @@ class MongoDbFileCatalogPlugin(FileProcessor):
     def processFile(self, fileInfo):
         experimentFilePath = fileInfo.get('experimentFilePath')
         experimentName = fileInfo.get('experimentName')
-        self.logger.debug('Processing file %s for experiment %s' % (fileInfo, experimentName))
+        self.logger.debug('Processing file "%s" for experiment %s: %s' % (experimentFilePath, experimentName, fileInfo))
 
         daqInfo = fileInfo.get('daqInfo')
         storageDirectory = daqInfo.get('storageDirectory')
         storageHost = daqInfo.get('storageHost')
         storageUrl = daqInfo.get('storageUrl')
         storageFilePath = os.path.join(storageDirectory, experimentFilePath)
-        fileProcessingTime = time.time()
-        fileProcessingTimestamp = TimeUtility.formatLocalTimestamp(fileProcessingTime)
+        fileInfo['fileProcessingTime'] = time.time()
 
         # Prepare catalogging entry
         fileInfo2 = {}
         for key in ['md5Sum', 'fileSize']:
             if fileInfo.has_key(key):
                 fileInfo2[key] = fileInfo.get(key, '')
-        for key in ['fileCreationTime', 'fileModificationTime']:
+        for key in ['fileProcessingTime', 'fileCreationTime', 'fileModificationTime']:
             if fileInfo.has_key(key):
                 t = fileInfo.get(key, 0)
                 fileInfo2[key] = t
-                key2 = '%sstamp' % key
+                key2 = '%sstamp' % key
                 fileInfo2[key2] = TimeUtility.formatLocalTimestamp(t)
 
         fileLocations = fileInfo.get('fileLocations', [])
         fileLocations.append('%s/%s' % (storageUrl, experimentFilePath))
 
         fileInfo2['fileName'] = os.path.basename(experimentFilePath)
-        fileInfo2['storageDirectory'] = storageDirectory
-        fileInfo2['storageUrl'] = storageUrl
-        fileInfo2['storageHost'] = storageHost
-        fileInfo2['storageFilePath'] = storageFilePath
         fileInfo2['experimentFilePath'] = experimentFilePath
         fileInfo2['experimentName'] = experimentName
-        fileInfo2['fileProcessingTime'] = fileProcessingTime
-        fileInfo2['fileProcessingTimestamp'] = fileProcessingTimestamp
         fileInfo2['fileLocations'] = fileLocations
         self.logger.debug('Daq info: %s' % (daqInfo))
         fileInfo2.update(daqInfo)
 
         if daqInfo.has_key('id'):
             fileInfo2['daqId'] = daqInfo.get('id')
             del fileInfo2['id']
+        for key in ['storageDirectory', 'storageUrl', 'storageHost']:
+            if fileInfo2.has_key(key):
+                del fileInfo2[key]
 
-        self.logger.debug('File %s catalog entry: %s' % (experimentFilePath, str(fileInfo2)))
+        self.logger.debug('File "%s" catalog entry: %s' % (experimentFilePath, str(fileInfo2)))
         self.fileMongoDbApi.updateOrAddExperimentFile(fileInfo2)
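
The catalog plugin no longer stores the storage location keys with each file entry: daqInfo is merged into the entry, 'id' is remapped to 'daqId', and the storage* keys are then stripped. A standalone sketch of that filtering with invented values (not part of the patch):

    fileInfo2 = {'fileName': 'file1.dat', 'experimentName': 'exp1'}
    daqInfo = {'id': 'abc123', 'storageDirectory': '/storage/exp1',
               'storageHost': 'xstor', 'storageUrl': 'gridftp://xstor/storage/exp1'}
    fileInfo2.update(daqInfo)
    if fileInfo2.has_key('id'):
        fileInfo2['daqId'] = fileInfo2['id']
        del fileInfo2['id']
    for key in ['storageDirectory', 'storageUrl', 'storageHost']:
        if fileInfo2.has_key(key):
            del fileInfo2[key]
    print fileInfo2   # only fileName, experimentName and daqId remain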
diff --git a/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py b/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py
index 0522362ec1ee6e25e065ee768dd9222d44503b14..4bea1d19aa6d358ae8dfa4f6316dcbf91746c869 100755
--- a/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py
+++ b/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py
@@ -58,7 +58,8 @@ class RsyncFileTransferPlugin(FileTransferPlugin):
         srcUrl = self.getSrcUrl(filePath, dataDirectory)
 
         # Calculate checksum
-        FileUtility.statFile(filePath, fileInfo)
+        if not fileInfo.get('fileSize'):
+            FileUtility.statFile(filePath, fileInfo)
         if self.localMd5Sum:
             FileUtility.statFile(filePath, fileInfo)
             FileUtility.getMd5Sum(filePath, fileInfo)
 
diff --git a/src/python/dm/common/utility/ftpUtility.py b/src/python/dm/common/utility/ftpUtility.py
index 4514cf63a74d0246274a7d30f6d1e509e6dad29a..c82f5607c52ad7bc32cfcb14b86c9322013da147 100755
--- a/src/python/dm/common/utility/ftpUtility.py
+++ b/src/python/dm/common/utility/ftpUtility.py
@@ -87,7 +87,6 @@ class FtpUtility:
             if self.serverUsesUtcTime:
                 modifyTime = TimeUtility.utcToLocalTime(modifyTime)
             fileStatDict['fileModificationTime'] = modifyTime
-            fileStatDict['fileModificationTimestamp'] = TimeUtility.formatLocalTimestamp(modifyTime)
             fileStatDict['fileSize'] = int(fileStatDict.get('fileSize'))
             del fileStatDict['Modify']
             del fileStatDict['Type']
@@ -134,11 +133,13 @@ class FtpUtility:
             fileSize = int(fileInfo.get('fileSize'))
             #print '%s %s %s' % (filePath, fileSize, modifyTime)
             fileDict[filePath] = {'fileSize' : fileSize, 'fileModificationTime' : modifyTime}
+        self.mlsdFileDict.clear()
         mlsdDirList = copy.copy(self.mlsdDirList)
         for d in mlsdDirList:
             dirPath2 = '%s/%s' % (dirPath,d)
             replacementDirPath2 = '%s/%s' % (replacementDirPath,d)
             self.getFiles(dirPath2, fileDict, replacementDirPath2)
+        self.mlsdDirList = []
         return fileDict
 
     def getMd5Sum(self, filePath, fileInfo={}):
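
Like the gridftp plugin above, the rsync plugin now stats the source only when 'fileSize' is not already present; the upload path added in experimentSessionControllerImpl (below) pre-populates each fileInfo from the directory listing, so the extra stat is skipped. A trivial standalone sketch of the guard with invented values (not part of the patch):

    fileInfo = {'filePath': '/data/exp1/file1.dat'}
    fileInfo.update({'fileSize': 1024, 'fileModificationTime': 1453824000.0})
    if not fileInfo.get('fileSize'):
        print 'would stat %s' % fileInfo['filePath']
    else:
        print 'stat skipped; size already known'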
diff --git a/src/python/dm/daq_web_service/api/experimentRestApi.py b/src/python/dm/daq_web_service/api/experimentRestApi.py
index 24ea8101267e1c6257f69142b4dd9dcac3ad4ef5..241dfccf0782f1af48e3e5905f966aed229c176f 100755
--- a/src/python/dm/daq_web_service/api/experimentRestApi.py
+++ b/src/python/dm/daq_web_service/api/experimentRestApi.py
@@ -64,6 +64,14 @@ class ExperimentRestApi(DaqRestApi):
         responseDict = self.sendSessionRequest(url=url, method='GET')
         return UploadInfo(responseDict)
 
+    @DaqRestApi.execute
+    def stopUpload(self, id):
+        if not id:
+            raise InvalidRequest('Upload id must be provided.')
+        url = '%s/experimentUploads/stopUpload/%s' % (self.getContextRoot(),id)
+        responseDict = self.sendSessionRequest(url=url, method='POST')
+        return UploadInfo(responseDict)
+
 
 #######################################################################
 # Testing.
 
diff --git a/src/python/dm/daq_web_service/cli/stopUploadCli.py b/src/python/dm/daq_web_service/cli/stopUploadCli.py
new file mode 100755
index 0000000000000000000000000000000000000000..c153555384b77b2f90e555abd74ca05edc6fe466
--- /dev/null
+++ b/src/python/dm/daq_web_service/cli/stopUploadCli.py
@@ -0,0 +1,36 @@
+#!/usr/bin/env python
+
+from daqWebServiceSessionCli import DaqWebServiceSessionCli
+from dm.daq_web_service.api.experimentRestApi import ExperimentRestApi
+from dm.common.exceptions.invalidRequest import InvalidRequest
+
+class StopUploadCli(DaqWebServiceSessionCli):
+    def __init__(self):
+        DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
+        self.addOption('', '--id', dest='id', help='Upload id.')
+
+    def checkArgs(self):
+        if self.options.id is None:
+            raise InvalidRequest('Upload id must be provided.')
+
+    def getId(self):
+        return self.options.id
+
+    def runCommand(self):
+        self.parseArgs(usage="""
+dm-stop-upload --id=ID
+
+Description:
+    Aborts specified data upload.
+        """)
+        self.checkArgs()
+        api = ExperimentRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
+        uploadInfo = api.stopUpload(self.getId())
+        print uploadInfo.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
+
+#######################################################################
+# Run command.
+if __name__ == '__main__':
+    cli = StopUploadCli()
+    cli.run()
+
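
A sketch of driving the new client call directly; the constructor signature matches the CLI usage above, but the credentials, host, port, protocol, and upload id are placeholders, and the service-side route (below) requires an administrator session:

    from dm.daq_web_service.api.experimentRestApi import ExperimentRestApi

    api = ExperimentRestApi('dmadmin', 'password', 'daq-host', 22236, 'https')   # hypothetical connection values
    uploadInfo = api.stopUpload('77f00a53-example-upload-id')                    # hypothetical id
    print uploadInfo.get('status')   # 'aborting' until the remaining queued files drain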
diff --git a/src/python/dm/daq_web_service/service/experimentRouteDescriptor.py b/src/python/dm/daq_web_service/service/experimentRouteDescriptor.py
index 7fc1ae82498dc43ba177451b8db12956d5e21557..04c019e41941b39133b1a1858c0c723d826b531b 100755
--- a/src/python/dm/daq_web_service/service/experimentRouteDescriptor.py
+++ b/src/python/dm/daq_web_service/service/experimentRouteDescriptor.py
@@ -63,6 +63,15 @@ class ExperimentRouteDescriptor:
                 'action' : 'getUploadInfo',
                 'method' : ['GET']
             },
+
+            # Stop upload
+            {
+                'name' : 'stopUpload',
+                'controller' : experimentSessionController,
+                'path' : '%s/experimentUploads/stopUpload/:(id)' % contextRoot,
+                'action' : 'stopUpload',
+                'method' : ['POST']
+            },
         ]
 
         return routes
 
diff --git a/src/python/dm/daq_web_service/service/experimentSessionController.py b/src/python/dm/daq_web_service/service/experimentSessionController.py
index 09786ff61b5aa64c87683864576546599ba1b0ac..80036d9a52ed23bab3d2bbc17ab49d3ea14064b1 100755
--- a/src/python/dm/daq_web_service/service/experimentSessionController.py
+++ b/src/python/dm/daq_web_service/service/experimentSessionController.py
@@ -89,3 +89,10 @@ class ExperimentSessionController(DmSessionController):
         self.logger.debug('Returning info for upload id %s' % id)
         return response
 
+    @cherrypy.expose
+    @DmSessionController.require(DmSessionController.isAdministrator())
+    @DmSessionController.execute
+    def stopUpload(self, id, **kwargs):
+        response = self.experimentSessionControllerImpl.stopUpload(id).getFullJsonRep()
+        self.logger.debug('Stopped upload id %s' % id)
+        return response
 
diff --git a/src/python/dm/daq_web_service/service/impl/daqTracker.py b/src/python/dm/daq_web_service/service/impl/daqTracker.py
index 18bd49baaf407611afc0898f214c7f4b5145b335..d538ad10fff1935baa7c07f5ee0957133cdb1388 100755
--- a/src/python/dm/daq_web_service/service/impl/daqTracker.py
+++ b/src/python/dm/daq_web_service/service/impl/daqTracker.py
@@ -30,16 +30,20 @@ class DaqTracker(ObjectTracker):
             raise ObjectAlreadyExists('DAQ is already active for experiment %s in data directory %s.' % (experimentName, dataDirectory))
         daqId = str(uuid.uuid4())
         daqInfo['id'] = daqId
-        daqInfo['nFiles'] = 0
-        daqInfo['nProcessedFiles'] = 0
-        daqInfo['nProcessingErrors'] = 0
         daqInfo['experimentName'] = experimentName
         daqInfo['storageDirectory'] = experiment.get('storageDirectory')
         daqInfo['storageHost'] = experiment.get('storageHost')
         daqInfo['storageUrl'] = experiment.get('storageUrl')
         daqInfo['dataDirectory'] = dataDirectory
+
+        # Create DaqInfo object with keys that we want to save with file
+        # metadata, and add other keys later
         daqInfo2 = DaqInfo(daqInfo)
+        daqInfo2['nFiles'] = 0
+        daqInfo2['nProcessedFiles'] = 0
+        daqInfo2['nProcessingErrors'] = 0
+
         startTime = time.time()
         daqInfo2['startTime'] = startTime
         daqInfo2['startTimestamp'] = TimeUtility.formatLocalTimestamp(startTime)
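
The new route maps POST {contextRoot}/experimentUploads/stopUpload/<id> onto the admin-only controller method above. Resolving the path template for a hypothetical context root and upload id (illustrative only; the real context root comes from service configuration):

    contextRoot = '/dm'   # hypothetical
    path = '%s/experimentUploads/stopUpload/:(id)' % contextRoot
    print path.replace(':(id)', '77f00a53-example-upload-id')
    # /dm/experimentUploads/stopUpload/77f00a53-example-upload-id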
diff --git a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py
index e0c51b6cdde360a8980fb8f7bce6837364fba3a3..651e5bed4f80ed11c9da5c0901bb36bf1944d0fe 100755
--- a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py
+++ b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py
@@ -111,13 +111,23 @@ class ExperimentSessionControllerImpl(DmObjectManager):
         self.logger.debug('Preparing upload id: %s' % uploadId)
         dataDirectory = uploadInfo.get('dataDirectory')
         fileProcessingManager = FileProcessingManager.getInstance()
-        for filePath in filePathsDict.keys():
+        nProcessedFiles = 0
+        nFiles = len(filePathsDict)
+        for (filePath,filePathDict) in filePathsDict.items():
             fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
+            fileInfo.update(filePathDict)
             fileInfo['daqInfo'] = daqInfo
             fileInfo['uploadId'] = uploadId
             fileInfo['statusMonitor'] = uploadInfo
             try:
-                fileProcessingManager.processFile(fileInfo)
+                if uploadInfo.get('status') != 'aborting':
+                    fileProcessingManager.processFile(fileInfo)
+                    nProcessedFiles += 1
+                else:
+                    nCancelledFiles = nFiles - nProcessedFiles
+                    uploadInfo.uploadAborted(nCancelledFiles)
+                    self.logger.debug('Upload id %s aborted, will not process %s files' % (uploadId, nCancelledFiles))
+                    break
             except Exception, ex:
                 self.logger.error('Processing error: %s', ex)
         self.logger.debug('Done preparing upload id: %s (total of %s files)' % (uploadId, len(filePathsDict)))
@@ -129,4 +139,12 @@ class ExperimentSessionControllerImpl(DmObjectManager):
         uploadInfo.updateStatus()
         return uploadInfo
 
+    def stopUpload(self, id):
+        uploadInfo = UploadTracker.getInstance().get(id)
+        if not uploadInfo:
+            raise ObjectNotFound('Upload id %s not found.' % id)
+        uploadInfo['status'] = 'aborting'
+        uploadInfo.updateStatus()
+        return uploadInfo
+
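
End to end, the abort path reads: stopUpload() flips the tracked UploadInfo to 'aborting'; the preparation loop above stops queueing files and books the remainder as cancelled; FileProcessingThread.processFile() cancels already-queued files as it pops them; and UploadInfo.updateStatus() settles the final state to 'aborted'. A condensed standalone sketch of the bookkeeping (illustrative only; the real objects carry locks, timestamps, and queues):

    upload = {'status': 'running', 'nFiles': 5, 'nProcessedFiles': 2,
              'nProcessingErrors': 0, 'nCancelledFiles': 0}

    upload['status'] = 'aborting'     # stopUpload()
    upload['nCancelledFiles'] += 3    # preparation loop + processing threads
    nCompletedFiles = upload['nProcessedFiles'] + upload['nProcessingErrors']
    if upload['nCancelledFiles'] > 0 and upload['nCancelledFiles'] + nCompletedFiles == upload['nFiles']:
        upload['status'] = 'aborted'  # updateStatus()
    print upload['status']            # 'aborted'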