Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • DM/dm-docs
  • hammonds/dm-docs
  • hparraga/dm-docs
3 results
Show changes
Showing
with 1122 additions and 0 deletions
#!/usr/bin/env python
from dm.daq_web_service.api.experimentRestApi import ExperimentRestApi
from dm.common.exceptions.invalidRequest import InvalidRequest
from daqWebServiceSessionCli import DaqWebServiceSessionCli
class StopDaqCli(DaqWebServiceSessionCli):
def __init__(self):
DaqWebServiceSessionCli.__init__(self)
self.addOption('', '--experiment', dest='experimentName', help='Experiment name.')
self.addOption('', '--data-directory', dest='dataDirectory', help='Experiment data directory.')
def checkArgs(self):
if self.options.experimentName is None:
raise InvalidRequest('Experiment name must be provided.')
if self.options.dataDirectory is None:
raise InvalidRequest('Experiment data directory must be provided.')
def getExperimentName(self):
return self.options.experimentName
def getDataDirectory(self):
return self.options.dataDirectory
def runCommand(self):
self.parseArgs(usage="""
dm-stop-daq --experiment=EXPERIMENTNAME --data-directory=DATADIRECTORY
Description:
Stop DAQ for a given experiment and data directory.
""")
self.checkArgs()
api = ExperimentRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
daqInfo = api.stopDaq(self.getExperimentName(), self.getDataDirectory())
print daqInfo.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
    StopDaqCli().run()
#!/usr/bin/env python
from daqWebServiceSessionCli import DaqWebServiceSessionCli
from dm.daq_web_service.api.experimentRestApi import ExperimentRestApi
from dm.common.exceptions.invalidRequest import InvalidRequest
class UploadCli(DaqWebServiceSessionCli):
def __init__(self):
DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
self.addOption('', '--experiment', dest='experimentName', help='Experiment name.')
self.addOption('', '--data-directory', dest='dataDirectory', help='Experiment data directory.')
def checkArgs(self):
if self.options.experimentName is None:
raise InvalidRequest('Experiment name must be provided.')
if self.options.dataDirectory is None:
raise InvalidRequest('Experiment data directory must be provided.')
def getExperimentName(self):
return self.options.experimentName
def getDataDirectory(self):
return self.options.dataDirectory
def runCommand(self):
self.parseArgs(usage="""
dm-upload --experiment=EXPERIMENTNAME --data-directory=DATADIRECTORY
[key1:value1, key2:value2, ...]
Description:
Schedules data upload for a given experiment. All existing files in the
specified directory will be uploaded to storage. Relative directory
structure will be preserved. All provided key/value pairs will be passed
to file processing plugins.
""")
self.checkArgs()
api = ExperimentRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
daqInfo = self.splitArgsIntoDict()
uploadInfo = api.upload(self.getExperimentName(), self.getDataDirectory(), daqInfo=daqInfo)
print uploadInfo.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
#######################################################################
# Run command.
if __name__ == '__main__':
    UploadCli().run()
#!/usr/bin/env python
#
# DM DAQ Web Service
#
from dm.common.service.dmRestWebServiceBase import DmRestWebServiceBase
from dm.common.utility.dmModuleManager import DmModuleManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.daq_web_service.service.impl.daqProcessingCompleteNotificationPlugin import DaqProcessingCompleteNotificationPlugin
from dm.daq_web_service.service.impl.fileSystemObserver import FileSystemObserver
from daqWebServiceRouteMapper import DaqWebServiceRouteMapper
class DaqWebService(DmRestWebServiceBase):
    """ REST web service wiring the DAQ route mapper to the DM modules. """

    def __init__(self):
        DmRestWebServiceBase.__init__(self, DaqWebServiceRouteMapper)

    def initDmModules(self):
        """ Register background modules and the mandatory processing plugin. """
        self.logger.debug('Initializing dm modules')
        # Modules started/stopped by the module manager.
        moduleManager = DmModuleManager.getInstance()
        fileProcessingManager = FileProcessingManager.getInstance()
        moduleManager.addModule(FileSystemObserver.getInstance())
        moduleManager.addModule(fileProcessingManager)
        # Required processing plugin: notifies trackers of processed files.
        fileProcessingManager.appendFileProcessor(DaqProcessingCompleteNotificationPlugin())
        self.logger.debug('Initialized dm modules')

    def getDefaultServerHost(self):
        return ConfigurationManager.getInstance().getServiceHost()

    def getDefaultServerPort(self):
        return ConfigurationManager.getInstance().getServicePort()
####################################################################
# Run service
if __name__ == '__main__':
    ConfigurationManager.getInstance().setServiceName('daq-web-service')
    DaqWebService().run()
#!/usr/bin/env python
#
# Route mapper for DM DAQ web service.
#
import sys
import os
import cherrypy
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.service.loginRouteDescriptor import LoginRouteDescriptor
from experimentRouteDescriptor import ExperimentRouteDescriptor
class DaqWebServiceRouteMapper:
    """ Builds the cherrypy Routes dispatcher for the DAQ web service. """

    @classmethod
    def setupRoutes(cls):
        """ Setup RESTFul routes. """
        logger = LoggingManager.getInstance().getLogger(cls.__name__)
        contextRoot = ConfigurationManager.getInstance().getContextRoot()
        logger.debug('Using context root: %s' % contextRoot)
        # Combine login routes with experiment session routes.
        routes = LoginRouteDescriptor.getRoutes() + ExperimentRouteDescriptor.getRoutes()
        # Register every route descriptor with the dispatcher.
        dispatcher = cherrypy.dispatch.RoutesDispatcher()
        for route in routes:
            logger.debug('Connecting route: %s' % route)
            dispatcher.connect(route['name'], route['path'],
                               action=route['action'],
                               controller=route['controller'],
                               conditions=dict(method=route['method']))
        return dispatcher
#!/usr/bin/env python
#
# User route descriptor.
#
from dm.common.utility.configurationManager import ConfigurationManager
from experimentSessionController import ExperimentSessionController
class ExperimentRouteDescriptor:
    """ Describes REST routes for experiment DAQ and upload sessions. """

    @classmethod
    def getRoutes(cls):
        """ Return the list of route descriptor dictionaries. """
        contextRoot = ConfigurationManager.getInstance().getContextRoot()
        # Static instance shared between all experiment routes.
        experimentSessionController = ExperimentSessionController()

        def makeRoute(name, path, method):
            # Every route's action matches its name, and all paths are
            # rooted at the configured context root.
            return {
                'name': name,
                'path': '%s%s' % (contextRoot, path),
                'controller': experimentSessionController,
                'action': name,
                'method': method,
            }

        return [
            # Start experiment daq
            makeRoute('startDaq', '/experimentsByName/:(experimentName)/startDaq/:(dataDirectory)', ['POST']),
            # Stop experiment daq
            makeRoute('stopDaq', '/experimentsByName/:(experimentName)/stopDaq/:(dataDirectory)', ['POST']),
            # Get daq info
            makeRoute('getDaqInfo', '/experimentDaqs/:(id)', ['GET']),
            # Upload experiment data
            makeRoute('upload', '/experimentsByName/:(experimentName)/upload/:(dataDirectory)', ['POST']),
            # Get upload info
            makeRoute('getUploadInfo', '/experimentUploads/:(id)', ['GET']),
        ]
#!/usr/bin/env python
import cherrypy
import json
from dm.common.service.dmSessionController import DmSessionController
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.utility.encoder import Encoder
from dm.daq_web_service.service.impl.experimentSessionControllerImpl import ExperimentSessionControllerImpl
class ExperimentSessionController(DmSessionController):
    """
    REST controller exposing experiment DAQ/upload session endpoints.

    All endpoints require administrator privileges; URL path segments
    arrive encoded and are decoded with Encoder before use.
    """

    def __init__(self):
        DmSessionController.__init__(self)
        self.experimentSessionControllerImpl = ExperimentSessionControllerImpl()

    @cherrypy.expose
    @DmSessionController.require(DmSessionController.isAdministrator())
    @DmSessionController.execute
    def startDaq(self, experimentName, dataDirectory, **kwargs):
        """ Start a DAQ session; optional 'daqInfo' kwarg is encoded JSON. """
        if not experimentName:
            raise InvalidRequest('Missing experiment name.')
        experimentName = Encoder.decode(experimentName)
        if not dataDirectory:
            raise InvalidRequest('Missing data directory.')
        dataDirectory = Encoder.decode(dataDirectory)
        daqInfo = {}
        encodedDaqInfo = kwargs.get('daqInfo')
        # Truthy check (consistent with upload()) so that an empty daqInfo
        # parameter is ignored rather than fed to json.loads().
        if encodedDaqInfo:
            daqInfo = json.loads(Encoder.decode(encodedDaqInfo))
        response = self.experimentSessionControllerImpl.startDaq(experimentName, dataDirectory, daqInfo).getFullJsonRep()
        self.logger.debug('Returning: %s' % response)
        return response

    @cherrypy.expose
    @DmSessionController.require(DmSessionController.isAdministrator())
    @DmSessionController.execute
    def stopDaq(self, experimentName, dataDirectory, **kwargs):
        """ Stop an active DAQ session for the given experiment/directory. """
        if not experimentName:
            raise InvalidRequest('Missing experiment name.')
        experimentName = Encoder.decode(experimentName)
        if not dataDirectory:
            raise InvalidRequest('Missing data directory.')
        dataDirectory = Encoder.decode(dataDirectory)
        response = self.experimentSessionControllerImpl.stopDaq(experimentName, dataDirectory).getFullJsonRep()
        self.logger.debug('Returning: %s' % response)
        return response

    @cherrypy.expose
    @DmSessionController.require(DmSessionController.isAdministrator())
    @DmSessionController.execute
    def getDaqInfo(self, id, **kwargs):
        """ Return the DAQ info object for the given daq id. """
        response = self.experimentSessionControllerImpl.getDaqInfo(id).getFullJsonRep()
        self.logger.debug('Returning: %s' % response)
        return response

    @cherrypy.expose
    @DmSessionController.require(DmSessionController.isAdministrator())
    @DmSessionController.execute
    def upload(self, experimentName, dataDirectory, **kwargs):
        """ Schedule an upload; optional 'daqInfo' kwarg is encoded JSON. """
        if not experimentName:
            raise InvalidRequest('Missing experiment name.')
        experimentName = Encoder.decode(experimentName)
        if not dataDirectory:
            raise InvalidRequest('Missing data directory.')
        dataDirectory = Encoder.decode(dataDirectory)
        daqInfo = {}
        encodedDaqInfo = kwargs.get('daqInfo')
        if encodedDaqInfo:
            daqInfo = json.loads(Encoder.decode(encodedDaqInfo))
        response = self.experimentSessionControllerImpl.upload(experimentName, dataDirectory, daqInfo).getFullJsonRep()
        self.logger.debug('Returning: %s' % response)
        return response

    @cherrypy.expose
    @DmSessionController.require(DmSessionController.isAdministrator())
    @DmSessionController.execute
    def getUploadInfo(self, id, **kwargs):
        """ Return the upload info object for the given upload id. """
        response = self.experimentSessionControllerImpl.getUploadInfo(id).getFullJsonRep()
        self.logger.debug('Returning: %s' % response)
        return response
#!/usr/bin/env python
import os
from dm.common.utility.loggingManager import LoggingManager
from dm.common.processing.plugins.fileProcessor import FileProcessor
from uploadTracker import UploadTracker
from daqTracker import DaqTracker
class DaqProcessingCompleteNotificationPlugin(FileProcessor):
    """
    File processor that marks processed files on the corresponding
    tracked upload and/or DAQ info object and refreshes its status.
    """

    def __init__(self):
        FileProcessor.__init__(self)
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)

    def processFile(self, fileInfo):
        """
        Flag fileInfo's path as processed on its tracked object.

        Looks up the tracked object by 'uploadId' and/or the id inside
        'daqInfo'; logs an error if the tracked object does not know
        the file path.
        """
        filePath = fileInfo.get('filePath')
        uploadId = fileInfo.get('uploadId')
        daqId = fileInfo.get('daqInfo', {}).get('id')
        trackedInfo = None
        # 'is not None' replaces the '!= None' comparisons; identity test
        # is the correct way to check for None.
        if uploadId is not None:
            self.logger.debug('Upload id for file %s: %s' %(filePath, uploadId))
            trackedInfo = UploadTracker.getInstance().get(uploadId)
        if daqId is not None:
            self.logger.debug('Daq id for file %s: %s' %(filePath, daqId))
            # NOTE(review): when both ids are present, the daq lookup
            # overrides the upload lookup even if it returns None —
            # confirm this is intended.
            trackedInfo = DaqTracker.getInstance().get(daqId)
        if trackedInfo is not None:
            fileDict = trackedInfo.get('fileDict', {})
            trackedFileInfo = fileDict.get(filePath)
            if trackedFileInfo:
                trackedFileInfo['processed'] = True
            else:
                self.logger.error('%s object does not have file path %s' %(trackedInfo, filePath))
            trackedInfo.updateStatus()

#######################################################################
# Testing.
if __name__ == '__main__':
    pass
#!/usr/bin/env python
import os
import uuid
import time
from dm.common.objects.daqInfo import DaqInfo
from dm.common.utility.objectTracker import ObjectTracker
from dm.common.utility.timeUtility import TimeUtility
from dm.common.exceptions.objectAlreadyExists import ObjectAlreadyExists
from dm.common.exceptions.objectNotFound import ObjectNotFound
class DaqTracker(ObjectTracker):
    """
    Tracker for DAQ sessions. Keeps a cache of DaqInfo objects keyed by
    daq id, plus a dictionary of active sessions keyed by experiment
    name + normalized data directory to prevent duplicate DAQs.
    """

    # Cache configuration
    objectClass = DaqInfo
    cacheSize = 100

    def __init__(self, *args, **kwargs):
        # Bug fix: arguments must be unpacked when forwarded; the original
        # ObjectTracker.__init__(self, args, kwargs) passed the args tuple
        # and kwargs dict as two positional arguments.
        ObjectTracker.__init__(self, *args, **kwargs)
        self.activeDaqDict = {}

    def _getActiveDaqKey(self, experimentName, dataDirectory):
        """ Build the active-daq key from name + normalized directory. """
        return experimentName + os.path.normpath(dataDirectory)

    def startDaq(self, experiment, dataDirectory, daqInfo=None):
        """
        Register a new DAQ session and return its DaqInfo object.

        Raises ObjectAlreadyExists if a DAQ is already active for the
        same experiment and (normalized) data directory. The provided
        daqInfo dictionary, if any, is updated in place with session
        metadata (id, experiment name, storage info, data directory).
        """
        # Bug fix: avoid the shared mutable default argument daqInfo={}.
        if daqInfo is None:
            daqInfo = {}
        # Prevent second daq to be started in the same directory.
        experimentName = experiment.get('name')
        activeDaqKey = self._getActiveDaqKey(experimentName, dataDirectory)
        oldDaqInfo = self.activeDaqDict.get(activeDaqKey)
        if oldDaqInfo:
            raise ObjectAlreadyExists('DAQ is already active for experiment %s in data directory %s.' % (experimentName, dataDirectory))
        daqId = str(uuid.uuid4())
        daqInfo['id'] = daqId
        daqInfo['experimentName'] = experimentName
        daqInfo['storageDirectory'] = experiment.get('storageDirectory')
        daqInfo['storageHost'] = experiment.get('storageHost')
        daqInfo['dataDirectory'] = dataDirectory
        daqInfo2 = DaqInfo(daqInfo)
        startTime = time.time()
        daqInfo2['startTime'] = startTime
        daqInfo2['startTimestamp'] = TimeUtility.formatLocalTimeStamp(startTime)
        self.activeDaqDict[activeDaqKey] = daqInfo2
        self.put(daqId, daqInfo2)
        return daqInfo2

    def stopDaq(self, experiment, dataDirectory):
        """
        Deactivate the DAQ for the given experiment/directory and return
        its DaqInfo; raises ObjectNotFound if none is active.
        """
        experimentName = experiment.get('name')
        activeDaqKey = self._getActiveDaqKey(experimentName, dataDirectory)
        daqInfo = self.activeDaqDict.get(activeDaqKey)
        if not daqInfo:
            raise ObjectNotFound('DAQ is not active for experiment %s in data directory %s.' % (experimentName, dataDirectory))
        endTime = time.time()
        daqInfo['endTime'] = endTime
        daqInfo['endTimestamp'] = TimeUtility.formatLocalTimeStamp(endTime)
        daqInfo.updateStatus()
        del self.activeDaqDict[activeDaqKey]
        return daqInfo

    def getDaqInfo(self, id):
        """ Look up a (possibly inactive) DAQ by its id. """
        return self.get(id)

    def getDaqInfoByExperimentAndDataDirectory(self, experiment, dataDirectory):
        """ Return the active DAQ for experiment/directory, or None. """
        experimentName = experiment.get('name')
        return self.activeDaqDict.get(self._getActiveDaqKey(experimentName, dataDirectory))
####################################################################
# Testing
if __name__ == '__main__':
    # Exercise the singleton tracker with a fake experiment dictionary.
    tracker = DaqTracker.getInstance()
    print tracker
    experiment = {'name' : 'e1', 'owner' : 'sv'}
    dataDirectory = 'ftp://wolf:2811/data/e1'
    daqInfo = tracker.startDaq(experiment, dataDirectory)
    daqId = daqInfo['id']
    print 'DAQ ID: ', daqId
    print 'DAQ INFO: ', tracker.getDaqInfo(daqId)
    print 'REMOVED DAQ: ', tracker.stopDaq(experiment, dataDirectory)
    # Same directory with redundant slashes; os.path.normpath in the
    # tracker determines whether this maps to the same active-daq key.
    dataDirectory = 'ftp:///wolf:2811///data/e1'
    # NOTE(review): startDaq returns the DaqInfo object, not an id —
    # the variable name below is misleading.
    daqId = tracker.startDaq(experiment, dataDirectory)
    print 'DAQ ID: ', daqId
#!/usr/bin/env python
import os
import glob
from watchdog.events import FileSystemEventHandler
from dm.common.utility.loggingManager import LoggingManager
class DmFileSystemEventHandler(FileSystemEventHandler):
def __init__(self, fileSystemObserver, dataDirectory, experiment):
FileSystemEventHandler.__init__(self)
self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
self.fileSystemObserver = fileSystemObserver
self.dataDirectory = dataDirectory
self.experiment = experiment
def dispatch(self, event):
FileSystemEventHandler.dispatch(self, event)
def on_any_event(self, event):
FileSystemEventHandler.on_any_event(self, event)
self.logger.debug('File system any_event event: %s' % (event.__dict__))
def on_created(self, event):
FileSystemEventHandler.on_created(self, event)
self.logger.debug('File system created event: %s' % (event.__dict__))
self.processEvent(event)
def on_moved(self, event):
FileSystemEventHandler.on_moved(self, event)
self.logger.debug('File system moved event: %s' % (event.__dict__))
def on_deleted(self, event):
FileSystemEventHandler.on_deleted(self, event)
self.logger.debug('File system deleted event: %s' % (event.__dict__))
def on_modified(self, event):
FileSystemEventHandler.on_modified(self, event)
self.logger.debug('File system directory modified event: %s' % (event.__dict__))
self.processEvent(event)
def processEvent(self, event):
if event.is_directory:
self.logger.debug('Processing directory event: %s , src path: %s , latest files: %s' % (event.__dict__, event.src_path, files))
try:
files = glob.glob(os.path.join(event.src_path,'*.*'))
if len(files) > 0:
filePath = max(files, key=os.path.getctime)
self.logger.debug('Latest file: %s' % (filePath))
self.fileSystemObserver.fileUpdated(filePath, self.dataDirectory, self.experiment)
except Exception, ex:
self.logger.error('Exception occured when searching for file in directory %s: %s' % (event.__dict__, ex))
else:
filePath = event.src_path
self.logger.debug('Processing file event: %s' % (event.__dict__))
self.fileSystemObserver.fileUpdated(filePath, self.dataDirectory, self.experiment)
####################################################################
# Testing
if __name__ == '__main__':
import sys
import time
import logging
from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserver
from watchdog.observers.api import ObservedWatch
from watchdog.observers.api import EventQueue
from watchdog.observers.api import EventEmitter
from watchdog.events import LoggingEventHandler
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
path = sys.argv[1] if len(sys.argv) > 1 else '.'
eventHandler = DmFileSystemEventHandler()
observer = PollingObserver()
observedWatch = observer.schedule(eventHandler, path, recursive=True)
print 'OBSERVED WATCH: ', observedWatch
#observer.add_handler_for_watch(eventHandler2, observedWatch)
#observer._clear_emitters()
print observer.emitters
observer.start()
try:
while True:
time.sleep(1)
print time.time()
except KeyboardInterrupt:
observer.stop()
observer.join()
#!/usr/bin/env python
import os
import copy
from dm.common.utility.loggingManager import LoggingManager
from dm.common.processing.plugins.fileProcessor import FileProcessor
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
class DsProcessFileNotificationPlugin(FileProcessor):
    """
    File processor that notifies the data storage (DS) service to process
    an uploaded file via its file REST API.
    """

    def __init__(self):
        FileProcessor.__init__(self)
        self.dsFileApi = DsRestApiFactory.getFileRestApi()
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)

    def processFile(self, fileInfo):
        """ Forward a trimmed copy of fileInfo to the DS file API. """
        experimentFilePath = fileInfo.get('experimentFilePath')
        experimentName = fileInfo.get('experimentName')
        self.logger.debug('Processing file %s for experiment %s' % (experimentFilePath, experimentName))
        # Deep copy so removing the (potentially large) file dictionary
        # does not modify the caller's daq info.
        daqInfo = copy.deepcopy(fileInfo.get('daqInfo', {}))
        # 'in' replaces the deprecated dict.has_key() idiom.
        if 'fileDict' in daqInfo:
            del daqInfo['fileDict']
        # Prepare dictionary for processing. Only send needed data.
        fileInfo2 = {}
        fileInfo2['experimentFilePath'] = experimentFilePath
        fileInfo2['experimentName'] = experimentName
        fileInfo2['daqInfo'] = daqInfo
        self.dsFileApi.processFile(experimentFilePath, experimentName, fileInfo2)

#######################################################################
# Testing.
if __name__ == '__main__':
    pass
#!/usr/bin/env python
#
# Implementation for experiment session controller.
#
import os
import time
import uuid
import copy
from dm.common.objects.experiment import Experiment
from dm.common.objects.dmObjectManager import DmObjectManager
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.exceptions.objectNotFound import ObjectNotFound
from dm.common.objects.observedFile import ObservedFile
from dm.common.objects.uploadInfo import UploadInfo
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.common.utility.fileUtility import FileUtility
from dm.common.utility.timeUtility import TimeUtility
from dm.common.utility.dictUtility import DictUtility
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
from experimentTracker import ExperimentTracker
from uploadTracker import UploadTracker
from daqTracker import DaqTracker
from fileSystemObserver import FileSystemObserver
class ExperimentSessionControllerImpl(DmObjectManager):
    """ Experiment session controller implementation class. """

    def __init__(self):
        DmObjectManager.__init__(self)
        self.dsExperimentApi = DsRestApiFactory.getExperimentRestApi()

    def startDaq(self, experimentName, dataDirectory, daqInfo):
        """
        Start observing dataDirectory for the named experiment.

        Raises InvalidRequest if the experiment has no storage directory
        (i.e. has not been started).
        """
        FileSystemObserver.getInstance().createDirectory(dataDirectory)
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        storageDirectory = experiment.get('storageDirectory')
        if storageDirectory is None:
            raise InvalidRequest('Experiment %s has not been started.' % experimentName)
        daqInfo = DaqTracker.getInstance().startDaq(experiment, dataDirectory, daqInfo)
        FileSystemObserver.getInstance().startObservingPath(dataDirectory, experiment)
        return daqInfo

    def stopDaq(self, experimentName, dataDirectory):
        """ Stop the active DAQ and its directory observer; return scrubbed info. """
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        daqInfo = DaqTracker.getInstance().stopDaq(experiment, dataDirectory)
        FileSystemObserver.getInstance().stopObservingPath(dataDirectory, experiment)
        return daqInfo.scrub()

    def getDaqInfo(self, id):
        """ Return scrubbed DAQ info; raises ObjectNotFound for unknown id. """
        daqInfo = DaqTracker.getInstance().getDaqInfo(id)
        if not daqInfo:
            raise ObjectNotFound('Daq id %s not found.' % id)
        daqInfo.updateStatus()
        return daqInfo.scrub()

    def upload(self, experimentName, dataDirectory, daqInfo):
        """
        Schedule processing of all files currently in dataDirectory and
        return scrubbed upload info. The passed daqInfo dictionary is
        annotated in place and attached to every queued file.
        """
        experiment = self.dsExperimentApi.getExperimentByName(experimentName)
        experiment['daqInfo'] = daqInfo
        storageDirectory = experiment.get('storageDirectory')
        if storageDirectory is None:
            raise InvalidRequest('Experiment %s has not been started.' % experimentName)
        filePaths = FileSystemObserver.getInstance().getFiles(dataDirectory)
        fileProcessingManager = FileProcessingManager.getInstance()
        uploadId = str(uuid.uuid4())
        self.logger.debug('Starting upload id %s' % uploadId)
        uploadInfo = UploadInfo(daqInfo)
        uploadInfo['id'] = uploadId
        uploadInfo['experimentName'] = experimentName
        uploadInfo['storageDirectory'] = experiment.get('storageDirectory')
        uploadInfo['storageHost'] = experiment.get('storageHost')
        uploadInfo['dataDirectory'] = dataDirectory
        startTime = time.time()
        uploadInfo['startTime'] = startTime
        # Bug fix: the key previously had a trailing space
        # ('startTimestamp '), hiding the value from consumers reading
        # 'startTimestamp' (cf. DaqTracker.startDaq).
        uploadInfo['startTimestamp'] = TimeUtility.formatLocalTimeStamp(startTime)
        daqInfo['experimentName'] = experimentName
        daqInfo['storageDirectory'] = experiment.get('storageDirectory')
        daqInfo['storageHost'] = experiment.get('storageHost')
        daqInfo['dataDirectory'] = dataDirectory
        daqInfo['uploadId'] = uploadId
        # Stat each file and queue it for processing; fileDict records
        # per-file processing status for the upload tracker.
        fileDict = {}
        for filePath in filePaths:
            fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
            fileInfo['daqInfo'] = daqInfo
            fileInfo['uploadId'] = uploadId
            fileUploadInfo = { 'processed' : False }
            FileUtility.statFile(filePath, fileUploadInfo)
            fileDict[filePath] = fileUploadInfo
            fileProcessingManager.processFile(fileInfo)
        uploadInfo['fileDict'] = fileDict
        #self.logger.debug('Upload info %s' % uploadInfo)
        UploadTracker.getInstance().put(uploadId, uploadInfo)
        return uploadInfo.scrub()

    def getUploadInfo(self, id):
        """ Return scrubbed upload info; raises ObjectNotFound for unknown id. """
        uploadInfo = UploadTracker.getInstance().get(id)
        if not uploadInfo:
            raise ObjectNotFound('Upload id %s not found.' % id)
        uploadInfo.updateStatus()
        return uploadInfo.scrub()
#!/usr/bin/env python
from dm.common.objects.experiment import Experiment
from dm.common.utility.objectTracker import ObjectTracker
class ExperimentTracker(ObjectTracker):
    """ Tracker (cache) of Experiment objects. """
    # Cache configuration
    objectClass = Experiment

####################################################################
# Testing
if __name__ == '__main__':
    # getInstance() is provided by the ObjectTracker base; both calls
    # should print the same singleton instance.
    et = ExperimentTracker.getInstance()
    print et
    et2 = ExperimentTracker.getInstance()
    print et2
#!/usr/bin/env python
import threading
import time
from watchdog.observers.polling import PollingObserver
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.objects.observedFile import ObservedFile
from dm.common.utility.objectUtility import ObjectUtility
from dm.common.utility.singleton import Singleton
from dm.common.utility.threadingUtility import ThreadingUtility
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dmFileSystemEventHandler import DmFileSystemEventHandler
from daqTracker import DaqTracker
class FileSystemObserver(threading.Thread,Singleton):
    """
    Singleton background thread that collects file-update notifications
    from a configurable observer agent and hands files that have been
    quiet for a configured delay to the file processing manager.
    """

    # Configuration section and option keys read in __configure().
    CONFIG_SECTION_NAME = 'FileSystemObserver'
    MIN_FILE_PROCESSING_DELAY_IN_SECONDS_KEY = 'minfileprocessingdelayinseconds'
    FILE_SYSTEM_EVENT_TIMEOUT_IN_SECONDS_KEY = 'filesystemeventtimeoutinseconds'
    FILE_SYSTEM_OBSERVER_AGENT_KEY = 'filesystemobserveragent'

    # Singleton.
    __instanceLock = threading.RLock()
    __instance = None

    def __init__(self):
        """ Initialize the singleton; re-invocations return immediately. """
        FileSystemObserver.__instanceLock.acquire()
        try:
            if FileSystemObserver.__instance:
                # Already initialized by an earlier call.
                return
            FileSystemObserver.__instance = self
            threading.Thread.__init__(self)
            self.setName('FileSystemObserverThread')
            self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
            self.logger.debug('Initializing')
            self.lock = threading.RLock()
            # Wakes the run() loop early (see setEvent()/stop()).
            self.eventFlag = threading.Event()
            self.exitFlag = False
            # Maps file path -> ObservedFile awaiting processing.
            self.observedFileMap = {}
            self.__configure()
            self.fileProcessingManager = FileProcessingManager.getInstance()
            self.logger.debug('Initialization complete')
        finally:
            FileSystemObserver.__instanceLock.release()

    def __configure(self):
        """ Read timing options and instantiate the observer agent from config. """
        cm = ConfigurationManager.getInstance()
        configItems = cm.getConfigItems(FileSystemObserver.CONFIG_SECTION_NAME)
        self.logger.debug('Got config items: %s' % configItems)
        self.minFileProcessingDelayInSeconds = int(cm.getConfigOption(FileSystemObserver.CONFIG_SECTION_NAME, FileSystemObserver.MIN_FILE_PROCESSING_DELAY_IN_SECONDS_KEY))
        self.logger.debug('Minimum file processing delay: %s seconds' % self.minFileProcessingDelayInSeconds)
        self.fileSystemEventTimeoutInSeconds = int(cm.getConfigOption(FileSystemObserver.CONFIG_SECTION_NAME, FileSystemObserver.FILE_SYSTEM_EVENT_TIMEOUT_IN_SECONDS_KEY))
        self.logger.debug('File system event timeout: %s seconds' % self.fileSystemEventTimeoutInSeconds)
        # Agent class is configured as a module/class/constructor spec.
        agentClass = cm.getConfigOption(FileSystemObserver.CONFIG_SECTION_NAME, FileSystemObserver.FILE_SYSTEM_OBSERVER_AGENT_KEY)
        (moduleName,className,constructor) = cm.getModuleClassConstructorTuple(agentClass)
        self.logger.debug('Creating file system observer agent instance of class %s' % className)
        self.fileSystemObserverAgent = ObjectUtility.createObjectInstance(moduleName, className, constructor)
        self.fileSystemObserverAgent.setFileSystemObserver(self)

    @ThreadingUtility.synchronize
    def createDirectory(self, dataDirectory):
        """ Delegate directory creation to the agent. """
        self.fileSystemObserverAgent.createDirectory(dataDirectory)

    @ThreadingUtility.synchronize
    def getFiles(self, dataDirectory):
        """ Delegate file listing to the agent. """
        self.logger.debug('Agent is retrieving files for %s' % dataDirectory)
        return self.fileSystemObserverAgent.getFiles(dataDirectory)

    @ThreadingUtility.synchronize
    def startObservingPath(self, dataDirectory, experiment):
        """ Begin observing dataDirectory for the given experiment. """
        self.logger.debug('Agent is starting observer for %s' % dataDirectory)
        self.fileSystemObserverAgent.startObservingPath(dataDirectory, experiment)

    @ThreadingUtility.synchronize
    def stopObservingPath(self, dataDirectory, experiment):
        """ Stop observing dataDirectory for the given experiment. """
        self.logger.debug('Agent is stopping observer for %s' % dataDirectory)
        self.fileSystemObserverAgent.stopObservingPath(dataDirectory, experiment)

    @ThreadingUtility.synchronize
    def fileUpdated(self, filePath, dataDirectory, experiment):
        """
        Record (or refresh) an observed file; called by agents/handlers
        whenever a file changes.
        """
        daqInfo = DaqTracker.getInstance().getDaqInfoByExperimentAndDataDirectory(experiment, dataDirectory)
        observedFile = self.observedFileMap.get(filePath, ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment))
        observedFile.setLastUpdateTimeToNow()
        if daqInfo:
            # NOTE(review): assumes an active DaqInfo always carries a
            # 'fileDict' entry — confirm against DaqInfo's constructor.
            daqFileDict = daqInfo['fileDict']
            daqFileDict[filePath] = observedFile
            observedFile['daqInfo'] = daqInfo.toDictWithOriginalKeys()
        self.observedFileMap[filePath] = observedFile
        self.logger.debug('Observed file updated: %s', observedFile)

    @ThreadingUtility.synchronize
    def checkObservedFilesForProcessing(self):
        """ Return paths of files whose last update exceeds the quiet delay. """
        now = time.time()
        filePathsForProcessing = []
        for (filePath,observedFile) in self.observedFileMap.items():
            timestamp = observedFile.get('lastUpdateTime')
            deltaT = now - timestamp
            if deltaT > self.minFileProcessingDelayInSeconds:
                self.logger.debug('File %s was last modified %s seconds ago, will process it.' % (filePath, deltaT))
                filePathsForProcessing.append(filePath)
        return filePathsForProcessing

    @ThreadingUtility.synchronize
    def processFile(self, filePath):
        """ Remove the file from the observed map and queue it for processing. """
        self.logger.debug('Processing file %s' % filePath)
        observedFile = self.observedFileMap.get(filePath)
        if observedFile is not None:
            del self.observedFileMap[filePath]
            self.fileProcessingManager.processFile(observedFile)

    @ThreadingUtility.synchronize
    def start(self):
        """ Start this thread and then the observer agent. """
        self.logger.debug('Starting file observer thread')
        threading.Thread.start(self)
        self.logger.debug('Starting file observer agent')
        self.fileSystemObserverAgent.start()

    def run(self):
        """ Main loop: periodically process files that have gone quiet. """
        self.logger.debug('Starting thread: %s' % self.getName())
        while True:
            if self.exitFlag:
                self.logger.debug('Exit flag set, %s done' % self.getName())
                break
            try:
                filePathsForProcessing = self.checkObservedFilesForProcessing()
                if len(filePathsForProcessing):
                    self.logger.debug('Checking observed files')
                    for filePath in filePathsForProcessing:
                        self.processFile(filePath)
            except Exception, ex:
                self.logger.exception(ex)
            # Sleep until the timeout elapses or the event flag is set.
            self.eventFlag.wait(timeout=self.fileSystemEventTimeoutInSeconds)

    @ThreadingUtility.synchronize
    def stop(self):
        """ Stop the agent, signal the loop to exit, and join the thread. """
        self.logger.debug('Stopping file observer agent')
        self.fileSystemObserverAgent.stop()
        self.logger.debug('Stopping file observer thread')
        self.exitFlag = True
        self.eventFlag.set()
        self.logger.debug('Event is set, joining thread')
        threading.Thread.join(self)
        self.logger.debug('Module stopped')

    @ThreadingUtility.synchronize
    def setEvent(self):
        """ Wake the run() loop early. """
        self.eventFlag.set()

    @ThreadingUtility.synchronize
    def clearEvent(self):
        """ Re-arm the run() loop's wait. """
        self.eventFlag.clear()
####################################################################
# Testing
if __name__ == '__main__':
    # Run the observer for 30 seconds, then shut it down.
    fp = FileSystemObserver.getInstance()
    fp.start()
    time.sleep(30)
    fp.stop()
#!/usr/bin/env python
from dm.common.utility.loggingManager import LoggingManager
class FileSystemObserverAgent:
    """
    Base class for file system observer agents. Subclasses implement the
    directory/file operations and report changes back to the observer
    via its fileUpdated() method.
    """

    def __init__(self):
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
        # Set later via setFileSystemObserver().
        self.fileSystemObserver = None

    def setFileSystemObserver(self, fileSystemObserver):
        """ Attach the observer that receives file-update callbacks. """
        self.fileSystemObserver = fileSystemObserver

    def createDirectory(self, dataDirectory):
        """ Create the data directory; no-op in the base class. """
        pass

    def getFiles(self, dataDirectory):
        """ List files in the data directory; no-op in the base class. """
        pass

    def startObservingPath(self, dataDirectory, experiment):
        """ Begin observing a directory; no-op in the base class. """
        pass

    def stopObservingPath(self, dataDirectory, experiment):
        """ Stop observing a directory; no-op in the base class. """
        pass

    def start(self):
        """ Start the agent; no-op in the base class. """
        pass

    def stop(self):
        """ Stop the agent; no-op in the base class. """
        pass
#!/usr/bin/env python
from threading import Timer
from fileSystemObserverAgent import FileSystemObserverAgent
from dm.common.utility.ftpUtility import FtpUtility
class FtpFileSystemObserverAgent(FileSystemObserverAgent):
    """ FTP-based file system observer agent.

    Periodically retrieves an FTP directory listing via FtpUtility and
    notifies the attached file system observer about new or modified files.
    Polling is driven by a chain of threading.Timer instances, one per
    observed directory.
    """

    DEFAULT_POLLING_PERIOD_IN_SECONDS = 5

    def __init__(self, host, port, username=None, password=None, pollingPeriod=DEFAULT_POLLING_PERIOD_IN_SECONDS):
        """ Initialize agent with FTP connection details and polling period (seconds). """
        FileSystemObserverAgent.__init__(self)
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.pollingPeriod = pollingPeriod
        # Maps data directory => {'files', 'experiment', 'pollTimer'}.
        self.observedDirDict = {}
        self.isDone = False

    def getFiles(self, dataDirectory):
        """ Return dictionary of file info dictionaries keyed by file path. """
        (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultHost=self.host, defaultPort=self.port)
        self.logger.debug('Retrieving files from FTP host: %s, port: %s, directory path: %s' % (host, port, dirPath))
        ftpUtility = FtpUtility(host, port, self.username, self.password)
        return ftpUtility.getFiles(dirPath, {})

    def updateFile(self, filePath, dataDirectory, experiment):
        """ Notify the attached observer that the given file is new or modified. """
        if self.fileSystemObserver:
            self.logger.debug('Processing file path: %s' % filePath)
            self.fileSystemObserver.fileUpdated(filePath, dataDirectory, experiment)

    def processFiles(self, fileDict, oldFileDict, dataDirectory, experiment):
        """ Compare new listing against old one; report new and modified files. """
        for filePath in fileDict.keys():
            # has_key() is deprecated (and removed in Python 3); use 'in'.
            if filePath not in oldFileDict:
                # New file, must be updated.
                self.logger.debug('New file path detected: %s' % filePath)
                self.updateFile(filePath, dataDirectory, experiment)
            else:
                # Existing file: report it only if the modify timestamp changed.
                oldFileInfo = oldFileDict.get(filePath)
                oldModifyTime = oldFileInfo.get('Modify', '')
                fileInfo = fileDict.get(filePath)
                modifyTime = fileInfo.get('Modify')
                if modifyTime != oldModifyTime:
                    self.logger.debug('Modified file path detected: %s' % filePath)
                    self.updateFile(filePath, dataDirectory, experiment)

    def pollFileSystem(self, dataDirectory, experiment):
        """ Timer callback: fetch fresh listing, process changes, re-arm timer. """
        try:
            fileDict = self.getFiles(dataDirectory)
            observedDirInfo = self.observedDirDict.get(dataDirectory)
            if not observedDirInfo:
                # Observation was stopped while the listing was being retrieved.
                self.logger.debug('Polling cancelled for directory: %s' % dataDirectory)
                return
            oldFileDict = observedDirInfo.get('files')
            observedDirInfo['files'] = fileDict
            self.processFiles(fileDict, oldFileDict, dataDirectory, experiment)
            self.startPollingTimer(observedDirInfo, dataDirectory, experiment)
        except Exception as ex:
            self.logger.error('Could not poll directory %s: %s' % (dataDirectory,ex))

    def startPollingTimer(self, observedDirInfo, dataDirectory, experiment):
        """ Arm a one-shot timer that triggers the next poll, unless stopping. """
        if self.isDone:
            return
        t = Timer(self.pollingPeriod, self.pollFileSystem, [dataDirectory, experiment])
        observedDirInfo['pollTimer'] = t
        t.start()

    def startObservingPath(self, dataDirectory, experiment):
        """ Begin polling the given directory; no-op if already observed. """
        observedDirInfo = self.observedDirDict.get(dataDirectory)
        if observedDirInfo:
            self.logger.debug('Observer for %s is already active' % dataDirectory)
            return
        self.logger.debug('Starting observer for %s' % dataDirectory)
        # Baseline listing; subsequent polls diff against it.
        fileDict = self.getFiles(dataDirectory)
        # The directory is known to be unobserved here, so start from a fresh entry.
        observedDirInfo = {}
        observedDirInfo['files'] = fileDict
        observedDirInfo['experiment'] = experiment
        self.observedDirDict[dataDirectory] = observedDirInfo
        self.startPollingTimer(observedDirInfo, dataDirectory, experiment)

    def stopObservingPath(self, dataDirectory, experiment):
        """ Cancel polling for the given directory; no-op if not observed. """
        observedDirInfo = self.observedDirDict.get(dataDirectory)
        if not observedDirInfo:
            self.logger.debug('Observer for %s is not active' % dataDirectory)
            return
        self.logger.debug('Stopping observer for %s' % dataDirectory)
        t = observedDirInfo.get('pollTimer')
        if t:
            t.cancel()
        del self.observedDirDict[dataDirectory]

    def start(self):
        """ Start the agent (polling starts per-directory via startObservingPath). """
        self.logger.debug('Starting ftp observer agent')

    def stop(self):
        """ Stop the agent: flag shutdown and cancel all directory observers. """
        self.logger.debug('Stopping ftp observer agent')
        self.isDone = True
        # Snapshot the items: stopObservingPath() deletes entries while we iterate.
        for (dataDirectory,observedDirInfo) in list(self.observedDirDict.items()):
            experiment = observedDirInfo.get('experiment')
            self.stopObservingPath(dataDirectory, experiment)
####################################################################
# Testing
if __name__ == '__main__':
import time
agent = FtpFileSystemObserverAgent('zagreb', 2811)
print 'ORIGINAL FILES: ', agent.getFiles('/tmp/test')
agent.startObservingPath('/tmp/test', 'e1')
time.sleep(100)
agent.stopObservingPath('/tmp/test', 'e1')
#!/usr/bin/env python
from dm.common.objects.uploadInfo import UploadInfo
from dm.common.utility.objectTracker import ObjectTracker
class UploadTracker(ObjectTracker):
    """ Tracker for UploadInfo objects, backed by the ObjectTracker cache. """
    # Cache configuration
    objectClass = UploadInfo  # class of the tracked objects
    cacheSize = 100  # maximum number of cached entries
####################################################################
# Testing
if __name__ == '__main__':
    # No standalone test for this module.
    pass
#!/usr/bin/env python
#
# Implementation for user info controller.
#
#######################################################################
from dm.common.objects.dmObject import DmObject
from dm.common.objects.dmObjectManager import DmObjectManager
from dm.common.db.api.userDbApi import UserDbApi
#######################################################################
class UserInfoControllerImpl(DmObjectManager):
    """ User info controller implementation class. """

    def __init__(self):
        DmObjectManager.__init__(self)
        # All user queries delegate to the user DB API.
        self.userDbApi = UserDbApi()

    def getUsers(self):
        """ Return the list of all users. """
        dbApi = self.userDbApi
        return dbApi.getUsers()

    def getUserById(self, id):
        """ Return the user with the given id. """
        dbApi = self.userDbApi
        return dbApi.getUserById(id)

    def getUserByUsername(self, username):
        """ Return the user with the given username. """
        dbApi = self.userDbApi
        return dbApi.getUserByUsername(username)
#!/usr/bin/env python
from dm.common.utility.osUtility import OsUtility
from watchdog.observers.polling import PollingObserver
from fileSystemObserverAgent import FileSystemObserverAgent
from dmFileSystemEventHandler import DmFileSystemEventHandler
class WatchdogFileSystemObserverAgent(FileSystemObserverAgent):
    """ File system observer agent based on the watchdog PollingObserver. """

    # Directory creation mode for DAQ directories.
    # NOTE(review): 0o777 is world-writable -- confirm this is intended.
    DAQ_PERMISSIONS_MODE = 0o777

    def __init__(self):
        FileSystemObserverAgent.__init__(self)
        self.observer = PollingObserver()
        # Maps data directory => watchdog ObservedWatch handle.
        self.observedWatchDict = {}

    def createDirectory(self, dataDirectory):
        """ Create the data directory; best effort, warns (does not raise) on failure. """
        try:
            OsUtility.createDir(dataDirectory, mode=self.DAQ_PERMISSIONS_MODE)
        except Exception as ex:
            self.logger.warn('Unable to create directory %s: %s' % (dataDirectory, ex))

    def getFiles(self, dataDirectory):
        """ Return files found under the given directory. """
        return OsUtility.findFiles(dataDirectory)

    def startObservingPath(self, dataDirectory, experiment):
        """ Schedule a recursive watchdog watch for the given directory. """
        self.logger.debug('Starting observer for %s' % dataDirectory)
        eventHandler = DmFileSystemEventHandler(self.fileSystemObserver, dataDirectory, experiment)
        observedWatch = self.observer.schedule(eventHandler, dataDirectory, recursive=True)
        self.observedWatchDict[dataDirectory] = observedWatch

    def stopObservingPath(self, dataDirectory, experiment):
        """ Unschedule the watch for the given directory; no-op if not observed. """
        observedWatch = self.observedWatchDict.get(dataDirectory)
        if observedWatch:
            self.logger.debug('Stopping observer for %s' % dataDirectory)
            self.observer.unschedule(observedWatch)
            del self.observedWatchDict[dataDirectory]
        else:
            self.logger.debug('Observer for %s is not active' % dataDirectory)

    def start(self):
        """ Start the underlying watchdog observer thread. """
        self.logger.debug('Starting watchdog observer agent')
        self.observer.start()

    def stop(self):
        """ Stop the watchdog observer thread and wait for it to terminate. """
        self.logger.debug('Stopping watchdog observer agent')
        self.observer.stop()
        self.observer.join()