Skip to content
Snippets Groups Projects
Commit 1a2058e7 authored by sveseli's avatar sveseli
Browse files

modifications to eliminate processing of hidden files and of unchanged files...

modifications to eliminate processing of hidden files and of unchanged files that were processed already; checksum is now done by default
parent 22a58e0e
No related branches found
No related tags found
No related merge requests found
......@@ -55,6 +55,7 @@ class FtpUtility:
parseDict = {}
self.__parseKeyValue(parts[0], parseDict)
self.__parseKeyValue(parts[1], parseDict)
self.__parseKeyValue(parts[2], parseDict)
name = parts[-1].strip()
parseDict['Name'] = name
type = parseDict.get('Type', '')
......@@ -82,6 +83,6 @@ class FtpUtility:
# Testing.
if __name__ == '__main__':
ftpUtility = FtpUtility('s8dserv', 2811)
files=ftpUtility.getFiles('/export/8-id-i/test')
ftpUtility = FtpUtility('dmstorage', 2811)
files=ftpUtility.getFiles('/opt/DM/data/ESAF/e1')
print files
......@@ -2,6 +2,7 @@
import cherrypy
import json
import os
from dm.common.service.dmSessionController import DmSessionController
from dm.common.exceptions.invalidRequest import InvalidRequest
......@@ -26,6 +27,9 @@ class ExperimentSessionController(DmSessionController):
if not dataDirectory:
raise InvalidRequest('Missing data directory.')
dataDirectory = Encoder.decode(dataDirectory)
if not dataDirectory.startswith(os.sep):
raise InvalidRequest('Data directory must be an absolute path.')
daqInfo = {}
encodedDaqInfo = kwargs.get('daqInfo')
if encodedDaqInfo is not None:
......@@ -66,6 +70,9 @@ class ExperimentSessionController(DmSessionController):
if not dataDirectory:
raise InvalidRequest('Missing data directory.')
dataDirectory = Encoder.decode(dataDirectory)
if not dataDirectory.startswith(os.sep):
raise InvalidRequest('Data directory must be an absolute path.')
daqInfo = {}
encodedDaqInfo = kwargs.get('daqInfo')
if encodedDaqInfo:
......
......@@ -16,8 +16,9 @@ class DsProcessFileNotificationPlugin(FileProcessor):
def processFile(self, fileInfo):
experimentFilePath = fileInfo.get('experimentFilePath')
experimentName = fileInfo.get('experimentName')
self.logger.debug('Processing file %s for experiment %s' % (experimentFilePath, experimentName))
daqInfo = copy.deepcopy(fileInfo.get('daqInfo', {}))
md5Sum = fileInfo.get('md5Sum')
self.logger.debug('Processing file %s for experiment %s: %s' % (experimentFilePath, experimentName, str(fileInfo)))
if daqInfo.has_key('fileDict'):
del daqInfo['fileDict']
......@@ -25,7 +26,10 @@ class DsProcessFileNotificationPlugin(FileProcessor):
fileInfo2 = {}
fileInfo2['experimentFilePath'] = experimentFilePath
fileInfo2['experimentName'] = experimentName
if md5Sum:
fileInfo2['md5Sum'] = md5Sum
fileInfo2['daqInfo'] = daqInfo
self.logger.debug('File info sent to DS service: %s' % (str(fileInfo2)))
self.dsFileApi.processFile(experimentFilePath, experimentName, fileInfo2)
#######################################################################
......
......@@ -61,7 +61,7 @@ class ExperimentSessionControllerImpl(DmObjectManager):
storageDirectory = experiment.get('storageDirectory')
if storageDirectory is None:
raise InvalidRequest('Experiment %s has not been started.' % experimentName)
filePaths = FileSystemObserver.getInstance().getFiles(dataDirectory)
filePathsDict = FileSystemObserver.getInstance().getFiles(dataDirectory)
fileProcessingManager = FileProcessingManager.getInstance()
uploadId = str(uuid.uuid4())
self.logger.debug('Starting upload id %s' % uploadId)
......@@ -70,6 +70,7 @@ class ExperimentSessionControllerImpl(DmObjectManager):
uploadInfo['experimentName'] = experimentName
uploadInfo['storageDirectory'] = experiment.get('storageDirectory')
uploadInfo['storageHost'] = experiment.get('storageHost')
uploadInfo['storageUrl'] = experiment.get('storageUrl')
uploadInfo['dataDirectory'] = dataDirectory
startTime = time.time()
uploadInfo['startTime'] = startTime
......@@ -77,23 +78,32 @@ class ExperimentSessionControllerImpl(DmObjectManager):
daqInfo['experimentName'] = experimentName
daqInfo['storageDirectory'] = experiment.get('storageDirectory')
daqInfo['storageHost'] = experiment.get('storageHost')
daqInfo['storageUrl'] = experiment.get('storageUrl')
daqInfo['dataDirectory'] = dataDirectory
daqInfo['uploadId'] = uploadId
# Remove hidden files
filePathsDict = fileProcessingManager.removeHiddenFilesFromProcessing(filePathsDict, uploadInfo)
# Check which files need to be processed
filePathsDict = fileProcessingManager.checkUploadFilesForProcessing(filePathsDict, uploadInfo)
if not len(filePathsDict):
raise InvalidRequest('There are no new files for upload in directory %s.' % dataDirectory)
fileDict = {}
for filePath in filePaths:
for filePath in filePathsDict.keys():
fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
fileInfo['daqInfo'] = daqInfo
fileInfo['uploadId'] = uploadId
fileUploadInfo = { 'processed' : False }
# Stat should be done by agent, not by observer.
try:
# Stat should be done by agent, not by observer.
try:
FileUtility.statFile(filePath, fileUploadInfo)
except:
# Ok, may be remote file
pass
# Ok, may be remote file
pass
fileDict[filePath] = fileUploadInfo
fileProcessingManager.processFile(fileInfo)
uploadInfo['fileDict'] = fileDict
#self.logger.debug('Upload info %s' % uploadInfo)
UploadTracker.getInstance().put(uploadId, uploadInfo)
......
......@@ -2,12 +2,14 @@
import threading
import time
import os
from watchdog.observers.polling import PollingObserver
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.objects.observedFile import ObservedFile
from dm.common.utility.valueUtility import ValueUtility
from dm.common.utility.objectUtility import ObjectUtility
from dm.common.utility.singleton import Singleton
from dm.common.utility.threadingUtility import ThreadingUtility
......@@ -86,6 +88,13 @@ class FileSystemObserver(threading.Thread,Singleton):
@ThreadingUtility.synchronize
def fileUpdated(self, filePath, dataDirectory, experiment):
daqInfo = DaqTracker.getInstance().getDaqInfoByExperimentAndDataDirectory(experiment, dataDirectory)
# Do not process hidden files unless requested
if not daqInfo or not ValueUtility.toBoolean(daqInfo.get('processHiddenFiles')):
fileName = os.path.basename(filePath)
if fileName.startswith('.'):
self.logger.debug('File path %s is hidden file, will not process it' % filePath)
return
observedFile = self.observedFileMap.get(filePath, ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment))
observedFile.setLastUpdateTimeToNow()
if daqInfo:
......
......@@ -16,11 +16,13 @@ from dm.common.db.api.experimentDbApi import ExperimentDbApi
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.common.exceptions.objectNotFound import ObjectNotFound
from dm.common.exceptions.invalidRequest import InvalidRequest
from pathTracker import PathTracker
class ExperimentManager(Singleton):
CONFIG_SECTION_NAME = 'ExperimentManager'
STORAGE_DIRECTORY_KEY = 'storagedirectory'
STORAGE_ID_KEY = 'storageid'
MANAGE_STORAGE_PERMISSIONS_KEY = 'managestoragepermissions'
PLATFORM_UTILITY_KEY = 'platformutility'
......@@ -43,6 +45,7 @@ class ExperimentManager(Singleton):
self.logger.debug('Initializing')
self.lock = threading.RLock()
self.experimentDbApi = ExperimentDbApi()
self.pathTracker = PathTracker()
self.platformUtility = None
self.__configure()
self.fileProcessingManager = FileProcessingManager.getInstance()
......@@ -55,6 +58,7 @@ class ExperimentManager(Singleton):
configItems = cm.getConfigItems(ExperimentManager.CONFIG_SECTION_NAME)
self.logger.debug('Got config items: %s' % configItems)
self.storageDirectory = cm.getConfigOption(ExperimentManager.CONFIG_SECTION_NAME, ExperimentManager.STORAGE_DIRECTORY_KEY)
self.storageId = cm.getConfigOption(ExperimentManager.CONFIG_SECTION_NAME, ExperimentManager.STORAGE_ID_KEY)
self.manageStoragePermissions = ValueUtility.toBoolean(cm.getConfigOption(ExperimentManager.CONFIG_SECTION_NAME, ExperimentManager.MANAGE_STORAGE_PERMISSIONS_KEY))
platformUtility = cm.getConfigOption(ExperimentManager.CONFIG_SECTION_NAME, ExperimentManager.PLATFORM_UTILITY_KEY)
if platformUtility:
......@@ -75,8 +79,10 @@ class ExperimentManager(Singleton):
def updateExperimentWithStorageDataDirectory(self, experiment):
storageDirectory = self.__getExperimentStorageDataDirectory(experiment)
if os.path.exists(storageDirectory):
storageHost = ConfigurationManager.getInstance().getHost()
experiment['storageDirectory'] = storageDirectory
experiment['storageHost'] = ConfigurationManager.getInstance().getHost()
experiment['storageHost'] = storageHost
experiment['storageUrl'] = '%s://%s%s' % (self.storageId, storageHost, storageDirectory)
return storageDirectory
def addUserToGroup(self, username, experimentName):
......@@ -137,6 +143,8 @@ class ExperimentManager(Singleton):
OsUtility.chmodPath(storageDirectory, dirMode=self.DIR_PERMISSIONS_MODE)
self.logger.debug('Changing group owner for %s to %s' % (storageDirectory, experimentName))
self.platformUtility.changePathGroupOwner(storageDirectory, experimentName)
ownerUpdateTime = time.time()
self.pathTracker.put(storageDirectory, ownerUpdateTime)
# Add users to group
experimentUsers = experiment.get('experimentUsernameList', [])
......@@ -157,8 +165,11 @@ class ExperimentManager(Singleton):
else:
self.logger.debug('Creating data directory for experiment %s: %s' % (experimentName, storageDirectory))
OsUtility.createDir(storageDirectory)
storageHost = ConfigurationManager.getInstance().getHost()
experiment['storageDirectory'] = storageDirectory
experiment['storageHost'] = ConfigurationManager.getInstance().getHost()
experiment['storageHost'] = storageHost
experiment['storageUrl'] = '%s://%s%s' % (self.storageId, storageHost, storageDirectory)
if self.manageStoragePermissions:
self.createExperimentGroup(experiment)
......@@ -180,8 +191,13 @@ class ExperimentManager(Singleton):
# Recursively modify subdirectory permissions
dirPath = os.path.dirname(filePath)
while (os.path.abspath(dirPath) != os.path.abspath(storageDirectory)):
self.logger.debug('Changing group owner for experiment subdirectory %s to %s' % (dirPath, experimentName))
self.platformUtility.changePathGroupOwner(dirPath, experimentName)
if self.pathTracker.get(dirPath) is None:
self.logger.debug('Changing group owner for experiment subdirectory %s to %s' % (dirPath, experimentName))
self.platformUtility.changePathGroupOwner(dirPath, experimentName)
ownerUpdateTime = time.time()
self.pathTracker.put(dirPath, ownerUpdateTime)
else:
self.logger.debug('Group owner for experiment subdirectory %s is already set to %s' % (dirPath, experimentName))
dirPath = os.path.dirname(dirPath)
self.logger.debug('Processing file %s' % filePath)
......@@ -199,7 +215,7 @@ class ExperimentManager(Singleton):
self.logger.debug('Getting stat for file path %s' % (filePath))
FileUtility.statFile(filePath, fileInfo)
FileUtility.getMd5Sum(filePath, fileInfo)
self.logger.debug('File Info after stat: %s' % (fileInfo))
self.logger.debug('File info after stat: %s' % str(fileInfo))
else:
self.logger.debug('File path %s does not exist' % filePath)
raise ObjectNotFound('File %s does not exist' % filePath)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment