Commit 9848a911 authored by sveseli's avatar sveseli
Browse files

Merge branch '0.9.2' into release/0.9

parents 6987b8db 90f73919
Release 0.11 (04/29/2016)
=============================
- Resolved issue with upload command for directories containing large
number of files
- Implemented enhanced upload processing algorithm to avoid resource
starvation between simultaneous DAQs and uploads
- Added new polling file system observer agent as option for monitoring
directories
- Reworked catalog API and corresponding MongoDB interfaces to use unique
experiment file paths, rather than file names
Release 0.10 (03/11/2016)
=============================
......
......@@ -15,5 +15,5 @@ DM_DAQ_WEB_SERVICE_HOST=DM_HOSTNAME
DM_DAQ_WEB_SERVICE_PORT=33336
DM_CAT_WEB_SERVICE_HOST=DM_HOSTNAME
DM_CAT_WEB_SERVICE_PORT=44436
DM_SOFTWARE_VERSION="0.10 (DM_DATE)"
DM_SOFTWARE_VERSION="0.11 (DM_DATE)"
__version__ = "0.10 (2016.03.21)"
__version__ = "0.11 (2016.04.28)"
......@@ -20,10 +20,10 @@ class FileRestApi(CatRestApi):
experimentName = fileInfo.get('experimentName')
if not experimentName:
raise InvalidRequest('File metadata must contain experimentName key.')
fileName = fileInfo.get('fileName')
if not fileName:
raise InvalidRequest('File metadata must contain fileName key.')
url = '%s/filesByExperiment/%s/%s' % (self.getContextRoot(), experimentName, fileName)
experimentFilePath = fileInfo.get('experimentFilePath')
if not experimentFilePath:
raise InvalidRequest('File metadata must contain experimentFilePath key.')
url = '%s/filesByExperiment/%s/%s' % (self.getContextRoot(), experimentName, Encoder.encode(experimentFilePath))
url += '?fileInfo=%s' % (Encoder.encode(json.dumps(fileInfo)))
responseData = self.sendSessionRequest(url=url, method='POST')
return FileMetadata(responseData)
......@@ -33,10 +33,10 @@ class FileRestApi(CatRestApi):
experimentName = fileInfo.get('experimentName')
if not experimentName:
raise InvalidRequest('File metadata must contain experimentName key.')
fileName = fileInfo.get('fileName')
if not fileName:
raise InvalidRequest('File metadata must contain fileName key.')
url = '%s/filesByExperiment/%s/%s' % (self.getContextRoot(), experimentName, fileName)
experimentFilePath = fileInfo.get('experimentFilePath')
if not experimentFilePath:
raise InvalidRequest('File metadata must contain experimentFilePath key.')
url = '%s/filesByExperiment/%s/%s' % (self.getContextRoot(), experimentName, Encoder.encode(experimentFilePath))
url += '?fileInfo=%s' % (Encoder.encode(json.dumps(fileInfo)))
responseData = self.sendSessionRequest(url=url, method='PUT')
return FileMetadata(responseData)
......@@ -76,12 +76,12 @@ class FileRestApi(CatRestApi):
return FileMetadata(responseData)
@CatRestApi.execute
def getExperimentFile(self, experimentName, fileName):
def getExperimentFile(self, experimentName, experimentFilePath):
if not experimentName:
raise InvalidRequest('Invalid experiment name provided.')
if not fileName:
raise InvalidRequest('Invalid file name provided.')
url = '%s/filesByExperiment/%s/%s' % (self.getContextRoot(), experimentName, fileName)
if not experimentFilePath:
raise InvalidRequest('Invalid experiment file path provided.')
url = '%s/filesByExperiment/%s/%s' % (self.getContextRoot(), experimentName, Encoder.encode(experimentFilePath))
responseData = self.sendSessionRequest(url=url, method='GET')
return FileMetadata(responseData)
......@@ -97,7 +97,7 @@ if __name__ == '__main__':
import time
t = long(time.time())
fileInfo = {
'fileName' : 'sv-%s' % t,
'experimentFilePath' : 'sv-%s' % t,
'experimentName' : 'exp1',
'power' : 12,
'powerUnits' : 'kW',
......
......@@ -7,24 +7,24 @@ from catWebServiceSessionCli import CatWebServiceSessionCli
class AddExperimentFileCli(CatWebServiceSessionCli):
def __init__(self):
CatWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
self.addOption('', '--file', dest='fileName', help='File name.')
self.addOption('', '--file', dest='experimentFilePath', help='Experiment file path.')
self.addOption('', '--experiment', dest='experimentName', help='Experiment name.')
def checkArgs(self):
if self.options.experimentName is None:
raise InvalidRequest('Experiment name must be provided.')
if self.options.fileName is None:
raise InvalidRequest('File name must be provided.')
if self.options.experimentFilePath is None:
raise InvalidRequest('Experiment file path must be provided.')
def getExperimentName(self):
return self.options.experimentName
def getFileName(self):
return self.options.fileName
def getExperimentFilePath(self):
return self.options.experimentFilePath
def runCommand(self):
self.parseArgs(usage="""
dm-add-experiment-file --file=FILENAME --experiment=EXPERIMENTNAME
dm-add-experiment-file --file=EXPERIMENTFILEPATH --experiment=EXPERIMENTNAME
[key1:value1, key2:value2, ...]
Description:
......@@ -35,7 +35,7 @@ Description:
api = FileRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
fileInfo = self.splitArgsIntoDict()
fileInfo['experimentName'] = self.getExperimentName()
fileInfo['fileName'] = self.getFileName()
fileInfo['experimentFilePath'] = self.getExperimentFilePath()
fileMetadata = api.addExperimentFile(fileInfo)
print fileMetadata.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
......
......@@ -7,31 +7,31 @@ from catWebServiceSessionCli import CatWebServiceSessionCli
class GetExperimentFileCli(CatWebServiceSessionCli):
def __init__(self):
CatWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
self.addOption('', '--file', dest='fileName', help='File name.')
self.addOption('', '--file', dest='experimentFilePath', help='Experiment file path.')
self.addOption('', '--experiment', dest='experimentName', help='Experiment name.')
def checkArgs(self):
if self.options.experimentName is None:
raise InvalidRequest('Experiment name must be provided.')
if self.options.fileName is None:
raise InvalidRequest('File name must be provided.')
if self.options.experimentFilePath is None:
raise InvalidRequest('Experiment file path must be provided.')
def getExperimentName(self):
return self.options.experimentName
def getFileName(self):
return self.options.fileName
def getExperimentFilePath(self):
return self.options.experimentFilePath
def runCommand(self):
self.parseArgs(usage="""
dm-get-experiment-file --file=FILENAME --experiment=EXPERIMENTNAME
dm-get-experiment-file --file=EXPERIMENTFILEPATH --experiment=EXPERIMENTNAME
Description:
Retrieve experiment file metadata from the catalog.
""")
self.checkArgs()
api = FileRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
fileMetadata = api.getExperimentFile(self.getExperimentName(), self.getFileName())
fileMetadata = api.getExperimentFile(self.getExperimentName(), self.getExperimentFilePath())
print fileMetadata.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
......
......@@ -7,24 +7,24 @@ from catWebServiceSessionCli import CatWebServiceSessionCli
class UpdateExperimentFileCli(CatWebServiceSessionCli):
def __init__(self):
CatWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
self.addOption('', '--file', dest='fileName', help='File name.')
self.addOption('', '--file', dest='experimentFilePath', help='Experiment file path.')
self.addOption('', '--experiment', dest='experimentName', help='Experiment name.')
def checkArgs(self):
if self.options.experimentName is None:
raise InvalidRequest('Experiment name must be provided.')
if self.options.fileName is None:
raise InvalidRequest('File name must be provided.')
if self.options.experimentFilePath is None:
raise InvalidRequest('Experiment file path must be provided.')
def getExperimentName(self):
return self.options.experimentName
def getFileName(self):
return self.options.fileName
def getExperimentFilePath(self):
return self.options.experimentFilePath
def runCommand(self):
self.parseArgs(usage="""
dm-update-experiment-file --file=FILENAME --experiment=EXPERIMENTNAME
dm-update-experiment-file --file=EXPERIMENTFILEPATH --experiment=EXPERIMENTNAME
[key1:value1, key2:value2, ...]
Description:
......@@ -37,7 +37,7 @@ Description:
api = FileRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
fileInfo = self.splitArgsIntoDict()
fileInfo['experimentName'] = self.getExperimentName()
fileInfo['fileName'] = self.getFileName()
fileInfo['experimentFilePath'] = self.getExperimentFilePath()
fileMetadata = api.updateExperimentFile(fileInfo)
print fileMetadata.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
......
......@@ -22,7 +22,7 @@ class FileRouteDescriptor:
# Add experiment file
{
'name' : 'addExperimentFile',
'path' : '%s/filesByExperiment/:(experimentName)/:(fileName)' % contextRoot,
'path' : '%s/filesByExperiment/:(experimentName)/:(encodedExperimentFilePath)' % contextRoot,
'controller' : fileSessionController,
'action' : 'addExperimentFile',
'method' : ['POST']
......@@ -40,7 +40,7 @@ class FileRouteDescriptor:
# Update experiment file by name
{
'name' : 'updateExperimentFile',
'path' : '%s/filesByExperiment/:(experimentName)/:(fileName)' % contextRoot,
'path' : '%s/filesByExperiment/:(experimentName)/:(encodedExperimentFilePath)' % contextRoot,
'controller' : fileSessionController,
'action' : 'updateExperimentFile',
'method' : ['PUT']
......@@ -76,7 +76,7 @@ class FileRouteDescriptor:
# Get experiment file by name
{
'name' : 'getExperimentFile',
'path' : '%s/filesByExperiment/:(experimentName)/:(fileName)' % contextRoot,
'path' : '%s/filesByExperiment/:(experimentName)/:(encodedExperimentFilePath)' % contextRoot,
'controller' : fileSessionController,
'action' : 'getExperimentFile',
'method' : ['GET']
......
......@@ -15,37 +15,39 @@ class FileSessionController(DmSessionController):
@cherrypy.expose
@DmSessionController.require(DmSessionController.isAdministrator())
@DmSessionController.execute
def addExperimentFile(self, experimentName, fileName, **kwargs):
def addExperimentFile(self, experimentName, encodedExperimentFilePath, **kwargs):
if not experimentName:
raise InvalidRequest('Invalid experiment name provided.')
if not fileName:
raise InvalidRequest('Invalid file name provided.')
experimentFilePath = Encoder.decode(encodedExperimentFilePath)
if not experimentFilePath:
raise InvalidRequest('Invalid experiment file path provided.')
encodedFileInfo = kwargs.get('fileInfo')
if not encodedFileInfo:
raise InvalidRequest('Invalid file info provided.')
fileInfo = json.loads(Encoder.decode(encodedFileInfo))
fileInfo['fileName'] = fileName
fileInfo['experimentFilePath'] = experimentFilePath
fileInfo['experimentName'] = experimentName
response = self.fileSessionControllerImpl.addExperimentFile(fileInfo).getFullJsonRep()
self.logger.debug('Added file %s: %s' % (fileName,response))
self.logger.debug('Added file %s: %s' % (experimentFilePath,response))
return response
@cherrypy.expose
@DmSessionController.require(DmSessionController.isAdministrator())
@DmSessionController.execute
def updateExperimentFile(self, experimentName, fileName, **kwargs):
def updateExperimentFile(self, experimentName, encodedExperimentFilePath, **kwargs):
if not experimentName:
raise InvalidRequest('Invalid experiment name provided.')
if not fileName:
raise InvalidRequest('Invalid file name provided.')
experimentFilePath = Encoder.decode(encodedExperimentFilePath)
if not experimentFilePath:
raise InvalidRequest('Invalid experiment file path provided.')
encodedFileInfo = kwargs.get('fileInfo')
if not encodedFileInfo:
raise InvalidRequest('Invalid file info provided.')
fileInfo = json.loads(Encoder.decode(encodedFileInfo))
fileInfo['fileName'] = fileName
fileInfo['experimentFilePath'] = experimentFilePath
fileInfo['experimentName'] = experimentName
response = self.fileSessionControllerImpl.updateExperimentFile(fileInfo).getFullJsonRep()
self.logger.debug('Updated file %s: %s' % (fileName,response))
self.logger.debug('Updated file %s: %s' % (experimentFilePath,response))
return response
@cherrypy.expose
......@@ -97,12 +99,13 @@ class FileSessionController(DmSessionController):
@cherrypy.expose
@DmSessionController.require(DmSessionController.isAdministrator())
@DmSessionController.execute
def getExperimentFile(self, experimentName, fileName, **kwargs):
def getExperimentFile(self, experimentName, encodedExperimentFilePath, **kwargs):
if not experimentName:
raise InvalidRequest('Invalid experiment name provided.')
if not fileName:
raise InvalidRequest('Invalid file name provided.')
response = self.fileSessionControllerImpl.getExperimentFile(experimentName, fileName).getFullJsonRep()
self.logger.debug('Returning file %s: %s' % (fileName,response))
experimentFilePath = Encoder.decode(encodedExperimentFilePath)
if not experimentFilePath:
raise InvalidRequest('Invalid experiment file path provided.')
response = self.fileSessionControllerImpl.getExperimentFile(experimentName, experimentFilePath).getFullJsonRep()
self.logger.debug('Returning file %s: %s' % (experimentFilePath,response))
return response
......@@ -14,7 +14,7 @@ from dm.common.objects.datasetMetadata import DatasetMetadata
class DatasetMongoDbApi(DmMongoDbApi):
SYSTEM_KEY_LIST = ['_id', '_datasetName', '_experimentName']
SYSTEM_KEY_LIST = ['_id', 'datasetName', 'experimentName']
USE_DATASET_NAME_KEY = '_useDatasetName'
USE_EXPERIMENT_NAME_KEY = '_useDatasetName'
......@@ -41,14 +41,14 @@ class DatasetMongoDbApi(DmMongoDbApi):
@DmMongoDbApi.executeDbCall
def getExperimentDataset(self, experimentName, datasetName, **kwargs):
queryDict = { '_datasetName' : datasetName, '_experimentName' : experimentName }
queryDict = { 'datasetName' : datasetName, 'experimentName' : experimentName }
dbDatasetMetadata = self.datasetCollection.findByUniqueKeys(queryDict)
return self.toDmObject(dbDatasetMetadata, DatasetMetadata)
@DmMongoDbApi.executeDbCall
def getExperimentDatasets(self, experimentName, queryDict={}, returnFieldDict=DatasetCollection.ALL_FIELDS_DICT, **kwargs):
queryDict2 = copy.copy(queryDict)
queryDict2['_experimentName'] = experimentName
queryDict2['experimentName'] = experimentName
return self.getDatasets(queryDict2, returnFieldDict)
@DmMongoDbApi.executeDbCall
......
......@@ -58,9 +58,10 @@ class DmMongoDbApi:
for key in cls.SYSTEM_KEY_LIST:
if mongoDbObject.has_key(key):
# Remove leading underscore
newKey = key[1:]
mongoDbObject[newKey] = str(mongoDbObject[key])
del mongoDbObject[key]
if key.startswith('_'):
newKey = key[1:]
mongoDbObject[newKey] = str(mongoDbObject[key])
del mongoDbObject[key]
@classmethod
def getMongoDict(cls, dict):
......
......@@ -12,7 +12,7 @@ from dm.common.objects.fileMetadata import FileMetadata
class FileMongoDbApi(DmMongoDbApi):
SYSTEM_KEY_LIST = ['_id', '_fileName', '_experimentName']
SYSTEM_KEY_LIST = ['_id', 'experimentFilePath', 'experimentName']
def __init__(self):
DmMongoDbApi.__init__(self)
......@@ -38,15 +38,15 @@ class FileMongoDbApi(DmMongoDbApi):
return self.toDmObject(dbFileMetadata, FileMetadata)
@DmMongoDbApi.executeDbCall
def getExperimentFile(self, experimentName, fileName, **kwargs):
queryDict = { '_fileName' : fileName, '_experimentName' : experimentName }
def getExperimentFile(self, experimentName, experimentFilePath, **kwargs):
queryDict = { 'experimentFilePath' : experimentFilePath, 'experimentName' : experimentName }
dbFileMetadata = self.fileCollection.findByUniqueKeys(queryDict)
return self.toDmObject(dbFileMetadata, FileMetadata)
@DmMongoDbApi.executeDbCall
def getExperimentFiles(self, experimentName, queryDict={}, returnFieldDict=FileCollection.ALL_FIELDS_DICT, **kwargs):
queryDict2 = copy.copy(queryDict)
queryDict2['_experimentName'] = experimentName
queryDict2['experimentName'] = experimentName
return self.getFiles(queryDict2, returnFieldDict)
@DmMongoDbApi.executeDbCall
......@@ -70,19 +70,22 @@ class FileMongoDbApi(DmMongoDbApi):
#######################################################################
# Testing.
if __name__ == '__main__':
from dm.common.mongodb.impl.mongoDbManager import MongoDbManager
mgr = MongoDbManager.getInstance()
mgr.dbName = 'dm'
api = FileMongoDbApi()
fileName = 'file02'
experimentFilePath = 'file02'
experimentName = 'exp-01'
fileInfo = {'fileName' : fileName, 'intKey' : 1, 'doubleKey' : 2.0, 'stringKey' : 'myString' , 'experimentName' : experimentName}
fileInfo = {'experimentFilePath' : experimentFilePath, 'intKey' : 1, 'doubleKey' : 2.0, 'stringKey' : 'myString' , 'experimentName' : experimentName}
file = api.updateOrAddExperimentFile(fileInfo)
print '\nADDED FILE\n', file
import time
t = long(time.time())
fileName = 'f-%s' % t
experimentFilePath = 'f-%s' % t
experimentName = 'exp-01'
fileInfo = {'fileName' : fileName, 'intKey' : 1, 'doubleKey' : 2.0, 'stringKey' : 'myString' , 'dictKey' : {'a' : 'A', 'b' : 'B', 'c' : 3}, 'experimentName' : experimentName}
fileInfo = {'experimentFilePath' : experimentFilePath, 'intKey' : 1, 'doubleKey' : 2.0, 'stringKey' : 'myString' , 'dictKey' : {'a' : 'A', 'b' : 'B', 'c' : 3}, 'experimentName' : experimentName}
file = api.updateOrAddExperimentFile(fileInfo)
print '\nADDED FILE\n', file
......@@ -90,11 +93,11 @@ if __name__ == '__main__':
for file in files:
print 'FILE: %s\n' % file.getDictRep()
fileInfo = {'fileName' : fileName, 'experimentName' : experimentName, 'intKey' : 101}
fileInfo = {'experimentFilePath' : experimentFilePath, 'experimentName' : experimentName, 'intKey' : 101}
file = api.updateExperimentFile(fileInfo)
print '\nUPDATED FILE\n', file
print '\nFILES FOR EXPERIMENT exp1: \n', api.getExperimentFiles(experimentName)
print '\nFILES FOR EXPERIMENT exp1 with fileName=file01: \n', api.getExperimentFiles(experimentName, queryDict={'fileName':'file01'})
print '\nFILES FOR EXPERIMENT exp1 with experimentFilePath=file01: \n', api.getExperimentFiles(experimentName, queryDict={'experimentFilePath':'file02'})
......@@ -10,7 +10,7 @@ from dmMongoCollection import DmMongoCollection
class DatasetCollection(DmMongoCollection):
"""Class responsible for updating dataset collection in mongo db."""
UNIQUE_KEYS_LIST = [ '_datasetName', '_experimentName' ]
UNIQUE_KEYS_LIST = [ '_id', 'datasetName', 'experimentName' ]
def __init__(self, dbClient):
DmMongoCollection.__init__(self, 'datasets', dbClient)
......
......@@ -53,7 +53,7 @@ class DmMongoCollection(object):
return self.findByKey('_id', ObjectId(id))
def findByName(self, name):
return self.findByKey('_name', name)
return self.findByKey(self.NAME_KEY, name)
def findByQueryDict(self, queryDict, returnFieldDict=ALL_FIELDS_DICT):
return self.dbClient.findAsList(self.collectionName, queryDict, returnFieldDict)
......
......@@ -10,7 +10,7 @@ from dmMongoCollection import DmMongoCollection
class ExperimentCollection(DmMongoCollection):
"""Class responsible for updating experiment collection in mongo db."""
UNIQUE_KEYS_LIST = [ 'name' ]
UNIQUE_KEYS_LIST = [ '_id', 'name' ]
def __init__(self, dbClient):
DmMongoCollection.__init__(self, 'experiments', dbClient)
......
......@@ -10,23 +10,27 @@ from dmMongoCollection import DmMongoCollection
class FileCollection(DmMongoCollection):
"""Class responsible for updating file collection in mongo db."""
UNIQUE_KEYS_LIST = [ '_fileName', '_experimentName' ]
UNIQUE_KEYS_LIST = [ 'experimentFilePath', 'experimentName' ]
NAME_KEY = 'experimentFilePath'
def __init__(self, dbClient):
DmMongoCollection.__init__(self, 'files', dbClient)
def __init__(self, dbClient, experimentName=None):
collectionName = 'files'
if experimentName:
collectionName = '%s-files' % experimentName
DmMongoCollection.__init__(self, collectionName, dbClient)
#######################################################################
# Testing
if __name__ == '__main__':
from dmMongoClient import DmMongoClient
mongo = DmMongoClient('dm')
fileCollection = FileCollection(mongo)
fileInfo = {'_fileName' : 'xyz-001', '_experimentName' : 'myexp-001', 'update' : 'sv2', 'locationList' : '[/opt/xyz, /data/xyz]'}
print fileCollection.updateByUniqueKeys(fileInfo)
#print type(fileCollection.findByName('xyz-001'))
print fileCollection.findByQueryDict({'experiment' : 'exp-001'}, {'locationList' : 1})
fileInfo['_experimentName'] = 'ddm1'
fileCollection = FileCollection(mongo, 'exp01')
fileInfo = {'fileName' : 'xyz-001', 'experimentName' : 'exp01', 'update' : 'sv2', 'locationList' : '[/opt/xyz, /data/xyz]', 'experimentFilePath' : 'd1/xyz-001'}
print fileCollection.addByUniqueKeys(fileInfo)
#print type(fileCollection.findByName('xyz-001'))
print fileCollection.findByQueryDict({'experimentName' : 'exp01'}, {'locationList' : 1})
fileInfo['owner'] = 'ddm1'
print fileCollection.updateByUniqueKeys(fileInfo)
......@@ -10,7 +10,7 @@ from dm.common.utility.timeUtility import TimeUtility
class UploadInfo(DmObject):
DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'nProcessingErrors', 'nCancelledFiles', 'nFiles', 'percentageComplete', 'percentageProcessed', 'percentageProcessingErrors', 'percentageCancelled', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp' ]
DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'nProcessingErrors', 'nCancelledFiles', 'nFiles', 'percentageComplete', 'percentageProcessed', 'percentageProcessingErrors', 'percentageCancelled', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp', 'errorMessage' ]
def __init__(self, dict={}):
DmObject.__init__(self, dict)
......@@ -68,6 +68,7 @@ class UploadInfo(DmObject):
nProcessingErrors = self.get('nProcessingErrors', 0)
processingErrors = self.get('processingErrors', {})
nCompletedFiles = nProcessedFiles+nProcessingErrors
self['nCompletedFiles'] = nCompletedFiles
startTime = self.get('startTime', now)
runTime = now - startTime
......
......@@ -33,6 +33,8 @@ class ExperimentSessionControllerImpl(DmObjectManager):
""" Experiment session controller implementation class. """
UPLOAD_DELAY_IN_SECONDS = 1.0
UPLOAD_CHUNK_SIZE_IN_FILES = 100
UPLOAD_CHUNK_REFRESH_IN_SECONDS = 10.0
DIRECTORY_UPLOAD_PROCESSING_WAIT_IN_SECONDS = 15.0
def __init__(self):
......@@ -75,7 +77,7 @@ class ExperimentSessionControllerImpl(DmObjectManager):
storageDirectory = experiment.get('storageDirectory')
if storageDirectory is None:
raise InvalidRequest('Experiment %s has not been started.' % experimentName)
filePathsDict = FileSystemObserver.getInstance().getFiles(dataDirectory)
fileProcessingManager = FileProcessingManager.getInstance()
uploadId = str(uuid.uuid4())
self.logger.debug('Starting upload id %s' % uploadId)
......@@ -117,45 +119,73 @@ class ExperimentSessionControllerImpl(DmObjectManager):
if not len(processorList):
raise InvalidRequest('There are no plugins that can process files for upload in directory %s.' % dataDirectory)
# Remove hidden files
self.logger.debug('Checking %s processing candidates' % len(filePathsDict))
filePathsDict = fileProcessingManager.removeHiddenFilesFromProcessing(filePathsDict, uploadInfo)
# Check which files need to be processed
filePathsDict = fileProcessingManager.checkUploadFilesForProcessing(filePathsDict, uploadInfo)
if not len(filePathsDict):
raise InvalidRequest('There are no new files for upload in directory %s.' % dataDirectory)
UploadTracker.getInstance().startUpload(uploadId, uploadInfo)
uploadInfo['nFiles'] = len(filePathsDict)
uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING
self.logger.debug('Will prepare upload of %s files' % len(filePathsDict))
timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUploadFiles, args=[uploadInfo, daqInfo, experiment, filePathsDict])
uploadInfo['nFiles'] = 0
uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_PENDING
self.logger.debug('Starting upload timer for %s' % dataDirectory)
timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUploadFiles, args=[uploadInfo, daqInfo, experiment])
timer.start()
return uploadInfo
def prepareUploadFiles(self, uploadInfo, daqInfo, experiment, filePathsDict):
def prepareUploadFiles(self, uploadInfo, daqInfo, experiment):
uploadId = uploadInfo.get('id')
self.logger.debug('Preparing upload id: %s' % uploadId)
dataDirectory = uploadInfo.get('dataDirectory')
fileProcessingManager = FileProcessingManager.getInstance()
try:
# Get files
self.logger.debug('Retrieving file paths for %s' % dataDirectory)
filePathsDict = FileSystemObserver.getInstance().getFiles(dataDirectory)
# Remove hidden files
self.logger.debug('Checking %s processing candidates' % len(filePathsDict))
filePathsDict = fileProcessingManager.removeHiddenFilesFromProcessing(filePathsDict, uploadInfo)
# Check which files need to be processed
filePathsDict = fileProcessingManager.checkUploadFilesForProcessing(filePathsDict, uploadInfo)
if not len(filePathsDict):
raise InvalidRequest('There are no new files for upload in directory %s.' % dataDirectory)
except Exception, ex:
self.logger.error('Processing error for upload %s: %s' % (uploadId, str(ex)))
uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED
uploadInfo['errorMessage'] = str(ex)
return
uploadInfo['nFiles'] = len(filePathsDict)
uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING
self.logger.debug('Will prepare upload of %s files' % len(filePathsDict))
nProcessedFiles = 0
nFiles = len(filePathsDict)
for (filePath,filePathDict) in filePathsDict.items():