mongoDbFileCatalogPlugin.py
#!/usr/bin/env python
import os
import time
from dm.common.constants import dmProcessingStatus
from dm.common.utility.loggingManager import LoggingManager
from dm.common.objects.observedFile import ObservedFile
from dm.common.utility.timeUtility import TimeUtility
from dm.common.processing.plugins.fileProcessor import FileProcessor
from dm.common.mongodb.api.fileMongoDbApi import FileMongoDbApi
from dm.common.utility.dmSubprocess import DmSubprocess
class MongoDbFileCatalogPlugin(FileProcessor):
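    """
    Processing plugin that catalogs observed files in MongoDB via
    FileMongoDbApi. It can optionally run an external command to harvest
    HDF5 metadata (key=value pairs on stdout) and record md5 checksums.
    """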

    DEFAULT_HDF5_METADATA_COMMAND = None

    def __init__(self, hdf5MetadataCommand=DEFAULT_HDF5_METADATA_COMMAND, md5Sum=False, dependsOn=None):
        # Avoid a shared mutable default argument for dependsOn.
        FileProcessor.__init__(self, dependsOn=dependsOn or [])
        self.fileMongoDbApi = FileMongoDbApi()
        self.hdf5MetadataCommand = hdf5MetadataCommand
        self.md5Sum = md5Sum
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)

    def processStat(self, filePath, fileInfo=None):
        fileInfo = fileInfo if fileInfo is not None else {}
        if not self.statUtility:
            return fileInfo
        if 'fileSize' not in fileInfo:
            # Assumes statFile() fills stat-derived fields (e.g. fileSize)
            # into the given dictionary; the original call discarded its result.
            self.statUtility.statFile(filePath, fileInfo)
        return fileInfo

    def processMd5Sum(self, filePath, fileInfo=None):
        fileInfo = fileInfo if fileInfo is not None else {}
        if not self.md5Sum or not self.statUtility:
            return fileInfo
        if 'md5Sum' not in fileInfo:
            fileInfo['md5Sum'] = self.statUtility.getMd5Sum(filePath)
        return fileInfo

    def processHdf5Metadata(self, filePath, fileInfo=None):
        fileInfo = fileInfo if fileInfo is not None else {}
        if not self.hdf5MetadataCommand:
            return fileInfo
        if not filePath.endswith('.h5'):
            return fileInfo
        experimentName = fileInfo.get('experimentName', '')
        command = '%s %s' % (self.hdf5MetadataCommand, filePath)
        subprocess = DmSubprocess.getSubprocess(command)
        subprocess.run()
        # The metadata command is expected to print key=value pairs, one per
        # line; normalize newlines to ';' and parse each pair.
        stdout = subprocess.getStdOut().replace('\n', ';')
        parts = stdout.split(';')
        for part in parts:
            keyValue = part.split('=')
            key = keyValue[0]
            if not len(key):
                continue
            value = ''
            if len(keyValue) > 1:
                # Rejoin in case the value itself contains '=' characters.
                value = '='.join(keyValue[1:])
            if key not in fileInfo:
                fileInfo[key] = value
            else:
                self.logger.warn('Key %s already exists for file %s (experiment: %s)' % (key, filePath, experimentName))
        return fileInfo

    def processFile(self, fileInfo):
        filePath = fileInfo.get('filePath')
        experimentFilePath = fileInfo.get('experimentFilePath')
        experimentName = fileInfo.get('experimentName')
        self.logger.debug('Processing file "%s" for experiment %s' % (experimentFilePath, experimentName))
        daqInfo = fileInfo.get('daqInfo')
        storageDirectory = daqInfo.get('storageDirectory')
        storageHost = daqInfo.get('storageHost')
        storageUrl = daqInfo.get('storageUrl')
        storageFilePath = os.path.join(storageDirectory, experimentFilePath)
        fileInfo['fileProcessingTime'] = time.time()

        # Prepare cataloging entry.
        fileInfo2 = {}
        for key in ['md5Sum', 'fileSize']:
            if key in fileInfo:
                fileInfo2[key] = fileInfo.get(key, '')
        for key in ['fileProcessingTime', 'fileCreationTime', 'fileModificationTime']:
            if key in fileInfo:
                t = fileInfo.get(key, 0)
                fileInfo2[key] = t
                # Derive the companion key, e.g. 'fileProcessingTimestamp'.
                key2 = '%sstamp' % key
                fileInfo2[key2] = TimeUtility.formatLocalTimestamp(t)
        fileLocations = fileInfo.get('fileLocations', [])
        fileLocations.append('%s/%s' % (storageUrl, experimentFilePath))
        fileInfo2['fileName'] = os.path.basename(experimentFilePath)
        fileInfo2['experimentFilePath'] = experimentFilePath
        fileInfo2['experimentName'] = experimentName
        fileInfo2['fileLocations'] = fileLocations
        self.logger.debug('Daq info: %s' % (daqInfo))
        fileInfo2.update(daqInfo)
        # Rename the DAQ id so it cannot clash with the catalog entry id.
        if 'id' in daqInfo:
            fileInfo2['daqId'] = daqInfo.get('id')
            del fileInfo2['id']
        for key in ['storageDirectory', 'storageUrl', 'storageHost']:
            if key in fileInfo2:
                del fileInfo2[key]
        self.processStat(filePath, fileInfo2)
        self.processHdf5Metadata(filePath, fileInfo2)
        self.processMd5Sum(filePath, fileInfo2)
        self.logger.debug('File "%s" catalog entry: %s' % (experimentFilePath, str(fileInfo2)))
        self.fileMongoDbApi.updateOrAddExperimentFile(fileInfo2)

    def processDirectory(self, directoryInfo):
        uploadInfo = directoryInfo.get('uploadInfo')
        daqInfo = directoryInfo.get('daqInfo')
        experiment = directoryInfo.get('experiment')
        filePathsDict = directoryInfo.get('filePathsDict')
        uploadId = uploadInfo.get('id')
        dataDirectory = uploadInfo.get('dataDirectory')
        nProcessedFiles = 0
        nFiles = len(filePathsDict)
        for (filePath, filePathDict) in filePathsDict.items():
            fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
            fileInfo.update(filePathDict)
            fileInfo['daqInfo'] = daqInfo
            fileInfo['uploadId'] = uploadId
            if uploadInfo.get('status') != dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING:
                self.processFile(fileInfo)
                nProcessedFiles += 1
            else:
                # Upload was aborted; mark this plugin's status and stop.
                nCancelledFiles = nFiles - nProcessedFiles
                self.logger.debug('Upload id %s aborted, will not process %s files' % (uploadId, nCancelledFiles))
                processingInfo = uploadInfo.get('processingInfo')
                processingInfo[self.name]['status'] = 'aborted'
                break
#######################################################################
# Testing.
if __name__ == '__main__':
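    # A minimal, hypothetical smoke test (not part of the original file). It
    # assumes a MongoDB instance reachable by FileMongoDbApi; the paths and
    # experiment values below are illustrative only. statUtility is normally
    # injected by the processing framework, so it is stubbed out here, which
    # makes the stat and md5 steps no-ops.
    plugin = MongoDbFileCatalogPlugin()
    plugin.statUtility = None
    plugin.processFile({
        'filePath': '/data/esaf123/scan_001.h5',
        'experimentFilePath': 'scan_001.h5',
        'experimentName': 'esaf123',
        'daqInfo': {
            'id': 'daq.001',
            'storageDirectory': '/storage/esaf123',
            'storageHost': 'storage-host',
            'storageUrl': 'gridftp://storage-host/storage/esaf123',
        },
    })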