Skip to content
Snippets Groups Projects
Commit 22a58e0e authored by sveseli's avatar sveseli
Browse files

modifications to eliminate processing of hidden files and of unchanged files...

modifications to eliminate processing of hidden files and of unchanged files that were processed already; checksum is now done by default
parent d870a9e0
No related branches found
No related tags found
No related merge requests found
......@@ -2,10 +2,12 @@
import threading
import time
import os
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.utility.objectUtility import ObjectUtility
from dm.common.utility.valueUtility import ValueUtility
from dm.common.utility.timeBasedProcessingQueue import TimeBasedProcessingQueue
from dm.common.utility.singleton import Singleton
from fileProcessingThread import FileProcessingThread
......@@ -67,6 +69,33 @@ class FileProcessingManager(threading.Thread,Singleton):
self.fileProcessorKeyList.sort()
self.logger.debug('File processor keys: %s' % self.fileProcessorKeyList)
# Remove hidden files from dictionary of files to be processed
def removeHiddenFilesFromProcessing(self, filePathsDict, uploadInfo):
if ValueUtility.toBoolean(uploadInfo.get('processHiddenFiles')):
del uploadInfo['processHiddenFiles']
return filePathsDict
for filePath in filePathsDict.keys():
fileName = os.path.basename(filePath)
if fileName.startswith('.'):
self.logger.debug('File path %s is hidden file, will not process it' % filePath)
del filePathsDict[filePath]
return filePathsDict
# Each plugin calculates list of files that need to be processed
# Final result is union of all plugins
def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
if ValueUtility.toBoolean(uploadInfo.get('processAllFiles')):
del uploadInfo['processAllFiles']
return filePathsDict
checkedFilePathsDict = {}
for processorKey in self.fileProcessorKeyList:
processor = self.fileProcessorDict.get(processorKey)
# Processor will return list of files it must process
pluginFilePathsDict = processor.checkUploadFilesForProcessing(filePathsDict, uploadInfo)
if len(pluginFilePathsDict):
checkedFilePathsDict.update(pluginFilePathsDict)
return checkedFilePathsDict
def processFile(self, fileInfo):
self.fileProcessingQueue.push(fileInfo)
self.eventFlag.set()
......
......@@ -12,7 +12,10 @@ class FileProcessor:
@abc.abstractmethod
def processFile(self, fileInfo):
return NotImplemented
def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
return {}
def configure(self):
# Use this method for processor configuration
pass
......
......@@ -19,6 +19,9 @@ class FileTransferPlugin(FileProcessor):
self.command = command
self.subprocess = None
def checkUploadFilesForProcessing(self, filePaths, uploadInfo):
pass
def processFile(self, fileInfo):
filePath = fileInfo.get('filePath')
dataDirectory = fileInfo.get('dataDirectory')
......
......@@ -2,7 +2,11 @@
import os
from fileTransferPlugin import FileTransferPlugin
from dm.common.utility.fileUtility import FileUtility
from dm.common.utility.ftpUtility import FtpUtility
from dm.common.exceptions.fileProcessingError import FileProcessingError
from dm.common.utility.dmSubprocess import DmSubprocess
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
class GridftpFileTransferPlugin(FileTransferPlugin):
......@@ -10,8 +14,12 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
DEFAULT_PORT = 2811
def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND):
def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False):
FileTransferPlugin.__init__(self, command, src, dest)
self.dsFileApi = DsRestApiFactory.getFileRestApi()
self.localMd5Sum = localMd5Sum
self.remoteMd5Sum = remoteMd5Sum
self.deleteOriginal = deleteOriginal
def getSrcUrl(self, filePath, dataDirectory):
(scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
......@@ -30,6 +38,80 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
destUrl = 'sshftp://%s/%s/%s/%s' % (storageHost, storageDirectory, dirName, fileName)
return destUrl
def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
storageHost = uploadInfo['storageHost']
storageDirectory = uploadInfo['storageDirectory']
dataDirectory = uploadInfo['dataDirectory']
#(scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(storageUrl, defaultPort=self.DEFAULT_PORT)
ftpUtility = FtpUtility(storageHost, self.DEFAULT_PORT)
storageFilePathsDict = ftpUtility.getFiles(storageDirectory, {})
pluginFilePathsDict = {}
filePaths = filePathsDict.keys()
for filePath in filePaths:
filePathDict = filePathsDict.get(filePath)
experimentFilePath = os.path.relpath(filePath, dataDirectory)
storageFilePath = os.path.join(storageDirectory, experimentFilePath)
storageFilePathDict = storageFilePathsDict.get(storageFilePath)
if not storageFilePathDict:
# remote directory does not have the file
pluginFilePathsDict[filePath] = filePathDict
else:
fSize = filePathDict.get('Size')
sfSize = storageFilePathDict.get('Size')
# check size
if not fSize or not sfSize or fSize != sfSize:
pluginFilePathsDict[filePath] = filePathDict
else:
# sizes are the same, check modify time
mTime = filePathDict.get('Modify')
sfTime = storageFilePathDict.get('Modify')
if not mTime or not sfTime or mTime > sfTime:
pluginFilePathsDict[filePath] = filePathDict
self.logger.debug('Number of original files: %s, number of plugin files: %s', len(filePathsDict), len(pluginFilePathsDict))
return pluginFilePathsDict
def processFile(self, fileInfo):
filePath = fileInfo.get('filePath')
dataDirectory = fileInfo.get('dataDirectory')
experimentFilePath = fileInfo.get('experimentFilePath')
experimentName = fileInfo.get('experimentName')
storageHost = fileInfo.get('storageHost')
storageDirectory = fileInfo.get('storageDirectory')
destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory)
srcUrl = self.getSrcUrl(filePath, dataDirectory)
# Calculate checksum
if self.localMd5Sum:
FileUtility.statFile(filePath, fileInfo)
FileUtility.getMd5Sum(filePath, fileInfo)
# Transfer file
self.logger.debug('Starting transfer: %s' % fileInfo)
self.start(srcUrl, destUrl)
# Get remote checksum
if self.remoteMd5Sum:
fileInfo2 = {}
fileInfo2['experimentFilePath'] = experimentFilePath
fileInfo2['experimentName'] = experimentName
fileMetadata = self.dsFileApi.statFile(experimentFilePath, experimentName, fileInfo2)
# If we have not done md5 locally, update file info
if not self.md5Sum:
fileInfo['md5Sum'] = fileMetadata.get('md5Sum')
# Verify checksum
if fileMetadata.get('md5Sum') != fileInfo.get('md5Sum'):
self.logger.error('Checksum mismatch for file: %s' % filePath)
raise FileProcessingError('Checksum mismatch detected for file: %s' % filePath)
self.logger.debug('Checksum test passed for file %s' % filePath)
# Remove file
if self.deleteOriginal:
self.logger.debug('Deleting file %s' % filePath)
OsUtility.removeFile(srcUrl)
#######################################################################
# Testing.
if __name__ == '__main__':
......
#!/usr/bin/env python
import os
from fileTransferPlugin import FileTransferPlugin
from dm.common.utility.osUtility import OsUtility
from dm.common.utility.fileUtility import FileUtility
from dm.common.exceptions.fileProcessingError import FileProcessingError
from dm.common.utility.dmSubprocess import DmSubprocess
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
class RsyncFileTransferPlugin(FileTransferPlugin):
COMMAND = 'rsync -arvlPR'
DEFAULT_COMMAND = 'rsync -arvlPR'
DRY_RUN_COMMAND = 'rsync -arvlP'
def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False):
FileTransferPlugin.__init__(self, command, src, dest)
self.dsFileApi = DsRestApiFactory.getFileRestApi()
self.localMd5Sum = localMd5Sum
self.remoteMd5Sum = remoteMd5Sum
self.deleteOriginal = deleteOriginal
def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
storageDirectory = uploadInfo['storageDirectory']
storageHost = uploadInfo['storageHost']
dataDirectory = uploadInfo['dataDirectory']
dryRunCommand = '%s --dry-run %s/ %s:%s' % (self.DRY_RUN_COMMAND, dataDirectory, storageHost, storageDirectory)
subprocess = DmSubprocess.getSubprocess(dryRunCommand)
subprocess.run()
lines = subprocess.getStdOut().split('\n')
pluginFilePathsDict = {}
pathBase = dataDirectory
filePaths = filePathsDict.keys()
for line in lines:
if line.endswith(os.sep):
continue
filePath = os.path.join(pathBase, line)
if filePath in filePaths:
pluginFilePathsDict[filePath] = filePathsDict.get(filePath)
self.logger.debug('Number of original files: %s, number of plugin files: %s', len(filePathsDict), len(pluginFilePathsDict))
return pluginFilePathsDict
def processFile(self, fileInfo):
filePath = fileInfo.get('filePath')
dataDirectory = fileInfo.get('dataDirectory')
experimentFilePath = fileInfo.get('experimentFilePath')
experimentName = fileInfo.get('experimentName')
storageHost = fileInfo.get('storageHost')
storageDirectory = fileInfo.get('storageDirectory')
destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory)
# Use relative path with respect to data directory as a source
os.chdir(dataDirectory)
srcUrl = self.getSrcUrl(filePath, dataDirectory)
def __init__(self, src=None, dest=None):
FileTransferPlugin.__init__(self, self.COMMAND, src, dest)
# Calculate checksum
if self.localMd5Sum:
FileUtility.statFile(filePath, fileInfo)
FileUtility.getMd5Sum(filePath, fileInfo)
# Transfer file
self.logger.debug('Starting transfer: %s' % fileInfo)
self.start(srcUrl, destUrl)
# Get remote checksum
if self.remoteMd5Sum:
fileInfo2 = {}
fileInfo2['experimentFilePath'] = experimentFilePath
fileInfo2['experimentName'] = experimentName
fileMetadata = self.dsFileApi.statFile(experimentFilePath, experimentName, fileInfo2)
# If we have not done md5 locally, update file info
if not self.md5Sum:
fileInfo['md5Sum'] = fileMetadata.get('md5Sum')
# Verify checksum
if fileMetadata.get('md5Sum') != fileInfo.get('md5Sum'):
self.logger.error('Checksum mismatch for file: %s' % filePath)
raise FileProcessingError('Checksum mismatch detected for file: %s' % filePath)
self.logger.debug('Checksum test passed for file %s' % filePath)
# Remove file
if self.deleteOriginal:
self.logger.debug('Deleting file %s' % filePath)
OsUtility.removeFile(srcUrl)
#######################################################################
# Testing.
if __name__ == '__main__':
ft = RsyncFileTransferPlugin('/tmp/xyz', '/tmp/xyz2')
ft.start()
print 'StdOut: ', ft.getStdOut()
print 'StdErr: ', ft.getStdErr()
print 'Exit Status: ', ft.getExitStatus()
ft = RsyncFileTransferPlugin(command='rsync -arvlPR --exclude "*.svn" --exclude "*.pyc"')
ft.checkUploadFilesForProcessing(['/home/sveseli/Work/DM/dev/src/python/dm/common/processing/plugins/fileProcessor.py'], {'storageDirectory' : '/opt/DM/dev', 'storageHost' : 'dm@dmstorage', 'dataDirectory' : '/home/sveseli/Work/DM/dev/src'})
......@@ -20,8 +20,8 @@ class RsyncWithChecksumAndDeleteFileTransferPlugin(FileTransferPlugin):
def processFile(self, fileInfo):
filePath = fileInfo.get('filePath')
dataDirectory = fileInfo.get('dataDirectory')
experimentFilePath = fileInfo.get('experimentFilePath')
experimentName = fileInfo.get('experimentName')
experimentFilePath = fileInfo.get('experimentFilePath')
experimentName = fileInfo.get('experimentName')
storageHost = fileInfo.get('storageHost')
storageDirectory = fileInfo.get('storageDirectory')
destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment