#!/usr/bin/env python

import os
import time

from dm.common.utility.loggingManager import LoggingManager
from dm.common.objects.observedFile import ObservedFile
from dm.common.utility.timeUtility import TimeUtility
from dm.common.processing.plugins.fileProcessor import FileProcessor
from dm.common.mongodb.api.fileMongoDbApi import FileMongoDbApi


class MongoDbFileCatalogPlugin(FileProcessor):

    def __init__(self, dependsOn=[]):
        FileProcessor.__init__(self, dependsOn=dependsOn)
        self.fileMongoDbApi = FileMongoDbApi()
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)

    def processFile(self, fileInfo):
        experimentFilePath = fileInfo.get('experimentFilePath')
        experimentName = fileInfo.get('experimentName')
        self.logger.debug('Processing file "%s" for experiment %s' % (experimentFilePath, experimentName))

        daqInfo = fileInfo.get('daqInfo')
        storageDirectory = daqInfo.get('storageDirectory')
        storageHost = daqInfo.get('storageHost')
        storageUrl = daqInfo.get('storageUrl')
        storageFilePath = os.path.join(storageDirectory, experimentFilePath)
        fileInfo['fileProcessingTime'] = time.time()

        # Prepare cataloging entry
        fileInfo2 = {}
        for key in ['md5Sum', 'fileSize']:
            if key in fileInfo:
                fileInfo2[key] = fileInfo.get(key, '')

        # Copy timestamps and add formatted local time strings for each
        for key in ['fileProcessingTime', 'fileCreationTime', 'fileModificationTime']:
            if key in fileInfo:
                t = fileInfo.get(key, 0)
                fileInfo2[key] = t
                key2 = '%sstamp' % key
                fileInfo2[key2] = TimeUtility.formatLocalTimestamp(t)

        fileLocations = fileInfo.get('fileLocations', [])
        fileLocations.append('%s/%s' % (storageUrl, experimentFilePath))

        fileInfo2['fileName'] = os.path.basename(experimentFilePath)
        fileInfo2['experimentFilePath'] = experimentFilePath
        fileInfo2['experimentName'] = experimentName
        fileInfo2['fileLocations'] = fileLocations
        self.logger.debug('Daq info: %s' % (daqInfo))
        fileInfo2.update(daqInfo)

        # Rename the DAQ id so it does not collide with the catalog entry id
        if 'id' in daqInfo:
            fileInfo2['daqId'] = daqInfo.get('id')
            del fileInfo2['id']

        # Storage details are already reflected in fileLocations
        for key in ['storageDirectory', 'storageUrl', 'storageHost']:
            if key in fileInfo2:
                del fileInfo2[key]

        self.logger.debug('File "%s" catalog entry: %s' % (experimentFilePath, str(fileInfo2)))
        self.fileMongoDbApi.updateOrAddExperimentFile(fileInfo2)

    def processDirectory(self, directoryInfo):
        uploadInfo = directoryInfo.get('uploadInfo')
        daqInfo = directoryInfo.get('daqInfo')
        experiment = directoryInfo.get('experiment')
        filePathsDict = directoryInfo.get('filePathsDict')
        uploadId = uploadInfo.get('id')
        dataDirectory = uploadInfo.get('dataDirectory')

        nProcessedFiles = 0
        nFiles = len(filePathsDict)
        for (filePath, filePathDict) in filePathsDict.items():
            fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
            fileInfo.update(filePathDict)
            fileInfo['daqInfo'] = daqInfo
            fileInfo['uploadId'] = uploadId
            if uploadInfo.get('status') != 'aborting':
                self.processFile(fileInfo)
                nProcessedFiles += 1
            else:
                # Upload was aborted; mark remaining files as not processed and stop
                nCancelledFiles = nFiles - nProcessedFiles
                self.logger.debug('Upload id %s aborted, will not process %s files' % (uploadId, nCancelledFiles))
                processingInfo = uploadInfo.get('processingInfo')
                processingInfo[self.name]['status'] = 'aborted'
                break


#######################################################################
# Testing.

if __name__ == '__main__':
    pass
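
# Hedged usage sketch (not part of the original module): the dictionary below
# illustrates the fileInfo structure that processFile() appears to expect,
# based on the keys it reads. All field values, the experiment name, and the
# storage URL are hypothetical, and running the call requires a reachable
# MongoDB instance behind FileMongoDbApi.
#
#   plugin = MongoDbFileCatalogPlugin()
#   fileInfo = {
#       'experimentFilePath': 'scan1/image001.h5',
#       'experimentName': 'exampleExperiment',
#       'md5Sum': 'd41d8cd98f00b204e9800998ecf8427e',
#       'fileSize': 1024,
#       'fileCreationTime': time.time(),
#       'daqInfo': {
#           'id': 'daq-001',
#           'storageDirectory': '/data/storage',
#           'storageHost': 'storage01',
#           'storageUrl': 'gridftp://storage01/data/storage',
#       },
#   }
#   plugin.processFile(fileInfo)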