Skip to content
Snippets Groups Projects
Commit 3f41a079 authored by sveseli's avatar sveseli
Browse files

merged gridftp-related fixes from release 0.6

No related branches found
No related tags found
No related merge requests found
......@@ -15,5 +15,5 @@ DM_DAQ_WEB_SERVICE_HOST=DM_HOSTNAME
DM_DAQ_WEB_SERVICE_PORT=33336
DM_CAT_WEB_SERVICE_HOST=DM_HOSTNAME
DM_CAT_WEB_SERVICE_PORT=44436
DM_SOFTWARE_VERSION="0.5 (DM_DATE)"
DM_SOFTWARE_VERSION="0.6 (DM_DATE)"
__version__ = "0.3 (2015.09.21)"
__version__ = "0.6 (2015.11.06)"
......@@ -27,12 +27,12 @@ class FileTransferPlugin(FileProcessor):
storageHost = experiment.get('storageHost')
storageDirectory = experiment.get('storageDirectory')
destUrl = self.getDestUrl(storageHost, storageDirectory)
# Use relative path with respect to data directory as a source
os.chdir(dataDirectory)
srcUrl = self.getSrcUrl(filePath, dataDirectory)
self.start(srcUrl, destUrl)
def getSrcUrl(self, filePath, dataDirectory):
# Use relative path with respect to data directory as a source
os.chdir(dataDirectory)
srcUrl = os.path.relpath(filePath, dataDirectory)
return srcUrl
......
#!/usr/bin/env python
from fileTransferPlugin import FileTransferPlugin
class GridftpFileTransferPlugin(FileTransferPlugin):
COMMAND = 'globus-url-copy -c -cd -r -vb -tcp-bs 512K -p 1 -sync -sync-level 2'
def __init__(self, src=None, dest=None, command=GridftpFileTransferPlugin.COMMAND):
def __init__(self, src=None, dest=None, command=COMMAND):
FileTransferPlugin.__init__(self, command, src, dest)
def getSrcUrl(self, filePath, dataDirectory):
if self.src is None:
srcUrl = 'file://%s' % filePath
else:
srcUrl = '%s/%s' % filePath
srcUrl = '%s/%s' % (self.src,filePath)
return srcUrl
def getDestUrl(self, storageHost, storageDirectory):
......
......@@ -10,7 +10,6 @@ import time
from dm.common.objects.experiment import Experiment
from dm.common.objects.dmObjectManager import DmObjectManager
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.utility.osUtility import OsUtility
from dm.common.objects.observedFile import ObservedFile
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
......@@ -20,14 +19,12 @@ from fileSystemObserver import FileSystemObserver
class ExperimentSessionControllerImpl(DmObjectManager):
""" Experiment session controller implementation class. """
DAQ_PERMISSIONS_MODE = 0777
def __init__(self):
DmObjectManager.__init__(self)
self.dsExperimentApi = DsRestApiFactory.getExperimentRestApi()
def startDaq(self, experimentName, dataDirectory, daqInfo):
OsUtility.createDir(dataDirectory, mode=self.DAQ_PERMISSIONS_MODE)
FileSystemObserver.getInstance().createDirectory(dataDirectory)
if daqInfo is None:
daqInfo={}
daqInfo['experimentName'] = experimentName
......@@ -69,7 +66,7 @@ class ExperimentSessionControllerImpl(DmObjectManager):
if storageDirectory is None:
raise InvalidRequest('Experiment %s has not been started.' % experimentName)
dataDirectory = daqInfo.get('dataDirectory')
filePaths = OsUtility.findFiles(dataDirectory)
filePaths = FileSystemObserver.getInstance().getFiles(dataDirectory)
fileProcessingManager = FileProcessingManager.getInstance()
for filePath in filePaths:
fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
......
......@@ -63,6 +63,15 @@ class FileSystemObserver(threading.Thread,Singleton):
self.fileSystemObserverAgent = ObjectUtility.createObjectInstance(moduleName, className, constructor)
self.fileSystemObserverAgent.setFileSystemObserver(self)
@ThreadingUtility.synchronize
def createDirectory(self, dataDirectory):
self.fileSystemObserverAgent.createDirectory(dataDirectory)
@ThreadingUtility.synchronize
def getFiles(self, dataDirectory):
self.logger.debug('Agent is retrieving files for %s' % dataDirectory)
return self.fileSystemObserverAgent.getFiles(dataDirectory)
@ThreadingUtility.synchronize
def startObservingPath(self, dataDirectory, experiment):
self.logger.debug('Agent is starting observer for %s' % dataDirectory)
......
......@@ -11,6 +11,12 @@ class FileSystemObserverAgent:
def setFileSystemObserver(self, fileSystemObserver):
self.fileSystemObserver = fileSystemObserver
def createDirectory(self, dataDirectory):
pass
def getFiles(self, dataDirectory):
pass
def startObservingPath(self, dataDirectory, experiment):
pass
......
#!/usr/bin/env python
from dm.common.utility.osUtility import OsUtility
from watchdog.observers.polling import PollingObserver
from fileSystemObserverAgent import FileSystemObserverAgent
from dmFileSystemEventHandler import DmFileSystemEventHandler
class WatchdogFileSystemObserverAgent(FileSystemObserverAgent):
DAQ_PERMISSIONS_MODE = 0777
def __init__(self):
FileSystemObserverAgent.__init__(self)
self.observer = PollingObserver()
self.observedWatchDict = {}
def createDirectory(self, dataDirectory):
try:
OsUtility.createDir(dataDirectory, mode=self.DAQ_PERMISSIONS_MODE)
except Exception, ex:
self.logger.warn('Unable to create directory %s: %s' % (dataDirectory, ex))
def getFiles(self, dataDirectory):
return OsUtility.findFiles(dataDirectory)
def startObservingPath(self, dataDirectory, experiment):
self.logger.debug('Starting observer for %s' % dataDirectory)
eventHandler = DmFileSystemEventHandler(self.fileSystemObserver, dataDirectory, experiment)
......
......@@ -51,9 +51,8 @@ class ExperimentSessionControllerImpl(DmObjectManager):
def updateExperiment(self, name):
experiment = self.experimentDbApi.getExperimentWithUsers(name)
if experiment.get('startDate') is None:
raise InvalidRequest('Experiment %s has not been started.' % name)
ExperimentManager.getInstance().updateExperimentGroupUsers(experiment)
if experiment.get('startDate') is not None:
ExperimentManager.getInstance().updateExperimentGroupUsers(experiment)
return experiment
def stopExperiment(self, name):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment