diff --git a/src/python/dm/__init__.py b/src/python/dm/__init__.py index 7000cfa190658a75fd4d03541540dbcb9243a93a..04562a4f370d198d704c23b11af3de2cf672d8ad 100644 --- a/src/python/dm/__init__.py +++ b/src/python/dm/__init__.py @@ -1 +1 @@ -__version__ = "0.14 (2016.10.13)" +__version__ = "0.15 (2016.10.26)" diff --git a/src/python/dm/common/utility/sftpUtility.py b/src/python/dm/common/utility/sftpUtility.py index 3a4829cad2745ee5a87ee34b5b45687f37dfed64..846db26baf415e2dfafb4b6bf33e98c5fc426927 100755 --- a/src/python/dm/common/utility/sftpUtility.py +++ b/src/python/dm/common/utility/sftpUtility.py @@ -144,11 +144,13 @@ class SftpUtility: if __name__ == '__main__': #sftpUtility = SftpUtility('s1dserv', username='dmadmin', password='theKey12') - sftpUtility = SftpUtility('s1dserv',privateKey='/home/beams/DMADMIN/.ssh/id_dsa') + #sftpUtility = SftpUtility('s1dserv',privateKey='/home/beams/DMADMIN/.ssh/id_dsa') + sftpUtility = SftpUtility('xstor-devel') + files = sftpUtility.getFiles('/data/testing/id7-test02') #files = sftpUtility.getFiles('/export/dm/test') - #print files - print sftpUtility.getMd5Sum('/export/dm/test/testfile03(2nd).txt') - print sftpUtility.getMd5Sum('/export/dm/test/testfile 04.txt') + print files + #print sftpUtility.getMd5Sum('/export/dm/test/testfile03(2nd).txt') + #print sftpUtility.getMd5Sum('/export/dm/test/testfile 04.txt') #print 'Closing connection' #sftpUtility.closeConnection() #print sftpUtility.statFile('/export/dm/test/testfile01') diff --git a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py index bafd9fa6e4be42e0f7b4e6f1c159a9b038a46a24..9a0a6d902d88c7e1dcd0545b022acba7b6bd3caa 100755 --- a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py +++ b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py @@ -124,11 +124,14 @@ class FileSystemObserver(threading.Thread,Singleton): # more files should be added for processing, so we need to # update all daq infos before going over observed files DaqTracker.getInstance().updateDaqInfos() + nWaitingFilesDict = {} for (filePath,observedFile) in self.observedFileMap.items(): daqInfo = observedFile['daqInfo'] - nWaitingFiles = daqInfo.get('nWaitingFiles', 0) + daqId = daqInfo['id'] + nWaitingFiles = nWaitingFilesDict.get(daqId, daqInfo.get('nWaitingFiles', 0)) if nWaitingFiles >= self.DAQ_CHUNK_SIZE_IN_FILES: # We do not need to add more files for processing for this DAQ + #self.logger.debug('There are %s waiting files for DAQ id %s, will not add more for processing.' % (nWaitingFiles, daqInfo['id'])) continue timestamp = observedFile.get('lastUpdateTime') @@ -136,6 +139,7 @@ class FileSystemObserver(threading.Thread,Singleton): if deltaT > self.minFileProcessingDelayInSeconds: self.logger.debug('File %s was last modified %s seconds ago, will process it.' % (filePath, deltaT)) filePathsForProcessing.append(filePath) + nWaitingFilesDict[daqId] = nWaitingFiles+1 return filePathsForProcessing @ThreadingUtility.synchronize @@ -163,7 +167,7 @@ class FileSystemObserver(threading.Thread,Singleton): try: filePathsForProcessing = self.checkObservedFilesForProcessing() if len(filePathsForProcessing): - self.logger.debug('Checking observed files') + self.logger.debug('Will queue %s new files for processing' % (len(filePathsForProcessing))) for filePath in filePathsForProcessing: self.processFile(filePath) except Exception, ex: