Skip to content
Snippets Groups Projects
Commit 329269fa authored by sveseli's avatar sveseli
Browse files

fixed issue with starting directory uploads or daqs with large source directories

parent ee522b37
No related branches found
No related tags found
No related merge requests found
......@@ -11,7 +11,7 @@ from dm.common.utility.timeUtility import TimeUtility
class DaqInfo(DmObject):
DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'nProcessingErrors', 'nCompletedFiles', 'nWaitingFiles', 'nFiles', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp' ]
DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'nProcessingErrors', 'nCompletedFiles', 'nWaitingFiles', 'nFiles', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp', 'errorMessage' ]
def __init__(self, dict={}):
......
......@@ -54,7 +54,7 @@ class DaqTracker(ObjectTracker):
self.put(daqId, daqInfo2)
return daqInfo2
def stopDaq(self, experiment, dataDirectory):
def stopDaq(self, experiment, dataDirectory, status=None, errorMessage=None):
experimentName = experiment.get('name')
dataDir = os.path.normpath(dataDirectory)
activeDaqKey = experimentName + dataDir
......@@ -64,7 +64,12 @@ class DaqTracker(ObjectTracker):
endTime = time.time()
daqInfo['endTime'] = endTime
daqInfo['endTimestamp'] = TimeUtility.formatLocalTimestamp(endTime)
daqInfo.updateStatus()
if status:
daqInfo['status'] = status
else:
daqInfo.updateStatus()
if errorMessage:
daqInfo['errorMessage'] = errorMessage
del self.activeDaqDict[activeDaqKey]
return daqInfo
......
......@@ -80,12 +80,12 @@ class ExperimentSessionControllerImpl(DmObjectManager):
FileSystemObserver.getInstance().stopObservingPath(dataDirectory, experiment)
daqInfo.updateStatus()
daqId = daqInfo.get('id')
self.logger.error('Stopped DAQ id %s for experiment %s' % (daqId, experimentName))
self.logger.debug('Stopped DAQ id %s for experiment %s' % (daqId, experimentName))
# Prepare upload on exit
uploadDataDirectoryOnExit = daqInfo.get('uploadDataDirectoryOnExit')
if uploadDataDirectoryOnExit:
self.logger.error('Attempting automatic upload on exit for DAQ id %s, experiment %s' % (daqId, experimentName))
self.logger.debug('Attempting automatic upload on exit for DAQ id %s, experiment %s' % (daqId, experimentName))
daqInfo2 = {}
daqInfo2['originalDaqId'] = daqId
uploadTargetDirectoryOnExit = daqInfo.get('uploadTargetDirectoryOnExit')
......@@ -137,7 +137,7 @@ class ExperimentSessionControllerImpl(DmObjectManager):
startTime = time.time()
uploadInfo['startTime'] = startTime
uploadInfo['startTimestamp '] = TimeUtility.formatLocalTimestamp(startTime)
uploadInfo['startTimestamp'] = TimeUtility.formatLocalTimestamp(startTime)
daqInfo['experimentName'] = experimentName
daqInfo['storageDirectory'] = experiment.get('storageDirectory')
daqInfo['storageHost'] = experiment.get('storageHost')
......@@ -262,7 +262,6 @@ class ExperimentSessionControllerImpl(DmObjectManager):
storageDirectory = experiment.get('storageDirectory')
if storageDirectory is None:
raise InvalidRequest('Experiment %s has not been started.' % experimentName)
filePathsDict = FileSystemObserver.getInstance().getFiles(dataDirectory)
uploadId = str(uuid.uuid4())
self.logger.debug('Starting upload id %s' % uploadId)
uploadInfo = DirectoryUploadInfo(daqInfo)
......@@ -275,7 +274,7 @@ class ExperimentSessionControllerImpl(DmObjectManager):
startTime = time.time()
uploadInfo['startTime'] = startTime
uploadInfo['startTimestamp '] = TimeUtility.formatLocalTimestamp(startTime)
uploadInfo['startTimestamp'] = TimeUtility.formatLocalTimestamp(startTime)
daqInfo['experimentName'] = experimentName
daqInfo['storageDirectory'] = experiment.get('storageDirectory')
daqInfo['storageHost'] = experiment.get('storageHost')
......@@ -287,11 +286,37 @@ class ExperimentSessionControllerImpl(DmObjectManager):
if len(skipPlugins):
skipPlugins = skipPlugins.split(',')
uploadInfo['skipPlugins'] = skipPlugins
else:
skipPlugins = []
UploadTracker.getInstance().startUpload(uploadId, uploadInfo)
uploadInfo['nFiles'] = 0
uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_PENDING
self.logger.debug('Starting upload directory %s timer for experiment %s' % (dataDirectory, experimentName))
timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUploadDirectory, args=[uploadInfo, daqInfo, experiment])
timer.start()
return uploadInfo
def prepareUploadDirectory(self, uploadInfo, daqInfo, experiment):
uploadId = uploadInfo['id']
dataDirectory = uploadInfo['dataDirectory']
experimentName = uploadInfo['experimentName']
skipPlugins = uploadInfo.get('skipPlugins', [])
self.logger.debug('Preparing directory %s upload for experiment %s' % (dataDirectory, experimentName))
try:
filePathsDict = FileSystemObserver.getInstance().getFiles(dataDirectory)
self.logger.debug('There are %s files in directory %s (experiment %s)' % (len(filePathsDict), dataDirectory, experimentName))
except Exception, ex:
self.logger.error('Cannot retrieve files for directory upload %s: %s' % (uploadId, str(ex)))
self.logger.error('Marking directory upload %s as failed' % (uploadId))
uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED
uploadInfo['errorMessage'] = str(ex)
return
fileProcessingManager = FileProcessingManager.getInstance()
processingInfo = {}
uploadInfo['processingInfo'] = processingInfo
self.logger.debug('Preparing plugin timers for directory %s upload (experiment %s)' % (dataDirectory, experimentName))
for processorKey in fileProcessingManager.fileProcessorKeyList:
processor = fileProcessingManager.fileProcessorDict.get(processorKey)
processorName = processor.name
......@@ -303,10 +328,8 @@ class ExperimentSessionControllerImpl(DmObjectManager):
processingInfo[processorName] = {'status' : dmProcessingStatus.DM_PROCESSING_STATUS_PENDING}
timer.start()
UploadTracker.getInstance().startUpload(uploadId, uploadInfo)
uploadInfo['nFiles'] = len(filePathsDict)
uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING
return uploadInfo
def processUploadDirectory(self, processor, uploadInfo, daqInfo, experiment, filePathsDict):
uploadId = uploadInfo.get('id')
......
#!/usr/bin/env python
from threading import Timer
from fileSystemObserverAgent import FileSystemObserverAgent
from dm.common.constants import dmProcessingStatus
from dm.common.utility.osUtility import OsUtility
from fileSystemObserverAgent import FileSystemObserverAgent
from daqTracker import DaqTracker
class PollingFileSystemObserverAgent(FileSystemObserverAgent):
DEFAULT_START_OBSERVING_PATH_DELAY_IN_SECONDS = 3
DEFAULT_POLLING_PERIOD_IN_SECONDS = 15
DEFAULT_RETRY_PERIOD_IN_SECONDS = 60
......@@ -75,13 +78,23 @@ class PollingFileSystemObserverAgent(FileSystemObserverAgent):
self.logger.debug('Observer for %s is already active' % dataDirectory)
return
self.logger.debug('Starting observer for %s' % dataDirectory)
fileDict = self.getFiles(dataDirectory)
observedDirInfo = self.observedDirDict.get(dataDirectory, {})
observedDirInfo['files'] = fileDict
observedDirInfo['experiment'] = experiment
self.observedDirDict[dataDirectory] = observedDirInfo
self.startPollingTimer(observedDirInfo, dataDirectory, experiment)
t = Timer(self.DEFAULT_START_OBSERVING_PATH_DELAY_IN_SECONDS, self.startObservingPathTimer, [observedDirInfo, dataDirectory, experiment])
t.start()
def startObservingPathTimer(self, observedDirInfo, dataDirectory, experiment):
try:
self.logger.debug('Starting initial retrieval of files for directory %s' % (dataDirectory))
fileDict = self.getFiles(dataDirectory)
observedDirInfo['files'] = fileDict
self.startPollingTimer(observedDirInfo, dataDirectory, experiment)
except Exception, ex:
self.logger.error('Could not retrieve files for directory %s: %s' % (dataDirectory,ex))
DaqTracker.getInstance().stopDaq(experiment, dataDirectory, status=dmProcessingStatus.DM_PROCESSING_STATUS_FAILED, errorMessage=str(ex))
self.logger.error('Marked as failed DAQ for experiment %s, data directory %s' % (experiment['name'], dataDirectory))
def stopObservingPath(self, dataDirectory, experiment):
observedDirInfo = self.observedDirDict.get(dataDirectory)
if not observedDirInfo:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment