diff --git a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py index 860293487d420b55715f221ed080b5de51353761..5f338b85f5066233f96c3c42a9c0a96b0d0b2c3e 100755 --- a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py +++ b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py @@ -4,6 +4,7 @@ import os from fileTransferPlugin import FileTransferPlugin from dm.common.utility.fileUtility import FileUtility from dm.common.utility.ftpUtility import FtpUtility +from dm.common.utility.sftpUtility import SftpUtility from dm.common.exceptions.fileProcessingError import FileProcessingError from dm.common.utility.dmSubprocess import DmSubprocess from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory @@ -44,7 +45,7 @@ class GridftpFileTransferPlugin(FileTransferPlugin): dataDirectory = uploadInfo['dataDirectory'] self.logger.debug('Upload info: %s', uploadInfo) #(scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(storageUrl, defaultPort=self.DEFAULT_PORT) - ftpUtility = FtpUtility(storageHost, self.DEFAULT_PORT) + ftpUtility = SftpUtility(storageHost) storageFilePathsDict = ftpUtility.getFiles(storageDirectory, {}) pluginFilePathsDict = {} filePaths = filePathsDict.keys() @@ -57,15 +58,15 @@ class GridftpFileTransferPlugin(FileTransferPlugin): # remote directory does not have the file pluginFilePathsDict[filePath] = filePathDict else: - fSize = filePathDict.get('Size') - sfSize = storageFilePathDict.get('Size') + fSize = filePathDict.get('fileSize') + sfSize = storageFilePathDict.get('fileSize') # check size if not fSize or not sfSize or fSize != sfSize: pluginFilePathsDict[filePath] = filePathDict else: # sizes are the same, check modify time - mTime = filePathDict.get('Modify') - sfTime = storageFilePathDict.get('Modify') + mTime = filePathDict.get('fileModificationTime') + sfTime = storageFilePathDict.get('fileModificationTime') if not mTime or not sfTime or mTime > sfTime: pluginFilePathsDict[filePath] = filePathDict diff --git a/src/python/dm/common/utility/ftpUtility.py b/src/python/dm/common/utility/ftpUtility.py index 8d22e1e48a1a9b2d751be07ea95281dbdecbaa1e..8df82f43ed8b51941c7f70afe14159fefcf163ce 100755 --- a/src/python/dm/common/utility/ftpUtility.py +++ b/src/python/dm/common/utility/ftpUtility.py @@ -10,13 +10,13 @@ import urlparse class FtpUtility: - def __init__(self, host, port, username=None, password=None, utcTimeStamps=True): + def __init__(self, host, port, username=None, password=None, serverUsesUtcTime=True): self.host = host self.port = port self.username = username self.password = password self.ftpClient = None - self.utcTimeStamps = utcTimeStamps + self.serverUsesUtcTime = serverUsesUtcTime self.mlsdFileStatDict = {} @classmethod @@ -48,9 +48,11 @@ class FtpUtility: return logger - def __parseKeyValue(cls, keyValue, outputDict={}): - key,value = keyValue.split('=') + def __parseKeyValue(cls, origKeyValue, outputDict={}, newKey=None): + key,value = origKeyValue.split('=') value = value.strip() + if newKey is not None: + key = newKey outputDict[key] = value return outputDict @@ -60,7 +62,7 @@ class FtpUtility: parseDict = {} self.__parseKeyValue(parts[0], parseDict) self.__parseKeyValue(parts[1], parseDict) - self.__parseKeyValue(parts[2], parseDict) + self.__parseKeyValue(parts[2], parseDict, 'fileSize') name = parts[-1].strip() parseDict['Name'] = name type = parseDict.get('Type', '') @@ -75,10 +77,21 @@ class FtpUtility: parseDict = {} self.__parseKeyValue(parts[0], parseDict) self.__parseKeyValue(parts[1], parseDict) - self.__parseKeyValue(parts[2], parseDict) + self.__parseKeyValue(parts[2], parseDict, 'fileSize') name = parts[-1].strip() self.mlsdFileStatDict[name] = parseDict + def __processFileStatDict(self, fileStatDict): + modifyTime = fileStatDict.get('Modify') + modifyTime = time.mktime(time.strptime(modifyTime, '%Y%m%d%H%M%S')) + if self.serverUsesUtcTime: + modifyTime = TimeUtility.utcToLocalTime(modifyTime) + fileStatDict['fileModificationTime'] = modifyTime + fileStatDict['fileModificationTimeStamp'] = TimeUtility.formatLocalTimeStamp(modifyTime) + fileStatDict['fileSize'] = int(fileStatDict.get('fileSize')) + del fileStatDict['Modify'] + del fileStatDict['Type'] + def getFiles(self, dirPath, fileDict={}): if not self.ftpClient: self.ftpClient = self.getFtpClient(self.host, self.port, self.username, self.password) @@ -87,6 +100,8 @@ class FtpUtility: self.mlsdDirList = [] self.ftpClient.retrlines('MLSD %s' % dirPath, self.__parseMlsdOutput) for (fileName,fileInfo) in self.mlsdFileDict.items(): + self.__processFileStatDict(fileInfo) + del fileInfo['Name'] filePath = '%s/%s' % (dirPath, fileName) fileDict[filePath] = fileInfo for d in copy.copy(self.mlsdDirList): @@ -109,13 +124,8 @@ class FtpUtility: self.ftpClient.retrlines('MLSD %s' % filePath, self.__parseMlsdFileStat) fileStatDict = self.mlsdFileStatDict.get(fileName) if fileStatDict: - fileInfo['fileSize'] = fileStatDict.get('Size') - modifyTime = fileStatDict.get('Modify') - modifyTime = time.mktime(time.strptime(modifyTime, '%Y%m%d%H%M%S')) - if self.utcTimeStamps: - modifyTime = TimeUtility.utcToLocalTime(modifyTime) - fileInfo['fileModificationTime'] = modifyTime - fileInfo['fileModificationTimeStamp'] = TimeUtility.formatLocalTimeStamp(modifyTime) + self.__processFileStatDict(fileStatDict) + fileInfo.update(fileStatDict) del self.mlsdFileStatDict[fileName] return fileInfo diff --git a/src/python/dm/common/utility/sftpUtility.py b/src/python/dm/common/utility/sftpUtility.py index db860c2abc61ed523185a7121dd846397eccb0d5..46598e57017bac0d1799b86fa851e239b23dd78c 100755 --- a/src/python/dm/common/utility/sftpUtility.py +++ b/src/python/dm/common/utility/sftpUtility.py @@ -3,6 +3,7 @@ import copy import stat import pysftp +from dm.common.utility.timeUtility import TimeUtility from dm.common.utility.loggingManager import LoggingManager import urlparse @@ -63,7 +64,9 @@ class SftpUtility: if stat.S_ISDIR(mode): self.getFiles(fullPath, fileDict) elif stat.S_ISREG(mode): - fileInfo = {'filePath' : fullPath, 'fileSize' : attr.st_size, } + fileInfo = {'filePath' : fullPath, 'fileSize' : attr.st_size, + 'fileModificationTime' : attr.st_mtime } + fileInfo['fileModificationTimeStamp'] = TimeUtility.formatLocalTimeStamp(attr.st_mtime) fileDict[fullPath] = fileInfo return fileDict @@ -71,6 +74,6 @@ class SftpUtility: # Testing. if __name__ == '__main__': - sftpUtility = SftpUtility('dmstorage', username='dm') - files = sftpUtility.getFiles('/opt/DM/data/ESAF/e1') + sftpUtility = SftpUtility('xstor-devel', username='dmadmin') + files = sftpUtility.getFiles('/data/testing/test1') print files diff --git a/src/python/dm/daq_web_service/service/impl/ftpFileSystemObserverAgent.py b/src/python/dm/daq_web_service/service/impl/ftpFileSystemObserverAgent.py index b70cbd70e05df9453715cb18bbec3573d6d15bf6..e7f9c079b94f2f2c086f80235d022141be812dd5 100755 --- a/src/python/dm/daq_web_service/service/impl/ftpFileSystemObserverAgent.py +++ b/src/python/dm/daq_web_service/service/impl/ftpFileSystemObserverAgent.py @@ -38,9 +38,9 @@ class FtpFileSystemObserverAgent(FileSystemObserverAgent): else: # old file, check timestamp oldFileInfo = oldFileDict.get(filePath) - oldModifyTime = oldFileInfo.get('Modify', '') + oldModifyTime = oldFileInfo.get('fileModificationTime', '') fileInfo = fileDict.get(filePath) - modifyTime = fileInfo.get('Modify') + modifyTime = fileInfo.get('fileModificationTime') if modifyTime != oldModifyTime: # file has been modified, need to process it self.logger.debug('Modified file path detected: %s' % filePath)