Skip to content
Snippets Groups Projects
Forked from DM / dm-docs
261 commits behind, 664 commits ahead of the upstream repository.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
mongoDbFileCatalogPlugin.py 3.83 KiB
#!/usr/bin/env python

import os
import time
from dm.common.utility.loggingManager import LoggingManager
from dm.common.objects.observedFile import ObservedFile
from dm.common.utility.timeUtility import TimeUtility
from dm.common.processing.plugins.fileProcessor import FileProcessor
from dm.common.mongodb.api.fileMongoDbApi import FileMongoDbApi

class MongoDbFileCatalogPlugin(FileProcessor):
    """File processor plugin that records processed files in the MongoDB file catalog.

    For each file, a catalog entry is assembled from the file info and its
    associated DAQ info, then stored with
    FileMongoDbApi.updateOrAddExperimentFile().
    """

    # File-info keys copied verbatim into the catalog entry (when present).
    _COPIED_KEYS = ('md5Sum', 'fileSize')
    # File-info time keys; each is copied and also stored in formatted form
    # under '<key>stamp' (e.g. 'fileCreationTime' -> 'fileCreationTimestamp').
    _TIME_KEYS = ('fileProcessingTime', 'fileCreationTime', 'fileModificationTime')
    # DAQ-info keys that are stripped from the catalog entry after merging.
    _STORAGE_KEYS = ('storageDirectory', 'storageUrl', 'storageHost')

    def __init__(self, dependsOn=None):
        """Create the plugin.

        :param dependsOn: dependencies forwarded unchanged to
            FileProcessor.__init__; defaults to a new empty list.
        """
        # Use a fresh list per instance rather than a shared mutable default.
        if dependsOn is None:
            dependsOn = []
        FileProcessor.__init__(self, dependsOn=dependsOn)
        self.fileMongoDbApi = FileMongoDbApi()
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)

    def processFile(self, fileInfo):
        """Add or update the catalog entry for a single file.

        Side effect: sets fileInfo['fileProcessingTime'] to the current time.

        :param fileInfo: dict-like file description. It must contain 'daqInfo'
            (a dict-like object; its 'storageUrl' is used to build the file
            location). 'experimentFilePath' and 'experimentName' are read with
            .get(); 'experimentFilePath' must not be None, since it is passed
            to os.path.basename().
        """
        experimentFilePath = fileInfo.get('experimentFilePath')
        experimentName = fileInfo.get('experimentName')
        self.logger.debug('Processing file "%s" for experiment %s' % (experimentFilePath, experimentName))

        daqInfo = fileInfo.get('daqInfo')
        storageUrl = daqInfo.get('storageUrl')
        fileInfo['fileProcessingTime'] = time.time()

        # Prepare cataloging entry
        fileInfo2 = {}
        for key in self._COPIED_KEYS:
            if key in fileInfo:
                fileInfo2[key] = fileInfo.get(key)
        for key in self._TIME_KEYS:
            if key in fileInfo:
                t = fileInfo.get(key)
                fileInfo2[key] = t
                fileInfo2['%sstamp' % key] = TimeUtility.formatLocalTimestamp(t)

        # Work on a copy so the caller's list is not mutated, and do not
        # record the same storage location twice if the file is reprocessed.
        fileLocations = list(fileInfo.get('fileLocations') or [])
        storageLocation = '%s/%s' % (storageUrl, experimentFilePath)
        if storageLocation not in fileLocations:
            fileLocations.append(storageLocation)

        fileInfo2['fileName'] = os.path.basename(experimentFilePath)
        fileInfo2['experimentFilePath'] = experimentFilePath
        fileInfo2['experimentName'] = experimentName
        fileInfo2['fileLocations'] = fileLocations
        self.logger.debug('Daq info: %s' % (daqInfo))
        # Merge all DAQ attributes into the entry; DAQ values override any
        # same-named keys set above.
        fileInfo2.update(daqInfo)
        # The DAQ's own 'id' is stored under 'daqId' instead of 'id'.
        if 'id' in daqInfo:
            fileInfo2['daqId'] = fileInfo2.pop('id')
        # Storage details are not kept in the catalog entry.
        for key in self._STORAGE_KEYS:
            fileInfo2.pop(key, None)

        self.logger.debug('File "%s" catalog entry: %s' % (experimentFilePath, str(fileInfo2)))
        self.fileMongoDbApi.updateOrAddExperimentFile(fileInfo2)

    def processDirectory(self, directoryInfo):
        """Catalog every file listed for a directory upload.

        Before each file, the upload status is checked. If it is 'aborting',
        the remaining files are skipped and this plugin's entry in
        uploadInfo['processingInfo'] gets status 'aborted'.

        :param directoryInfo: dict with 'uploadInfo', 'daqInfo', 'experiment'
            and 'filePathsDict' (file path -> dict of per-file attributes
            merged into the ObservedFile).
        """
        uploadInfo = directoryInfo.get('uploadInfo')
        daqInfo = directoryInfo.get('daqInfo')
        experiment = directoryInfo.get('experiment')
        filePathsDict = directoryInfo.get('filePathsDict')

        uploadId = uploadInfo.get('id')
        dataDirectory = uploadInfo.get('dataDirectory')

        nProcessedFiles = 0
        nFiles = len(filePathsDict)
        for (filePath, filePathDict) in filePathsDict.items():
            fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
            fileInfo.update(filePathDict)
            fileInfo['daqInfo'] = daqInfo
            fileInfo['uploadId'] = uploadId

            # Status is re-read for every file.
            # NOTE(review): presumably another thread may flip it to 'aborting'; confirm.
            if uploadInfo.get('status') != 'aborting':
                self.processFile(fileInfo)
                nProcessedFiles += 1
            else:
                nCancelledFiles = nFiles - nProcessedFiles
                self.logger.debug('Upload id %s aborted, will not process %s files' % (uploadId, nCancelledFiles))
                processingInfo = uploadInfo.get('processingInfo')
                processingInfo[self.name]['status'] = 'aborted'
                break

#######################################################################
# Testing.
# Placeholder entry point: running this module directly does nothing.
if __name__ == '__main__':
    pass