diff --git a/src/python/dm/__init__.py b/src/python/dm/__init__.py
index 17a60e3c4bd40005a99f5ac0b00fc77a939412b3..f881e473754eef74b28f6d176cf6ad71ea124642 100644
--- a/src/python/dm/__init__.py
+++ b/src/python/dm/__init__.py
@@ -1 +1 @@
-__version__ = "1.1 (2017.04.12)"
+__version__ = "1.1 (2017.04.18)"
diff --git a/src/python/dm/common/processing/fileProcessingManager.py b/src/python/dm/common/processing/fileProcessingManager.py
index b55001c2acd65854b8451bc73dc3ad795e5405e3..ac72f784f08048d3852b2d26e118a91ea2f25c64 100755
--- a/src/python/dm/common/processing/fileProcessingManager.py
+++ b/src/python/dm/common/processing/fileProcessingManager.py
@@ -132,6 +132,8 @@ class FileProcessingManager(threading.Thread,Singleton):
def queueFile(self, fileInfo, processingWaitTime=0):
owner = fileInfo.get('owner')
self.fileProcessingQueue.push(fileInfo, itemProcessingWaitTime=processingWaitTime, owner=owner)
+ if not owner:
+ self.logger.warn('Owner is not set for file %s' % fileInfo)
if not processingWaitTime:
self.eventFlag.set()
diff --git a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py
index e7e032040d2fd2ed65658c7a6f73ef196367ca08..7732c9ddcc73fdc4cf948e577ae6b5eaa27621bf 100755
--- a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py
+++ b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py
@@ -235,7 +235,9 @@ class ExperimentSessionControllerImpl(DmObjectManager):
nProcessedFiles += 1
except Exception, ex:
self.logger.error('Processing error: %s', ex)
- self.logger.debug('Done preparing upload id: %s (total of %s files)' % (uploadId, len(filePathsDict)))
+ nCompletedFiles = uploadInfo.get('nCompletedFiles', 0)
+ nQueuedFiles = fileProcessingManager.getNumberOfQueuedFilesByOwner(uploadId)
+ self.logger.debug('Done preparing upload %s; it has a total of %s files (%s completed and %s queued)' % (uploadId, nFiles, nCompletedFiles, nQueuedFiles))
def getUploadInfo(self, id):
uploadInfo = UploadTracker.getInstance().get(id)
diff --git a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py
index 1cb0f9435c8bcbe3fae29e8245c466c4a8d04aa2..a3150139dbb1261fb782fbafa141389ccc11267b 100755
--- a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py
+++ b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py
@@ -127,6 +127,9 @@ class FileSystemObserver(threading.Thread,Singleton):
# update all daq infos before going over observed files
DaqTracker.getInstance().updateDaqInfos()
nObservedFiles = len(self.observedFileMap)
+ if not nObservedFiles:
+ return
+
self.logger.debug('There are %s observed files waiting to be processed.' % (nObservedFiles))
scheduledDict = {}
skippedDict = {}
@@ -136,7 +139,9 @@ class FileSystemObserver(threading.Thread,Singleton):
nCompletedFiles = daqInfo.get('nCompletedFiles', 0)
nQueuedFiles = self.fileProcessingManager.getNumberOfQueuedFilesByOwner(daqId)
- if nQueuedFiles >= self.DAQ_CHUNK_SIZE_IN_FILES:
+ nScheduledFiles = scheduledDict.get(daqId,0)
+ #self.logger.debug('There are %s scheduled and %s already queued files for DAQ id %s.' % (nScheduledFiles, nQueuedFiles, daqId))
+ if nQueuedFiles+nScheduledFiles >= self.DAQ_CHUNK_SIZE_IN_FILES:
# We do not need to add more files for processing for this DAQ
#self.logger.debug('There are %s waiting files for DAQ id %s, will not add more for processing.' % (nQueuedFiles, daqId))
skippedDict[daqId] = skippedDict.get(daqId,0)+1