Skip to content
Snippets Groups Projects
Commit 27aacb57 authored by sveseli's avatar sveseli
Browse files

changes related to file processing framework

parent 8860bb05
No related branches found
No related tags found
No related merge requests found
Showing
with 132 additions and 101 deletions
#!/usr/bin/env python
import os
import json
import urllib
from dm.common.utility.encoder import Encoder
......@@ -14,35 +15,37 @@ class ExperimentRestApi(DaqRestApi):
DaqRestApi.__init__(self, username, password, host, port, protocol)
@DaqRestApi.execute
def startDaq(self, name, dataDirectory):
def startDaq(self, experimentName, dataDirectory, daqInfo={}):
url = '%s/experiments/startDaq' % (self.getContextRoot())
if name is None or not len(name):
if experimentName is None or not len(experimentName):
raise InvalidRequest('Experiment name must be provided.')
url += '?name=%s' % Encoder.encode(name)
if dataDirectory is None or not len(dataDirectory):
raise InvalidRequest('Experiment data directory must be provided.')
url += '&dataDirectory=%s' % Encoder.encode(dataDirectory)
daqInfo['experimentName'] = experimentName
daqInfo['dataDirectory'] = dataDirectory
url += '?daqInfo=%s' % (Encoder.encode(json.dumps(daqInfo)))
responseDict = self.sendSessionRequest(url=url, method='POST')
return Experiment(responseDict)
@DaqRestApi.execute
def stopDaq(self, name):
def stopDaq(self, experimentName):
url = '%s/experiments/stopDaq' % (self.getContextRoot())
if name is None or not len(name):
if experimentName is None or not len(experimentName):
raise InvalidRequest('Experiment name must be provided.')
url += '?name=%s' % Encoder.encode(name)
url += '?experimentName=%s' % Encoder.encode(experimentName)
responseDict = self.sendSessionRequest(url=url, method='POST')
return Experiment(responseDict)
@DaqRestApi.execute
def upload(self, name, dataDirectory):
def upload(self, experimentName, dataDirectory, daqInfo={}):
url = '%s/experiments/upload' % (self.getContextRoot())
if name is None or not len(name):
if experimentName is None or not len(experimentName):
raise InvalidRequest('Experiment name must be provided.')
url += '?name=%s' % Encoder.encode(name)
if dataDirectory is None or not len(dataDirectory):
raise InvalidRequest('Experiment data directory must be provided.')
url += '&dataDirectory=%s' % Encoder.encode(dataDirectory)
daqInfo['experimentName'] = experimentName
daqInfo['dataDirectory'] = dataDirectory
url += '?daqInfo=%s' % (Encoder.encode(json.dumps(daqInfo)))
responseDict = self.sendSessionRequest(url=url, method='POST')
return Experiment(responseDict)
......
#!/usr/bin/env python
import cherrypy
import json
from dm.common.service.dmSessionController import DmSessionController
from dm.common.exceptions.invalidRequest import InvalidRequest
......@@ -19,15 +20,16 @@ class ExperimentSessionController(DmSessionController):
@DmSessionController.require(DmSessionController.isAdministrator())
@DmSessionController.execute
def startDaq(self, **kwargs):
name = kwargs.get('name')
if name is None or not len(name):
raise InvalidRequest('Missing experiment name.')
name = Encoder.decode(name)
dataDirectory = kwargs.get('dataDirectory')
if dataDirectory is None or not len(dataDirectory):
raise InvalidRequest('Missing experiment data directory.')
dataDirectory = Encoder.decode(dataDirectory)
response = self.experimentSessionControllerImpl.startDaq(name, dataDirectory).getFullJsonRep()
encodedDaqInfo = kwargs.get('daqInfo')
if not encodedDaqInfo:
raise InvalidRequest('Invalid DAQ info provided.')
daqInfo = json.loads(Encoder.decode(encodedDaqInfo))
if not daqInfo.has_key('experimentName'):
raise InvalidRequest('Experiment name is missing.')
if not daqInfo.has_key('dataDirectory'):
raise InvalidRequest('Data directory is missing.')
response = self.experimentSessionControllerImpl.startDaq(daqInfo).getFullJsonRep()
self.logger.debug('Returning: %s' % response)
return response
......@@ -35,11 +37,11 @@ class ExperimentSessionController(DmSessionController):
@DmSessionController.require(DmSessionController.isAdministrator())
@DmSessionController.execute
def stopDaq(self, **kwargs):
name = kwargs.get('name')
if name is None or not len(name):
experimentName = kwargs.get('experimentName')
if not experimentName:
raise InvalidRequest('Missing experiment name.')
name = Encoder.decode(name)
response = self.experimentSessionControllerImpl.stopDaq(name).getFullJsonRep()
experimentName = Encoder.decode(experimentName)
response = self.experimentSessionControllerImpl.stopDaq(experimentName).getFullJsonRep()
self.logger.debug('Returning: %s' % response)
return response
......@@ -47,15 +49,16 @@ class ExperimentSessionController(DmSessionController):
@DmSessionController.require(DmSessionController.isAdministrator())
@DmSessionController.execute
def upload(self, **kwargs):
name = kwargs.get('name')
if name is None or not len(name):
raise InvalidRequest('Missing experiment name.')
name = Encoder.decode(name)
dataDirectory = kwargs.get('dataDirectory')
if dataDirectory is None or not len(dataDirectory):
raise InvalidRequest('Missing experiment data directory.')
dataDirectory = Encoder.decode(dataDirectory)
response = self.experimentSessionControllerImpl.upload(name, dataDirectory).getFullJsonRep()
encodedDaqInfo = kwargs.get('daqInfo')
if not encodedDaqInfo:
raise InvalidRequest('Invalid DAQ info provided.')
daqInfo = json.loads(Encoder.decode(encodedDaqInfo))
if not daqInfo.has_key('experimentName'):
raise InvalidRequest('Experiment name is missing.')
if not daqInfo.has_key('dataDirectory'):
raise InvalidRequest('Data directory is missing.')
response = self.experimentSessionControllerImpl.upload(daqInfo).getFullJsonRep()
self.logger.debug('Returning: %s' % response)
return response
......@@ -8,11 +8,11 @@ from dm.common.utility.loggingManager import LoggingManager
class DmFileSystemEventHandler(FileSystemEventHandler):
def __init__(self, fileSystemObserver, daqPath, experiment):
def __init__(self, fileSystemObserver, dataDirectory, experiment):
FileSystemEventHandler.__init__(self)
self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
self.fileSystemObserver = fileSystemObserver
self.daqPath = daqPath
self.dataDirectory = dataDirectory
self.experiment = experiment
def dispatch(self, event):
......@@ -32,7 +32,7 @@ class DmFileSystemEventHandler(FileSystemEventHandler):
filePath = event.src_path
self.logger.debug('File system modified event: %s' % (event.__dict__))
if not event.is_directory:
self.fileSystemObserver.observedFileUpdated(filePath, self.daqPath, self.experiment)
self.fileSystemObserver.fileUpdated(filePath, self.dataDirectory, self.experiment)
def on_moved(self, event):
FileSystemEventHandler.on_moved(self, event)
......
......@@ -12,14 +12,18 @@ class DsProcessFileNotificationPlugin(FileProcessor):
self.dsFileApi = DsRestApiFactory.getFileRestApi()
self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
def processFile(self, filePath, daqPath, experiment):
def processFile(self, fileInfo):
experimentFilePath = fileInfo.get('experimentFilePath')
experiment = fileInfo.get('experiment')
experimentName = experiment.get('name')
storageDirectory = experiment.get('storageDirectory')
fileName = os.path.basename(filePath)
daqFilePath = os.path.join(storageDirectory, fileName)
self.logger.debug('Processing file %s' % filePath)
self.logger.debug('Processing file %s for experiment %s' % (experimentFilePath, experimentName))
self.dsFileApi.processFile(fileName, daqFilePath, experimentName)
# Prepare dictionary for processing. Only send needed data.
fileInfo2 = {}
fileInfo2['experimentFilePath'] = experimentFilePath
fileInfo2['experimentName'] = experimentName
fileInfo2['daqInfo'] = experiment.get('daqInfo')
self.dsFileApi.processFile(experimentFilePath, experimentName, fileInfo2)
#######################################################################
# Testing.
......
......@@ -11,10 +11,10 @@ from dm.common.objects.dmObjectManager import DmObjectManager
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.utility.osUtility import OsUtility
from dm.common.objects.observedFile import ObservedFile
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
from experimentTracker import ExperimentTracker
from fileSystemObserver import FileSystemObserver
from fileProcessingManager import FileProcessingManager
class ExperimentSessionControllerImpl(DmObjectManager):
""" Experiment session controller implementation class. """
......@@ -23,37 +23,51 @@ class ExperimentSessionControllerImpl(DmObjectManager):
DmObjectManager.__init__(self)
self.dsExperimentApi = DsRestApiFactory.getExperimentRestApi()
def startDaq(self, name, dataDirectory):
experiment = self.dsExperimentApi.getExperimentByName(name)
def startDaq(self, daqInfo):
experimentName = daqInfo.get('experimentName')
experiment = ExperimentTracker.getInstance().get(experimentName)
if experiment is not None:
oldDaqInfo = experiment.get('daqInfo')
if oldDaqInfo.get('daqEndTime') is None:
raise InvalidRequest('DAQ for experiment %s is already active in directory %s.' % (experimentName,oldDaqInfo.get('dataDirectory')))
dataDirectory = daqInfo.get('dataDirectory')
experiment = self.dsExperimentApi.getExperimentByName(experimentName)
storageDirectory = experiment.get('storageDirectory')
if storageDirectory is None:
raise InvalidRequest('Experiment %s has not been started.' % name)
raise InvalidRequest('Experiment %s has not been started.' % experimentName)
startTime = time.time()
experiment['daqDataDirectory'] = dataDirectory
experiment['daqStartTime'] = startTime
daqInfo['daqStartTime'] = startTime
experiment['daqInfo'] = daqInfo
self.logger.debug('Starting DAQ %s' % daqInfo)
FileSystemObserver.getInstance().startObservingPath(dataDirectory, experiment)
ExperimentTracker.getInstance().put(name, experiment)
ExperimentTracker.getInstance().put(experimentName, experiment)
return experiment
def stopDaq(self, name):
experiment = ExperimentTracker.getInstance().get(name)
if experiment is None or experiment.get('daqEndTime') is not None:
raise InvalidRequest('Experiment %s is not active.' % name)
dataDirectory = experiment.get('dataDirectory')
experiment['daqEndTime'] = time.time()
def stopDaq(self, experimentName):
experiment = ExperimentTracker.getInstance().get(experimentName)
if experiment is not None:
daqInfo = experiment.get('daqInfo')
if experiment is None or daqInfo.get('daqEndTime') is not None:
raise InvalidRequest('Experiment %s is not active.' % experimentName)
dataDirectory = daqInfo.get('dataDirectory')
daqInfo['daqEndTime'] = time.time()
FileSystemObserver.getInstance().stopObservingPath(dataDirectory, experiment)
return experiment
def upload(self, name, dataDirectory):
experiment = self.dsExperimentApi.getExperimentByName(name)
def upload(self, daqInfo):
experimentName = daqInfo.get('experimentName')
experiment = self.dsExperimentApi.getExperimentByName(experimentName)
experiment['daqInfo'] = daqInfo
storageDirectory = experiment.get('storageDirectory')
if storageDirectory is None:
raise InvalidRequest('Experiment %s has not been started.' % name)
raise InvalidRequest('Experiment %s has not been started.' % experimentName)
dataDirectory = daqInfo.get('dataDirectory')
filePaths = OsUtility.findFiles(dataDirectory)
fileProcessingManager = FileProcessingManager.getInstance()
for filePath in filePaths:
observedFile = ObservedFile(filePath=filePath, daqPath=dataDirectory, experiment=experiment)
fileProcessingManager.processObservedFile(observedFile)
ExperimentTracker.getInstance().put(name, experiment)
fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
fileProcessingManager.processFile(fileInfo)
ExperimentTracker.getInstance().put(experimentName, experiment)
return experiment
......@@ -56,25 +56,25 @@ class FileSystemObserver(threading.Thread,Singleton):
self.fileSystemTimeoutInSeconds = int(cm.getConfigOption(FileSystemObserver.CONFIG_SECTION_NAME, FileSystemObserver.FILE_SYSTEM_EVENT_TIMEOUT_IN_SECONDS_KEY))
@ThreadingUtility.synchronize
def startObservingPath(self, daqPath, experiment):
self.logger.debug('Starting observer for %s' % daqPath)
eventHandler = DmFileSystemEventHandler(self, daqPath, experiment)
observedWatch = self.observer.schedule(eventHandler, daqPath, recursive=True)
self.observedWatchDict[daqPath] = observedWatch
def startObservingPath(self, dataDirectory, experiment):
self.logger.debug('Starting observer for %s' % dataDirectory)
eventHandler = DmFileSystemEventHandler(self, dataDirectory, experiment)
observedWatch = self.observer.schedule(eventHandler, dataDirectory, recursive=True)
self.observedWatchDict[dataDirectory] = observedWatch
@ThreadingUtility.synchronize
def stopObservingPath(self, daqPath, experiment):
observedWatch = self.observedWatchDict.get(daqPath)
def stopObservingPath(self, dataDirectory, experiment):
observedWatch = self.observedWatchDict.get(dataDirectory)
if observedWatch:
self.logger.debug('Stopping observer for %s' % daqPath)
self.logger.debug('Stopping observer for %s' % dataDirectory)
self.observer.unschedule(observedWatch)
del self.observedWatchDict[daqPath]
del self.observedWatchDict[dataDirectory]
else:
self.logger.debug('Observer for %s is not active' % daqPath)
self.logger.debug('Observer for %s is not active' % dataDirectory)
@ThreadingUtility.synchronize
def observedFileUpdated(self, filePath, daqPath, experiment):
observedFile = self.observedFileMap.get(filePath, ObservedFile(filePath=filePath, daqPath=daqPath, experiment=experiment))
def fileUpdated(self, filePath, dataDirectory, experiment):
observedFile = self.observedFileMap.get(filePath, ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment))
observedFile.setLastUpdatedTimestampToNow()
self.observedFileMap[filePath] = observedFile
self.logger.debug('Observed file updated: %s', observedFile)
......@@ -92,12 +92,12 @@ class FileSystemObserver(threading.Thread,Singleton):
return filePathsForProcessing
@ThreadingUtility.synchronize
def processObservedFile(self, filePath):
def processFile(self, filePath):
self.logger.debug('Processing file %s' % filePath)
observedFile = self.observedFileMap.get(filePath)
if observedFile is not None:
del self.observedFileMap[filePath]
self.fileProcessingManager.processObservedFile(observedFile)
self.fileProcessingManager.processFile(observedFile)
@ThreadingUtility.synchronize
def start(self):
......@@ -118,7 +118,7 @@ class FileSystemObserver(threading.Thread,Singleton):
filePathsForProcessing = self.checkObservedFilesForProcessing()
for filePath in filePathsForProcessing:
self.processObservedFile(filePath)
self.processFile(filePath)
except Exception, ex:
self.logger.exception(ex)
self.eventFlag.wait(timeout=self.fileSystemTimeoutInSeconds)
......
......@@ -2,6 +2,7 @@
import os
import urllib
import json
from dm.common.utility.encoder import Encoder
from dm.common.exceptions.dmException import DmException
......@@ -14,17 +15,15 @@ class FileRestApi(DsRestApi):
DsRestApi.__init__(self, username, password, host, port, protocol)
@DsRestApi.execute
def processFile(self, fileName, filePath, experimentName):
def processFile(self, filePath, experimentName, fileInfo={}):
url = '%s/files/processFile' % (self.getContextRoot())
if not fileName:
raise InvalidRequest('File name must be provided.')
url += '?fileName=%s' % Encoder.encode(fileName)
if not filePath:
raise InvalidRequest('File path must be provided.')
url += '&filePath=%s' % Encoder.encode(filePath)
if not experimentName:
raise InvalidRequest('Experiment name must be provided.')
url += '&experimentName=%s' % Encoder.encode(experimentName)
fileInfo['filePath'] = filePath
fileInfo['experimentName'] = experimentName
url += '?fileInfo=%s' % (Encoder.encode(json.dumps(fileInfo)))
responseDict = self.sendSessionRequest(url=url, method='POST')
return FileMetadata(responseDict)
......
#!/usr/bin/env python
import cherrypy
import json
from dm.common.service.dmSessionController import DmSessionController
from dm.common.exceptions.invalidRequest import InvalidRequest
......@@ -19,20 +20,16 @@ class FileSessionController(DmSessionController):
@DmSessionController.require(DmSessionController.isAdministrator())
@DmSessionController.execute
def processFile(self, **kwargs):
fileName = kwargs.get('fileName')
if not fileName:
raise InvalidRequest('Missing file name.')
fileName = Encoder.decode(fileName)
filePath = kwargs.get('filePath')
if not filePath:
raise InvalidRequest('Missing file path.')
filePath = Encoder.decode(filePath)
experimentName = kwargs.get('experimentName')
if not experimentName:
raise InvalidRequest('Missing experiment name.')
experimentName = Encoder.decode(experimentName)
response = self.fileSessionControllerImpl.processFile(fileName, filePath, experimentName).getFullJsonRep()
encodedFileInfo = kwargs.get('fileInfo')
if not encodedFileInfo:
raise InvalidRequest('Invalid file info provided.')
fileInfo = json.loads(Encoder.decode(encodedFileInfo))
if not fileInfo.has_key('experimentFilePath'):
raise InvalidRequest('Experiment file path is missing.')
if not fileInfo.has_key('experimentName'):
raise InvalidRequest('Experiment name is missing.')
response = self.fileSessionControllerImpl.processFile(fileInfo).getFullJsonRep()
self.logger.debug('Returning: %s' % response)
return response
......@@ -12,6 +12,7 @@ from dm.common.utility.valueUtility import ValueUtility
from dm.common.utility.objectUtility import ObjectUtility
from dm.common.utility.threadingUtility import ThreadingUtility
from dm.common.db.api.experimentDbApi import ExperimentDbApi
from dm.common.processing.fileProcessingManager import FileProcessingManager
class ExperimentManager(Singleton):
......@@ -40,6 +41,7 @@ class ExperimentManager(Singleton):
self.experimentDbApi = ExperimentDbApi()
self.platformUtility = None
self.__configure()
self.fileProcessingManager = FileProcessingManager.getInstance()
self.logger.debug('Initialization complete')
finally:
ExperimentManager.__instanceLock.release()
......@@ -94,13 +96,20 @@ class ExperimentManager(Singleton):
self.platformUtility.addUserToGroup(username, experimentName)
@ThreadingUtility.synchronize
def processExperimentFile(self, fileName, filePath, experiment):
def processExperimentFile(self, experimentFilePath, experiment, fileInfo={}):
experimentName = experiment.get('name')
self.updateExperimentWithStorageDataDirectory(experiment)
storageDirectory = experiment.get('storageDirectory')
filePath = os.path.join(storageDirectory, experimentFilePath)
fileInfo['filePath'] = filePath
fileInfo['experiment'] = experiment
if os.path.exists(filePath):
self.logger.debug('Processing file %s' % filePath)
self.logger.debug('Processing file path %s (fileInfo: %s)' % (filePath, fileInfo))
if self.manageStoragePermissions:
self.logger.debug('Modifying permissions for %s' % filePath)
OsUtility.chmodPath(filePath, fileMode=self.FILE_PERMISSIONS_MODE)
self.logger.debug('Processing file %s' % filePath)
self.fileProcessingManager.processFile(fileInfo)
else:
self.logger.debug('File path %s does not exist' % filePath)
......
......@@ -19,7 +19,9 @@ class FileSessionControllerImpl(DmObjectManager):
DmObjectManager.__init__(self)
self.experimentDbApi = ExperimentDbApi()
def processFile(self, fileName, filePath, experimentName, **kwargs):
def processFile(self, fileInfo):
experimentFilePath = fileInfo.get('experimentFilePath')
experimentName = fileInfo.get('experimentName')
experiment = self.experimentDbApi.getExperimentByName(experimentName)
ExperimentManager.getInstance().processExperimentFile(fileName, filePath, experiment)
return FileMetadata({'fileName' : fileName})
ExperimentManager.getInstance().processExperimentFile(experimentFilePath, experiment, fileInfo)
return FileMetadata(fileInfo)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment