Skip to content
Snippets Groups Projects
Commit 9f0c5635 authored by sveseli's avatar sveseli
Browse files

merge from 0.15

parents 532086cf 355aac09
No related branches found
No related tags found
No related merge requests found
__version__ = "0.14 (2016.10.13)" __version__ = "0.15 (2016.10.26)"
...@@ -144,11 +144,13 @@ class SftpUtility: ...@@ -144,11 +144,13 @@ class SftpUtility:
if __name__ == '__main__': if __name__ == '__main__':
#sftpUtility = SftpUtility('s1dserv', username='dmadmin', password='theKey12') #sftpUtility = SftpUtility('s1dserv', username='dmadmin', password='theKey12')
sftpUtility = SftpUtility('s1dserv',privateKey='/home/beams/DMADMIN/.ssh/id_dsa') #sftpUtility = SftpUtility('s1dserv',privateKey='/home/beams/DMADMIN/.ssh/id_dsa')
sftpUtility = SftpUtility('xstor-devel')
files = sftpUtility.getFiles('/data/testing/id7-test02')
#files = sftpUtility.getFiles('/export/dm/test') #files = sftpUtility.getFiles('/export/dm/test')
#print files print files
print sftpUtility.getMd5Sum('/export/dm/test/testfile03(2nd).txt') #print sftpUtility.getMd5Sum('/export/dm/test/testfile03(2nd).txt')
print sftpUtility.getMd5Sum('/export/dm/test/testfile 04.txt') #print sftpUtility.getMd5Sum('/export/dm/test/testfile 04.txt')
#print 'Closing connection' #print 'Closing connection'
#sftpUtility.closeConnection() #sftpUtility.closeConnection()
#print sftpUtility.statFile('/export/dm/test/testfile01') #print sftpUtility.statFile('/export/dm/test/testfile01')
...@@ -124,11 +124,14 @@ class FileSystemObserver(threading.Thread,Singleton): ...@@ -124,11 +124,14 @@ class FileSystemObserver(threading.Thread,Singleton):
# more files should be added for processing, so we need to # more files should be added for processing, so we need to
# update all daq infos before going over observed files # update all daq infos before going over observed files
DaqTracker.getInstance().updateDaqInfos() DaqTracker.getInstance().updateDaqInfos()
nWaitingFilesDict = {}
for (filePath,observedFile) in self.observedFileMap.items(): for (filePath,observedFile) in self.observedFileMap.items():
daqInfo = observedFile['daqInfo'] daqInfo = observedFile['daqInfo']
nWaitingFiles = daqInfo.get('nWaitingFiles', 0) daqId = daqInfo['id']
nWaitingFiles = nWaitingFilesDict.get(daqId, daqInfo.get('nWaitingFiles', 0))
if nWaitingFiles >= self.DAQ_CHUNK_SIZE_IN_FILES: if nWaitingFiles >= self.DAQ_CHUNK_SIZE_IN_FILES:
# We do not need to add more files for processing for this DAQ # We do not need to add more files for processing for this DAQ
#self.logger.debug('There are %s waiting files for DAQ id %s, will not add more for processing.' % (nWaitingFiles, daqInfo['id']))
continue continue
timestamp = observedFile.get('lastUpdateTime') timestamp = observedFile.get('lastUpdateTime')
...@@ -136,6 +139,7 @@ class FileSystemObserver(threading.Thread,Singleton): ...@@ -136,6 +139,7 @@ class FileSystemObserver(threading.Thread,Singleton):
if deltaT > self.minFileProcessingDelayInSeconds: if deltaT > self.minFileProcessingDelayInSeconds:
self.logger.debug('File %s was last modified %s seconds ago, will process it.' % (filePath, deltaT)) self.logger.debug('File %s was last modified %s seconds ago, will process it.' % (filePath, deltaT))
filePathsForProcessing.append(filePath) filePathsForProcessing.append(filePath)
nWaitingFilesDict[daqId] = nWaitingFiles+1
return filePathsForProcessing return filePathsForProcessing
@ThreadingUtility.synchronize @ThreadingUtility.synchronize
...@@ -163,7 +167,7 @@ class FileSystemObserver(threading.Thread,Singleton): ...@@ -163,7 +167,7 @@ class FileSystemObserver(threading.Thread,Singleton):
try: try:
filePathsForProcessing = self.checkObservedFilesForProcessing() filePathsForProcessing = self.checkObservedFilesForProcessing()
if len(filePathsForProcessing): if len(filePathsForProcessing):
self.logger.debug('Checking observed files') self.logger.debug('Will queue %s new files for processing' % (len(filePathsForProcessing)))
for filePath in filePathsForProcessing: for filePath in filePathsForProcessing:
self.processFile(filePath) self.processFile(filePath)
except Exception, ex: except Exception, ex:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment