Skip to content
Snippets Groups Projects
Commit 6fc82483 authored by sveseli's avatar sveseli
Browse files

fixes for sftp vs ftp

parent bf40e7d5
No related branches found
No related tags found
No related merge requests found
...@@ -4,6 +4,7 @@ import os ...@@ -4,6 +4,7 @@ import os
from fileTransferPlugin import FileTransferPlugin from fileTransferPlugin import FileTransferPlugin
from dm.common.utility.fileUtility import FileUtility from dm.common.utility.fileUtility import FileUtility
from dm.common.utility.ftpUtility import FtpUtility from dm.common.utility.ftpUtility import FtpUtility
from dm.common.utility.sftpUtility import SftpUtility
from dm.common.exceptions.fileProcessingError import FileProcessingError from dm.common.exceptions.fileProcessingError import FileProcessingError
from dm.common.utility.dmSubprocess import DmSubprocess from dm.common.utility.dmSubprocess import DmSubprocess
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
...@@ -44,7 +45,7 @@ class GridftpFileTransferPlugin(FileTransferPlugin): ...@@ -44,7 +45,7 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
dataDirectory = uploadInfo['dataDirectory'] dataDirectory = uploadInfo['dataDirectory']
self.logger.debug('Upload info: %s', uploadInfo) self.logger.debug('Upload info: %s', uploadInfo)
#(scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(storageUrl, defaultPort=self.DEFAULT_PORT) #(scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(storageUrl, defaultPort=self.DEFAULT_PORT)
ftpUtility = FtpUtility(storageHost, self.DEFAULT_PORT) ftpUtility = SftpUtility(storageHost)
storageFilePathsDict = ftpUtility.getFiles(storageDirectory, {}) storageFilePathsDict = ftpUtility.getFiles(storageDirectory, {})
pluginFilePathsDict = {} pluginFilePathsDict = {}
filePaths = filePathsDict.keys() filePaths = filePathsDict.keys()
...@@ -57,15 +58,15 @@ class GridftpFileTransferPlugin(FileTransferPlugin): ...@@ -57,15 +58,15 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
# remote directory does not have the file # remote directory does not have the file
pluginFilePathsDict[filePath] = filePathDict pluginFilePathsDict[filePath] = filePathDict
else: else:
fSize = filePathDict.get('Size') fSize = filePathDict.get('fileSize')
sfSize = storageFilePathDict.get('Size') sfSize = storageFilePathDict.get('fileSize')
# check size # check size
if not fSize or not sfSize or fSize != sfSize: if not fSize or not sfSize or fSize != sfSize:
pluginFilePathsDict[filePath] = filePathDict pluginFilePathsDict[filePath] = filePathDict
else: else:
# sizes are the same, check modify time # sizes are the same, check modify time
mTime = filePathDict.get('Modify') mTime = filePathDict.get('fileModificationTime')
sfTime = storageFilePathDict.get('Modify') sfTime = storageFilePathDict.get('fileModificationTime')
if not mTime or not sfTime or mTime > sfTime: if not mTime or not sfTime or mTime > sfTime:
pluginFilePathsDict[filePath] = filePathDict pluginFilePathsDict[filePath] = filePathDict
......
...@@ -10,13 +10,13 @@ import urlparse ...@@ -10,13 +10,13 @@ import urlparse
class FtpUtility: class FtpUtility:
def __init__(self, host, port, username=None, password=None, utcTimeStamps=True): def __init__(self, host, port, username=None, password=None, serverUsesUtcTime=True):
self.host = host self.host = host
self.port = port self.port = port
self.username = username self.username = username
self.password = password self.password = password
self.ftpClient = None self.ftpClient = None
self.utcTimeStamps = utcTimeStamps self.serverUsesUtcTime = serverUsesUtcTime
self.mlsdFileStatDict = {} self.mlsdFileStatDict = {}
@classmethod @classmethod
...@@ -48,9 +48,11 @@ class FtpUtility: ...@@ -48,9 +48,11 @@ class FtpUtility:
return logger return logger
def __parseKeyValue(cls, keyValue, outputDict={}): def __parseKeyValue(cls, origKeyValue, outputDict={}, newKey=None):
key,value = keyValue.split('=') key,value = origKeyValue.split('=')
value = value.strip() value = value.strip()
if newKey is not None:
key = newKey
outputDict[key] = value outputDict[key] = value
return outputDict return outputDict
...@@ -60,7 +62,7 @@ class FtpUtility: ...@@ -60,7 +62,7 @@ class FtpUtility:
parseDict = {} parseDict = {}
self.__parseKeyValue(parts[0], parseDict) self.__parseKeyValue(parts[0], parseDict)
self.__parseKeyValue(parts[1], parseDict) self.__parseKeyValue(parts[1], parseDict)
self.__parseKeyValue(parts[2], parseDict) self.__parseKeyValue(parts[2], parseDict, 'fileSize')
name = parts[-1].strip() name = parts[-1].strip()
parseDict['Name'] = name parseDict['Name'] = name
type = parseDict.get('Type', '') type = parseDict.get('Type', '')
...@@ -75,10 +77,21 @@ class FtpUtility: ...@@ -75,10 +77,21 @@ class FtpUtility:
parseDict = {} parseDict = {}
self.__parseKeyValue(parts[0], parseDict) self.__parseKeyValue(parts[0], parseDict)
self.__parseKeyValue(parts[1], parseDict) self.__parseKeyValue(parts[1], parseDict)
self.__parseKeyValue(parts[2], parseDict) self.__parseKeyValue(parts[2], parseDict, 'fileSize')
name = parts[-1].strip() name = parts[-1].strip()
self.mlsdFileStatDict[name] = parseDict self.mlsdFileStatDict[name] = parseDict
def __processFileStatDict(self, fileStatDict):
modifyTime = fileStatDict.get('Modify')
modifyTime = time.mktime(time.strptime(modifyTime, '%Y%m%d%H%M%S'))
if self.serverUsesUtcTime:
modifyTime = TimeUtility.utcToLocalTime(modifyTime)
fileStatDict['fileModificationTime'] = modifyTime
fileStatDict['fileModificationTimeStamp'] = TimeUtility.formatLocalTimeStamp(modifyTime)
fileStatDict['fileSize'] = int(fileStatDict.get('fileSize'))
del fileStatDict['Modify']
del fileStatDict['Type']
def getFiles(self, dirPath, fileDict={}): def getFiles(self, dirPath, fileDict={}):
if not self.ftpClient: if not self.ftpClient:
self.ftpClient = self.getFtpClient(self.host, self.port, self.username, self.password) self.ftpClient = self.getFtpClient(self.host, self.port, self.username, self.password)
...@@ -87,6 +100,8 @@ class FtpUtility: ...@@ -87,6 +100,8 @@ class FtpUtility:
self.mlsdDirList = [] self.mlsdDirList = []
self.ftpClient.retrlines('MLSD %s' % dirPath, self.__parseMlsdOutput) self.ftpClient.retrlines('MLSD %s' % dirPath, self.__parseMlsdOutput)
for (fileName,fileInfo) in self.mlsdFileDict.items(): for (fileName,fileInfo) in self.mlsdFileDict.items():
self.__processFileStatDict(fileInfo)
del fileInfo['Name']
filePath = '%s/%s' % (dirPath, fileName) filePath = '%s/%s' % (dirPath, fileName)
fileDict[filePath] = fileInfo fileDict[filePath] = fileInfo
for d in copy.copy(self.mlsdDirList): for d in copy.copy(self.mlsdDirList):
...@@ -109,13 +124,8 @@ class FtpUtility: ...@@ -109,13 +124,8 @@ class FtpUtility:
self.ftpClient.retrlines('MLSD %s' % filePath, self.__parseMlsdFileStat) self.ftpClient.retrlines('MLSD %s' % filePath, self.__parseMlsdFileStat)
fileStatDict = self.mlsdFileStatDict.get(fileName) fileStatDict = self.mlsdFileStatDict.get(fileName)
if fileStatDict: if fileStatDict:
fileInfo['fileSize'] = fileStatDict.get('Size') self.__processFileStatDict(fileStatDict)
modifyTime = fileStatDict.get('Modify') fileInfo.update(fileStatDict)
modifyTime = time.mktime(time.strptime(modifyTime, '%Y%m%d%H%M%S'))
if self.utcTimeStamps:
modifyTime = TimeUtility.utcToLocalTime(modifyTime)
fileInfo['fileModificationTime'] = modifyTime
fileInfo['fileModificationTimeStamp'] = TimeUtility.formatLocalTimeStamp(modifyTime)
del self.mlsdFileStatDict[fileName] del self.mlsdFileStatDict[fileName]
return fileInfo return fileInfo
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
import copy import copy
import stat import stat
import pysftp import pysftp
from dm.common.utility.timeUtility import TimeUtility
from dm.common.utility.loggingManager import LoggingManager from dm.common.utility.loggingManager import LoggingManager
import urlparse import urlparse
...@@ -63,7 +64,9 @@ class SftpUtility: ...@@ -63,7 +64,9 @@ class SftpUtility:
if stat.S_ISDIR(mode): if stat.S_ISDIR(mode):
self.getFiles(fullPath, fileDict) self.getFiles(fullPath, fileDict)
elif stat.S_ISREG(mode): elif stat.S_ISREG(mode):
fileInfo = {'filePath' : fullPath, 'fileSize' : attr.st_size, } fileInfo = {'filePath' : fullPath, 'fileSize' : attr.st_size,
'fileModificationTime' : attr.st_mtime }
fileInfo['fileModificationTimeStamp'] = TimeUtility.formatLocalTimeStamp(attr.st_mtime)
fileDict[fullPath] = fileInfo fileDict[fullPath] = fileInfo
return fileDict return fileDict
...@@ -71,6 +74,6 @@ class SftpUtility: ...@@ -71,6 +74,6 @@ class SftpUtility:
# Testing. # Testing.
if __name__ == '__main__': if __name__ == '__main__':
sftpUtility = SftpUtility('dmstorage', username='dm') sftpUtility = SftpUtility('xstor-devel', username='dmadmin')
files = sftpUtility.getFiles('/opt/DM/data/ESAF/e1') files = sftpUtility.getFiles('/data/testing/test1')
print files print files
...@@ -38,9 +38,9 @@ class FtpFileSystemObserverAgent(FileSystemObserverAgent): ...@@ -38,9 +38,9 @@ class FtpFileSystemObserverAgent(FileSystemObserverAgent):
else: else:
# old file, check timestamp # old file, check timestamp
oldFileInfo = oldFileDict.get(filePath) oldFileInfo = oldFileDict.get(filePath)
oldModifyTime = oldFileInfo.get('Modify', '') oldModifyTime = oldFileInfo.get('fileModificationTime', '')
fileInfo = fileDict.get(filePath) fileInfo = fileDict.get(filePath)
modifyTime = fileInfo.get('Modify') modifyTime = fileInfo.get('fileModificationTime')
if modifyTime != oldModifyTime: if modifyTime != oldModifyTime:
# file has been modified, need to process it # file has been modified, need to process it
self.logger.debug('Modified file path detected: %s' % filePath) self.logger.debug('Modified file path detected: %s' % filePath)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment