diff --git a/src/python/dm/common/constants/dmProcessingMode.py b/src/python/dm/common/constants/dmProcessingMode.py new file mode 100755 index 0000000000000000000000000000000000000000..c9b1fad6c388d279d6d1caaf00eaf585b7276b88 --- /dev/null +++ b/src/python/dm/common/constants/dmProcessingMode.py @@ -0,0 +1,8 @@ +#!/usr/bin/env python + +####################################################################### + +DM_PROCESSING_MODE_FILES = 'files' +DM_PROCESSING_MODE_DIRECTORY = 'directory' + +DM_ALLOWED_PROCESSING_MODE_LIST = [DM_PROCESSING_MODE_FILES, DM_PROCESSING_MODE_DIRECTORY] diff --git a/src/python/dm/common/objects/daqInfo.py b/src/python/dm/common/objects/daqInfo.py index 2a2af92776d35d8d861fcfaa84399e5d0035b6c3..fffd5a5911cc00479260b7f57ac5948e4943c6bc 100755 --- a/src/python/dm/common/objects/daqInfo.py +++ b/src/python/dm/common/objects/daqInfo.py @@ -52,7 +52,7 @@ class DaqInfo(DmObject): def updateStatus(self): now = time.time() daqStatus = self.get('status', 'running') - if daqStatus == 'done': + if daqStatus in ['done', 'failed']: return nFiles = self.get('nFiles', 0) nProcessedFiles = self.get('nProcessedFiles', 0) @@ -77,6 +77,8 @@ class DaqInfo(DmObject): if self.get('endTime') and nCompletedFiles == nFiles: daqStatus = 'done' + if nProcessingErrors: + daqStatus = 'failed' lastFileProcessingErrorTime = self.get('lastFileProcessingErrorTime') lastFileProcessedTime = self.get('lastFileProcessedTime') endTime = lastFileProcessedTime diff --git a/src/python/dm/common/objects/directoryMetadata.py b/src/python/dm/common/objects/directoryMetadata.py new file mode 100755 index 0000000000000000000000000000000000000000..1acf4e9e24d9c255199fe25a288d880d049ab73a --- /dev/null +++ b/src/python/dm/common/objects/directoryMetadata.py @@ -0,0 +1,11 @@ +#!/usr/bin/env python + +from dmObject import DmObject + +class DirectoryMetadata(DmObject): + + DEFAULT_KEY_LIST = [ 'id', 'directoryPath', 'experimentDirectoryPath' ] + + def __init__(self, dict): + DmObject.__init__(self, dict) + diff --git a/src/python/dm/common/objects/directoryUploadInfo.py b/src/python/dm/common/objects/directoryUploadInfo.py new file mode 100755 index 0000000000000000000000000000000000000000..b39e551d8b62a47bae12bc27c9ba91d6cc79c337 --- /dev/null +++ b/src/python/dm/common/objects/directoryUploadInfo.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python + +import time +import threading + +from dmObject import DmObject +from dm.common.utility.dictUtility import DictUtility +from dm.common.utility.timeUtility import TimeUtility + +class DirectoryUploadInfo(DmObject): + + DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nFiles', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp' ] + + def __init__(self, dict={}): + DmObject.__init__(self, dict) + self.lock = threading.RLock() + + def updateStatus(self): + now = time.time() + uploadStatus = self.get('status', 'running') + if uploadStatus in ['done', 'aborted', 'failed']: + return + + startTime = self.get('startTime', now) + runTime = now - startTime + + processingInfo = self.get('processingInfo') + endTime = 0 + uploadStatus = 'done' + for processorName in processingInfo.keys(): + processingEndTime = processingInfo[processorName].get('processingEndTime') + status = processingInfo[processorName].get('status') + if status in ['aborted', 'failed']: + uploadStatus = status + + if not processingEndTime: + endTime = None + break + + if processingEndTime > endTime: + endTime = processingEndTime + + if endTime: + runTime = endTime - startTime + self['endTime'] = endTime + self['endTimestamp'] = TimeUtility.formatLocalTimestamp(endTime) + self['status'] = uploadStatus + self['runTime'] = runTime + + + diff --git a/src/python/dm/common/objects/pluginInfo.py b/src/python/dm/common/objects/pluginInfo.py new file mode 100755 index 0000000000000000000000000000000000000000..4f944dcb2e920c3853a9edb44df4db79bcdf7b7e --- /dev/null +++ b/src/python/dm/common/objects/pluginInfo.py @@ -0,0 +1,11 @@ +#!/usr/bin/env python + +from dmObject import DmObject + +class PluginInfo(DmObject): + + DEFAULT_KEY_LIST = [ 'id', 'name', 'dependsOn' ] + + def __init__(self, dict): + DmObject.__init__(self, dict) + diff --git a/src/python/dm/common/objects/uploadInfo.py b/src/python/dm/common/objects/uploadInfo.py index c05337f6c833273bb03b4e22608d006941793514..1b0e096b42f4c6ed79ad0512a7346e346a2c7ed6 100755 --- a/src/python/dm/common/objects/uploadInfo.py +++ b/src/python/dm/common/objects/uploadInfo.py @@ -59,7 +59,7 @@ class UploadInfo(DmObject): def updateStatus(self): now = time.time() uploadStatus = self.get('status', 'running') - if uploadStatus == 'done' or uploadStatus == 'aborted': + if uploadStatus in ['done', 'aborted', 'failed']: return nFiles = self.get('nFiles', 0) nProcessedFiles = self.get('nProcessedFiles', 0) @@ -73,6 +73,8 @@ class UploadInfo(DmObject): endTime = None if nCompletedFiles == nFiles: uploadStatus = 'done' + if nProcessingErrors: + uploadStatus = 'failed' lastFileProcessingErrorTime = self.get('lastFileProcessingErrorTime') lastFileProcessedTime = self.get('lastFileProcessedTime') endTime = lastFileProcessedTime diff --git a/src/python/dm/common/processing/fileProcessingManager.py b/src/python/dm/common/processing/fileProcessingManager.py index faec3eb246ef0a7ab15ada1d656f7d00dcbb0b85..dde6b4754be052ae81392ab89eb77b7e04077f58 100755 --- a/src/python/dm/common/processing/fileProcessingManager.py +++ b/src/python/dm/common/processing/fileProcessingManager.py @@ -67,6 +67,26 @@ class FileProcessingManager(threading.Thread,Singleton): self.fileProcessorKeyList.sort() self.logger.debug('File processor keys: %s' % self.fileProcessorKeyList) + # Assign processor names + processorNumber = 0 + for processorKey in self.fileProcessorKeyList: + processorNumber += 1 + processor = self.fileProcessorDict.get(processorKey) + processorName = '%s-%s' % (processor.__class__.__name__,processorNumber) + processor.name = processorName + + # Corect processor dependenciens + for processorKey in self.fileProcessorKeyList: + self.logger.debug('Determining dependencies for processor %s' % (processorKey)) + processor = self.fileProcessorDict.get(processorKey) + dependsOn = [] + for depProcessorKey in processor.dependsOn: + depProcessor = self.fileProcessorDict.get(depProcessorKey.lower()) + if depProcessor: + dependsOn.append(depProcessor.name) + processor.dependsOn = dependsOn + self.logger.debug('Processor %s depends on: %s' % (processor.name, processor.dependsOn)) + # Remove hidden files from dictionary of files to be processed def removeHiddenFilesFromProcessing(self, filePathsDict, uploadInfo): if ValueUtility.toBoolean(uploadInfo.get('processHiddenFiles')): diff --git a/src/python/dm/common/processing/fileProcessingThread.py b/src/python/dm/common/processing/fileProcessingThread.py index 25d227ea6c8261ee870c28f82971e8be55454fc7..9c58a16aa0be52bccada0c796d44f407792664c3 100755 --- a/src/python/dm/common/processing/fileProcessingThread.py +++ b/src/python/dm/common/processing/fileProcessingThread.py @@ -45,7 +45,7 @@ class FileProcessingThread(threading.Thread): for processorKey in self.fileProcessorKeyList: processorNumber += 1 processor = self.fileProcessorDict.get(processorKey) - processorName = '%s-%s' % (processor.__class__.__name__,processorNumber) + processorName = processor.name if processedByDict.has_key(processorName): self.logger.debug('%s has already been processed by %s ' % (fileInfo, processorName)) diff --git a/src/python/dm/common/processing/plugins/fileProcessor.py b/src/python/dm/common/processing/plugins/fileProcessor.py index ab19c03747c40e6bf4b4d17dbae8527101fd9f1f..435c81598a445b80a697d13f023e58f4741bf39b 100755 --- a/src/python/dm/common/processing/plugins/fileProcessor.py +++ b/src/python/dm/common/processing/plugins/fileProcessor.py @@ -6,8 +6,14 @@ class FileProcessor: DEFAULT_NUMBER_OF_RETRIES = 0 DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS = 60 - def __init__(self): + def __init__(self, dependsOn=[]): self.configDict = {} + self.processorName = self.__class__.__name__ + self.dependsOn = dependsOn + + @abc.abstractmethod + def processDirectory(self, directoryInfo): + return NotImplemented @abc.abstractmethod def processFile(self, fileInfo): diff --git a/src/python/dm/common/processing/plugins/fileTransferPlugin.py b/src/python/dm/common/processing/plugins/fileTransferPlugin.py index 80df827e985f2bb187a06d43dd5872b261fdf767..25158a41fc9b58f44908c6e73b058646677cd87c 100755 --- a/src/python/dm/common/processing/plugins/fileTransferPlugin.py +++ b/src/python/dm/common/processing/plugins/fileTransferPlugin.py @@ -10,8 +10,8 @@ from fileProcessor import FileProcessor class FileTransferPlugin(FileProcessor): - def __init__(self, command, src=None, dest=None): - FileProcessor.__init__(self) + def __init__(self, command, src=None, dest=None, dependsOn=[]): + FileProcessor.__init__(self, dependsOn) self.src = src self.dest = dest self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__) @@ -31,7 +31,7 @@ class FileTransferPlugin(FileProcessor): storageDirectory = fileInfo.get('storageDirectory') destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory) srcUrl = self.getSrcUrl(filePath, dataDirectory) - self.start(srcUrl, destUrl, fileInfo) + self.start(src=srcUrl, dest=destUrl, templateInfo=fileInfo) def getSrcUrl(self, filePath, dataDirectory): # Use relative path with respect to data directory as a source @@ -45,8 +45,21 @@ class FileTransferPlugin(FileProcessor): destUrl = '%s:%s' % (storageHost, storageDirectory) return destUrl - def getFullCommand(self, src, dest): - return '%s "%s" "%s"' % (self.command, src, dest) + def getSrcDirUrl(self, dataDirectory): + return '%s/' % dataDirectory + + def getDestDirUrl(self, dataDirectory, storageHost, storageDirectory): + if self.dest: + destUrl = '%s/' % (self.dest) + else: + destUrl = '%s:%s/' % (storageHost, storageDirectory) + return destUrl + + def getFullCommand(self, src, dest, command=None): + if command: + return '%s "%s" "%s"' % (command, src, dest) + else: + return '%s "%s" "%s"' % (self.command, src, dest) def setSrc(self, src): self.src = src @@ -54,7 +67,7 @@ class FileTransferPlugin(FileProcessor): def setDest(self, dest): self.dest = dest - def start(self, src=None, dest=None, fileInfo={}, cwd=None): + def start(self, src=None, dest=None, command=None, templateInfo={}, cwd=None): # Use preconfigured source if provided source is None fileSrc = src if src is None: @@ -65,17 +78,17 @@ class FileTransferPlugin(FileProcessor): # If destination is local, attempt to create it if self.dest is not None and self.dest.find(':') < 0: - destDir = self.replaceTemplates(self.dest, fileInfo) + destDir = self.replaceTemplates(self.dest, templateInfo) try: OsUtility.createDir(destDir) except Exception, ex: self.logger.warn('Transfer may fail due to failure to create destination directory %s: %s' % (destDir, str(ex))) - fileSrc = self.replaceTemplates(fileSrc, fileInfo) - fileDest = self.replaceTemplates(fileDest, fileInfo) + fileSrc = self.replaceTemplates(fileSrc, templateInfo) + fileDest = self.replaceTemplates(fileDest, templateInfo) if not fileSrc or not fileDest: raise InvalidRequest('Both source and destination must be non-empty strings.') - self.subprocess = DmSubprocess.getSubprocess(self.getFullCommand(fileSrc, fileDest), cwd=cwd) + self.subprocess = DmSubprocess.getSubprocess(self.getFullCommand(fileSrc, fileDest, command), cwd=cwd) return self.subprocess.run() def wait(self): diff --git a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py index b07cc50413a43766784c924c12f67b6161a20dbd..90ac727db198f7bbc4eb18ce7a7935b7725e8dfd 100755 --- a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py +++ b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py @@ -16,8 +16,8 @@ class GridftpFileTransferPlugin(FileTransferPlugin): DEFAULT_PORT = 2811 - def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True): - FileTransferPlugin.__init__(self, command, src, dest) + def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True, dependsOn=[]): + FileTransferPlugin.__init__(self, command, src, dest, dependsOn=dependsOn) self.dsFileApi = DsRestApiFactory.getFileRestApi() self.localMd5Sum = localMd5Sum self.remoteMd5Sum = remoteMd5Sum @@ -107,7 +107,7 @@ class GridftpFileTransferPlugin(FileTransferPlugin): # Transfer file self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl)) - self.start(src=srcUrl, dest=destUrl, fileInfo=fileInfo, cwd=dataDirectory) + self.start(src=srcUrl, dest=destUrl, templateInfo=fileInfo, cwd=dataDirectory) # Get remote checksum if self.remoteMd5Sum: @@ -130,6 +130,39 @@ class GridftpFileTransferPlugin(FileTransferPlugin): # self.logger.debug('Deleting file %s' % filePath) # OsUtility.removeFile(srcUrl) + def getSrcDirUrl(self, dataDirectory): + (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT) + if scheme: + srcUrl = '%s/' % (dataDirectory) + elif self.src is None: + srcUrl = 'file://%s/' % (dataDirectory) + else: + srcUrl = '%s/%s/' % (self.src,dataDirectory) + return srcUrl + + def getDestDirUrl(self, dataDirectory, storageHost, storageDirectory): + (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT) + dirName = os.path.dirname(os.path.relpath(dataDirectory, dirPath)).strip() + if self.dest: + destUrl = '%s/%s/' % (self.dest, dirPath) + else: + destUrl = 'sshftp://%s/%s/%s/' % (storageHost, storageDirectory, dirName) + return destUrl + + def processDirectory(self, directoryInfo): + uploadInfo = directoryInfo.get('uploadInfo') + dataDirectory = uploadInfo.get('dataDirectory') + experimentName = uploadInfo.get('experimentName') + storageHost = uploadInfo.get('storageHost') + storageDirectory = uploadInfo.get('storageDirectory') + + destUrl = self.getDestDirUrl(dataDirectory, storageHost, storageDirectory) + srcUrl = self.getSrcDirUrl(dataDirectory) + + # Transfer directory + self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl)) + self.start(src=srcUrl, dest=destUrl, templateInfo=uploadInfo, cwd=dataDirectory) + ####################################################################### # Testing. if __name__ == '__main__': diff --git a/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py b/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py index 13da37709c38da56b7eed1f888244f339ab2090e..eb3e8e4e385763aaadd38c3774dd55308b8b728c 100755 --- a/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py +++ b/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py @@ -3,14 +3,15 @@ import os import time from dm.common.utility.loggingManager import LoggingManager +from dm.common.objects.observedFile import ObservedFile from dm.common.utility.timeUtility import TimeUtility from dm.common.processing.plugins.fileProcessor import FileProcessor from dm.common.mongodb.api.fileMongoDbApi import FileMongoDbApi class MongoDbFileCatalogPlugin(FileProcessor): - def __init__(self): - FileProcessor.__init__(self) + def __init__(self, dependsOn=[]): + FileProcessor.__init__(self, dependsOn=dependsOn) self.fileMongoDbApi = FileMongoDbApi() self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__) @@ -57,6 +58,32 @@ class MongoDbFileCatalogPlugin(FileProcessor): self.logger.debug('File "%s" catalog entry: %s' % (experimentFilePath, str(fileInfo2))) self.fileMongoDbApi.updateOrAddExperimentFile(fileInfo2) + def processDirectory(self, directoryInfo): + uploadInfo = directoryInfo.get('uploadInfo') + daqInfo = directoryInfo.get('daqInfo') + experiment = directoryInfo.get('experiment') + filePathsDict = directoryInfo.get('filePathsDict') + + uploadId = uploadInfo.get('id') + dataDirectory = uploadInfo.get('dataDirectory') + + nProcessedFiles = 0 + nFiles = len(filePathsDict) + for (filePath,filePathDict) in filePathsDict.items(): + fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment) + fileInfo.update(filePathDict) + fileInfo['daqInfo'] = daqInfo + fileInfo['uploadId'] = uploadId + + if uploadInfo.get('status') != 'aborting': + self.processFile(fileInfo) + nProcessedFiles += 1 + else: + nCancelledFiles = nFiles - nProcessedFiles + self.logger.debug('Upload id %s aborted, will not process %s files)' % (uploadId, nCancelledFiles)) + processingInfo = uploadInfo.get('processingInfo') + processingInfo[self.name]['status'] = 'aborted' + break ####################################################################### # Testing. diff --git a/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py b/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py index 5f50169dc873281933be03f75dcc84e2ca574511..e088f2cce6c02f7378efb82b90805b5803dd7aaa 100755 --- a/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py +++ b/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py @@ -12,11 +12,11 @@ from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory class RsyncFileTransferPlugin(FileTransferPlugin): DEFAULT_COMMAND = 'rsync -arvlPR' + DIRECTORY_TRANSFER_COMMAND = 'rsync -arvlP' DRY_RUN_COMMAND = 'rsync -arvlP' - def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, - remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True): - FileTransferPlugin.__init__(self, command, src, dest) + def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True, dependsOn=[]): + FileTransferPlugin.__init__(self, command, src, dest, dependsOn=dependsOn) self.dsFileApi = DsRestApiFactory.getFileRestApi() self.localMd5Sum = localMd5Sum self.remoteMd5Sum = remoteMd5Sum @@ -65,7 +65,7 @@ class RsyncFileTransferPlugin(FileTransferPlugin): # Transfer file self.logger.debug('Starting transfer: %s' % fileInfo) - self.start(src=srcUrl, dest=destUrl, fileInfo=fileInfo, cwd=dataDirectory) + self.start(src=srcUrl, dest=destUrl, templateInfo=fileInfo, cwd=dataDirectory) # Get remote checksum if self.remoteMd5Sum: @@ -88,6 +88,20 @@ class RsyncFileTransferPlugin(FileTransferPlugin): self.logger.debug('Deleting file %s' % filePath) OsUtility.removeFile(srcUrl) + def processDirectory(self, directoryInfo): + uploadInfo = directoryInfo.get('uploadInfo') + dataDirectory = uploadInfo.get('dataDirectory') + experimentName = uploadInfo.get('experimentName') + storageHost = uploadInfo.get('storageHost') + storageDirectory = uploadInfo.get('storageDirectory') + + destUrl = self.getDestDirUrl(dataDirectory, storageHost, storageDirectory) + srcUrl = self.getSrcDirUrl(dataDirectory) + + # Transfer directory + self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl)) + self.start(src=srcUrl, dest=destUrl, command=self.DIRECTORY_TRANSFER_COMMAND, templateInfo=uploadInfo, cwd=dataDirectory) + ####################################################################### # Testing. if __name__ == '__main__': diff --git a/src/python/dm/common/processing/plugins/rsyncWithChecksumAndDeleteFileTransferPlugin.py b/src/python/dm/common/processing/plugins/rsyncWithChecksumAndDeleteFileTransferPlugin.py index 855d8f39e90f6ba6eb787d59c1d764ef0a520984..515bff88f9f09666609d49b51cb44d4e61187d37 100755 --- a/src/python/dm/common/processing/plugins/rsyncWithChecksumAndDeleteFileTransferPlugin.py +++ b/src/python/dm/common/processing/plugins/rsyncWithChecksumAndDeleteFileTransferPlugin.py @@ -34,7 +34,7 @@ class RsyncWithChecksumAndDeleteFileTransferPlugin(FileTransferPlugin): self.logger.debug('File info before transfer: %s' % fileInfo) # Transfer file - self.start(src=srcUrl, dest=destUrl, fileInfo=fileInfo, cwd=dataDirectory) + self.start(src=srcUrl, dest=destUrl, templateInfo=fileInfo, cwd=dataDirectory) # Get remote checksum fileInfo2 = {} fileInfo2['experimentFilePath'] = experimentFilePath diff --git a/src/python/dm/common/utility/dictUtility.py b/src/python/dm/common/utility/dictUtility.py index 9dc224db069b0e9e9a889778223b7d3b069219bb..beffed803bcd2327390d83d8bb30b5925b9af875 100755 --- a/src/python/dm/common/utility/dictUtility.py +++ b/src/python/dm/common/utility/dictUtility.py @@ -4,6 +4,13 @@ import copy class DictUtility: + @classmethod + def getAndRemoveKey(cls, dict, key, default=None): + value = dict.get(key, default) + if dict.has_key(key): + del dict[key] + return value + @classmethod def deepCopy(cls, dict, includeKeys=[], excludeKeys=[]): dict2 = {} diff --git a/src/python/dm/common/utility/ldapLinuxPlatformUtility.py b/src/python/dm/common/utility/ldapLinuxPlatformUtility.py index 58f32b1a5a5fe2a8ce78a5ba37da39844d4d8a3a..76097df6b04ff964571930eaebcee114e480be2d 100755 --- a/src/python/dm/common/utility/ldapLinuxPlatformUtility.py +++ b/src/python/dm/common/utility/ldapLinuxPlatformUtility.py @@ -303,6 +303,13 @@ class LdapLinuxPlatformUtility: cmd = '%s \:%s "%s"' % (cls.CHOWN_CMD, groupName, path) cls.executeSudoCommand(cmd) + @classmethod + def recursivelyChangePathGroupOwner(cls, path, groupName): + logger = cls.getLogger() + logger.debug('Recursively changing group owner to %s for path %s' % (groupName, path)) + cmd = '%s -R \:%s "%s"' % (cls.CHOWN_CMD, groupName, path) + cls.executeSudoCommand(cmd) + @classmethod def refreshNscdGroupCache(cls): logger = cls.getLogger() diff --git a/src/python/dm/common/utility/linuxUtility.py b/src/python/dm/common/utility/linuxUtility.py index 3517eea34ed28c398ba5583240ebf1bfc02091a4..0fd3ccdfdf4a414283e8934aa40ee4b0a54c71f3 100755 --- a/src/python/dm/common/utility/linuxUtility.py +++ b/src/python/dm/common/utility/linuxUtility.py @@ -85,6 +85,13 @@ class LinuxUtility: cmd = '%s \:%s "%s"' % (cls.CHOWN_CMD, groupName, path) cls.executeSudoCommand(cmd) + @classmethod + def recursivelyChangePathGroupOwner(cls, path, groupName): + logger = cls.getLogger() + logger.debug('Recursively changing group owner to %s for path %s' % (groupName, path)) + cmd = '%s -R \:%s "%s"' % (cls.CHOWN_CMD, groupName, path) + cls.executeSudoCommand(cmd) + ####################################################################### # Testing. diff --git a/src/python/dm/daq_web_service/api/experimentRestApi.py b/src/python/dm/daq_web_service/api/experimentRestApi.py index 241dfccf0782f1af48e3e5905f966aed229c176f..6f5f16301e73e84093e2f9f08f1a0b2d8a2622dc 100755 --- a/src/python/dm/daq_web_service/api/experimentRestApi.py +++ b/src/python/dm/daq_web_service/api/experimentRestApi.py @@ -8,6 +8,7 @@ from dm.common.utility.encoder import Encoder from dm.common.exceptions.dmException import DmException from dm.common.objects.experiment import Experiment from dm.common.objects.uploadInfo import UploadInfo +from dm.common.objects.pluginInfo import PluginInfo from dm.common.objects.daqInfo import DaqInfo from daqRestApi import DaqRestApi @@ -72,6 +73,12 @@ class ExperimentRestApi(DaqRestApi): responseDict = self.sendSessionRequest(url=url, method='POST') return UploadInfo(responseDict) + @DaqRestApi.execute + def getProcessingPlugins(self): + url = '%s/processingPlugins' % (self.getContextRoot()) + responseData = self.sendSessionRequest(url=url, method='GET') + return self.toDmObjectList(responseData, PluginInfo) + ####################################################################### # Testing. diff --git a/src/python/dm/daq_web_service/cli/getProcessingPluginsCli.py b/src/python/dm/daq_web_service/cli/getProcessingPluginsCli.py new file mode 100755 index 0000000000000000000000000000000000000000..07afe11daf5d42a883a1f75f1e92b72c43e1547c --- /dev/null +++ b/src/python/dm/daq_web_service/cli/getProcessingPluginsCli.py @@ -0,0 +1,27 @@ +#!/usr/bin/env python + +from dm.daq_web_service.api.experimentRestApi import ExperimentRestApi +from daqWebServiceSessionCli import DaqWebServiceSessionCli + +class GetProcessingPluginsCli(DaqWebServiceSessionCli): + def __init__(self): + DaqWebServiceSessionCli.__init__(self) + + def runCommand(self): + self.parseArgs(usage=""" + dm-get-processing-plugins + +Description: + Retrieves list of known processing plugins. + """) + api = ExperimentRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol()) + plugins = api.getProcessingPlugins() + for plugin in plugins: + print plugin.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat()) + + +####################################################################### +# Run command. +if __name__ == '__main__': + cli = GetProcessingPluginsCli() + cli.run() diff --git a/src/python/dm/daq_web_service/service/experimentRouteDescriptor.py b/src/python/dm/daq_web_service/service/experimentRouteDescriptor.py index 04c019e41941b39133b1a1858c0c723d826b531b..51956cf4a56d3f984acd883acb2e05e6021087c4 100755 --- a/src/python/dm/daq_web_service/service/experimentRouteDescriptor.py +++ b/src/python/dm/daq_web_service/service/experimentRouteDescriptor.py @@ -72,6 +72,16 @@ class ExperimentRouteDescriptor: 'action' : 'stopUpload', 'method' : ['POST'] }, + + # Get processing plugins + { + 'name' : 'getProcessingPlugins', + 'path' : '%s/processingPlugins' % contextRoot, + 'controller' : experimentSessionController, + 'action' : 'getProcessingPlugins', + 'method' : ['GET'] + }, + ] return routes diff --git a/src/python/dm/daq_web_service/service/experimentSessionController.py b/src/python/dm/daq_web_service/service/experimentSessionController.py index 80036d9a52ed23bab3d2bbc17ab49d3ea14064b1..b9f02f2dade235710ee6a372f8c61b66f1d5dabc 100755 --- a/src/python/dm/daq_web_service/service/experimentSessionController.py +++ b/src/python/dm/daq_web_service/service/experimentSessionController.py @@ -4,9 +4,11 @@ import cherrypy import json import os +from dm.common.constants import dmProcessingMode from dm.common.service.dmSessionController import DmSessionController from dm.common.exceptions.invalidRequest import InvalidRequest from dm.common.utility.encoder import Encoder +from dm.common.utility.dictUtility import DictUtility from dm.daq_web_service.service.impl.experimentSessionControllerImpl import ExperimentSessionControllerImpl @@ -77,7 +79,13 @@ class ExperimentSessionController(DmSessionController): encodedDaqInfo = kwargs.get('daqInfo') if encodedDaqInfo: daqInfo = json.loads(Encoder.decode(encodedDaqInfo)) - response = self.experimentSessionControllerImpl.upload(experimentName, dataDirectory, daqInfo).getFullJsonRep() + processingMode = DictUtility.getAndRemoveKey(daqInfo, 'processingMode', dmProcessingMode.DM_PROCESSING_MODE_FILES) + if processingMode not in dmProcessingMode.DM_ALLOWED_PROCESSING_MODE_LIST: + raise InvalidRequest('Allowed processing modes: %s' % dmProcessingMode.DM_ALLOWED_PROCESSING_MODE_LIST) + if processingMode == dmProcessingMode.DM_PROCESSING_MODE_FILES: + response = self.experimentSessionControllerImpl.uploadFiles(experimentName, dataDirectory, daqInfo).getFullJsonRep() + else: + response = self.experimentSessionControllerImpl.uploadDirectory(experimentName, dataDirectory, daqInfo).getFullJsonRep() self.logger.debug('Returning upload info for directory %s' % dataDirectory) return response @@ -96,3 +104,10 @@ class ExperimentSessionController(DmSessionController): response = self.experimentSessionControllerImpl.stopUpload(id).getFullJsonRep() self.logger.debug('Stopped upload id %s' % id) return response + + @cherrypy.expose + @DmSessionController.require(DmSessionController.isAdministrator()) + @DmSessionController.execute + def getProcessingPlugins(self, **kwargs): + return self.listToJson(self.experimentSessionControllerImpl.getProcessingPlugins()) + diff --git a/src/python/dm/daq_web_service/service/impl/dsProcessFileNotificationPlugin.py b/src/python/dm/daq_web_service/service/impl/dsProcessFileNotificationPlugin.py index 35cd9bbb1eb1348f33f1f1fbd5e02e57848fd3f2..68cf9d02fdad25c075f6133b091ff14a5bead522 100755 --- a/src/python/dm/daq_web_service/service/impl/dsProcessFileNotificationPlugin.py +++ b/src/python/dm/daq_web_service/service/impl/dsProcessFileNotificationPlugin.py @@ -8,8 +8,8 @@ from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory class DsProcessFileNotificationPlugin(FileProcessor): - def __init__(self): - FileProcessor.__init__(self) + def __init__(self, dependsOn=[]): + FileProcessor.__init__(self, dependsOn=dependsOn) self.dsFileApi = DsRestApiFactory.getFileRestApi() self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__) @@ -30,6 +30,19 @@ class DsProcessFileNotificationPlugin(FileProcessor): self.logger.debug('File info sent to DS service: %s' % (str(fileInfo2))) self.dsFileApi.processFile(experimentFilePath, experimentName, fileInfo2) + def processDirectory(self, directoryInfo): + uploadInfo = directoryInfo.get('uploadInfo') + experimentName = uploadInfo.get('experimentName') + experimentDirectoryPath = '' + daqInfo = directoryInfo.get('daqInfo') + + directoryInfo2 = {} + directoryInfo['experimentDirectoryPath'] = experimentDirectoryPath + directoryInfo2['experimentName'] = experimentName + directoryInfo2['daqInfo'] = daqInfo + self.logger.debug('Directory info sent to DS service: %s' % (str(directoryInfo2))) + self.dsFileApi.processDirectory(experimentDirectoryPath, experimentName, directoryInfo2) + ####################################################################### # Testing. if __name__ == '__main__': diff --git a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py index 651e5bed4f80ed11c9da5c0901bb36bf1944d0fe..c05d1f5d37cb9ec5420c683e55bd6a2820e6e242 100755 --- a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py +++ b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py @@ -16,6 +16,8 @@ from dm.common.exceptions.invalidRequest import InvalidRequest from dm.common.exceptions.objectNotFound import ObjectNotFound from dm.common.objects.observedFile import ObservedFile from dm.common.objects.uploadInfo import UploadInfo +from dm.common.objects.pluginInfo import PluginInfo +from dm.common.objects.directoryUploadInfo import DirectoryUploadInfo from dm.common.processing.fileProcessingManager import FileProcessingManager from dm.common.utility.fileUtility import FileUtility from dm.common.utility.timeUtility import TimeUtility @@ -30,6 +32,7 @@ class ExperimentSessionControllerImpl(DmObjectManager): """ Experiment session controller implementation class. """ UPLOAD_DELAY_IN_SECONDS = 1.0 + DIRECTORY_UPLOAD_PROCESSING_WAIT_IN_SECONDS = 15.0 def __init__(self): DmObjectManager.__init__(self) @@ -45,21 +48,21 @@ class ExperimentSessionControllerImpl(DmObjectManager): FileSystemObserver.getInstance().startObservingPath(dataDirectory, experiment) return daqInfo - def stopDaq(self, experimentName, dataDirectory, includeFileDetails=False): + def stopDaq(self, experimentName, dataDirectory): experiment = self.dsExperimentApi.getExperimentByName(experimentName) daqInfo = DaqTracker.getInstance().stopDaq(experiment, dataDirectory) FileSystemObserver.getInstance().stopObservingPath(dataDirectory, experiment) daqInfo.updateStatus() return daqInfo - def getDaqInfo(self, id, includeFileDetails=False): + def getDaqInfo(self, id): daqInfo = DaqTracker.getInstance().getDaqInfo(id) if not daqInfo: raise ObjectNotFound('Daq id %s not found.' % id) daqInfo.updateStatus() return daqInfo - def upload(self, experimentName, dataDirectory, daqInfo, includeFileDetails=False): + def uploadFiles(self, experimentName, dataDirectory, daqInfo): experiment = self.dsExperimentApi.getExperimentByName(experimentName) UploadTracker.getInstance().checkForActiveUpload(experiment, dataDirectory) @@ -101,12 +104,13 @@ class ExperimentSessionControllerImpl(DmObjectManager): UploadTracker.getInstance().startUpload(uploadId, uploadInfo) uploadInfo['nFiles'] = len(filePathsDict) + uploadInfo['status'] = 'running' self.logger.debug('Will prepare upload of %s files' % len(filePathsDict)) - timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUpload, args=[uploadInfo, daqInfo, experiment, filePathsDict]) + timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUploadFiles, args=[uploadInfo, daqInfo, experiment, filePathsDict]) timer.start() return uploadInfo - def prepareUpload(self, uploadInfo, daqInfo, experiment, filePathsDict): + def prepareUploadFiles(self, uploadInfo, daqInfo, experiment, filePathsDict): uploadId = uploadInfo.get('id') self.logger.debug('Preparing upload id: %s' % uploadId) dataDirectory = uploadInfo.get('dataDirectory') @@ -147,4 +151,129 @@ class ExperimentSessionControllerImpl(DmObjectManager): uploadInfo.updateStatus() return uploadInfo + def uploadDirectory(self, experimentName, dataDirectory, daqInfo): + experiment = self.dsExperimentApi.getExperimentByName(experimentName) + UploadTracker.getInstance().checkForActiveUpload(experiment, dataDirectory) + + experiment['daqInfo'] = daqInfo + storageDirectory = experiment.get('storageDirectory') + if storageDirectory is None: + raise InvalidRequest('Experiment %s has not been started.' % experimentName) + filePathsDict = FileSystemObserver.getInstance().getFiles(dataDirectory) + uploadId = str(uuid.uuid4()) + self.logger.debug('Starting upload id %s' % uploadId) + uploadInfo = DirectoryUploadInfo(daqInfo) + uploadInfo['id'] = uploadId + uploadInfo['experimentName'] = experimentName + uploadInfo['storageDirectory'] = experiment.get('storageDirectory') + uploadInfo['storageHost'] = experiment.get('storageHost') + uploadInfo['storageUrl'] = experiment.get('storageUrl') + uploadInfo['dataDirectory'] = dataDirectory + + startTime = time.time() + uploadInfo['startTime'] = startTime + uploadInfo['startTimestamp '] = TimeUtility.formatLocalTimestamp(startTime) + daqInfo['experimentName'] = experimentName + daqInfo['storageDirectory'] = experiment.get('storageDirectory') + daqInfo['storageHost'] = experiment.get('storageHost') + daqInfo['storageUrl'] = experiment.get('storageUrl') + daqInfo['dataDirectory'] = dataDirectory + daqInfo['uploadId'] = uploadId + + skipProcessing = DictUtility.getAndRemoveKey(daqInfo, 'skipProcessing', '') + skipProcessingList = skipProcessing.split(',') + fileProcessingManager = FileProcessingManager.getInstance() + processingInfo = {} + uploadInfo['processingInfo'] = processingInfo + for processorKey in fileProcessingManager.fileProcessorKeyList: + processor = fileProcessingManager.fileProcessorDict.get(processorKey) + processorName = processor.name + if processorName in skipProcessingList: + processingInfo[processorName] = {'status' : 'skipped'} + else: + self.logger.debug('Starting %s processing timer for directory %s upload' % (processorName, dataDirectory)) + timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.processUploadDirectory, args=[processor, uploadInfo, daqInfo, experiment, filePathsDict]) + processingInfo[processorName] = {'status' : 'pending'} + timer.start() + + UploadTracker.getInstance().startUpload(uploadId, uploadInfo) + uploadInfo['nFiles'] = len(filePathsDict) + uploadInfo['status'] = 'running' + return uploadInfo + def processUploadDirectory(self, processor, uploadInfo, daqInfo, experiment, filePathsDict): + uploadId = uploadInfo.get('id') + dataDirectory = uploadInfo.get('dataDirectory') + processorName = processor.name + processingInfo = uploadInfo.get('processingInfo') + self.logger.debug('Starting %s processing for upload %s by %s' % (dataDirectory, uploadId, processorName)) + try: + dependsOn = processor.dependsOn + while True: + # Check status + if uploadInfo['status'] == 'aborting': + processingInfo[processorName]['status'] = 'aborted' + return + + # Check that processor can proceed + canProcess = False + if not len(dependsOn): + canProcess = True + for depProcessorName in dependsOn: + depProcessorStatus = processingInfo.get(depProcessorName).get('status') + if depProcessorStatus in ['skipped', 'aborted', 'failed']: + # We must skip processing + self.logger.debug('Skipping %s processing for upload %s due to %s status of %s' % (processorName, uploadId, depProcessorName, depProcessorStatus)) + processingInfo[processorName]['status'] = 'skipped' + return + elif depProcessorStatus in ['pending', 'running']: + # Do nothing + pass + elif depProcessorStatus == 'done': + # We can proceed + canProcess = True + else: + # This should not happen + self.logger.error('Skipping %s processing for upload %s due to %s unrecognized status of %s' % (processorName, uploadId, depProcessorName, depProcessorStatus)) + processingInfo[processorName]['status'] = 'skipped' + return + # Process directory if we can + if canProcess: + directoryInfo = {'uploadInfo' : uploadInfo, + 'daqInfo' : daqInfo, + 'experiment' : experiment, + 'filePathsDict' : filePathsDict + } + processingInfo[processorName]['status'] = 'running' + processingStartTime = time.time() + processor.processDirectory(directoryInfo) + if processingInfo[processorName]['status'] == 'running': + processingInfo[processorName]['status'] = 'done' + self.logger.debug('Directory %s processing complete for upload %s by %s' % (dataDirectory, uploadId, processorName)) + else: + self.logger.debug('Incomplete directory %s processing upload %s by %s, status: %s' % (dataDirectory, uploadId, processorName, processingInfo[processorName]['status'])) + break + + # Wait a bit longer + time.sleep(self.DIRECTORY_UPLOAD_PROCESSING_WAIT_IN_SECONDS) + + except Exception, ex: + self.logger.error('%s processing for upload %s failed: %s' % (processorName, uploadId, str(ex))) + processingInfo[processorName]['status'] = 'failed' + processingInfo[processorName]['processingError'] = str(ex) + + processingEndTime = time.time() + processingInfo[processorName]['processingEndTime'] = processingEndTime + processingInfo[processorName]['processingStartTime'] = processingStartTime + processingInfo[processorName]['processingRunTime'] = processingEndTime-processingStartTime + + + def getProcessingPlugins(self): + pluginList = [] + fileProcessingManager = FileProcessingManager.getInstance() + for processorKey in fileProcessingManager.fileProcessorKeyList: + processor = fileProcessingManager.fileProcessorDict.get(processorKey) + pluginInfo = {'name' : processor.name, 'dependsOn' : processor.dependsOn} + pluginList.append(PluginInfo(pluginInfo)) + return pluginList + diff --git a/src/python/dm/ds_web_service/api/fileRestApi.py b/src/python/dm/ds_web_service/api/fileRestApi.py index 52cf596eef2f7d795662d86159cc81539b69d7ba..1a00c3ed03bd824a10871fa1e7a75aaebd2e0571 100755 --- a/src/python/dm/ds_web_service/api/fileRestApi.py +++ b/src/python/dm/ds_web_service/api/fileRestApi.py @@ -9,6 +9,7 @@ from dm.common.utility.encoder import Encoder from dm.common.exceptions.dmException import DmException from dm.common.exceptions.invalidRequest import InvalidRequest from dm.common.objects.fileMetadata import FileMetadata +from dm.common.objects.directoryMetadata import DirectoryMetadata from dm.common.objects.experiment import Experiment from dm.common.utility.rsyncFileTransfer import RsyncFileTransfer from dsRestApi import DsRestApi @@ -44,6 +45,17 @@ class FileRestApi(DsRestApi): responseDict = self.sendSessionRequest(url=url, method='POST') return FileMetadata(responseDict) + @DsRestApi.execute + def processDirectory(self, experimentDirectoryPath, experimentName, directoryInfo={}): + url = '%s/files/processDirectory' % (self.getContextRoot()) + if not experimentName: + raise InvalidRequest('Experiment name must be provided.') + directoryInfo['experimentDirectoryPath'] = experimentDirectoryPath + directoryInfo['experimentName'] = experimentName + url += '?directoryInfo=%s' % (Encoder.encode(json.dumps(directoryInfo))) + responseDict = self.sendSessionRequest(url=url, method='POST') + return DirectoryMetadata(responseDict) + @DsRestApi.execute def download(self, experimentName, experimentFilePath='', destDirectory='.'): username = getpass.getuser() diff --git a/src/python/dm/ds_web_service/service/fileRouteDescriptor.py b/src/python/dm/ds_web_service/service/fileRouteDescriptor.py index 29955c5ada0b4e9a646e9633e6e5e327fad8bbee..1d323741c7f323a5b0dc4d5009928f6e7548aea7 100755 --- a/src/python/dm/ds_web_service/service/fileRouteDescriptor.py +++ b/src/python/dm/ds_web_service/service/fileRouteDescriptor.py @@ -37,6 +37,14 @@ class FileRouteDescriptor: 'method' : ['POST'] }, + # Process directory + { + 'name' : 'processDirectory', + 'path' : '%s/files/processDirectory' % contextRoot, + 'controller' : fileSessionController, + 'action' : 'processDirectory', + 'method' : ['POST'] + }, ] return routes diff --git a/src/python/dm/ds_web_service/service/fileSessionController.py b/src/python/dm/ds_web_service/service/fileSessionController.py index b28d89872ddb8700e33c6648837778d70177203d..914cc2677f8bf928b76a185f40b1c4933427d19c 100755 --- a/src/python/dm/ds_web_service/service/fileSessionController.py +++ b/src/python/dm/ds_web_service/service/fileSessionController.py @@ -49,3 +49,19 @@ class FileSessionController(DmSessionController): response = self.fileSessionControllerImpl.statFile(fileInfo).getFullJsonRep() self.logger.debug('Returning: %s' % response) return response + + @cherrypy.expose + @DmSessionController.require(DmSessionController.isAdministrator()) + @DmSessionController.execute + def processDirectory(self, **kwargs): + encodedDirectoryInfo = kwargs.get('directoryInfo') + if not encodedDirectoryInfo: + raise InvalidRequest('Invalid directory info provided.') + directoryInfo = json.loads(Encoder.decode(encodedDirectoryInfo)) + + if not directoryInfo.has_key('experimentName'): + raise InvalidRequest('Experiment name is missing.') + response = self.fileSessionControllerImpl.processDirectory(directoryInfo).getFullJsonRep() + self.logger.debug('Returning: %s' % response) + return response + diff --git a/src/python/dm/ds_web_service/service/impl/experimentManager.py b/src/python/dm/ds_web_service/service/impl/experimentManager.py index d12061216e39ff8db6157de9330f5a3d60d61a35..3cc84f195f809f4556ce3ed6c711f201bbe90904 100755 --- a/src/python/dm/ds_web_service/service/impl/experimentManager.py +++ b/src/python/dm/ds_web_service/service/impl/experimentManager.py @@ -220,6 +220,35 @@ class ExperimentManager(Singleton): self.logger.debug('File path %s does not exist' % filePath) raise ObjectNotFound('File %s does not exist' % filePath) + @ThreadingUtility.synchronize + def processExperimentDirectory(self, experimentDirectoryPath, experiment, directoryInfo={}): + experimentName = experiment.get('name') + self.updateExperimentWithStorageDataDirectory(experiment) + storageDirectory = experiment.get('storageDirectory') + directoryPath = os.path.join(storageDirectory, experimentDirectoryPath) + directoryInfo['directoryPath'] = directoryPath + directoryInfo['experiment'] = experiment + if os.path.exists(directoryPath): + self.logger.debug('Processing directory path %s (directoryInfo: %s)' % (directoryPath, directoryInfo)) + if self.manageStoragePermissions: + self.logger.debug('Modifying permissions for directory %s' % directoryPath) + OsUtility.chmodPath(directoryPath, fileMode=self.FILE_PERMISSIONS_MODE) + self.logger.debug('Changing group owner for %s to %s' % (directoryPath, experimentName)) + self.platformUtility.recursivelyChangePathGroupOwner(directoryPath, experimentName) + # Recursively modify subdirectory permissions + dirPath = os.path.dirname(directoryPath) + while (os.path.abspath(dirPath) != os.path.abspath(storageDirectory)): + if self.pathTracker.get(dirPath) is None: + self.logger.debug('Changing group owner for experiment subdirectory %s to %s' % (dirPath, experimentName)) + self.platformUtility.changePathGroupOwner(dirPath, experimentName) + ownerUpdateTime = time.time() + self.pathTracker.put(dirPath, ownerUpdateTime) + else: + self.logger.debug('Group owner for experiment subdirectory %s is already set to %s' % (dirPath, experimentName)) + dirPath = os.path.dirname(dirPath) + else: + self.logger.debug('Directory path %s does not exist' % directoryPath) + @ThreadingUtility.synchronize def start(self): self.logger.debug('Started experiment manager') diff --git a/src/python/dm/ds_web_service/service/impl/fileSessionControllerImpl.py b/src/python/dm/ds_web_service/service/impl/fileSessionControllerImpl.py index f527e6e1fc67555f8cd2cd9767e293fbb7eca920..0c9d9ac21922f56047b8c9437ebb3a0170d2dd9d 100755 --- a/src/python/dm/ds_web_service/service/impl/fileSessionControllerImpl.py +++ b/src/python/dm/ds_web_service/service/impl/fileSessionControllerImpl.py @@ -9,6 +9,7 @@ import time from dm.common.objects.dmObjectManager import DmObjectManager from dm.common.exceptions.invalidRequest import InvalidRequest from dm.common.objects.fileMetadata import FileMetadata +from dm.common.objects.directoryMetadata import DirectoryMetadata from dm.common.db.api.experimentDbApi import ExperimentDbApi from dm.ds_web_service.service.impl.experimentManager import ExperimentManager @@ -32,3 +33,12 @@ class FileSessionControllerImpl(DmObjectManager): experiment = self.experimentDbApi.getExperimentByName(experimentName) ExperimentManager.getInstance().statExperimentFile(experimentFilePath, experiment, fileInfo) return FileMetadata(fileInfo) + + def processDirectory(self, directoryInfo): + experimentDirectoryPath = directoryInfo.get('experimentDirectoryPath', '') + experimentName = directoryInfo.get('experimentName') + experiment = self.experimentDbApi.getExperimentByName(experimentName) + ExperimentManager.getInstance().processExperimentDirectory(experimentDirectoryPath, experiment, directoryInfo) + return DirectoryMetadata(directoryInfo) + +