Skip to content
Snippets Groups Projects
Commit 691ee9fc authored by sveseli's avatar sveseli
Browse files

merged fixes from 0.8

parents d98298f4 eaa5ad5c
No related branches found
No related tags found
No related merge requests found
......@@ -18,8 +18,8 @@ from dm.common.utility import loggingManager
class DmObject(UserDict.UserDict):
""" Base dm object class. """
ALL_KEYS = '__all__'
DEFAULT_KEYS = '__default__'
ALL_KEYS = 'ALL'
DEFAULT_KEYS = 'DEFAULT'
DICT_DISPLAY_FORMAT = 'dict'
TEXT_DISPLAY_FORMAT = 'text'
......
......@@ -4,6 +4,7 @@ import os
from fileTransferPlugin import FileTransferPlugin
from dm.common.utility.fileUtility import FileUtility
from dm.common.utility.ftpUtility import FtpUtility
from dm.common.utility.sftpUtility import SftpUtility
from dm.common.exceptions.fileProcessingError import FileProcessingError
from dm.common.utility.dmSubprocess import DmSubprocess
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
......@@ -43,30 +44,30 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
storageDirectory = uploadInfo['storageDirectory']
dataDirectory = uploadInfo['dataDirectory']
self.logger.debug('Upload info: %s', uploadInfo)
#(scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(storageUrl, defaultPort=self.DEFAULT_PORT)
ftpUtility = FtpUtility(storageHost, self.DEFAULT_PORT)
storageFilePathsDict = ftpUtility.getFiles(storageDirectory, {})
# Original data directory may contain host/port
(scheme, host, port, replacementDirPath) = FtpUtility.parseFtpUrl(dataDirectory)
ftpUtility = SftpUtility(storageHost)
storageFilePathsDict = ftpUtility.getFiles(storageDirectory, {}, replacementDirPath)
pluginFilePathsDict = {}
filePaths = filePathsDict.keys()
for filePath in filePaths:
filePathDict = filePathsDict.get(filePath)
experimentFilePath = os.path.relpath(filePath, dataDirectory)
storageFilePath = os.path.join(storageDirectory, experimentFilePath)
storageFilePathDict = storageFilePathsDict.get(storageFilePath)
storageFilePathDict = storageFilePathsDict.get(filePath)
if not storageFilePathDict:
# remote directory does not have the file
pluginFilePathsDict[filePath] = filePathDict
else:
fSize = filePathDict.get('Size')
sfSize = storageFilePathDict.get('Size')
fSize = filePathDict.get('fileSize')
sfSize = storageFilePathDict.get('fileSize')
# check size
if not fSize or not sfSize or fSize != sfSize:
pluginFilePathsDict[filePath] = filePathDict
else:
# sizes are the same, check modify time
mTime = filePathDict.get('Modify')
sfTime = storageFilePathDict.get('Modify')
if not mTime or not sfTime or mTime > sfTime:
mTime = filePathDict.get('fileModificationTime')
smTime = storageFilePathDict.get('fileModificationTime')
if not mTime or not smTime or mTime > smTime:
pluginFilePathsDict[filePath] = filePathDict
self.logger.debug('Number of original files: %s, number of plugin files: %s', len(filePathsDict), len(pluginFilePathsDict))
......
......@@ -10,12 +10,13 @@ import urlparse
class FtpUtility:
def __init__(self, host, port, username=None, password=None):
def __init__(self, host, port, username=None, password=None, serverUsesUtcTime=True):
self.host = host
self.port = port
self.username = username
self.password = password
self.ftpClient = None
self.serverUsesUtcTime = serverUsesUtcTime
self.mlsdFileStatDict = {}
@classmethod
......@@ -47,9 +48,11 @@ class FtpUtility:
return logger
def __parseKeyValue(cls, keyValue, outputDict={}):
key,value = keyValue.split('=')
def __parseKeyValue(cls, origKeyValue, outputDict={}, newKey=None):
key,value = origKeyValue.split('=')
value = value.strip()
if newKey is not None:
key = newKey
outputDict[key] = value
return outputDict
......@@ -59,7 +62,7 @@ class FtpUtility:
parseDict = {}
self.__parseKeyValue(parts[0], parseDict)
self.__parseKeyValue(parts[1], parseDict)
self.__parseKeyValue(parts[2], parseDict)
self.__parseKeyValue(parts[2], parseDict, 'fileSize')
name = parts[-1].strip()
parseDict['Name'] = name
type = parseDict.get('Type', '')
......@@ -74,23 +77,39 @@ class FtpUtility:
parseDict = {}
self.__parseKeyValue(parts[0], parseDict)
self.__parseKeyValue(parts[1], parseDict)
self.__parseKeyValue(parts[2], parseDict)
self.__parseKeyValue(parts[2], parseDict, 'fileSize')
name = parts[-1].strip()
self.mlsdFileStatDict[name] = parseDict
def getFiles(self, dirPath, fileDict={}):
def __processFileStatDict(self, fileStatDict):
modifyTime = fileStatDict.get('Modify')
modifyTime = time.mktime(time.strptime(modifyTime, '%Y%m%d%H%M%S'))
if self.serverUsesUtcTime:
modifyTime = TimeUtility.utcToLocalTime(modifyTime)
fileStatDict['fileModificationTime'] = modifyTime
fileStatDict['fileModificationTimeStamp'] = TimeUtility.formatLocalTimeStamp(modifyTime)
fileStatDict['fileSize'] = int(fileStatDict.get('fileSize'))
del fileStatDict['Modify']
del fileStatDict['Type']
def getFiles(self, dirPath, fileDict={}, replacementDirPath=None):
if not self.ftpClient:
self.ftpClient = self.getFtpClient(self.host, self.port, self.username, self.password)
# Need these to be class members for the callback function
self.mlsdFileDict = {}
self.mlsdDirList = []
self.ftpClient.retrlines('MLSD %s' % dirPath, self.__parseMlsdOutput)
if not replacementDirPath:
replacementDirPath = dirPath
for (fileName,fileInfo) in self.mlsdFileDict.items():
filePath = '%s/%s' % (dirPath, fileName)
self.__processFileStatDict(fileInfo)
del fileInfo['Name']
filePath = '%s/%s' % (replacementDirPath, fileName)
fileDict[filePath] = fileInfo
for d in copy.copy(self.mlsdDirList):
dirPath2 = '%s/%s' % (dirPath,d)
self.getFiles(dirPath2,fileDict)
replacementDirPath2 = '%s/%s' % (replacementDirPath,d)
self.getFiles(dirPath2,fileDict, replacementDirPath2)
return fileDict
def getMd5Sum(self, filePath, fileInfo={}):
......@@ -108,11 +127,8 @@ class FtpUtility:
self.ftpClient.retrlines('MLSD %s' % filePath, self.__parseMlsdFileStat)
fileStatDict = self.mlsdFileStatDict.get(fileName)
if fileStatDict:
fileInfo['fileSize'] = fileStatDict.get('Size')
modifyTime = fileStatDict.get('Modify')
modifyTime = time.mktime(time.strptime(modifyTime, '%Y%m%d%H%M%S'))
fileInfo['fileModificationTime'] = modifyTime
fileInfo['fileModificationTimeStamp'] = TimeUtility.formatLocalTimeStamp(modifyTime)
self.__processFileStatDict(fileStatDict)
fileInfo.update(fileStatDict)
del self.mlsdFileStatDict[fileName]
return fileInfo
......@@ -123,5 +139,7 @@ if __name__ == '__main__':
ftpUtility = FtpUtility('s8dserv', 2811)
files = ftpUtility.getFiles('/export/8-id-i/test')
print files
files = ftpUtility.getFiles('/export/8-id-i/test', replacementDirPath='/data/testing/8-id-i')
print files
print ftpUtility.getMd5Sum('/export/8-id-i/test/testfile01')
print ftpUtility.statFile('/export/8-id-i/test/testfile01')
......@@ -45,6 +45,10 @@ class TimeUtility:
def utcToLocalTime(cls, utc):
if cls.UTC_MINUS_LOCAL_TIME is None:
cls.UTC_MINUS_LOCAL_TIME = (datetime.datetime.utcnow()-datetime.datetime.now()).total_seconds()
if cls.UTC_MINUS_LOCAL_TIME > 0:
cls.UTC_MINUS_LOCAL_TIME = int(cls.UTC_MINUS_LOCAL_TIME+0.5)
else:
cls.UTC_MINUS_LOCAL_TIME = int(cls.UTC_MINUS_LOCAL_TIME-0.5)
localTime = utc - cls.UTC_MINUS_LOCAL_TIME
return localTime
......
......@@ -38,9 +38,9 @@ class FtpFileSystemObserverAgent(FileSystemObserverAgent):
else:
# old file, check timestamp
oldFileInfo = oldFileDict.get(filePath)
oldModifyTime = oldFileInfo.get('Modify', '')
oldModifyTime = oldFileInfo.get('fileModificationTime', '')
fileInfo = fileDict.get(filePath)
modifyTime = fileInfo.get('Modify')
modifyTime = fileInfo.get('fileModificationTime')
if modifyTime != oldModifyTime:
# file has been modified, need to process it
self.logger.debug('Modified file path detected: %s' % filePath)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment