#!/usr/bin/env python

import os
import time

from dm.common.utility.loggingManager import LoggingManager
from dm.common.objects.observedFile import ObservedFile
from dm.common.utility.timeUtility import TimeUtility
from dm.common.processing.plugins.fileProcessor import FileProcessor
from dm.common.mongodb.api.fileMongoDbApi import FileMongoDbApi
from dm.common.utility.dmSubprocess import DmSubprocess


class MongoDbFileCatalogPlugin(FileProcessor):
    """ File processor plugin that catalogs file metadata in MongoDB. """

    DEFAULT_HDF5_METADATA_COMMAND = None

    def __init__(self, hdf5MetadataCommand=DEFAULT_HDF5_METADATA_COMMAND, md5Sum=False, dependsOn=None):
        # Avoid a shared mutable default argument for dependsOn.
        if dependsOn is None:
            dependsOn = []
        FileProcessor.__init__(self, dependsOn=dependsOn)
        self.fileMongoDbApi = FileMongoDbApi()
        self.hdf5MetadataCommand = hdf5MetadataCommand
        self.md5Sum = md5Sum
        # statUtility is expected to be assigned externally (e.g. by the
        # processing framework, if at all); md5 sums are skipped while it
        # is not set.
        self.statUtility = None
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)

    def processMd5Sum(self, filePath, fileInfo=None):
        if fileInfo is None:
            fileInfo = {}
        if not self.md5Sum or not self.statUtility:
            return fileInfo
        if 'md5Sum' not in fileInfo:
            fileInfo['md5Sum'] = self.statUtility.getMd5Sum(filePath)
        return fileInfo

    def processHdf5Metadata(self, filePath, fileInfo=None):
        if fileInfo is None:
            fileInfo = {}
        if not self.hdf5MetadataCommand:
            return fileInfo
        experimentName = fileInfo.get('experimentName', '')
        if not filePath.endswith('.h5'):
            return fileInfo
        command = '%s %s' % (self.hdf5MetadataCommand, filePath)
        subprocess = DmSubprocess.getSubprocess(command)
        subprocess.run()
        # The metadata command is expected to emit one key=value pair per
        # line; normalize newlines to ';' and parse each pair.
        stdout = subprocess.getStdOut().replace('\n', ';')
        parts = stdout.split(';')
        for part in parts:
            keyValue = part.split('=')
            key = keyValue[0]
            if not len(key):
                continue
            # Rejoin everything after the first '=' so values may
            # themselves contain '=' characters.
            value = ''
            if len(keyValue) > 1:
                value = '='.join(keyValue[1:])
            if key not in fileInfo:
                fileInfo[key] = value
            else:
                self.logger.warning('Key %s already exists for file %s (experiment: %s)' % (key, filePath, experimentName))
        return fileInfo

    def processFile(self, fileInfo):
        filePath = fileInfo.get('filePath')
        experimentFilePath = fileInfo.get('experimentFilePath')
        experimentName = fileInfo.get('experimentName')
        self.logger.debug('Processing file "%s" for experiment %s' % (experimentFilePath, experimentName))
        daqInfo = fileInfo.get('daqInfo')
        storageDirectory = daqInfo.get('storageDirectory')
        storageHost = daqInfo.get('storageHost')
        storageUrl = daqInfo.get('storageUrl')
        storageFilePath = os.path.join(storageDirectory, experimentFilePath)
        fileInfo['fileProcessingTime'] = time.time()

        # Prepare cataloging entry.
        fileInfo2 = {}
        for key in ['md5Sum', 'fileSize']:
            if key in fileInfo:
                fileInfo2[key] = fileInfo.get(key, '')
        for key in ['fileProcessingTime', 'fileCreationTime', 'fileModificationTime']:
            if key in fileInfo:
                t = fileInfo.get(key, 0)
                fileInfo2[key] = t
                # e.g. fileProcessingTime => fileProcessingTimestamp
                key2 = '%sstamp' % key
                fileInfo2[key2] = TimeUtility.formatLocalTimestamp(t)
        fileLocations = fileInfo.get('fileLocations', [])
        fileLocations.append('%s/%s' % (storageUrl, experimentFilePath))
        fileInfo2['fileName'] = os.path.basename(experimentFilePath)
        fileInfo2['experimentFilePath'] = experimentFilePath
        fileInfo2['experimentName'] = experimentName
        fileInfo2['fileLocations'] = fileLocations
        self.logger.debug('Daq info: %s' % (daqInfo))
        fileInfo2.update(daqInfo)
        # Rename the DAQ 'id' key so it cannot clobber the catalog entry id.
        if 'id' in daqInfo:
            fileInfo2['daqId'] = daqInfo.get('id')
            del fileInfo2['id']
        # Storage details are not part of the per-file catalog entry.
        for key in ['storageDirectory', 'storageUrl', 'storageHost']:
            if key in fileInfo2:
                del fileInfo2[key]
        self.processHdf5Metadata(filePath, fileInfo2)
        self.processMd5Sum(filePath, fileInfo2)
        self.logger.debug('File "%s" catalog entry: %s' % (experimentFilePath, str(fileInfo2)))
        self.fileMongoDbApi.updateOrAddExperimentFile(fileInfo2)

    def processDirectory(self, directoryInfo):
        uploadInfo = directoryInfo.get('uploadInfo')
        daqInfo = directoryInfo.get('daqInfo')
        experiment = directoryInfo.get('experiment')
        filePathsDict = directoryInfo.get('filePathsDict')
        uploadId = uploadInfo.get('id')
        dataDirectory = uploadInfo.get('dataDirectory')
        nProcessedFiles = 0
        nFiles = len(filePathsDict)
        for (filePath, filePathDict) in filePathsDict.items():
            fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
            fileInfo.update(filePathDict)
            fileInfo['daqInfo'] = daqInfo
            fileInfo['uploadId'] = uploadId
            if uploadInfo.get('status') != 'aborting':
                self.processFile(fileInfo)
                nProcessedFiles += 1
            else:
                # Upload was aborted; record this plugin's status and stop
                # processing the remaining files.
                nCancelledFiles = nFiles - nProcessedFiles
                self.logger.debug('Upload id %s aborted, will not process %s files' % (uploadId, nCancelledFiles))
                processingInfo = uploadInfo.get('processingInfo')
                processingInfo[self.name]['status'] = 'aborted'
                break


#######################################################################
# Testing.

if __name__ == '__main__':
    pass
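    # Minimal usage sketch, assuming 'extractHdf5Metadata' is a
    # hypothetical command on the PATH that prints one key=value pair per
    # line, and /tmp/sample.h5 is a readable HDF5 file; neither is part of
    # this module, so this only illustrates the call pattern:
    #
    #   plugin = MongoDbFileCatalogPlugin(hdf5MetadataCommand='extractHdf5Metadata')
    #   fileInfo = {'experimentName': 'test-exp-01'}
    #   plugin.processHdf5Metadata('/tmp/sample.h5', fileInfo)
    #   print(fileInfo)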