Commit 24820bc9 authored by sveseli's avatar sveseli
Browse files

Merge branch '0.9.6' into release/0.9

parents e91b32e5 97e9dcbc
Release 0.15 (11/1/2016)
=============================
- Resolved issue with incorrect accounting of processing errors for DAQs
- Improved DAQ processing algorithm to avoid resource starvation between
simultaneous DAQs and uploads
- Enhanced monitoring status information for both DAQs and uploads
Release 0.14 (10/14/2016)
=============================
......
......@@ -15,5 +15,5 @@ DM_DAQ_WEB_SERVICE_HOST=DM_HOSTNAME
DM_DAQ_WEB_SERVICE_PORT=33336
DM_CAT_WEB_SERVICE_HOST=DM_HOSTNAME
DM_CAT_WEB_SERVICE_PORT=44436
DM_SOFTWARE_VERSION="0.14 (DM_DATE)"
DM_SOFTWARE_VERSION="0.15 (DM_DATE)"
__version__ = "0.14 (2016.10.13)"
__version__ = "0.15 (2016.10.31)"
......@@ -11,7 +11,7 @@ from dm.common.utility.timeUtility import TimeUtility
class DaqInfo(DmObject):
DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'nProcessingErrors', 'nFiles', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp' ]
DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'nProcessingErrors', 'nCompletedFiles', 'nWaitingFiles', 'nFiles', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp' ]
def __init__(self, dict={}):
......@@ -40,10 +40,11 @@ class DaqInfo(DmObject):
def fileProcessingError(self, filePath, processingError, processingEndTime):
self.lock.acquire()
try:
# file can be processed multiple times, keep only the last error
self['nProcessingErrors'] = self.get('nProcessingErrors', 0) + 1
processingErrors = self.get('processingErrors', {})
processingErrors[filePath] = processingError
processingErrors[filePath] = processingError
self['processingErrors'] = processingErrors
self['nProcessingErrors'] = len(processingErrors)
lastFileProcessingErrorTime = self.get('lastFileProcessingErrorTime', 0)
if processingEndTime is not None and processingEndTime > lastFileProcessingErrorTime:
self['lastFileProcessingErrorTime'] = processingEndTime
......@@ -60,6 +61,9 @@ class DaqInfo(DmObject):
nProcessingErrors = self.get('nProcessingErrors', 0)
processingErrors = self.get('processingErrors', {})
nCompletedFiles = nProcessedFiles+nProcessingErrors
self['nCompletedFiles'] = nCompletedFiles
nWaitingFiles = nFiles-nCompletedFiles
self['nWaitingFiles'] = nWaitingFiles
startTime = self.get('startTime', now)
runTime = now - startTime
......
......@@ -10,7 +10,7 @@ from dm.common.utility.timeUtility import TimeUtility
class UploadInfo(DmObject):
DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'nProcessingErrors', 'nCancelledFiles', 'nFiles', 'percentageComplete', 'percentageProcessed', 'percentageProcessingErrors', 'percentageCancelled', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp', 'errorMessage' ]
DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'nProcessingErrors', 'nCancelledFiles', 'nCompletedFiles', 'nWaitingFiles', 'nFiles', 'percentageComplete', 'percentageProcessed', 'percentageProcessingErrors', 'percentageCancelled', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp', 'errorMessage' ]
def __init__(self, dict={}):
DmObject.__init__(self, dict)
......@@ -30,10 +30,10 @@ class UploadInfo(DmObject):
def fileProcessingError(self, filePath, processingError, processingEndTime):
self.lock.acquire()
try:
self['nProcessingErrors'] = self.get('nProcessingErrors', 0) + 1
processingErrors = self.get('processingErrors', {})
processingErrors[filePath] = processingError
self['processingErrors'] = processingErrors
self['nProcessingErrors'] = len(processingErrors)
lastFileProcessingErrorTime = self.get('lastFileProcessingErrorTime', 0)
if processingEndTime is not None and processingEndTime > lastFileProcessingErrorTime:
self['lastFileProcessingErrorTime'] = processingEndTime
......@@ -69,6 +69,8 @@ class UploadInfo(DmObject):
processingErrors = self.get('processingErrors', {})
nCompletedFiles = nProcessedFiles+nProcessingErrors
self['nCompletedFiles'] = nCompletedFiles
nWaitingFiles = nFiles-nCompletedFiles-nCancelledFiles
self['nWaitingFiles'] = nWaitingFiles
startTime = self.get('startTime', now)
runTime = now - startTime
......
......@@ -32,13 +32,16 @@ class FileProcessingThread(threading.Thread):
try:
statusMonitor = fileInfo.get('statusMonitor')
if statusMonitor and statusMonitor.get('status') == dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING:
statusMonitorId = ''
if statusMonitor and statusMonitor.get('status') == dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING:
self.logger.debug('File %s processing is cancelled' % (filePath))
endProcessingTime = time.time()
statusMonitor.fileProcessingCancelled(filePath, endProcessingTime)
statusMonitor.updateStatus()
return
self.logger.debug('Starting to process file %s' % filePath)
if statusMonitor:
statusMonitorId = statusMonitor.get('id', '')
self.logger.debug('Starting to process file %s (upload or DAQ id: %s)' % (filePath, statusMonitorId))
startProcessingTime = fileInfo.get('startProcessingTime', time.time())
fileInfo['startProcessingTime'] = startProcessingTime
nProcessors = len(self.fileProcessorKeyList)
......@@ -64,11 +67,13 @@ class FileProcessingThread(threading.Thread):
processedByDict[processorName] = True
self.logger.debug('%s processed file at path %s ' % (processorName, filePath))
if processorNumber == nProcessors:
self.logger.debug('File %s processing is complete' % (filePath))
endProcessingTime = time.time()
self.logger.debug('File %s processing is complete (upload or DAQ id: %s)' % (filePath, statusMonitorId))
if statusMonitor:
statusMonitor.fileProcessed(filePath, endProcessingTime)
statusMonitor.updateStatus()
nProcessedFiles = statusMonitor.get('nProcessedFiles', 0)
self.logger.debug('Upload or DAQ id %s has processed %s files so far)' % (statusMonitorId, nProcessedFiles))
except Exception, ex:
self.logger.exception(ex)
processingError = '%s processing error: %s' % (processorName, str(ex))
......
......@@ -144,11 +144,13 @@ class SftpUtility:
if __name__ == '__main__':
#sftpUtility = SftpUtility('s1dserv', username='dmadmin', password='theKey12')
sftpUtility = SftpUtility('s1dserv',privateKey='/home/beams/DMADMIN/.ssh/id_dsa')
#sftpUtility = SftpUtility('s1dserv',privateKey='/home/beams/DMADMIN/.ssh/id_dsa')
sftpUtility = SftpUtility('xstor-devel')
files = sftpUtility.getFiles('/data/testing/id7-test02')
#files = sftpUtility.getFiles('/export/dm/test')
#print files
print sftpUtility.getMd5Sum('/export/dm/test/testfile03(2nd).txt')
print sftpUtility.getMd5Sum('/export/dm/test/testfile 04.txt')
print files
#print sftpUtility.getMd5Sum('/export/dm/test/testfile03(2nd).txt')
#print sftpUtility.getMd5Sum('/export/dm/test/testfile 04.txt')
#print 'Closing connection'
#sftpUtility.closeConnection()
#print sftpUtility.statFile('/export/dm/test/testfile01')
......@@ -71,6 +71,13 @@ class DaqTracker(ObjectTracker):
def getDaqInfo(self, id):
    # Delegate to the tracker's generic lookup; yields None when the
    # given id is not currently being tracked.
    daqInfo = self.get(id)
    return daqInfo
def updateDaqInfos(self, status=dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING):
    # Refresh derived status fields (counters, run time) for every tracked
    # DAQ whose current status matches the requested one; defaults to
    # updating only running DAQs.
    for daqInfo in self.getAll():
        if daqInfo.get('status', '') == status:
            daqInfo.updateStatus()
def getDaqInfos(self, status=None):
daqInfoList = self.getAll()
if status is None or status == dmProcessingStatus.DM_PROCESSING_STATUS_ANY:
......
......@@ -24,6 +24,7 @@ class FileSystemObserver(threading.Thread,Singleton):
MIN_FILE_PROCESSING_DELAY_IN_SECONDS_KEY = 'minfileprocessingdelayinseconds'
FILE_SYSTEM_EVENT_TIMEOUT_IN_SECONDS_KEY = 'filesystemeventtimeoutinseconds'
FILE_SYSTEM_OBSERVER_AGENT_KEY = 'filesystemobserveragent'
DAQ_CHUNK_SIZE_IN_FILES = 500
# Singleton.
__instanceLock = threading.RLock()
......@@ -92,7 +93,7 @@ class FileSystemObserver(threading.Thread,Singleton):
# No daq info, ignore
if not daqInfo:
self.logger.debug('No daq for data directory %s and experiment %s, file path %s will not be processed' % (dataDirectoy, experimentName, experimentfilePath))
self.logger.debug('No daq for data directory %s and experiment %s, file path %s will not be processed' % (dataDirectory, experimentName, experimentfilePath))
return
# Do not process hidden files unless requested
......@@ -102,28 +103,43 @@ class FileSystemObserver(threading.Thread,Singleton):
self.logger.debug('File path %s is hidden file, will not process it' % filePath)
return
daqId = daqInfo['id']
observedFile = self.observedFileMap.get(filePath)
if not observedFile:
daqInfo.fileAdded(filePath)
observedFile = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
observedFile['daqInfo'] = daqInfo.toDictWithOriginalKeys()
observedFile['statusMonitor'] = daqInfo
self.observedFileMap[filePath] = observedFile
self.logger.debug('New observed file: %s', filePath)
self.logger.debug('New observed file: %s (daq id: %s)' % (filePath, daqId))
daqInfo.fileAdded(filePath)
else:
self.logger.debug('Observed file updated: %s', filePath)
self.logger.debug('Observed file updated: %s (daq id: %s)' % (filePath, daqId))
observedFile.setLastUpdateTimeToNow()
@ThreadingUtility.synchronize
def checkObservedFilesForProcessing(self):
    """Return observed file paths that have been quiet long enough to queue
    for processing, throttled per DAQ to DAQ_CHUNK_SIZE_IN_FILES waiting files.
    """
    currentTime = time.time()
    selectedPaths = []
    # The throttle below reads each DAQ's nWaitingFiles counter, so all
    # running DAQ infos must be refreshed before scanning observed files.
    DaqTracker.getInstance().updateDaqInfos()
    waitingCountByDaq = {}
    for path, observed in self.observedFileMap.items():
        daqInfo = observed['daqInfo']
        daqId = daqInfo['id']
        waiting = waitingCountByDaq.get(daqId, daqInfo.get('nWaitingFiles', 0))
        if waiting >= self.DAQ_CHUNK_SIZE_IN_FILES:
            # This DAQ already has a full chunk of waiting files; do not
            # add more of its files in this pass.
            continue
        idleSeconds = currentTime - observed.get('lastUpdateTime')
        if idleSeconds > self.minFileProcessingDelayInSeconds:
            self.logger.debug('File %s was last modified %s seconds ago, will process it.' % (path, idleSeconds))
            selectedPaths.append(path)
            waitingCountByDaq[daqId] = waiting + 1
    return selectedPaths
@ThreadingUtility.synchronize
......@@ -151,7 +167,7 @@ class FileSystemObserver(threading.Thread,Singleton):
try:
filePathsForProcessing = self.checkObservedFilesForProcessing()
if len(filePathsForProcessing):
self.logger.debug('Checking observed files')
self.logger.debug('Will queue %s new files for processing' % (len(filePathsForProcessing)))
for filePath in filePathsForProcessing:
self.processFile(filePath)
except Exception, ex:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment