#!/usr/bin/env python
#
# Implementation for experiment session controller.
#
import os
import time
import uuid
import copy
import threading
from dm.common.constants import dmProcessingStatus
from dm.common.objects.experiment import Experiment
from dm.common.objects.dmObjectManager import DmObjectManager
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.exceptions.objectNotFound import ObjectNotFound
from dm.common.objects.observedFile import ObservedFile
from dm.common.objects.uploadInfo import UploadInfo
from dm.common.objects.pluginInfo import PluginInfo
from dm.common.objects.directoryUploadInfo import DirectoryUploadInfo
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.common.utility.fileUtility import FileUtility
from dm.common.utility.timeUtility import TimeUtility
from dm.common.utility.dictUtility import DictUtility
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
from experimentTracker import ExperimentTracker
from uploadTracker import UploadTracker
from daqTracker import DaqTracker
from fileSystemObserver import FileSystemObserver
class ExperimentSessionControllerImpl(DmObjectManager):
""" Experiment session controller implementation class. """
UPLOAD_DELAY_IN_SECONDS = 1.0
UPLOAD_CHUNK_SIZE_IN_FILES = 100
UPLOAD_CHUNK_REFRESH_IN_SECONDS = 10.0
DIRECTORY_UPLOAD_PROCESSING_WAIT_IN_SECONDS = 15.0
SECONDS_PER_HOUR = 60*60
def __init__(self):
DmObjectManager.__init__(self)
self.dsExperimentApi = DsRestApiFactory.getExperimentRestApi()
def startDaq(self, experimentName, dataDirectory, daqInfo):
FileSystemObserver.getInstance().createDirectory(dataDirectory)
experiment = self.dsExperimentApi.getExperimentByName(experimentName)
storageDirectory = experiment.get('storageDirectory')
if storageDirectory is None:
raise InvalidRequest('Experiment %s has not been started.' % experimentName)
daqInfo = DaqTracker.getInstance().startDaq(experiment, dataDirectory, daqInfo)
FileSystemObserver.getInstance().startObservingPath(dataDirectory, experiment)
maxRunTimeInHours = daqInfo.get('maxRunTimeInHours')
if maxRunTimeInHours:
daqId = daqInfo['id']
self.logger.debug('Starting timer to automatically stop DAQ id %s for experiment %s, after max runtime of %s hours' % (daqId, experimentName, maxRunTimeInHours))
maxRunTimeInSeconds = maxRunTimeInHours*self.SECONDS_PER_HOUR
timer = threading.Timer(maxRunTimeInSeconds, self.stopDaqTimer, args=[experimentName, dataDirectory, daqId])
timer.start()
return daqInfo
def stopDaqTimer(self, experimentName, dataDirectory, daqId):
try:
daqInfo = DaqTracker.getInstance().getDaqInfo(daqId)
maxRunTimeInHours = daqInfo.get('maxRunTimeInHours')
self.logger.debug('Attempting to automatically stop DAQ id %s for experiment %s, after max runtime of %s hours was exceeded' % (daqId, experimentName, maxRunTimeInHours))
daqStatus = daqInfo.get('status')
if daqStatus != dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING:
self.logger.debug('DAQ id %s has status of %s, will not stop it automatically' % (daqId, daqStatus))
return
self.stopDaq(experimentName, dataDirectory)
except Exception, ex:
self.logger.error('Could not automatically stop DAQ id %s: %s' % (daqId, str(ex)))
def stopDaq(self, experimentName, dataDirectory):
experiment = self.dsExperimentApi.getExperimentByName(experimentName)
daqInfo = DaqTracker.getInstance().stopDaq(experiment, dataDirectory)
FileSystemObserver.getInstance().stopObservingPath(dataDirectory, experiment)
daqInfo.updateStatus()
daqId = daqInfo.get('id')
self.logger.debug('Stopped DAQ id %s for experiment %s' % (daqId, experimentName))
# Prepare upload on exit
uploadDataDirectoryOnExit = daqInfo.get('uploadDataDirectoryOnExit')
if uploadDataDirectoryOnExit:
self.logger.debug('Attempting automatic upload on exit for DAQ id %s, experiment %s' % (daqId, experimentName))
daqInfo2 = {}
daqInfo2['originalDaqId'] = daqId
uploadTargetDirectoryOnExit = daqInfo.get('uploadTargetDirectoryOnExit')
if uploadTargetDirectoryOnExit:
self.logger.debug('Automatic upload on exit for DAQ id %s (experiment %s) is using target directory: %s' % (daqId, experimentName, uploadTargetDirectoryOnExit))
daqInfo2['targetDirectory'] = uploadTargetDirectoryOnExit
try:
uploadInfo = self.uploadFiles(experimentName, uploadDataDirectoryOnExit, daqInfo2)
daqInfo['uploadIdOnExit'] = uploadInfo.get('id')
except Exception, ex:
self.logger.error('Could not automatically upload DAQ id %s: %s' % (daqId, str(ex)))
daqInfo['uploadErrorOnExit'] = str(ex)
return daqInfo
def getDaqInfo(self, id):
daqInfo = DaqTracker.getInstance().getDaqInfo(id)
if not daqInfo:
raise ObjectNotFound('Daq id %s not found.' % id)
daqInfo.updateStatus()
return daqInfo
def listDaqs(self, status):
daqInfoList = DaqTracker.getInstance().getDaqInfos(status)
return daqInfoList
def uploadFiles(self, experimentName, dataDirectory, daqInfo):
experiment = self.dsExperimentApi.getExperimentByName(experimentName)
UploadTracker.getInstance().checkForActiveUpload(experiment, dataDirectory)
experiment['daqInfo'] = daqInfo
storageDirectory = experiment.get('storageDirectory')
if storageDirectory is None:
raise InvalidRequest('Experiment %s has not been started.' % experimentName)
fileProcessingManager = FileProcessingManager.getInstance()
uploadId = str(uuid.uuid4())
self.logger.debug('Starting upload id %s' % uploadId)
uploadInfo = UploadInfo(daqInfo)
uploadInfo['id'] = uploadId
uploadInfo['experimentName'] = experimentName
uploadInfo['storageDirectory'] = experiment.get('storageDirectory')
uploadInfo['storageHost'] = experiment.get('storageHost')
uploadInfo['storageUrl'] = experiment.get('storageUrl')
uploadInfo['dataDirectory'] = dataDirectory
uploadInfo['nProcessedFiles'] = 0
uploadInfo['nProcessingErrors'] = 0
startTime = time.time()
uploadInfo['startTime'] = startTime
uploadInfo['startTimestamp'] = TimeUtility.formatLocalTimestamp(startTime)
daqInfo['experimentName'] = experimentName
daqInfo['storageDirectory'] = experiment.get('storageDirectory')
daqInfo['storageHost'] = experiment.get('storageHost')
daqInfo['storageUrl'] = experiment.get('storageUrl')
daqInfo['dataDirectory'] = dataDirectory
daqInfo['uploadId'] = uploadId
skipPlugins = DictUtility.getAndRemoveKey(daqInfo, 'skipPlugins', '')
if len(skipPlugins):
skipPlugins = skipPlugins.split(',')
uploadInfo['skipPlugins'] = skipPlugins
else:
skipPlugins = []
# Check that there is at least one processor that can process files
processorList = []
for processorKey in fileProcessingManager.fileProcessorKeyList:
processor = fileProcessingManager.fileProcessorDict.get(processorKey)
processorName = processor.name
if processorName not in skipPlugins:
processorList.append(processor)
if not len(processorList):
raise InvalidRequest('There are no plugins that can process files for upload in directory %s.' % dataDirectory)
UploadTracker.getInstance().startUpload(uploadId, uploadInfo)
uploadInfo['nFiles'] = 0
uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_PENDING
self.logger.debug('Starting upload timer for %s' % dataDirectory)
timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUploadFiles, args=[uploadInfo, daqInfo, experiment])
timer.start()
return uploadInfo
def prepareUploadFiles(self, uploadInfo, daqInfo, experiment):
uploadId = uploadInfo.get('id')
self.logger.debug('Preparing upload id: %s' % uploadId)
dataDirectory = uploadInfo.get('dataDirectory')
targetDirectory = uploadInfo.get('targetDirectory')
fileProcessingManager = FileProcessingManager.getInstance()
try:
# Get files
self.logger.debug('Retrieving file paths for %s' % dataDirectory)
filePathsDict = FileSystemObserver.getInstance().getFiles(dataDirectory)
# Remove hidden files
self.logger.debug('Checking %s processing candidates' % len(filePathsDict))
filePathsDict = fileProcessingManager.removeHiddenFilesFromProcessing(filePathsDict, uploadInfo)
# Check which files need to be processed
filePathsDict = fileProcessingManager.checkUploadFilesForProcessing(filePathsDict, uploadInfo)
if not len(filePathsDict):
raise InvalidRequest('There are no new files for upload in directory %s.' % dataDirectory)
except Exception, ex:
self.logger.error('Processing error for upload %s: %s' % (uploadId, str(ex)))
uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED
uploadInfo['errorMessage'] = str(ex)
return
uploadInfo['nFiles'] = len(filePathsDict)
uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING
self.logger.debug('Will prepare upload of %s files' % len(filePathsDict))
nProcessedFiles = 0
nFiles = len(filePathsDict)
for (filePath,filePathDict) in filePathsDict.items():
try:
# Only create new uploads if we have less than
# UPLOAD_CHUNK_SIZE_IN_FILES waiting to be completed
while True:
status = uploadInfo.get('status')
if status == dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING:
nCancelledFiles = nFiles - nProcessedFiles
uploadInfo.uploadAborted(nCancelledFiles)
self.logger.debug('Upload id %s aborted, will not process %s files' % (uploadId, nCancelledFiles))
return
nCompletedFiles = uploadInfo.get('nCompletedFiles', 0)
nWaitingFiles = nProcessedFiles - nCompletedFiles
if nWaitingFiles < self.UPLOAD_CHUNK_SIZE_IN_FILES:
# We need to add more files for upload
break
self.logger.debug('Upload %s has %s files waiting for upload, will not add any more for %s seconds' % (uploadId, nWaitingFiles, self.UPLOAD_CHUNK_REFRESH_IN_SECONDS))
time.sleep(self.UPLOAD_CHUNK_REFRESH_IN_SECONDS)
fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment, targetDirectory=targetDirectory)
fileInfo.update(filePathDict)
fileInfo['daqInfo'] = daqInfo
fileInfo['uploadId'] = uploadId
fileInfo['statusMonitor'] = uploadInfo
fileInfo['skipPlugins'] = uploadInfo.get('skipPlugins', [])
fileProcessingManager.processFile(fileInfo)
nProcessedFiles += 1
except Exception, ex:
self.logger.error('Processing error: %s', ex)
self.logger.debug('Done preparing upload id: %s (total of %s files)' % (uploadId, len(filePathsDict)))
def getUploadInfo(self, id):
uploadInfo = UploadTracker.getInstance().get(id)
if not uploadInfo:
raise ObjectNotFound('Upload id %s not found.' % id)
uploadInfo.updateStatus()
return uploadInfo
def listUploads(self, status):
uploadInfoList = UploadTracker.getInstance().getUploadInfos(status)
return uploadInfoList
def stopUpload(self, id):
uploadInfo = UploadTracker.getInstance().get(id)
if not uploadInfo:
raise ObjectNotFound('Upload id %s not found.' % id)
uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING
uploadInfo.updateStatus()
return uploadInfo
def uploadDirectory(self, experimentName, dataDirectory, daqInfo):
experiment = self.dsExperimentApi.getExperimentByName(experimentName)
UploadTracker.getInstance().checkForActiveUpload(experiment, dataDirectory)
experiment['daqInfo'] = daqInfo
storageDirectory = experiment.get('storageDirectory')
if storageDirectory is None:
raise InvalidRequest('Experiment %s has not been started.' % experimentName)
filePathsDict = FileSystemObserver.getInstance().getFiles(dataDirectory)
uploadId = str(uuid.uuid4())
self.logger.debug('Starting upload id %s' % uploadId)
uploadInfo = DirectoryUploadInfo(daqInfo)
uploadInfo['id'] = uploadId
uploadInfo['experimentName'] = experimentName
uploadInfo['storageDirectory'] = experiment.get('storageDirectory')
uploadInfo['storageHost'] = experiment.get('storageHost')
uploadInfo['storageUrl'] = experiment.get('storageUrl')
uploadInfo['dataDirectory'] = dataDirectory
startTime = time.time()
uploadInfo['startTime'] = startTime
uploadInfo['startTimestamp'] = TimeUtility.formatLocalTimestamp(startTime)
daqInfo['experimentName'] = experimentName
daqInfo['storageDirectory'] = experiment.get('storageDirectory')
daqInfo['storageHost'] = experiment.get('storageHost')
daqInfo['storageUrl'] = experiment.get('storageUrl')
daqInfo['dataDirectory'] = dataDirectory
daqInfo['uploadId'] = uploadId
skipPlugins = DictUtility.getAndRemoveKey(daqInfo, 'skipPlugins', '')
if len(skipPlugins):
skipPlugins = skipPlugins.split(',')
uploadInfo['skipPlugins'] = skipPlugins
else:
skipPlugins = []
fileProcessingManager = FileProcessingManager.getInstance()
processingInfo = {}
uploadInfo['processingInfo'] = processingInfo
for processorKey in fileProcessingManager.fileProcessorKeyList:
processor = fileProcessingManager.fileProcessorDict.get(processorKey)
processorName = processor.name
if processorName in skipPlugins:
processingInfo[processorName] = {'status' : dmProcessingStatus.DM_PROCESSING_STATUS_SKIPPED}
else:
self.logger.debug('Starting %s processing timer for directory %s upload' % (processorName, dataDirectory))
timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.processUploadDirectory, args=[processor, uploadInfo, daqInfo, experiment, filePathsDict])
processingInfo[processorName] = {'status' : dmProcessingStatus.DM_PROCESSING_STATUS_PENDING}
timer.start()
UploadTracker.getInstance().startUpload(uploadId, uploadInfo)
uploadInfo['nFiles'] = len(filePathsDict)
uploadInfo['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING
return uploadInfo
def processUploadDirectory(self, processor, uploadInfo, daqInfo, experiment, filePathsDict):
uploadId = uploadInfo.get('id')
dataDirectory = uploadInfo.get('dataDirectory')
processorName = processor.name
processingInfo = uploadInfo.get('processingInfo')
self.logger.debug('Starting %s processing for upload %s by %s' % (dataDirectory, uploadId, processorName))
# Initialize start time up front so run time can still be reported if processing fails early
processingStartTime = time.time()
try:
dependsOn = processor.dependsOn
while True:
# Check status
if uploadInfo['status'] == dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING:
processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_ABORTED
return
# Check that processor can proceed
canProcess = False
if not len(dependsOn):
canProcess = True
for depProcessorName in dependsOn:
depProcessorStatus = processingInfo.get(depProcessorName).get('status')
if depProcessorStatus in ['skipped', 'aborted', 'failed']:
# We must skip processing
self.logger.debug('Skipping %s processing for upload %s due to %s status of %s' % (processorName, uploadId, depProcessorName, depProcessorStatus))
processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_SKIPPED
return
elif depProcessorStatus in [dmProcessingStatus.DM_PROCESSING_STATUS_PENDING, dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING]:
# Do nothing
pass
elif depProcessorStatus == 'done':
# We can proceed
canProcess = True
else:
# This should not happen
self.logger.error('Skipping %s processing for upload %s due to %s unrecognized status of %s' % (processorName, uploadId, depProcessorName, depProcessorStatus))
processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_SKIPPED
return
# Process directory if we can
if canProcess:
directoryInfo = {'uploadInfo' : uploadInfo,
'daqInfo' : daqInfo,
'experiment' : experiment,
'filePathsDict' : filePathsDict
}
processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING
processingStartTime = time.time()
processor.processDirectory(directoryInfo)
if processingInfo[processorName]['status'] == dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING:
processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_DONE
self.logger.debug('Directory %s processing complete for upload %s by %s' % (dataDirectory, uploadId, processorName))
else:
self.logger.debug('Incomplete directory %s processing upload %s by %s, status: %s' % (dataDirectory, uploadId, processorName, processingInfo[processorName]['status']))
break
# Wait a bit longer
time.sleep(self.DIRECTORY_UPLOAD_PROCESSING_WAIT_IN_SECONDS)
except Exception, ex:
self.logger.error('%s processing for upload %s failed: %s' % (processorName, uploadId, str(ex)))
processingInfo[processorName]['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_FAILED
processingInfo[processorName]['processingError'] = str(ex)
processingEndTime = time.time()
processingInfo[processorName]['processingEndTime'] = processingEndTime
processingInfo[processorName]['processingStartTime'] = processingStartTime
processingInfo[processorName]['processingRunTime'] = processingEndTime-processingStartTime
def getProcessingPlugins(self):
pluginList = []
fileProcessingManager = FileProcessingManager.getInstance()
for processorKey in fileProcessingManager.fileProcessorKeyList:
processor = fileProcessingManager.fileProcessorDict.get(processorKey)
pluginInfo = {'name' : processor.name, 'dependsOn' : processor.dependsOn}
pluginList.append(PluginInfo(pluginInfo))
return pluginList
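####################################################################
# Testing
#
# Illustrative sketch only, following the testing blocks used by the
# other modules in this change. It assumes a fully configured DM daq
# station environment (DS web service reachable, FileSystemObserver and
# FileProcessingManager configured) and an already started experiment;
# the experiment name and data directory below are placeholders.
if __name__ == '__main__':
    controller = ExperimentSessionControllerImpl()
    daqInfo = controller.startDaq('test-exp', '/tmp/test-exp/data', {})
    print controller.getDaqInfo(daqInfo['id'])
    print controller.stopDaq('test-exp', '/tmp/test-exp/data')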
#!/usr/bin/env python
from dm.common.objects.experiment import Experiment
from dm.common.utility.objectTracker import ObjectTracker
class ExperimentTracker(ObjectTracker):
# Cache configuration
objectClass = Experiment
####################################################################
# Testing
if __name__ == '__main__':
et = ExperimentTracker.getInstance()
print et
et2 = ExperimentTracker.getInstance()
print et2
#!/usr/bin/env python
import threading
import time
import os
from watchdog.observers.polling import PollingObserver
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.objects.observedFile import ObservedFile
from dm.common.utility.valueUtility import ValueUtility
from dm.common.utility.objectUtility import ObjectUtility
from dm.common.utility.singleton import Singleton
from dm.common.utility.threadingUtility import ThreadingUtility
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dmFileSystemEventHandler import DmFileSystemEventHandler
from daqTracker import DaqTracker
class FileSystemObserver(threading.Thread,Singleton):
CONFIG_SECTION_NAME = 'FileSystemObserver'
MIN_FILE_PROCESSING_DELAY_IN_SECONDS_KEY = 'minfileprocessingdelayinseconds'
FILE_SYSTEM_EVENT_TIMEOUT_IN_SECONDS_KEY = 'filesystemeventtimeoutinseconds'
FILE_SYSTEM_OBSERVER_AGENT_KEY = 'filesystemobserveragent'
DAQ_CHUNK_SIZE_IN_FILES = 500
# Singleton.
__instanceLock = threading.RLock()
__instance = None
def __init__(self):
FileSystemObserver.__instanceLock.acquire()
try:
if FileSystemObserver.__instance:
return
FileSystemObserver.__instance = self
threading.Thread.__init__(self)
self.setName('FileSystemObserverThread')
self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
self.logger.debug('Initializing')
self.lock = threading.RLock()
self.eventFlag = threading.Event()
self.exitFlag = False
self.observedFileMap = {}
self.__configure()
self.fileProcessingManager = FileProcessingManager.getInstance()
self.logger.debug('Initialization complete')
finally:
FileSystemObserver.__instanceLock.release()
def __configure(self):
cm = ConfigurationManager.getInstance()
configItems = cm.getConfigItems(FileSystemObserver.CONFIG_SECTION_NAME)
self.logger.debug('Got config items: %s' % configItems)
self.minFileProcessingDelayInSeconds = int(cm.getConfigOption(FileSystemObserver.CONFIG_SECTION_NAME, FileSystemObserver.MIN_FILE_PROCESSING_DELAY_IN_SECONDS_KEY))
self.logger.debug('Minimum file processing delay: %s seconds' % self.minFileProcessingDelayInSeconds)
self.fileSystemEventTimeoutInSeconds = int(cm.getConfigOption(FileSystemObserver.CONFIG_SECTION_NAME, FileSystemObserver.FILE_SYSTEM_EVENT_TIMEOUT_IN_SECONDS_KEY))
self.logger.debug('File system event timeout: %s seconds' % self.fileSystemEventTimeoutInSeconds)
agentClass = cm.getConfigOption(FileSystemObserver.CONFIG_SECTION_NAME, FileSystemObserver.FILE_SYSTEM_OBSERVER_AGENT_KEY)
(moduleName,className,constructor) = cm.getModuleClassConstructorTuple(agentClass)
self.logger.debug('Creating file system observer agent instance of class %s' % className)
self.fileSystemObserverAgent = ObjectUtility.createObjectInstance(moduleName, className, constructor)
self.fileSystemObserverAgent.setFileSystemObserver(self)
@ThreadingUtility.synchronize
def createDirectory(self, dataDirectory):
self.fileSystemObserverAgent.createDirectory(dataDirectory)
@ThreadingUtility.synchronize
def getFiles(self, dataDirectory):
self.logger.debug('Agent is retrieving files for %s' % dataDirectory)
return self.fileSystemObserverAgent.getFiles(dataDirectory)
@ThreadingUtility.synchronize
def startObservingPath(self, dataDirectory, experiment):
self.logger.debug('Agent is starting observer for %s' % dataDirectory)
self.fileSystemObserverAgent.startObservingPath(dataDirectory, experiment)
@ThreadingUtility.synchronize
def stopObservingPath(self, dataDirectory, experiment):
self.logger.debug('Agent is stopping observer for %s' % dataDirectory)
self.fileSystemObserverAgent.stopObservingPath(dataDirectory, experiment)
@ThreadingUtility.synchronize
def fileUpdated(self, filePath, dataDirectory, experiment):
daqInfo = DaqTracker.getInstance().getDaqInfoByExperimentAndDataDirectory(experiment, dataDirectory)
experimentName = experiment.get('name')
# No daq info, ignore
if not daqInfo:
self.logger.debug('No daq for data directory %s and experiment %s, file path %s will not be processed' % (dataDirectory, experimentName, filePath))
return
# Do not process hidden files unless requested
if not ValueUtility.toBoolean(daqInfo.get('processHiddenFiles')):
fileName = os.path.basename(filePath)
if fileName.startswith('.'):
self.logger.debug('File path %s is hidden file, will not process it' % filePath)
return
daqId = daqInfo['id']
observedFile = self.observedFileMap.get(filePath)
targetDirectory = daqInfo.get('targetDirectory')
if not observedFile:
observedFile = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment, targetDirectory=targetDirectory)
observedFile['daqInfo'] = daqInfo.toDictWithOriginalKeys()
observedFile['statusMonitor'] = daqInfo
self.observedFileMap[filePath] = observedFile
self.logger.debug('New observed file: %s (daq id: %s)' % (filePath, daqId))
daqInfo.fileAdded(filePath)
else:
self.logger.debug('Observed file updated: %s (daq id: %s)' % (filePath, daqId))
observedFile.setLastUpdateTimeToNow()
@ThreadingUtility.synchronize
def checkObservedFilesForProcessing(self):
now = time.time()
filePathsForProcessing = []
# We use number of waiting files to determine whether
# more files should be added for processing, so we need to
# update all daq infos before going over observed files
DaqTracker.getInstance().updateDaqInfos()
nWaitingFilesDict = {}
for (filePath,observedFile) in self.observedFileMap.items():
daqInfo = observedFile['daqInfo']
daqId = daqInfo['id']
nWaitingFiles = nWaitingFilesDict.get(daqId, daqInfo.get('nWaitingFiles', 0))
if nWaitingFiles >= self.DAQ_CHUNK_SIZE_IN_FILES:
# We do not need to add more files for processing for this DAQ
#self.logger.debug('There are %s waiting files for DAQ id %s, will not add more for processing.' % (nWaitingFiles, daqInfo['id']))
continue
timestamp = observedFile.get('lastUpdateTime')
deltaT = now - timestamp
if deltaT > self.minFileProcessingDelayInSeconds:
self.logger.debug('File %s was last modified %s seconds ago, will process it.' % (filePath, deltaT))
filePathsForProcessing.append(filePath)
nWaitingFilesDict[daqId] = nWaitingFiles+1
return filePathsForProcessing
@ThreadingUtility.synchronize
def processFile(self, filePath):
self.logger.debug('Processing file %s' % filePath)
observedFile = self.observedFileMap.get(filePath)
if observedFile is not None:
del self.observedFileMap[filePath]
self.fileProcessingManager.processFile(observedFile)
@ThreadingUtility.synchronize
def start(self):
self.logger.debug('Starting file observer thread')
threading.Thread.start(self)
self.logger.debug('Starting file observer agent')
self.fileSystemObserverAgent.start()
def run(self):
self.logger.debug('Starting thread: %s' % self.getName())
while True:
if self.exitFlag:
self.logger.debug('Exit flag set, %s done' % self.getName())
break
try:
filePathsForProcessing = self.checkObservedFilesForProcessing()
if len(filePathsForProcessing):
self.logger.debug('Will queue %s new files for processing' % (len(filePathsForProcessing)))
for filePath in filePathsForProcessing:
self.processFile(filePath)
except Exception, ex:
self.logger.exception(ex)
self.eventFlag.wait(timeout=self.fileSystemEventTimeoutInSeconds)
@ThreadingUtility.synchronize
def stop(self):
self.logger.debug('Stopping file observer agent')
self.fileSystemObserverAgent.stop()
self.logger.debug('Stopping file observer thread')
self.exitFlag = True
self.eventFlag.set()
self.logger.debug('Event is set, joining thread')
threading.Thread.join(self)
self.logger.debug('Module stopped')
@ThreadingUtility.synchronize
def setEvent(self):
self.eventFlag.set()
@ThreadingUtility.synchronize
def clearEvent(self):
self.eventFlag.clear()
####################################################################
# Testing
if __name__ == '__main__':
fp = FileSystemObserver.getInstance()
fp.start()
time.sleep(30)
fp.stop()
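####################################################################
# Example configuration (illustrative sketch only). The section and key
# names come from the constants defined in FileSystemObserver above; the
# values are placeholders, and the exact format of the agent
# specification string depends on
# ConfigurationManager.getModuleClassConstructorTuple().
#
# [FileSystemObserver]
# minfileprocessingdelayinseconds=5
# filesystemeventtimeoutinseconds=5
# filesystemobserveragent=WatchdogFileSystemObserverAgent()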
#!/usr/bin/env python
from dm.common.utility.loggingManager import LoggingManager
class FileSystemObserverAgent:
def __init__(self):
self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
self.fileSystemObserver = None
def setFileSystemObserver(self, fileSystemObserver):
self.fileSystemObserver = fileSystemObserver
def createDirectory(self, dataDirectory):
pass
def getFiles(self, dataDirectory):
pass
def startObservingPath(self, dataDirectory, experiment):
pass
def stopObservingPath(self, dataDirectory, experiment):
pass
def start(self):
pass
def stop(self):
pass
#!/usr/bin/env python
from threading import Timer
from pollingFileSystemObserverAgent import PollingFileSystemObserverAgent
from dm.common.utility.ftpUtility import FtpUtility
class FtpFileSystemObserverAgent(PollingFileSystemObserverAgent):
DEFAULT_POLLING_PERIOD_IN_SECONDS = 15
def __init__(self, host, port, username=None, password=None, pollingPeriod=DEFAULT_POLLING_PERIOD_IN_SECONDS):
PollingFileSystemObserverAgent.__init__(self, pollingPeriod)
self.host = host
self.port = port
self.username = username
self.password = password
def getFiles(self, dataDirectory):
(scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultHost=self.host, defaultPort=self.port)
self.logger.debug('Retrieving files from FTP host: %s, port: %s, directory path: %s' % (host, port, dirPath))
ftpUtility = FtpUtility(host, port, self.username, self.password)
return ftpUtility.getFiles(dirPath, {})
####################################################################
# Testing
if __name__ == '__main__':
import time
agent = FtpFileSystemObserverAgent('zagreb', 2811)
print 'ORIGINAL FILES: ', agent.getFiles('/tmp/test')
agent.startObservingPath('/tmp/test', 'e1')
time.sleep(100)
agent.stopObservingPath('/tmp/test', 'e1')
#!/usr/bin/env python
from threading import Timer
from fileSystemObserverAgent import FileSystemObserverAgent
from dm.common.utility.osUtility import OsUtility
class PollingFileSystemObserverAgent(FileSystemObserverAgent):
DEFAULT_POLLING_PERIOD_IN_SECONDS = 15
DEFAULT_RETRY_PERIOD_IN_SECONDS = 60
def __init__(self, pollingPeriod=DEFAULT_POLLING_PERIOD_IN_SECONDS):
FileSystemObserverAgent.__init__(self)
self.pollingPeriod = pollingPeriod
self.retryDelay = 0
self.observedDirDict = {}
self.isDone = False
def getFiles(self, dataDirectory):
self.logger.debug('Retrieving files for directory: %s' % (dataDirectory))
return OsUtility.findFilesAsDict(dataDirectory, {})
def updateFile(self, filePath, dataDirectory, experiment):
if self.fileSystemObserver:
self.logger.debug('Processing file path: %s' % filePath)
self.fileSystemObserver.fileUpdated(filePath, dataDirectory, experiment)
def processFiles(self, fileDict, oldFileDict, dataDirectory, experiment):
for filePath in fileDict.keys():
if filePath not in oldFileDict:
# new file, must be updated
self.logger.debug('New file path detected: %s' % filePath)
self.updateFile(filePath, dataDirectory, experiment)
else:
# old file, check timestamp
oldFileInfo = oldFileDict.get(filePath)
oldModifyTime = oldFileInfo.get('fileModificationTime', '')
fileInfo = fileDict.get(filePath)
modifyTime = fileInfo.get('fileModificationTime')
if modifyTime != oldModifyTime:
# file has been modified, need to process it
self.logger.debug('Modified file path detected: %s' % filePath)
self.updateFile(filePath, dataDirectory, experiment)
def pollFileSystem(self, dataDirectory, experiment):
try:
observedDirInfo = self.observedDirDict.get(dataDirectory)
if not observedDirInfo:
self.logger.debug('Polling cancelled for directory: %s' % dataDirectory)
return
oldFileDict = observedDirInfo.get('files')
fileDict = self.getFiles(dataDirectory)
observedDirInfo['files'] = fileDict
self.processFiles(fileDict, oldFileDict, dataDirectory, experiment)
self.retryDelay = 0
except Exception, ex:
self.logger.error('Could not poll directory %s: %s' % (dataDirectory,ex))
self.retryDelay = self.DEFAULT_RETRY_PERIOD_IN_SECONDS
self.logger.debug('Next polling for directory %s will be delayed by: %s seconds' % (dataDirectory, self.retryDelay))
self.startPollingTimer(observedDirInfo, dataDirectory, experiment)
def startPollingTimer(self, observedDirInfo, dataDirectory, experiment):
if self.isDone:
return
delay = self.pollingPeriod + self.retryDelay
t = Timer(delay, self.pollFileSystem, [dataDirectory, experiment])
observedDirInfo['pollTimer'] = t
t.start()
def startObservingPath(self, dataDirectory, experiment):
observedDirInfo = self.observedDirDict.get(dataDirectory)
if observedDirInfo:
self.logger.debug('Observer for %s is already active' % dataDirectory)
return
self.logger.debug('Starting observer for %s' % dataDirectory)
fileDict = self.getFiles(dataDirectory)
observedDirInfo = self.observedDirDict.get(dataDirectory, {})
observedDirInfo['files'] = fileDict
observedDirInfo['experiment'] = experiment
self.observedDirDict[dataDirectory] = observedDirInfo
self.startPollingTimer(observedDirInfo, dataDirectory, experiment)
def stopObservingPath(self, dataDirectory, experiment):
observedDirInfo = self.observedDirDict.get(dataDirectory)
if not observedDirInfo:
self.logger.debug('Observer for %s is not active' % dataDirectory)
return
self.logger.debug('Stopping observer for %s' % dataDirectory)
t = observedDirInfo.get('pollTimer')
if t:
t.cancel()
del self.observedDirDict[dataDirectory]
def start(self):
self.logger.debug('Starting polling observer agent')
def stop(self):
self.logger.debug('Stopping polling observer agent')
self.isDone = True
for (dataDirectory,observedDirInfo) in self.observedDirDict.items():
experiment = observedDirInfo.get('experiment')
self.stopObservingPath(dataDirectory, experiment)
####################################################################
# Testing
if __name__ == '__main__':
import time
agent = PollingFileSystemObserverAgent()
print 'ORIGINAL FILES: ', agent.getFiles('/tmp/test')
agent.startObservingPath('/tmp/test', 'e1')
time.sleep(100)
agent.stopObservingPath('/tmp/test', 'e1')
#!/usr/bin/env python
from threading import Timer
from pollingFileSystemObserverAgent import PollingFileSystemObserverAgent
from dm.common.utility.sftpUtility import SftpUtility
class SftpFileSystemObserverAgent(PollingFileSystemObserverAgent):
DEFAULT_POLLING_PERIOD_IN_SECONDS = 15
DEFAULT_PORT = 22
def __init__(self, host, port=DEFAULT_PORT, username=None, password=None, privateKey=None, pollingPeriod=DEFAULT_POLLING_PERIOD_IN_SECONDS):
PollingFileSystemObserverAgent.__init__(self, pollingPeriod)
self.host = host
self.port = port
self.username = username
self.password = password
self.privateKey = privateKey
def getFiles(self, dataDirectory):
(scheme, host, port, dirPath) = SftpUtility.parseFtpUrl(dataDirectory, defaultHost=self.host, defaultPort=self.port)
self.logger.debug('Retrieving files from SFTP host: %s, port: %s, directory path: %s' % (host, port, dirPath))
sftpUtility = SftpUtility(host, port, self.username, self.password, self.privateKey)
return sftpUtility.getFiles(dirPath, {})
####################################################################
# Testing
if __name__ == '__main__':
import time
dirPath='/export/beams12/S1IDUSER/mnt/orthros/park_apr16_rec_reduced'
agent = SftpFileSystemObserverAgent('s1dserv', privateKey='/home/beams/DMADMIN/.ssh/id_dsa')
print 'TIME1: ', time.time()
print 'ORIGINAL FILES: ', len(agent.getFiles(dirPath))
print 'TIME2: ', time.time()
#agent.startObservingPath('/export/dm/test', 'e1')
#time.sleep(100)
#agent.stopObservingPath('/export/dm/test', 'e1')
#!/usr/bin/env python
import os
from dm.common.constants import dmProcessingStatus
from dm.common.objects.uploadInfo import UploadInfo
from dm.common.utility.objectTracker import ObjectTracker
from dm.common.exceptions.objectAlreadyExists import ObjectAlreadyExists
class UploadTracker(ObjectTracker):
# Cache configuration
objectClass = UploadInfo
cacheSize = 100
def __init__(self, *args, **kwargs):
ObjectTracker.__init__(self, *args, **kwargs)
self.activeUploadDict = {}
def checkForActiveUpload(self, experiment, dataDirectory):
experimentName = experiment.get('name')
dataDir = os.path.normpath(dataDirectory)
activeUploadKey = experimentName + dataDir
uploadId = self.activeUploadDict.get(activeUploadKey)
if uploadId:
uploadInfo = self.get(uploadId)
if uploadInfo is not None:
uploadInfo.updateStatus()
if uploadInfo.get('status') in dmProcessingStatus.DM_ACTIVE_PROCESSING_STATUS_LIST:
raise ObjectAlreadyExists('Upload id %s is already active for experiment %s in data directory %s.' % (uploadId, experimentName, dataDir))
del self.activeUploadDict[activeUploadKey]
def startUpload(self, uploadId, uploadInfo):
experimentName = uploadInfo.get('experimentName')
dataDirectory = uploadInfo.get('dataDirectory')
dataDir = os.path.normpath(dataDirectory)
activeUploadKey = experimentName + dataDir
self.activeUploadDict[activeUploadKey] = uploadId
self.put(uploadId, uploadInfo)
def getUploadInfo(self, id):
return self.get(id)
def getUploadInfos(self, status=None):
uploadInfoList = self.getAll()
filteredUploadInfoList = []
for uploadInfo in uploadInfoList:
uploadInfo.updateStatus()
if status is None or status == dmProcessingStatus.DM_PROCESSING_STATUS_ANY or uploadInfo.get('status', '') == status:
filteredUploadInfoList.append(uploadInfo)
return filteredUploadInfoList
####################################################################
# Testing
if __name__ == '__main__':
pass
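####################################################################
# Illustrative usage sketch (comments only). The calls mirror how
# ExperimentSessionControllerImpl uses this tracker; experiment,
# dataDirectory, uploadId and uploadInfo are assumed to be built the
# same way as in uploadFiles() above.
#
#   tracker = UploadTracker.getInstance()
#   tracker.checkForActiveUpload(experiment, dataDirectory)  # raises ObjectAlreadyExists if an upload is active
#   tracker.startUpload(uploadId, uploadInfo)
#   print tracker.getUploadInfos(dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING)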
#!/usr/bin/env python
#
# Implementation for user info controller.
#
#######################################################################
from dm.common.objects.dmObject import DmObject
from dm.common.objects.dmObjectManager import DmObjectManager
from dm.common.db.api.userDbApi import UserDbApi
#######################################################################
class UserInfoControllerImpl(DmObjectManager):
""" User info controller implementation class. """
def __init__(self):
DmObjectManager.__init__(self)
self.userDbApi = UserDbApi()
def getUsers(self):
return self.userDbApi.getUsers()
def getUserById(self, id):
return self.userDbApi.getUserById(id)
def getUserByUsername(self, username):
return self.userDbApi.getUserByUsername(username)
#!/usr/bin/env python
from dm.common.utility.osUtility import OsUtility
from watchdog.observers.polling import PollingObserver
from fileSystemObserverAgent import FileSystemObserverAgent
from dmFileSystemEventHandler import DmFileSystemEventHandler
class WatchdogFileSystemObserverAgent(FileSystemObserverAgent):
DAQ_PERMISSIONS_MODE = 0777
def __init__(self):
FileSystemObserverAgent.__init__(self)
self.observer = PollingObserver()
self.observedWatchDict = {}
def createDirectory(self, dataDirectory):
try:
OsUtility.createDir(dataDirectory, mode=self.DAQ_PERMISSIONS_MODE)
except Exception, ex:
self.logger.warn('Unable to create directory %s: %s' % (dataDirectory, ex))
def getFiles(self, dataDirectory):
return OsUtility.findFilesAsDict(dataDirectory)
def startObservingPath(self, dataDirectory, experiment):
self.logger.debug('Starting observer for %s' % dataDirectory)
eventHandler = DmFileSystemEventHandler(self.fileSystemObserver, dataDirectory, experiment)
observedWatch = self.observer.schedule(eventHandler, dataDirectory, recursive=True)
self.observedWatchDict[dataDirectory] = observedWatch
def stopObservingPath(self, dataDirectory, experiment):
observedWatch = self.observedWatchDict.get(dataDirectory)
if observedWatch:
self.logger.debug('Stopping observer for %s' % dataDirectory)
self.observer.unschedule(observedWatch)
del self.observedWatchDict[dataDirectory]
else:
self.logger.debug('Observer for %s is not active' % dataDirectory)
def start(self):
self.logger.debug('Starting watchdog observer agent')
self.observer.start()
def stop(self):
self.logger.debug('Stopping watchdog observer agent')
self.observer.stop()
self.observer.join()
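####################################################################
# Testing
#
# Illustrative sketch only, mirroring the testing blocks of the other
# agents. The directory path is a placeholder; note that a
# FileSystemObserver must be attached via setFileSystemObserver() before
# observed file events can be dispatched anywhere.
if __name__ == '__main__':
    import time
    agent = WatchdogFileSystemObserverAgent()
    agent.createDirectory('/tmp/test')
    print 'ORIGINAL FILES: ', agent.getFiles('/tmp/test')
    agent.start()
    time.sleep(10)
    agent.stop()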
#!/usr/bin/env python
import os
import json
import urllib
from dm.common.utility.encoder import Encoder
from dm.common.exceptions.dmException import DmException
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.objects.authorizationPrincipal import AuthorizationPrincipal
from dm.common.objects.dmSession import DmSession
from dsRestApi import DsRestApi
class AuthRestApi(DsRestApi):
def __init__(self, username=None, password=None, host=None, port=None, protocol=None):
DsRestApi.__init__(self, username, password, host, port, protocol)
@DsRestApi.execute
def getAuthorizationPrincipal(self, username):
if username is None:
raise InvalidRequest('Username must be provided.')
url = '%s/authorizationPrincipals/%s' % (self.getContextRoot(), username)
self.logger.debug('Retrieving principal for user %s' % (username))
responseData = self.sendSessionRequest(url=url, method='GET')
return AuthorizationPrincipal(responseData)
@DsRestApi.execute
def addSession(self, sessionId, sessionInfo):
if sessionId is None:
raise InvalidRequest('Session id must be provided.')
if sessionInfo is None:
raise InvalidRequest('Session info must be provided.')
url = '%s/sessions/%s' % (self.getContextRoot(), sessionId)
url += '?sessionInfo=%s' % Encoder.encode(json.dumps(sessionInfo))
responseData = self.sendSessionRequest(url=url, method='POST')
return DmSession(responseData)
@DsRestApi.execute
def checkSession(self, sessionId):
if sessionId is None:
raise InvalidRequest('Session id must be provided.')
url = '%s/sessions/%s' % (self.getContextRoot(), sessionId)
responseData = self.sendSessionRequest(url=url, method='PUT')
return DmSession(responseData)
#######################################################################
# Testing.
if __name__ == '__main__':
api = AuthRestApi('dm', 'dm', 'zagreb.svdev.net', 22236, 'http')
print api.authenticateUser('sveseli', 'sv')
#!/usr/bin/env python
from dm.common.api.dmRestApi import DmRestApi
from dm.common.utility.configurationManager import ConfigurationManager
class DsRestApi(DmRestApi):
""" Base DS DM REST api class. """
def __init__(self, username=None, password=None, host=None, port=None, protocol=None):
if host is None:
host = ConfigurationManager.getInstance().getDsWebServiceHost()
if port is None:
port = ConfigurationManager.getInstance().getDsWebServicePort()
DmRestApi.__init__(self, username, password, host, port, protocol)
#######################################################################
# Testing.
if __name__ == '__main__':
pass
#!/usr/bin/env python
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
class DsRestApiFactory:
CONFIG_SECTION_NAME = 'DsRestApiFactory'
USERNAME_KEY = 'username'
PASSWORD_FILE_KEY = 'passwordfile'
HOST_KEY = 'host'
PORT_KEY = 'port'
PROTOCOL_KEY = 'protocol'
__logger = None
__username = None
__password = None
__host = None
__port = None
__protocol = None
@classmethod
def getLogger(cls):
if cls.__logger is None:
cls.__logger = LoggingManager.getInstance().getLogger(cls.__name__)
return cls.__logger
@classmethod
def __getConfiguration(cls):
if cls.__username is None:
cls.__username = ConfigurationManager.getInstance().getConfigOption(cls.CONFIG_SECTION_NAME, cls.USERNAME_KEY)
cls.__password = open(ConfigurationManager.getInstance().getConfigOption(cls.CONFIG_SECTION_NAME, cls.PASSWORD_FILE_KEY)).read().strip()
cls.__host = ConfigurationManager.getInstance().getConfigOption(cls.CONFIG_SECTION_NAME, cls.HOST_KEY)
cls.__port = ConfigurationManager.getInstance().getConfigOption(cls.CONFIG_SECTION_NAME, cls.PORT_KEY)
cls.__protocol = ConfigurationManager.getInstance().getConfigOption(cls.CONFIG_SECTION_NAME, cls.PROTOCOL_KEY)
return (cls.__username, cls.__password, cls.__host, cls.__port, cls.__protocol)
@classmethod
def getUserRestApi(cls):
from userRestApi import UserRestApi
(username, password, host, port, protocol) = cls.__getConfiguration()
api = UserRestApi(username, password, host, port, protocol)
return api
@classmethod
def getAuthRestApi(cls):
from authRestApi import AuthRestApi
(username, password, host, port, protocol) = cls.__getConfiguration()
api = AuthRestApi(username, password, host, port, protocol)
return api
@classmethod
def getExperimentRestApi(cls):
from experimentRestApi import ExperimentRestApi
(username, password, host, port, protocol) = cls.__getConfiguration()
api = ExperimentRestApi(username, password, host, port, protocol)
return api
@classmethod
def getFileRestApi(cls):
from fileRestApi import FileRestApi
(username, password, host, port, protocol) = cls.__getConfiguration()
api = FileRestApi(username, password, host, port, protocol)
return api
####################################################################
# Testing
if __name__ == '__main__':
pass
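####################################################################
# Example configuration (illustrative sketch only). Section and key names
# come from the class constants above; all values are placeholders.
#
# [DsRestApiFactory]
# username=dm
# passwordfile=/path/to/ds/web/service/password/file
# host=localhost
# port=22236
# protocol=http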
#!/usr/bin/env python
import os
import urllib
from dm.common.utility.encoder import Encoder
from dm.common.exceptions.dmException import DmException
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.objects.experiment import Experiment
from dm.common.objects.experimentType import ExperimentType
from dsRestApi import DsRestApi
class ExperimentRestApi(DsRestApi):
def __init__(self, username=None, password=None, host=None, port=None, protocol=None):
DsRestApi.__init__(self, username, password, host, port, protocol)
@DsRestApi.execute
def getExperimentTypes(self):
url = '%s/experimentTypes' % (self.getContextRoot())
responseData = self.sendSessionRequest(url=url, method='GET')
return self.toDmObjectList(responseData, ExperimentType)
@DsRestApi.execute
def getExperiments(self):
url = '%s/experiments' % (self.getContextRoot())
responseData = self.sendSessionRequest(url=url, method='GET')
return self.toDmObjectList(responseData, Experiment)
@DsRestApi.execute
def getExperimentsByStation(self, stationName):
url = '%s/experimentsByStation/%s' % (self.getContextRoot(), stationName)
if not stationName:
raise InvalidRequest('Experiment station name must be provided.')
responseData = self.sendSessionRequest(url=url, method='GET')
return self.toDmObjectList(responseData, Experiment)
@DsRestApi.execute
def getExperimentByName(self, name):
url = '%s/experimentsByName/%s' % (self.getContextRoot(), name)
if name is None or not len(name):
raise InvalidRequest('Experiment name must be provided.')
responseDict = self.sendSessionRequest(url=url, method='GET')
return Experiment(responseDict)
@DsRestApi.execute
def getExperimentById(self, id):
url = '%s/experimentsById/%s' % (self.getContextRoot(), id)
if id is None:
raise InvalidRequest('Experiment id must be provided.')
responseDict = self.sendSessionRequest(url=url, method='GET')
return Experiment(responseDict)
@DsRestApi.execute
def startExperiment(self, name):
url = '%s/experiments/start' % (self.getContextRoot())
if name is None or not len(name):
raise InvalidRequest('Experiment name must be provided.')
url += '?name=%s' % Encoder.encode(name)
responseDict = self.sendSessionRequest(url=url, method='PUT')
return Experiment(responseDict)
@DsRestApi.execute
def updateExperiment(self, name):
url = '%s/experiments/update' % (self.getContextRoot())
if name is None or not len(name):
raise InvalidRequest('Experiment name must be provided.')
url += '?name=%s' % Encoder.encode(name)
responseDict = self.sendSessionRequest(url=url, method='PUT')
return Experiment(responseDict)
@DsRestApi.execute
def stopExperiment(self, name):
url = '%s/experiments/stop' % (self.getContextRoot())
if name is None or not len(name):
raise InvalidRequest('Experiment name must be provided.')
url += '?name=%s' % Encoder.encode(name)
responseDict = self.sendSessionRequest(url=url, method='PUT')
return Experiment(responseDict)
@DsRestApi.execute
def addExperiment(self, name, stationName, typeName, description, startDate, endDate):
url = '%s/experiments' % (self.getContextRoot())
if not name:
raise InvalidRequest('Experiment name must be provided.')
url += '?name=%s' % Encoder.encode(name)
if not stationName:
raise InvalidRequest('Experiment station name must be provided.')
url += '&stationName=%s' % Encoder.encode(stationName)
if not typeName:
raise InvalidRequest('Experiment type must be provided.')
url += '&typeName=%s' % Encoder.encode(typeName)
if description is not None:
url += '&description=%s' % Encoder.encode(description)
if startDate is not None:
url += '&startDate=%s' % Encoder.encode(startDate)
if endDate is not None:
url += '&endDate=%s' % Encoder.encode(endDate)
responseDict = self.sendSessionRequest(url=url, method='POST')
return Experiment(responseDict)
#######################################################################
# Testing.
if __name__ == '__main__':
api = ExperimentRestApi('sveseli', 'sveseli', 'zagreb.svdev.net', 33336, 'http')
print api.startExperiment('experiment1')
#!/usr/bin/env python
import os
import urllib
import json
import getpass
from dm.common.utility.encoder import Encoder
from dm.common.exceptions.dmException import DmException
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.objects.fileMetadata import FileMetadata
from dm.common.objects.directoryMetadata import DirectoryMetadata
from dm.common.objects.experiment import Experiment
from dm.common.utility.rsyncFileTransfer import RsyncFileTransfer
from dsRestApi import DsRestApi
class FileRestApi(DsRestApi):
def __init__(self, username=None, password=None, host=None, port=None, protocol=None):
DsRestApi.__init__(self, username, password, host, port, protocol)
@DsRestApi.execute
def processFile(self, experimentFilePath, experimentName, fileInfo={}):
url = '%s/files/processFile' % (self.getContextRoot())
if not experimentFilePath:
raise InvalidRequest('Experiment file path must be provided.')
if not experimentName:
raise InvalidRequest('Experiment name must be provided.')
fileInfo['experimentFilePath'] = experimentFilePath
fileInfo['experimentName'] = experimentName
url += '?fileInfo=%s' % (Encoder.encode(json.dumps(fileInfo)))
responseDict = self.sendSessionRequest(url=url, method='POST')
return FileMetadata(responseDict)
@DsRestApi.execute
def statFile(self, experimentFilePath, experimentName, fileInfo={}):
url = '%s/files/statFile' % (self.getContextRoot())
if not experimentFilePath:
raise InvalidRequest('Experiment file path must be provided.')
if not experimentName:
raise InvalidRequest('Experiment name must be provided.')
fileInfo['experimentFilePath'] = experimentFilePath
fileInfo['experimentName'] = experimentName
url += '?fileInfo=%s' % (Encoder.encode(json.dumps(fileInfo)))
responseDict = self.sendSessionRequest(url=url, method='POST')
return FileMetadata(responseDict)
@DsRestApi.execute
def processDirectory(self, experimentDirectoryPath, experimentName, directoryInfo={}):
url = '%s/files/processDirectory' % (self.getContextRoot())
if not experimentName:
raise InvalidRequest('Experiment name must be provided.')
directoryInfo['experimentDirectoryPath'] = experimentDirectoryPath
directoryInfo['experimentName'] = experimentName
url += '?directoryInfo=%s' % (Encoder.encode(json.dumps(directoryInfo)))
responseDict = self.sendSessionRequest(url=url, method='POST')
return DirectoryMetadata(responseDict)
@DsRestApi.execute
def download(self, experimentName, experimentFilePath='', destDirectory='.'):
username = getpass.getuser()
# Initialize download
url = '%s/downloadAuthorizations/%s/%s' % (self.getContextRoot(), username, experimentName)
if not experimentName:
raise InvalidRequest('Experiment name must be provided.')
self.logger.info('Authorizing download for user %s (experiment: %s)' % (username, experimentName))
responseDict = self.sendSessionRequest(url=url, method='POST')
experiment = Experiment(responseDict)
# Download
try:
storageDirectory = experiment.get('storageDirectory')
storageHost = experiment.get('storageHost')
src = '%s@%s:%s' % (username, storageHost, storageDirectory)
if experimentFilePath:
src = '%s/%s' % (src, experimentFilePath)
dest = destDirectory
rsyncPath = '/tmp/rsync.%s.%s' % (username, experimentName)
flags = '-arvlP --rsync-path="%s"' % rsyncPath
fileTransfer = RsyncFileTransfer(src=src, dest=dest, flags=flags)
self.logger.info('Executing file download on behalf of %s (experiment: %s)' % (username, experimentName))
fileTransfer.execute()
finally:
# Finalize download
self.logger.info('Deleting download authorization for user %s (experiment: %s)' % (username, experimentName))
self.sendSessionRequest(url=url, method='DELETE')
#######################################################################
# Testing.
if __name__ == '__main__':
api = FileRestApi('sveseli', 'sveseli', 'zagreb.svdev.net', 22236, 'http')
print api.processFile('file1', '/ESAF/exp1', 'exp1')
#!/usr/bin/env python
import os
import urllib
from dm.common.utility.encoder import Encoder
from dm.common.exceptions.dmException import DmException
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.objects.userInfo import UserInfo
from dm.common.objects.userExperimentRole import UserExperimentRole
from dm.common.objects.userSystemRole import UserSystemRole
from dsRestApi import DsRestApi
class UserRestApi(DsRestApi):
def __init__(self, username=None, password=None, host=None, port=None, protocol=None):
DsRestApi.__init__(self, username, password, host, port, protocol)
@DsRestApi.execute
def getUsers(self):
url = '%s/users' % (self.getContextRoot())
responseData = self.sendSessionRequest(url=url, method='GET')
return self.toDmObjectList(responseData, UserInfo)
@DsRestApi.execute
def getUserById(self, id):
if id is None:
raise InvalidRequest('User id must be provided.')
url = '%s/users/%s' % (self.getContextRoot(), id)
responseData = self.sendSessionRequest(url=url, method='GET')
return UserInfo(responseData)
@DsRestApi.execute
def getUserByUsername(self, username):
if username is None:
raise InvalidRequest('Username must be provided.')
url = '%s/usersByUsername/%s' % (self.getContextRoot(), username)
responseData = self.sendSessionRequest(url=url, method='GET')
return UserInfo(responseData)
@DsRestApi.execute
def addUserSystemRole(self, username, roleName, experimentStationName=None):
if not username:
raise InvalidRequest('Username must be provided.')
if not roleName:
raise InvalidRequest('Role name must be provided.')
url = '%s/userSystemRoles/%s/%s' % (self.getContextRoot(), username, roleName)
if experimentStationName:
url += '?experimentStationName=%s' % (experimentStationName)
responseData = self.sendSessionRequest(url=url, method='POST')
return UserSystemRole(responseData)
@DsRestApi.execute
def deleteUserSystemRole(self, username, roleName, experimentStationName=None):
if not username:
raise InvalidRequest('Username must be provided.')
if not roleName:
raise InvalidRequest('Role name must be provided.')
url = '%s/userSystemRoles/%s/%s' % (self.getContextRoot(), username, roleName)
if experimentStationName:
url += '?experimentStationName=%s' % (experimentStationName)
responseData = self.sendSessionRequest(url=url, method='DELETE')
return UserSystemRole(responseData)
@DsRestApi.execute
def addUserExperimentRole(self, username, roleName, experimentName):
if not username:
raise InvalidRequest('Username must be provided.')
if not experimentName:
raise InvalidRequest('Experiment name must be provided.')
if not roleName:
raise InvalidRequest('Role name must be provided.')
url = '%s/userExperimentRoles/%s/%s/%s' % (self.getContextRoot(), username, roleName, experimentName)
responseData = self.sendSessionRequest(url=url, method='POST')
return UserExperimentRole(responseData)
@DsRestApi.execute
def deleteUserExperimentRole(self, username, roleName, experimentName):
if not username:
raise InvalidRequest('Username must be provided.')
if not experimentName:
raise InvalidRequest('Experiment name must be provided.')
if not roleName:
raise InvalidRequest('Role name must be provided.')
url = '%s/userExperimentRoles/%s/%s/%s' % (self.getContextRoot(), username, roleName, experimentName)
responseData = self.sendSessionRequest(url=url, method='DELETE')
return UserExperimentRole(responseData)
#######################################################################
# Testing.
if __name__ == '__main__':
pass
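#######################################################################
# Illustrative usage sketch (comments only). Credentials, host, port and
# the role/experiment names below are placeholders, not values defined by
# this module.
#
#   api = UserRestApi('dm', 'dm', 'localhost', 22236, 'http')
#   print api.getUserByUsername('dmadmin')
#   print api.addUserExperimentRole('dmadmin', 'PI', 'experiment1')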
#!/usr/bin/env python
from dm.ds_web_service.api.experimentRestApi import ExperimentRestApi
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.utility.configurationManager import ConfigurationManager
from dsWebServiceSessionCli import DsWebServiceSessionCli
class AddExperimentCli(DsWebServiceSessionCli):
def __init__(self):
DsWebServiceSessionCli.__init__(self)
configManager = ConfigurationManager.getInstance()
self.allowedExperimentTypes = configManager.getAllowedExperimentTypes()
allowedExperimentTypesHelp = ''
if self.allowedExperimentTypes:
allowedExperimentTypesHelp = ' Allowed types: %s' % self.allowedExperimentTypes
self.addOption('', '--experiment', dest='experimentName', help='Experiment name.')
self.addOption('', '--station', dest='stationName', help='Experiment station name, can also be set via DM_STATION_NAME environment variable.')
self.addOption('', '--type', dest='typeName', help='Experiment type name.%s' % allowedExperimentTypesHelp)
self.addOption('', '--type-id', dest='typeId', help='Experiment type id (may be given instead of type name; type id is ignored if both type name and id are provided).')
self.addOption('', '--description', dest='description', help='Experiment description.')
self.addOption('', '--start-date', dest='startDate', help='Experiment start date in format DD-MMM-YY.')
self.addOption('', '--end-date', dest='endDate', help='Experiment end date in format DD-MMM-YY.')
def checkArgs(self):
if self.options.experimentName is None:
raise InvalidRequest('Experiment name must be provided.')
if self.getTypeName() is None:
raise InvalidRequest('Experiment type name must be provided.')
if self.getStationName() is None:
raise InvalidRequest('Experiment station name must be provided.')
# If allowed experiment types is not set, there are no restrictions
if self.allowedExperimentTypes:
if self.getTypeName() not in self.allowedExperimentTypes.split(','):
raise InvalidRequest('Experiment type %s is not allowed on this station. Allowed types are: %s.' % (self.getTypeName(), self.allowedExperimentTypes))
def getExperimentName(self):
return self.options.experimentName
def getStationName(self):
stationName = self.options.stationName
if not stationName:
configManager = ConfigurationManager.getInstance()
stationName = configManager.getStationName()
return stationName
def getTypeName(self):
typeName = self.options.typeName
if not typeName:
if self.options.typeId:
api = ExperimentRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
experimentType = api.getExperimentTypeById(self.options.typeId)
typeName = experimentType.get('name')
self.options.typeName = typeName
return typeName
def getDescription(self):
return self.options.description
def getStartDate(self):
return self.options.startDate
def getEndDate(self):
return self.options.endDate
def runCommand(self):
self.parseArgs(usage="""
dm-add-experiment
--experiment=EXPERIMENTNAME --station=STATIONNAME --type=TYPENAME|--type-id=TYPEID
[--description=DESCRIPTION]
[--start-date=STARTDATE]
[--end-date=ENDDATE]
Description:
Add new experiment to the DM database.
""")
self.checkArgs()
api = ExperimentRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
experiment = api.addExperiment(self.getExperimentName(), self.getStationName(), self.getTypeName(), self.getDescription(), self.getStartDate(), self.getEndDate())
print experiment.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
cli = AddExperimentCli()
cli.run()
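#######################################################################
# Example invocation (illustrative only; the station, type, and dates are
# placeholders and must match the DM station configuration):
#
#   dm-add-experiment --experiment=exp1 --station=TEST --type=TEST \
#       --description="Test experiment" --start-date=01-JAN-16 --end-date=02-JAN-16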