Forked from
DM / dm-docs
261 commits behind, 664 commits ahead of the upstream repository.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
gridftpFileTransferPlugin.py 7.77 KiB
#!/usr/bin/env python
import os
import copy
from fileTransferPlugin import FileTransferPlugin
from dm.common.utility.fileUtility import FileUtility
from dm.common.utility.ftpUtility import FtpUtility
from dm.common.utility.sftpUtility import SftpUtility
from dm.common.exceptions.fileProcessingError import FileProcessingError
from dm.common.utility.dmSubprocess import DmSubprocess
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
class GridftpFileTransferPlugin(FileTransferPlugin):
    """Transfer plugin that copies data with ``globus-url-copy``.

    Single files are transferred with ``DEFAULT_COMMAND`` and whole
    directories with ``DIRECTORY_TRANSFER_COMMAND``. Unless an explicit
    ``dest`` prefix is configured, the destination is addressed as
    ``sshftp://<storageHost>/<storageDirectory>/...``.
    """

    # Per-file transfer command: a single parallel data stream (-p 1).
    DEFAULT_COMMAND = 'globus-url-copy -c -cd -r -vb -tcp-bs 512K -p 1 -sync -sync-level 2'
    # Whole-directory transfer command: identical to DEFAULT_COMMAND except
    # that it uses five parallel data streams (-p 5).
    DIRECTORY_TRANSFER_COMMAND = 'globus-url-copy -c -cd -r -vb -tcp-bs 512K -p 5 -sync -sync-level 2'
    # Passed as ``defaultPort`` to FtpUtility.parseFtpUrl() when decomposing a
    # data directory URL (2811 is the standard GridFTP control port).
    DEFAULT_PORT = 2811

    def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND,
                 localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False,
                 pluginMustProcessFiles=True, dependsOn=None):
        """Configure the plugin.

        :param src: optional prefix prepended to source paths when the data
            directory has no URL scheme; when None, ``file://`` URLs are used.
        :param dest: optional destination URL prefix; when None, an
            ``sshftp://`` URL to the upload's storage host is built.
        :param command: transfer command handed to FileTransferPlugin.
        :param localMd5Sum: compute the source file's MD5 before transfer.
        :param remoteMd5Sum: after transfer, stat the file through the DS file
            REST API and compare its 'md5Sum' with the local one.
        :param deleteOriginal: stored on the instance; not currently acted on.
        :param pluginMustProcessFiles: when False,
            checkUploadFilesForProcessing() reports no files to process.
        :param dependsOn: forwarded to FileTransferPlugin; defaults to a new
            empty list for each instance.
        """
        if dependsOn is None:
            # Build a fresh list per instance: a literal ``[]`` default would
            # be one object shared by every instance created without it.
            dependsOn = []
        FileTransferPlugin.__init__(self, command, src, dest, dependsOn=dependsOn)
        self.dsFileApi = DsRestApiFactory.getFileRestApi()
        self.localMd5Sum = localMd5Sum
        self.remoteMd5Sum = remoteMd5Sum
        self.deleteOriginal = deleteOriginal
        self.pluginMustProcessFiles = pluginMustProcessFiles

    def getSrcUrl(self, filePath, dataDirectory):
        """Return the globus-url-copy source URL for a single file.

        If ``dataDirectory`` carries a URL scheme, the URL is rebuilt from that
        scheme, host and port with ``filePath`` appended. Otherwise
        ``filePath`` becomes a ``file://`` URL, or is appended to ``self.src``
        when a source prefix was configured.
        """
        (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
        if scheme:
            srcUrl = '%s://%s:%s/%s' % (scheme, host, port, filePath)
        elif self.src is None:
            srcUrl = 'file://%s' % filePath
        else:
            srcUrl = '%s/%s' % (self.src, filePath)
        return srcUrl

    def getDestUrl(self, filePath, dataDirectory, storageHost, storageDirectory):
        """Return the destination URL for a single file.

        The file's directory, taken relative to the path component of
        ``dataDirectory``, is reproduced under the destination root. The root
        is ``self.dest`` when configured and
        ``sshftp://<storageHost>/<storageDirectory>`` otherwise.
        """
        (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
        # dirName is '' for files directly under dirPath, which leaves a
        # doubled '/' in the resulting URL.
        dirName = os.path.dirname(os.path.relpath(filePath, dirPath)).strip()
        fileName = os.path.basename(filePath)
        if self.dest:
            destUrl = '%s/%s/%s' % (self.dest, dirName, fileName)
        else:
            destUrl = 'sshftp://%s/%s/%s/%s' % (storageHost, storageDirectory, dirName, fileName)
        return destUrl

    def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
        """Remove files for which storage already holds an up-to-date copy.

        Storage contents are obtained with ``SftpUtility(storageHost).getFiles()``.
        A file is dropped from ``filePathsDict`` only when its storage
        counterpart has the same (known) 'fileSize' and both sides have a
        'fileModificationTime' with the source time not later than the
        storage time.

        :param filePathsDict: maps file path -> dict of file attributes; it is
            modified in place and returned.
        :param uploadInfo: must provide 'storageHost', 'storageDirectory' and
            'dataDirectory'.
        :returns: the pruned ``filePathsDict``, or ``{}`` when
            ``pluginMustProcessFiles`` is False (no listing is done then).
        """
        if not self.pluginMustProcessFiles:
            return {}
        storageHost = uploadInfo['storageHost']
        storageDirectory = uploadInfo['storageDirectory']
        dataDirectory = uploadInfo['dataDirectory']
        self.logger.debug('Upload info: %s' % uploadInfo)
        # Original data directory may contain host/port
        (scheme, host, port, replacementDirPath) = FtpUtility.parseFtpUrl(dataDirectory)
        self.logger.debug('Number of original files: %s' % len(filePathsDict))
        self.logger.debug('Looking for existing files in %s' % storageDirectory)
        ftpUtility = SftpUtility(storageHost)
        # NOTE(review): replacementDirPath presumably makes getFiles() key the
        # storage listing by the same paths used in filePathsDict; confirm.
        storageFilePathsDict = ftpUtility.getFiles(storageDirectory, {}, replacementDirPath)
        self.logger.debug('There are %s files in %s' % (len(storageFilePathsDict), storageDirectory))
        # Remove file from plugin dict if we do not need to transfer it
        for (filePath, storageFilePathDict) in storageFilePathsDict.items():
            filePathDict = filePathsDict.get(filePath)
            if filePathDict is None:
                # We are not attempting to transfer this file
                continue
            fSize = filePathDict.get('fileSize')
            sfSize = storageFilePathDict.get('fileSize')
            if fSize is None or sfSize is None or fSize != sfSize:
                # Unknown or differing sizes: transfer the file
                continue
            mTime = filePathDict.get('fileModificationTime')
            smTime = storageFilePathDict.get('fileModificationTime')
            if not mTime or not smTime or mTime > smTime:
                # Missing times, or source newer than storage: transfer it
                continue
            # Storage copy is the same size and not older: skip the transfer
            del filePathsDict[filePath]
        self.logger.debug('Number of files that require processing: %s' % len(filePathsDict))
        return filePathsDict

    def processFile(self, fileInfo):
        """Transfer one file and optionally verify its checksum.

        Reads 'filePath', 'dataDirectory', 'experimentFilePath',
        'experimentName', 'storageHost' and 'storageDirectory' from
        ``fileInfo``. When 'fileSize' is missing/falsy the file is stat-ed via
        FtpUtility, and when ``localMd5Sum`` is set its MD5 is computed (both
        calls receive ``fileInfo``, presumably to fill it in place). The copy
        is run through ``self.start``.

        :raises FileProcessingError: when ``remoteMd5Sum`` is set and the
            remote 'md5Sum' differs from the one in ``fileInfo``.
        """
        filePath = fileInfo.get('filePath')
        dataDirectory = fileInfo.get('dataDirectory')
        experimentFilePath = fileInfo.get('experimentFilePath')
        experimentName = fileInfo.get('experimentName')
        storageHost = fileInfo.get('storageHost')
        storageDirectory = fileInfo.get('storageDirectory')
        destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory)
        srcUrl = self.getSrcUrl(filePath, dataDirectory)

        # Calculate checksum
        (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
        ftpUtility = FtpUtility(host, port)
        if not fileInfo.get('fileSize'):
            ftpUtility.statFile(filePath, fileInfo)
        if self.localMd5Sum:
            ftpUtility.getMd5Sum(filePath, fileInfo)

        # Transfer file
        self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl))
        self.start(src=srcUrl, dest=destUrl, templateInfo=fileInfo)

        # Get remote checksum
        if self.remoteMd5Sum:
            fileInfo2 = {}
            fileInfo2['experimentFilePath'] = experimentFilePath
            fileInfo2['experimentName'] = experimentName
            fileMetadata = self.dsFileApi.statFile(experimentFilePath, experimentName, fileInfo2)
            # If we have not done md5 locally, adopt the remote value; the
            # comparison below is then trivially satisfied.
            if not self.localMd5Sum:
                fileInfo['md5Sum'] = fileMetadata.get('md5Sum')
            # Verify checksum
            if fileMetadata.get('md5Sum') != fileInfo.get('md5Sum'):
                self.logger.error('Checksum mismatch for file: %s' % filePath)
                raise FileProcessingError('Checksum mismatch detected for file: %s' % filePath)
            self.logger.debug('Checksum test passed for file %s' % filePath)

        # NOTE(review): self.deleteOriginal is stored but not honored; removal
        # of the original file after transfer is currently disabled.

    def getSrcDirUrl(self, dataDirectory):
        """Return the source URL (with trailing '/') for a directory transfer.

        A URL-style ``dataDirectory`` is used as-is; otherwise it becomes a
        ``file://`` URL, or is appended to ``self.src`` when configured.
        """
        (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
        if scheme:
            srcUrl = '%s/' % (dataDirectory)
        elif self.src is None:
            srcUrl = 'file://%s/' % (dataDirectory)
        else:
            srcUrl = '%s/%s/' % (self.src, dataDirectory)
        return srcUrl

    def getDestDirUrl(self, dataDirectory, storageHost, storageDirectory):
        """Return the destination URL (with trailing '/') for a directory.

        Uses ``self.dest`` when configured, else
        ``sshftp://<storageHost>/<storageDirectory>/``. ``dataDirectory`` is
        accepted for interface symmetry but not used.
        """
        if self.dest:
            destUrl = '%s/' % (self.dest)
        else:
            destUrl = 'sshftp://%s/%s/' % (storageHost, storageDirectory)
        return destUrl

    def processDirectory(self, directoryInfo):
        """Transfer a whole data directory with DIRECTORY_TRANSFER_COMMAND.

        :param directoryInfo: must contain 'uploadInfo', a dict providing
            'dataDirectory', 'storageHost' and 'storageDirectory'; the
            uploadInfo dict is also passed as ``templateInfo`` to ``start``.
        """
        uploadInfo = directoryInfo.get('uploadInfo')
        dataDirectory = uploadInfo.get('dataDirectory')
        storageHost = uploadInfo.get('storageHost')
        storageDirectory = uploadInfo.get('storageDirectory')
        destUrl = self.getDestDirUrl(dataDirectory, storageHost, storageDirectory)
        srcUrl = self.getSrcDirUrl(dataDirectory)
        # Transfer directory
        self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl))
        self.start(src=srcUrl, dest=destUrl, command=self.DIRECTORY_TRANSFER_COMMAND, templateInfo=uploadInfo)
#######################################################################
# Testing.
if __name__ == '__main__':
ft = GridftpFileTransferPlugin('/tmp/xyz', '/tmp/xyz2')
ft.start()
print 'StdOut: ', ft.getStdOut()
print 'StdErr: ', ft.getStdErr()
print 'Exit Status: ', ft.getExitStatus()