Commit e19a3e71 authored by sveseli
Browse files

Merge branch '0.9.4' into release/0.9

parents bf405af6 fc0e720f
Release 0.13 (05/27/2016)
=============================
- Added SFTP file system observer agent
- Enhanced MongoDB plugin with file md5 sum calculation
Release 0.12 (05/06/2016)
=============================
......
......@@ -15,5 +15,5 @@ DM_DAQ_WEB_SERVICE_HOST=DM_HOSTNAME
DM_DAQ_WEB_SERVICE_PORT=33336
DM_CAT_WEB_SERVICE_HOST=DM_HOSTNAME
DM_CAT_WEB_SERVICE_PORT=44436
DM_SOFTWARE_VERSION="0.12 (DM_DATE)"
DM_SOFTWARE_VERSION="0.13 (DM_DATE)"
__version__ = "0.11 (2016.04.28)"
__version__ = "0.13 (2016.05.27)"
......@@ -18,6 +18,7 @@ class FileProcessingManager(threading.Thread,Singleton):
NUMBER_OF_PROCESSING_THREADS_KEY = 'numberofprocessingthreads'
DEFAULT_NUMBER_OF_RETRIES_KEY = 'defaultnumberofretries'
DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY = 'defaultretrywaitperiodinseconds'
STAT_UTILITY_KEY = 'statutility'
FILE_PROCESSOR_KEY = 'fileprocessor'
# Singleton.
......@@ -51,6 +52,9 @@ class FileProcessingManager(threading.Thread,Singleton):
self.nProcessingThreads = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.NUMBER_OF_PROCESSING_THREADS_KEY))
self.defaultNumberOfRetries = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.DEFAULT_NUMBER_OF_RETRIES_KEY))
self.defaultRetryWaitPeriodInSeconds = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY))
statUtility = cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.STAT_UTILITY_KEY)
if statUtility:
(statUtilityModuleName,statUtilityClassName,statUtilityConstructor) = cm.getModuleClassConstructorTuple(statUtility)
# Create processors
for (key,value) in configItems:
......@@ -61,6 +65,11 @@ class FileProcessingManager(threading.Thread,Singleton):
self.logger.debug('Configuring file processor %s' % fileProcessor)
fileProcessor.setNumberOfRetries(self.defaultNumberOfRetries)
fileProcessor.setRetryWaitPeriodInSeconds(self.defaultRetryWaitPeriodInSeconds)
statUtilityObject = None
if statUtility:
statUtilityObject = ObjectUtility.createObjectInstance(statUtilityModuleName, statUtilityClassName, statUtilityConstructor)
self.logger.debug('Using stat utility object %s' % str(statUtilityObject))
fileProcessor.statUtility = statUtilityObject
fileProcessor.configure()
self.fileProcessorDict[key] = fileProcessor
self.fileProcessorKeyList = self.fileProcessorDict.keys()
......
......@@ -10,6 +10,7 @@ class FileProcessor:
self.configDict = {}
self.processorName = self.__class__.__name__
self.dependsOn = dependsOn
self.statUtility = None
@abc.abstractmethod
def processDirectory(self, directoryInfo):
......
......@@ -12,19 +12,32 @@ from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
class GridftpFileTransferPlugin(FileTransferPlugin):
DEFAULT_COMMAND = 'globus-url-copy -c -cd -r -vb -tcp-bs 512K -p 1 -sync -sync-level 2'
DIRECTORY_TRANSFER_COMMAND = 'globus-url-copy -c -cd -r -vb -tcp-bs 512K -p 5 -sync -sync-level 2'
#DEFAULT_COMMAND = 'globus-url-copy -c -cd -r -tcp-bs 512K -p 3 -sync -sync-level 2'
#DIRECTORY_TRANSFER_COMMAND = 'globus-url-copy -c -cd -r -tcp-bs 512K -p 8 -sync -sync-level 2'
DEFAULT_COMMAND = 'globus-url-copy -r -cd -sync -sync-level 2 -fast -bs 2M -tcp-bs 2M -p 8'
DIRECTORY_TRANSFER_COMMAND = 'globus-url-copy -r -cd -sync -sync-level 2 -fast -bs 2M -tcp-bs 2M -p 8 -cc 8'
DEFAULT_PORT = 2811
def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True, dependsOn=[]):
def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, directoryTransferCommand=DIRECTORY_TRANSFER_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True, dependsOn=[]):
FileTransferPlugin.__init__(self, command, src, dest, dependsOn=dependsOn)
self.dsFileApi = DsRestApiFactory.getFileRestApi()
self.localMd5Sum = localMd5Sum
self.remoteMd5Sum = remoteMd5Sum
self.deleteOriginal = deleteOriginal
self.directoryTransferCommand = directoryTransferCommand
self.pluginMustProcessFiles = pluginMustProcessFiles
def replaceSpecialCharacters(self, url):
    """Percent-encode the '#', ' ' and '~' characters of a URL string.

    Only these three characters are rewritten; every other character,
    including an existing '%', is left exactly as it was.

    :param url: URL (or path) string to encode
    :return: copy of url with '#' -> '%23', ' ' -> '%20', '~' -> '%7E'
    """
    encodedUrl = url
    # None of the escape sequences contains a character that is itself
    # replaced, so the order of the substitutions does not matter.
    for (character, escapeSequence) in (('#', '%23'), (' ', '%20'), ('~', '%7E')):
        encodedUrl = encodedUrl.replace(character, escapeSequence)
    return encodedUrl
def getSrcUrl(self, filePath, dataDirectory):
(scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
if scheme:
......@@ -33,7 +46,7 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
srcUrl = 'file://%s' % filePath
else:
srcUrl = '%s/%s' % (self.src,filePath)
return srcUrl
return self.replaceSpecialCharacters(srcUrl)
def getDestUrl(self, filePath, dataDirectory, storageHost, storageDirectory):
(scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
......@@ -43,7 +56,7 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
destUrl = '%s/%s/%s' % (self.dest, dirName, fileName)
else:
destUrl = 'sshftp://%s/%s/%s/%s' % (storageHost, storageDirectory, dirName, fileName)
return destUrl
return self.replaceSpecialCharacters(destUrl)
def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
if not self.pluginMustProcessFiles:
......@@ -54,6 +67,7 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
self.logger.debug('Upload info: %s' % uploadInfo)
# Original data directory may contain host/port
(scheme, host, port, replacementDirPath) = FtpUtility.parseFtpUrl(dataDirectory)
self.logger.debug('Replacement dir path: %s' % replacementDirPath)
self.logger.debug('Number of original files: %s' % len(filePathsDict))
self.logger.debug('Looking for existing files in %s' % storageDirectory)
ftpUtility = SftpUtility(storageHost)
......@@ -99,12 +113,14 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
srcUrl = self.getSrcUrl(filePath, dataDirectory)
# Calculate checksum
(scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
ftpUtility = FtpUtility(host, port)
statUtility = self.statUtility
if not statUtility:
(scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(srcUrl, defaultPort=self.DEFAULT_PORT)
statUtility = FtpUtility(host, port)
if not fileInfo.get('fileSize'):
ftpUtility.statFile(filePath, fileInfo)
statUtility.statFile(filePath, fileInfo)
if self.localMd5Sum:
ftpUtility.getMd5Sum(filePath, fileInfo)
statUtility.getMd5Sum(filePath, fileInfo)
# Transfer file
self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl))
......@@ -160,7 +176,7 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
# Transfer directory
self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl))
self.start(src=srcUrl, dest=destUrl, command=self.DIRECTORY_TRANSFER_COMMAND, templateInfo=uploadInfo)
self.start(src=srcUrl, dest=destUrl, command=self.directoryTransferCommand, templateInfo=uploadInfo)
#######################################################################
# Testing.
......
......@@ -13,12 +13,27 @@ class MongoDbFileCatalogPlugin(FileProcessor):
DEFAULT_HDF5_METADATA_COMMAND = None
def __init__(self, hdf5MetadataCommand=DEFAULT_HDF5_METADATA_COMMAND, dependsOn=[]):
def __init__(self, hdf5MetadataCommand=DEFAULT_HDF5_METADATA_COMMAND, md5Sum=False, dependsOn=[]):
FileProcessor.__init__(self, dependsOn=dependsOn)
self.fileMongoDbApi = FileMongoDbApi()
self.hdf5MetadataCommand = hdf5MetadataCommand
self.md5Sum = md5Sum
self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
def processStat(self, filePath, fileInfo=None):
    """Add stat information for a file to its catalog entry.

    Nothing is done when no stat utility is configured, or when the entry
    already has a 'fileSize' key. Otherwise the stat utility is asked to
    stat the file and fill the entry in place.

    :param filePath: path of the file to stat
    :param fileInfo: dictionary describing the file; updated in place
                     (a new dictionary is created when omitted)
    :return: the fileInfo dictionary
    """
    # A fresh dictionary per call: a shared default would leak one file's
    # data into the result for the next file.
    if fileInfo is None:
        fileInfo = {}
    if not self.statUtility:
        return fileInfo
    if 'fileSize' not in fileInfo:
        # Pass fileInfo so that the stat results land in the entry being
        # built instead of being discarded.
        self.statUtility.statFile(filePath, fileInfo)
    return fileInfo
def processMd5Sum(self, filePath, fileInfo=None):
    """Add the md5 checksum of a file to its catalog entry.

    Nothing is done when md5 calculation is disabled (self.md5Sum is
    false), when no stat utility is configured, or when the entry already
    has an 'md5Sum' key. Otherwise the value returned by the stat
    utility's getMd5Sum(filePath) is stored under 'md5Sum'.

    :param filePath: path of the file to checksum
    :param fileInfo: dictionary describing the file; updated in place
                     (a new dictionary is created when omitted)
    :return: the fileInfo dictionary
    """
    # A fresh dictionary per call: with a shared default the checksum of
    # the first file would be returned for every later file.
    if fileInfo is None:
        fileInfo = {}
    if not self.md5Sum or not self.statUtility:
        return fileInfo
    if 'md5Sum' not in fileInfo:
        fileInfo['md5Sum'] = self.statUtility.getMd5Sum(filePath)
    return fileInfo
def processHdf5Metadata(self, filePath, fileInfo={}):
if not self.hdf5MetadataCommand:
return fileInfo
......@@ -42,6 +57,7 @@ class MongoDbFileCatalogPlugin(FileProcessor):
fileInfo[key] = value
else:
self.logger.warn('Key %s already exists for file %s (experiment: %s)' % (key, filePath, experimentName))
return fileInfo
def processFile(self, fileInfo):
filePath = fileInfo.get('filePath')
......@@ -84,7 +100,9 @@ class MongoDbFileCatalogPlugin(FileProcessor):
if fileInfo2.has_key(key):
del fileInfo2[key]
self.processStat(filePath, fileInfo2)
self.processHdf5Metadata(filePath, fileInfo2)
self.processMd5Sum(filePath, fileInfo2)
self.logger.debug('File "%s" catalog entry: %s' % (experimentFilePath, str(fileInfo2)))
self.fileMongoDbApi.updateOrAddExperimentFile(fileInfo2)
......
......@@ -60,7 +60,6 @@ class RsyncFileTransferPlugin(FileTransferPlugin):
if not fileInfo.get('fileSize'):
FileUtility.statFile(filePath, fileInfo)
if self.localMd5Sum:
FileUtility.statFile(filePath, fileInfo)
FileUtility.getMd5Sum(filePath, fileInfo)
# Transfer file
......
......@@ -22,11 +22,8 @@ class FileUtility:
fileInfo['filePath'] = filePath
fileInfo['fileSize'] = statResult[stat.ST_SIZE]
fileInfo['fileCreationTime'] = statResult[stat.ST_CTIME]
fileInfo['fileCreationTimestamp'] = TimeUtility.formatLocalTimestamp(statResult[stat.ST_CTIME])
fileInfo['fileAccessTime'] = statResult[stat.ST_ATIME]
fileInfo['fileAccessTimestamp'] = TimeUtility.formatLocalTimestamp(statResult[stat.ST_ATIME])
fileInfo['fileModificationTime'] = statResult[stat.ST_MTIME]
fileInfo['fileModificationTimestamp'] = TimeUtility.formatLocalTimestamp(statResult[stat.ST_MTIME])
return fileInfo
#######################################################################
......
......@@ -167,11 +167,12 @@ class FtpUtility:
if __name__ == '__main__':
print "Round 1: "
ftpUtility = FtpUtility('s7dserv', 2811)
ftpUtility = FtpUtility('s33dserv', 2811)
#files = ftpUtility.getFiles2('/export/7IDSprayimage/Cummins/Data')
files = ftpUtility.getFiles2('/export/dm/test')
print files
files = ftpUtility.getFiles('/export/dm/test')
print ftpUtility.parseFtpUrl('/export/dm/test')
print files
#files = ftpUtility.getFiles('/export/7IDSprayimage/Cummins/Data')
#files = ftpUtility.getFiles2('/export/8-id-i/test', replacementDirPath='/data/testing/8-id-i')
......@@ -187,7 +188,7 @@ if __name__ == '__main__':
#print "Round 2: "
#ftpUtility = FtpUtility('s7dserv', 2811)
#files = ftpUtility.getFiles2('/export/7IDSprayimage/Cummins/Data')
#print ftpUtility.getMd5Sum('/export/8-id-i/test/testfile01')
print ftpUtility.getMd5Sum('/export/dm/test/testfile01')
#print ftpUtility.statFile('/export/8-id-i/test/testfile01')
#ftpUtility = FtpUtility('xstor-devel', 22)
#files = ftpUtility.getFiles2('/data/testing')
......
#!/usr/bin/env python
import threading
import copy
import stat
import pysftp
from dm.common.utility.timeUtility import TimeUtility
from dm.common.utility.loggingManager import LoggingManager
from dm.common.exceptions.commandFailed import CommandFailed
import urlparse
class SftpUtility:
DEFAULT_PORT = 22
def __init__(self, host, port=DEFAULT_PORT, username=None, password=None):
def __init__(self, host, port=DEFAULT_PORT, username=None, password=None, privateKey=None):
self.host = host
self.port = port
self.username = username
self.password = password
self.privateKey = privateKey
self.sftpClient = None
self.lock = threading.RLock()
@classmethod
def parseFtpUrl(cls, url, defaultHost=None, defaultPort=None):
......@@ -35,8 +39,8 @@ class SftpUtility:
return (scheme, host, port, dirPath)
@classmethod
def getSftpClient(cls, host, port=DEFAULT_PORT, username=None, password=None):
sftp = pysftp.Connection(host, username=username, password=password, port=port)
def getSftpClient(cls, host, port=DEFAULT_PORT, username=None, password=None, privateKey=None):
sftp = pysftp.Connection(host, username=username, password=password, port=port, private_key=privateKey)
return sftp
@classmethod
......@@ -52,11 +56,21 @@ class SftpUtility:
return outputDict
def getFiles(self, dirPath, fileDict={}, replacementDirPath=None):
if not self.sftpClient:
self.sftpClient = self.getSftpClient(self.host, self.port, self.username, self.password)
if not replacementDirPath:
replacementDirPath = dirPath
attrs = self.sftpClient.listdir_attr(dirPath)
self.lock.acquire()
try:
if not self.sftpClient:
self.sftpClient = self.getSftpClient(self.host, self.port, self.username, self.password, self.privateKey)
if not replacementDirPath:
replacementDirPath = dirPath
try:
attrs = self.sftpClient.listdir_attr(dirPath)
except Exception, ex:
self.getLogger().error('Could not retrieve files from %s: %s' % (dirPath,ex))
self.closeConnection()
raise
finally:
self.lock.release()
for attr in attrs:
fileName = attr.filename
mode = attr.st_mode
......@@ -68,16 +82,73 @@ class SftpUtility:
filePath = '%s/%s' % (replacementDirPath, fileName)
fileInfo = {'fileSize' : attr.st_size,
'fileModificationTime' : attr.st_mtime }
fileInfo['fileModificationTimestamp'] = TimeUtility.formatLocalTimestamp(attr.st_mtime)
fileDict[filePath] = fileInfo
return fileDict
def getMd5Sum(self, filePath, fileInfo=None):
    """Compute the md5 checksum of a remote file by running md5sum remotely.

    The SFTP connection is created on first use. The first line of the
    command output must end with the file path; its first
    whitespace-separated token is taken as the checksum.

    :param filePath: path of the file on the remote host
    :param fileInfo: optional dictionary; 'md5Sum' is stored in it
                     (a new dictionary is created when omitted)
    :return: md5 checksum string
    :raises CommandFailed: if the command output does not end with the
                           file path (the connection is kept open)
    :raises Exception: any other failure is logged and re-raised after the
                       connection has been closed
    """
    # A fresh dictionary per call rather than a shared mutable default.
    if fileInfo is None:
        fileInfo = {}
    # Single-quote the path so that spaces, '$', backticks and double
    # quotes in file names are taken literally by the remote shell
    # (assumes the command is interpreted by a POSIX-style shell on the
    # remote side -- TODO confirm); an embedded single quote is written
    # as '\''.
    quotedFilePath = "'%s'" % filePath.replace("'", "'\\''")
    with self.lock:
        if not self.sftpClient:
            self.sftpClient = self.getSftpClient(self.host, self.port, self.username, self.password, self.privateKey)
        try:
            output = self.sftpClient.execute('md5sum %s' % quotedFilePath)[0].strip()
            # On success md5sum prints "<checksum>  <path>"; anything else
            # is treated as an error message.
            if not output.endswith(filePath):
                raise CommandFailed(output)
            md5Sum = output.split()[0]
            fileInfo['md5Sum'] = md5Sum
        except CommandFailed as ex:
            # The command ran but failed: the connection itself is fine.
            self.getLogger().error('Could not get md5sum for file %s: %s' % (filePath,ex))
            raise
        except Exception as ex:
            # Unexpected failure: drop the connection so that the next
            # call starts with a new one.
            self.getLogger().error('Could not get md5sum for file %s: %s' % (filePath,ex))
            self.closeConnection()
            raise
        return md5Sum
def statFile(self, filePath, fileInfo=None):
    """Retrieve the size and modification time of a remote file.

    The SFTP connection is created on first use. The 'fileSize' and
    'fileModificationTime' keys are set from the st_size and st_mtime
    attributes of the stat result.

    :param filePath: path of the file on the remote host
    :param fileInfo: optional dictionary to update in place
                     (a new dictionary is created when omitted)
    :return: the fileInfo dictionary
    :raises Exception: any failure is logged and re-raised after the
                       connection has been closed
    """
    # A fresh dictionary per call: a shared default would be handed to
    # every caller and overwritten by each later call.
    if fileInfo is None:
        fileInfo = {}
    with self.lock:
        if not self.sftpClient:
            self.sftpClient = self.getSftpClient(self.host, self.port, self.username, self.password, self.privateKey)
        try:
            attr = self.sftpClient.stat(filePath)
            fileInfo['fileSize'] = attr.st_size
            fileInfo['fileModificationTime'] = attr.st_mtime
        except Exception as ex:
            self.getLogger().error('Could not get stat file %s: %s' % (filePath,ex))
            # Drop the connection so that the next call starts with a
            # new one.
            self.closeConnection()
            raise
        return fileInfo
def closeConnection(self):
    """Close the SFTP connection, if one is open, and forget the client.

    A failure while closing is logged and not propagated; the client
    reference is reset to None in either case.
    """
    log = self.getLogger()
    with self.lock:
        try:
            client = self.sftpClient
            if client:
                log.warn('Closing SFTP connection to host %s' % self.host)
                client.close()
        except Exception as ex:
            log.error('Could not close SFTP connection to host %s: %s' % (self.host, ex))
        # Reset even after a failed close, so that a new client is
        # created the next time one is needed.
        self.sftpClient = None
#######################################################################
# Testing.
if __name__ == '__main__':
sftpUtility = SftpUtility('xstor-devel', username='dmadmin')
files = sftpUtility.getFiles('/data/testing/test1')
print files
files = sftpUtility.getFiles('/data/testing/test1', replacementDirPath='/xyz/ccc')
print files
#sftpUtility = SftpUtility('s1dserv', username='dmadmin', password='theKey12')
sftpUtility = SftpUtility('s1dserv',privateKey='/home/beams/DMADMIN/.ssh/id_dsa')
#files = sftpUtility.getFiles('/export/dm/test')
#print files
print sftpUtility.getMd5Sum('/export/dm/test/testfile03(2nd).txt')
print sftpUtility.getMd5Sum('/export/dm/test/testfile 04.txt')
#print 'Closing connection'
#sftpUtility.closeConnection()
#print sftpUtility.statFile('/export/dm/test/testfile01')
......@@ -84,7 +84,7 @@ class ExperimentSessionController(DmSessionController):
raise InvalidRequest('Missing data directory.')
dataDirectory = Encoder.decode(dataDirectory)
if not dataDirectory.startswith('/') and not dataDirectory.count('://'):
raise InvalidRequest('Data directory must be an absolute path.')
raise InvalidRequest('Data directory must be an absolute path: %s' % dataDirectory)
daqInfo = {}
encodedDaqInfo = kwargs.get('daqInfo')
......
......@@ -6,7 +6,7 @@ from dm.common.utility.ftpUtility import FtpUtility
class FtpFileSystemObserverAgent(PollingFileSystemObserverAgent):
DEFAULT_POLLING_PERIOD_IN_SECONDS = 5
DEFAULT_POLLING_PERIOD_IN_SECONDS = 15
def __init__(self, host, port, username=None, password=None, pollingPeriod=DEFAULT_POLLING_PERIOD_IN_SECONDS):
PollingFileSystemObserverAgent.__init__(self, pollingPeriod)
......
......@@ -6,7 +6,7 @@ from dm.common.utility.osUtility import OsUtility
class PollingFileSystemObserverAgent(FileSystemObserverAgent):
DEFAULT_POLLING_PERIOD_IN_SECONDS = 5
DEFAULT_POLLING_PERIOD_IN_SECONDS = 15
DEFAULT_RETRY_PERIOD_IN_SECONDS = 60
def __init__(self, pollingPeriod=DEFAULT_POLLING_PERIOD_IN_SECONDS):
......
#!/usr/bin/env python
from threading import Timer
from pollingFileSystemObserverAgent import PollingFileSystemObserverAgent
from dm.common.utility.sftpUtility import SftpUtility
class SftpFileSystemObserverAgent(PollingFileSystemObserverAgent):
    """Polling file system observer agent that lists files over SFTP."""

    DEFAULT_POLLING_PERIOD_IN_SECONDS = 15
    DEFAULT_PORT = 22

    def __init__(self, host, port=DEFAULT_PORT, username=None, password=None, privateKey=None, pollingPeriod=DEFAULT_POLLING_PERIOD_IN_SECONDS):
        """Store the SFTP connection settings used for directory listings.

        :param host: default SFTP host
        :param port: default SFTP port
        :param username: user name passed to SftpUtility
        :param password: password passed to SftpUtility
        :param privateKey: private key passed to SftpUtility
        :param pollingPeriod: polling period handed to the base class
        """
        PollingFileSystemObserverAgent.__init__(self, pollingPeriod)
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.privateKey = privateKey

    def getFiles(self, dataDirectory):
        """Return the files found under a data directory.

        :param dataDirectory: directory path or URL; host and port default
                              to the values this agent was created with
        :return: result of SftpUtility.getFiles() for the parsed path
        """
        urlParts = SftpUtility.parseFtpUrl(dataDirectory, defaultHost=self.host, defaultPort=self.port)
        (scheme, host, port, dirPath) = urlParts
        # self.logger is not set in this class; presumably provided by a
        # base class -- TODO confirm.
        self.logger.debug('Retrieving files from SFTP host: %s, port: %s, directory path: %s' % (host, port, dirPath))
        # NOTE(review): a new SftpUtility is created for every call and is
        # not explicitly closed here.
        # An explicit empty dictionary is passed as the result container.
        return SftpUtility(host, port, self.username, self.password, self.privateKey).getFiles(dirPath, {})
####################################################################
# Testing
if __name__ == '__main__':
import time
dirPath='/export/beams12/S1IDUSER/mnt/orthros/park_apr16_rec_reduced'
agent = SftpFileSystemObserverAgent('s1dserv', privateKey='/home/beams/DMADMIN/.ssh/id_dsa')
print 'TIME1: ', time.time()
print 'ORIGINAL FILES: ', len(agent.getFiles(dirPath))
print 'TIME2: ', time.time()
#agent.startObservingPath('/export/dm/test', 'e1')
#time.sleep(100)
#agent.stopObservingPath('/export/dm/test', 'e1')
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment