From 9606d62c9b56a13a02538cb21ccd8f7885a723a8 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli <sveseli@aps.anl.gov> Date: Thu, 28 Apr 2016 16:53:54 +0000 Subject: [PATCH] added new polling observer agent; enhanced upload functionality to avoid resource starvation with simultaneous daqs and uploads; resolved issue with starting uploads with large number of files in the specified directory --- doc/RELEASE_NOTES.txt | 10 ++ .../dm/cat_web_service/api/fileRestApi.py | 26 ++-- .../cli/addExperimentFileCli.py | 14 +-- .../cli/getExperimentFileCli.py | 14 +-- .../cli/updateExperimentFileCli.py | 14 +-- .../service/fileRouteDescriptor.py | 6 +- .../service/fileSessionController.py | 33 ++--- .../common/mongodb/api/datasetMongoDbApi.py | 6 +- .../dm/common/mongodb/api/dmMongoDbApi.py | 7 +- .../dm/common/mongodb/api/fileMongoDbApi.py | 23 ++-- .../common/mongodb/impl/datasetCollection.py | 2 +- .../common/mongodb/impl/dmMongoCollection.py | 2 +- .../mongodb/impl/experimentCollection.py | 2 +- .../dm/common/mongodb/impl/fileCollection.py | 22 ++-- src/python/dm/common/objects/uploadInfo.py | 3 +- .../impl/experimentSessionControllerImpl.py | 85 ++++++++----- .../impl/ftpFileSystemObserverAgent.py | 89 +------------ .../impl/pollingFileSystemObserverAgent.py | 117 ++++++++++++++++++ 18 files changed, 280 insertions(+), 195 deletions(-) create mode 100755 src/python/dm/daq_web_service/service/impl/pollingFileSystemObserverAgent.py diff --git a/doc/RELEASE_NOTES.txt b/doc/RELEASE_NOTES.txt index 48012ef0..bdefc509 100644 --- a/doc/RELEASE_NOTES.txt +++ b/doc/RELEASE_NOTES.txt @@ -1,3 +1,13 @@ +Release 0.11 (04/29/2016) +============================= + +- Resolved issue with upload command for directories containing large + number of files +- Implemented enhanced upload processing algorithm to avoid resource + starvation between simultaneous DAQs and uploads +- Added new polling file system observer agent as option for monitoring + directories + Release 0.10 (03/11/2016) ============================= diff --git a/src/python/dm/cat_web_service/api/fileRestApi.py b/src/python/dm/cat_web_service/api/fileRestApi.py index ea9f6231..f0cbe42a 100755 --- a/src/python/dm/cat_web_service/api/fileRestApi.py +++ b/src/python/dm/cat_web_service/api/fileRestApi.py @@ -20,10 +20,10 @@ class FileRestApi(CatRestApi): experimentName = fileInfo.get('experimentName') if not experimentName: raise InvalidRequest('File metadata must contain experimentName key.') - fileName = fileInfo.get('fileName') - if not fileName: - raise InvalidRequest('File metadata must contain fileName key.') - url = '%s/filesByExperiment/%s/%s' % (self.getContextRoot(), experimentName, fileName) + experimentFilePath = fileInfo.get('experimentFilePath') + if not experimentFilePath: + raise InvalidRequest('File metadata must contain experimentFilePath key.') + url = '%s/filesByExperiment/%s/%s' % (self.getContextRoot(), experimentName, Encoder.encode(experimentFilePath)) url += '?fileInfo=%s' % (Encoder.encode(json.dumps(fileInfo))) responseData = self.sendSessionRequest(url=url, method='POST') return FileMetadata(responseData) @@ -33,10 +33,10 @@ class FileRestApi(CatRestApi): experimentName = fileInfo.get('experimentName') if not experimentName: raise InvalidRequest('File metadata must contain experimentName key.') - fileName = fileInfo.get('fileName') - if not fileName: - raise InvalidRequest('File metadata must contain fileName key.') - url = '%s/filesByExperiment/%s/%s' % (self.getContextRoot(), experimentName, fileName) + experimentFilePath = fileInfo.get('experimentFilePath') + if not experimentFilePath: + raise InvalidRequest('File metadata must contain experimentFilePath key.') + url = '%s/filesByExperiment/%s/%s' % (self.getContextRoot(), experimentName, Encoder.encode(experimentFilePath)) url += '?fileInfo=%s' % (Encoder.encode(json.dumps(fileInfo))) responseData = self.sendSessionRequest(url=url, method='PUT') return FileMetadata(responseData) @@ -76,12 +76,12 @@ class FileRestApi(CatRestApi): return FileMetadata(responseData) @CatRestApi.execute - def getExperimentFile(self, experimentName, fileName): + def getExperimentFile(self, experimentName, experimentFilePath): if not experimentName: raise InvalidRequest('Invalid experiment name provided.') - if not fileName: - raise InvalidRequest('Invalid file name provided.') - url = '%s/filesByExperiment/%s/%s' % (self.getContextRoot(), experimentName, fileName) + if not experimentFilePath: + raise InvalidRequest('Invalid experiment file path provided.') + url = '%s/filesByExperiment/%s/%s' % (self.getContextRoot(), experimentName, Encoder.encode(experimentFilePath)) responseData = self.sendSessionRequest(url=url, method='GET') return FileMetadata(responseData) @@ -97,7 +97,7 @@ if __name__ == '__main__': import time t = long(time.time()) fileInfo = { - 'fileName' : 'sv-%s' % t, + 'experimentFilePath' : 'sv-%s' % t, 'experimentName' : 'exp1', 'power' : 12, 'powerUnits' : 'kW', diff --git a/src/python/dm/cat_web_service/cli/addExperimentFileCli.py b/src/python/dm/cat_web_service/cli/addExperimentFileCli.py index 942b05d0..50447f2c 100755 --- a/src/python/dm/cat_web_service/cli/addExperimentFileCli.py +++ b/src/python/dm/cat_web_service/cli/addExperimentFileCli.py @@ -7,24 +7,24 @@ from catWebServiceSessionCli import CatWebServiceSessionCli class AddExperimentFileCli(CatWebServiceSessionCli): def __init__(self): CatWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS) - self.addOption('', '--file', dest='fileName', help='File name.') + self.addOption('', '--file', dest='experimentFilePath', help='Experiment file path.') self.addOption('', '--experiment', dest='experimentName', help='Experiment name.') def checkArgs(self): if self.options.experimentName is None: raise InvalidRequest('Experiment name must be provided.') - if self.options.fileName is None: - raise InvalidRequest('File name must be provided.') + if self.options.experimentFilePath is None: + raise InvalidRequest('Experiment file path must be provided.') def getExperimentName(self): return self.options.experimentName - def getFileName(self): - return self.options.fileName + def getExperimentFilePath(self): + return self.options.experimentFilePath def runCommand(self): self.parseArgs(usage=""" - dm-add-experiment-file --file=FILENAME --experiment=EXPERIMENTNAME + dm-add-experiment-file --file=EXPERIMENTFILEPATH --experiment=EXPERIMENTNAME [key1:value1, key2:value2, ...] Description: @@ -35,7 +35,7 @@ Description: api = FileRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol()) fileInfo = self.splitArgsIntoDict() fileInfo['experimentName'] = self.getExperimentName() - fileInfo['fileName'] = self.getFileName() + fileInfo['experimentFilePath'] = self.getExperimentFilePath() fileMetadata = api.addExperimentFile(fileInfo) print fileMetadata.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat()) diff --git a/src/python/dm/cat_web_service/cli/getExperimentFileCli.py b/src/python/dm/cat_web_service/cli/getExperimentFileCli.py index 865d16b9..63c30a58 100755 --- a/src/python/dm/cat_web_service/cli/getExperimentFileCli.py +++ b/src/python/dm/cat_web_service/cli/getExperimentFileCli.py @@ -7,31 +7,31 @@ from catWebServiceSessionCli import CatWebServiceSessionCli class GetExperimentFileCli(CatWebServiceSessionCli): def __init__(self): CatWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS) - self.addOption('', '--file', dest='fileName', help='File name.') + self.addOption('', '--file', dest='experimentFilePath', help='Experiment file path.') self.addOption('', '--experiment', dest='experimentName', help='Experiment name.') def checkArgs(self): if self.options.experimentName is None: raise InvalidRequest('Experiment name must be provided.') - if self.options.fileName is None: - raise InvalidRequest('File name must be provided.') + if self.options.experimentFilePath is None: + raise InvalidRequest('Experiment file path must be provided.') def getExperimentName(self): return self.options.experimentName - def getFileName(self): - return self.options.fileName + def getExperimentFilePath(self): + return self.options.experimentFilePath def runCommand(self): self.parseArgs(usage=""" - dm-get-experiment-file --file=FILENAME --experiment=EXPERIMENTNAME + dm-get-experiment-file --file=EXPERIMENTFILEPATH --experiment=EXPERIMENTNAME Description: Retrieve experiment file metadata from the catalog. """) self.checkArgs() api = FileRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol()) - fileMetadata = api.getExperimentFile(self.getExperimentName(), self.getFileName()) + fileMetadata = api.getExperimentFile(self.getExperimentName(), self.getExperimentFilePath()) print fileMetadata.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat()) ####################################################################### diff --git a/src/python/dm/cat_web_service/cli/updateExperimentFileCli.py b/src/python/dm/cat_web_service/cli/updateExperimentFileCli.py index 341ae981..ea32cbf0 100755 --- a/src/python/dm/cat_web_service/cli/updateExperimentFileCli.py +++ b/src/python/dm/cat_web_service/cli/updateExperimentFileCli.py @@ -7,24 +7,24 @@ from catWebServiceSessionCli import CatWebServiceSessionCli class UpdateExperimentFileCli(CatWebServiceSessionCli): def __init__(self): CatWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS) - self.addOption('', '--file', dest='fileName', help='File name.') + self.addOption('', '--file', dest='experimentFilePath', help='Experiment file path.') self.addOption('', '--experiment', dest='experimentName', help='Experiment name.') def checkArgs(self): if self.options.experimentName is None: raise InvalidRequest('Experiment name must be provided.') - if self.options.fileName is None: - raise InvalidRequest('File name must be provided.') + if self.options.experimentFilePath is None: + raise InvalidRequest('Experiment file path must be provided.') def getExperimentName(self): return self.options.experimentName - def getFileName(self): - return self.options.fileName + def getExperimentFilePath(self): + return self.options.experimentFilePath def runCommand(self): self.parseArgs(usage=""" - dm-update-experiment-file --file=FILENAME --experiment=EXPERIMENTNAME + dm-update-experiment-file --file=EXPERIMENTFILEPATH --experiment=EXPERIMENTNAME [key1:value1, key2:value2, ...] Description: @@ -37,7 +37,7 @@ Description: api = FileRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol()) fileInfo = self.splitArgsIntoDict() fileInfo['experimentName'] = self.getExperimentName() - fileInfo['fileName'] = self.getFileName() + fileInfo['experimentFilePath'] = self.getExperimentFilePath() fileMetadata = api.updateExperimentFile(fileInfo) print fileMetadata.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat()) diff --git a/src/python/dm/cat_web_service/service/fileRouteDescriptor.py b/src/python/dm/cat_web_service/service/fileRouteDescriptor.py index 700a8df8..07931473 100755 --- a/src/python/dm/cat_web_service/service/fileRouteDescriptor.py +++ b/src/python/dm/cat_web_service/service/fileRouteDescriptor.py @@ -22,7 +22,7 @@ class FileRouteDescriptor: # Add experiment file { 'name' : 'addExperimentFile', - 'path' : '%s/filesByExperiment/:(experimentName)/:(fileName)' % contextRoot, + 'path' : '%s/filesByExperiment/:(experimentName)/:(encodedExperimentFilePath)' % contextRoot, 'controller' : fileSessionController, 'action' : 'addExperimentFile', 'method' : ['POST'] @@ -40,7 +40,7 @@ class FileRouteDescriptor: # Update experiment file by name { 'name' : 'updateExperimentFile', - 'path' : '%s/filesByExperiment/:(experimentName)/:(fileName)' % contextRoot, + 'path' : '%s/filesByExperiment/:(experimentName)/:(encodedExperimentFilePath)' % contextRoot, 'controller' : fileSessionController, 'action' : 'updateExperimentFile', 'method' : ['PUT'] @@ -76,7 +76,7 @@ class FileRouteDescriptor: # Get experiment file by name { 'name' : 'getExperimentFile', - 'path' : '%s/filesByExperiment/:(experimentName)/:(fileName)' % contextRoot, + 'path' : '%s/filesByExperiment/:(experimentName)/:(encodedExperimentFilePath)' % contextRoot, 'controller' : fileSessionController, 'action' : 'getExperimentFile', 'method' : ['GET'] diff --git a/src/python/dm/cat_web_service/service/fileSessionController.py b/src/python/dm/cat_web_service/service/fileSessionController.py index f2ff6b8b..d9e3ecbd 100755 --- a/src/python/dm/cat_web_service/service/fileSessionController.py +++ b/src/python/dm/cat_web_service/service/fileSessionController.py @@ -15,37 +15,39 @@ class FileSessionController(DmSessionController): @cherrypy.expose @DmSessionController.require(DmSessionController.isAdministrator()) @DmSessionController.execute - def addExperimentFile(self, experimentName, fileName, **kwargs): + def addExperimentFile(self, experimentName, encodedExperimentFilePath, **kwargs): if not experimentName: raise InvalidRequest('Invalid experiment name provided.') - if not fileName: - raise InvalidRequest('Invalid file name provided.') + experimentFilePath = Encoder.decode(encodedExperimentFilePath) + if not experimentFilePath: + raise InvalidRequest('Invalid experiment file path provided.') encodedFileInfo = kwargs.get('fileInfo') if not encodedFileInfo: raise InvalidRequest('Invalid file info provided.') fileInfo = json.loads(Encoder.decode(encodedFileInfo)) - fileInfo['fileName'] = fileName + fileInfo['experimentFilePath'] = experimentFilePath fileInfo['experimentName'] = experimentName response = self.fileSessionControllerImpl.addExperimentFile(fileInfo).getFullJsonRep() - self.logger.debug('Added file %s: %s' % (fileName,response)) + self.logger.debug('Added file %s: %s' % (experimentFilePath,response)) return response @cherrypy.expose @DmSessionController.require(DmSessionController.isAdministrator()) @DmSessionController.execute - def updateExperimentFile(self, experimentName, fileName, **kwargs): + def updateExperimentFile(self, experimentName, encodedExperimentFilePath, **kwargs): if not experimentName: raise InvalidRequest('Invalid experiment name provided.') - if not fileName: - raise InvalidRequest('Invalid file name provided.') + experimentFilePath = Encoder.decode(encodedExperimentFilePath) + if not experimentFilePath: + raise InvalidRequest('Invalid experiment file path provided.') encodedFileInfo = kwargs.get('fileInfo') if not encodedFileInfo: raise InvalidRequest('Invalid file info provided.') fileInfo = json.loads(Encoder.decode(encodedFileInfo)) - fileInfo['fileName'] = fileName + fileInfo['experimentFilePath'] = experimentFilePath fileInfo['experimentName'] = experimentName response = self.fileSessionControllerImpl.updateExperimentFile(fileInfo).getFullJsonRep() - self.logger.debug('Updated file %s: %s' % (fileName,response)) + self.logger.debug('Updated file %s: %s' % (experimentFilePath,response)) return response @cherrypy.expose @@ -97,12 +99,13 @@ class FileSessionController(DmSessionController): @cherrypy.expose @DmSessionController.require(DmSessionController.isAdministrator()) @DmSessionController.execute - def getExperimentFile(self, experimentName, fileName, **kwargs): + def getExperimentFile(self, experimentName, encodedExperimentFilePath, **kwargs): if not experimentName: raise InvalidRequest('Invalid experiment name provided.') - if not fileName: - raise InvalidRequest('Invalid file name provided.') - response = self.fileSessionControllerImpl.getExperimentFile(experimentName, fileName).getFullJsonRep() - self.logger.debug('Returning file %s: %s' % (fileName,response)) + experimentFilePath = Encoder.decode(encodedExperimentFilePath) + if not experimentFilePath: + raise InvalidRequest('Invalid experiment file path provided.') + response = self.fileSessionControllerImpl.getExperimentFile(experimentName, experimentFilePath).getFullJsonRep() + self.logger.debug('Returning file %s: %s' % (experimentFilePath,response)) return response diff --git a/src/python/dm/common/mongodb/api/datasetMongoDbApi.py b/src/python/dm/common/mongodb/api/datasetMongoDbApi.py index 66a8aa34..1ee5038e 100755 --- a/src/python/dm/common/mongodb/api/datasetMongoDbApi.py +++ b/src/python/dm/common/mongodb/api/datasetMongoDbApi.py @@ -14,7 +14,7 @@ from dm.common.objects.datasetMetadata import DatasetMetadata class DatasetMongoDbApi(DmMongoDbApi): - SYSTEM_KEY_LIST = ['_id', '_datasetName', '_experimentName'] + SYSTEM_KEY_LIST = ['_id', 'datasetName', 'experimentName'] USE_DATASET_NAME_KEY = '_useDatasetName' USE_EXPERIMENT_NAME_KEY = '_useDatasetName' @@ -41,14 +41,14 @@ class DatasetMongoDbApi(DmMongoDbApi): @DmMongoDbApi.executeDbCall def getExperimentDataset(self, experimentName, datasetName, **kwargs): - queryDict = { '_datasetName' : datasetName, '_experimentName' : experimentName } + queryDict = { 'datasetName' : datasetName, 'experimentName' : experimentName } dbDatasetMetadata = self.datasetCollection.findByUniqueKeys(queryDict) return self.toDmObject(dbDatasetMetadata, DatasetMetadata) @DmMongoDbApi.executeDbCall def getExperimentDatasets(self, experimentName, queryDict={}, returnFieldDict=DatasetCollection.ALL_FIELDS_DICT, **kwargs): queryDict2 = copy.copy(queryDict) - queryDict2['_experimentName'] = experimentName + queryDict2['experimentName'] = experimentName return self.getDatasets(queryDict2, returnFieldDict) @DmMongoDbApi.executeDbCall diff --git a/src/python/dm/common/mongodb/api/dmMongoDbApi.py b/src/python/dm/common/mongodb/api/dmMongoDbApi.py index a76e37b4..dc2f8b36 100755 --- a/src/python/dm/common/mongodb/api/dmMongoDbApi.py +++ b/src/python/dm/common/mongodb/api/dmMongoDbApi.py @@ -58,9 +58,10 @@ class DmMongoDbApi: for key in cls.SYSTEM_KEY_LIST: if mongoDbObject.has_key(key): # Remove leading underscore - newKey = key[1:] - mongoDbObject[newKey] = str(mongoDbObject[key]) - del mongoDbObject[key] + if key.startswith('_'): + newKey = key[1:] + mongoDbObject[newKey] = str(mongoDbObject[key]) + del mongoDbObject[key] @classmethod def getMongoDict(cls, dict): diff --git a/src/python/dm/common/mongodb/api/fileMongoDbApi.py b/src/python/dm/common/mongodb/api/fileMongoDbApi.py index e0bdc3f7..4c5a1fd5 100755 --- a/src/python/dm/common/mongodb/api/fileMongoDbApi.py +++ b/src/python/dm/common/mongodb/api/fileMongoDbApi.py @@ -12,7 +12,7 @@ from dm.common.objects.fileMetadata import FileMetadata class FileMongoDbApi(DmMongoDbApi): - SYSTEM_KEY_LIST = ['_id', '_fileName', '_experimentName'] + SYSTEM_KEY_LIST = ['_id', 'experimentFilePath', 'experimentName'] def __init__(self): DmMongoDbApi.__init__(self) @@ -38,15 +38,15 @@ class FileMongoDbApi(DmMongoDbApi): return self.toDmObject(dbFileMetadata, FileMetadata) @DmMongoDbApi.executeDbCall - def getExperimentFile(self, experimentName, fileName, **kwargs): - queryDict = { '_fileName' : fileName, '_experimentName' : experimentName } + def getExperimentFile(self, experimentName, experimentFilePath, **kwargs): + queryDict = { 'experimentFilePath' : experimentFilePath, 'experimentName' : experimentName } dbFileMetadata = self.fileCollection.findByUniqueKeys(queryDict) return self.toDmObject(dbFileMetadata, FileMetadata) @DmMongoDbApi.executeDbCall def getExperimentFiles(self, experimentName, queryDict={}, returnFieldDict=FileCollection.ALL_FIELDS_DICT, **kwargs): queryDict2 = copy.copy(queryDict) - queryDict2['_experimentName'] = experimentName + queryDict2['experimentName'] = experimentName return self.getFiles(queryDict2, returnFieldDict) @DmMongoDbApi.executeDbCall @@ -70,19 +70,22 @@ class FileMongoDbApi(DmMongoDbApi): ####################################################################### # Testing. if __name__ == '__main__': + from dm.common.mongodb.impl.mongoDbManager import MongoDbManager + mgr = MongoDbManager.getInstance() + mgr.dbName = 'dm' api = FileMongoDbApi() - fileName = 'file02' + experimentFilePath = 'file02' experimentName = 'exp-01' - fileInfo = {'fileName' : fileName, 'intKey' : 1, 'doubleKey' : 2.0, 'stringKey' : 'myString' , 'experimentName' : experimentName} + fileInfo = {'experimentFilePath' : experimentFilePath, 'intKey' : 1, 'doubleKey' : 2.0, 'stringKey' : 'myString' , 'experimentName' : experimentName} file = api.updateOrAddExperimentFile(fileInfo) print '\nADDED FILE\n', file import time t = long(time.time()) - fileName = 'f-%s' % t + experimentFilePath = 'f-%s' % t experimentName = 'exp-01' - fileInfo = {'fileName' : fileName, 'intKey' : 1, 'doubleKey' : 2.0, 'stringKey' : 'myString' , 'dictKey' : {'a' : 'A', 'b' : 'B', 'c' : 3}, 'experimentName' : experimentName} + fileInfo = {'experimentFilePath' : experimentFilePath, 'intKey' : 1, 'doubleKey' : 2.0, 'stringKey' : 'myString' , 'dictKey' : {'a' : 'A', 'b' : 'B', 'c' : 3}, 'experimentName' : experimentName} file = api.updateOrAddExperimentFile(fileInfo) print '\nADDED FILE\n', file @@ -90,11 +93,11 @@ if __name__ == '__main__': for file in files: print 'FILE: %s\n' % file.getDictRep() - fileInfo = {'fileName' : fileName, 'experimentName' : experimentName, 'intKey' : 101} + fileInfo = {'experimentFilePath' : experimentFilePath, 'experimentName' : experimentName, 'intKey' : 101} file = api.updateExperimentFile(fileInfo) print '\nUPDATED FILE\n', file print '\nFILES FOR EXPERIMENT exp1: \n', api.getExperimentFiles(experimentName) - print '\nFILES FOR EXPERIMENT exp1 with fileName=file01: \n', api.getExperimentFiles(experimentName, queryDict={'fileName':'file01'}) + print '\nFILES FOR EXPERIMENT exp1 with experimentFilePath=file01: \n', api.getExperimentFiles(experimentName, queryDict={'experimentFilePath':'file02'}) diff --git a/src/python/dm/common/mongodb/impl/datasetCollection.py b/src/python/dm/common/mongodb/impl/datasetCollection.py index 43f7e812..abdb997a 100755 --- a/src/python/dm/common/mongodb/impl/datasetCollection.py +++ b/src/python/dm/common/mongodb/impl/datasetCollection.py @@ -10,7 +10,7 @@ from dmMongoCollection import DmMongoCollection class DatasetCollection(DmMongoCollection): """Class responsible for updating dataset collection in mongo db.""" - UNIQUE_KEYS_LIST = [ '_datasetName', '_experimentName' ] + UNIQUE_KEYS_LIST = [ '_id', 'datasetName', 'experimentName' ] def __init__(self, dbClient): DmMongoCollection.__init__(self, 'datasets', dbClient) diff --git a/src/python/dm/common/mongodb/impl/dmMongoCollection.py b/src/python/dm/common/mongodb/impl/dmMongoCollection.py index 2d1664b6..f2c2c49f 100755 --- a/src/python/dm/common/mongodb/impl/dmMongoCollection.py +++ b/src/python/dm/common/mongodb/impl/dmMongoCollection.py @@ -53,7 +53,7 @@ class DmMongoCollection(object): return self.findByKey('_id', ObjectId(id)) def findByName(self, name): - return self.findByKey('_name', name) + return self.findByKey(self.NAME_KEY, name) def findByQueryDict(self, queryDict, returnFieldDict=ALL_FIELDS_DICT): return self.dbClient.findAsList(self.collectionName, queryDict, returnFieldDict) diff --git a/src/python/dm/common/mongodb/impl/experimentCollection.py b/src/python/dm/common/mongodb/impl/experimentCollection.py index 114f27a9..f3ead94d 100755 --- a/src/python/dm/common/mongodb/impl/experimentCollection.py +++ b/src/python/dm/common/mongodb/impl/experimentCollection.py @@ -10,7 +10,7 @@ from dmMongoCollection import DmMongoCollection class ExperimentCollection(DmMongoCollection): """Class responsible for updating experiment collection in mongo db.""" - UNIQUE_KEYS_LIST = [ 'name' ] + UNIQUE_KEYS_LIST = [ '_id', 'name' ] def __init__(self, dbClient): DmMongoCollection.__init__(self, 'experiments', dbClient) diff --git a/src/python/dm/common/mongodb/impl/fileCollection.py b/src/python/dm/common/mongodb/impl/fileCollection.py index b794b3d1..7658aef2 100755 --- a/src/python/dm/common/mongodb/impl/fileCollection.py +++ b/src/python/dm/common/mongodb/impl/fileCollection.py @@ -10,23 +10,27 @@ from dmMongoCollection import DmMongoCollection class FileCollection(DmMongoCollection): """Class responsible for updating file collection in mongo db.""" - UNIQUE_KEYS_LIST = [ '_fileName', '_experimentName' ] + UNIQUE_KEYS_LIST = [ 'experimentFilePath', 'experimentName' ] + NAME_KEY = 'experimentFilePath' - def __init__(self, dbClient): - DmMongoCollection.__init__(self, 'files', dbClient) + def __init__(self, dbClient, experimentName=None): + collectionName = 'files' + if experimentName: + collectionName = '%s-files' % experimentName + DmMongoCollection.__init__(self, collectionName, dbClient) ####################################################################### # Testing if __name__ == '__main__': from dmMongoClient import DmMongoClient mongo = DmMongoClient('dm') - fileCollection = FileCollection(mongo) - fileInfo = {'_fileName' : 'xyz-001', '_experimentName' : 'myexp-001', 'update' : 'sv2', 'locationList' : '[/opt/xyz, /data/xyz]'} - print fileCollection.updateByUniqueKeys(fileInfo) - #print type(fileCollection.findByName('xyz-001')) - print fileCollection.findByQueryDict({'experiment' : 'exp-001'}, {'locationList' : 1}) - fileInfo['_experimentName'] = 'ddm1' + fileCollection = FileCollection(mongo, 'exp01') + fileInfo = {'fileName' : 'xyz-001', 'experimentName' : 'exp01', 'update' : 'sv2', 'locationList' : '[/opt/xyz, /data/xyz]', 'experimentFilePath' : 'd1/xyz-001'} print fileCollection.addByUniqueKeys(fileInfo) + #print type(fileCollection.findByName('xyz-001')) + print fileCollection.findByQueryDict({'experimentName' : 'exp01'}, {'locationList' : 1}) + fileInfo['owner'] = 'ddm1' + print fileCollection.updateByUniqueKeys(fileInfo) diff --git a/src/python/dm/common/objects/uploadInfo.py b/src/python/dm/common/objects/uploadInfo.py index 5fc59a35..a119b0ae 100755 --- a/src/python/dm/common/objects/uploadInfo.py +++ b/src/python/dm/common/objects/uploadInfo.py @@ -10,7 +10,7 @@ from dm.common.utility.timeUtility import TimeUtility class UploadInfo(DmObject): - DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'nProcessingErrors', 'nCancelledFiles', 'nFiles', 'percentageComplete', 'percentageProcessed', 'percentageProcessingErrors', 'percentageCancelled', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp' ] + DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'nProcessingErrors', 'nCancelledFiles', 'nFiles', 'percentageComplete', 'percentageProcessed', 'percentageProcessingErrors', 'percentageCancelled', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp', 'errorMessage' ] def __init__(self, dict={}): DmObject.__init__(self, dict) @@ -68,6 +68,7 @@ class UploadInfo(DmObject): nProcessingErrors = self.get('nProcessingErrors', 0) processingErrors = self.get('processingErrors', {}) nCompletedFiles = nProcessedFiles+nProcessingErrors + self['nCompletedFiles'] = nCompletedFiles startTime = self.get('startTime', now) runTime = now - startTime diff --git a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py index 5899f05c..d08f2e1c 100755 --- a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py +++ b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py @@ -33,6 +33,8 @@ class ExperimentSessionControllerImpl(DmObjectManager): """ Experiment session controller implementation class. """ UPLOAD_DELAY_IN_SECONDS = 1.0 + UPLOAD_CHUNK_SIZE_IN_FILES = 100 + UPLOAD_CHUNK_REFRESH_IN_SECONDS = 10.0 DIRECTORY_UPLOAD_PROCESSING_WAIT_IN_SECONDS = 15.0 def __init__(self): @@ -75,7 +77,7 @@ class ExperimentSessionControllerImpl(DmObjectManager): storageDirectory = experiment.get('storageDirectory') if storageDirectory is None: raise InvalidRequest('Experiment %s has not been started.' % experimentName) - filePathsDict = FileSystemObserver.getInstance().getFiles(dataDirectory) + fileProcessingManager = FileProcessingManager.getInstance() uploadId = str(uuid.uuid4()) self.logger.debug('Starting upload id %s' % uploadId) @@ -117,45 +119,72 @@ class ExperimentSessionControllerImpl(DmObjectManager): if not len(processorList): raise InvalidRequest('There are no plugins that can process files for upload in directory %s.' % dataDirectory) - # Remove hidden files - self.logger.debug('Checking %s processing candidates' % len(filePathsDict)) - filePathsDict = fileProcessingManager.removeHiddenFilesFromProcessing(filePathsDict, uploadInfo) - # Check which files need to be processed - filePathsDict = fileProcessingManager.checkUploadFilesForProcessing(filePathsDict, uploadInfo) - if not len(filePathsDict): - raise InvalidRequest('There are no new files for upload in directory %s.' % dataDirectory) - UploadTracker.getInstance().startUpload(uploadId, uploadInfo) - uploadInfo['nFiles'] = len(filePathsDict) - uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING - self.logger.debug('Will prepare upload of %s files' % len(filePathsDict)) - timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUploadFiles, args=[uploadInfo, daqInfo, experiment, filePathsDict]) + uploadInfo['nFiles'] = 0 + uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_PENDING + self.logger.debug('Starting upload timer for %s' % dataDirectory) + + timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUploadFiles, args=[uploadInfo, daqInfo, experiment]) timer.start() return uploadInfo - def prepareUploadFiles(self, uploadInfo, daqInfo, experiment, filePathsDict): + def prepareUploadFiles(self, uploadInfo, daqInfo, experiment): uploadId = uploadInfo.get('id') self.logger.debug('Preparing upload id: %s' % uploadId) dataDirectory = uploadInfo.get('dataDirectory') fileProcessingManager = FileProcessingManager.getInstance() + + try: + # Get files + filePathsDict = FileSystemObserver.getInstance().getFiles(dataDirectory) + + # Remove hidden files + self.logger.debug('Checking %s processing candidates' % len(filePathsDict)) + filePathsDict = fileProcessingManager.removeHiddenFilesFromProcessing(filePathsDict, uploadInfo) + + # Check which files need to be processed + filePathsDict = fileProcessingManager.checkUploadFilesForProcessing(filePathsDict, uploadInfo) + + if not len(filePathsDict): + raise InvalidRequest('There are no new files for upload in directory %s.' % dataDirectory) + except Exception, ex: + self.logger.error('Processing error for upload %s: %s' % (uploadId, str(ex))) + uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED + uploadInfo['errorMessage'] = str(ex) + return + + uploadInfo['nFiles'] = len(filePathsDict) + uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING + self.logger.debug('Will prepare upload of %s files' % len(filePathsDict)) nProcessedFiles = 0 nFiles = len(filePathsDict) for (filePath,filePathDict) in filePathsDict.items(): - fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment) - fileInfo.update(filePathDict) - fileInfo['daqInfo'] = daqInfo - fileInfo['uploadId'] = uploadId - fileInfo['statusMonitor'] = uploadInfo - fileInfo['skipPlugins'] = uploadInfo.get('skipPlugins', []) try: - if uploadInfo.get('status') != 'aborting': - fileProcessingManager.processFile(fileInfo) - nProcessedFiles += 1 - else: - nCancelledFiles = nFiles - nProcessedFiles - uploadInfo.uploadAborted(nCancelledFiles) - self.logger.debug('Upload id %s aborted, will not process %s files)' % (uploadId, nCancelledFiles)) - break + # Only create new uploads if we have less than + # UPLOAD_CHUNK_SIZE_IN_FILES waiting to be completed + while True: + status = uploadInfo.get('status') + if status == dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING: + nCancelledFiles = nFiles - nProcessedFiles + uploadInfo.uploadAborted(nCancelledFiles) + self.logger.debug('Upload id %s aborted, will not process %s files)' % (uploadId, nCancelledFiles)) + return + nCompletedFiles = uploadInfo.get('nCompletedFiles', 0) + nWaitingFiles = nProcessedFiles - nCompletedFiles + if nWaitingFiles < self.UPLOAD_CHUNK_SIZE_IN_FILES: + # We need to add more files for upload + break + self.logger.debug('Upload %s has %s files waiting for upload, will not add any more for %s seconds' % (uploadId, nWaitingFiles, self.UPLOAD_CHUNK_REFRESH_IN_SECONDS)) + time.sleep(self.UPLOAD_CHUNK_REFRESH_IN_SECONDS) + + fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment) + fileInfo.update(filePathDict) + fileInfo['daqInfo'] = daqInfo + fileInfo['uploadId'] = uploadId + fileInfo['statusMonitor'] = uploadInfo + fileInfo['skipPlugins'] = uploadInfo.get('skipPlugins', []) + fileProcessingManager.processFile(fileInfo) + nProcessedFiles += 1 except Exception, ex: self.logger.error('Processing error: %s', ex) self.logger.debug('Done preparing upload id: %s (total of %s files)' % (uploadId, len(filePathsDict))) diff --git a/src/python/dm/daq_web_service/service/impl/ftpFileSystemObserverAgent.py b/src/python/dm/daq_web_service/service/impl/ftpFileSystemObserverAgent.py index e7f9c079..5203ef92 100755 --- a/src/python/dm/daq_web_service/service/impl/ftpFileSystemObserverAgent.py +++ b/src/python/dm/daq_web_service/service/impl/ftpFileSystemObserverAgent.py @@ -1,22 +1,19 @@ #!/usr/bin/env python from threading import Timer -from fileSystemObserverAgent import FileSystemObserverAgent +from pollingFileSystemObserverAgent import PollingFileSystemObserverAgent from dm.common.utility.ftpUtility import FtpUtility -class FtpFileSystemObserverAgent(FileSystemObserverAgent): +class FtpFileSystemObserverAgent(PollingFileSystemObserverAgent): DEFAULT_POLLING_PERIOD_IN_SECONDS = 5 def __init__(self, host, port, username=None, password=None, pollingPeriod=DEFAULT_POLLING_PERIOD_IN_SECONDS): - FileSystemObserverAgent.__init__(self) + PollingFileSystemObserverAgent.__init__(self, pollingPeriod) self.host = host self.port = port self.username = username self.password = password - self.pollingPeriod = pollingPeriod - self.observedDirDict = {} - self.isDone = False def getFiles(self, dataDirectory): (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultHost=self.host, defaultPort=self.port) @@ -24,86 +21,6 @@ class FtpFileSystemObserverAgent(FileSystemObserverAgent): ftpUtility = FtpUtility(host, port, self.username, self.password) return ftpUtility.getFiles(dirPath, {}) - def updateFile(self, filePath, dataDirectory, experiment): - if self.fileSystemObserver: - self.logger.debug('Processing file path: %s' % filePath) - self.fileSystemObserver.fileUpdated(filePath, dataDirectory, experiment) - - def processFiles(self, fileDict, oldFileDict, dataDirectory, experiment): - for filePath in fileDict.keys(): - if not oldFileDict.has_key(filePath): - # new file, must be updated - self.logger.debug('New file path detected: %s' % filePath) - self.updateFile(filePath, dataDirectory, experiment) - else: - # old file, check timestamp - oldFileInfo = oldFileDict.get(filePath) - oldModifyTime = oldFileInfo.get('fileModificationTime', '') - fileInfo = fileDict.get(filePath) - modifyTime = fileInfo.get('fileModificationTime') - if modifyTime != oldModifyTime: - # file has been modified, need to process it - self.logger.debug('Modified file path detected: %s' % filePath) - self.updateFile(filePath, dataDirectory, experiment) - - - def pollFileSystem(self, dataDirectory, experiment): - try: - fileDict = self.getFiles(dataDirectory) - observedDirInfo = self.observedDirDict.get(dataDirectory) - if not observedDirInfo: - self.logger.debug('Polling cancelled for directory: %s' % dataDirectory) - return - oldFileDict = observedDirInfo.get('files') - observedDirInfo['files'] = fileDict - self.processFiles(fileDict, oldFileDict, dataDirectory, experiment) - self.startPollingTimer(observedDirInfo, dataDirectory, experiment) - except Exception, ex: - self.logger.error('Could not poll directory %s: %s' % (dataDirectory,ex)) - - def startPollingTimer(self, observedDirInfo, dataDirectory, experiment): - if self.isDone: - return - - t = Timer(self.pollingPeriod, self.pollFileSystem, [dataDirectory, experiment]) - observedDirInfo['pollTimer'] = t - t.start() - - def startObservingPath(self, dataDirectory, experiment): - observedDirInfo = self.observedDirDict.get(dataDirectory) - if observedDirInfo: - self.logger.debug('Observer for %s is already active' % dataDirectory) - return - self.logger.debug('Starting observer for %s' % dataDirectory) - fileDict = self.getFiles(dataDirectory) - observedDirInfo = self.observedDirDict.get(dataDirectory, {}) - observedDirInfo['files'] = fileDict - observedDirInfo['experiment'] = experiment - self.observedDirDict[dataDirectory] = observedDirInfo - self.startPollingTimer(observedDirInfo, dataDirectory, experiment) - - def stopObservingPath(self, dataDirectory, experiment): - observedDirInfo = self.observedDirDict.get(dataDirectory) - if not observedDirInfo: - self.logger.debug('Observer for %s is not active' % dataDirectory) - return - - self.logger.debug('Stopping observer for %s' % dataDirectory) - t = observedDirInfo.get('pollTimer') - if t: - t.cancel() - del self.observedDirDict[dataDirectory] - - def start(self): - self.logger.debug('Starting ftp observer agent') - - def stop(self): - self.logger.debug('Stopping ftp observer agent') - self.isDone = True - for (dataDirectory,observedDirInfo) in self.observedDirDict.items(): - experiment = observedDirInfo.get('experiment') - self.stopObservingPath(dataDirectory, experiment) - #################################################################### # Testing diff --git a/src/python/dm/daq_web_service/service/impl/pollingFileSystemObserverAgent.py b/src/python/dm/daq_web_service/service/impl/pollingFileSystemObserverAgent.py new file mode 100755 index 00000000..53c15630 --- /dev/null +++ b/src/python/dm/daq_web_service/service/impl/pollingFileSystemObserverAgent.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python + +from threading import Timer +from fileSystemObserverAgent import FileSystemObserverAgent +from dm.common.utility.osUtility import OsUtility + +class PollingFileSystemObserverAgent(FileSystemObserverAgent): + + DEFAULT_POLLING_PERIOD_IN_SECONDS = 5 + DEFAULT_RETRY_PERIOD_IN_SECONDS = 60 + + def __init__(self, pollingPeriod=DEFAULT_POLLING_PERIOD_IN_SECONDS): + FileSystemObserverAgent.__init__(self) + self.pollingPeriod = pollingPeriod + self.retryDelay = 0 + self.observedDirDict = {} + self.isDone = False + + def getFiles(self, dataDirectory): + self.logger.debug('Retrieving files for directory: %s' % (dataDirectory)) + return OsUtility.findFilesAsDict(dataDirectory, {}) + + def updateFile(self, filePath, dataDirectory, experiment): + if self.fileSystemObserver: + self.logger.debug('Processing file path: %s' % filePath) + self.fileSystemObserver.fileUpdated(filePath, dataDirectory, experiment) + + def processFiles(self, fileDict, oldFileDict, dataDirectory, experiment): + for filePath in fileDict.keys(): + if not oldFileDict.has_key(filePath): + # new file, must be updated + self.logger.debug('New file path detected: %s' % filePath) + self.updateFile(filePath, dataDirectory, experiment) + else: + # old file, check timestamp + oldFileInfo = oldFileDict.get(filePath) + oldModifyTime = oldFileInfo.get('fileModificationTime', '') + fileInfo = fileDict.get(filePath) + modifyTime = fileInfo.get('fileModificationTime') + if modifyTime != oldModifyTime: + # file has been modified, need to process it + self.logger.debug('Modified file path detected: %s' % filePath) + self.updateFile(filePath, dataDirectory, experiment) + + + def pollFileSystem(self, dataDirectory, experiment): + try: + observedDirInfo = self.observedDirDict.get(dataDirectory) + if not observedDirInfo: + self.logger.debug('Polling cancelled for directory: %s' % dataDirectory) + return + oldFileDict = observedDirInfo.get('files') + fileDict = self.getFiles(dataDirectory) + observedDirInfo['files'] = fileDict + self.processFiles(fileDict, oldFileDict, dataDirectory, experiment) + self.retryDelay = 0 + except Exception, ex: + self.logger.error('Could not poll directory %s: %s' % (dataDirectory,ex)) + self.retryDelay = self.DEFAULT_RETRY_PERIOD_IN_SECONDS + self.logger.debug('Next polling for directory %s will be delayed by: %s seconds' % (dataDirectory, self.retryDelay)) + self.startPollingTimer(observedDirInfo, dataDirectory, experiment) + + def startPollingTimer(self, observedDirInfo, dataDirectory, experiment): + if self.isDone: + return + + delay = self.pollingPeriod + self.retryDelay + t = Timer(delay, self.pollFileSystem, [dataDirectory, experiment]) + observedDirInfo['pollTimer'] = t + t.start() + + def startObservingPath(self, dataDirectory, experiment): + observedDirInfo = self.observedDirDict.get(dataDirectory) + if observedDirInfo: + self.logger.debug('Observer for %s is already active' % dataDirectory) + return + self.logger.debug('Starting observer for %s' % dataDirectory) + fileDict = self.getFiles(dataDirectory) + observedDirInfo = self.observedDirDict.get(dataDirectory, {}) + observedDirInfo['files'] = fileDict + observedDirInfo['experiment'] = experiment + self.observedDirDict[dataDirectory] = observedDirInfo + self.startPollingTimer(observedDirInfo, dataDirectory, experiment) + + def stopObservingPath(self, dataDirectory, experiment): + observedDirInfo = self.observedDirDict.get(dataDirectory) + if not observedDirInfo: + self.logger.debug('Observer for %s is not active' % dataDirectory) + return + + self.logger.debug('Stopping observer for %s' % dataDirectory) + t = observedDirInfo.get('pollTimer') + if t: + t.cancel() + del self.observedDirDict[dataDirectory] + + def start(self): + self.logger.debug('Starting ftp observer agent') + + def stop(self): + self.logger.debug('Stopping ftp observer agent') + self.isDone = True + for (dataDirectory,observedDirInfo) in self.observedDirDict.items(): + experiment = observedDirInfo.get('experiment') + self.stopObservingPath(dataDirectory, experiment) + +#################################################################### +# Testing + +if __name__ == '__main__': + import time + agent = PollingFileSystemObserverAgent() + print 'ORIGINAL FILES: ', agent.getFiles('/tmp/test') + agent.startObservingPath('/tmp/test', 'e1') + time.sleep(100) + agent.stopObservingPath('/tmp/test', 'e1') + -- GitLab