diff --git a/doc/RELEASE_NOTES.txt b/doc/RELEASE_NOTES.txt index d5728ffc0726aca2bf3e279a290c73b6bdcc1382..d1b3431e6416564bea7b1122e0e71c2f11f2fde0 100644 --- a/doc/RELEASE_NOTES.txt +++ b/doc/RELEASE_NOTES.txt @@ -1,7 +1,8 @@ -Release 0.13 (05/31/2016) +Release 0.13 (05/27/2016) ============================= - Added SFTP file system observer agent +- Enhanced MongoDB plugin with file md5 sum calculation Release 0.12 (05/06/2016) ============================= diff --git a/etc/dm.deploy.conf b/etc/dm.deploy.conf index 25b6e55f31e03d4ad5646e1fb9b8388b4c8aa3d5..757f36267251d588e3f54e1ddd756bf081a7969e 100644 --- a/etc/dm.deploy.conf +++ b/etc/dm.deploy.conf @@ -15,5 +15,5 @@ DM_DAQ_WEB_SERVICE_HOST=DM_HOSTNAME DM_DAQ_WEB_SERVICE_PORT=33336 DM_CAT_WEB_SERVICE_HOST=DM_HOSTNAME DM_CAT_WEB_SERVICE_PORT=44436 -DM_SOFTWARE_VERSION="0.13 (DM_DATE)" +DM_SOFTWARE_VERSION="0.14 (DM_DATE)" diff --git a/src/python/dm/__init__.py b/src/python/dm/__init__.py index 16317eac2df340749b47ecd487d7739795e08c6b..f024d30ee0540f374a9d103799c897379f732b69 100644 --- a/src/python/dm/__init__.py +++ b/src/python/dm/__init__.py @@ -1 +1 @@ -__version__ = "0.11 (2016.04.28)" +__version__ = "0.13 (2016.05.27)" diff --git a/src/python/dm/common/processing/fileProcessingManager.py b/src/python/dm/common/processing/fileProcessingManager.py index dde6b4754be052ae81392ab89eb77b7e04077f58..76aebefb7becf2c27cda684b870186b8326d460b 100755 --- a/src/python/dm/common/processing/fileProcessingManager.py +++ b/src/python/dm/common/processing/fileProcessingManager.py @@ -18,6 +18,7 @@ class FileProcessingManager(threading.Thread,Singleton): NUMBER_OF_PROCESSING_THREADS_KEY = 'numberofprocessingthreads' DEFAULT_NUMBER_OF_RETRIES_KEY = 'defaultnumberofretries' DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY = 'defaultretrywaitperiodinseconds' + STAT_UTILITY_KEY = 'statutility' FILE_PROCESSOR_KEY = 'fileprocessor' # Singleton. 
@@ -51,6 +52,11 @@ class FileProcessingManager(threading.Thread,Singleton): self.nProcessingThreads = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.NUMBER_OF_PROCESSING_THREADS_KEY)) self.defaultNumberOfRetries = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.DEFAULT_NUMBER_OF_RETRIES_KEY)) self.defaultRetryWaitPeriodInSeconds = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY)) + statUtility = cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.STAT_UTILITY_KEY) + if statUtility: + (moduleName,className,constructor) = cm.getModuleClassConstructorTuple(statUtility) + self.logger.debug('Creating stat utility of class %s' % className) + statUtility = ObjectUtility.createObjectInstance(moduleName, className, constructor) # Create processors for (key,value) in configItems: @@ -61,6 +67,7 @@ class FileProcessingManager(threading.Thread,Singleton): self.logger.debug('Configuring file processor %s' % fileProcessor) fileProcessor.setNumberOfRetries(self.defaultNumberOfRetries) fileProcessor.setRetryWaitPeriodInSeconds(self.defaultRetryWaitPeriodInSeconds) + fileProcessor.statUtility = statUtility fileProcessor.configure() self.fileProcessorDict[key] = fileProcessor self.fileProcessorKeyList = self.fileProcessorDict.keys() diff --git a/src/python/dm/common/processing/plugins/fileProcessor.py b/src/python/dm/common/processing/plugins/fileProcessor.py index 435c81598a445b80a697d13f023e58f4741bf39b..853452ffa5f1542acb584442b5867aa9de4c26a8 100755 --- a/src/python/dm/common/processing/plugins/fileProcessor.py +++ b/src/python/dm/common/processing/plugins/fileProcessor.py @@ -10,6 +10,7 @@ class FileProcessor: self.configDict = {} self.processorName = self.__class__.__name__ self.dependsOn = dependsOn + self.statUtility = None @abc.abstractmethod def processDirectory(self, 
directoryInfo): diff --git a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py index 40f1c7baf8bfe25c405c3ac35ab51b6e29abbbfb..8c2d1062cfe43f61cf5e3610d5a32450ff7a587c 100755 --- a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py +++ b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py @@ -12,17 +12,20 @@ from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory class GridftpFileTransferPlugin(FileTransferPlugin): - DEFAULT_COMMAND = 'globus-url-copy -c -cd -r -vb -tcp-bs 512K -p 1 -sync -sync-level 2' - DIRECTORY_TRANSFER_COMMAND = 'globus-url-copy -c -cd -r -vb -tcp-bs 512K -p 5 -sync -sync-level 2' + #DEFAULT_COMMAND = 'globus-url-copy -c -cd -r -tcp-bs 512K -p 3 -sync -sync-level 2' + #DIRECTORY_TRANSFER_COMMAND = 'globus-url-copy -c -cd -r -tcp-bs 512K -p 8 -sync -sync-level 2' + DEFAULT_COMMAND = 'globus-url-copy -r -cd -sync -sync-level 2 -fast -bs 2M -tcp-bs 2M -p 8' + DIRECTORY_TRANSFER_COMMAND = 'globus-url-copy -r -cd -sync -sync-level 2 -fast -bs 2M -tcp-bs 2M -p 8 -cc 8' DEFAULT_PORT = 2811 - def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True, dependsOn=[]): + def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, directoryTransferCommand=DIRECTORY_TRANSFER_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True, dependsOn=[]): FileTransferPlugin.__init__(self, command, src, dest, dependsOn=dependsOn) self.dsFileApi = DsRestApiFactory.getFileRestApi() self.localMd5Sum = localMd5Sum self.remoteMd5Sum = remoteMd5Sum self.deleteOriginal = deleteOriginal + self.directoryTransferCommand = directoryTransferCommand self.pluginMustProcessFiles = pluginMustProcessFiles def getSrcUrl(self, filePath, dataDirectory): @@ -99,12 +102,14 @@ class 
GridftpFileTransferPlugin(FileTransferPlugin): srcUrl = self.getSrcUrl(filePath, dataDirectory) # Calculate checksum - (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT) - ftpUtility = FtpUtility(host, port) + statUtility = self.statUtility + if not statUtility: + (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT) + statUtility = FtpUtility(host, port) if not fileInfo.get('fileSize'): - ftpUtility.statFile(filePath, fileInfo) + statUtility.statFile(filePath, fileInfo) if self.localMd5Sum: - ftpUtility.getMd5Sum(filePath, fileInfo) + statUtility.getMd5Sum(filePath, fileInfo) # Transfer file self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl)) @@ -160,7 +165,7 @@ class GridftpFileTransferPlugin(FileTransferPlugin): # Transfer directory self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl)) - self.start(src=srcUrl, dest=destUrl, command=self.DIRECTORY_TRANSFER_COMMAND, templateInfo=uploadInfo) + self.start(src=srcUrl, dest=destUrl, command=self.directoryTransferCommand, templateInfo=uploadInfo) ####################################################################### # Testing. 
diff --git a/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py b/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py index 64ea7638e351b7a8e3801a263eadf51718aff871..f6b6106973d6dbd0820c3e6c6306993f24b69fdf 100755 --- a/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py +++ b/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py @@ -13,12 +13,27 @@ class MongoDbFileCatalogPlugin(FileProcessor): DEFAULT_HDF5_METADATA_COMMAND = None - def __init__(self, hdf5MetadataCommand=DEFAULT_HDF5_METADATA_COMMAND, dependsOn=[]): + def __init__(self, hdf5MetadataCommand=DEFAULT_HDF5_METADATA_COMMAND, md5Sum=False, dependsOn=[]): FileProcessor.__init__(self, dependsOn=dependsOn) self.fileMongoDbApi = FileMongoDbApi() self.hdf5MetadataCommand = hdf5MetadataCommand + self.md5Sum = md5Sum self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__) + def processStat(self, filePath, fileInfo={}): + if not self.statUtility: + return fileInfo + if not fileInfo.has_key('fileSize'): + self.statUtility.statFile(filePath, fileInfo) + return fileInfo + + def processMd5Sum(self, filePath, fileInfo={}): + if not self.md5Sum or not self.statUtility: + return fileInfo + if not fileInfo.has_key('md5Sum'): + fileInfo['md5Sum'] = self.statUtility.getMd5Sum(filePath) + return fileInfo + def processHdf5Metadata(self, filePath, fileInfo={}): if not self.hdf5MetadataCommand: return fileInfo @@ -42,6 +57,7 @@ class MongoDbFileCatalogPlugin(FileProcessor): fileInfo[key] = value else: self.logger.warn('Key %s already exists for file %s (experiment: %s)' % (key, filePath, experimentName)) + return fileInfo def processFile(self, fileInfo): filePath = fileInfo.get('filePath') @@ -84,7 +100,9 @@ class MongoDbFileCatalogPlugin(FileProcessor): if fileInfo2.has_key(key): del fileInfo2[key] + self.processStat(filePath, fileInfo2) self.processHdf5Metadata(filePath, fileInfo2) + self.processMd5Sum(filePath, fileInfo2) self.logger.debug('File 
"%s" catalog entry: %s' % (experimentFilePath, str(fileInfo2))) self.fileMongoDbApi.updateOrAddExperimentFile(fileInfo2) diff --git a/src/python/dm/common/utility/fileUtility.py b/src/python/dm/common/utility/fileUtility.py index f4f4c7d6c2aba41515f91a6271337be481f3d197..55b49ea3dd222aef8d37d905fe5526caa9c51870 100755 --- a/src/python/dm/common/utility/fileUtility.py +++ b/src/python/dm/common/utility/fileUtility.py @@ -22,11 +22,8 @@ class FileUtility: fileInfo['filePath'] = filePath fileInfo['fileSize'] = statResult[stat.ST_SIZE] fileInfo['fileCreationTime'] = statResult[stat.ST_CTIME] - fileInfo['fileCreationTimestamp'] = TimeUtility.formatLocalTimestamp(statResult[stat.ST_CTIME]) fileInfo['fileAccessTime'] = statResult[stat.ST_ATIME] - fileInfo['fileAccessTimestamp'] = TimeUtility.formatLocalTimestamp(statResult[stat.ST_ATIME]) fileInfo['fileModificationTime'] = statResult[stat.ST_MTIME] - fileInfo['fileModificationTimestamp'] = TimeUtility.formatLocalTimestamp(statResult[stat.ST_MTIME]) return fileInfo ####################################################################### diff --git a/src/python/dm/common/utility/sftpUtility.py b/src/python/dm/common/utility/sftpUtility.py index 64d9881cba55e874241e74e9cd41a40cf96d8678..2b4ef86816126c0aec3bc27c180dd981ae6599ea 100755 --- a/src/python/dm/common/utility/sftpUtility.py +++ b/src/python/dm/common/utility/sftpUtility.py @@ -69,7 +69,6 @@ class SftpUtility: filePath = '%s/%s' % (replacementDirPath, fileName) fileInfo = {'fileSize' : attr.st_size, 'fileModificationTime' : attr.st_mtime } - fileInfo['fileModificationTimestamp'] = TimeUtility.formatLocalTimestamp(attr.st_mtime) fileDict[filePath] = fileInfo return fileDict @@ -80,6 +79,14 @@ class SftpUtility: fileInfo['md5Sum'] = md5Sum return md5Sum + def statFile(self, filePath, fileInfo={}): + if not self.sftpClient: + self.sftpClient = self.getSftpClient(self.host, self.port, self.username, self.password, self.privateKey) + attr = self.sftpClient.stat(filePath) + 
fileInfo['fileSize'] = attr.st_size + fileInfo['fileModificationTime'] = attr.st_mtime + return fileInfo + ####################################################################### # Testing. @@ -89,3 +96,4 @@ if __name__ == '__main__': files = sftpUtility.getFiles('/export/dm/test') print files print sftpUtility.getMd5Sum('/export/dm/test/testfile01') + print sftpUtility.statFile('/export/dm/test/testfile01')