Newer
Older
import os

from dm.common.utility.fileUtility import FileUtility
from dm.common.utility.ftpUtility import FtpUtility
from dm.common.exceptions.fileProcessingError import FileProcessingError
from dm.common.utility.dmSubprocess import DmSubprocess
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
class GridftpFileTransferPlugin(FileTransferPlugin):
    """File transfer plugin that copies data with ``globus-url-copy``.

    Single files are handled by :meth:`processFile`, whole directories by
    :meth:`processDirectory`. Source URLs come from the upload's data
    directory (a ``scheme://host:port`` URL, a local ``file://`` path, or an
    explicit ``src`` prefix). Destination URLs are
    ``sshftp://<storageHost>/<storageDirectory>/...`` unless an explicit
    ``dest`` prefix was configured.
    """

    # Previous, more conservative transfer settings:
    #DEFAULT_COMMAND = 'globus-url-copy -c -cd -r -tcp-bs 512K -p 3 -sync -sync-level 2'
    #DIRECTORY_TRANSFER_COMMAND = 'globus-url-copy -c -cd -r -tcp-bs 512K -p 8 -sync -sync-level 2'
    DEFAULT_COMMAND = 'globus-url-copy -r -cd -sync -sync-level 2 -fast -bs 2M -tcp-bs 2M -p 8'
    # Same as DEFAULT_COMMAND plus '-cc 8'; used for whole-directory transfers.
    DIRECTORY_TRANSFER_COMMAND = 'globus-url-copy -r -cd -sync -sync-level 2 -fast -bs 2M -tcp-bs 2M -p 8 -cc 8'
    # Port assumed when a data-directory URL does not specify one.
    DEFAULT_PORT = 2811

    def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND,
                 directoryTransferCommand=DIRECTORY_TRANSFER_COMMAND,
                 localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False,
                 pluginMustProcessFiles=True, dependsOn=None, nRetries=None,
                 skipOnFailure=False):
        """Configure the plugin.

        :param src: optional URL prefix prepended to source paths.
        :param dest: optional URL prefix used instead of the sshftp storage URL.
        :param command: transfer command for single files.
        :param directoryTransferCommand: transfer command for directories.
        :param localMd5Sum: compute the source checksum before transferring.
        :param remoteMd5Sum: after transferring, fetch the checksum from the
            DS file API and compare it with the local one.
        :param deleteOriginal: stored only; source deletion is currently disabled.
        :param pluginMustProcessFiles: when false, upload pre-checks report
            that no files need processing by this plugin.
        :param dependsOn: plugins this one depends on (defaults to none).
        :param nRetries: if given, passed to ``setNumberOfRetries``.
        :param skipOnFailure: passed to ``setSkipOnFailure``.
        """
        # Use a fresh list per instance; a shared [] default would be reused
        # across every plugin created without an explicit dependsOn.
        if dependsOn is None:
            dependsOn = []
        FileTransferPlugin.__init__(self, command, src, dest, dependsOn=dependsOn)
        self.dsFileApi = DsRestApiFactory.getFileDsApi()
        self.localMd5Sum = localMd5Sum
        self.remoteMd5Sum = remoteMd5Sum
        self.deleteOriginal = deleteOriginal
        self.directoryTransferCommand = directoryTransferCommand
        self.pluginMustProcessFiles = pluginMustProcessFiles
        if nRetries is not None:
            self.setNumberOfRetries(nRetries)
        self.setSkipOnFailure(skipOnFailure)

    def replaceSpecialCharacters(self, url):
        """Return ``url`` with '#', ' ' and '~' percent-encoded."""
        replacementMap = {
            '#' : '%23',
            ' ' : '%20',
            '~' : '%7E',
        }
        for (original, replacement) in replacementMap.items():
            url = url.replace(original, replacement)
        return url

    def getSrcUrl(self, filePath, dataDirectory):
        """Build the (percent-encoded) source URL for one file.

        If ``dataDirectory`` carries a scheme, its host and port are reused;
        otherwise the file is addressed via ``file://`` or the ``src`` prefix.
        """
        (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
        if scheme:
            srcUrl = '%s://%s:%s/%s' % (scheme, host, port, filePath)
        elif self.src is None:
            srcUrl = 'file://%s' % filePath
        else:
            srcUrl = '%s/%s' % (self.src, filePath)
        return self.replaceSpecialCharacters(srcUrl)

    def getDestUrl(self, filePath, dataDirectory, storageHost, storageDirectory, destDirectory=None):
        """Build the (percent-encoded) destination URL for one file.

        The file's directory relative to the data directory is preserved
        under the destination root.
        """
        (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
        dirName = os.path.dirname(os.path.relpath(filePath, dirPath)).strip()
        fileName = os.path.basename(filePath)
        if self.dest:
            destUrl = '%s/%s/%s' % (self.dest, dirName, fileName)
        elif destDirectory:
            destUrl = 'sshftp://%s/%s/%s/%s/%s' % (storageHost, storageDirectory, destDirectory, dirName, fileName)
        else:
            destUrl = 'sshftp://%s/%s/%s/%s' % (storageHost, storageDirectory, dirName, fileName)
        return self.replaceSpecialCharacters(destUrl)

    def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
        """Remove files that already exist, unchanged, in storage.

        :param filePathsDict: dict keyed by file path. Each value is a dict
            that may hold 'fileSize' and 'fileModificationTime'. Entries that
            do not need transferring are deleted in place.
        :param uploadInfo: dict with 'storageHost', 'storageDirectory',
            'dataDirectory' and an optional 'destDirectory'.
        :returns: ``{}`` if this plugin need not process files, otherwise the
            pruned ``filePathsDict``.
        """
        if not self.pluginMustProcessFiles:
            return {}
        storageHost = uploadInfo['storageHost']
        storageDirectory = uploadInfo['storageDirectory']
        dataDirectory = uploadInfo['dataDirectory']
        destDirectory = uploadInfo.get('destDirectory')
        # Original data directory may contain host/port
        (scheme, host, port, replacementDirPath) = FtpUtility.parseFtpUrl(dataDirectory)
        self.logger.debug('Replacement dir path: %s' % replacementDirPath)
        self.logger.debug('Number of original files: %s' % len(filePathsDict))
        # Append destDirectory only when set, so the path never contains 'None'.
        if destDirectory:
            targetDirectory = '%s/%s/' % (storageDirectory, destDirectory)
        else:
            targetDirectory = '%s/' % storageDirectory
        storageFilePathsDict = {}
        try:
            self.logger.debug('Looking for existing files in %s' % targetDirectory)
            # NOTE(review): the storage side is listed over SFTP because the
            # destination URLs use sshftp://storageHost. SftpUtility(host) and
            # getFiles(dir, dict, replacementDirPath) are assumed; confirm.
            # Any failure here keeps the previous best-effort behaviour.
            from dm.common.utility.sftpUtility import SftpUtility
            ftpUtility = SftpUtility(storageHost)
            storageFilePathsDict = ftpUtility.getFiles(targetDirectory, {}, replacementDirPath)
            self.logger.debug('There are %s files in %s' % (len(storageFilePathsDict), targetDirectory))
        except Exception as ex:
            self.logger.warn('Could not find existing files in %s, assuming there are none (got error: %s)' % (targetDirectory, ex))
        # Remove file from plugin dict if we do not need to transfer it
        for (filePath, storageFilePathDict) in storageFilePathsDict.items():
            filePathDict = filePathsDict.get(filePath)
            if filePathDict is None:
                # We are not attempting to transfer this file
                continue
            fSize = filePathDict.get('fileSize')
            sfSize = storageFilePathDict.get('fileSize')
            if fSize != sfSize:
                # Sizes differ, need to transfer file
                continue
            # Sizes are the same, check modify time
            mTime = filePathDict.get('fileModificationTime')
            smTime = storageFilePathDict.get('fileModificationTime')
            if not mTime or not smTime or mTime > smTime:
                # Unknown times, or source newer than storage: transfer file
                continue
            # No need to transfer file
            del filePathsDict[filePath]
        self.logger.debug('Number of files that require processing: %s' % len(filePathsDict))
        return filePathsDict

    def processFile(self, fileInfo):
        """Transfer one file and optionally verify its checksum remotely.

        :param fileInfo: dict describing the file. Keys read here are
            'filePath', 'dataDirectory', 'experimentFilePath',
            'experimentName', 'storageHost', 'storageDirectory',
            'destDirectory', 'fileSize' and 'md5Sum'.
        :raises FileProcessingError: if remote checksum verification is
            enabled and the remote checksum differs from the local one.
        """
        filePath = fileInfo.get('filePath')
        dataDirectory = fileInfo.get('dataDirectory')
        experimentFilePath = fileInfo.get('experimentFilePath')
        experimentName = fileInfo.get('experimentName')
        storageHost = fileInfo.get('storageHost')
        storageDirectory = fileInfo.get('storageDirectory')
        destDirectory = fileInfo.get('destDirectory')
        destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory, destDirectory)
        srcUrl = self.getSrcUrl(filePath, dataDirectory)

        # Stat the source and calculate its checksum before the transfer.
        statUtility = self.statUtility
        if not statUtility:
            # NOTE(review): assumes FtpUtility(host, port) offers
            # statFile(path, info) / getMd5Sum(path, info) that fill in
            # 'fileSize' / 'md5Sum' on the given dict; confirm.
            (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(srcUrl, defaultPort=self.DEFAULT_PORT)
            statUtility = FtpUtility(host, port)
        if not fileInfo.get('fileSize'):
            statUtility.statFile(filePath, fileInfo)
        if self.localMd5Sum:
            statUtility.getMd5Sum(filePath, fileInfo)

        # Transfer file
        self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl))
        self.start(src=srcUrl, dest=destUrl, templateInfo=fileInfo)

        # Get remote checksum
        if self.remoteMd5Sum:
            fileInfo2 = {}
            fileInfo2['experimentFilePath'] = experimentFilePath
            fileInfo2['experimentName'] = experimentName
            fileMetadata = self.dsFileApi.statFile(experimentFilePath, experimentName, fileInfo2)
            remoteChecksum = fileMetadata.get('md5Sum')
            localChecksum = fileInfo.get('md5Sum')
            if not self.localMd5Sum or localChecksum is None:
                # No local checksum to compare against: adopt the remote one.
                # (Previously the remote value was copied in unconditionally
                # and then compared with itself, so mismatches went undetected.)
                fileInfo['md5Sum'] = remoteChecksum
            elif remoteChecksum != localChecksum:
                self.logger.error('Checksum mismatch for file: %s' % filePath)
                raise FileProcessingError('Checksum mismatch detected for file: %s' % filePath)
            else:
                self.logger.debug('Checksum test passed for file %s' % filePath)
        # NOTE: self.deleteOriginal is stored, but removing the source file
        # after transfer is currently disabled.

    def getSrcDirUrl(self, dataDirectory):
        """Return the source URL (with trailing '/') for a whole directory."""
        (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
        if scheme:
            srcUrl = '%s/' % (dataDirectory)
        elif self.src is None:
            srcUrl = 'file://%s/' % (dataDirectory)
        else:
            srcUrl = '%s/%s/' % (self.src, dataDirectory)
        return srcUrl

    def getDestDirUrl(self, dataDirectory, storageHost, storageDirectory, destDirectory=None):
        """Return the sshftp destination URL (with trailing '/') for a directory.

        ``dataDirectory`` is accepted for interface symmetry but not used.
        """
        destUrl = 'sshftp://%s/%s/' % (storageHost, storageDirectory)
        if destDirectory:
            destUrl = '%s/%s/' % (destUrl, destDirectory)
        return destUrl

    def processDirectory(self, directoryInfo):
        """Transfer an entire upload directory with the directory command.

        :param directoryInfo: dict whose 'uploadInfo' entry provides
            'dataDirectory', 'storageHost', 'storageDirectory' and an optional
            'destDirectory'.
        """
        uploadInfo = directoryInfo.get('uploadInfo')
        dataDirectory = uploadInfo.get('dataDirectory')
        storageHost = uploadInfo.get('storageHost')
        storageDirectory = uploadInfo.get('storageDirectory')
        destDirectory = uploadInfo.get('destDirectory')
        destUrl = self.getDestDirUrl(dataDirectory, storageHost, storageDirectory, destDirectory)
        srcUrl = self.getSrcDirUrl(dataDirectory)
        # Transfer directory
        self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl))
        self.start(src=srcUrl, dest=destUrl, command=self.directoryTransferCommand, templateInfo=uploadInfo)
#######################################################################
# Testing.
if __name__ == '__main__':
    # Manual smoke test: run the default transfer command from /tmp/xyz to
    # /tmp/xyz2, then report the captured output and exit status.
    plugin = GridftpFileTransferPlugin('/tmp/xyz', '/tmp/xyz2')
    plugin.start()
    reports = (
        ('StdOut: ', plugin.getStdOut),
        ('StdErr: ', plugin.getStdErr),
        ('Exit Status: ', plugin.getExitStatus),
    )
    for (label, getter) in reports:
        # Reproduces Python 2's "print label, value" output: label, one space, value.
        print('%s %s' % (label, getter()))