From d3b99159217c4ea15e6762bca232184ef48f39b7 Mon Sep 17 00:00:00 2001
From: Sinisa Veseli <sveseli@aps.anl.gov>
Date: Sat, 7 Nov 2015 05:56:20 +0000
Subject: [PATCH] fixed few issues found while testing gridftp plugin and file
observer; made observer more generic by shifting some functionality to agent
---
.../common/processing/plugins/fileTransferPlugin.py | 4 ++--
.../processing/plugins/gridftpFileTransferPlugin.py | 5 +++--
.../service/impl/experimentSessionControllerImpl.py | 7 ++-----
.../service/impl/fileSystemObserver.py | 9 +++++++++
.../service/impl/fileSystemObserverAgent.py | 6 ++++++
.../service/impl/watchdogFileSystemObserverAgent.py | 13 +++++++++++++
6 files changed, 35 insertions(+), 9 deletions(-)
diff --git a/src/python/dm/common/processing/plugins/fileTransferPlugin.py b/src/python/dm/common/processing/plugins/fileTransferPlugin.py
index 196377f8..0c230983 100755
--- a/src/python/dm/common/processing/plugins/fileTransferPlugin.py
+++ b/src/python/dm/common/processing/plugins/fileTransferPlugin.py
@@ -27,12 +27,12 @@ class FileTransferPlugin(FileProcessor):
storageHost = experiment.get('storageHost')
storageDirectory = experiment.get('storageDirectory')
destUrl = self.getDestUrl(storageHost, storageDirectory)
- # Use relative path with respect to data directory as a source
- os.chdir(dataDirectory)
srcUrl = self.getSrcUrl(filePath, dataDirectory)
self.start(srcUrl, destUrl)
def getSrcUrl(self, filePath, dataDirectory):
+ # Use relative path with respect to data directory as a source
+ os.chdir(dataDirectory)
srcUrl = os.path.relpath(filePath, dataDirectory)
return srcUrl
diff --git a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py
index abf5f2cd..6999ccdf 100755
--- a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py
+++ b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py
@@ -1,18 +1,19 @@
#!/usr/bin/env python
from fileTransferPlugin import FileTransferPlugin
+
class GridftpFileTransferPlugin(FileTransferPlugin):
COMMAND = 'globus-url-copy -c -cd -r -vb -tcp-bs 512K -p 1 -sync -sync-level 2'
- def __init__(self, src=None, dest=None, command=GridftpFileTransferPlugin.COMMAND):
+ def __init__(self, src=None, dest=None, command=COMMAND):
FileTransferPlugin.__init__(self, command, src, dest)
def getSrcUrl(self, filePath, dataDirectory):
if self.src is None:
srcUrl = 'file://%s' % filePath
else:
- srcUrl = '%s/%s' % filePath
+ srcUrl = '%s/%s' % (self.src,filePath)
return srcUrl
def getDestUrl(self, storageHost, storageDirectory):
diff --git a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py
index 5f42e727..ecc2d56b 100755
--- a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py
+++ b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py
@@ -10,7 +10,6 @@ import time
from dm.common.objects.experiment import Experiment
from dm.common.objects.dmObjectManager import DmObjectManager
from dm.common.exceptions.invalidRequest import InvalidRequest
-from dm.common.utility.osUtility import OsUtility
from dm.common.objects.observedFile import ObservedFile
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
@@ -20,14 +19,12 @@ from fileSystemObserver import FileSystemObserver
class ExperimentSessionControllerImpl(DmObjectManager):
""" Experiment session controller implementation class. """
- DAQ_PERMISSIONS_MODE = 0777
-
def __init__(self):
DmObjectManager.__init__(self)
self.dsExperimentApi = DsRestApiFactory.getExperimentRestApi()
def startDaq(self, experimentName, dataDirectory, daqInfo):
- OsUtility.createDir(dataDirectory, mode=self.DAQ_PERMISSIONS_MODE)
+ FileSystemObserver.getInstance().createDirectory(dataDirectory)
if daqInfo is None:
daqInfo={}
daqInfo['experimentName'] = experimentName
@@ -69,7 +66,7 @@ class ExperimentSessionControllerImpl(DmObjectManager):
if storageDirectory is None:
raise InvalidRequest('Experiment %s has not been started.' % experimentName)
dataDirectory = daqInfo.get('dataDirectory')
- filePaths = OsUtility.findFiles(dataDirectory)
+ filePaths = FileSystemObserver.getInstance().getFiles(dataDirectory)
fileProcessingManager = FileProcessingManager.getInstance()
for filePath in filePaths:
fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
diff --git a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py
index c9e4bfda..6613f820 100755
--- a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py
+++ b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py
@@ -63,6 +63,15 @@ class FileSystemObserver(threading.Thread,Singleton):
self.fileSystemObserverAgent = ObjectUtility.createObjectInstance(moduleName, className, constructor)
self.fileSystemObserverAgent.setFileSystemObserver(self)
+ @ThreadingUtility.synchronize
+ def createDirectory(self, dataDirectory):
+ self.fileSystemObserverAgent.createDirectory(dataDirectory)
+
+ @ThreadingUtility.synchronize
+ def getFiles(self, dataDirectory):
+ self.logger.debug('Agent is retrieving files for %s' % dataDirectory)
+ return self.fileSystemObserverAgent.getFiles(dataDirectory)
+
@ThreadingUtility.synchronize
def startObservingPath(self, dataDirectory, experiment):
self.logger.debug('Agent is starting observer for %s' % dataDirectory)
diff --git a/src/python/dm/daq_web_service/service/impl/fileSystemObserverAgent.py b/src/python/dm/daq_web_service/service/impl/fileSystemObserverAgent.py
index 3603f19d..6077a804 100644
--- a/src/python/dm/daq_web_service/service/impl/fileSystemObserverAgent.py
+++ b/src/python/dm/daq_web_service/service/impl/fileSystemObserverAgent.py
@@ -11,6 +11,12 @@ class FileSystemObserverAgent:
def setFileSystemObserver(self, fileSystemObserver):
self.fileSystemObserver = fileSystemObserver
+ def createDirectory(self, dataDirectory):
+ pass
+
+ def getFiles(self, dataDirectory):
+ pass
+
def startObservingPath(self, dataDirectory, experiment):
pass
diff --git a/src/python/dm/daq_web_service/service/impl/watchdogFileSystemObserverAgent.py b/src/python/dm/daq_web_service/service/impl/watchdogFileSystemObserverAgent.py
index 81def989..c7093676 100644
--- a/src/python/dm/daq_web_service/service/impl/watchdogFileSystemObserverAgent.py
+++ b/src/python/dm/daq_web_service/service/impl/watchdogFileSystemObserverAgent.py
@@ -1,16 +1,29 @@
#!/usr/bin/env python
+from dm.common.utility.osUtility import OsUtility
+
from watchdog.observers.polling import PollingObserver
from fileSystemObserverAgent import FileSystemObserverAgent
from dmFileSystemEventHandler import DmFileSystemEventHandler
class WatchdogFileSystemObserverAgent(FileSystemObserverAgent):
+ DAQ_PERMISSIONS_MODE = 0777
+
def __init__(self):
FileSystemObserverAgent.__init__(self)
self.observer = PollingObserver()
self.observedWatchDict = {}
+ def createDirectory(self, dataDirectory):
+ try:
+ OsUtility.createDir(dataDirectory, mode=self.DAQ_PERMISSIONS_MODE)
+ except Exception, ex:
+ self.logger.warn('Unable to create directory %s: %s' % (dataDirectory, ex))
+
+ def getFiles(self, dataDirectory):
+ return OsUtility.findFiles(dataDirectory)
+
def startObservingPath(self, dataDirectory, experiment):
self.logger.debug('Starting observer for %s' % dataDirectory)
eventHandler = DmFileSystemEventHandler(self.fileSystemObserver, dataDirectory, experiment)
--
GitLab