Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • DM/dm-docs
  • hammonds/dm-docs
  • hparraga/dm-docs
3 results
Show changes
Showing
with 1054 additions and 0 deletions
#!/usr/bin/env python
import os
from dm.common.cli.dmRestSessionCli import DmRestSessionCli
from dm.common.utility.osUtility import OsUtility
from dm.common.utility.configurationManager import ConfigurationManager
class DaqWebServiceSessionCli(DmRestSessionCli):
    """ DM DAQ web service session cli class.

    Base class for all DAQ web service command line tools. Configures the
    DAQ-specific session cache file and resolves the DAQ web service
    host/port from the configuration manager.
    """

    # Environment variable whose value is prepended to plain data directory
    # paths that do not already carry a URL scheme.
    DM_FILE_SERVER_URL_ENV_VAR = 'DM_FILE_SERVER_URL'
    DEFAULT_SESSION_CACHE_FILE = os.path.join(OsUtility.getUserHomeDir(), '.dm', '.daq.session.cache')

    def __init__(self, validArgCount=0):
        DmRestSessionCli.__init__(self, validArgCount)
        # All DAQ CLIs share one session cache, separate from other services.
        ConfigurationManager.getInstance().setSessionCacheFile(DaqWebServiceSessionCli.DEFAULT_SESSION_CACHE_FILE)

    def getDefaultServiceHost(self):
        """Return the configured DAQ web service host."""
        return ConfigurationManager.getInstance().getDaqWebServiceHost()

    def getDefaultServicePort(self):
        """Return the configured DAQ web service port."""
        return ConfigurationManager.getInstance().getDaqWebServicePort()

    def getId(self):
        """Return the value of the --id option."""
        return self.options.id

    def getExperimentName(self):
        """Return the value of the --experiment option."""
        return self.options.experimentName

    def getDataDirectory(self):
        """Return the data directory option value.

        If the value does not already contain a URL scheme, the content of
        the DM_FILE_SERVER_URL environment variable (empty string when
        unset) is prepended to it.
        """
        dataDirectory = self.options.dataDirectory
        # Make sure data directory already does not have url scheme
        if dataDirectory and '://' not in dataDirectory:
            fileServerUrl = os.environ.get(self.DM_FILE_SERVER_URL_ENV_VAR, '')
            dataDirectory = '%s%s' % (fileServerUrl, dataDirectory)
        return dataDirectory
#!/usr/bin/env python
from daqWebServiceSessionCli import DaqWebServiceSessionCli
from dm.daq_web_service.api.experimentDaqApi import ExperimentDaqApi
from dm.common.exceptions.invalidRequest import InvalidRequest
class GetDaqInfoCli(DaqWebServiceSessionCli):
    """CLI for retrieving detailed information about a single DAQ."""

    def __init__(self):
        DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
        self.addOption('', '--id', dest='id', help='Daq id.')

    def checkArgs(self):
        # The DAQ id is the only mandatory option.
        if self.options.id is None:
            raise InvalidRequest('Daq id must be provided.')

    def getId(self):
        return self.options.id

    def runCommand(self):
        self.parseArgs(usage="""
dm-get-daq-info --id=ID
Description:
    Retrieves detailed information for the specified data acquisition.
""")
        self.checkArgs()
        daqApi = ExperimentDaqApi(
            self.getLoginUsername(), self.getLoginPassword(),
            self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
        info = daqApi.getDaqInfo(self.getId())
        print(info.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat()))
#######################################################################
# Run command.
if __name__ == '__main__':
    GetDaqInfoCli().run()
#!/usr/bin/env python
from daqWebServiceSessionCli import DaqWebServiceSessionCli
from dm.daq_web_service.api.experimentDaqApi import ExperimentDaqApi
from dm.common.exceptions.invalidRequest import InvalidRequest
class GetUploadInfoCli(DaqWebServiceSessionCli):
    """CLI for retrieving detailed information about a single data upload."""

    def __init__(self):
        DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
        self.addOption('', '--id', dest='id', help='Upload id.')

    def checkArgs(self):
        # The upload id is the only mandatory option.
        if self.options.id is None:
            raise InvalidRequest('Upload id must be provided.')

    def getId(self):
        return self.options.id

    def runCommand(self):
        self.parseArgs(usage="""
dm-get-upload-info --id=ID
Description:
    Retrieves detailed information for the specified data upload id.
""")
        self.checkArgs()
        uploadApi = ExperimentDaqApi(
            self.getLoginUsername(), self.getLoginPassword(),
            self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
        info = uploadApi.getUploadInfo(self.getId())
        print(info.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat()))
#######################################################################
# Run command.
if __name__ == '__main__':
    GetUploadInfoCli().run()
#!/usr/bin/env python
from daqWebServiceSessionCli import DaqWebServiceSessionCli
from dm.common.constants import dmProcessingStatus
from dm.daq_web_service.api.experimentDaqApi import ExperimentDaqApi
from dm.common.exceptions.invalidRequest import InvalidRequest
class ListDaqsCli(DaqWebServiceSessionCli):
    """CLI for listing DAQs, optionally filtered by processing status."""

    def __init__(self):
        DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
        self.addOption('', '--status', dest='status', default=dmProcessingStatus.DM_PROCESSING_STATUS_ANY, help='Processing status, must be one of %s (default: %s).' % (dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST,dmProcessingStatus.DM_PROCESSING_STATUS_ANY))

    def checkArgs(self):
        # Status must belong to the allowed processing status set.
        if self.options.status not in dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST:
            raise InvalidRequest('Processing status must be one of %s.' % dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST)

    def getStatus(self):
        return self.options.status

    def runCommand(self):
        self.parseArgs(usage="""
dm-list-daqs [--status=STATUS]
Description:
    Retrieves all known daqs.
""")
        self.checkArgs()
        daqApi = ExperimentDaqApi(
            self.getLoginUsername(), self.getLoginPassword(),
            self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
        for daqInfo in daqApi.listDaqs(self.getStatus()):
            print(daqInfo.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat()))
#######################################################################
# Run command.
if __name__ == '__main__':
    ListDaqsCli().run()
#!/usr/bin/env python
from dm.daq_web_service.api.experimentDaqApi import ExperimentDaqApi
from daqWebServiceSessionCli import DaqWebServiceSessionCli
class ListProcessingPluginsCli(DaqWebServiceSessionCli):
    """CLI for listing processing plugins known to the DAQ service."""

    def __init__(self):
        DaqWebServiceSessionCli.__init__(self)

    def runCommand(self):
        self.parseArgs(usage="""
dm-list-processing-plugins
Description:
    Retrieves list of known processing plugins.
""")
        pluginApi = ExperimentDaqApi(
            self.getLoginUsername(), self.getLoginPassword(),
            self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
        for plugin in pluginApi.listProcessingPlugins():
            print(plugin.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat()))
#######################################################################
# Run command.
if __name__ == '__main__':
    ListProcessingPluginsCli().run()
#!/usr/bin/env python
from daqWebServiceSessionCli import DaqWebServiceSessionCli
from dm.common.constants import dmProcessingStatus
from dm.daq_web_service.api.experimentDaqApi import ExperimentDaqApi
from dm.common.exceptions.invalidRequest import InvalidRequest
class ListUploadsCli(DaqWebServiceSessionCli):
    """CLI for listing uploads, optionally filtered by processing status."""

    def __init__(self):
        DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
        self.addOption('', '--status', dest='status', default=dmProcessingStatus.DM_PROCESSING_STATUS_ANY, help='Processing status, must be one of %s (default: %s).' % (dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST,dmProcessingStatus.DM_PROCESSING_STATUS_ANY))

    def checkArgs(self):
        # Status must belong to the allowed processing status set.
        if self.options.status not in dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST:
            raise InvalidRequest('Processing status must be one of %s.' % dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST)

    def getStatus(self):
        return self.options.status

    def runCommand(self):
        self.parseArgs(usage="""
dm-list-uploads [--status=STATUS]
Description:
    Retrieves all known uploads.
""")
        self.checkArgs()
        uploadApi = ExperimentDaqApi(
            self.getLoginUsername(), self.getLoginPassword(),
            self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
        for uploadInfo in uploadApi.listUploads(self.getStatus()):
            print(uploadInfo.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat()))
#######################################################################
# Run command.
if __name__ == '__main__':
    ListUploadsCli().run()
#!/usr/bin/env python
from dm.daq_web_service.api.experimentDaqApi import ExperimentDaqApi
from dm.common.exceptions.invalidRequest import InvalidRequest
from daqWebServiceSessionCli import DaqWebServiceSessionCli
class StartDaqCli(DaqWebServiceSessionCli):
    """CLI for starting a DAQ (directory monitoring) for an experiment."""

    # Conversion factor for the "d" (days) run time unit.
    HOURS_PER_DAY = 24

    def __init__(self):
        DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
        self.addOption('', '--experiment', dest='experimentName', help='Experiment name.')
        self.addOption('', '--data-directory', dest='dataDirectory', help='Experiment data directory. If specified string does not already contain file server URL, value of the %s environment variable will be prepended to it.' % self.DM_FILE_SERVER_URL_ENV_VAR)
        self.addOption('', '--target-directory', dest='targetDirectory', help='Target directory relative to experiment root path.')
        self.addOption('', '--run-time', dest='runTime', help='Maximum run time; it must be specified in hours (h) or days (d). Examples: "8h", "14d".')
        self.addOption('', '--upload-data-directory-on-exit', dest='uploadDataDirectoryOnExit', help='Data directory that will be uploaded automatically after DAQ is stopped.')
        self.addOption('', '--upload-target-directory-on-exit', dest='uploadTargetDirectoryOnExit', help='Target directory relative to experiment root path for automatic upload after DAQ is stopped. Requires upload data directory to be specified.')
        self.addOption('', '--process-hidden', dest='processHidden', action='store_true', default=False, help='Process hidden source files.')

    def checkArgs(self):
        """Validate that the mandatory options were supplied."""
        if self.options.experimentName is None:
            raise InvalidRequest('Experiment name must be provided.')
        if self.options.dataDirectory is None:
            raise InvalidRequest('Experiment data directory must be provided.')

    def updateDaqInfoFromOptions(self, daqInfo):
        """Fold the command line options into the given daqInfo dictionary.

        Raises InvalidRequest when the run time option carries an invalid
        unit or a non-numeric value.
        """
        if self.options.processHidden:
            daqInfo['processHiddenFiles'] = True
        if self.options.runTime:
            runTime = self.options.runTime
            try:
                if runTime.endswith('h'):
                    daqInfo['maxRunTimeInHours'] = int(runTime[0:-1])
                elif runTime.endswith('d'):
                    daqInfo['maxRunTimeInHours'] = int(runTime[0:-1])*self.HOURS_PER_DAY
                else:
                    raise InvalidRequest('Maximum run time must contain valid unit specifier: "h" for hours or "d" for days.')
            except ValueError:
                # int() failed: the value before the unit is not an integer.
                raise InvalidRequest('Maximum run time must contain a valid integer value.')
        if self.options.targetDirectory:
            daqInfo['targetDirectory'] = self.options.targetDirectory
        if self.options.uploadDataDirectoryOnExit:
            daqInfo['uploadDataDirectoryOnExit'] = self.options.uploadDataDirectoryOnExit
        if self.options.uploadTargetDirectoryOnExit:
            # Target-on-exit is meaningless without a data directory to upload.
            if not self.options.uploadDataDirectoryOnExit:
                raise InvalidRequest('Upload target directory on exit requires that upload data directory is specified as well.')
            daqInfo['uploadTargetDirectoryOnExit'] = self.options.uploadTargetDirectoryOnExit

    def runCommand(self):
        self.parseArgs(usage="""
dm-start-daq --experiment=EXPERIMENTNAME --data-directory=DATADIRECTORY
    [--run-time=RUNTIME]
    [--target-directory=TARGETDIRECTORY]
    [--upload-data-directory-on-exit=UPLOADDATADIRECTORYONEXIT]
    [--upload-target-directory-on-exit=UPLOADTARGETDIRECTORYONEXIT]
    [--process-hidden]
    [key1:value1, key2:value2, ...]
Description:
    Starts DAQ for a given experiment. Specified data directory will be
    monitored for data files. All provided key/value pairs will be passed to
    file processing plugins.
""")
        self.checkArgs()
        api = ExperimentDaqApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
        # Positional key:value arguments seed the daqInfo dictionary.
        daqInfo = self.splitArgsIntoDict()
        self.updateDaqInfoFromOptions(daqInfo)
        daqInfo = api.startDaq(self.getExperimentName(), self.getDataDirectory(), daqInfo=daqInfo)
        print(daqInfo.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat()))
#######################################################################
# Run command.
if __name__ == '__main__':
    StartDaqCli().run()
#!/usr/bin/env python
from dm.daq_web_service.api.experimentDaqApi import ExperimentDaqApi
from dm.common.exceptions.invalidRequest import InvalidRequest
from daqWebServiceSessionCli import DaqWebServiceSessionCli
class StopDaqCli(DaqWebServiceSessionCli):
    """CLI for stopping a running DAQ session."""

    def __init__(self):
        DaqWebServiceSessionCli.__init__(self)
        self.addOption('', '--experiment', dest='experimentName', help='Experiment name.')
        self.addOption('', '--data-directory', dest='dataDirectory', help='Experiment data directory. If specified string does not already contain file server URL, value of the %s environment variable will be prepended to it.' % self.DM_FILE_SERVER_URL_ENV_VAR)

    def checkArgs(self):
        # Both the experiment name and the data directory are mandatory.
        if self.options.experimentName is None:
            raise InvalidRequest('Experiment name must be provided.')
        if self.options.dataDirectory is None:
            raise InvalidRequest('Experiment data directory must be provided.')

    def runCommand(self):
        self.parseArgs(usage="""
dm-stop-daq --experiment=EXPERIMENTNAME --data-directory=DATADIRECTORY
Description:
    Stop DAQ for a given experiment and data directory.
""")
        self.checkArgs()
        daqApi = ExperimentDaqApi(
            self.getLoginUsername(), self.getLoginPassword(),
            self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
        stoppedDaqInfo = daqApi.stopDaq(self.getExperimentName(), self.getDataDirectory())
        print(stoppedDaqInfo.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat()))
#######################################################################
# Run command.
if __name__ == '__main__':
    StopDaqCli().run()
#!/usr/bin/env python
from daqWebServiceSessionCli import DaqWebServiceSessionCli
from dm.daq_web_service.api.experimentDaqApi import ExperimentDaqApi
from dm.common.exceptions.invalidRequest import InvalidRequest
class StopUploadCli(DaqWebServiceSessionCli):
    """CLI for aborting a data upload by id."""

    def __init__(self):
        DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
        self.addOption('', '--id', dest='id', help='Upload id.')

    def checkArgs(self):
        # The upload id is the only mandatory option.
        if self.options.id is None:
            raise InvalidRequest('Upload id must be provided.')

    def getId(self):
        return self.options.id

    def runCommand(self):
        self.parseArgs(usage="""
dm-stop-upload --id=ID
Description:
    Aborts specified data upload.
""")
        self.checkArgs()
        uploadApi = ExperimentDaqApi(
            self.getLoginUsername(), self.getLoginPassword(),
            self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
        stoppedUploadInfo = uploadApi.stopUpload(self.getId())
        print(stoppedUploadInfo.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat()))
#######################################################################
# Run command.
if __name__ == '__main__':
    StopUploadCli().run()
#!/usr/bin/env python
from daqWebServiceSessionCli import DaqWebServiceSessionCli
from dm.daq_web_service.api.experimentDaqApi import ExperimentDaqApi
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.constants import dmProcessingMode
class UploadCli(DaqWebServiceSessionCli):
    """CLI for scheduling a one-shot data upload for an experiment."""

    def __init__(self):
        DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
        self.addOption('', '--experiment', dest='experimentName', help='Experiment name.')
        self.addOption('', '--data-directory', dest='dataDirectory', help='Experiment data directory. If specified string does not already contain file server URL, value of the %s environment variable will be prepended to it.' % self.DM_FILE_SERVER_URL_ENV_VAR)
        self.addOption('', '--target-directory', dest='targetDirectory', help='Target directory relative to experiment root path.')
        self.addOption('', '--reprocess', dest='reprocess', action='store_true', default=False, help='Reprocess source files that are already in storage, even if they have not been modified.')
        self.addOption('', '--process-hidden', dest='processHidden', action='store_true', default=False, help='Process hidden source files.')
        self.addOption('', '--processing-mode', dest='processingMode', default=dmProcessingMode.DM_PROCESSING_MODE_FILES, help='Processing mode can be one of %s (default: %s). In the "%s" mode files are processed individually, while in the "%s" mode processing plugins work on directories (if possible).' % (dmProcessingMode.DM_ALLOWED_PROCESSING_MODE_LIST, dmProcessingMode.DM_PROCESSING_MODE_FILES, dmProcessingMode.DM_PROCESSING_MODE_FILES, dmProcessingMode.DM_PROCESSING_MODE_DIRECTORY))
        self.addOption('', '--skip-plugins', dest='skipPlugins', help='Comma-separated list of plugins which should not process the given directory.')

    def checkArgs(self):
        """Validate mandatory options and the processing mode value."""
        if self.options.experimentName is None:
            raise InvalidRequest('Experiment name must be provided.')
        if self.options.dataDirectory is None:
            raise InvalidRequest('Experiment data directory must be provided.')
        if self.options.processingMode not in dmProcessingMode.DM_ALLOWED_PROCESSING_MODE_LIST:
            raise InvalidRequest('Processing mode must be one of %s.' % dmProcessingMode.DM_ALLOWED_PROCESSING_MODE_LIST)

    def updateDaqInfoFromOptions(self, daqInfo):
        """Fold the command line options into the given daqInfo dictionary."""
        if self.options.reprocess:
            daqInfo['reprocessFiles'] = True
        if self.options.processHidden:
            daqInfo['processHiddenFiles'] = True
        if self.options.skipPlugins:
            daqInfo['skipPlugins'] = self.options.skipPlugins
        daqInfo['processingMode'] = self.options.processingMode
        if self.options.targetDirectory:
            daqInfo['targetDirectory'] = self.options.targetDirectory

    def runCommand(self):
        self.parseArgs(usage="""
dm-upload --experiment=EXPERIMENTNAME --data-directory=DATADIRECTORY
    [--target-directory=TARGETDIRECTORY]
    [--reprocess]
    [--process-hidden]
    [--processing-mode=PROCESSINGMODE]
    [--skip-plugins=SKIPPLUGINS]
    [key1:value1, key2:value2, ...]
Description:
    Schedules data upload for a given experiment. All existing files in the
    specified directory will be uploaded to storage. Relative directory
    structure will be preserved. All provided key/value pairs will be passed
    to file processing plugins.
""")
        self.checkArgs()
        uploadApi = ExperimentDaqApi(
            self.getLoginUsername(), self.getLoginPassword(),
            self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
        # Positional key:value arguments seed the daqInfo dictionary.
        uploadRequest = self.splitArgsIntoDict()
        self.updateDaqInfoFromOptions(uploadRequest)
        uploadInfo = uploadApi.upload(self.getExperimentName(), self.getDataDirectory(), daqInfo=uploadRequest)
        print(uploadInfo.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat()))
#######################################################################
# Run command.
if __name__ == '__main__':
    UploadCli().run()
#!/usr/bin/env python
#
# DM DAQ Web Service
#
from dm.common.service.dmRestWebServiceBase import DmRestWebServiceBase
from dm.common.utility.dmModuleManager import DmModuleManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.processing.fileProcessingManager import FileProcessingManager
from dm.daq_web_service.service.impl.daqProcessingCompleteNotificationPlugin import DaqProcessingCompleteNotificationPlugin
from dm.daq_web_service.service.impl.fileSystemObserver import FileSystemObserver
from daqWebServiceRouteMapper import DaqWebServiceRouteMapper
class DaqWebService(DmRestWebServiceBase):
    """REST web service entry point for DM DAQ operations."""

    def __init__(self):
        DmRestWebServiceBase.__init__(self, DaqWebServiceRouteMapper)

    def initDmModules(self):
        """Register the dm modules that the service will start."""
        self.logger.debug('Initializing dm modules')
        moduleManager = DmModuleManager.getInstance()
        for module in (FileSystemObserver.getInstance(), FileProcessingManager.getInstance()):
            moduleManager.addModule(module)
        # Required processing plugin (currently disabled):
        #notificationPlugin = DaqProcessingCompleteNotificationPlugin()
        #FileProcessingManager.getInstance().appendFileProcessor(notificationPlugin)
        self.logger.debug('Initialized dm modules')

    def getDefaultServerHost(self):
        """Return the configured service host."""
        return ConfigurationManager.getInstance().getServiceHost()

    def getDefaultServerPort(self):
        """Return the configured service port."""
        return ConfigurationManager.getInstance().getServicePort()
####################################################################
# Run service
if __name__ == '__main__':
    ConfigurationManager.getInstance().setServiceName('daq-web-service')
    DaqWebService().run()
#!/usr/bin/env python
#
# Route mapper for DM DAQ web service.
#
import sys
import os
import cherrypy
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.service.loginRouteDescriptor import LoginRouteDescriptor
from experimentRouteDescriptor import ExperimentRouteDescriptor
class DaqWebServiceRouteMapper:
    """Maps DAQ web service REST routes onto a cherrypy dispatcher."""

    @classmethod
    def setupRoutes(cls):
        """ Setup RESTFul routes. """
        logger = LoggingManager.getInstance().getLogger(cls.__name__)
        contextRoot = ConfigurationManager.getInstance().getContextRoot()
        logger.debug('Using context root: %s' % contextRoot)
        # Collect routes from all descriptors.
        routes = LoginRouteDescriptor.getRoutes() + ExperimentRouteDescriptor.getRoutes()
        # Wire every route into the dispatcher.
        dispatcher = cherrypy.dispatch.RoutesDispatcher()
        for route in routes:
            logger.debug('Connecting route: %s' % route)
            dispatcher.connect(
                route['name'], route['path'],
                action=route['action'],
                controller=route['controller'],
                conditions=dict(method=route['method']))
        return dispatcher
#!/usr/bin/env python
#
# User route descriptor.
#
from dm.common.utility.configurationManager import ConfigurationManager
from experimentSessionController import ExperimentSessionController
class ExperimentRouteDescriptor:
    """Describes REST routes for experiment DAQ and upload operations."""

    @classmethod
    def getRoutes(cls):
        """Return the list of route dictionaries for this service."""
        contextRoot = ConfigurationManager.getInstance().getContextRoot()
        # Static instance shared between different routes.
        controller = ExperimentSessionController()
        # (name/action, path, HTTP method) for every exposed operation;
        # route name and controller action are identical for all of them.
        routeSpecs = [
            # Start experiment daq
            ('startDaq', '%s/experimentsByName/:(experimentName)/startDaq/:(dataDirectory)' % contextRoot, 'POST'),
            # Stop experiment daq
            ('stopDaq', '%s/experimentsByName/:(experimentName)/stopDaq/:(dataDirectory)' % contextRoot, 'POST'),
            # Get daq info
            ('getDaqInfo', '%s/experimentDaqs/:(id)' % contextRoot, 'GET'),
            # List DAQs
            ('listDaqs', '%s/experimentDaqsByStatus/:(status)' % contextRoot, 'GET'),
            # Upload experiment data
            ('upload', '%s/experimentsByName/:(experimentName)/upload/:(dataDirectory)' % contextRoot, 'POST'),
            # Get upload info
            ('getUploadInfo', '%s/experimentUploads/:(id)' % contextRoot, 'GET'),
            # List uploads
            ('listUploads', '%s/experimentUploadsByStatus/:(status)' % contextRoot, 'GET'),
            # Stop upload
            ('stopUpload', '%s/experimentUploads/stopUpload/:(id)' % contextRoot, 'POST'),
            # Get processing plugins
            ('getProcessingPlugins', '%s/processingPlugins' % contextRoot, 'GET'),
        ]
        routes = []
        for name, path, method in routeSpecs:
            routes.append({
                'name' : name,
                'path' : path,
                'controller' : controller,
                'action' : name,
                'method' : [method]
            })
        return routes
#!/usr/bin/env python
import cherrypy
import json
import os
from dm.common.constants import dmProcessingMode
from dm.common.constants import dmProcessingStatus
from dm.common.service.dmSessionController import DmSessionController
from dm.common.exceptions.invalidRequest import InvalidRequest
from dm.common.exceptions.authorizationError import AuthorizationError
from dm.common.utility.encoder import Encoder
from dm.common.utility.dictUtility import DictUtility
from dm.daq_web_service.service.impl.experimentSessionControllerImpl import ExperimentSessionControllerImpl
class ExperimentSessionController(DmSessionController):
    """Session controller exposing experiment DAQ and upload REST actions.

    All actions require station management privileges and delegate to
    ExperimentSessionControllerImpl; responses are full JSON representations
    of the resulting objects.
    """

    def __init__(self):
        DmSessionController.__init__(self)
        self.experimentSessionControllerImpl = ExperimentSessionControllerImpl()

    @cherrypy.expose
    @DmSessionController.require(DmSessionController.canManageStation())
    @DmSessionController.execute
    def startDaq(self, experimentName, dataDirectory, **kwargs):
        """Start a DAQ for the given experiment and data directory.

        Optional kwarg 'daqInfo' is an encoded JSON dictionary of extra
        DAQ settings. Raises InvalidRequest on missing/invalid input.
        """
        if not experimentName:
            raise InvalidRequest('Missing experiment name.')
        experimentName = Encoder.decode(experimentName)
        if not dataDirectory:
            raise InvalidRequest('Missing data directory.')
        dataDirectory = Encoder.decode(dataDirectory)
        # Directory must be absolute unless it carries a URL scheme.
        if not dataDirectory.startswith('/') and '://' not in dataDirectory:
            raise InvalidRequest('Data directory must be an absolute path.')
        daqInfo = {}
        encodedDaqInfo = kwargs.get('daqInfo')
        # Truthiness check (matches upload()): an empty string would make
        # json.loads fail, so treat it the same as absent.
        if encodedDaqInfo:
            daqInfo = json.loads(Encoder.decode(encodedDaqInfo))
        response = self.experimentSessionControllerImpl.startDaq(experimentName, dataDirectory, daqInfo).getFullJsonRep()
        self.logger.debug('Started DAQ: %s' % response)
        return response

    @cherrypy.expose
    @DmSessionController.require(DmSessionController.canManageStation())
    @DmSessionController.execute
    def stopDaq(self, experimentName, dataDirectory, **kwargs):
        """Stop the DAQ for the given experiment and data directory."""
        if not experimentName:
            raise InvalidRequest('Missing experiment name.')
        experimentName = Encoder.decode(experimentName)
        if not dataDirectory:
            raise InvalidRequest('Missing data directory.')
        dataDirectory = Encoder.decode(dataDirectory)
        response = self.experimentSessionControllerImpl.stopDaq(experimentName, dataDirectory).getFullJsonRep()
        self.logger.debug('Stopped DAQ: %s' % response)
        return response

    @cherrypy.expose
    @DmSessionController.require(DmSessionController.canManageStation())
    @DmSessionController.execute
    def getDaqInfo(self, id, **kwargs):
        """Return detailed information for the DAQ with the given id."""
        response = self.experimentSessionControllerImpl.getDaqInfo(id).getFullJsonRep()
        return response

    @cherrypy.expose
    @DmSessionController.require(DmSessionController.canManageStation())
    @DmSessionController.execute
    def listDaqs(self, status=dmProcessingStatus.DM_PROCESSING_STATUS_ANY, **kwargs):
        """List DAQs, optionally filtered by processing status."""
        if not status:
            status = dmProcessingStatus.DM_PROCESSING_STATUS_ANY
        if status not in dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST:
            raise InvalidRequest('Invalid processing status "%s". Status must be one of %s.' % (status,dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST))
        return self.listToJson(self.experimentSessionControllerImpl.listDaqs(status))

    @cherrypy.expose
    @DmSessionController.require(DmSessionController.canManageStation())
    @DmSessionController.execute
    def upload(self, experimentName, dataDirectory, **kwargs):
        """Schedule a data upload for the given experiment and directory.

        Optional kwarg 'daqInfo' is an encoded JSON dictionary; its
        'processingMode' key selects per-file or per-directory processing.
        """
        if not experimentName:
            raise InvalidRequest('Missing experiment name.')
        experimentName = Encoder.decode(experimentName)
        if not dataDirectory:
            raise InvalidRequest('Missing data directory.')
        dataDirectory = Encoder.decode(dataDirectory)
        # Directory must be absolute unless it carries a URL scheme.
        if not dataDirectory.startswith('/') and '://' not in dataDirectory:
            raise InvalidRequest('Data directory must be an absolute path: %s' % dataDirectory)
        daqInfo = {}
        encodedDaqInfo = kwargs.get('daqInfo')
        if encodedDaqInfo:
            daqInfo = json.loads(Encoder.decode(encodedDaqInfo))
        processingMode = DictUtility.getAndRemoveKey(daqInfo, 'processingMode', dmProcessingMode.DM_PROCESSING_MODE_FILES)
        if processingMode not in dmProcessingMode.DM_ALLOWED_PROCESSING_MODE_LIST:
            raise InvalidRequest('Allowed processing modes: %s' % dmProcessingMode.DM_ALLOWED_PROCESSING_MODE_LIST)
        if processingMode == dmProcessingMode.DM_PROCESSING_MODE_FILES:
            response = self.experimentSessionControllerImpl.uploadFiles(experimentName, dataDirectory, daqInfo).getFullJsonRep()
        else:
            response = self.experimentSessionControllerImpl.uploadDirectory(experimentName, dataDirectory, daqInfo).getFullJsonRep()
        self.logger.debug('Returning upload info for directory %s' % dataDirectory)
        return response

    @cherrypy.expose
    @DmSessionController.require(DmSessionController.canManageStation())
    @DmSessionController.execute
    def getUploadInfo(self, id, **kwargs):
        """Return detailed information for the upload with the given id."""
        response = self.experimentSessionControllerImpl.getUploadInfo(id).getFullJsonRep()
        self.logger.debug('Returning info for upload id %s' % id)
        return response

    @cherrypy.expose
    @DmSessionController.require(DmSessionController.canManageStation())
    @DmSessionController.execute
    def listUploads(self, status=dmProcessingStatus.DM_PROCESSING_STATUS_ANY, **kwargs):
        """List uploads, optionally filtered by processing status."""
        if not status:
            status = dmProcessingStatus.DM_PROCESSING_STATUS_ANY
        if status not in dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST:
            raise InvalidRequest('Invalid processing status "%s". Status must be one of %s.' % (status,dmProcessingStatus.DM_ALLOWED_PROCESSING_STATUS_LIST))
        return self.listToJson(self.experimentSessionControllerImpl.listUploads(status))

    @cherrypy.expose
    @DmSessionController.require(DmSessionController.canManageStation())
    @DmSessionController.execute
    def stopUpload(self, id, **kwargs):
        """Abort the upload with the given id."""
        response = self.experimentSessionControllerImpl.stopUpload(id).getFullJsonRep()
        self.logger.debug('Stopped upload id %s' % id)
        return response

    @cherrypy.expose
    @DmSessionController.require(DmSessionController.canManageStation())
    @DmSessionController.execute
    def getProcessingPlugins(self, **kwargs):
        """Return the list of known processing plugins."""
        return self.listToJson(self.experimentSessionControllerImpl.getProcessingPlugins())
#!/usr/bin/env python
import os
from dm.common.utility.loggingManager import LoggingManager
from dm.common.processing.plugins.fileProcessor import FileProcessor
from uploadTracker import UploadTracker
from daqTracker import DaqTracker
class DaqProcessingCompleteNotificationPlugin(FileProcessor):
    """File processor that marks tracked upload/DAQ files as processed.

    After a file completes processing, the corresponding entry in the
    upload or DAQ tracker is flagged and the tracked object's status is
    recomputed.
    """

    def __init__(self):
        FileProcessor.__init__(self)
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)

    def processFile(self, fileInfo):
        """Mark fileInfo's path as processed in its tracker entry.

        fileInfo is a dictionary; the keys read are 'filePath',
        'uploadId' and 'daqInfo' (whose 'id' is the DAQ id).
        """
        filePath = fileInfo.get('filePath')
        uploadId = fileInfo.get('uploadId')
        daqId = fileInfo.get('daqInfo', {}).get('id')
        trackedInfo = None
        if uploadId is not None:
            self.logger.debug('Upload id for file %s: %s' %(filePath, uploadId))
            trackedInfo = UploadTracker.getInstance().get(uploadId)
        # When both ids are present, the DAQ entry takes precedence.
        if daqId is not None:
            self.logger.debug('Daq id for file %s: %s' %(filePath, daqId))
            trackedInfo = DaqTracker.getInstance().get(daqId)
        if trackedInfo is not None:
            fileDict = trackedInfo.get('fileDict', {})
            trackedFileInfo = fileDict.get(filePath)
            if trackedFileInfo:
                trackedFileInfo['processed'] = True
            else:
                self.logger.error('%s object does not have file path %s' %(trackedInfo, filePath))
            # Recompute tracked object status even if the file was unknown.
            trackedInfo.updateStatus()
#######################################################################
# Testing.
# No standalone test: the plugin requires live tracker instances.
if __name__ == '__main__':
    pass
#!/usr/bin/env python
import os
import uuid
import time
from dm.common.constants import dmProcessingStatus
from dm.common.objects.daqInfo import DaqInfo
from dm.common.utility.objectTracker import ObjectTracker
from dm.common.utility.timeUtility import TimeUtility
from dm.common.exceptions.objectAlreadyExists import ObjectAlreadyExists
from dm.common.exceptions.objectNotFound import ObjectNotFound
class DaqTracker(ObjectTracker):
    """Tracks DaqInfo objects and enforces a single active DAQ per
    (experiment name, normalized data directory) pair.
    """

    # Cache configuration
    objectClass = DaqInfo
    cacheSize = 100

    def __init__(self, *args, **kwargs):
        # Bug fix: forward arguments properly; the original passed the
        # args tuple and kwargs dict as two positional parameters.
        ObjectTracker.__init__(self, *args, **kwargs)
        # Maps experimentName + normalized data dir -> active DaqInfo.
        self.activeDaqDict = {}

    def _getActiveDaqKey(self, experimentName, dataDirectory):
        """Build the active-DAQ dictionary key for an experiment/directory."""
        return experimentName + os.path.normpath(dataDirectory)

    def startDaq(self, experiment, dataDirectory, daqInfo=None):
        """Register a new DAQ and return its DaqInfo object.

        Raises ObjectAlreadyExists when a DAQ is already active for the
        same experiment and data directory.
        """
        # Avoid a mutable default argument: the dict is written to below,
        # so a shared default would leak keys across calls.
        if daqInfo is None:
            daqInfo = {}
        # Prevent second daq to be started in the same directory
        experimentName = experiment.get('name')
        activeDaqKey = self._getActiveDaqKey(experimentName, dataDirectory)
        oldDaqInfo = self.activeDaqDict.get(activeDaqKey)
        if oldDaqInfo:
            raise ObjectAlreadyExists('DAQ is already active for experiment %s in data directory %s.' % (experimentName, dataDirectory))
        daqId = str(uuid.uuid4())
        daqInfo['id'] = daqId
        daqInfo['experimentName'] = experimentName
        daqInfo['storageDirectory'] = experiment.get('storageDirectory')
        daqInfo['storageHost'] = experiment.get('storageHost')
        daqInfo['storageUrl'] = experiment.get('storageUrl')
        daqInfo['dataDirectory'] = dataDirectory
        # Create DaqInfo object with keys that we want to save with file
        # metadata, and add other keys later
        daqInfo2 = DaqInfo(daqInfo)
        daqInfo2['nFiles'] = 0
        daqInfo2['nProcessedFiles'] = 0
        daqInfo2['nProcessingErrors'] = 0
        daqInfo2['status'] = dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING
        startTime = time.time()
        daqInfo2['startTime'] = startTime
        daqInfo2['startTimestamp'] = TimeUtility.formatLocalTimestamp(startTime)
        self.activeDaqDict[activeDaqKey] = daqInfo2
        self.put(daqId, daqInfo2)
        return daqInfo2

    def stopDaq(self, experiment, dataDirectory):
        """Deactivate and return the DAQ for the given experiment/directory.

        Raises ObjectNotFound when no DAQ is active there.
        """
        experimentName = experiment.get('name')
        activeDaqKey = self._getActiveDaqKey(experimentName, dataDirectory)
        daqInfo = self.activeDaqDict.get(activeDaqKey)
        if not daqInfo:
            raise ObjectNotFound('DAQ is not active for experiment %s in data directory %s.' % (experimentName, dataDirectory))
        endTime = time.time()
        daqInfo['endTime'] = endTime
        daqInfo['endTimestamp'] = TimeUtility.formatLocalTimestamp(endTime)
        daqInfo.updateStatus()
        del self.activeDaqDict[activeDaqKey]
        return daqInfo

    def getDaqInfo(self, id):
        """Return the tracked DaqInfo for the given id, or None."""
        return self.get(id)

    def updateDaqInfos(self, status=dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING):
        """Refresh the status of all tracked DAQs currently in the given status."""
        daqInfoList = self.getAll()
        for daqInfo in daqInfoList:
            if daqInfo.get('status', '') == status:
                daqInfo.updateStatus()

    def getDaqInfos(self, status=None):
        """Return tracked DAQs, filtered by status unless it is None/'any'."""
        daqInfoList = self.getAll()
        if status is None or status == dmProcessingStatus.DM_PROCESSING_STATUS_ANY:
            return daqInfoList
        filteredDaqInfoList = []
        for daqInfo in daqInfoList:
            if daqInfo.get('status', '') == status:
                filteredDaqInfoList.append(daqInfo)
        return filteredDaqInfoList

    def getDaqInfoByExperimentAndDataDirectory(self, experiment, dataDirectory):
        """Return the active DaqInfo for the experiment/directory, or None."""
        experimentName = experiment.get('name')
        return self.activeDaqDict.get(self._getActiveDaqKey(experimentName, dataDirectory))
####################################################################
# Testing
if __name__ == '__main__':
    # Ad-hoc manual test (Python 2 print syntax): starts a DAQ, queries it,
    # stops it, then starts another one in a differently-spelled directory.
    tracker = DaqTracker.getInstance()
    print tracker
    experiment = {'name' : 'e1', 'owner' : 'sv'}
    dataDirectory = 'ftp://wolf:2811/data/e1'
    daqInfo = tracker.startDaq(experiment, dataDirectory)
    daqId = daqInfo['id']
    print 'DAQ ID: ', daqId
    print 'DAQ INFO: ', tracker.getDaqInfo(daqId)
    print 'DAQS: ', tracker.getDaqInfos()
    print 'REMOVED DAQ: ', tracker.stopDaq(experiment, dataDirectory)
    dataDirectory = 'ftp:///wolf:2811///data/e1'
    # NOTE(review): startDaq() returns the DaqInfo object, not an id;
    # the variable name below is misleading.
    daqId = tracker.startDaq(experiment, dataDirectory)
    print 'DAQ ID: ', daqId
#!/usr/bin/env python
import os
import glob
from watchdog.events import FileSystemEventHandler
from dm.common.utility.loggingManager import LoggingManager
class DmFileSystemEventHandler(FileSystemEventHandler):
    """
    Watchdog event handler that forwards file creation/modification events
    to the DM file system observer for a particular experiment DAQ.
    Moves and deletions are logged but not forwarded.
    """

    def __init__(self, fileSystemObserver, dataDirectory, experiment):
        """
        :param fileSystemObserver: object providing
            fileUpdated(filePath, dataDirectory, experiment)
        :param dataDirectory: monitored data directory
        :param experiment: experiment dictionary associated with the DAQ
        """
        FileSystemEventHandler.__init__(self)
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
        self.fileSystemObserver = fileSystemObserver
        self.dataDirectory = dataDirectory
        self.experiment = experiment

    def dispatch(self, event):
        FileSystemEventHandler.dispatch(self, event)

    def on_any_event(self, event):
        FileSystemEventHandler.on_any_event(self, event)
        self.logger.debug('File system any_event event: %s' % (event.__dict__))

    def on_created(self, event):
        # Creations are forwarded to the observer.
        FileSystemEventHandler.on_created(self, event)
        self.logger.debug('File system created event: %s' % (event.__dict__))
        self.processEvent(event)

    def on_moved(self, event):
        # Moves are only logged, not processed.
        FileSystemEventHandler.on_moved(self, event)
        self.logger.debug('File system moved event: %s' % (event.__dict__))

    def on_deleted(self, event):
        # Deletions are only logged, not processed.
        FileSystemEventHandler.on_deleted(self, event)
        self.logger.debug('File system deleted event: %s' % (event.__dict__))

    def on_modified(self, event):
        FileSystemEventHandler.on_modified(self, event)
        # Bug fix: this callback fires for both file and directory
        # modifications; the old message claimed "directory modified"
        # for every event.
        self.logger.debug('File system modified event: %s' % (event.__dict__))
        self.processEvent(event)

    def processEvent(self, event):
        """
        Notify the file system observer about the affected file. For a
        directory event only the newest regular file in that directory
        (by ctime) is reported; for a file event the file itself is
        reported.
        """
        if event.is_directory:
            try:
                files = glob.glob(os.path.join(event.src_path, '*'))
                self.logger.debug('Processing directory event: %s , src path: %s' % (event.__dict__, event.src_path))
                if len(files) > 0:
                    # Newest first; report only the most recent regular file.
                    sortedFiles = sorted(files, key=os.path.getctime, reverse=True)
                    for filePath in sortedFiles:
                        if os.path.isfile(filePath):
                            self.logger.debug('Latest file: %s' % (filePath))
                            self.fileSystemObserver.fileUpdated(filePath, self.dataDirectory, self.experiment)
                            return
                self.logger.debug('No new files found in %s' % (event.src_path))
            except Exception as ex:
                # Best effort: the directory listing can race with file
                # removal, so failures are logged rather than propagated.
                self.logger.error('Exception occured when searching for file in directory %s: %s' % (event.__dict__, ex))
        else:
            filePath = event.src_path
            self.logger.debug('Processing file event: %s' % (event.__dict__))
            self.fileSystemObserver.fileUpdated(filePath, self.dataDirectory, self.experiment)
####################################################################
# Testing
if __name__ == '__main__':
import sys
import time
import logging
from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserver
from watchdog.observers.api import ObservedWatch
from watchdog.observers.api import EventQueue
from watchdog.observers.api import EventEmitter
from watchdog.events import LoggingEventHandler
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
path = sys.argv[1] if len(sys.argv) > 1 else '.'
eventHandler = DmFileSystemEventHandler()
observer = PollingObserver()
observedWatch = observer.schedule(eventHandler, path, recursive=True)
print 'OBSERVED WATCH: ', observedWatch
#observer.add_handler_for_watch(eventHandler2, observedWatch)
#observer._clear_emitters()
print observer.emitters
observer.start()
try:
while True:
time.sleep(1)
print time.time()
except KeyboardInterrupt:
observer.stop()
observer.join()
#!/usr/bin/env python
import os
import copy
from dm.common.utility.loggingManager import LoggingManager
from dm.common.processing.plugins.fileProcessor import FileProcessor
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
class DsProcessFileNotificationPlugin(FileProcessor):
    """
    File processor plugin that notifies the DS (data storage) web service
    about processed files and directories.
    """

    def __init__(self, dependsOn=None):
        """
        :param dependsOn: optional list of processors this plugin depends
            on (defaults to no dependencies)
        """
        # None sentinel instead of a mutable [] default to avoid sharing
        # one list object between instances.
        if dependsOn is None:
            dependsOn = []
        FileProcessor.__init__(self, dependsOn=dependsOn)
        self.dsFileApi = DsRestApiFactory.getFileRestApi()
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)

    def processFile(self, fileInfo):
        """Send a file-processing notification to the DS service."""
        experimentFilePath = fileInfo.get('experimentFilePath')
        experimentName = fileInfo.get('experimentName')
        daqInfo = fileInfo.get('daqInfo', {})
        md5Sum = fileInfo.get('md5Sum')
        self.logger.debug('Processing file %s for experiment %s' % (experimentFilePath, experimentName))
        # Prepare dictionary for processing. Only send needed data.
        fileInfo2 = {}
        fileInfo2['experimentFilePath'] = experimentFilePath
        fileInfo2['experimentName'] = experimentName
        if md5Sum:
            fileInfo2['md5Sum'] = md5Sum
        fileInfo2['daqInfo'] = daqInfo
        self.logger.debug('File info sent to DS service: %s' % (str(fileInfo2)))
        self.dsFileApi.processFile(experimentFilePath, experimentName, fileInfo2)

    def processDirectory(self, directoryInfo):
        """Send a directory-processing notification to the DS service."""
        uploadInfo = directoryInfo.get('uploadInfo')
        experimentName = uploadInfo.get('experimentName')
        # NOTE(review): path is hard-coded to '' here — presumably the DS
        # service treats this as the experiment root; confirm with callers.
        experimentDirectoryPath = ''
        daqInfo = directoryInfo.get('daqInfo')
        directoryInfo2 = {}
        # Bug fix: the path was previously stored on the *input* dictionary
        # (directoryInfo) instead of the payload sent to the DS service, so
        # the notification never carried experimentDirectoryPath.
        directoryInfo2['experimentDirectoryPath'] = experimentDirectoryPath
        directoryInfo2['experimentName'] = experimentName
        directoryInfo2['daqInfo'] = daqInfo
        self.logger.debug('Directory info sent to DS service: %s' % (str(directoryInfo2)))
        self.dsFileApi.processDirectory(experimentDirectoryPath, experimentName, directoryInfo2)
#######################################################################
# Testing.
if __name__ == '__main__':
    # No standalone test for this module.
    pass