"src/python/dm/ds_web_service/api/experimentDsApi.py" did not exist on "60f9572e4e04e2d670dec853ad6d6a15d05bd5d7"
Newer
Older
import os
from fileTransferPlugin import FileTransferPlugin
from dm.common.utility.osUtility import OsUtility
from dm.common.utility.fileUtility import FileUtility
from dm.common.exceptions.fileProcessingError import FileProcessingError
from dm.common.utility.dmSubprocess import DmSubprocess
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
class RsyncFileTransferPlugin(FileTransferPlugin):
    """File processing plugin that copies upload data to storage with rsync.

    Single files are transferred by ``processFile`` (optionally with local
    and/or remote MD5 checksums), whole upload directories by
    ``processDirectory``. ``checkUploadFilesForProcessing`` runs an rsync dry
    run to decide which files of an upload this plugin actually has to move.
    """

    # Default per-file command; -R (--relative) keeps the source path, taken
    # relative to the working directory (the data directory), on the
    # destination side.
    DEFAULT_COMMAND = 'rsync -arvlPR --'
    # Same as DEFAULT_COMMAND, but the remote rsync is launched via
    # "mkdir -p <dir> && rsync" so the destination directory is created first.
    # The %s placeholder is the directory to create.
    RSYNC_WITH_MKDIR_COMMAND = 'rsync -arvlPR --rsync-path="mkdir -p %s && rsync" --'
    # Directory transfers copy the directory contents without -R.
    DIRECTORY_TRANSFER_COMMAND = 'rsync -arvlP --'
    DIRECTORY_TRANSFER_WITH_MKDIR_COMMAND = 'rsync -arvlP --rsync-path="mkdir -p %s && rsync" --'
    # Lists what would be transferred without copying anything.
    DRY_RUN_COMMAND = 'rsync -arvlP --dry-run --'

    def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True, dependsOn=None):
        """Configure the plugin.

        :param src: source passed through to FileTransferPlugin.
        :param dest: destination passed through to FileTransferPlugin.
        :param command: rsync command used by processFile when no
            targetDirectory is given.
        :param localMd5Sum: compute the MD5 of each file before transfer.
        :param remoteMd5Sum: after transfer, fetch the file's MD5 from the
            DS file web service and compare it with the local one.
        :param deleteOriginal: request removal of the original file after
            processing (see NOTE in processFile).
        :param pluginMustProcessFiles: if False, checkUploadFilesForProcessing
            selects no files.
        :param dependsOn: plugin dependencies passed to FileTransferPlugin;
            defaults to a new empty list.
        """
        # Fresh list per instance: a literal [] default would be shared by
        # every plugin constructed without an explicit dependsOn.
        if dependsOn is None:
            dependsOn = []
        FileTransferPlugin.__init__(self, command, src, dest, dependsOn=dependsOn)
        self.dsFileApi = DsRestApiFactory.getFileRestApi()
        self.localMd5Sum = localMd5Sum
        self.remoteMd5Sum = remoteMd5Sum
        self.deleteOriginal = deleteOriginal
        self.pluginMustProcessFiles = pluginMustProcessFiles

    def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
        """Select the upload files that this plugin still needs to transfer.

        Runs an rsync dry run from the upload's data directory to the storage
        location. Only the entries of ``filePathsDict`` whose paths appear in
        the dry-run output are kept.

        :param filePathsDict: dict mapping local file path -> per-file info.
        :param uploadInfo: dict with 'storageDirectory', 'storageHost' and
            'dataDirectory' keys, and optionally 'targetDirectory'.
        :returns: dict (subset of filePathsDict) of files to process; empty
            when pluginMustProcessFiles is False.
        """
        if not self.pluginMustProcessFiles:
            return {}
        storageDirectory = uploadInfo['storageDirectory']
        storageHost = uploadInfo['storageHost']
        dataDirectory = uploadInfo['dataDirectory']
        targetDirectory = uploadInfo.get('targetDirectory')
        # NOTE(review): paths are interpolated unquoted into a command string;
        # directories containing spaces or shell metacharacters will break it.
        dryRunCommand = '%s %s/ %s:%s' % (self.DRY_RUN_COMMAND, dataDirectory, storageHost, storageDirectory)
        if targetDirectory:
            # The destination is the last token, so appending extends it.
            dryRunCommand = '%s/%s/' % (dryRunCommand, targetDirectory)
        dryRunProcess = DmSubprocess.getSubprocess(dryRunCommand)
        dryRunProcess.run()
        # Each output line is expected to be a path relative to dataDirectory.
        # Non-path lines (rsync header/summary) simply won't match a key below.
        lines = dryRunProcess.getStdOut().split('\n')
        pluginFilePathsDict = {}
        for line in lines:
            # rsync lists directories with a trailing separator; skip them.
            if line.endswith(os.sep):
                continue
            filePath = os.path.join(dataDirectory, line)
            filePathDict = filePathsDict.get(filePath)
            # Compare with None so files whose info dict is empty are kept.
            if filePathDict is not None:
                pluginFilePathsDict[filePath] = filePathDict
        self.logger.debug('Number of original files: %s, number of plugin files: %s' % (len(filePathsDict), len(pluginFilePathsDict)))
        return pluginFilePathsDict

    def processFile(self, fileInfo):
        """Transfer one file to storage, optionally verifying its MD5 checksum.

        :param fileInfo: per-file dict. Reads 'filePath', 'dataDirectory',
            'experimentFilePath', 'experimentName', 'storageHost',
            'storageDirectory', 'targetDirectory' and 'fileSize'. It is passed
            to FileUtility.statFile/getMd5Sum, which may update it in place,
            and its 'md5Sum' may be set from the remote checksum.
        :raises FileProcessingError: if remote checksum verification is
            enabled and the remote MD5 differs from fileInfo['md5Sum'].
        """
        filePath = fileInfo.get('filePath')
        dataDirectory = fileInfo.get('dataDirectory')
        experimentFilePath = fileInfo.get('experimentFilePath')
        experimentName = fileInfo.get('experimentName')
        storageHost = fileInfo.get('storageHost')
        storageDirectory = fileInfo.get('storageDirectory')
        targetDirectory = fileInfo.get('targetDirectory')
        destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory, targetDirectory)
        # Use relative path with respect to data directory as a source
        srcUrl = self.getSrcUrl(filePath, dataDirectory)

        # Stat the file only if the caller did not already supply its size.
        if not fileInfo.get('fileSize'):
            FileUtility.statFile(filePath, fileInfo)
        # Calculate local checksum
        if self.localMd5Sum:
            FileUtility.getMd5Sum(filePath, fileInfo)

        # Transfer file
        self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl))
        command = self.command
        if targetDirectory:
            # The target subdirectory may not exist yet on the storage side.
            # NOTE(review): this replaces any custom command given to
            # __init__ (e.g. extra --exclude options).
            destDirectory = '%s/%s' % (storageDirectory, targetDirectory)
            command = self.RSYNC_WITH_MKDIR_COMMAND % destDirectory
        self.start(src=srcUrl, dest=destUrl, command=command, templateInfo=fileInfo, cwd=dataDirectory)

        # Get remote checksum
        if self.remoteMd5Sum:
            fileInfo2 = {
                'experimentFilePath': experimentFilePath,
                'experimentName': experimentName,
            }
            fileMetadata = self.dsFileApi.statFile(experimentFilePath, experimentName, fileInfo2)
            remoteMd5Sum = fileMetadata.get('md5Sum')
            # If we have not done md5 locally, update file info. A locally
            # computed checksum must be kept; otherwise the check below would
            # compare the remote checksum with itself and always pass.
            if not self.localMd5Sum:
                fileInfo['md5Sum'] = remoteMd5Sum
            # Verify checksum
            if remoteMd5Sum != fileInfo.get('md5Sum'):
                self.logger.error('Checksum mismatch for file: %s' % filePath)
                raise FileProcessingError('Checksum mismatch detected for file: %s' % filePath)
            self.logger.debug('Checksum test passed for file %s' % filePath)

        # Remove file
        if self.deleteOriginal:
            # NOTE(review): this branch only logs. Nothing here removes
            # filePath, and OsUtility is imported but unused. Confirm whether
            # deletion is intentionally disabled before relying on this flag.
            self.logger.debug('Deleting file %s' % filePath)

    def processDirectory(self, directoryInfo):
        """Transfer a whole upload data directory to storage in one rsync run.

        :param directoryInfo: dict whose 'uploadInfo' entry supplies
            'dataDirectory', 'storageHost', 'storageDirectory' and, optionally,
            'targetDirectory'.
        """
        uploadInfo = directoryInfo.get('uploadInfo')
        dataDirectory = uploadInfo.get('dataDirectory')
        storageHost = uploadInfo.get('storageHost')
        storageDirectory = uploadInfo.get('storageDirectory')
        targetDirectory = uploadInfo.get('targetDirectory')
        destUrl = self.getDestDirUrl(dataDirectory, storageHost, storageDirectory, targetDirectory)
        srcUrl = self.getSrcDirUrl(dataDirectory)
        # Transfer directory
        self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl))
        command = self.DIRECTORY_TRANSFER_COMMAND
        if targetDirectory:
            # Create the target subdirectory on the storage side first.
            destDirectory = '%s/%s' % (storageDirectory, targetDirectory)
            command = self.DIRECTORY_TRANSFER_WITH_MKDIR_COMMAND % destDirectory
        self.start(src=srcUrl, dest=destUrl, command=command, templateInfo=uploadInfo, cwd=dataDirectory)
#######################################################################
# Testing.
if __name__ == '__main__':
    # Manual smoke test; the paths and storage host belong to a specific
    # developer environment.
    ft = RsyncFileTransferPlugin(command='rsync -arvlPR --exclude "*.svn" --exclude "*.pyc"')
    testFilePath = '/home/sveseli/Work/DM/dev/src/python/dm/common/processing/plugins/fileProcessor.py'
    # checkUploadFilesForProcessing() needs a dict keyed by file path (it
    # calls .get() on it), not a list of paths.
    filePathsDict = {testFilePath: {'filePath': testFilePath}}
    uploadInfo = {'storageDirectory' : '/opt/DM/dev', 'storageHost' : 'dm@dmstorage', 'dataDirectory' : '/home/sveseli/Work/DM/dev/src'}
    print(ft.checkUploadFilesForProcessing(filePathsDict, uploadInfo))