Skip to content
Snippets Groups Projects
Commit 89fe5bfd authored by sveseli's avatar sveseli
Browse files

merged from 0.13

parents 18b2d010 42c4902d
No related branches found
No related tags found
No related merge requests found
Release 0.13 (05/31/2016)
=============================
- Added SFTP file system observer agent
- Enhanced MongoDB plugin with file md5 sum calculation

Release 0.12 (05/06/2016)
=============================
......
...@@ -15,5 +15,5 @@ DM_DAQ_WEB_SERVICE_HOST=DM_HOSTNAME ...@@ -15,5 +15,5 @@ DM_DAQ_WEB_SERVICE_HOST=DM_HOSTNAME
DM_DAQ_WEB_SERVICE_PORT=33336 DM_DAQ_WEB_SERVICE_PORT=33336
DM_CAT_WEB_SERVICE_HOST=DM_HOSTNAME DM_CAT_WEB_SERVICE_HOST=DM_HOSTNAME
DM_CAT_WEB_SERVICE_PORT=44436 DM_CAT_WEB_SERVICE_PORT=44436
DM_SOFTWARE_VERSION="0.13 (DM_DATE)" DM_SOFTWARE_VERSION="0.14 (DM_DATE)"
__version__ = "0.11 (2016.04.28)" __version__ = "0.13 (2016.05.27)"
...@@ -18,6 +18,7 @@ class FileProcessingManager(threading.Thread,Singleton): ...@@ -18,6 +18,7 @@ class FileProcessingManager(threading.Thread,Singleton):
NUMBER_OF_PROCESSING_THREADS_KEY = 'numberofprocessingthreads' NUMBER_OF_PROCESSING_THREADS_KEY = 'numberofprocessingthreads'
DEFAULT_NUMBER_OF_RETRIES_KEY = 'defaultnumberofretries' DEFAULT_NUMBER_OF_RETRIES_KEY = 'defaultnumberofretries'
DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY = 'defaultretrywaitperiodinseconds' DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY = 'defaultretrywaitperiodinseconds'
STAT_UTILITY_KEY = 'statutility'
FILE_PROCESSOR_KEY = 'fileprocessor' FILE_PROCESSOR_KEY = 'fileprocessor'
# Singleton. # Singleton.
...@@ -51,6 +52,11 @@ class FileProcessingManager(threading.Thread,Singleton): ...@@ -51,6 +52,11 @@ class FileProcessingManager(threading.Thread,Singleton):
self.nProcessingThreads = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.NUMBER_OF_PROCESSING_THREADS_KEY)) self.nProcessingThreads = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.NUMBER_OF_PROCESSING_THREADS_KEY))
self.defaultNumberOfRetries = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.DEFAULT_NUMBER_OF_RETRIES_KEY)) self.defaultNumberOfRetries = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.DEFAULT_NUMBER_OF_RETRIES_KEY))
self.defaultRetryWaitPeriodInSeconds = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY)) self.defaultRetryWaitPeriodInSeconds = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY))
statUtility = cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.STAT_UTILITY_KEY)
if statUtility:
(moduleName,className,constructor) = cm.getModuleClassConstructorTuple(statUtility)
self.logger.debug('Creating stat utility of class %s' % className)
statUtility = ObjectUtility.createObjectInstance(moduleName, className, constructor)
# Create processors # Create processors
for (key,value) in configItems: for (key,value) in configItems:
...@@ -61,6 +67,7 @@ class FileProcessingManager(threading.Thread,Singleton): ...@@ -61,6 +67,7 @@ class FileProcessingManager(threading.Thread,Singleton):
self.logger.debug('Configuring file processor %s' % fileProcessor) self.logger.debug('Configuring file processor %s' % fileProcessor)
fileProcessor.setNumberOfRetries(self.defaultNumberOfRetries) fileProcessor.setNumberOfRetries(self.defaultNumberOfRetries)
fileProcessor.setRetryWaitPeriodInSeconds(self.defaultRetryWaitPeriodInSeconds) fileProcessor.setRetryWaitPeriodInSeconds(self.defaultRetryWaitPeriodInSeconds)
fileProcessor.statUtility = statUtility
fileProcessor.configure() fileProcessor.configure()
self.fileProcessorDict[key] = fileProcessor self.fileProcessorDict[key] = fileProcessor
self.fileProcessorKeyList = self.fileProcessorDict.keys() self.fileProcessorKeyList = self.fileProcessorDict.keys()
......
...@@ -10,6 +10,7 @@ class FileProcessor: ...@@ -10,6 +10,7 @@ class FileProcessor:
self.configDict = {} self.configDict = {}
self.processorName = self.__class__.__name__ self.processorName = self.__class__.__name__
self.dependsOn = dependsOn self.dependsOn = dependsOn
self.statUtility = None
@abc.abstractmethod @abc.abstractmethod
def processDirectory(self, directoryInfo): def processDirectory(self, directoryInfo):
......
...@@ -12,17 +12,20 @@ from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory ...@@ -12,17 +12,20 @@ from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
class GridftpFileTransferPlugin(FileTransferPlugin): class GridftpFileTransferPlugin(FileTransferPlugin):
DEFAULT_COMMAND = 'globus-url-copy -c -cd -r -vb -tcp-bs 512K -p 1 -sync -sync-level 2' #DEFAULT_COMMAND = 'globus-url-copy -c -cd -r -tcp-bs 512K -p 3 -sync -sync-level 2'
DIRECTORY_TRANSFER_COMMAND = 'globus-url-copy -c -cd -r -vb -tcp-bs 512K -p 5 -sync -sync-level 2' #DIRECTORY_TRANSFER_COMMAND = 'globus-url-copy -c -cd -r -tcp-bs 512K -p 8 -sync -sync-level 2'
DEFAULT_COMMAND = 'globus-url-copy -r -cd -sync -sync-level 2 -fast -bs 2M -tcp-bs 2M -p 8'
DIRECTORY_TRANSFER_COMMAND = 'globus-url-copy -r -cd -sync -sync-level 2 -fast -bs 2M -tcp-bs 2M -p 8 -cc 8'
DEFAULT_PORT = 2811 DEFAULT_PORT = 2811
def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True, dependsOn=[]): def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, directoryTransferCommand=DIRECTORY_TRANSFER_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True, dependsOn=[]):
FileTransferPlugin.__init__(self, command, src, dest, dependsOn=dependsOn) FileTransferPlugin.__init__(self, command, src, dest, dependsOn=dependsOn)
self.dsFileApi = DsRestApiFactory.getFileRestApi() self.dsFileApi = DsRestApiFactory.getFileRestApi()
self.localMd5Sum = localMd5Sum self.localMd5Sum = localMd5Sum
self.remoteMd5Sum = remoteMd5Sum self.remoteMd5Sum = remoteMd5Sum
self.deleteOriginal = deleteOriginal self.deleteOriginal = deleteOriginal
self.directoryTransferCommand = directoryTransferCommand
self.pluginMustProcessFiles = pluginMustProcessFiles self.pluginMustProcessFiles = pluginMustProcessFiles
def getSrcUrl(self, filePath, dataDirectory): def getSrcUrl(self, filePath, dataDirectory):
...@@ -99,12 +102,14 @@ class GridftpFileTransferPlugin(FileTransferPlugin): ...@@ -99,12 +102,14 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
srcUrl = self.getSrcUrl(filePath, dataDirectory) srcUrl = self.getSrcUrl(filePath, dataDirectory)
# Calculate checksum # Calculate checksum
(scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT) statUtility = self.statUtility
ftpUtility = FtpUtility(host, port) if not statUtility:
(scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
statUtility = FtpUtility(host, port)
if not fileInfo.get('fileSize'): if not fileInfo.get('fileSize'):
ftpUtility.statFile(filePath, fileInfo) statUtility.statFile(filePath, fileInfo)
if self.localMd5Sum: if self.localMd5Sum:
ftpUtility.getMd5Sum(filePath, fileInfo) statUtility.getMd5Sum(filePath, fileInfo)
# Transfer file # Transfer file
self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl)) self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl))
...@@ -160,7 +165,7 @@ class GridftpFileTransferPlugin(FileTransferPlugin): ...@@ -160,7 +165,7 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
# Transfer directory # Transfer directory
self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl)) self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl))
self.start(src=srcUrl, dest=destUrl, command=self.DIRECTORY_TRANSFER_COMMAND, templateInfo=uploadInfo) self.start(src=srcUrl, dest=destUrl, command=self.directoryTransferCommand, templateInfo=uploadInfo)
####################################################################### #######################################################################
# Testing. # Testing.
......
...@@ -13,12 +13,27 @@ class MongoDbFileCatalogPlugin(FileProcessor): ...@@ -13,12 +13,27 @@ class MongoDbFileCatalogPlugin(FileProcessor):
DEFAULT_HDF5_METADATA_COMMAND = None DEFAULT_HDF5_METADATA_COMMAND = None
def __init__(self, hdf5MetadataCommand=DEFAULT_HDF5_METADATA_COMMAND, dependsOn=[]): def __init__(self, hdf5MetadataCommand=DEFAULT_HDF5_METADATA_COMMAND, md5Sum=False, dependsOn=[]):
FileProcessor.__init__(self, dependsOn=dependsOn) FileProcessor.__init__(self, dependsOn=dependsOn)
self.fileMongoDbApi = FileMongoDbApi() self.fileMongoDbApi = FileMongoDbApi()
self.hdf5MetadataCommand = hdf5MetadataCommand self.hdf5MetadataCommand = hdf5MetadataCommand
self.md5Sum = md5Sum
self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__) self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
def processStat(self, filePath, fileInfo=None):
    """Fill in file stat information (e.g. file size) for the given path.

    Uses the configured stat utility (``self.statUtility``); if none is
    configured, the input dictionary is returned unchanged.

    :param filePath: path of the file to stat
    :param fileInfo: optional dictionary, updated in place with stat results
    :return: the (possibly updated) fileInfo dictionary
    """
    # Avoid a mutable default argument: a shared dict would leak state
    # between calls.
    if fileInfo is None:
        fileInfo = {}
    if not self.statUtility:
        return fileInfo
    if 'fileSize' not in fileInfo:
        # Pass fileInfo so the stat results are actually recorded in it;
        # the previous code discarded the statFile() result entirely.
        self.statUtility.statFile(filePath, fileInfo)
    return fileInfo
def processMd5Sum(self, filePath, fileInfo=None):
    """Compute and record the md5 sum for the given file path.

    Runs only when md5 summing is enabled (``self.md5Sum``) and a stat
    utility is configured; otherwise the input dictionary is returned
    unchanged. An existing 'md5Sum' entry is not recomputed.

    :param filePath: path of the file to checksum
    :param fileInfo: optional dictionary, updated in place with 'md5Sum'
    :return: the (possibly updated) fileInfo dictionary
    """
    # Avoid a mutable default argument: a shared dict would leak state
    # between calls.
    if fileInfo is None:
        fileInfo = {}
    if not self.md5Sum or not self.statUtility:
        return fileInfo
    # 'in' works on both Python 2 and 3; dict.has_key() was removed in Python 3.
    if 'md5Sum' not in fileInfo:
        fileInfo['md5Sum'] = self.statUtility.getMd5Sum(filePath)
    return fileInfo
def processHdf5Metadata(self, filePath, fileInfo={}): def processHdf5Metadata(self, filePath, fileInfo={}):
if not self.hdf5MetadataCommand: if not self.hdf5MetadataCommand:
return fileInfo return fileInfo
...@@ -42,6 +57,7 @@ class MongoDbFileCatalogPlugin(FileProcessor): ...@@ -42,6 +57,7 @@ class MongoDbFileCatalogPlugin(FileProcessor):
fileInfo[key] = value fileInfo[key] = value
else: else:
self.logger.warn('Key %s already exists for file %s (experiment: %s)' % (key, filePath, experimentName)) self.logger.warn('Key %s already exists for file %s (experiment: %s)' % (key, filePath, experimentName))
return fileInfo
def processFile(self, fileInfo): def processFile(self, fileInfo):
filePath = fileInfo.get('filePath') filePath = fileInfo.get('filePath')
...@@ -84,7 +100,9 @@ class MongoDbFileCatalogPlugin(FileProcessor): ...@@ -84,7 +100,9 @@ class MongoDbFileCatalogPlugin(FileProcessor):
if fileInfo2.has_key(key): if fileInfo2.has_key(key):
del fileInfo2[key] del fileInfo2[key]
self.processStat(filePath, fileInfo2)
self.processHdf5Metadata(filePath, fileInfo2) self.processHdf5Metadata(filePath, fileInfo2)
self.processMd5Sum(filePath, fileInfo2)
self.logger.debug('File "%s" catalog entry: %s' % (experimentFilePath, str(fileInfo2))) self.logger.debug('File "%s" catalog entry: %s' % (experimentFilePath, str(fileInfo2)))
self.fileMongoDbApi.updateOrAddExperimentFile(fileInfo2) self.fileMongoDbApi.updateOrAddExperimentFile(fileInfo2)
......
...@@ -22,11 +22,8 @@ class FileUtility: ...@@ -22,11 +22,8 @@ class FileUtility:
fileInfo['filePath'] = filePath fileInfo['filePath'] = filePath
fileInfo['fileSize'] = statResult[stat.ST_SIZE] fileInfo['fileSize'] = statResult[stat.ST_SIZE]
fileInfo['fileCreationTime'] = statResult[stat.ST_CTIME] fileInfo['fileCreationTime'] = statResult[stat.ST_CTIME]
fileInfo['fileCreationTimestamp'] = TimeUtility.formatLocalTimestamp(statResult[stat.ST_CTIME])
fileInfo['fileAccessTime'] = statResult[stat.ST_ATIME] fileInfo['fileAccessTime'] = statResult[stat.ST_ATIME]
fileInfo['fileAccessTimestamp'] = TimeUtility.formatLocalTimestamp(statResult[stat.ST_ATIME])
fileInfo['fileModificationTime'] = statResult[stat.ST_MTIME] fileInfo['fileModificationTime'] = statResult[stat.ST_MTIME]
fileInfo['fileModificationTimestamp'] = TimeUtility.formatLocalTimestamp(statResult[stat.ST_MTIME])
return fileInfo return fileInfo
####################################################################### #######################################################################
......
...@@ -69,7 +69,6 @@ class SftpUtility: ...@@ -69,7 +69,6 @@ class SftpUtility:
filePath = '%s/%s' % (replacementDirPath, fileName) filePath = '%s/%s' % (replacementDirPath, fileName)
fileInfo = {'fileSize' : attr.st_size, fileInfo = {'fileSize' : attr.st_size,
'fileModificationTime' : attr.st_mtime } 'fileModificationTime' : attr.st_mtime }
fileInfo['fileModificationTimestamp'] = TimeUtility.formatLocalTimestamp(attr.st_mtime)
fileDict[filePath] = fileInfo fileDict[filePath] = fileInfo
return fileDict return fileDict
...@@ -80,6 +79,14 @@ class SftpUtility: ...@@ -80,6 +79,14 @@ class SftpUtility:
fileInfo['md5Sum'] = md5Sum fileInfo['md5Sum'] = md5Sum
return md5Sum return md5Sum
def statFile(self, filePath, fileInfo=None):
    """Stat a remote file over SFTP and record size and modification time.

    Lazily establishes the SFTP connection on first use.

    :param filePath: remote path of the file to stat
    :param fileInfo: optional dictionary, updated in place with
        'fileSize' and 'fileModificationTime'
    :return: the (possibly updated) fileInfo dictionary
    """
    # Avoid a mutable default argument: a shared dict would leak state
    # between calls.
    if fileInfo is None:
        fileInfo = {}
    if not self.sftpClient:
        self.sftpClient = self.getSftpClient(self.host, self.port, self.username, self.password, self.privateKey)
    attr = self.sftpClient.stat(filePath)
    fileInfo['fileSize'] = attr.st_size
    fileInfo['fileModificationTime'] = attr.st_mtime
    return fileInfo
####################################################################### #######################################################################
# Testing. # Testing.
...@@ -89,3 +96,4 @@ if __name__ == '__main__': ...@@ -89,3 +96,4 @@ if __name__ == '__main__':
files = sftpUtility.getFiles('/export/dm/test') files = sftpUtility.getFiles('/export/dm/test')
print files print files
print sftpUtility.getMd5Sum('/export/dm/test/testfile01') print sftpUtility.getMd5Sum('/export/dm/test/testfile01')
print sftpUtility.statFile('/export/dm/test/testfile01')
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment