From 22cc3b1ba3abb41f90e1256a97dc3af0cec5bbf3 Mon Sep 17 00:00:00 2001
From: Sinisa Veseli <sveseli@aps.anl.gov>
Date: Sun, 7 Feb 2016 05:43:24 +0000
Subject: [PATCH] reworked reporting for both daqs and uploads; improved ftp
 utility efficiency; simplified file processing

---
 src/python/dm/common/objects/daqInfo.py       | 96 +++++++++++--------
 src/python/dm/common/objects/uploadInfo.py    | 76 +++++++--------
 .../processing/fileProcessingManager.py       | 12 ++-
 .../common/processing/fileProcessingThread.py | 46 ++++-----
 .../processing/plugins/fileTransferPlugin.py  | 16 ++--
 .../plugins/gridftpFileTransferPlugin.py      | 28 +++---
 .../plugins/mongoDbFileCatalogPlugin.py       | 16 +++-
 .../plugins/rsyncFileTransferPlugin.py        | 12 ++-
 ...WithChecksumAndDeleteFileTransferPlugin.py |  2 +-
 src/python/dm/common/utility/fileUtility.py   |  6 +-
 src/python/dm/common/utility/ftpUtility.py    | 65 +++++++++++--
 .../utility/ldapLinuxPlatformUtility.py       |  4 +-
 src/python/dm/common/utility/linuxUtility.py  |  4 +-
 src/python/dm/common/utility/objectCache.py   | 24 ++---
 src/python/dm/common/utility/sftpUtility.py   | 25 +++--
 .../utility/timeBasedProcessingQueue.py       | 13 ++-
 src/python/dm/common/utility/timeUtility.py   |  8 +-
 .../daq_web_service/service/daqWebService.py  |  4 +-
 .../service/impl/daqTracker.py                |  7 +-
 .../impl/dsProcessFileNotificationPlugin.py   |  4 +-
 .../impl/experimentSessionControllerImpl.py   | 23 +++--
 .../service/impl/fileSystemObserver.py        | 22 +++--
 22 files changed, 301 insertions(+), 212 deletions(-)

diff --git a/src/python/dm/common/objects/daqInfo.py b/src/python/dm/common/objects/daqInfo.py
index c355cd03..2e127fff 100755
--- a/src/python/dm/common/objects/daqInfo.py
+++ b/src/python/dm/common/objects/daqInfo.py
@@ -2,8 +2,11 @@
 import copy
 import time
+import threading
+
 from dmObject import DmObject
 from dm.common.utility.dictUtility import DictUtility
+from dm.common.utility.timeUtility import TimeUtility
 
 class DaqInfo(DmObject):
@@ -12,33 +15,55 @@ class DaqInfo(DmObject):
 
     def __init__(self, dict={}):
         DmObject.__init__(self, dict)
+        self.lock = threading.RLock()
         self.originalKeys = dict.keys()
-        self['fileDict'] = self.get('fileDict', {})
+
+    def fileAdded(self, filePath):
+        self.lock.acquire()
+        try:
+            self['nFiles'] = self.get('nFiles', 0) + 1
+        finally:
+            self.lock.release()
+
+    def fileProcessed(self, filePath, processingEndTime):
+        self.lock.acquire()
+        try:
+            self['nProcessedFiles'] = self.get('nProcessedFiles', 0) + 1
+            lastFileProcessingEndTime = self.get('lastFileProcessingEndTime', 0)
+            if processingEndTime is not None and processingEndTime > lastFileProcessingEndTime:
+                self['lastFileProcessed'] = filePath
+                self['lastFileProcessingEndTime'] = processingEndTime
+        finally:
+            self.lock.release()
+
+    def fileProcessingError(self, filePath, processingError, processingEndTime):
+        self.lock.acquire()
+        try:
+            processingErrors = self.get('processingErrors', {})
+            processingErrors[filePath] = processingError
+            self['processingErrors'] = processingErrors
+            self['nProcessingErrors'] = len(processingErrors)
+            lastFileProcessingEndTime = self.get('lastFileProcessingEndTime', 0)
+            if processingEndTime is not None and processingEndTime > lastFileProcessingEndTime:
+                self['lastFileProcessingEndTime'] = processingEndTime
+        finally:
+            self.lock.release()
 
     def updateStatus(self):
         now = time.time()
         daqStatus = self.get('status', 'running')
         if daqStatus == 'done':
             return
-        fileDict = self.get('fileDict')
-        nFiles = len(fileDict)
-        nProcessedFiles = 0
-        nProcessingErrors = 0
-        processingErrors = {}
-        for (filePath,fileProcessingInfo) in fileDict.items():
-            if fileProcessingInfo.get('processed'):
-                nProcessedFiles += 1
-            elif fileProcessingInfo.get('processingError'):
-                nProcessingErrors += 1
-                processingErrors[filePath] = fileProcessingInfo.get('processingError')
-        if len(processingErrors):
-            self['processingErrors'] = processingErrors
-
-        # need to handle 'failed' uploads
+        nFiles = self.get('nFiles', 0)
+        nProcessedFiles = self.get('nProcessedFiles', 0)
+        nProcessingErrors = self.get('nProcessingErrors', 0)
+        processingErrors = self.get('processingErrors', {})
         nCompletedFiles = nProcessedFiles+nProcessingErrors
-        self['nProcessedFiles'] = '%s' % (nProcessedFiles)
-        self['nProcessingErrors'] = '%s' % (nProcessingErrors)
-        self['nFiles'] = '%s' % (nFiles)
+
+        startTime = self.get('startTime')
+        if startTime:
+            runTime = now - startTime
+            self['runTime'] = runTime
 
         percentageComplete = 100.0
         percentageProcessed = 100.0
@@ -54,30 +79,21 @@ class DaqInfo(DmObject):
             runTime = now - self.get('startTime')
             self['runTime'] = runTime
 
-        if self.get('endTime'):
+        if self.get('endTime') and nCompletedFiles == nFiles:
             daqStatus = 'done'
-            self['runTime'] = self.get('endTime') - self.get('startTime')
+            endTime = self.get('endTime')
+            lastFileProcessingEndTime = self.get('lastFileProcessingEndTime', 0)
+            if lastFileProcessingEndTime > endTime:
+                endTime = lastFileProcessingEndTime
+                self['endTime'] = endTime
+            self['endTimestamp'] = TimeUtility.formatLocalTimestamp(endTime)
+            if startTime:
+                runTime = endTime - startTime
+                self['runTime'] = runTime
         self['status'] = daqStatus
 
     def toDictWithOriginalKeys(self):
-        dict = copy.deepcopy(self.data)
-        for key in dict.keys():
-            if key not in self.originalKeys:
-                del dict[key]
+        dict = {}
+        for key in self.originalKeys:
+            if self.has_key(key):
+                dict[key] = self.get(key)
         return dict
 
-    def scrub(self, includeFileDetails=False):
-        # Remove redundant information
-        daqInfo2 = DictUtility.deepCopy(self.data, excludeKeys=['fileDict'])
-        if not includeFileDetails:
-            return DaqInfo(daqInfo2)
-        fileDict = self.get('fileDict', {})
-        fileDict2 = {}
-        for (filePath,fileInfo) in fileDict.items():
-            fileInfo2 = {}
-            for key in ['processed', 'lastUpdateTime', 'processingError']:
-                if fileInfo.has_key(key):
-                    fileInfo2[key] = fileInfo[key]
-            fileDict2[filePath] = fileInfo2
-        daqInfo2['fileDict'] = fileDict2
-        return DaqInfo(daqInfo2)
diff --git a/src/python/dm/common/objects/uploadInfo.py b/src/python/dm/common/objects/uploadInfo.py
index e00a44a6..21c9ba91 100755
--- a/src/python/dm/common/objects/uploadInfo.py
+++ b/src/python/dm/common/objects/uploadInfo.py
@@ -1,6 +1,8 @@
 #!/usr/bin/env python
 
 import time
+import threading
+
 from dmObject import DmObject
 from dm.common.utility.dictUtility import DictUtility
 from dm.common.utility.timeUtility import TimeUtility
@@ -11,52 +13,57 @@ class UploadInfo(DmObject):
 
     def __init__(self, dict={}):
         DmObject.__init__(self, dict)
-        self['fileDict'] = self.get('fileDict', {})
+        self.lock = threading.RLock()
+
+    def fileProcessed(self, filePath, processingEndTime):
+        self.lock.acquire()
+        try:
+            self['nProcessedFiles'] = self.get('nProcessedFiles', 0) + 1
+            lastFileProcessingEndTime = self.get('lastFileProcessingEndTime', 0)
+            if processingEndTime is not None and processingEndTime > lastFileProcessingEndTime:
+                self['lastFileProcessed'] = filePath
+                self['lastFileProcessingEndTime'] = processingEndTime
+        finally:
+            self.lock.release()
+
+    def fileProcessingError(self, filePath, processingError, processingEndTime):
+        self.lock.acquire()
+        try:
+            processingErrors = self.get('processingErrors', {})
+            processingErrors[filePath] = processingError
+            self['processingErrors'] = processingErrors
+            self['nProcessingErrors'] = len(processingErrors)
+            lastFileProcessingEndTime = self.get('lastFileProcessingEndTime', 0)
+            if processingEndTime is not None and processingEndTime > lastFileProcessingEndTime:
+                self['lastFileProcessingEndTime'] = processingEndTime
+        finally:
+            self.lock.release()
 
     def updateStatus(self):
         now = time.time()
         uploadStatus = self.get('status', 'running')
         if uploadStatus == 'done':
             return
-        fileDict = self.get('fileDict')
-        nFiles = self.get('nFiles', len(fileDict))
-        nProcessedFiles = 0
-        nProcessingErrors = 0
-        processingErrors = {}
-        endTime = 0
-        for (filePath,fileProcessingInfo) in fileDict.items():
-            if fileProcessingInfo.get('processed'):
-                nProcessedFiles += 1
-            elif fileProcessingInfo.get('processingError'):
-                nProcessingErrors += 1
-                processingErrors[filePath] = fileProcessingInfo.get('processingError')
-
-            endProcessingTime = fileProcessingInfo.get('endProcessingTime')
-            if endProcessingTime is not None and endProcessingTime > endTime:
-                endTime = endProcessingTime
-        if len(processingErrors):
-            self['processingErrors'] = processingErrors
+        nFiles = self.get('nFiles', 0)
+        nProcessedFiles = self.get('nProcessedFiles', 0)
+        nProcessingErrors = self.get('nProcessingErrors', 0)
+        processingErrors = self.get('processingErrors', {})
+        nCompletedFiles = nProcessedFiles+nProcessingErrors
 
         startTime = self.get('startTime')
         if startTime:
             runTime = now - startTime
             self['runTime'] = runTime
 
-        # need to handle 'failed' uploads
-        nCompletedFiles = nProcessedFiles+nProcessingErrors
         if nCompletedFiles == nFiles:
             uploadStatus = 'done'
-            if not endTime:
-                endTime = now
+            endTime = self.get('lastFileProcessingEndTime', now)
             self['endTime'] = endTime
-            self['endTimestamp'] = TimeUtility.formatLocalTimeStamp(endTime)
+            self['endTimestamp'] = TimeUtility.formatLocalTimestamp(endTime)
             if startTime:
                 runTime = endTime - startTime
                 self['runTime'] = runTime
         self['status'] = uploadStatus
-        self['nProcessedFiles'] = '%s' % (nProcessedFiles)
-        self['nProcessingErrors'] = '%s' % (nProcessingErrors)
-        #self['nFiles'] = '%s' % (nFiles)
 
         percentageComplete = 100.0
         percentageProcessed = 100.0
@@ -69,19 +76,4 @@ class UploadInfo(DmObject):
         self['percentageProcessed'] = '%.2f' % percentageProcessed
         self['percentageProcessingErrors'] = '%.2f' % percentageProcessingErrors
 
-    def scrub(self, includeFileDetails=False):
-        # Remove redundant information
-        uploadInfo2 = DictUtility.deepCopy(self.data, excludeKeys=['fileDict'])
-        if not includeFileDetails:
-            return UploadInfo(uploadInfo2)
-        fileDict = self.get('fileDict', {})
-        fileDict2 = {}
-        for (filePath,fileInfo) in fileDict.items():
-            fileInfo2 = {}
-            for key in ['processed', 'lastUpdateTime', 'processingError']:
-                if fileInfo.has_key(key):
-                    fileInfo2[key] = fileInfo[key]
-            fileDict2[filePath] = fileInfo2
-        uploadInfo2['fileDict'] = fileDict2
-        return UploadInfo(uploadInfo2)
diff --git a/src/python/dm/common/processing/fileProcessingManager.py b/src/python/dm/common/processing/fileProcessingManager.py
index 8a5ea3db..faec3eb2 100755
--- a/src/python/dm/common/processing/fileProcessingManager.py
+++ b/src/python/dm/common/processing/fileProcessingManager.py
@@ -39,8 +39,6 @@ class FileProcessingManager(threading.Thread,Singleton):
             self.fileProcessorDict = {}
             self.fileProcessorKeyList = []
             self.fileProcessingQueue = TimeBasedProcessingQueue()
-            self.processedFileDict = {}
-            self.unprocessedFileDict = {}
             self.__configure()
             self.logger.debug('Initialization complete')
         finally:
@@ -74,11 +72,15 @@ class FileProcessingManager(threading.Thread,Singleton):
         if ValueUtility.toBoolean(uploadInfo.get('processHiddenFiles')):
             del uploadInfo['processHiddenFiles']
             return filePathsDict
-        for filePath in filePathsDict.keys():
+        self.logger.debug('Checking for hidden files')
+        nRemoved = 0
+        for (filePath,filePathDict) in filePathsDict.items():
             fileName = os.path.basename(filePath)
             if fileName.startswith('.'):
                 self.logger.debug('File path %s is hidden file, will not process it' % filePath)
                 del filePathsDict[filePath]
+                nRemoved += 1
+        self.logger.debug('Removed %s hidden files, %s candidates remaining' % (nRemoved, len(filePathsDict)))
         return filePathsDict
 
     # Each plugin calculates list of files that need to be processed
@@ -87,6 +89,7 @@ class FileProcessingManager(threading.Thread,Singleton):
         if ValueUtility.toBoolean(uploadInfo.get('reprocessFiles')):
             del uploadInfo['reprocessFiles']
             return filePathsDict
+        self.logger.debug('Checking files with processor plugins')
         checkedFilePathsDict = {}
         for processorKey in self.fileProcessorKeyList:
             processor = self.fileProcessorDict.get(processorKey)
@@ -94,6 +97,7 @@
             pluginFilePathsDict = processor.checkUploadFilesForProcessing(filePathsDict, uploadInfo)
             if len(pluginFilePathsDict):
                 checkedFilePathsDict.update(pluginFilePathsDict)
+        self.logger.debug('There are %s processing candidates remaining' % (len(filePathsDict)))
         return checkedFilePathsDict
 
     def processFile(self, fileInfo):
@@ -113,7 +117,7 @@
             self.logger.debug('Starting file processing threads')
             for i in range(0, self.nProcessingThreads):
                 tName = 'FileProcessingThread-%s' % i
-                t = FileProcessingThread(tName, self, self.fileProcessorDict, self.fileProcessorKeyList, self.fileProcessingQueue, self.processedFileDict, self.unprocessedFileDict)
+                t = FileProcessingThread(tName, self, self.fileProcessorDict, self.fileProcessorKeyList, self.fileProcessingQueue)
                 t.start()
                 self.fileProcessingThreadDict[tName] = t
         finally:
diff --git a/src/python/dm/common/processing/fileProcessingThread.py b/src/python/dm/common/processing/fileProcessingThread.py
index 9a50ae9e..a132b50a 100755
--- a/src/python/dm/common/processing/fileProcessingThread.py
+++ b/src/python/dm/common/processing/fileProcessingThread.py
@@ -9,7 +9,7 @@ class FileProcessingThread(threading.Thread):
 
     THREAD_EVENT_TIMEOUT_IN_SECONDS = 10.0
 
-    def __init__ (self, name, fileProcessingManager, fileProcessorDict, fileProcessorKeyList, fileProcessingQueue, processedFileDict, unprocessedFileDict):
+    def __init__ (self, name, fileProcessingManager, fileProcessorDict, fileProcessorKeyList, fileProcessingQueue):
         threading.Thread.__init__(self)
         self.setName(name)
@@ -18,8 +18,6 @@ class FileProcessingThread(threading.Thread):
         self.fileProcessorDict = fileProcessorDict
         self.fileProcessorKeyList = fileProcessorKeyList
         self.fileProcessingQueue = fileProcessingQueue
-        self.processedFileDict = processedFileDict
-        self.unprocessedFileDict = unprocessedFileDict
         self.logger = LoggingManager.getInstance().getLogger(name)
 
     def run(self):
@@ -37,51 +35,49 @@ class FileProcessingThread(threading.Thread):
                 filePath = fileInfo.get('filePath')
                 try:
+                    statusMonitor = fileInfo['statusMonitor']
                     fileInfo['startProcessingTime'] = time.time()
-                    processingInfo = fileInfo.get('processingInfo')
-                    processingInfo['startProcessingTime'] = fileInfo['startProcessingTime']
                     processorNumber = 0
                    nProcessors = len(self.fileProcessorKeyList)
+                    processedByDict = fileInfo.get('processedByDict', {})
+                    fileInfo['processedByDict'] = processedByDict
                     for processorKey in self.fileProcessorKeyList:
                         processorNumber += 1
                         processor = self.fileProcessorDict.get(processorKey)
-                        processorName = processor.__class__.__name__
-                        fileProcessedByDict = fileInfo.get('processedByDict', {})
-                        fileInfo['processedByDict'] = fileProcessedByDict
+                        processorName = '%s-%s' % (processor.__class__.__name__,processorNumber)
 
-                        if fileProcessedByDict.has_key(processorName):
+                        if processedByDict.has_key(processorName):
                             self.logger.debug('%s has already been processed by %s ' % (fileInfo, processorName))
                             continue
-                        self.logger.debug('%s is about to process file %s ' % (processorName, fileInfo))
+                        self.logger.debug('%s is about to process file %s ' % (processorName, filePath))
                         try:
                             processor.processFile(fileInfo)
-                            fileProcessedByDict[processorName] = True
+                            processedByDict[processorName] = True
                             self.logger.debug('%s processed file at path %s ' % (processorName, filePath))
                             if processorNumber == nProcessors:
                                 self.logger.debug('File %s processing is complete' % (filePath))
-                                fileInfo['endProcessingTime'] = time.time()
-                                processingInfo['endProcessingTime'] = fileInfo['endProcessingTime']
-                                processingInfo['processed'] = True
+                                endProcessingTime = time.time()
+                                statusMonitor.fileProcessed(filePath, endProcessingTime)
+                                fileInfo.clear()
                         except Exception, ex:
                             self.logger.exception(ex)
-                            errorMsg = '%s processing error: %s' % (processorName, str(ex))
-                            self.logger.debug(errorMsg)
-                            fileProcessingDict = fileInfo.get('processingDict', {})
-                            fileInfo['processingDict'] = fileProcessingDict
-                            processorDict = fileProcessingDict.get(processorName, {})
-                            fileProcessingDict[processorName] = processorDict
+                            processingError = '%s processing error: %s' % (processorName, str(ex))
+                            self.logger.debug(processingError)
+                            processingDict = fileInfo.get('processingDict', {})
+                            fileInfo['processingDict'] = processingDict
+                            processorDict = processingDict.get(processorName, {})
+                            processingDict[processorName] = processorDict
                             processorDict['lastError'] = str(ex)
                             nRetriesLeft = processorDict.get('numberOfRetriesLeft', processor.getNumberOfRetries())
                             self.logger.debug('Number of %s retries left for file %s: %s' % (processorName, filePath, nRetriesLeft))
                             processorDict['numberOfRetriesLeft'] = nRetriesLeft - 1
                             if nRetriesLeft <= 0:
-                                self.unprocessedFileDict[filePath] = fileInfo
-                                fileInfo['endProcessingTime'] = time.time()
-                                processingInfo['endProcessingTime'] = fileInfo['endProcessingTime']
-                                processingInfo['processingError'] = errorMsg
-                                self.logger.debug('No more %s retries left for file %s' % (processorName, fileInfo))
+                                endProcessingTime = time.time()
+                                statusMonitor.fileProcessingError(filePath, processingError, endProcessingTime)
+                                fileInfo.clear()
+                                self.logger.debug('No more %s retries left for file %s' % (processorName, filePath))
                             else:
                                 retryWaitPeriod = processor.getRetryWaitPeriodInSeconds()
                                 self.logger.debug('%s will retry processing file %s in %s seconds' % (processorName, filePath, retryWaitPeriod))
diff --git a/src/python/dm/common/processing/plugins/fileTransferPlugin.py b/src/python/dm/common/processing/plugins/fileTransferPlugin.py
index ea5e579a..1cae8d35 100755
--- a/src/python/dm/common/processing/plugins/fileTransferPlugin.py
+++ b/src/python/dm/common/processing/plugins/fileTransferPlugin.py
@@ -30,7 +30,7 @@ class FileTransferPlugin(FileProcessor):
         storageDirectory = fileInfo.get('storageDirectory')
         destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory)
         srcUrl = self.getSrcUrl(filePath, dataDirectory)
-        self.start(srcUrl, destUrl)
+        self.start(srcUrl, destUrl, fileInfo)
 
     def getSrcUrl(self, filePath, dataDirectory):
         # Use relative path with respect to data directory as a source
@@ -43,7 +43,7 @@ class FileTransferPlugin(FileProcessor):
         return destUrl
 
     def getFullCommand(self, src, dest):
-        return '%s %s %s' % (self.command, src, dest)
+        return '%s "%s" "%s"' % (self.command, src, dest)
 
     def setSrc(self, src):
         self.src = src
@@ -51,17 +51,17 @@ class FileTransferPlugin(FileProcessor):
     def setDest(self, dest):
         self.dest = dest
 
-    def start(self, src=None, dest=None):
+    def start(self, src=None, dest=None, fileInfo={}):
         # Use preconfigured source if provided source is None
         fileSrc = src
         if src is None:
             fileSrc = self.src
-        # Use provided destination only if preconfigured destination is None
-        # Plugins may have desired destination preconfigured for all files
-        fileDest = self.dest
-        if self.dest is None:
-            fileDest = dest
+        fileDest = dest
+        if dest is None:
+            fileDest = self.dest
+        fileSrc = self.replaceTemplates(fileSrc, fileInfo)
+        fileDest = self.replaceTemplates(fileDest, fileInfo)
         if not fileSrc or not fileDest:
             raise InvalidRequest('Both source and destination must be non-empty strings.')
         self.subprocess = DmSubprocess.getSubprocess(self.getFullCommand(fileSrc, fileDest))
diff --git a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py
index 929dc77b..42f90fb1 100755
--- a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py
+++ b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py
@@ -16,12 +16,13 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
 
     DEFAULT_PORT = 2811
 
-    def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False):
+    def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True):
         FileTransferPlugin.__init__(self, command, src, dest)
         self.dsFileApi = DsRestApiFactory.getFileRestApi()
         self.localMd5Sum = localMd5Sum
         self.remoteMd5Sum = remoteMd5Sum
         self.deleteOriginal = deleteOriginal
+        self.pluginMustProcessFiles = pluginMustProcessFiles
 
     def getSrcUrl(self, filePath, dataDirectory):
         (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
@@ -37,19 +38,26 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
         (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
         dirName = os.path.dirname(os.path.relpath(filePath, dirPath)).strip()
         fileName = os.path.basename(filePath)
-        destUrl = 'sshftp://%s/%s/%s/%s' % (storageHost, storageDirectory, dirName, fileName)
+        if self.dest:
+            destUrl = '%s/%s/%s' % (self.dest, dirName, fileName)
+        else:
+            destUrl = 'sshftp://%s/%s/%s/%s' % (storageHost, storageDirectory, dirName, fileName)
         return destUrl
 
     def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
+        if not self.pluginMustProcessFiles:
+            return {}
         storageHost = uploadInfo['storageHost']
         storageDirectory = uploadInfo['storageDirectory']
         dataDirectory = uploadInfo['dataDirectory']
-        self.logger.debug('Upload info: %s', uploadInfo)
+        self.logger.debug('Upload info: %s' % uploadInfo)
         # Original data directory may contain host/port
         (scheme, host, port, replacementDirPath) = FtpUtility.parseFtpUrl(dataDirectory)
+        self.logger.debug('Number of original files: %s' % len(filePathsDict))
+        self.logger.debug('Looking for existing files in %s' % storageDirectory)
         ftpUtility = SftpUtility(storageHost)
         storageFilePathsDict = ftpUtility.getFiles(storageDirectory, {}, replacementDirPath)
-        pluginFilePathsDict = copy.copy(filePathsDict)
+        self.logger.debug('There are %s files in %s' % (len(storageFilePathsDict), storageDirectory))
         # Remove file from plugin dict if we do not need to transfer it
         for (filePath,storageFilePathDict) in storageFilePathsDict.items():
             filePathDict = filePathsDict.get(filePath)
@@ -73,10 +81,10 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
                 continue
 
             # No need to transfer file
-            del pluginFilePathsDict[filePath]
+            del filePathsDict[filePath]
 
-        self.logger.debug('Number of original files: %s, number of plugin files: %s', len(filePathsDict), len(pluginFilePathsDict))
-        return pluginFilePathsDict
+        self.logger.debug('Number of files that require processing: %s' % len(filePathsDict))
+        return filePathsDict
 
     def processFile(self, fileInfo):
         filePath = fileInfo.get('filePath')
@@ -97,10 +105,8 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
             ftpUtility.getMd5Sum(filePath, fileInfo)
 
         # Transfer file
-        srcUrl = self.replaceTemplates(srcUrl, fileInfo)
-        destUrl = self.replaceTemplates(destUrl, fileInfo)
-        self.logger.debug('Starting transfer: %s' % fileInfo)
-        self.start(srcUrl, destUrl)
+        self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl))
+        self.start(srcUrl, destUrl, fileInfo)
 
         # Get remote checksum
         if self.remoteMd5Sum:
diff --git a/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py b/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py
index 00850c81..7a80d092 100755
--- a/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py
+++ b/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py
@@ -14,6 +14,9 @@ class MongoDbFileCatalogPlugin(FileProcessor):
         self.fileMongoDbApi = FileMongoDbApi()
         self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
 
+    def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
+        return filePathsDict
+
     def processFile(self, fileInfo):
         experimentFilePath = fileInfo.get('experimentFilePath')
         experimentName = fileInfo.get('experimentName')
@@ -25,13 +28,20 @@ class MongoDbFileCatalogPlugin(FileProcessor):
         storageUrl = daqInfo.get('storageUrl')
         storageFilePath = os.path.join(storageDirectory, experimentFilePath)
         fileProcessingTime = time.time()
-        fileProcessingTimeStamp = TimeUtility.formatLocalTimeStamp(fileProcessingTime)
+        fileProcessingTimestamp = TimeUtility.formatLocalTimestamp(fileProcessingTime)
 
         # Prepare catalogging entry
         fileInfo2 = {}
-        for key in ['md5Sum', 'fileSize', 'fileCreationTime', 'fileCreationTimeStamp', 'fileModificationTime', 'fileModificationTimeStamp']:
+        for key in ['md5Sum', 'fileSize']:
             if fileInfo.has_key(key):
                 fileInfo2[key] = fileInfo.get(key, '')
+        for key in ['fileCreationTime', 'fileModificationTime']:
+            if fileInfo.has_key(key):
+                t = fileInfo.get(key, 0)
+                fileInfo2[key] = t
+                key2 = '%sstamp' % key
+                fileInfo2[key2] = TimeUtility.formatLocalTimestamp(t)
+
         fileLocations = fileInfo.get('fileLocations', [])
         fileLocations.append('%s/%s' % (storageUrl, experimentFilePath))
 
@@ -43,7 +53,7 @@ class MongoDbFileCatalogPlugin(FileProcessor):
         fileInfo2['experimentFilePath'] = experimentFilePath
         fileInfo2['experimentName'] = experimentName
         fileInfo2['fileProcessingTime'] = fileProcessingTime
-        fileInfo2['fileProcessingTimeStamp'] = fileProcessingTimeStamp
+        fileInfo2['fileProcessingTimestamp'] = fileProcessingTimestamp
         fileInfo2['fileLocations'] = fileLocations
         self.logger.debug('Daq info: %s' % (daqInfo))
         fileInfo2.update(daqInfo)
diff --git a/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py b/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py
index 38ec8ec1..0522362e 100755
--- a/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py
+++ b/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py
@@ -14,14 +14,18 @@ class RsyncFileTransferPlugin(FileTransferPlugin):
     DEFAULT_COMMAND = 'rsync -arvlPR'
     DRY_RUN_COMMAND = 'rsync -arvlP'
 
-    def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False):
+    def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True,
+                 remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True):
         FileTransferPlugin.__init__(self, command, src, dest)
         self.dsFileApi = DsRestApiFactory.getFileRestApi()
         self.localMd5Sum = localMd5Sum
         self.remoteMd5Sum = remoteMd5Sum
         self.deleteOriginal = deleteOriginal
+        self.pluginMustProcessFiles = pluginMustProcessFiles
 
     def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
+        if not self.pluginMustProcessFiles:
+            return {}
         storageDirectory = uploadInfo['storageDirectory']
         storageHost = uploadInfo['storageHost']
         dataDirectory = uploadInfo['dataDirectory']
@@ -38,7 +42,7 @@ class RsyncFileTransferPlugin(FileTransferPlugin):
             filePathDict = filePathsDict.get(filePath)
             if filePathDict:
                 pluginFilePathsDict[filePath] = filePathDict
-        self.logger.debug('Number of original files: %s, number of plugin files: %s', len(filePathsDict), len(pluginFilePathsDict))
+        self.logger.debug('Number of original files: %s, number of plugin files: %s' % (len(filePathsDict), len(pluginFilePathsDict)))
         return pluginFilePathsDict
 
     def processFile(self, fileInfo):
@@ -60,10 +64,8 @@ class RsyncFileTransferPlugin(FileTransferPlugin):
             FileUtility.getMd5Sum(filePath, fileInfo)
 
         # Transfer file
-        srcUrl = self.replaceTemplates(srcUrl, fileInfo)
-        destUrl = self.replaceTemplates(destUrl, fileInfo)
         self.logger.debug('Starting transfer: %s' % fileInfo)
-        self.start(srcUrl, destUrl)
+        self.start(srcUrl, destUrl, fileInfo)
 
         # Get remote checksum
diff --git a/src/python/dm/common/processing/plugins/rsyncWithChecksumAndDeleteFileTransferPlugin.py b/src/python/dm/common/processing/plugins/rsyncWithChecksumAndDeleteFileTransferPlugin.py
index f821f5e7..55cfef1b 100755
--- a/src/python/dm/common/processing/plugins/rsyncWithChecksumAndDeleteFileTransferPlugin.py
+++ b/src/python/dm/common/processing/plugins/rsyncWithChecksumAndDeleteFileTransferPlugin.py
@@ -35,7 +35,7 @@ class RsyncWithChecksumAndDeleteFileTransferPlugin(FileTransferPlugin):
         self.logger.debug('File info before transfer: %s' % fileInfo)
 
         # Transfer file
-        self.start(srcUrl, destUrl)
+        self.start(srcUrl, destUrl, fileInfo)
 
         # Get remote checksum
         fileInfo2 = {}
diff --git a/src/python/dm/common/utility/fileUtility.py b/src/python/dm/common/utility/fileUtility.py
index 6aa499ce..26070214 100755
--- a/src/python/dm/common/utility/fileUtility.py
+++ b/src/python/dm/common/utility/fileUtility.py
@@ -22,11 +22,11 @@ class FileUtility:
         fileInfo['filePath'] = filePath
         fileInfo['fileSize'] = statResult[stat.ST_SIZE]
         fileInfo['fileCreationTime'] = statResult[stat.ST_CTIME]
-        fileInfo['fileCreationTimeStamp'] = TimeUtility.formatLocalTimeStamp(statResult[stat.ST_CTIME])
+        fileInfo['fileCreationTimestamp'] = TimeUtility.formatLocalTimestamp(statResult[stat.ST_CTIME])
         fileInfo['fileAccessTime'] = statResult[stat.ST_ATIME]
-        fileInfo['fileAccessTimeStamp'] = TimeUtility.formatLocalTimeStamp(statResult[stat.ST_ATIME])
+        fileInfo['fileAccessTimestamp'] = TimeUtility.formatLocalTimestamp(statResult[stat.ST_ATIME])
         fileInfo['fileModificationTime'] = statResult[stat.ST_MTIME]
-        fileInfo['fileModificationTimeStamp'] = TimeUtility.formatLocalTimeStamp(statResult[stat.ST_MTIME])
+        fileInfo['fileModificationTimestamp'] = TimeUtility.formatLocalTimestamp(statResult[stat.ST_MTIME])
         return fileInfo
 
 #######################################################################
diff --git a/src/python/dm/common/utility/ftpUtility.py b/src/python/dm/common/utility/ftpUtility.py
index 312faf28..4514cf63 100755
--- a/src/python/dm/common/utility/ftpUtility.py
+++ b/src/python/dm/common/utility/ftpUtility.py
@@ -87,12 +87,12 @@ class FtpUtility:
         if self.serverUsesUtcTime:
             modifyTime = TimeUtility.utcToLocalTime(modifyTime)
         fileStatDict['fileModificationTime'] = modifyTime
-        fileStatDict['fileModificationTimeStamp'] = TimeUtility.formatLocalTimeStamp(modifyTime)
+        fileStatDict['fileModificationTimestamp'] = TimeUtility.formatLocalTimestamp(modifyTime)
         fileStatDict['fileSize'] = int(fileStatDict.get('fileSize'))
         del fileStatDict['Modify']
         del fileStatDict['Type']
 
-    def getFiles(self, dirPath, fileDict={}, replacementDirPath=None):
+    def getFiles2(self, dirPath, fileDict={}, replacementDirPath=None, initialCall=True):
         if not self.ftpClient:
             self.ftpClient = self.getFtpClient(self.host, self.port, self.username, self.password)
         # Need these to be class members for the callback function
@@ -106,10 +106,39 @@
             del fileInfo['Name']
             filePath = '%s/%s' % (replacementDirPath, fileName)
             fileDict[filePath] = fileInfo
-        for d in copy.copy(self.mlsdDirList):
+        mlsdDirList = copy.copy(self.mlsdDirList)
+        for d in mlsdDirList:
+            dirPath2 = '%s/%s' % (dirPath,d)
+            replacementDirPath2 = '%s/%s' % (replacementDirPath,d)
+            self.getFiles2(dirPath2,fileDict, replacementDirPath2, initialCall=False)
+        if initialCall:
+            del self.mlsdFileDict
+            del self.mlsdDirList
+        return fileDict
+
+    def getFiles(self, dirPath, fileDict={}, replacementDirPath=None):
+        if not self.ftpClient:
+            self.ftpClient = self.getFtpClient(self.host, self.port, self.username, self.password)
+        # Need these to be class members for the callback function
+        self.mlsdFileDict = {}
+        self.mlsdDirList = []
+        self.ftpClient.retrlines('MLSD %s' % dirPath, self.__parseMlsdOutput)
+        if not replacementDirPath:
+            replacementDirPath = dirPath
+        for (fileName,fileInfo) in self.mlsdFileDict.items():
+            filePath = '%s/%s' % (replacementDirPath, fileName)
+            modifyTime = fileInfo.get('Modify')
+            modifyTime = time.mktime(time.strptime(modifyTime, '%Y%m%d%H%M%S'))
+            if self.serverUsesUtcTime:
+                modifyTime = TimeUtility.utcToLocalTime(modifyTime)
+            fileSize = int(fileInfo.get('fileSize'))
+            #print '%s %s %s' % (filePath, fileSize, modifyTime)
+            fileDict[filePath] = {'fileSize' : fileSize, 'fileModificationTime' : modifyTime}
+        mlsdDirList = copy.copy(self.mlsdDirList)
+        for d in mlsdDirList:
             dirPath2 = '%s/%s' % (dirPath,d)
             replacementDirPath2 = '%s/%s' % (replacementDirPath,d)
-            self.getFiles(dirPath2,fileDict, replacementDirPath2)
+            self.getFiles(dirPath2, fileDict, replacementDirPath2)
         return fileDict
 
     def getMd5Sum(self, filePath, fileInfo={}):
@@ -136,13 +165,29 @@ class FtpUtility:
 # Testing.
 if __name__ == '__main__':
 
-    ftpUtility = FtpUtility('s8dserv', 2811)
-    files = ftpUtility.getFiles('/export/8-id-i/test')
+    print "Round 1: "
+    ftpUtility = FtpUtility('s7dserv', 2811)
+    #files = ftpUtility.getFiles2('/export/7IDSprayimage/Cummins/Data')
+    files = ftpUtility.getFiles2('/export/dm/test')
     print files
-    files = ftpUtility.getFiles('/export/8-id-i/test', replacementDirPath='/data/testing/8-id-i')
+    files = ftpUtility.getFiles('/export/dm/test')
     print files
-    print ftpUtility.getMd5Sum('/export/8-id-i/test/testfile01')
-    print ftpUtility.statFile('/export/8-id-i/test/testfile01')
+    #files = ftpUtility.getFiles('/export/7IDSprayimage/Cummins/Data')
+    #files = ftpUtility.getFiles2('/export/8-id-i/test', replacementDirPath='/data/testing/8-id-i')
+    #print "Number of files: ", len(files)
+    #time.sleep(60)
+    #print "Removing files"
+    #del files
+    #print "Files removed"
+    #time.sleep(60)
+    #del ftpUtility
+    #print "Utility removed"
+    #time.sleep(60)
+    #print "Round 2: "
+    #ftpUtility = FtpUtility('s7dserv', 2811)
+    #files = ftpUtility.getFiles2('/export/7IDSprayimage/Cummins/Data')
+    #print ftpUtility.getMd5Sum('/export/8-id-i/test/testfile01')
+    #print ftpUtility.statFile('/export/8-id-i/test/testfile01')
     #ftpUtility = FtpUtility('xstor-devel', 22)
-    #files = ftpUtility.getFiles('/data/testing')
+    #files = ftpUtility.getFiles2('/data/testing')
     #print files
diff --git a/src/python/dm/common/utility/ldapLinuxPlatformUtility.py b/src/python/dm/common/utility/ldapLinuxPlatformUtility.py
index ec2581c3..58f32b1a 100755
--- a/src/python/dm/common/utility/ldapLinuxPlatformUtility.py
+++ b/src/python/dm/common/utility/ldapLinuxPlatformUtility.py
@@ -293,14 +293,14 @@ class LdapLinuxPlatformUtility:
         """ Set path permissions for the given group. """
         logger = cls.getLogger()
         logger.debug('Allowing group %s to read/execute path %s' % (groupName, path))
-        cmd = '%s -m group\:%s\:rx %s' % (cls.SETFACL_CMD, groupName, path)
+        cmd = '%s -m group\:%s\:rx "%s"' % (cls.SETFACL_CMD, groupName, path)
         cls.executeSudoCommand(cmd)
 
     @classmethod
     def changePathGroupOwner(cls, path, groupName):
         logger = cls.getLogger()
         logger.debug('Changing group owner to %s for path %s' % (groupName, path))
-        cmd = '%s \:%s %s' % (cls.CHOWN_CMD, groupName, path)
+        cmd = '%s \:%s "%s"' % (cls.CHOWN_CMD, groupName, path)
         cls.executeSudoCommand(cmd)
 
     @classmethod
diff --git a/src/python/dm/common/utility/linuxUtility.py b/src/python/dm/common/utility/linuxUtility.py
index 7bfa1835..3517eea3 100755
--- a/src/python/dm/common/utility/linuxUtility.py
+++ b/src/python/dm/common/utility/linuxUtility.py
@@ -75,14 +75,14 @@ class LinuxUtility:
         """ Set path permissions for the given group. """
""" logger = cls.getLogger() logger.debug('Allowing group %s to read/execute path %s' % (groupName, path)) - cmd = '%s -m group\:%s\:rx %s' % (cls.SETFACL_CMD, groupName, path) + cmd = '%s -m group\:%s\:rx "%s"' % (cls.SETFACL_CMD, groupName, path) cls.executeSudoCommand(cmd) @classmethod def changePathGroupOwner(cls, path, groupName): logger = cls.getLogger() logger.debug('Changing group owner to %s for path %s' % (groupName, path)) - cmd = '%s \:%s %s' % (cls.CHOWN_CMD, groupName, path) + cmd = '%s \:%s "%s"' % (cls.CHOWN_CMD, groupName, path) cls.executeSudoCommand(cmd) ####################################################################### diff --git a/src/python/dm/common/utility/objectCache.py b/src/python/dm/common/utility/objectCache.py index 93ff0ea0..81829ae5 100755 --- a/src/python/dm/common/utility/objectCache.py +++ b/src/python/dm/common/utility/objectCache.py @@ -19,7 +19,7 @@ class ObjectCache: def __init__(self, cacheSize, objectLifetime=DEFAULT_OBJECT_LIFETIME, objectClass=None): self.lock = threading.RLock() self.objectMap = {} # id/object map - self.timeStampDeq = deque() # timeStamp deq + self.timestampDeq = deque() # timestamp deq self.cacheSize = cacheSize self.objectLifetime = objectLifetime self.deqSize = ObjectCache.DEFAULT_TIME_STAMP_DEQ_SIZE_FACTOR*cacheSize @@ -34,7 +34,7 @@ class ObjectCache: def __purgeOne(self): # Get rid of one cached item based on the last accessed algorithm. while True: - deqEntry = self.timeStampDeq.popleft() + deqEntry = self.timestampDeq.popleft() oldId = deqEntry[0] cachedEntry = self.objectMap.get(oldId) if cachedEntry is not None: @@ -46,20 +46,20 @@ class ObjectCache: # Done. return - def __purgeTimeStampDeq(self): + def __purgeTimestampDeq(self): # Get rid of stale entries. - timeStampDeq = deque() - while len(self.timeStampDeq): - deqEntry = self.timeStampDeq.popleft() + timestampDeq = deque() + while len(self.timestampDeq): + deqEntry = self.timestampDeq.popleft() id = deqEntry[0] cachedEntry = self.objectMap.get(id) if cachedEntry is not None: # Timestamp entry is valid. if cachedEntry == deqEntry: # Found current item, keep it. - timeStampDeq.append(deqEntry) + timestampDeq.append(deqEntry) # Done. - self.timeStampDeq = timeStampDeq + self.timestampDeq = timestampDeq return def put(self, id, item, objectLifetime=None): @@ -71,11 +71,11 @@ class ObjectCache: self.lock.acquire() try: self.objectMap[id] = entry - self.timeStampDeq.append(entry) + self.timestampDeq.append(entry) if len(self.objectMap) > self.cacheSize: self.__purgeOne() - if len(self.timeStampDeq) > self.deqSize: - self.__purgeTimeStampDeq() + if len(self.timestampDeq) > self.deqSize: + self.__purgeTimestampDeq() finally: self.lock.release() @@ -110,7 +110,7 @@ class ObjectCache: return len(self.objectMap) def __str__(self): - return '%s' % self.timeStampDeq + return '%s' % self.timestampDeq ####################################################################### # Testing. 
diff --git a/src/python/dm/common/utility/sftpUtility.py b/src/python/dm/common/utility/sftpUtility.py
index db860c2a..b79a2c7c 100755
--- a/src/python/dm/common/utility/sftpUtility.py
+++ b/src/python/dm/common/utility/sftpUtility.py
@@ -3,6 +3,7 @@
 import copy
 import stat
 import pysftp
+from dm.common.utility.timeUtility import TimeUtility
 from dm.common.utility.loggingManager import LoggingManager
 import urlparse
 
@@ -50,27 +51,33 @@ class SftpUtility:
                 outputDict[key] = value
         return outputDict
 
-    def getFiles(self, dirPath, fileDict={}):
+    def getFiles(self, dirPath, fileDict={}, replacementDirPath=None):
         if not self.sftpClient:
             self.sftpClient = self.getSftpClient(self.host, self.port, self.username, self.password)
-        # Need these to be class members for the callback function
+        if not replacementDirPath:
+            replacementDirPath = dirPath
         attrs = self.sftpClient.listdir_attr(dirPath)
-        mode = attrs[0].st_mode
         for attr in attrs:
             fileName = attr.filename
             mode = attr.st_mode
-            fullPath = '%s/%s' % (dirPath, fileName)
             if stat.S_ISDIR(mode):
-                self.getFiles(fullPath, fileDict)
+                dirPath2 = '%s/%s' % (dirPath, fileName)
+                replacementDirPath2 = '%s/%s' % (replacementDirPath, fileName)
+                self.getFiles(dirPath2, fileDict, replacementDirPath2)
             elif stat.S_ISREG(mode):
-                fileInfo = {'filePath' : fullPath, 'fileSize' : attr.st_size, }
-                fileDict[fullPath] = fileInfo
+                filePath = '%s/%s' % (replacementDirPath, fileName)
+                fileInfo = {'fileSize' : attr.st_size,
+                            'fileModificationTime' : attr.st_mtime }
+                fileInfo['fileModificationTimestamp'] = TimeUtility.formatLocalTimestamp(attr.st_mtime)
+                fileDict[filePath] = fileInfo
         return fileDict
 
 #######################################################################
 # Testing.
 
 if __name__ == '__main__':
-    sftpUtility = SftpUtility('dmstorage', username='dm')
-    files = sftpUtility.getFiles('/opt/DM/data/ESAF/e1')
+    sftpUtility = SftpUtility('xstor-devel', username='dmadmin')
+    files = sftpUtility.getFiles('/data/testing/test1')
+    print files
+    files = sftpUtility.getFiles('/data/testing/test1', replacementDirPath='/xyz/ccc')
     print files
diff --git a/src/python/dm/common/utility/timeBasedProcessingQueue.py b/src/python/dm/common/utility/timeBasedProcessingQueue.py
index d25544e3..3850638a 100755
--- a/src/python/dm/common/utility/timeBasedProcessingQueue.py
+++ b/src/python/dm/common/utility/timeBasedProcessingQueue.py
@@ -53,15 +53,20 @@
 if __name__ == '__main__':
     import random
     q = TimeBasedProcessingQueue()
-    for i in range(0,10):
+    for i in range(0,10000000):
         waitTime = random.uniform(0,10)
         q.push(i, waitTime)
-        print 'Added: ', i, '; Processing wait: ', waitTime
+        #print 'Added: ', i, '; Processing wait: ', waitTime
+    print "Sleeping..."
+    time.sleep(60)
+    print "Removing..."
     while not q.isEmpty():
         i = q.pop()
-        print 'Got: ', i
-        time.sleep(1)
+        #print 'Got: ', i
+        #time.sleep(1)
+    print "Sleeping..."
+    time.sleep(60)
diff --git a/src/python/dm/common/utility/timeUtility.py b/src/python/dm/common/utility/timeUtility.py
index 6d60f76a..d0eb6710 100755
--- a/src/python/dm/common/utility/timeUtility.py
+++ b/src/python/dm/common/utility/timeUtility.py
@@ -11,22 +11,22 @@ class TimeUtility:
     UTC_MINUS_LOCAL_TIME = None
 
     @classmethod
-    def getCurrentGMTimeStamp(cls):
+    def getCurrentGMTimestamp(cls):
         """ Formats GMT timestamp. """
         return time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(time.time()))
 
     @classmethod
-    def formatGMTimeStamp(cls, t):
+    def formatGMTimestamp(cls, t):
         """ Format GMT timestamp. """
""" return time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(t)) @classmethod - def getCurrentLocalTimeStamp(cls): + def getCurrentLocalTimestamp(cls): """ Formats local timestamp. """ return time.strftime('%Y/%m/%d %H:%M:%S %Z', time.localtime(time.time())) @classmethod - def formatLocalTimeStamp(cls, t): + def formatLocalTimestamp(cls, t): """ Formats local timestamp. """ return time.strftime('%Y/%m/%d %H:%M:%S %Z', time.localtime(t)) diff --git a/src/python/dm/daq_web_service/service/daqWebService.py b/src/python/dm/daq_web_service/service/daqWebService.py index 788c3da6..edcb0ff4 100755 --- a/src/python/dm/daq_web_service/service/daqWebService.py +++ b/src/python/dm/daq_web_service/service/daqWebService.py @@ -27,8 +27,8 @@ class DaqWebService(DmRestWebServiceBase): moduleManager.addModule(FileProcessingManager.getInstance()) # Requred processing plugin - notificationPlugin = DaqProcessingCompleteNotificationPlugin() - FileProcessingManager.getInstance().appendFileProcessor(notificationPlugin) + #notificationPlugin = DaqProcessingCompleteNotificationPlugin() + #FileProcessingManager.getInstance().appendFileProcessor(notificationPlugin) self.logger.debug('Initialized dm modules') def getDefaultServerHost(self): diff --git a/src/python/dm/daq_web_service/service/impl/daqTracker.py b/src/python/dm/daq_web_service/service/impl/daqTracker.py index e6ca84c0..18bd49ba 100755 --- a/src/python/dm/daq_web_service/service/impl/daqTracker.py +++ b/src/python/dm/daq_web_service/service/impl/daqTracker.py @@ -30,6 +30,9 @@ class DaqTracker(ObjectTracker): raise ObjectAlreadyExists('DAQ is already active for experiment %s in data directory %s.' % (experimentName, dataDirectory)) daqId = str(uuid.uuid4()) daqInfo['id'] = daqId + daqInfo['nFiles'] = 0 + daqInfo['nProcessedFiles'] = 0 + daqInfo['nProcessingErrors'] = 0 daqInfo['experimentName'] = experimentName daqInfo['storageDirectory'] = experiment.get('storageDirectory') daqInfo['storageHost'] = experiment.get('storageHost') @@ -39,7 +42,7 @@ class DaqTracker(ObjectTracker): startTime = time.time() daqInfo2['startTime'] = startTime - daqInfo2['startTimestamp'] = TimeUtility.formatLocalTimeStamp(startTime) + daqInfo2['startTimestamp'] = TimeUtility.formatLocalTimestamp(startTime) self.activeDaqDict[activeDaqKey] = daqInfo2 self.put(daqId, daqInfo2) @@ -54,7 +57,7 @@ class DaqTracker(ObjectTracker): raise ObjectNotFound('DAQ is not active for experiment %s in data directory %s.' 
         endTime = time.time()
         daqInfo['endTime'] = endTime
-        daqInfo['endTimestamp'] = TimeUtility.formatLocalTimeStamp(endTime)
+        daqInfo['endTimestamp'] = TimeUtility.formatLocalTimestamp(endTime)
         daqInfo.updateStatus()
         del self.activeDaqDict[activeDaqKey]
         return daqInfo
diff --git a/src/python/dm/daq_web_service/service/impl/dsProcessFileNotificationPlugin.py b/src/python/dm/daq_web_service/service/impl/dsProcessFileNotificationPlugin.py
index d8deccf5..35cd9bbb 100755
--- a/src/python/dm/daq_web_service/service/impl/dsProcessFileNotificationPlugin.py
+++ b/src/python/dm/daq_web_service/service/impl/dsProcessFileNotificationPlugin.py
@@ -16,11 +16,9 @@ class DsProcessFileNotificationPlugin(FileProcessor):
     def processFile(self, fileInfo):
         experimentFilePath = fileInfo.get('experimentFilePath')
         experimentName = fileInfo.get('experimentName')
-        daqInfo = copy.deepcopy(fileInfo.get('daqInfo', {}))
+        daqInfo = fileInfo.get('daqInfo', {})
         md5Sum = fileInfo.get('md5Sum')
         self.logger.debug('Processing file %s for experiment %s: %s' % (experimentFilePath, experimentName, str(fileInfo)))
-        if daqInfo.has_key('fileDict'):
-            del daqInfo['fileDict']
 
         # Prepare dictionary for processing. Only send needed data.
         fileInfo2 = {}
diff --git a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py
index 0bf7ff67..e0c51b6c 100755
--- a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py
+++ b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py
@@ -50,14 +50,14 @@ class ExperimentSessionControllerImpl(DmObjectManager):
         daqInfo = DaqTracker.getInstance().stopDaq(experiment, dataDirectory)
         FileSystemObserver.getInstance().stopObservingPath(dataDirectory, experiment)
         daqInfo.updateStatus()
-        return daqInfo.scrub(includeFileDetails)
+        return daqInfo
 
     def getDaqInfo(self, id, includeFileDetails=False):
         daqInfo = DaqTracker.getInstance().getDaqInfo(id)
         if not daqInfo:
             raise ObjectNotFound('Daq id %s not found.' % id)
         daqInfo.updateStatus()
-        return daqInfo.scrub(includeFileDetails)
+        return daqInfo
 
     def upload(self, experimentName, dataDirectory, daqInfo, includeFileDetails=False):
         experiment = self.dsExperimentApi.getExperimentByName(experimentName)
@@ -78,9 +78,12 @@
         uploadInfo['storageHost'] = experiment.get('storageHost')
         uploadInfo['storageUrl'] = experiment.get('storageUrl')
         uploadInfo['dataDirectory'] = dataDirectory
+        uploadInfo['nProcessedFiles'] = 0
+        uploadInfo['nProcessingErrors'] = 0
+
         startTime = time.time()
         uploadInfo['startTime'] = startTime
-        uploadInfo['startTimestamp '] = TimeUtility.formatLocalTimeStamp(startTime)
+        uploadInfo['startTimestamp'] = TimeUtility.formatLocalTimestamp(startTime)
         daqInfo['experimentName'] = experimentName
         daqInfo['storageDirectory'] = experiment.get('storageDirectory')
         daqInfo['storageHost'] = experiment.get('storageHost')
@@ -89,17 +92,16 @@ class ExperimentSessionControllerImpl(DmObjectManager):
         daqInfo['uploadId'] = uploadId
 
         # Remove hidden files
+        self.logger.debug('Checking %s processing candidates' % len(filePathsDict))
         filePathsDict = fileProcessingManager.removeHiddenFilesFromProcessing(filePathsDict, uploadInfo)
         # Check which files need to be processed
         filePathsDict = fileProcessingManager.checkUploadFilesForProcessing(filePathsDict, uploadInfo)
         if not len(filePathsDict):
             raise InvalidRequest('There are no new files for upload in directory %s.' % dataDirectory)
-        fileDict = {}
-        #UploadTracker.getInstance().put(uploadId, uploadInfo)
         UploadTracker.getInstance().startUpload(uploadId, uploadInfo)
-        uploadInfo['fileDict'] = fileDict
         uploadInfo['nFiles'] = len(filePathsDict)
+        self.logger.debug('Will prepare upload of %s files' % len(filePathsDict))
         timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUpload, args=[uploadInfo, daqInfo, experiment, filePathsDict])
         timer.start()
         return uploadInfo
@@ -107,27 +109,24 @@
     def prepareUpload(self, uploadInfo, daqInfo, experiment, filePathsDict):
         uploadId = uploadInfo.get('id')
         self.logger.debug('Preparing upload id: %s' % uploadId)
-        fileDict = uploadInfo.get('fileDict')
         dataDirectory = uploadInfo.get('dataDirectory')
         fileProcessingManager = FileProcessingManager.getInstance()
         for filePath in filePathsDict.keys():
             fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
             fileInfo['daqInfo'] = daqInfo
             fileInfo['uploadId'] = uploadId
-            fileProcessingInfo = { 'processed' : False }
-            fileDict[filePath] = fileProcessingInfo
-            fileInfo['processingInfo'] = fileProcessingInfo
+            fileInfo['statusMonitor'] = uploadInfo
             try:
                 fileProcessingManager.processFile(fileInfo)
             except Exception, ex:
                 self.logger.error('Processing error: %s', ex)
         self.logger.debug('Done preparing upload id: %s (total of %s files)' % (uploadId, len(filePathsDict)))
 
-    def getUploadInfo(self, id, includeFileDetails=False):
+    def getUploadInfo(self, id):
         uploadInfo = UploadTracker.getInstance().get(id)
         if not uploadInfo:
             raise ObjectNotFound('Upload id %s not found.' % id)
         uploadInfo.updateStatus()
-        return uploadInfo.scrub(includeFileDetails)
+        return uploadInfo
diff --git a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py
index 90b0e099..1d907ce9 100755
--- a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py
+++ b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py
@@ -88,21 +88,27 @@ class FileSystemObserver(threading.Thread,Singleton):
     @ThreadingUtility.synchronize
     def fileUpdated(self, filePath, dataDirectory, experiment):
         daqInfo = DaqTracker.getInstance().getDaqInfoByExperimentAndDataDirectory(experiment, dataDirectory)
+        experimentName = experiment.get('name')
+
+        # No daq info, ignore
+        if not daqInfo:
+            self.logger.debug('No daq for data directory %s and experiment %s, file path %s will not be processed' % (dataDirectory, experimentName, filePath))
+            return
+
         # Do not process hidden files unless requested
-        if not daqInfo or not ValueUtility.toBoolean(daqInfo.get('processHiddenFiles')):
+        if not ValueUtility.toBoolean(daqInfo.get('processHiddenFiles')):
             fileName = os.path.basename(filePath)
             if fileName.startswith('.'):
                 self.logger.debug('File path %s is hidden file, will not process it' % filePath)
                 return
 
-        observedFile = self.observedFileMap.get(filePath, ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment))
-        fileProcessingInfo = { 'processed' : False }
-        observedFile['processingInfo'] = fileProcessingInfo
-        observedFile.setLastUpdateTimeToNow()
-        if daqInfo:
-            daqFileDict = daqInfo['fileDict']
-            daqFileDict[filePath] = observedFile
+        observedFile = self.observedFileMap.get(filePath)
+        if not observedFile:
+            daqInfo.fileAdded(filePath)
+            observedFile = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
             observedFile['daqInfo'] = daqInfo.toDictWithOriginalKeys()
+            observedFile['statusMonitor'] = daqInfo
+        observedFile.setLastUpdateTimeToNow()
         self.observedFileMap[filePath] = observedFile
         self.logger.debug('Observed file updated: %s', observedFile)
-- 
GitLab
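
Note on the reworked reporting pattern: this patch replaces the shared per-file 'fileDict'
bookkeeping with thread-safe callbacks (fileAdded, fileProcessed, fileProcessingError) on a
status-monitor object (DaqInfo or UploadInfo) that the processing threads invoke directly.
The code below is a minimal, self-contained sketch of that pattern for illustration only; it
is not part of the patch, and the class name and sample file paths are invented for the example.

import threading
import time

class StatusMonitorSketch(dict):
    # Simplified stand-in for DaqInfo/UploadInfo: running counters guarded by
    # an RLock, so concurrent worker threads can report results safely.
    def __init__(self, nFiles):
        dict.__init__(self, nFiles=nFiles, nProcessedFiles=0, nProcessingErrors=0)
        self.lock = threading.RLock()

    def fileProcessed(self, filePath, processingEndTime):
        with self.lock:
            self['nProcessedFiles'] += 1
            # Track the most recently finished file, as the patch does.
            if processingEndTime > self.get('lastFileProcessingEndTime', 0):
                self['lastFileProcessed'] = filePath
                self['lastFileProcessingEndTime'] = processingEndTime

    def fileProcessingError(self, filePath, processingError, processingEndTime):
        with self.lock:
            errors = self.setdefault('processingErrors', {})
            errors[filePath] = processingError
            self['nProcessingErrors'] = len(errors)
            if processingEndTime > self.get('lastFileProcessingEndTime', 0):
                self['lastFileProcessingEndTime'] = processingEndTime

if __name__ == '__main__':
    monitor = StatusMonitorSketch(nFiles=2)
    monitor.fileProcessed('/data/f1.h5', time.time())
    monitor.fileProcessingError('/data/f2.h5', 'checksum mismatch', time.time())
    print(monitor)  # counters updated without any per-file dict

Because status is kept as running counters rather than derived by scanning a per-file
dictionary, updateStatus() becomes constant-time per call, which is what makes status
reports cheap for large DAQs and uploads.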