Skip to content
Snippets Groups Projects
Commit 99f27032 authored by sveseli's avatar sveseli
Browse files

changes to allow specifying file server (gridftp) url from the command line

parent a6e57418
No related branches found
No related tags found
No related merge requests found
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
import time import time
import os import os
from dmObject import DmObject from dmObject import DmObject
import urlparse
class ObservedFile(DmObject): class ObservedFile(DmObject):
...@@ -15,7 +16,8 @@ class ObservedFile(DmObject): ...@@ -15,7 +16,8 @@ class ObservedFile(DmObject):
if dataDirectory: if dataDirectory:
self['dataDirectory'] = dataDirectory self['dataDirectory'] = dataDirectory
if filePath: if filePath:
self['experimentFilePath'] = os.path.relpath(filePath, dataDirectory) parseResult = urlparse.urlparse(dataDirectory)
self['experimentFilePath'] = os.path.relpath(filePath, parseResult.path)
if experiment: if experiment:
self['experiment'] = experiment self['experiment'] = experiment
......
...@@ -2,23 +2,30 @@ ...@@ -2,23 +2,30 @@
import os import os
from fileTransferPlugin import FileTransferPlugin from fileTransferPlugin import FileTransferPlugin
from dm.common.utility.ftpUtility import FtpUtility
class GridftpFileTransferPlugin(FileTransferPlugin): class GridftpFileTransferPlugin(FileTransferPlugin):
COMMAND = 'globus-url-copy -c -cd -r -vb -tcp-bs 512K -p 1 -sync -sync-level 1' DEFAULT_COMMAND = 'globus-url-copy -c -cd -r -vb -tcp-bs 512K -p 1 -sync -sync-level 2'
DEFAULT_PORT = 2811
def __init__(self, src=None, dest=None, command=COMMAND):
def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND):
FileTransferPlugin.__init__(self, command, src, dest) FileTransferPlugin.__init__(self, command, src, dest)
def getSrcUrl(self, filePath, dataDirectory): def getSrcUrl(self, filePath, dataDirectory):
if self.src is None: (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
if scheme:
srcUrl = '%s://%s:%s/%s' % (scheme, host, port, filePath)
elif self.src is None:
srcUrl = 'file://%s' % filePath srcUrl = 'file://%s' % filePath
else: else:
srcUrl = '%s/%s' % (self.src,filePath) srcUrl = '%s/%s' % (self.src,filePath)
return srcUrl return srcUrl
def getDestUrl(self, filePath, dataDirectory, storageHost, storageDirectory): def getDestUrl(self, filePath, dataDirectory, storageHost, storageDirectory):
dirName = os.path.dirname(os.path.relpath(filePath, dataDirectory)).strip() (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
dirName = os.path.dirname(os.path.relpath(filePath, dirPath)).strip()
fileName = os.path.basename(filePath) fileName = os.path.basename(filePath)
destUrl = 'sshftp://%s/%s/%s/%s' % (storageHost, storageDirectory, dirName, fileName) destUrl = 'sshftp://%s/%s/%s/%s' % (storageHost, storageDirectory, dirName, fileName)
return destUrl return destUrl
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
import copy import copy
from ftplib import FTP from ftplib import FTP
from dm.common.utility.loggingManager import LoggingManager from dm.common.utility.loggingManager import LoggingManager
import urlparse
class FtpUtility: class FtpUtility:
...@@ -13,6 +14,22 @@ class FtpUtility: ...@@ -13,6 +14,22 @@ class FtpUtility:
self.password = password self.password = password
self.ftpClient = None self.ftpClient = None
@classmethod
def parseFtpUrl(cls, url, defaultHost=None, defaultPort=None):
host = defaultHost
port = defaultPort
scheme = None
dirPath = url
if url.startswith('ftp://'):
parseResult = urlparse.urlparse(url)
scheme = parseResult.scheme
netlocTokens = parseResult.netloc.split(':')
host = netlocTokens[0]
if len(netlocTokens) > 1:
port = int(netlocTokens[1])
dirPath = parseResult.path
return (scheme, host, port, dirPath)
@classmethod @classmethod
def getFtpClient(cls, host, port, username=None, password=None): def getFtpClient(cls, host, port, username=None, password=None):
ftp = FTP() ftp = FTP()
......
...@@ -19,8 +19,10 @@ class FtpFileSystemObserverAgent(FileSystemObserverAgent): ...@@ -19,8 +19,10 @@ class FtpFileSystemObserverAgent(FileSystemObserverAgent):
self.isDone = False self.isDone = False
def getFiles(self, dataDirectory): def getFiles(self, dataDirectory):
ftpUtility = FtpUtility(self.host, self.port, self.username, self.password) (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultHost=self.host, defaultPort=self.port)
return ftpUtility.getFiles(dataDirectory, {}) self.logger.debug('Retrieving files from FTP host: %s, port: %s, directory path: %s' % (host, port, dirPath))
ftpUtility = FtpUtility(host, port, self.username, self.password)
return ftpUtility.getFiles(dirPath, {})
def updateFile(self, filePath, dataDirectory, experiment): def updateFile(self, filePath, dataDirectory, experiment):
if self.fileSystemObserver: if self.fileSystemObserver:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment