Skip to content
Snippets Groups Projects
Commit a08e668a authored by sveseli
Browse files

added directory processing mode for uploads

parent 5a154835
No related branches found
No related tags found
No related merge requests found
......@@ -4,9 +4,11 @@ import cherrypy
import json
import os
from dm.common.constants import dmProcessingMode
from dm.common.service.dmSessionController import DmSessionController
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.utility.encoder import Encoder
from dm.common.utility.dictUtility import DictUtility
from dm.daq_web_service.service.impl.experimentSessionControllerImpl import ExperimentSessionControllerImpl
......@@ -77,7 +79,13 @@ class ExperimentSessionController(DmSessionController):
encodedDaqInfo = kwargs.get('daqInfo')
if encodedDaqInfo:
daqInfo = json.loads(Encoder.decode(encodedDaqInfo))
response = self.experimentSessionControllerImpl.upload(experimentName, dataDirectory, daqInfo).getFullJsonRep()
processingMode = DictUtility.getAndRemoveKey(daqInfo, 'processingMode', dmProcessingMode.DM_PROCESSING_MODE_FILES)
if processingMode not in dmProcessingMode.DM_ALLOWED_PROCESSING_MODE_LIST:
raise InvalidRequest('Allowed processing modes: %s' % dmProcessingMode.DM_ALLOWED_PROCESSING_MODE_LIST)
if processingMode == dmProcessingMode.DM_PROCESSING_MODE_FILES:
response = self.experimentSessionControllerImpl.uploadFiles(experimentName, dataDirectory, daqInfo).getFullJsonRep()
else:
response = self.experimentSessionControllerImpl.uploadDirectory(experimentName, dataDirectory, daqInfo).getFullJsonRep()
self.logger.debug('Returning upload info for directory %s' % dataDirectory)
return response
......@@ -96,3 +104,10 @@ class ExperimentSessionController(DmSessionController):
response = self.experimentSessionControllerImpl.stopUpload(id).getFullJsonRep()
self.logger.debug('Stopped upload id %s' % id)
return response
@cherrypy.expose
@DmSessionController.require(DmSessionController.isAdministrator())
@DmSessionController.execute
def getProcessingPlugins(self, **kwargs):
    """
    Return the processing plugins reported by the controller
    implementation, converted with listToJson().

    Keyword arguments are accepted but not used.
    """
    pluginList = self.experimentSessionControllerImpl.getProcessingPlugins()
    return self.listToJson(pluginList)
......@@ -8,8 +8,8 @@ from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
class DsProcessFileNotificationPlugin(FileProcessor):
def __init__(self):
FileProcessor.__init__(self)
def __init__(self, dependsOn=None):
    """
    Initialize the plugin: set up the base FileProcessor, obtain the DS
    file REST API and create a logger named after this class.

    dependsOn -- optional list forwarded to FileProcessor.__init__ as
                 its dependsOn argument. When omitted, a new empty list
                 is created for this instance, so instances never share
                 one default list object.
    """
    if dependsOn is None:
        dependsOn = []
    FileProcessor.__init__(self, dependsOn=dependsOn)
    self.dsFileApi = DsRestApiFactory.getFileRestApi()
    self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
......@@ -30,6 +30,19 @@ class DsProcessFileNotificationPlugin(FileProcessor):
self.logger.debug('File info sent to DS service: %s' % (str(fileInfo2)))
self.dsFileApi.processFile(experimentFilePath, experimentName, fileInfo2)
def processDirectory(self, directoryInfo):
    """
    Notify the DS service that an uploaded directory should be processed.

    directoryInfo -- dict with an 'uploadInfo' entry (read for its
                     'experimentName') and a 'daqInfo' entry.

    A separate, reduced dictionary is built and sent to the DS file API;
    the caller's directoryInfo is left unmodified.
    """
    uploadInfo = directoryInfo.get('uploadInfo')
    experimentName = uploadInfo.get('experimentName')
    # An empty relative path is sent as the experiment directory path.
    experimentDirectoryPath = ''
    daqInfo = directoryInfo.get('daqInfo')

    # Populate the outgoing dictionary only; writing the path into
    # directoryInfo would alter the dict handed in by the caller.
    directoryInfo2 = {}
    directoryInfo2['experimentDirectoryPath'] = experimentDirectoryPath
    directoryInfo2['experimentName'] = experimentName
    directoryInfo2['daqInfo'] = daqInfo
    self.logger.debug('Directory info sent to DS service: %s' % (str(directoryInfo2)))
    self.dsFileApi.processDirectory(experimentDirectoryPath, experimentName, directoryInfo2)
#######################################################################
# Testing.
if __name__ == '__main__':
......
......@@ -16,6 +16,8 @@ from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.exceptions.objectNotFound import ObjectNotFound
from dm.common.objects.observedFile import ObservedFile
from dm.common.objects.uploadInfo import UploadInfo
from dm.common.objects.pluginInfo import PluginInfo
from dm.common.objects.directoryUploadInfo import DirectoryUploadInfo
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.common.utility.fileUtility import FileUtility
from dm.common.utility.timeUtility import TimeUtility
......@@ -30,6 +32,7 @@ class ExperimentSessionControllerImpl(DmObjectManager):
""" Experiment session controller implementation class. """
UPLOAD_DELAY_IN_SECONDS = 1.0
DIRECTORY_UPLOAD_PROCESSING_WAIT_IN_SECONDS = 15.0
def __init__(self):
DmObjectManager.__init__(self)
......@@ -45,21 +48,21 @@ class ExperimentSessionControllerImpl(DmObjectManager):
FileSystemObserver.getInstance().startObservingPath(dataDirectory, experiment)
return daqInfo
def stopDaq(self, experimentName, dataDirectory, includeFileDetails=False):
def stopDaq(self, experimentName, dataDirectory):
    """
    Stop the DAQ for the given experiment and data directory.

    Looks up the experiment by name, stops the DAQ in the DAQ tracker,
    stops observing the data directory, then refreshes and returns the
    DAQ info object handed back by the tracker.
    """
    experiment = self.dsExperimentApi.getExperimentByName(experimentName)

    daqTracker = DaqTracker.getInstance()
    stoppedDaqInfo = daqTracker.stopDaq(experiment, dataDirectory)

    observer = FileSystemObserver.getInstance()
    observer.stopObservingPath(dataDirectory, experiment)

    stoppedDaqInfo.updateStatus()
    return stoppedDaqInfo
def getDaqInfo(self, id, includeFileDetails=False):
def getDaqInfo(self, id):
    """
    Return the DAQ info registered under the given id, with its status
    refreshed.

    Raises ObjectNotFound when the tracker returns nothing for the id.
    """
    daqTracker = DaqTracker.getInstance()
    trackedDaqInfo = daqTracker.getDaqInfo(id)
    if trackedDaqInfo:
        trackedDaqInfo.updateStatus()
        return trackedDaqInfo
    raise ObjectNotFound('Daq id %s not found.' % id)
def upload(self, experimentName, dataDirectory, daqInfo, includeFileDetails=False):
def uploadFiles(self, experimentName, dataDirectory, daqInfo):
experiment = self.dsExperimentApi.getExperimentByName(experimentName)
UploadTracker.getInstance().checkForActiveUpload(experiment, dataDirectory)
......@@ -101,12 +104,13 @@ class ExperimentSessionControllerImpl(DmObjectManager):
UploadTracker.getInstance().startUpload(uploadId, uploadInfo)
uploadInfo['nFiles'] = len(filePathsDict)
uploadInfo['status'] = 'running'
self.logger.debug('Will prepare upload of %s files' % len(filePathsDict))
timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUpload, args=[uploadInfo, daqInfo, experiment, filePathsDict])
timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUploadFiles, args=[uploadInfo, daqInfo, experiment, filePathsDict])
timer.start()
return uploadInfo
def prepareUpload(self, uploadInfo, daqInfo, experiment, filePathsDict):
def prepareUploadFiles(self, uploadInfo, daqInfo, experiment, filePathsDict):
uploadId = uploadInfo.get('id')
self.logger.debug('Preparing upload id: %s' % uploadId)
dataDirectory = uploadInfo.get('dataDirectory')
......@@ -147,4 +151,129 @@ class ExperimentSessionControllerImpl(DmObjectManager):
uploadInfo.updateStatus()
return uploadInfo
def uploadDirectory(self, experimentName, dataDirectory, daqInfo):
    """
    Start a directory-mode upload of dataDirectory for an experiment.

    One timer is scheduled per registered file processor, except for
    those named in the comma-separated 'skipProcessing' entry of daqInfo
    (which is removed from daqInfo); each timer invokes
    processUploadDirectory() after UPLOAD_DELAY_IN_SECONDS.

    experimentName -- name used to look up the experiment
    dataDirectory  -- directory whose files are uploaded
    daqInfo        -- dict of upload options; it is updated in place
                      with experiment/storage details and the upload id

    Returns the DirectoryUploadInfo object describing the upload.
    Raises InvalidRequest when the experiment has no storage directory.
    """
    experiment = self.dsExperimentApi.getExperimentByName(experimentName)
    UploadTracker.getInstance().checkForActiveUpload(experiment, dataDirectory)

    experiment['daqInfo'] = daqInfo
    storageDirectory = experiment.get('storageDirectory')
    if storageDirectory is None:
        raise InvalidRequest('Experiment %s has not been started.' % experimentName)

    filePathsDict = FileSystemObserver.getInstance().getFiles(dataDirectory)
    uploadId = str(uuid.uuid4())
    self.logger.debug('Starting upload id %s' % uploadId)
    uploadInfo = DirectoryUploadInfo(daqInfo)
    uploadInfo['id'] = uploadId
    uploadInfo['experimentName'] = experimentName
    uploadInfo['storageDirectory'] = experiment.get('storageDirectory')
    uploadInfo['storageHost'] = experiment.get('storageHost')
    uploadInfo['storageUrl'] = experiment.get('storageUrl')
    uploadInfo['dataDirectory'] = dataDirectory

    startTime = time.time()
    uploadInfo['startTime'] = startTime
    # Key previously carried a trailing space ('startTimestamp ').
    uploadInfo['startTimestamp'] = TimeUtility.formatLocalTimestamp(startTime)
    daqInfo['experimentName'] = experimentName
    daqInfo['storageDirectory'] = experiment.get('storageDirectory')
    daqInfo['storageHost'] = experiment.get('storageHost')
    daqInfo['storageUrl'] = experiment.get('storageUrl')
    daqInfo['dataDirectory'] = dataDirectory
    daqInfo['uploadId'] = uploadId

    skipProcessing = DictUtility.getAndRemoveKey(daqInfo, 'skipProcessing', '')
    skipProcessingList = skipProcessing.split(',')
    fileProcessingManager = FileProcessingManager.getInstance()
    processingInfo = {}
    uploadInfo['processingInfo'] = processingInfo

    # Create all timers first and start them only once the upload has
    # been registered and marked as running (same order as uploadFiles),
    # so a processing thread never sees upload info without a status.
    timerList = []
    for processorKey in fileProcessingManager.fileProcessorKeyList:
        processor = fileProcessingManager.fileProcessorDict.get(processorKey)
        processorName = processor.name
        if processorName in skipProcessingList:
            processingInfo[processorName] = {'status' : 'skipped'}
        else:
            self.logger.debug('Starting %s processing timer for directory %s upload' % (processorName, dataDirectory))
            timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.processUploadDirectory, args=[processor, uploadInfo, daqInfo, experiment, filePathsDict])
            processingInfo[processorName] = {'status' : 'pending'}
            timerList.append(timer)

    UploadTracker.getInstance().startUpload(uploadId, uploadInfo)
    uploadInfo['nFiles'] = len(filePathsDict)
    uploadInfo['status'] = 'running'

    for timer in timerList:
        timer.start()
    return uploadInfo
def processUploadDirectory(self, processor, uploadInfo, daqInfo, experiment, filePathsDict):
    """
    Run one processor over an uploaded directory once all of the
    processors it depends on have finished.

    The per-processor entry in uploadInfo['processingInfo'] is updated
    with the outcome:
      'aborted' -- the upload status became 'aborting' while waiting
      'skipped' -- a dependency was skipped/aborted/failed, or reported
                   an unrecognized status
      'done'    -- processor.processDirectory() returned and the status
                   was still 'running'
      'failed'  -- an exception was raised ('processingError' holds the
                   message)

    While any dependency is still pending or running, the method sleeps
    for DIRECTORY_UPLOAD_PROCESSING_WAIT_IN_SECONDS and checks again.
    Unless the method returned early (aborted/skipped), processing
    start, end and run times are recorded; if a failure happened before
    processing started, the start time equals the end time.
    """
    uploadId = uploadInfo.get('id')
    dataDirectory = uploadInfo.get('dataDirectory')
    processorName = processor.name
    processingInfo = uploadInfo.get('processingInfo')
    self.logger.debug('Starting %s processing for upload %s by %s' % (dataDirectory, uploadId, processorName))

    # Remains None until the processor is actually invoked, so the
    # timing code below never touches an unbound name.
    processingStartTime = None
    try:
        dependsOn = processor.dependsOn
        while True:
            # Check status
            if uploadInfo['status'] == 'aborting':
                processingInfo[processorName]['status'] = 'aborted'
                return

            # Check that processor can proceed: every dependency must
            # be done, not just one of them.
            canProcess = True
            for depProcessorName in dependsOn:
                depProcessorStatus = processingInfo.get(depProcessorName).get('status')
                if depProcessorStatus in ['skipped', 'aborted', 'failed']:
                    # We must skip processing
                    self.logger.debug('Skipping %s processing for upload %s due to %s status of %s' % (processorName, uploadId, depProcessorName, depProcessorStatus))
                    processingInfo[processorName]['status'] = 'skipped'
                    return
                elif depProcessorStatus in ['pending', 'running']:
                    # Keep scanning (a later dependency may force a
                    # skip), but do not start yet.
                    canProcess = False
                elif depProcessorStatus != 'done':
                    # This should not happen
                    self.logger.error('Skipping %s processing for upload %s due to %s unrecognized status of %s' % (processorName, uploadId, depProcessorName, depProcessorStatus))
                    processingInfo[processorName]['status'] = 'skipped'
                    return

            # Process directory if we can
            if canProcess:
                directoryInfo = {'uploadInfo' : uploadInfo,
                                 'daqInfo' : daqInfo,
                                 'experiment' : experiment,
                                 'filePathsDict' : filePathsDict
                }
                processingInfo[processorName]['status'] = 'running'
                processingStartTime = time.time()
                processor.processDirectory(directoryInfo)
                # The status may have been changed elsewhere while the
                # processor was running; only promote 'running' to 'done'.
                if processingInfo[processorName]['status'] == 'running':
                    processingInfo[processorName]['status'] = 'done'
                    self.logger.debug('Directory %s processing complete for upload %s by %s' % (dataDirectory, uploadId, processorName))
                else:
                    self.logger.debug('Incomplete directory %s processing upload %s by %s, status: %s' % (dataDirectory, uploadId, processorName, processingInfo[processorName]['status']))
                break

            # Wait a bit longer
            time.sleep(self.DIRECTORY_UPLOAD_PROCESSING_WAIT_IN_SECONDS)

    except Exception as ex:
        self.logger.error('%s processing for upload %s failed: %s' % (processorName, uploadId, str(ex)))
        processingInfo[processorName]['status'] = 'failed'
        processingInfo[processorName]['processingError'] = str(ex)

    processingEndTime = time.time()
    if processingStartTime is None:
        # Failure occurred before the processor was started.
        processingStartTime = processingEndTime
    processingInfo[processorName]['processingEndTime'] = processingEndTime
    processingInfo[processorName]['processingStartTime'] = processingStartTime
    processingInfo[processorName]['processingRunTime'] = processingEndTime-processingStartTime
def getProcessingPlugins(self):
    """
    Return a list of PluginInfo objects, one per processor key known to
    the file processing manager, in key-list order. Each object is built
    from the processor's name and dependsOn attributes.
    """
    manager = FileProcessingManager.getInstance()
    processors = (manager.fileProcessorDict.get(key) for key in manager.fileProcessorKeyList)
    return [
        PluginInfo({'name' : plugin.name, 'dependsOn' : plugin.dependsOn})
        for plugin in processors
    ]
......@@ -9,6 +9,7 @@ from dm.common.utility.encoder import Encoder
from dm.common.exceptions.dmException import DmException
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.objects.fileMetadata import FileMetadata
from dm.common.objects.directoryMetadata import DirectoryMetadata
from dm.common.objects.experiment import Experiment
from dm.common.utility.rsyncFileTransfer import RsyncFileTransfer
from dsRestApi import DsRestApi
......@@ -44,6 +45,17 @@ class FileRestApi(DsRestApi):
responseDict = self.sendSessionRequest(url=url, method='POST')
return FileMetadata(responseDict)
@DsRestApi.execute
def processDirectory(self, experimentDirectoryPath, experimentName, directoryInfo=None):
    """
    Ask the DS service to process an experiment directory.

    experimentDirectoryPath -- directory path stored in the request
                               under 'experimentDirectoryPath'
    experimentName          -- experiment name (required)
    directoryInfo           -- optional dict of additional information;
                               the two values above are written into it
                               before it is JSON-serialized, encoded and
                               sent as the 'directoryInfo' URL parameter.
                               A new dict is used when it is omitted, so
                               no state is carried over between calls.

    Returns a DirectoryMetadata object built from the service response.
    Raises InvalidRequest when no experiment name is given.
    """
    url = '%s/files/processDirectory' % (self.getContextRoot())
    if not experimentName:
        raise InvalidRequest('Experiment name must be provided.')
    if directoryInfo is None:
        directoryInfo = {}
    directoryInfo['experimentDirectoryPath'] = experimentDirectoryPath
    directoryInfo['experimentName'] = experimentName
    url += '?directoryInfo=%s' % (Encoder.encode(json.dumps(directoryInfo)))
    responseDict = self.sendSessionRequest(url=url, method='POST')
    return DirectoryMetadata(responseDict)
@DsRestApi.execute
def download(self, experimentName, experimentFilePath='', destDirectory='.'):
username = getpass.getuser()
......
......@@ -37,6 +37,14 @@ class FileRouteDescriptor:
'method' : ['POST']
},
# Process directory
{
'name' : 'processDirectory',
'path' : '%s/files/processDirectory' % contextRoot,
'controller' : fileSessionController,
'action' : 'processDirectory',
'method' : ['POST']
},
]
return routes
......
......@@ -49,3 +49,19 @@ class FileSessionController(DmSessionController):
response = self.fileSessionControllerImpl.statFile(fileInfo).getFullJsonRep()
self.logger.debug('Returning: %s' % response)
return response
@cherrypy.expose
@DmSessionController.require(DmSessionController.isAdministrator())
@DmSessionController.execute
def processDirectory(self, **kwargs):
    """
    Handle a process-directory request.

    Expects a 'directoryInfo' keyword argument holding an encoded JSON
    dictionary that contains at least 'experimentName'. The decoded
    dictionary is passed to the controller implementation and the full
    JSON representation of its result is returned.

    Raises InvalidRequest when the directory info is missing, does not
    decode to a dictionary, or lacks the experiment name.
    """
    encodedDirectoryInfo = kwargs.get('directoryInfo')
    if not encodedDirectoryInfo:
        raise InvalidRequest('Invalid directory info provided.')
    directoryInfo = json.loads(Encoder.decode(encodedDirectoryInfo))
    # json.loads() may yield a list/string/number; only a dict is usable.
    if not isinstance(directoryInfo, dict):
        raise InvalidRequest('Invalid directory info provided.')
    # 'in' replaces the deprecated dict.has_key().
    if 'experimentName' not in directoryInfo:
        raise InvalidRequest('Experiment name is missing.')
    response = self.fileSessionControllerImpl.processDirectory(directoryInfo).getFullJsonRep()
    self.logger.debug('Returning: %s' % response)
    return response
......@@ -220,6 +220,35 @@ class ExperimentManager(Singleton):
self.logger.debug('File path %s does not exist' % filePath)
raise ObjectNotFound('File %s does not exist' % filePath)
@ThreadingUtility.synchronize
def processExperimentDirectory(self, experimentDirectoryPath, experiment, directoryInfo=None):
    """
    Process a directory located under an experiment's storage directory.

    experimentDirectoryPath -- path joined onto the experiment's
                               'storageDirectory'
    experiment              -- experiment object; read for 'name' and
                               'storageDirectory'
    directoryInfo           -- optional dict; 'directoryPath' and
                               'experiment' entries are written into it.
                               A new dict is used when it is omitted.

    If the resulting path exists and storage permissions are managed,
    the directory mode is changed, its group owner is changed
    recursively to the experiment name, and the group owner of each
    ancestor directory up to (not including) the storage directory is
    changed unless the path tracker already has an entry for it.
    """
    if directoryInfo is None:
        directoryInfo = {}
    experimentName = experiment.get('name')
    self.updateExperimentWithStorageDataDirectory(experiment)
    storageDirectory = experiment.get('storageDirectory')
    directoryPath = os.path.join(storageDirectory, experimentDirectoryPath)
    directoryInfo['directoryPath'] = directoryPath
    directoryInfo['experiment'] = experiment
    if os.path.exists(directoryPath):
        self.logger.debug('Processing directory path %s (directoryInfo: %s)' % (directoryPath, directoryInfo))
        if self.manageStoragePermissions:
            self.logger.debug('Modifying permissions for directory %s' % directoryPath)
            OsUtility.chmodPath(directoryPath, fileMode=self.FILE_PERMISSIONS_MODE)
            self.logger.debug('Changing group owner for %s to %s' % (directoryPath, experimentName))
            self.platformUtility.recursivelyChangePathGroupOwner(directoryPath, experimentName)

            # Recursively modify subdirectory permissions
            # NOTE(review): nesting of this walk under the
            # manageStoragePermissions branch is assumed -- confirm.
            storageRoot = os.path.abspath(storageDirectory)
            dirPath = os.path.dirname(directoryPath)
            while os.path.abspath(dirPath) != storageRoot:
                if self.pathTracker.get(dirPath) is None:
                    self.logger.debug('Changing group owner for experiment subdirectory %s to %s' % (dirPath, experimentName))
                    self.platformUtility.changePathGroupOwner(dirPath, experimentName)
                    ownerUpdateTime = time.time()
                    self.pathTracker.put(dirPath, ownerUpdateTime)
                else:
                    self.logger.debug('Group owner for experiment subdirectory %s is already set to %s' % (dirPath, experimentName))
                parentPath = os.path.dirname(dirPath)
                if parentPath == dirPath:
                    # Reached the top without meeting the storage
                    # directory (path resolves outside of it); stop
                    # rather than loop forever.
                    self.logger.error('Directory path %s is not located under storage directory %s' % (directoryPath, storageDirectory))
                    break
                dirPath = parentPath
    else:
        self.logger.debug('Directory path %s does not exist' % directoryPath)
@ThreadingUtility.synchronize
def start(self):
self.logger.debug('Started experiment manager')
......
......@@ -9,6 +9,7 @@ import time
from dm.common.objects.dmObjectManager import DmObjectManager
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.objects.fileMetadata import FileMetadata
from dm.common.objects.directoryMetadata import DirectoryMetadata
from dm.common.db.api.experimentDbApi import ExperimentDbApi
from dm.ds_web_service.service.impl.experimentManager import ExperimentManager
......@@ -32,3 +33,12 @@ class FileSessionControllerImpl(DmObjectManager):
experiment = self.experimentDbApi.getExperimentByName(experimentName)
ExperimentManager.getInstance().statExperimentFile(experimentFilePath, experiment, fileInfo)
return FileMetadata(fileInfo)
def processDirectory(self, directoryInfo):
    """
    Process the experiment directory described by directoryInfo.

    Reads 'experimentName' and the optional 'experimentDirectoryPath'
    (empty string when absent), looks up the experiment in the database
    API, delegates to the experiment manager and returns a
    DirectoryMetadata object built from directoryInfo.
    """
    experimentName = directoryInfo.get('experimentName')
    relativeDirectoryPath = directoryInfo.get('experimentDirectoryPath', '')
    experiment = self.experimentDbApi.getExperimentByName(experimentName)

    experimentManager = ExperimentManager.getInstance()
    experimentManager.processExperimentDirectory(relativeDirectoryPath, experiment, directoryInfo)
    return DirectoryMetadata(directoryInfo)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment