diff --git a/src/python/dm/common/processing/fileProcessingManager.py b/src/python/dm/common/processing/fileProcessingManager.py
index 23638d8c55010a8fbcc9490187c6550d0705fcab..74dc397d26385f116f213501d83b6bccb8e16469 100755
--- a/src/python/dm/common/processing/fileProcessingManager.py
+++ b/src/python/dm/common/processing/fileProcessingManager.py
@@ -2,10 +2,12 @@
 
 import threading
 import time
+import os
 
 from dm.common.utility.loggingManager import LoggingManager
 from dm.common.utility.configurationManager import ConfigurationManager
 from dm.common.utility.objectUtility import ObjectUtility
+from dm.common.utility.valueUtility import ValueUtility
 from dm.common.utility.timeBasedProcessingQueue import TimeBasedProcessingQueue
 from dm.common.utility.singleton import Singleton
 from fileProcessingThread import FileProcessingThread
@@ -67,6 +69,33 @@ class FileProcessingManager(threading.Thread,Singleton):
         self.fileProcessorKeyList.sort()
         self.logger.debug('File processor keys: %s' % self.fileProcessorKeyList)
 
+    # Remove hidden files from dictionary of files to be processed
+    def removeHiddenFilesFromProcessing(self, filePathsDict, uploadInfo):
+        if ValueUtility.toBoolean(uploadInfo.get('processHiddenFiles')):
+            del uploadInfo['processHiddenFiles']
+            return filePathsDict
+        for filePath in filePathsDict.keys():
+            fileName = os.path.basename(filePath)
+            if fileName.startswith('.'):
+                self.logger.debug('File path %s is hidden file, will not process it' % filePath)
+                del filePathsDict[filePath]
+        return filePathsDict
+
+    # Each plugin calculates list of files that need to be processed
+    # Final result is union of all plugins
+    def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
+        if ValueUtility.toBoolean(uploadInfo.get('processAllFiles')):
+            del uploadInfo['processAllFiles']
+            return filePathsDict
+        checkedFilePathsDict = {}
+        for processorKey in self.fileProcessorKeyList:
+            processor = self.fileProcessorDict.get(processorKey)
+            # Processor will return list of files it must process
+            pluginFilePathsDict = processor.checkUploadFilesForProcessing(filePathsDict, uploadInfo)
+            if len(pluginFilePathsDict):
+                checkedFilePathsDict.update(pluginFilePathsDict)
+        return checkedFilePathsDict
+
     def processFile(self, fileInfo):
         self.fileProcessingQueue.push(fileInfo)
         self.eventFlag.set()
diff --git a/src/python/dm/common/processing/plugins/fileProcessor.py b/src/python/dm/common/processing/plugins/fileProcessor.py
index 823ddadbde44e45db989a8072aa10f56e4bcac03..c13fceda9eafe32d23af92cbf33f48d0368370fa 100755
--- a/src/python/dm/common/processing/plugins/fileProcessor.py
+++ b/src/python/dm/common/processing/plugins/fileProcessor.py
@@ -12,7 +12,10 @@ class FileProcessor:
     @abc.abstractmethod
     def processFile(self, fileInfo):
         return NotImplemented
-        
+    
+    def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
+        return {}
+            
     def configure(self):
         # Use this method for processor configuration
         pass
diff --git a/src/python/dm/common/processing/plugins/fileTransferPlugin.py b/src/python/dm/common/processing/plugins/fileTransferPlugin.py
index b1e94f24db7566ca0c51bf693cd0007e5693b942..ea5e579a138b339a081c57811c1685f56f0047e6 100755
--- a/src/python/dm/common/processing/plugins/fileTransferPlugin.py
+++ b/src/python/dm/common/processing/plugins/fileTransferPlugin.py
@@ -19,6 +19,9 @@ class FileTransferPlugin(FileProcessor):
         self.command = command
         self.subprocess = None
 
+    def checkUploadFilesForProcessing(self, filePaths, uploadInfo):
+        pass
+
     def processFile(self, fileInfo):
         filePath = fileInfo.get('filePath')
         dataDirectory = fileInfo.get('dataDirectory')
diff --git a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py
index 594ebd66db0921a04c96a3a3feffb3a54f176762..89ebabc3b1a14da1b8cb8c7113530387653c5dda 100755
--- a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py
+++ b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py
@@ -2,7 +2,11 @@
 
 import os
 from fileTransferPlugin import FileTransferPlugin
+from dm.common.utility.fileUtility import FileUtility
 from dm.common.utility.ftpUtility import FtpUtility
+from dm.common.exceptions.fileProcessingError import FileProcessingError
+from dm.common.utility.dmSubprocess import DmSubprocess
+from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
 
 class GridftpFileTransferPlugin(FileTransferPlugin):
 
@@ -10,8 +14,12 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
     DEFAULT_PORT = 2811
 
 
-    def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND):
+    def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False):
         FileTransferPlugin.__init__(self, command, src, dest)
+        self.dsFileApi = DsRestApiFactory.getFileRestApi()
+        self.localMd5Sum = localMd5Sum
+        self.remoteMd5Sum = remoteMd5Sum
+        self.deleteOriginal = deleteOriginal
 
     def getSrcUrl(self, filePath, dataDirectory):
         (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
@@ -30,6 +38,80 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
         destUrl = 'sshftp://%s/%s/%s/%s' % (storageHost, storageDirectory, dirName, fileName)
         return destUrl
 
+    def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
+        storageHost = uploadInfo['storageHost']
+        storageDirectory = uploadInfo['storageDirectory']
+        dataDirectory = uploadInfo['dataDirectory']
+        #(scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(storageUrl, defaultPort=self.DEFAULT_PORT)
+        ftpUtility = FtpUtility(storageHost, self.DEFAULT_PORT)
+        storageFilePathsDict = ftpUtility.getFiles(storageDirectory, {})
+        pluginFilePathsDict = {}
+        filePaths = filePathsDict.keys()
+        for filePath in filePaths:
+            filePathDict = filePathsDict.get(filePath)
+            experimentFilePath = os.path.relpath(filePath, dataDirectory)
+            storageFilePath = os.path.join(storageDirectory, experimentFilePath)
+            storageFilePathDict = storageFilePathsDict.get(storageFilePath)
+            if not storageFilePathDict:
+                # remote directory does not have the file
+                pluginFilePathsDict[filePath] = filePathDict
+            else:
+                fSize = filePathDict.get('Size') 
+                sfSize = storageFilePathDict.get('Size') 
+                # check size
+                if not fSize or not sfSize or fSize != sfSize:
+                    pluginFilePathsDict[filePath] = filePathDict
+                else:
+                    # sizes are the same, check modify time
+                    mTime = filePathDict.get('Modify') 
+                    sfTime = storageFilePathDict.get('Modify') 
+                    if not mTime or not sfTime or mTime > sfTime:
+                        pluginFilePathsDict[filePath] = filePathDict
+
+        self.logger.debug('Number of original files: %s, number of plugin files: %s', len(filePathsDict), len(pluginFilePathsDict))
+        return pluginFilePathsDict
+
+    def processFile(self, fileInfo):
+        filePath = fileInfo.get('filePath')
+        dataDirectory = fileInfo.get('dataDirectory')
+        experimentFilePath = fileInfo.get('experimentFilePath')
+        experimentName = fileInfo.get('experimentName')
+        storageHost = fileInfo.get('storageHost')
+        storageDirectory = fileInfo.get('storageDirectory')
+
+        destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory)
+        srcUrl = self.getSrcUrl(filePath, dataDirectory)
+
+        # Calculate checksum
+        if self.localMd5Sum:
+            FileUtility.statFile(filePath, fileInfo)
+            FileUtility.getMd5Sum(filePath, fileInfo)
+
+        # Transfer file
+        self.logger.debug('Starting transfer: %s' % fileInfo)
+        self.start(srcUrl, destUrl)
+
+        # Get remote checksum
+        if self.remoteMd5Sum:
+            fileInfo2 = {}
+            fileInfo2['experimentFilePath'] = experimentFilePath
+            fileInfo2['experimentName'] = experimentName
+            fileMetadata = self.dsFileApi.statFile(experimentFilePath, experimentName, fileInfo2)
+            # If we have not done md5 locally, update file info
+            if not self.md5Sum:
+                fileInfo['md5Sum'] = fileMetadata.get('md5Sum')
+
+            # Verify checksum
+            if fileMetadata.get('md5Sum') != fileInfo.get('md5Sum'):
+                self.logger.error('Checksum mismatch for file: %s' % filePath)
+                raise FileProcessingError('Checksum mismatch detected for file: %s' % filePath)
+            self.logger.debug('Checksum test passed for file %s' % filePath)
+
+        # Remove file
+        if self.deleteOriginal:
+            self.logger.debug('Deleting file %s' % filePath)
+            OsUtility.removeFile(srcUrl)
+
 #######################################################################
 # Testing.
 if __name__ == '__main__':
diff --git a/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py b/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py
index 3550d0cf22b5ec2cca5671f56e488182d0cb05c5..f4d1b8a0219224b217c0cbffe7e651522b9543b7 100755
--- a/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py
+++ b/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py
@@ -1,18 +1,91 @@
 #!/usr/bin/env python
 
+import os
+
 from fileTransferPlugin import FileTransferPlugin
+from dm.common.utility.osUtility import OsUtility
+from dm.common.utility.fileUtility import FileUtility
+from dm.common.exceptions.fileProcessingError import FileProcessingError
+from dm.common.utility.dmSubprocess import DmSubprocess
+from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
+
 class RsyncFileTransferPlugin(FileTransferPlugin):
 
-    COMMAND = 'rsync -arvlPR'
+    DEFAULT_COMMAND = 'rsync -arvlPR'
+    DRY_RUN_COMMAND = 'rsync -arvlP'
+
+    def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False):
+        FileTransferPlugin.__init__(self, command, src, dest)
+        self.dsFileApi = DsRestApiFactory.getFileRestApi()
+        self.localMd5Sum = localMd5Sum
+        self.remoteMd5Sum = remoteMd5Sum
+        self.deleteOriginal = deleteOriginal
+
+    def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
+        storageDirectory = uploadInfo['storageDirectory']
+        storageHost = uploadInfo['storageHost']
+        dataDirectory = uploadInfo['dataDirectory']
+        dryRunCommand = '%s --dry-run %s/ %s:%s' % (self.DRY_RUN_COMMAND, dataDirectory, storageHost, storageDirectory)
+        subprocess = DmSubprocess.getSubprocess(dryRunCommand)
+        subprocess.run()
+        lines = subprocess.getStdOut().split('\n')
+        pluginFilePathsDict = {}
+        pathBase = dataDirectory
+        filePaths = filePathsDict.keys()
+        for line in lines:
+            if line.endswith(os.sep):
+                continue
+            filePath = os.path.join(pathBase, line)
+            if filePath in filePaths:
+                pluginFilePathsDict[filePath] = filePathsDict.get(filePath)
+        self.logger.debug('Number of original files: %s, number of plugin files: %s', len(filePathsDict), len(pluginFilePathsDict))
+        return pluginFilePathsDict
+
+    def processFile(self, fileInfo):
+        filePath = fileInfo.get('filePath')
+        dataDirectory = fileInfo.get('dataDirectory')
+        experimentFilePath = fileInfo.get('experimentFilePath')
+        experimentName = fileInfo.get('experimentName')
+        storageHost = fileInfo.get('storageHost')
+        storageDirectory = fileInfo.get('storageDirectory')
+        destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory)
+        # Use relative path with respect to data directory as a source
+        os.chdir(dataDirectory)
+        srcUrl = self.getSrcUrl(filePath, dataDirectory)
 
-    def __init__(self, src=None, dest=None):
-        FileTransferPlugin.__init__(self, self.COMMAND, src, dest)
+        # Calculate checksum
+        if self.localMd5Sum:
+            FileUtility.statFile(filePath, fileInfo)
+            FileUtility.getMd5Sum(filePath, fileInfo)
+
+        # Transfer file
+        self.logger.debug('Starting transfer: %s' % fileInfo)
+        self.start(srcUrl, destUrl)
+
+        # Get remote checksum
+        if self.remoteMd5Sum:
+            fileInfo2 = {}
+            fileInfo2['experimentFilePath'] = experimentFilePath
+            fileInfo2['experimentName'] = experimentName
+            fileMetadata = self.dsFileApi.statFile(experimentFilePath, experimentName, fileInfo2)
+            # If we have not done md5 locally, update file info
+            if not self.md5Sum:
+                fileInfo['md5Sum'] = fileMetadata.get('md5Sum')
+
+            # Verify checksum
+            if fileMetadata.get('md5Sum') != fileInfo.get('md5Sum'):
+                self.logger.error('Checksum mismatch for file: %s' % filePath)
+                raise FileProcessingError('Checksum mismatch detected for file: %s' % filePath)
+            self.logger.debug('Checksum test passed for file %s' % filePath)
+
+        # Remove file
+        if self.deleteOriginal:
+            self.logger.debug('Deleting file %s' % filePath)
+            OsUtility.removeFile(srcUrl)
 
 #######################################################################
 # Testing.
 if __name__ == '__main__':
-    ft = RsyncFileTransferPlugin('/tmp/xyz', '/tmp/xyz2')
-    ft.start()
-    print 'StdOut: ', ft.getStdOut()
-    print 'StdErr: ', ft.getStdErr()
-    print 'Exit Status: ', ft.getExitStatus()
+    ft = RsyncFileTransferPlugin(command='rsync -arvlPR --exclude "*.svn" --exclude "*.pyc"')
+    ft.checkUploadFilesForProcessing(['/home/sveseli/Work/DM/dev/src/python/dm/common/processing/plugins/fileProcessor.py'], {'storageDirectory' : '/opt/DM/dev', 'storageHost' : 'dm@dmstorage', 'dataDirectory' : '/home/sveseli/Work/DM/dev/src'})
+
diff --git a/src/python/dm/common/processing/plugins/rsyncWithChecksumAndDeleteFileTransferPlugin.py b/src/python/dm/common/processing/plugins/rsyncWithChecksumAndDeleteFileTransferPlugin.py
index 87a5125ea6194276c33cfc1af726d21760947bb6..f821f5e742ce21b4b41aa390d00d7e844b59e6f5 100755
--- a/src/python/dm/common/processing/plugins/rsyncWithChecksumAndDeleteFileTransferPlugin.py
+++ b/src/python/dm/common/processing/plugins/rsyncWithChecksumAndDeleteFileTransferPlugin.py
@@ -20,8 +20,8 @@ class RsyncWithChecksumAndDeleteFileTransferPlugin(FileTransferPlugin):
     def processFile(self, fileInfo):
         filePath = fileInfo.get('filePath')
         dataDirectory = fileInfo.get('dataDirectory')
-	experimentFilePath = fileInfo.get('experimentFilePath')
-	experimentName = fileInfo.get('experimentName')
+        experimentFilePath = fileInfo.get('experimentFilePath')
+        experimentName = fileInfo.get('experimentName')
         storageHost = fileInfo.get('storageHost')
         storageDirectory = fileInfo.get('storageDirectory')
         destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory)