From 1a2058e76e17a0351bee3bf58cf37421ec5fe17c Mon Sep 17 00:00:00 2001 From: Sinisa Veseli <sveseli@aps.anl.gov> Date: Tue, 26 Jan 2016 03:58:43 +0000 Subject: [PATCH] modifications to eliminate processing of hidden files and of unchanged files that were processed already; checksum is now done by default --- src/python/dm/common/utility/ftpUtility.py | 5 ++-- .../service/experimentSessionController.py | 7 +++++ .../impl/dsProcessFileNotificationPlugin.py | 6 ++++- .../impl/experimentSessionControllerImpl.py | 22 +++++++++++----- .../service/impl/fileSystemObserver.py | 9 +++++++ .../service/impl/experimentManager.py | 26 +++++++++++++++---- 6 files changed, 61 insertions(+), 14 deletions(-) diff --git a/src/python/dm/common/utility/ftpUtility.py b/src/python/dm/common/utility/ftpUtility.py index 8a75aed4..8cd2be4b 100755 --- a/src/python/dm/common/utility/ftpUtility.py +++ b/src/python/dm/common/utility/ftpUtility.py @@ -55,6 +55,7 @@ class FtpUtility: parseDict = {} self.__parseKeyValue(parts[0], parseDict) self.__parseKeyValue(parts[1], parseDict) + self.__parseKeyValue(parts[2], parseDict) name = parts[-1].strip() parseDict['Name'] = name type = parseDict.get('Type', '') @@ -82,6 +83,6 @@ class FtpUtility: # Testing. if __name__ == '__main__': - ftpUtility = FtpUtility('s8dserv', 2811) - files=ftpUtility.getFiles('/export/8-id-i/test') + ftpUtility = FtpUtility('dmstorage', 2811) + files=ftpUtility.getFiles('/opt/DM/data/ESAF/e1') print files diff --git a/src/python/dm/daq_web_service/service/experimentSessionController.py b/src/python/dm/daq_web_service/service/experimentSessionController.py index 034c77b8..8fa338a9 100755 --- a/src/python/dm/daq_web_service/service/experimentSessionController.py +++ b/src/python/dm/daq_web_service/service/experimentSessionController.py @@ -2,6 +2,7 @@ import cherrypy import json +import os from dm.common.service.dmSessionController import DmSessionController from dm.common.exceptions.invalidRequest import InvalidRequest @@ -26,6 +27,9 @@ class ExperimentSessionController(DmSessionController): if not dataDirectory: raise InvalidRequest('Missing data directory.') dataDirectory = Encoder.decode(dataDirectory) + if not dataDirectory.startswith(os.sep): + raise InvalidRequest('Data directory must be an absolute path.') + daqInfo = {} encodedDaqInfo = kwargs.get('daqInfo') if encodedDaqInfo is not None: @@ -66,6 +70,9 @@ class ExperimentSessionController(DmSessionController): if not dataDirectory: raise InvalidRequest('Missing data directory.') dataDirectory = Encoder.decode(dataDirectory) + if not dataDirectory.startswith(os.sep): + raise InvalidRequest('Data directory must be an absolute path.') + daqInfo = {} encodedDaqInfo = kwargs.get('daqInfo') if encodedDaqInfo: diff --git a/src/python/dm/daq_web_service/service/impl/dsProcessFileNotificationPlugin.py b/src/python/dm/daq_web_service/service/impl/dsProcessFileNotificationPlugin.py index 55340ed3..d8deccf5 100755 --- a/src/python/dm/daq_web_service/service/impl/dsProcessFileNotificationPlugin.py +++ b/src/python/dm/daq_web_service/service/impl/dsProcessFileNotificationPlugin.py @@ -16,8 +16,9 @@ class DsProcessFileNotificationPlugin(FileProcessor): def processFile(self, fileInfo): experimentFilePath = fileInfo.get('experimentFilePath') experimentName = fileInfo.get('experimentName') - self.logger.debug('Processing file %s for experiment %s' % (experimentFilePath, experimentName)) daqInfo = copy.deepcopy(fileInfo.get('daqInfo', {})) + md5Sum = fileInfo.get('md5Sum') + self.logger.debug('Processing file %s for experiment %s: %s' % (experimentFilePath, experimentName, str(fileInfo))) if daqInfo.has_key('fileDict'): del daqInfo['fileDict'] @@ -25,7 +26,10 @@ class DsProcessFileNotificationPlugin(FileProcessor): fileInfo2 = {} fileInfo2['experimentFilePath'] = experimentFilePath fileInfo2['experimentName'] = experimentName + if md5Sum: + fileInfo2['md5Sum'] = md5Sum fileInfo2['daqInfo'] = daqInfo + self.logger.debug('File info sent to DS service: %s' % (str(fileInfo2))) self.dsFileApi.processFile(experimentFilePath, experimentName, fileInfo2) ####################################################################### diff --git a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py index 07085c6d..0d4e0507 100755 --- a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py +++ b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py @@ -61,7 +61,7 @@ class ExperimentSessionControllerImpl(DmObjectManager): storageDirectory = experiment.get('storageDirectory') if storageDirectory is None: raise InvalidRequest('Experiment %s has not been started.' % experimentName) - filePaths = FileSystemObserver.getInstance().getFiles(dataDirectory) + filePathsDict = FileSystemObserver.getInstance().getFiles(dataDirectory) fileProcessingManager = FileProcessingManager.getInstance() uploadId = str(uuid.uuid4()) self.logger.debug('Starting upload id %s' % uploadId) @@ -70,6 +70,7 @@ class ExperimentSessionControllerImpl(DmObjectManager): uploadInfo['experimentName'] = experimentName uploadInfo['storageDirectory'] = experiment.get('storageDirectory') uploadInfo['storageHost'] = experiment.get('storageHost') + uploadInfo['storageUrl'] = experiment.get('storageUrl') uploadInfo['dataDirectory'] = dataDirectory startTime = time.time() uploadInfo['startTime'] = startTime @@ -77,23 +78,32 @@ class ExperimentSessionControllerImpl(DmObjectManager): daqInfo['experimentName'] = experimentName daqInfo['storageDirectory'] = experiment.get('storageDirectory') daqInfo['storageHost'] = experiment.get('storageHost') + daqInfo['storageUrl'] = experiment.get('storageUrl') daqInfo['dataDirectory'] = dataDirectory daqInfo['uploadId'] = uploadId + # Remove hidden files + filePathsDict = fileProcessingManager.removeHiddenFilesFromProcessing(filePathsDict, uploadInfo) + # Check which files need to be processed + filePathsDict = fileProcessingManager.checkUploadFilesForProcessing(filePathsDict, uploadInfo) + if not len(filePathsDict): + raise InvalidRequest('There are no new files for upload in directory %s.' % dataDirectory) + fileDict = {} - for filePath in filePaths: + for filePath in filePathsDict.keys(): fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment) fileInfo['daqInfo'] = daqInfo fileInfo['uploadId'] = uploadId fileUploadInfo = { 'processed' : False } - # Stat should be done by agent, not by observer. - try: + # Stat should be done by agent, not by observer. + try: FileUtility.statFile(filePath, fileUploadInfo) except: - # Ok, may be remote file - pass + # Ok, may be remote file + pass fileDict[filePath] = fileUploadInfo fileProcessingManager.processFile(fileInfo) + uploadInfo['fileDict'] = fileDict #self.logger.debug('Upload info %s' % uploadInfo) UploadTracker.getInstance().put(uploadId, uploadInfo) diff --git a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py index b5b15bcb..89c8df97 100755 --- a/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py +++ b/src/python/dm/daq_web_service/service/impl/fileSystemObserver.py @@ -2,12 +2,14 @@ import threading import time +import os from watchdog.observers.polling import PollingObserver from dm.common.utility.loggingManager import LoggingManager from dm.common.utility.configurationManager import ConfigurationManager from dm.common.objects.observedFile import ObservedFile +from dm.common.utility.valueUtility import ValueUtility from dm.common.utility.objectUtility import ObjectUtility from dm.common.utility.singleton import Singleton from dm.common.utility.threadingUtility import ThreadingUtility @@ -86,6 +88,13 @@ class FileSystemObserver(threading.Thread,Singleton): @ThreadingUtility.synchronize def fileUpdated(self, filePath, dataDirectory, experiment): daqInfo = DaqTracker.getInstance().getDaqInfoByExperimentAndDataDirectory(experiment, dataDirectory) + # Do not process hidden files unless requested + if not daqInfo or not ValueUtility.toBoolean(daqInfo.get('processHiddenFiles')): + fileName = os.path.basename(filePath) + if fileName.startswith('.'): + self.logger.debug('File path %s is hidden file, will not process it' % filePath) + return + observedFile = self.observedFileMap.get(filePath, ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)) observedFile.setLastUpdateTimeToNow() if daqInfo: diff --git a/src/python/dm/ds_web_service/service/impl/experimentManager.py b/src/python/dm/ds_web_service/service/impl/experimentManager.py index de809aeb..d1206121 100755 --- a/src/python/dm/ds_web_service/service/impl/experimentManager.py +++ b/src/python/dm/ds_web_service/service/impl/experimentManager.py @@ -16,11 +16,13 @@ from dm.common.db.api.experimentDbApi import ExperimentDbApi from dm.common.processing.fileProcessingManager import FileProcessingManager from dm.common.exceptions.objectNotFound import ObjectNotFound from dm.common.exceptions.invalidRequest import InvalidRequest +from pathTracker import PathTracker class ExperimentManager(Singleton): CONFIG_SECTION_NAME = 'ExperimentManager' STORAGE_DIRECTORY_KEY = 'storagedirectory' + STORAGE_ID_KEY = 'storageid' MANAGE_STORAGE_PERMISSIONS_KEY = 'managestoragepermissions' PLATFORM_UTILITY_KEY = 'platformutility' @@ -43,6 +45,7 @@ class ExperimentManager(Singleton): self.logger.debug('Initializing') self.lock = threading.RLock() self.experimentDbApi = ExperimentDbApi() + self.pathTracker = PathTracker() self.platformUtility = None self.__configure() self.fileProcessingManager = FileProcessingManager.getInstance() @@ -55,6 +58,7 @@ class ExperimentManager(Singleton): configItems = cm.getConfigItems(ExperimentManager.CONFIG_SECTION_NAME) self.logger.debug('Got config items: %s' % configItems) self.storageDirectory = cm.getConfigOption(ExperimentManager.CONFIG_SECTION_NAME, ExperimentManager.STORAGE_DIRECTORY_KEY) + self.storageId = cm.getConfigOption(ExperimentManager.CONFIG_SECTION_NAME, ExperimentManager.STORAGE_ID_KEY) self.manageStoragePermissions = ValueUtility.toBoolean(cm.getConfigOption(ExperimentManager.CONFIG_SECTION_NAME, ExperimentManager.MANAGE_STORAGE_PERMISSIONS_KEY)) platformUtility = cm.getConfigOption(ExperimentManager.CONFIG_SECTION_NAME, ExperimentManager.PLATFORM_UTILITY_KEY) if platformUtility: @@ -75,8 +79,10 @@ class ExperimentManager(Singleton): def updateExperimentWithStorageDataDirectory(self, experiment): storageDirectory = self.__getExperimentStorageDataDirectory(experiment) if os.path.exists(storageDirectory): + storageHost = ConfigurationManager.getInstance().getHost() experiment['storageDirectory'] = storageDirectory - experiment['storageHost'] = ConfigurationManager.getInstance().getHost() + experiment['storageHost'] = storageHost + experiment['storageUrl'] = '%s://%s%s' % (self.storageId, storageHost, storageDirectory) return storageDirectory def addUserToGroup(self, username, experimentName): @@ -137,6 +143,8 @@ class ExperimentManager(Singleton): OsUtility.chmodPath(storageDirectory, dirMode=self.DIR_PERMISSIONS_MODE) self.logger.debug('Changing group owner for %s to %s' % (storageDirectory, experimentName)) self.platformUtility.changePathGroupOwner(storageDirectory, experimentName) + ownerUpdateTime = time.time() + self.pathTracker.put(storageDirectory, ownerUpdateTime) # Add users to group experimentUsers = experiment.get('experimentUsernameList', []) @@ -157,8 +165,11 @@ class ExperimentManager(Singleton): else: self.logger.debug('Creating data directory for experiment %s: %s' % (experimentName, storageDirectory)) OsUtility.createDir(storageDirectory) + + storageHost = ConfigurationManager.getInstance().getHost() experiment['storageDirectory'] = storageDirectory - experiment['storageHost'] = ConfigurationManager.getInstance().getHost() + experiment['storageHost'] = storageHost + experiment['storageUrl'] = '%s://%s%s' % (self.storageId, storageHost, storageDirectory) if self.manageStoragePermissions: self.createExperimentGroup(experiment) @@ -180,8 +191,13 @@ class ExperimentManager(Singleton): # Recursively modify subdirectory permissions dirPath = os.path.dirname(filePath) while (os.path.abspath(dirPath) != os.path.abspath(storageDirectory)): - self.logger.debug('Changing group owner for experiment subdirectory %s to %s' % (dirPath, experimentName)) - self.platformUtility.changePathGroupOwner(dirPath, experimentName) + if self.pathTracker.get(dirPath) is None: + self.logger.debug('Changing group owner for experiment subdirectory %s to %s' % (dirPath, experimentName)) + self.platformUtility.changePathGroupOwner(dirPath, experimentName) + ownerUpdateTime = time.time() + self.pathTracker.put(dirPath, ownerUpdateTime) + else: + self.logger.debug('Group owner for experiment subdirectory %s is already set to %s' % (dirPath, experimentName)) dirPath = os.path.dirname(dirPath) self.logger.debug('Processing file %s' % filePath) @@ -199,7 +215,7 @@ class ExperimentManager(Singleton): self.logger.debug('Getting stat for file path %s' % (filePath)) FileUtility.statFile(filePath, fileInfo) FileUtility.getMd5Sum(filePath, fileInfo) - self.logger.debug('File Info after stat: %s' % (fileInfo)) + self.logger.debug('File info after stat: %s' % str(fileInfo)) else: self.logger.debug('File path %s does not exist' % filePath) raise ObjectNotFound('File %s does not exist' % filePath) -- GitLab