Skip to content
Snippets Groups Projects
Commit d3b99159 authored by sveseli's avatar sveseli
Browse files

fixed few issues found while testing gridftp plugin and file observer; made...

fixed few issues found while testing gridftp plugin and file observer; made observer more generic by shifting some functionality to agent
parent 768e47fe
No related branches found
No related tags found
No related merge requests found
...@@ -27,12 +27,12 @@ class FileTransferPlugin(FileProcessor): ...@@ -27,12 +27,12 @@ class FileTransferPlugin(FileProcessor):
storageHost = experiment.get('storageHost') storageHost = experiment.get('storageHost')
storageDirectory = experiment.get('storageDirectory') storageDirectory = experiment.get('storageDirectory')
destUrl = self.getDestUrl(storageHost, storageDirectory) destUrl = self.getDestUrl(storageHost, storageDirectory)
# Use relative path with respect to data directory as a source
os.chdir(dataDirectory)
srcUrl = self.getSrcUrl(filePath, dataDirectory) srcUrl = self.getSrcUrl(filePath, dataDirectory)
self.start(srcUrl, destUrl) self.start(srcUrl, destUrl)
def getSrcUrl(self, filePath, dataDirectory): def getSrcUrl(self, filePath, dataDirectory):
# Use relative path with respect to data directory as a source
os.chdir(dataDirectory)
srcUrl = os.path.relpath(filePath, dataDirectory) srcUrl = os.path.relpath(filePath, dataDirectory)
return srcUrl return srcUrl
......
#!/usr/bin/env python #!/usr/bin/env python
from fileTransferPlugin import FileTransferPlugin from fileTransferPlugin import FileTransferPlugin
class GridftpFileTransferPlugin(FileTransferPlugin): class GridftpFileTransferPlugin(FileTransferPlugin):
COMMAND = 'globus-url-copy -c -cd -r -vb -tcp-bs 512K -p 1 -sync -sync-level 2' COMMAND = 'globus-url-copy -c -cd -r -vb -tcp-bs 512K -p 1 -sync -sync-level 2'
def __init__(self, src=None, dest=None, command=GridftpFileTransferPlugin.COMMAND): def __init__(self, src=None, dest=None, command=COMMAND):
FileTransferPlugin.__init__(self, command, src, dest) FileTransferPlugin.__init__(self, command, src, dest)
def getSrcUrl(self, filePath, dataDirectory): def getSrcUrl(self, filePath, dataDirectory):
if self.src is None: if self.src is None:
srcUrl = 'file://%s' % filePath srcUrl = 'file://%s' % filePath
else: else:
srcUrl = '%s/%s' % filePath srcUrl = '%s/%s' % (self.src,filePath)
return srcUrl return srcUrl
def getDestUrl(self, storageHost, storageDirectory): def getDestUrl(self, storageHost, storageDirectory):
......
...@@ -10,7 +10,6 @@ import time ...@@ -10,7 +10,6 @@ import time
from dm.common.objects.experiment import Experiment from dm.common.objects.experiment import Experiment
from dm.common.objects.dmObjectManager import DmObjectManager from dm.common.objects.dmObjectManager import DmObjectManager
from dm.common.exceptions.invalidRequest import InvalidRequest from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.utility.osUtility import OsUtility
from dm.common.objects.observedFile import ObservedFile from dm.common.objects.observedFile import ObservedFile
from dm.common.processing.fileProcessingManager import FileProcessingManager from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
...@@ -20,14 +19,12 @@ from fileSystemObserver import FileSystemObserver ...@@ -20,14 +19,12 @@ from fileSystemObserver import FileSystemObserver
class ExperimentSessionControllerImpl(DmObjectManager): class ExperimentSessionControllerImpl(DmObjectManager):
""" Experiment session controller implementation class. """ """ Experiment session controller implementation class. """
DAQ_PERMISSIONS_MODE = 0777
def __init__(self): def __init__(self):
DmObjectManager.__init__(self) DmObjectManager.__init__(self)
self.dsExperimentApi = DsRestApiFactory.getExperimentRestApi() self.dsExperimentApi = DsRestApiFactory.getExperimentRestApi()
def startDaq(self, experimentName, dataDirectory, daqInfo): def startDaq(self, experimentName, dataDirectory, daqInfo):
OsUtility.createDir(dataDirectory, mode=self.DAQ_PERMISSIONS_MODE) FileSystemObserver.getInstance().createDirectory(dataDirectory)
if daqInfo is None: if daqInfo is None:
daqInfo={} daqInfo={}
daqInfo['experimentName'] = experimentName daqInfo['experimentName'] = experimentName
...@@ -69,7 +66,7 @@ class ExperimentSessionControllerImpl(DmObjectManager): ...@@ -69,7 +66,7 @@ class ExperimentSessionControllerImpl(DmObjectManager):
if storageDirectory is None: if storageDirectory is None:
raise InvalidRequest('Experiment %s has not been started.' % experimentName) raise InvalidRequest('Experiment %s has not been started.' % experimentName)
dataDirectory = daqInfo.get('dataDirectory') dataDirectory = daqInfo.get('dataDirectory')
filePaths = OsUtility.findFiles(dataDirectory) filePaths = FileSystemObserver.getInstance().getFiles(dataDirectory)
fileProcessingManager = FileProcessingManager.getInstance() fileProcessingManager = FileProcessingManager.getInstance()
for filePath in filePaths: for filePath in filePaths:
fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment) fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
......
...@@ -63,6 +63,15 @@ class FileSystemObserver(threading.Thread,Singleton): ...@@ -63,6 +63,15 @@ class FileSystemObserver(threading.Thread,Singleton):
self.fileSystemObserverAgent = ObjectUtility.createObjectInstance(moduleName, className, constructor) self.fileSystemObserverAgent = ObjectUtility.createObjectInstance(moduleName, className, constructor)
self.fileSystemObserverAgent.setFileSystemObserver(self) self.fileSystemObserverAgent.setFileSystemObserver(self)
@ThreadingUtility.synchronize
def createDirectory(self, dataDirectory):
self.fileSystemObserverAgent.createDirectory(dataDirectory)
@ThreadingUtility.synchronize
def getFiles(self, dataDirectory):
self.logger.debug('Agent is retrieving files for %s' % dataDirectory)
return self.fileSystemObserverAgent.getFiles(dataDirectory)
@ThreadingUtility.synchronize @ThreadingUtility.synchronize
def startObservingPath(self, dataDirectory, experiment): def startObservingPath(self, dataDirectory, experiment):
self.logger.debug('Agent is starting observer for %s' % dataDirectory) self.logger.debug('Agent is starting observer for %s' % dataDirectory)
......
...@@ -11,6 +11,12 @@ class FileSystemObserverAgent: ...@@ -11,6 +11,12 @@ class FileSystemObserverAgent:
def setFileSystemObserver(self, fileSystemObserver): def setFileSystemObserver(self, fileSystemObserver):
self.fileSystemObserver = fileSystemObserver self.fileSystemObserver = fileSystemObserver
def createDirectory(self, dataDirectory):
pass
def getFiles(self, dataDirectory):
pass
def startObservingPath(self, dataDirectory, experiment): def startObservingPath(self, dataDirectory, experiment):
pass pass
......
#!/usr/bin/env python #!/usr/bin/env python
from dm.common.utility.osUtility import OsUtility
from watchdog.observers.polling import PollingObserver from watchdog.observers.polling import PollingObserver
from fileSystemObserverAgent import FileSystemObserverAgent from fileSystemObserverAgent import FileSystemObserverAgent
from dmFileSystemEventHandler import DmFileSystemEventHandler from dmFileSystemEventHandler import DmFileSystemEventHandler
class WatchdogFileSystemObserverAgent(FileSystemObserverAgent): class WatchdogFileSystemObserverAgent(FileSystemObserverAgent):
DAQ_PERMISSIONS_MODE = 0777
def __init__(self): def __init__(self):
FileSystemObserverAgent.__init__(self) FileSystemObserverAgent.__init__(self)
self.observer = PollingObserver() self.observer = PollingObserver()
self.observedWatchDict = {} self.observedWatchDict = {}
def createDirectory(self, dataDirectory):
try:
OsUtility.createDir(dataDirectory, mode=self.DAQ_PERMISSIONS_MODE)
except Exception, ex:
self.logger.warn('Unable to create directory %s: %s' % (dataDirectory, ex))
def getFiles(self, dataDirectory):
return OsUtility.findFiles(dataDirectory)
def startObservingPath(self, dataDirectory, experiment): def startObservingPath(self, dataDirectory, experiment):
self.logger.debug('Starting observer for %s' % dataDirectory) self.logger.debug('Starting observer for %s' % dataDirectory)
eventHandler = DmFileSystemEventHandler(self.fileSystemObserver, dataDirectory, experiment) eventHandler = DmFileSystemEventHandler(self.fileSystemObserver, dataDirectory, experiment)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment