#!/usr/bin/env python import os import copy from fileTransferPlugin import FileTransferPlugin from dm.common.utility.fileUtility import FileUtility from dm.common.utility.ftpUtility import FtpUtility from dm.common.utility.sftpUtility import SftpUtility from dm.common.exceptions.fileProcessingError import FileProcessingError from dm.common.utility.dmSubprocess import DmSubprocess from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory class GridftpFileTransferPlugin(FileTransferPlugin): DEFAULT_COMMAND = 'globus-url-copy -c -cd -r -vb -tcp-bs 512K -p 1 -sync -sync-level 2' DEFAULT_PORT = 2811 def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True): FileTransferPlugin.__init__(self, command, src, dest) self.dsFileApi = DsRestApiFactory.getFileRestApi() self.localMd5Sum = localMd5Sum self.remoteMd5Sum = remoteMd5Sum self.deleteOriginal = deleteOriginal self.pluginMustProcessFiles = pluginMustProcessFiles def getSrcUrl(self, filePath, dataDirectory): (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT) if scheme: srcUrl = '%s://%s:%s/%s' % (scheme, host, port, filePath) elif self.src is None: srcUrl = 'file://%s' % filePath else: srcUrl = '%s/%s' % (self.src,filePath) return srcUrl def getDestUrl(self, filePath, dataDirectory, storageHost, storageDirectory): (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT) dirName = os.path.dirname(os.path.relpath(filePath, dirPath)).strip() fileName = os.path.basename(filePath) if self.dest: destUrl = '%s/%s/%s' % (self.dest, dirName, fileName) else: destUrl = 'sshftp://%s/%s/%s/%s' % (storageHost, storageDirectory, dirName, fileName) return destUrl def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo): if not self.pluginMustProcessFiles: return {} storageHost = uploadInfo['storageHost'] storageDirectory = uploadInfo['storageDirectory'] dataDirectory = uploadInfo['dataDirectory'] self.logger.debug('Upload info: %s', uploadInfo) # Original data directory may contain host/port (scheme, host, port, replacementDirPath) = FtpUtility.parseFtpUrl(dataDirectory) self.logger.debug('Number of original files: %s', len(filePathsDict)) self.logger.debug('Looking for existing files in %s', storageDirectory) ftpUtility = SftpUtility(storageHost) storageFilePathsDict = ftpUtility.getFiles(storageDirectory, {}, replacementDirPath) self.logger.debug('There are %s files in %s', (len(storageFilePathsDict), storageDirectory)) # Remove file from plugin dict if we do not need to transfer it for (filePath,storageFilePathDict) in storageFilePathsDict.items(): filePathDict = filePathsDict.get(filePath) if filePathDict is None: # We are not attempting to transfer this file # No need to change plugin file dict continue # Check size fSize = filePathDict.get('fileSize') sfSize = storageFilePathDict.get('fileSize') if not fSize or not sfSize or fSize != sfSize: # Sizes differ, need to transfer file continue # Sizes are the same, check modify time mTime = filePathDict.get('fileModificationTime') smTime = storageFilePathDict.get('fileModificationTime') if not mTime or not smTime or mTime > smTime: # Source time is later than storage time, need to transfer file continue # No need to transfer file del filePathsDict[filePath] self.logger.debug('Number of files that require processing: %s', len(filePathsDict)) return filePathsDict def processFile(self, fileInfo): filePath = fileInfo.get('filePath') dataDirectory = fileInfo.get('dataDirectory') experimentFilePath = fileInfo.get('experimentFilePath') experimentName = fileInfo.get('experimentName') storageHost = fileInfo.get('storageHost') storageDirectory = fileInfo.get('storageDirectory') destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory) srcUrl = self.getSrcUrl(filePath, dataDirectory) # Calculate checksum (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT) ftpUtility = FtpUtility(host, port) ftpUtility.statFile(filePath, fileInfo) if self.localMd5Sum: ftpUtility.getMd5Sum(filePath, fileInfo) # Transfer file self.logger.debug('Starting transfer: %s -> %s (fileInfo: %s)' % (srcUrl, destUrl, fileInfo)) self.start(srcUrl, destUrl, fileInfo) # Get remote checksum if self.remoteMd5Sum: fileInfo2 = {} fileInfo2['experimentFilePath'] = experimentFilePath fileInfo2['experimentName'] = experimentName fileMetadata = self.dsFileApi.statFile(experimentFilePath, experimentName, fileInfo2) # If we have not done md5 locally, update file info if not self.localMd5Sum: fileInfo['md5Sum'] = fileMetadata.get('md5Sum') # Verify checksum if fileMetadata.get('md5Sum') != fileInfo.get('md5Sum'): self.logger.error('Checksum mismatch for file: %s' % filePath) raise FileProcessingError('Checksum mismatch detected for file: %s' % filePath) self.logger.debug('Checksum test passed for file %s' % filePath) # Remove file #if self.deleteOriginal: # self.logger.debug('Deleting file %s' % filePath) # OsUtility.removeFile(srcUrl) ####################################################################### # Testing. if __name__ == '__main__': ft = GridftpFileTransferPlugin('/tmp/xyz', '/tmp/xyz2') ft.start() print 'StdOut: ', ft.getStdOut() print 'StdErr: ', ft.getStdErr() print 'Exit Status: ', ft.getExitStatus()