#!/usr/bin/env python import os from fileTransferPlugin import FileTransferPlugin from dm.common.utility.fileUtility import FileUtility from dm.common.utility.ftpUtility import FtpUtility from dm.common.utility.sftpUtility import SftpUtility from dm.common.exceptions.fileProcessingError import FileProcessingError from dm.common.utility.dmSubprocess import DmSubprocess from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory class GridftpFileTransferPlugin(FileTransferPlugin): DEFAULT_COMMAND = 'globus-url-copy -c -cd -r -vb -tcp-bs 512K -p 1 -sync -sync-level 2' DEFAULT_PORT = 2811 def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False): FileTransferPlugin.__init__(self, command, src, dest) self.dsFileApi = DsRestApiFactory.getFileRestApi() self.localMd5Sum = localMd5Sum self.remoteMd5Sum = remoteMd5Sum self.deleteOriginal = deleteOriginal def getSrcUrl(self, filePath, dataDirectory): (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT) if scheme: srcUrl = '%s://%s:%s/%s' % (scheme, host, port, filePath) elif self.src is None: srcUrl = 'file://%s' % filePath else: srcUrl = '%s/%s' % (self.src,filePath) return srcUrl def getDestUrl(self, filePath, dataDirectory, storageHost, storageDirectory): (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT) dirName = os.path.dirname(os.path.relpath(filePath, dirPath)).strip() fileName = os.path.basename(filePath) destUrl = 'sshftp://%s/%s/%s/%s' % (storageHost, storageDirectory, dirName, fileName) return destUrl def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo): storageHost = uploadInfo['storageHost'] storageDirectory = uploadInfo['storageDirectory'] dataDirectory = uploadInfo['dataDirectory'] self.logger.debug('Upload info: %s', uploadInfo) #(scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(storageUrl, defaultPort=self.DEFAULT_PORT) ftpUtility = SftpUtility(storageHost) storageFilePathsDict = ftpUtility.getFiles(storageDirectory, {}) pluginFilePathsDict = {} filePaths = filePathsDict.keys() for filePath in filePaths: filePathDict = filePathsDict.get(filePath) experimentFilePath = os.path.relpath(filePath, dataDirectory) storageFilePath = os.path.join(storageDirectory, experimentFilePath) storageFilePathDict = storageFilePathsDict.get(storageFilePath) if not storageFilePathDict: # remote directory does not have the file pluginFilePathsDict[filePath] = filePathDict else: fSize = filePathDict.get('fileSize') sfSize = storageFilePathDict.get('fileSize') # check size if not fSize or not sfSize or fSize != sfSize: pluginFilePathsDict[filePath] = filePathDict else: # sizes are the same, check modify time mTime = filePathDict.get('fileModificationTime') sfTime = storageFilePathDict.get('fileModificationTime') if not mTime or not sfTime or mTime > sfTime: pluginFilePathsDict[filePath] = filePathDict self.logger.debug('Number of original files: %s, number of plugin files: %s', len(filePathsDict), len(pluginFilePathsDict)) return pluginFilePathsDict def processFile(self, fileInfo): filePath = fileInfo.get('filePath') dataDirectory = fileInfo.get('dataDirectory') experimentFilePath = fileInfo.get('experimentFilePath') experimentName = fileInfo.get('experimentName') storageHost = fileInfo.get('storageHost') storageDirectory = fileInfo.get('storageDirectory') destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory) srcUrl = self.getSrcUrl(filePath, dataDirectory) # Calculate checksum (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT) ftpUtility = FtpUtility(host, port) ftpUtility.statFile(filePath, fileInfo) if self.localMd5Sum: ftpUtility.getMd5Sum(filePath, fileInfo) # Transfer file self.logger.debug('Starting transfer: %s' % fileInfo) self.start(srcUrl, destUrl) # Get remote checksum if self.remoteMd5Sum: fileInfo2 = {} fileInfo2['experimentFilePath'] = experimentFilePath fileInfo2['experimentName'] = experimentName fileMetadata = self.dsFileApi.statFile(experimentFilePath, experimentName, fileInfo2) # If we have not done md5 locally, update file info if not self.md5Sum: fileInfo['md5Sum'] = fileMetadata.get('md5Sum') # Verify checksum if fileMetadata.get('md5Sum') != fileInfo.get('md5Sum'): self.logger.error('Checksum mismatch for file: %s' % filePath) raise FileProcessingError('Checksum mismatch detected for file: %s' % filePath) self.logger.debug('Checksum test passed for file %s' % filePath) # Remove file #if self.deleteOriginal: # self.logger.debug('Deleting file %s' % filePath) # OsUtility.removeFile(srcUrl) ####################################################################### # Testing. if __name__ == '__main__': ft = GridftpFileTransferPlugin('/tmp/xyz', '/tmp/xyz2') ft.start() print 'StdOut: ', ft.getStdOut() print 'StdErr: ', ft.getStdErr() print 'Exit Status: ', ft.getExitStatus()