#!/usr/bin/env python

"""File transfer plugin that moves files and directories with globus-url-copy."""

import os
import copy
from fileTransferPlugin import FileTransferPlugin
from dm.common.utility.fileUtility import FileUtility
from dm.common.utility.ftpUtility import FtpUtility
from dm.common.utility.sftpUtility import SftpUtility
from dm.common.exceptions.fileProcessingError import FileProcessingError
from dm.common.utility.dmSubprocess import DmSubprocess
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory


class GridftpFileTransferPlugin(FileTransferPlugin):
    """Transfer plugin that builds source/destination URLs for globus-url-copy.

    Single files are transferred with ``command`` and whole directories with
    ``directoryTransferCommand``. Optionally an md5 checksum is computed
    before the transfer and/or fetched from the DS file REST API afterwards
    and compared.
    """

    # Previous command settings, kept for reference:
    #DEFAULT_COMMAND = 'globus-url-copy -c -cd -r -tcp-bs 512K -p 3 -sync -sync-level 2'
    #DIRECTORY_TRANSFER_COMMAND = 'globus-url-copy -c -cd -r -tcp-bs 512K -p 8 -sync -sync-level 2'
    DEFAULT_COMMAND = 'globus-url-copy -r -cd -sync -sync-level 2 -fast -bs 2M -tcp-bs 2M -p 8'
    DIRECTORY_TRANSFER_COMMAND = 'globus-url-copy -r -cd -sync -sync-level 2 -fast -bs 2M -tcp-bs 2M -p 8 -cc 8'
    # Port passed to FtpUtility.parseFtpUrl() as its defaultPort.
    DEFAULT_PORT = 2811

    def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, directoryTransferCommand=DIRECTORY_TRANSFER_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True, dependsOn=None):
        """Initialize the plugin.

        :param src: optional source URL prefix; when None, local 'file://'
            URLs are generated for sources without a scheme
        :param dest: optional destination URL prefix; when not set,
            'sshftp://' URLs are built from the storage host and directory
        :param command: command used for single-file transfers
        :param directoryTransferCommand: command used for directory transfers
        :param localMd5Sum: compute the md5 checksum before the transfer
        :param remoteMd5Sum: fetch the md5 checksum via the DS file REST API
            after the transfer and compare it
        :param deleteOriginal: stored on the instance; removal of the
            original file is currently disabled in processFile()
        :param pluginMustProcessFiles: when False,
            checkUploadFilesForProcessing() reports no files to process
        :param dependsOn: list handed to the base class as ``dependsOn``;
            None (the default) is replaced by a new empty list
        """
        # Create a fresh list per instance: a mutable default in the signature
        # would be a single object shared by every plugin instance.
        if dependsOn is None:
            dependsOn = []
        FileTransferPlugin.__init__(self, command, src, dest, dependsOn=dependsOn)
        self.dsFileApi = DsRestApiFactory.getFileRestApi()
        self.localMd5Sum = localMd5Sum
        self.remoteMd5Sum = remoteMd5Sum
        self.deleteOriginal = deleteOriginal
        self.directoryTransferCommand = directoryTransferCommand
        self.pluginMustProcessFiles = pluginMustProcessFiles

    def getSrcUrl(self, filePath, dataDirectory):
        """Return the source URL for a single file.

        If dataDirectory parses to a URL with a scheme, that scheme, host and
        port are combined with filePath. Otherwise the URL is 'file://' based
        when self.src is None, or self.src joined with filePath.
        """
        (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
        if scheme:
            srcUrl = '%s://%s:%s/%s' % (scheme, host, port, filePath)
        elif self.src is None:
            srcUrl = 'file://%s' % filePath
        else:
            srcUrl = '%s/%s' % (self.src,filePath)
        return srcUrl

    def getDestUrl(self, filePath, dataDirectory, storageHost, storageDirectory):
        """Return the destination URL for a single file.

        The file keeps its directory relative to the path component of
        dataDirectory. The URL is rooted at self.dest when that is set,
        otherwise at sshftp://<storageHost>/<storageDirectory>.
        """
        (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
        # Directory of the file relative to the data directory path.
        dirName = os.path.dirname(os.path.relpath(filePath, dirPath)).strip()
        fileName = os.path.basename(filePath)
        if self.dest:
            destUrl = '%s/%s/%s' % (self.dest, dirName, fileName)
        else:
            destUrl = 'sshftp://%s/%s/%s/%s' % (storageHost, storageDirectory, dirName, fileName)
        return destUrl

    def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
        """Drop files that do not need to be transferred.

        Files found in the storage directory are compared with the entries of
        filePathsDict. An entry is removed when both sizes are known and
        equal, both modification times are set, and the source time is not
        later than the storage time.

        :param filePathsDict: dict keyed by file path; values are dicts read
            for 'fileSize' and 'fileModificationTime'. Modified in place.
        :param uploadInfo: dict providing 'storageHost', 'storageDirectory'
            and 'dataDirectory'
        :returns: {} when pluginMustProcessFiles is False, otherwise the
            (pruned) filePathsDict itself
        """
        if not self.pluginMustProcessFiles:
            return {}
        storageHost = uploadInfo['storageHost']
        storageDirectory = uploadInfo['storageDirectory']
        dataDirectory = uploadInfo['dataDirectory']
        self.logger.debug('Upload info: %s' % uploadInfo)
        # Original data directory may contain host/port
        (scheme, host, port, replacementDirPath) = FtpUtility.parseFtpUrl(dataDirectory)
        self.logger.debug('Replacement dir path: %s' % replacementDirPath)
        self.logger.debug('Number of original files: %s' % len(filePathsDict))
        self.logger.debug('Looking for existing files in %s' % storageDirectory)
        ftpUtility = SftpUtility(storageHost)
        # NOTE(review): replacementDirPath presumably makes the returned keys
        # match the keys of filePathsDict - confirm against SftpUtility.getFiles().
        storageFilePathsDict = ftpUtility.getFiles(storageDirectory, {}, replacementDirPath)
        self.logger.debug('There are %s files in %s' % (len(storageFilePathsDict), storageDirectory))
        # Remove file from plugin dict if we do not need to transfer it
        for (filePath,storageFilePathDict) in storageFilePathsDict.items():
            filePathDict = filePathsDict.get(filePath)
            if filePathDict is None:
                # We are not attempting to transfer this file
                # No need to change plugin file dict
                continue

            # Check size
            fSize = filePathDict.get('fileSize')
            sfSize = storageFilePathDict.get('fileSize')
            if fSize is None or sfSize is None or fSize != sfSize:
                # Sizes differ (or are unknown), need to transfer file
                continue

            # Sizes are the same, check modify time
            mTime = filePathDict.get('fileModificationTime')
            smTime = storageFilePathDict.get('fileModificationTime')
            if not mTime or not smTime or mTime > smTime:
                # Source time is later than storage time (or a time is
                # missing), need to transfer file
                continue

            # No need to transfer file
            del filePathsDict[filePath]

        self.logger.debug('Number of files that require processing: %s' % len(filePathsDict))
        return filePathsDict

    def processFile(self, fileInfo):
        """Transfer a single file and optionally verify its checksum.

        :param fileInfo: dict read for 'filePath', 'dataDirectory',
            'experimentFilePath', 'experimentName', 'storageHost',
            'storageDirectory' and 'fileSize'; it is also passed to the stat
            utility calls and to start() as templateInfo
        :raises FileProcessingError: if remoteMd5Sum is enabled and the
            checksum from the DS file API differs from fileInfo's 'md5Sum'
        """
        filePath = fileInfo.get('filePath')
        dataDirectory = fileInfo.get('dataDirectory')
        experimentFilePath = fileInfo.get('experimentFilePath')
        experimentName = fileInfo.get('experimentName')
        storageHost = fileInfo.get('storageHost')
        storageDirectory = fileInfo.get('storageDirectory')
        destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory)
        srcUrl = self.getSrcUrl(filePath, dataDirectory)

        # Calculate checksum
        # NOTE(review): self.statUtility is not assigned in this class;
        # presumably it is provided by the base class - confirm.
        statUtility = self.statUtility
        if not statUtility:
            (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
            statUtility = FtpUtility(host, port)

        if not fileInfo.get('fileSize'):
            statUtility.statFile(filePath, fileInfo)
        if self.localMd5Sum:
            statUtility.getMd5Sum(filePath, fileInfo)

        # Transfer file
        self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl))
        self.start(src=srcUrl, dest=destUrl, templateInfo=fileInfo)

        # Get remote checksum
        if self.remoteMd5Sum:
            fileInfo2 = {}
            fileInfo2['experimentFilePath'] = experimentFilePath
            fileInfo2['experimentName'] = experimentName
            fileMetadata = self.dsFileApi.statFile(experimentFilePath, experimentName, fileInfo2)
            # If we have not done md5 locally, update file info
            # (the comparison below then passes trivially)
            if not self.localMd5Sum:
                fileInfo['md5Sum'] = fileMetadata.get('md5Sum')

            # Verify checksum
            if fileMetadata.get('md5Sum') != fileInfo.get('md5Sum'):
                self.logger.error('Checksum mismatch for file: %s' % filePath)
                raise FileProcessingError('Checksum mismatch detected for file: %s' % filePath)
            self.logger.debug('Checksum test passed for file %s' % filePath)

        # Remove file
        # NOTE: removal of the original is disabled; deleteOriginal has no
        # effect in this method.
        #if self.deleteOriginal:
        #    self.logger.debug('Deleting file %s' % filePath)
        #    OsUtility.removeFile(srcUrl)

    def getSrcDirUrl(self, dataDirectory):
        """Return the source URL (with trailing slash) for a directory transfer.

        A dataDirectory that already parses to a URL with a scheme is used
        as is; otherwise it is prefixed with 'file://' when self.src is None,
        or with self.src.
        """
        (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
        if scheme:
            srcUrl = '%s/' % (dataDirectory)
        elif self.src is None:
            srcUrl = 'file://%s/' % (dataDirectory)
        else:
            srcUrl = '%s/%s/' % (self.src,dataDirectory)
        return srcUrl

    def getDestDirUrl(self, dataDirectory, storageHost, storageDirectory):
        """Return the destination URL (with trailing slash) for a directory transfer.

        Uses self.dest when set, otherwise
        sshftp://<storageHost>/<storageDirectory>/. The dataDirectory
        argument is not used.
        """
        if self.dest:
            destUrl = '%s/' % (self.dest)
        else:
            destUrl = 'sshftp://%s/%s/' % (storageHost, storageDirectory)
        return destUrl

    def processDirectory(self, directoryInfo):
        """Transfer a whole directory using the directory transfer command.

        :param directoryInfo: dict whose 'uploadInfo' entry provides
            'dataDirectory', 'storageHost' and 'storageDirectory'; the upload
            info is also passed to start() as templateInfo
        """
        uploadInfo = directoryInfo.get('uploadInfo')
        dataDirectory = uploadInfo.get('dataDirectory')
        storageHost = uploadInfo.get('storageHost')
        storageDirectory = uploadInfo.get('storageDirectory')

        destUrl = self.getDestDirUrl(dataDirectory, storageHost, storageDirectory)
        srcUrl = self.getSrcDirUrl(dataDirectory)

        # Transfer directory
        self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl))
        self.start(src=srcUrl, dest=destUrl, command=self.directoryTransferCommand, templateInfo=uploadInfo)


#######################################################################
# Testing.
if __name__ == '__main__':
    ft = GridftpFileTransferPlugin('/tmp/xyz', '/tmp/xyz2')
    ft.start()
    # Single-argument print calls give the same output under Python 2 as the
    # former print statements (label, two spaces, value) and parse on Python 3.
    print('StdOut:  %s' % (ft.getStdOut(),))
    print('StdErr:  %s' % (ft.getStdErr(),))
    print('Exit Status:  %s' % (ft.getExitStatus(),))