Skip to content
Snippets Groups Projects
Commit ef574585 authored by hammonds's avatar hammonds
Browse files

Merge branch 'master' of https://git.aps.anl.gov/hammonds/dm.git

parents faf96875 4ae70d52
No related branches found
No related tags found
No related merge requests found
Showing changed files with 17 additions and 46 deletions
*.pyc
__pycache__
*.pyc
__pycache__
*.pyc
__pycache__
*.pyc
__pycache__
*.pyc
__pycache__
*.pyc
__pycache__
......@@ -110,7 +110,7 @@ class CdbProcessingPlugin(FileProcessor):
self.__cdbLock.release()
# Add metadata
fileUrl = 'dm://extrepid/%s/%s' % (experimentName, fileName)
fileUrl = 'dm://voyager/%s/%s' % (experimentName, fileName)
fileProperty = self.itemRestApi.addPropertyValueToItemWithId(itemId=fileItem['id'],propertyTypeName=self.CDB_FILE_PROPERTY_TYPE, tag=None, value=fileUrl, units=None,description=None,isUserWriteable=None, isDynamic=None)
fileMetadata = fileInfo.get('metadata', {})
......
......@@ -63,8 +63,8 @@ class MongoDbFileCatalogPlugin(FileProcessor):
self.logger.debug('Before releasing Mongo DB API instance semaphore count is %s ' % (self.mongoApiSemaphore.__dict__.get('_Semaphore__value')))
self.mongoApiSemaphore.release()
def getInitialCollectionSize(self, fileMongoDbApi, experimentName, uploadOrDaqId):
key = '%s-%s' % (experimentName, uploadOrDaqId)
def getInitialCollectionSize(self, fileMongoDbApi, experimentName, uploadId):
key = '%s-%s' % (experimentName, uploadId)
self.lock.acquire()
try:
nFiles = self.initialCollectionSizeMap.get(key)
......@@ -78,35 +78,35 @@ class MongoDbFileCatalogPlugin(FileProcessor):
def processFile(self, fileInfo):
    """Catalog a single experiment file in MongoDB.

    Collects metadata for the file described by *fileInfo* (a dict carrying
    'experimentFilePath', 'experimentName' and, for uploads, 'uploadId'),
    stores the result under fileInfo['metadata'], and writes the entry to
    the file catalog.

    This resolves the merged (post-merge) version of the method: the
    daq-based branch key is gone and only uploads use the fast path.
    """
    experimentFilePath = fileInfo.get('experimentFilePath')
    experimentName = fileInfo.get('experimentName')
    uploadId = fileInfo.get('uploadId')
    self.logger.debug('Processing file "%s" for experiment %s' % (experimentFilePath, experimentName))
    fileMetadata = self.metadataCollector.processMetadata(fileInfo, self.statUtility)
    self.logger.debug('File "%s" catalog entry: %s' % (experimentFilePath, str(fileMetadata)))
    fileInfo['metadata'] = fileMetadata
    fileMongoDbApi = self.acquireMongoApi()
    try:
        # Fast path: if the upload's collection started empty the file
        # cannot already be cataloged, so an unchecked insert is safe.
        if uploadId and self.getInitialCollectionSize(fileMongoDbApi, experimentName, uploadId) == 0:
            self.logger.debug('Initial collection size is zero for upload %s, adding file %s' % (uploadId, experimentFilePath))
            fileMongoDbApi.addExperimentFileUnchecked(fileMetadata)
        else:
            self.logger.debug('Updating file %s' % (experimentFilePath))
            fileMongoDbApi.updateOrAddExperimentFile(fileMetadata)
    finally:
        # Always return the API instance; without this a failure while
        # cataloging would leak the Mongo API semaphore permit.
        self.releaseMongoApi(fileMongoDbApi)
def processFile2(self, fileInfo, fileMongoDbApi):
    """Catalog a single experiment file using a caller-supplied Mongo API.

    Same behavior as processFile(), but the caller owns the
    *fileMongoDbApi* instance (no acquire/release here). Collects
    metadata for *fileInfo*, stores it under fileInfo['metadata'], and
    writes the entry to the catalog.

    This resolves the merged (post-merge) version of the method: the
    daq-based branch key is gone and only uploads use the fast path.
    """
    experimentFilePath = fileInfo.get('experimentFilePath')
    experimentName = fileInfo.get('experimentName')
    uploadId = fileInfo.get('uploadId')
    self.logger.debug('Processing file "%s" for experiment %s' % (experimentFilePath, experimentName))
    fileMetadata = self.metadataCollector.processMetadata(fileInfo, self.statUtility)
    self.logger.debug('File "%s" catalog entry: %s' % (experimentFilePath, str(fileMetadata)))
    fileInfo['metadata'] = fileMetadata
    # Fast path: if the upload's collection started empty the file cannot
    # already be cataloged, so an unchecked insert is safe.
    if uploadId and self.getInitialCollectionSize(fileMongoDbApi, experimentName, uploadId) == 0:
        self.logger.debug('Initial collection size is zero for upload %s, adding file %s' % (uploadId, experimentFilePath))
        fileMongoDbApi.addExperimentFileUnchecked(fileMetadata)
    else:
        self.logger.debug('Updating file %s' % (experimentFilePath))
        fileMongoDbApi.updateOrAddExperimentFile(fileMetadata)
def processFilePath(self, fileMongoDbApi, filePath, filePathDict, experiment, dataDirectory, destDirectory, daqInfo, uploadId, processDirectoryInfo):
......@@ -162,3 +162,8 @@ class MongoDbFileCatalogPlugin(FileProcessor):
processingTimer = threading.Timer(self.PROCESSING_TIMER_DELAY_PERIOD, self.processFilePath, args=[fileMongoDbApi, filePath, filePathDict, experiment, dataDirectory, destDirectory, daqInfo, uploadId, processDirectoryInfo])
processingTimer.start()
#######################################################################
# Testing.
# Placeholder entry point: this module is a plugin and is not meant to
# be executed directly, so the guard intentionally does nothing.
if __name__ == '__main__':
    pass
*.pyc
__pycache__
*.pyc
__pycache__
*.pyc
__pycache__
*.pyc
__pycache__
*.pyc
__pycache__
*.pyc
__pycache__
*.pyc
__pycache__
*.pyc
__pycache__
*.pyc
__pycache__
*.pyc
__pycache__
*.pyc
__pycache__
......@@ -376,7 +376,7 @@ class ExperimentDsApi(DsRestApi):
response = self.sendSessionRequest(url=url, method='POST', rawOutput=True)
fileName = os.path.basename(experimentFilePath)
filePath = os.path.join(destDirectory, fileName)
open(filePath, 'w').write(response)
open(filePath, 'w').write(str(response))
return FileMetadata({'experimentName' : experimentName, 'experimentFilePath' : experimentFilePath, 'fileName' : fileName, 'localFilePath' : filePath})
......
0% loaded — still loading; reload the page if this does not complete.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.