Skip to content
Snippets Groups Projects
rsyncWithChecksumAndDeleteFileTransferPlugin.py 2.69 KiB
Newer Older
#!/usr/bin/env python

import os
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.osUtility import OsUtility
from dm.common.utility.fileUtility import FileUtility
from dm.common.exceptions.fileProcessingError import FileProcessingError
from dm.common.processing.plugins.fileTransferPlugin import FileTransferPlugin
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory

class RsyncWithChecksumAndDeleteFileTransferPlugin(FileTransferPlugin):
    """Transfer a file with rsync, verify it by checksum, then delete the source.

    The MD5 checksum computed locally before the transfer is compared with the
    checksum reported by the DS file REST API for the transferred copy. The
    local file is removed only when both checksums are present and equal.
    """

    # rsync options: -a archive mode, -r recurse, -v verbose, -l copy symlinks
    # as symlinks, -P keep partial files and show progress, -R use relative
    # path names (the source path as given is recreated under the destination).
    COMMAND = 'rsync -arvlPR'

    def __init__(self, src=None, dest=None):
        """Create the plugin.

        :param src: default transfer source, forwarded to FileTransferPlugin
        :param dest: default transfer destination, forwarded to FileTransferPlugin
        """
        FileTransferPlugin.__init__(self, self.COMMAND, src, dest)
        # Used after the transfer to stat the copy on the storage side.
        self.dsFileApi = DsRestApiFactory.getFileRestApi()
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)

    def processFile(self, fileInfo):
        """Transfer one file, verify its checksum remotely and delete it locally.

        :param fileInfo: dict describing the file; reads 'filePath',
            'dataDirectory', 'experimentFilePath' and 'experiment' (a dict
            with 'name', 'storageHost' and 'storageDirectory'). Stat and MD5
            information is added to it in place by FileUtility.
        :raises FileProcessingError: if the local or remote checksum is
            missing, or if they differ. The local file is not deleted in
            either case.
        """
        filePath = fileInfo.get('filePath')
        dataDirectory = fileInfo.get('dataDirectory')
        experiment = fileInfo.get('experiment')
        experimentName = experiment.get('name')
        experimentFilePath = fileInfo.get('experimentFilePath')
        storageHost = experiment.get('storageHost')
        storageDirectory = experiment.get('storageDirectory')
        destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory)
        # Use relative path with respect to data directory as a source, so
        # that rsync -R recreates that relative path under the destination.
        # NOTE(review): os.chdir changes the working directory of the whole
        # process and is not restored here; confirm nothing running
        # concurrently relies on the current directory.
        os.chdir(dataDirectory)
        srcUrl = self.getSrcUrl(filePath, dataDirectory)

        # Calculate checksum
        FileUtility.statFile(filePath, fileInfo)
        FileUtility.getMd5Sum(filePath, fileInfo)
        self.logger.debug('File info before transfer: %s' % fileInfo)

        # Transfer file
        self.start(srcUrl, destUrl)

        # Get remote checksum
        remoteInfo = {
            'experimentFilePath': experimentFilePath,
            'experimentName': experimentName,
        }
        fileMetadata = self.dsFileApi.statFile(experimentFilePath, experimentName, remoteInfo)
        self.logger.debug('File stat after transfer: %s' % fileMetadata)

        # Verify checksum. A missing checksum on either side must never count
        # as a match (None == None); otherwise the source file would be
        # deleted without any verification.
        localMd5Sum = fileInfo.get('md5Sum')
        remoteMd5Sum = fileMetadata.get('md5Sum')
        if not localMd5Sum or not remoteMd5Sum:
            self.logger.error('Checksum unavailable for file: %s' % filePath)
            raise FileProcessingError('Checksum unavailable for file: %s' % filePath)
        if remoteMd5Sum != localMd5Sum:
            self.logger.error('Checksum mismatch for file: %s' % filePath)
            raise FileProcessingError('Checksum mismatch detected for file: %s' % filePath)

        # Remove exactly the file whose checksum was verified (and which the
        # log message names), not the rsync source argument.
        self.logger.debug('Checksum test passed, deleting file %s' % filePath)
        OsUtility.removeFile(filePath)

#######################################################################
# Testing.
if __name__ == '__main__':
    ft = RsyncWithChecksumAndDeleteFileTransferPlugin('/tmp/xyz', '/tmp/xyz2')
    ft.start()
    print 'StdOut: ', ft.getStdOut()
    print 'StdErr: ', ft.getStdErr()
    print 'Exit Status: ', ft.getExitStatus()