Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • DM/dm-docs
  • hammonds/dm-docs
  • hparraga/dm-docs
3 results
Show changes
Showing
with 1533 additions and 0 deletions
#!/usr/bin/env python
import os
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.dmSubprocess import DmSubprocess
from dm.common.utility.osUtility import OsUtility
from dm.common.exceptions.invalidArgument import InvalidArgument
from dm.common.exceptions.invalidRequest import InvalidRequest
from fileProcessor import FileProcessor
class FileTransferPlugin(FileProcessor):
    """Base file processor that transfers files via an external command.

    The transfer runs as a subprocess of the form
    ``<command> "<src>" "<dest>"``.  Subclasses customize source and
    destination URL construction.
    """

    # Used in place of a real transfer when source and destination
    # resolve to the same directory.
    NOOP_COMMAND = '/bin/true'

    def __init__(self, command, src=None, dest=None, dependsOn=None):
        """Initialize plugin.

        :param command: transfer command string; must be non-empty
        :param src: optional preconfigured source URL
        :param dest: optional preconfigured destination URL
        :param dependsOn: optional list of plugin dependencies
        :raises InvalidArgument: if command is empty or None
        """
        # None sentinel avoids the shared mutable-default-argument pitfall.
        FileProcessor.__init__(self, dependsOn or [])
        self.src = src
        self.dest = dest
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
        if not command:
            raise InvalidArgument('File transfer command must be non-empty string.')
        self.command = command
        # Handle to the most recently started transfer subprocess.
        self.subprocess = None

    def checkUploadFilesForProcessing(self, filePaths, uploadInfo):
        """Hook for subclasses; base class does no pre-filtering."""
        pass

    def processFile(self, fileInfo):
        """Transfer a single file described by fileInfo."""
        filePath = fileInfo.get('filePath')
        dataDirectory = fileInfo.get('dataDirectory')
        destDirectory = fileInfo.get('destDirectory')
        storageHost = fileInfo.get('storageHost')
        storageDirectory = fileInfo.get('storageDirectory')
        destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory, destDirectory)
        srcUrl = self.getSrcUrl(filePath, dataDirectory)
        self.start(src=srcUrl, dest=destUrl, templateInfo=fileInfo)

    def getSrcUrl(self, filePath, dataDirectory):
        """Build source URL; the '/./' marker preserves the relative path."""
        srcUrl = '%s/./%s' % (dataDirectory, os.path.relpath(filePath, dataDirectory))
        return srcUrl

    def _makeDestUrl(self, storageHost, storageDirectory, destDirectory=None):
        """Common destination URL construction shared by the file and
        directory variants (bodies were duplicated in the original)."""
        if self.dest:
            destUrl = '%s/' % (self.dest)
        else:
            destUrl = '%s:%s/' % (storageHost, storageDirectory)
        if destDirectory:
            destUrl = '%s/%s/' % (destUrl, destDirectory)
        return destUrl

    def getDestUrl(self, filePath, dataDirectory, storageHost, storageDirectory, destDirectory=None):
        """Build destination URL for a single-file transfer."""
        return self._makeDestUrl(storageHost, storageDirectory, destDirectory)

    def getSrcDirUrl(self, dataDirectory):
        """Build source URL for a directory transfer."""
        return '%s/' % dataDirectory

    def getDestDirUrl(self, dataDirectory, storageHost, storageDirectory, destDirectory=None):
        """Build destination URL for a directory transfer."""
        return self._makeDestUrl(storageHost, storageDirectory, destDirectory)

    def getFullCommand(self, src, dest, command=None):
        """Return the full shell command; uses the plugin's configured
        command unless an override is supplied."""
        return '%s "%s" "%s"' % (command or self.command, src, dest)

    def setSrc(self, src):
        """Set preconfigured source."""
        self.src = src

    def setDest(self, dest):
        """Set preconfigured destination."""
        self.dest = dest

    def start(self, src=None, dest=None, command=None, templateInfo=None, cwd=None):
        """Start the transfer subprocess and return the result of run().

        :param src: source URL; falls back to preconfigured self.src
        :param dest: destination URL; falls back to preconfigured self.dest
        :param command: optional command override
        :param templateInfo: dict used for template replacement in URLs
        :param cwd: working directory for the subprocess
        :raises InvalidRequest: if source or destination is empty
        """
        # None sentinel avoids the shared mutable-default-argument pitfall.
        templateInfo = templateInfo if templateInfo is not None else {}
        # Use preconfigured source/destination if none provided.
        fileSrc = src if src is not None else self.src
        fileDest = dest if dest is not None else self.dest
        # If destination is local (no 'host:' prefix), attempt to create it.
        if self.dest is not None and self.dest.find(':') < 0:
            destDir = self.replaceTemplates(self.dest, templateInfo)
            try:
                OsUtility.createDir(destDir)
            except Exception as ex:
                # Best effort only; the transfer command may still succeed.
                self.logger.warn('Transfer may fail due to failure to create destination directory %s: %s' % (destDir, str(ex)))
        fileSrc = self.replaceTemplates(fileSrc, templateInfo)
        fileDest = self.replaceTemplates(fileDest, templateInfo)
        if not fileSrc or not fileDest:
            raise InvalidRequest('Both source and destination must be non-empty strings.')
        # Determine normalized directory path for source and dest, and
        # skip transfer if they are the same.
        normSrcDir = os.path.normpath(os.path.dirname(fileSrc))
        fileName = os.path.basename(fileSrc)
        if fileDest.endswith(fileName):
            normDestDir = os.path.normpath(os.path.dirname(fileDest))
        else:
            normDestDir = os.path.normpath(fileDest)
        if normSrcDir == normDestDir:
            self.logger.debug('Skipping file transfer %s -> %s, source and destination are the same.' % (fileSrc, fileDest))
            self.subprocess = DmSubprocess.getSubprocess(self.NOOP_COMMAND)
        else:
            self.subprocess = DmSubprocess.getSubprocess(self.getFullCommand(fileSrc, fileDest, command), cwd=cwd)
        return self.subprocess.run()

    def wait(self):
        """Wait for the transfer subprocess; None if none was started."""
        if self.subprocess:
            return self.subprocess.wait()
        return None

    def poll(self):
        """Poll the transfer subprocess; None if none was started."""
        if self.subprocess:
            return self.subprocess.poll()
        return None

    def getStdOut(self):
        """Return subprocess stdout; None if none was started."""
        if self.subprocess:
            return self.subprocess.getStdOut()
        return None

    def getStdErr(self):
        """Return subprocess stderr; None if none was started."""
        if self.subprocess:
            return self.subprocess.getStdErr()
        return None

    def getExitStatus(self):
        """Return subprocess exit status; None if none was started."""
        if self.subprocess:
            return self.subprocess.getExitStatus()
        return None

    def reset(self):
        """Forget the last transfer subprocess."""
        self.subprocess = None
#######################################################################
# Testing.
if __name__ == '__main__':
    # Fixed: the original instantiated undefined 'FileTransfer'; the class
    # defined above is FileTransferPlugin.
    ft = FileTransferPlugin('rsync -arv', '/tmp/xyz', '/tmp/xyz2')
    ft.start()
    print('StdOut: %s' % ft.getStdOut())
    print('StdErr: %s' % ft.getStdErr())
    print('Exit Status: %s' % ft.getExitStatus())
#!/usr/bin/env python
import os
import copy
from fileTransferPlugin import FileTransferPlugin
from dm.common.utility.fileUtility import FileUtility
from dm.common.utility.ftpUtility import FtpUtility
from dm.common.utility.sftpUtility import SftpUtility
from dm.common.exceptions.fileProcessingError import FileProcessingError
from dm.common.utility.dmSubprocess import DmSubprocess
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
class GridftpFileTransferPlugin(FileTransferPlugin):
    """File transfer plugin based on globus-url-copy (GridFTP).

    Transfers single files or whole directories into storage over
    sshftp://, optionally computing local and/or remote md5 checksums,
    and pre-filters uploads by comparing against files already in storage.
    """

    #DEFAULT_COMMAND = 'globus-url-copy -c -cd -r -tcp-bs 512K -p 3 -sync -sync-level 2'
    #DIRECTORY_TRANSFER_COMMAND = 'globus-url-copy -c -cd -r -tcp-bs 512K -p 8 -sync -sync-level 2'
    DEFAULT_COMMAND = 'globus-url-copy -r -cd -sync -sync-level 2 -fast -bs 2M -tcp-bs 2M -p 8'
    DIRECTORY_TRANSFER_COMMAND = 'globus-url-copy -r -cd -sync -sync-level 2 -fast -bs 2M -tcp-bs 2M -p 8 -cc 8'
    DEFAULT_PORT = 2811

    def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, directoryTransferCommand=DIRECTORY_TRANSFER_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True, dependsOn=None, nRetries=None, skipOnFailure=False):
        """
        :param src: optional preconfigured source URL
        :param dest: optional preconfigured destination URL
        :param command: single-file transfer command
        :param directoryTransferCommand: directory transfer command
        :param localMd5Sum: compute md5 locally before transfer
        :param remoteMd5Sum: verify md5 against storage after transfer
        :param deleteOriginal: accepted for API compatibility
            (NOTE(review): unused here - the removal code is commented out)
        :param pluginMustProcessFiles: if False, checkUploadFilesForProcessing returns {}
        :param dependsOn: optional list of plugin dependencies
        :param nRetries: optional number of retries
        :param skipOnFailure: whether to skip failed files
        """
        # None sentinel avoids the shared mutable-default-argument pitfall.
        FileTransferPlugin.__init__(self, command, src, dest, dependsOn=dependsOn or [])
        self.dsFileApi = DsRestApiFactory.getFileDsApi()
        self.localMd5Sum = localMd5Sum
        self.remoteMd5Sum = remoteMd5Sum
        self.deleteOriginal = deleteOriginal
        self.directoryTransferCommand = directoryTransferCommand
        self.pluginMustProcessFiles = pluginMustProcessFiles
        if nRetries is not None:
            self.setNumberOfRetries(nRetries)
        self.setSkipOnFailure(skipOnFailure)

    def replaceSpecialCharacters(self, url):
        """Percent-encode '#', ' ' and '~' in the given URL."""
        replacementMap = {
            '#' : '%23',
            ' ' : '%20',
            '~' : '%7E',
        }
        for (original, replacement) in replacementMap.items():
            url = url.replace(original, replacement)
        return url

    def getSrcUrl(self, filePath, dataDirectory):
        """Build a percent-encoded source URL for a single file."""
        (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
        if scheme:
            srcUrl = '%s://%s:%s/%s' % (scheme, host, port, filePath)
        elif self.src is None:
            srcUrl = 'file://%s' % filePath
        else:
            srcUrl = '%s/%s' % (self.src, filePath)
        return self.replaceSpecialCharacters(srcUrl)

    def getDestUrl(self, filePath, dataDirectory, storageHost, storageDirectory, destDirectory=None):
        """Build a percent-encoded sshftp destination URL, preserving the
        file's path relative to the data directory."""
        (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
        dirName = os.path.dirname(os.path.relpath(filePath, dirPath)).strip()
        fileName = os.path.basename(filePath)
        if self.dest:
            destUrl = '%s/%s/%s' % (self.dest, dirName, fileName)
        else:
            if destDirectory:
                destUrl = 'sshftp://%s/%s/%s/%s/%s' % (storageHost, storageDirectory, destDirectory, dirName, fileName)
            else:
                destUrl = 'sshftp://%s/%s/%s/%s' % (storageHost, storageDirectory, dirName, fileName)
        return self.replaceSpecialCharacters(destUrl)

    def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
        """Remove from filePathsDict files already present in storage with
        the same size and an equal-or-newer modification time.

        Returns the (possibly reduced) filePathsDict, or {} when this
        plugin is configured not to process files.
        """
        if not self.pluginMustProcessFiles:
            return {}
        storageHost = uploadInfo['storageHost']
        storageDirectory = uploadInfo['storageDirectory']
        dataDirectory = uploadInfo['dataDirectory']
        destDirectory = uploadInfo.get('destDirectory')
        self.logger.debug('Upload info: %s' % uploadInfo)
        # Original data directory may contain host/port
        (scheme, host, port, replacementDirPath) = FtpUtility.parseFtpUrl(dataDirectory)
        self.logger.debug('Replacement dir path: %s' % replacementDirPath)
        self.logger.debug('Number of original files: %s' % len(filePathsDict))
        ftpUtility = SftpUtility(storageHost)
        targetDirectory = storageDirectory
        if destDirectory:
            targetDirectory = '%s/%s/' % (storageDirectory, destDirectory)
        storageFilePathsDict = {}
        try:
            self.logger.debug('Looking for existing files in %s' % targetDirectory)
            storageFilePathsDict = ftpUtility.getFiles(targetDirectory, {}, replacementDirPath)
            self.logger.debug('There are %s files in %s' % (len(storageFilePathsDict), targetDirectory))
        except Exception as ex:
            # Best effort: missing target directory simply means everything
            # must be transferred.
            self.logger.warn('Could not find existing files in %s, assuming there are none (got error: %s)' % (targetDirectory, ex))
        # Remove file from plugin dict if we do not need to transfer it
        for (filePath, storageFilePathDict) in storageFilePathsDict.items():
            filePathDict = filePathsDict.get(filePath)
            if filePathDict is None:
                # We are not attempting to transfer this file;
                # no need to change the plugin file dict.
                continue
            # Check size
            fSize = filePathDict.get('fileSize')
            sfSize = storageFilePathDict.get('fileSize')
            if fSize is None or sfSize is None or fSize != sfSize:
                # Sizes differ, need to transfer file
                continue
            # Sizes are the same, check modify time
            mTime = filePathDict.get('fileModificationTime')
            smTime = storageFilePathDict.get('fileModificationTime')
            if not mTime or not smTime or mTime > smTime:
                # Source time is later than storage time, need to transfer file
                continue
            # No need to transfer file
            del filePathsDict[filePath]
        self.logger.debug('Number of files that require processing: %s' % len(filePathsDict))
        return filePathsDict

    def processFile(self, fileInfo):
        """Stat/checksum the source file, transfer it, and optionally verify
        the remote md5 checksum.

        :raises FileProcessingError: on checksum mismatch
        """
        filePath = fileInfo.get('filePath')
        dataDirectory = fileInfo.get('dataDirectory')
        experimentFilePath = fileInfo.get('experimentFilePath')
        experimentName = fileInfo.get('experimentName')
        storageHost = fileInfo.get('storageHost')
        storageDirectory = fileInfo.get('storageDirectory')
        destDirectory = fileInfo.get('destDirectory')
        destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory, destDirectory)
        srcUrl = self.getSrcUrl(filePath, dataDirectory)
        # Calculate checksum
        # NOTE(review): self.statUtility presumably set by FileProcessor - confirm.
        statUtility = self.statUtility
        if not statUtility:
            (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(srcUrl, defaultPort=self.DEFAULT_PORT)
            statUtility = FtpUtility(host, port)
        if not fileInfo.get('fileSize'):
            statUtility.statFile(filePath, fileInfo)
        if self.localMd5Sum:
            statUtility.getMd5Sum(filePath, fileInfo)
        # Transfer file
        self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl))
        self.start(src=srcUrl, dest=destUrl, templateInfo=fileInfo)
        # Get remote checksum
        if self.remoteMd5Sum:
            fileInfo2 = {}
            fileInfo2['experimentFilePath'] = experimentFilePath
            fileInfo2['experimentName'] = experimentName
            fileMetadata = self.dsFileApi.statFile(experimentFilePath, experimentName, fileInfo2)
            # If we have not done md5 locally, update file info
            if not self.localMd5Sum:
                fileInfo['md5Sum'] = fileMetadata.get('md5Sum')
            # Verify checksum
            if fileMetadata.get('md5Sum') != fileInfo.get('md5Sum'):
                self.logger.error('Checksum mismatch for file: %s' % filePath)
                raise FileProcessingError('Checksum mismatch detected for file: %s' % filePath)
            self.logger.debug('Checksum test passed for file %s' % filePath)
        # NOTE(review): deleteOriginal is accepted by the constructor but the
        # removal below is deliberately disabled; confirm before enabling.
        #if self.deleteOriginal:
        #    self.logger.debug('Deleting file %s' % filePath)
        #    OsUtility.removeFile(srcUrl)

    def getSrcDirUrl(self, dataDirectory):
        """Build source URL for a directory transfer."""
        (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
        if scheme:
            srcUrl = '%s/' % (dataDirectory)
        elif self.src is None:
            srcUrl = 'file://%s/' % (dataDirectory)
        else:
            srcUrl = '%s/%s/' % (self.src, dataDirectory)
        return srcUrl

    def getDestDirUrl(self, dataDirectory, storageHost, storageDirectory, destDirectory=None):
        """Build sshftp destination URL for a directory transfer."""
        if self.dest:
            destUrl = '%s/' % (self.dest)
        else:
            destUrl = 'sshftp://%s/%s/' % (storageHost, storageDirectory)
        if destDirectory:
            destUrl = '%s/%s/' % (destUrl, destDirectory)
        return destUrl

    def processDirectory(self, directoryInfo):
        """Transfer a whole directory using the directory transfer command."""
        uploadInfo = directoryInfo.get('uploadInfo')
        dataDirectory = uploadInfo.get('dataDirectory')
        experimentName = uploadInfo.get('experimentName')
        storageHost = uploadInfo.get('storageHost')
        storageDirectory = uploadInfo.get('storageDirectory')
        destDirectory = uploadInfo.get('destDirectory')
        destUrl = self.getDestDirUrl(dataDirectory, storageHost, storageDirectory, destDirectory)
        srcUrl = self.getSrcDirUrl(dataDirectory)
        # Transfer directory
        self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl))
        self.start(src=srcUrl, dest=destUrl, command=self.directoryTransferCommand, templateInfo=uploadInfo)
#######################################################################
# Testing.
if __name__ == '__main__':
    ft = GridftpFileTransferPlugin('/tmp/xyz', '/tmp/xyz2')
    ft.start()
    print('StdOut: %s' % ft.getStdOut())
    print('StdErr: %s' % ft.getStdErr())
    print('Exit Status: %s' % ft.getExitStatus())
#!/usr/bin/env python
import os
import time
from dm.common.constants import dmProcessingStatus
from dm.common.utility.loggingManager import LoggingManager
from dm.common.objects.observedFile import ObservedFile
from dm.common.utility.timeUtility import TimeUtility
from dm.common.processing.plugins.fileProcessor import FileProcessor
from dm.common.mongodb.api.fileMongoDbApi import FileMongoDbApi
from dm.common.utility.dmSubprocess import DmSubprocess
class MongoDbFileCatalogPlugin(FileProcessor):
    """Catalogs file metadata in MongoDB.

    Optionally extracts metadata from HDF5 (.h5) files using an external
    command, and records md5 checksums and stat/timing information.
    """

    # No HDF5 metadata extraction by default.
    DEFAULT_HDF5_METADATA_COMMAND = None

    def __init__(self, hdf5MetadataCommand=DEFAULT_HDF5_METADATA_COMMAND, md5Sum=False, dependsOn=None):
        """
        :param hdf5MetadataCommand: external command producing 'key=value' lines
        :param md5Sum: if True, compute md5 checksum for catalogued files
        :param dependsOn: optional list of plugin dependencies
        """
        # None sentinel avoids the shared mutable-default-argument pitfall.
        FileProcessor.__init__(self, dependsOn=dependsOn or [])
        self.fileMongoDbApi = FileMongoDbApi()
        self.hdf5MetadataCommand = hdf5MetadataCommand
        self.md5Sum = md5Sum
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)

    def processStat(self, filePath, fileInfo=None):
        """Add stat information to fileInfo if 'fileSize' is missing."""
        fileInfo = fileInfo if fileInfo is not None else {}
        if not self.statUtility:
            return fileInfo
        if 'fileSize' not in fileInfo:
            # Fixed: the original called statFile(filePath) and discarded the
            # result, so fileInfo was never updated; pass fileInfo so it is
            # filled in place (matches statUtility.statFile usage elsewhere
            # in this file).
            self.statUtility.statFile(filePath, fileInfo)
        return fileInfo

    def processMd5Sum(self, filePath, fileInfo=None):
        """Add md5 checksum to fileInfo if enabled and missing."""
        fileInfo = fileInfo if fileInfo is not None else {}
        if not self.md5Sum or not self.statUtility:
            return fileInfo
        if 'md5Sum' not in fileInfo:
            fileInfo['md5Sum'] = self.statUtility.getMd5Sum(filePath)
        return fileInfo

    def processHdf5Metadata(self, filePath, fileInfo=None):
        """Run the configured HDF5 metadata command on .h5 files and merge
        its 'key=value' output lines into fileInfo (existing keys win)."""
        fileInfo = fileInfo if fileInfo is not None else {}
        if not self.hdf5MetadataCommand:
            return fileInfo
        experimentName = fileInfo.get('experimentName', '')
        if not filePath.endswith('.h5'):
            return fileInfo
        command = '%s %s' % (self.hdf5MetadataCommand, filePath)
        subprocess = DmSubprocess.getSubprocess(command)
        subprocess.run()
        stdout = subprocess.getStdOut().replace('\n', ';')
        parts = stdout.split(';')
        for part in parts:
            keyValue = part.split('=')
            key = keyValue[0]
            if not len(key):
                continue
            # Re-join in case the value itself contained '=' characters.
            value = ''
            if len(keyValue) > 1:
                value = '='.join(keyValue[1:])
            if key not in fileInfo:
                fileInfo[key] = value
            else:
                self.logger.warn('Key %s already exists for file %s (experiment: %s)' % (key, filePath, experimentName))
        return fileInfo

    def processFile(self, fileInfo):
        """Build a catalog entry for the file and store it in MongoDB."""
        filePath = fileInfo.get('filePath')
        experimentFilePath = fileInfo.get('experimentFilePath')
        experimentName = fileInfo.get('experimentName')
        self.logger.debug('Processing file "%s" for experiment %s' % (experimentFilePath, experimentName))
        daqInfo = fileInfo.get('daqInfo')
        storageUrl = daqInfo.get('storageUrl')
        fileInfo['fileProcessingTime'] = time.time()
        # Prepare catalogging entry
        fileInfo2 = {}
        for key in ['md5Sum', 'fileSize']:
            if key in fileInfo:
                fileInfo2[key] = fileInfo.get(key, '')
        for key in ['fileProcessingTime', 'fileCreationTime', 'fileModificationTime']:
            if key in fileInfo:
                t = fileInfo.get(key, 0)
                fileInfo2[key] = t
                # Also store a human-readable local timestamp under
                # '<key>stamp', e.g. 'fileProcessingTimestamp'.
                key2 = '%sstamp' % key
                fileInfo2[key2] = TimeUtility.formatLocalTimestamp(t)
        fileLocations = fileInfo.get('fileLocations', [])
        fileLocations.append('%s/%s' % (storageUrl, experimentFilePath))
        fileInfo2['fileName'] = os.path.basename(experimentFilePath)
        fileInfo2['experimentFilePath'] = experimentFilePath
        fileInfo2['experimentName'] = experimentName
        fileInfo2['fileLocations'] = fileLocations
        self.logger.debug('Daq info: %s' % (daqInfo))
        fileInfo2.update(daqInfo)
        # Rename daq 'id' to 'daqId' and drop storage keys from the entry.
        if 'id' in daqInfo:
            fileInfo2['daqId'] = daqInfo.get('id')
            del fileInfo2['id']
        for key in ['storageDirectory', 'storageUrl', 'storageHost']:
            if key in fileInfo2:
                del fileInfo2[key]
        self.processStat(filePath, fileInfo2)
        self.processHdf5Metadata(filePath, fileInfo2)
        self.processMd5Sum(filePath, fileInfo2)
        self.logger.debug('File "%s" catalog entry: %s' % (experimentFilePath, str(fileInfo2)))
        self.fileMongoDbApi.updateOrAddExperimentFile(fileInfo2)

    def processDirectory(self, directoryInfo):
        """Catalog every file from a directory upload, honoring aborts."""
        uploadInfo = directoryInfo.get('uploadInfo')
        daqInfo = directoryInfo.get('daqInfo')
        experiment = directoryInfo.get('experiment')
        filePathsDict = directoryInfo.get('filePathsDict')
        uploadId = uploadInfo.get('id')
        dataDirectory = uploadInfo.get('dataDirectory')
        destDirectory = uploadInfo.get('destDirectory')
        nProcessedFiles = 0
        nFiles = len(filePathsDict)
        for (filePath, filePathDict) in filePathsDict.items():
            fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment, destDirectory=destDirectory)
            fileInfo.update(filePathDict)
            fileInfo['daqInfo'] = daqInfo
            fileInfo['uploadId'] = uploadId
            if uploadInfo.get('status') != dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING:
                self.processFile(fileInfo)
                nProcessedFiles += 1
            else:
                nCancelledFiles = nFiles - nProcessedFiles
                # Fixed stray ')' in the original log message.
                self.logger.debug('Upload id %s aborted, will not process %s files' % (uploadId, nCancelledFiles))
                # NOTE(review): self.name presumably set by FileProcessor - confirm.
                processingInfo = uploadInfo.get('processingInfo')
                processingInfo[self.name]['status'] = 'aborted'
                break
#######################################################################
# Testing.
if __name__ == '__main__':
    # No standalone test harness for this module.
    pass
#!/usr/bin/env python
import os
from fileTransferPlugin import FileTransferPlugin
from dm.common.utility.osUtility import OsUtility
from dm.common.utility.fileUtility import FileUtility
from dm.common.utility.ftpUtility import FtpUtility
from dm.common.exceptions.fileProcessingError import FileProcessingError
from dm.common.utility.dmSubprocess import DmSubprocess
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory
class RsyncFileTransferPlugin(FileTransferPlugin):
    """File transfer plugin based on rsync.

    Uses 'rsync --dry-run' to pre-filter files that actually need transfer,
    and can create remote target directories via --rsync-path.
    """

    DEFAULT_COMMAND = 'rsync -arvlPR --'
    RSYNC_WITH_MKDIR_COMMAND = 'rsync -arvlPR --rsync-path="mkdir -p %s && rsync" --'
    DIRECTORY_TRANSFER_COMMAND = 'rsync -arvlP --'
    DIRECTORY_TRANSFER_WITH_MKDIR_COMMAND = 'rsync -arvlP --rsync-path="mkdir -p %s && rsync" --'
    DRY_RUN_COMMAND = 'rsync -arvlP --dry-run --'

    def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True, dependsOn=None, nRetries=None, skipOnFailure=False):
        """
        :param src: optional preconfigured source URL
        :param dest: optional preconfigured destination URL
        :param command: single-file transfer command
        :param localMd5Sum: compute md5 locally before transfer
        :param remoteMd5Sum: verify md5 against storage after transfer
        :param deleteOriginal: remove the source file after successful transfer
        :param pluginMustProcessFiles: if False, checkUploadFilesForProcessing returns {}
        :param dependsOn: optional list of plugin dependencies
        :param nRetries: optional number of retries
        :param skipOnFailure: whether to skip failed files
        """
        # None sentinel avoids the shared mutable-default-argument pitfall.
        FileTransferPlugin.__init__(self, command, src, dest, dependsOn=dependsOn or [])
        self.dsFileApi = DsRestApiFactory.getFileDsApi()
        self.localMd5Sum = localMd5Sum
        self.remoteMd5Sum = remoteMd5Sum
        self.deleteOriginal = deleteOriginal
        self.pluginMustProcessFiles = pluginMustProcessFiles
        if nRetries is not None:
            self.setNumberOfRetries(nRetries)
        self.setSkipOnFailure(skipOnFailure)

    def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
        """Return the subset of filePathsDict that 'rsync --dry-run' reports
        as needing transfer; {} when this plugin must not process files."""
        if not self.pluginMustProcessFiles:
            return {}
        storageDirectory = uploadInfo['storageDirectory']
        storageHost = uploadInfo['storageHost']
        dataDirectory = uploadInfo['dataDirectory']
        destDirectory = uploadInfo.get('destDirectory')
        dryRunCommand = '%s %s/ %s:%s' % (self.DRY_RUN_COMMAND, dataDirectory, storageHost, storageDirectory)
        if destDirectory:
            dryRunCommand = '%s/%s/' % (dryRunCommand, destDirectory)
        subprocess = DmSubprocess.getSubprocess(dryRunCommand)
        subprocess.run()
        lines = subprocess.getStdOut().split('\n')
        pluginFilePathsDict = {}
        pathBase = dataDirectory
        for line in lines:
            # Skip directory entries reported by rsync.
            if line.endswith(os.sep):
                continue
            # Non-file chatter in rsync output will simply not match any
            # key in filePathsDict below.
            filePath = os.path.join(pathBase, line)
            filePathDict = filePathsDict.get(filePath)
            if filePathDict:
                pluginFilePathsDict[filePath] = filePathDict
        self.logger.debug('Number of original files: %s, number of plugin files: %s' % (len(filePathsDict), len(pluginFilePathsDict)))
        return pluginFilePathsDict

    def processFile(self, fileInfo):
        """Stat/checksum the source, rsync it to storage, optionally verify
        the remote md5, and optionally delete the original on success.

        :raises FileProcessingError: on checksum mismatch
        """
        filePath = fileInfo.get('filePath')
        dataDirectory = fileInfo.get('dataDirectory')
        experimentFilePath = fileInfo.get('experimentFilePath')
        experimentName = fileInfo.get('experimentName')
        storageHost = fileInfo.get('storageHost')
        storageDirectory = fileInfo.get('storageDirectory')
        destDirectory = fileInfo.get('destDirectory')
        destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory, destDirectory)
        # Use relative path with respect to data directory as a source
        srcUrl = self.getSrcUrl(filePath, dataDirectory)
        # Calculate checksum
        if not fileInfo.get('fileSize'):
            FileUtility.statFile(filePath, fileInfo)
        if self.localMd5Sum:
            FileUtility.getMd5Sum(filePath, fileInfo)
        # Transfer file
        self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl))
        command = self.command
        if destDirectory:
            # Target directory may not exist yet; have rsync create it remotely.
            (scheme, host, port, targetDirectory) = FtpUtility.parseUrl(destUrl)
            command = self.RSYNC_WITH_MKDIR_COMMAND % targetDirectory
        command = self.replaceTemplates(command, fileInfo)
        self.start(src=srcUrl, dest=destUrl, command=command, templateInfo=fileInfo, cwd=dataDirectory)
        # Get remote checksum
        if self.remoteMd5Sum:
            fileInfo2 = {}
            fileInfo2['experimentFilePath'] = experimentFilePath
            fileInfo2['experimentName'] = experimentName
            fileMetadata = self.dsFileApi.statFile(experimentFilePath, experimentName, fileInfo2)
            # If we have not done md5 locally, update file info
            if not self.localMd5Sum:
                fileInfo['md5Sum'] = fileMetadata.get('md5Sum')
            # Verify checksum
            if fileMetadata.get('md5Sum') != fileInfo.get('md5Sum'):
                self.logger.error('Checksum mismatch for file: %s' % filePath)
                raise FileProcessingError('Checksum mismatch detected for file: %s' % filePath)
            self.logger.debug('Checksum test passed for file %s' % filePath)
        # Remove file
        if self.deleteOriginal:
            self.logger.debug('Deleting file %s' % filePath)
            OsUtility.removeFile(filePath)

    def processDirectory(self, directoryInfo):
        """Transfer a whole directory, creating the remote target directory
        when a destination directory is configured."""
        uploadInfo = directoryInfo.get('uploadInfo')
        dataDirectory = uploadInfo.get('dataDirectory')
        experimentName = uploadInfo.get('experimentName')
        storageHost = uploadInfo.get('storageHost')
        storageDirectory = uploadInfo.get('storageDirectory')
        destDirectory = uploadInfo.get('destDirectory')
        destUrl = self.getDestDirUrl(dataDirectory, storageHost, storageDirectory, destDirectory)
        srcUrl = self.getSrcDirUrl(dataDirectory)
        # Transfer directory
        self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl))
        command = self.DIRECTORY_TRANSFER_COMMAND
        if destDirectory:
            # Target directory may not exist yet; have rsync create it remotely.
            (scheme, host, port, targetDirectory) = FtpUtility.parseUrl(destUrl)
            command = self.DIRECTORY_TRANSFER_WITH_MKDIR_COMMAND % targetDirectory
        command = self.replaceTemplates(command, uploadInfo)
        self.start(src=srcUrl, dest=destUrl, command=command, templateInfo=uploadInfo, cwd=dataDirectory)
#######################################################################
# Testing.
if __name__ == '__main__':
    ft = RsyncFileTransferPlugin(command='rsync -arvlPR --exclude "*.svn" --exclude "*.pyc"')
    # Fixed: checkUploadFilesForProcessing() expects a dict keyed by file
    # path (it calls .get() on it); the original passed a list, which
    # would raise AttributeError.
    filePathsDict = {'/home/sveseli/Work/DM/dev/src/python/dm/common/processing/plugins/fileProcessor.py': {}}
    ft.checkUploadFilesForProcessing(filePathsDict, {'storageDirectory': '/opt/DM/dev', 'storageHost': 'dm@dmstorage', 'dataDirectory': '/home/sveseli/Work/DM/dev/src'})
#!/usr/bin/env python
import os
import time
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.timeUtility import TimeUtility
from dm.common.processing.plugins.fileProcessor import FileProcessor
from dm.common.mongodb.api.fileMongoDbApi import FileMongoDbApi
from dm.common.utility.dmSubprocess import DmSubprocess
class ScriptProcessingPlugin(FileProcessor):
    """Runs a per-daq processing script on each stored file and catalogs
    the script's output in MongoDB."""

    # daqInfo key holding the processing script path.
    PROCESSING_SCRIPT_KEY = 'processingScript'

    def __init__(self):
        FileProcessor.__init__(self)
        self.fileMongoDbApi = FileMongoDbApi()
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)

    def processFile(self, fileInfo):
        """Run the configured processing script on the stored file and record
        its stdout in the file's catalog entry; no-op when no script is set."""
        experimentName = fileInfo.get('experimentName')
        experimentFilePath = fileInfo.get('experimentFilePath')
        daqInfo = fileInfo.get('daqInfo', {})
        processingScript = daqInfo.get(self.PROCESSING_SCRIPT_KEY)
        if not processingScript:
            self.logger.debug('Ignoring file %s for experiment %s' % (experimentFilePath, experimentName))
            return
        self.logger.debug('Processing file %s for experiment %s' % (experimentFilePath, experimentName))
        storageDirectory = fileInfo.get('storageDirectory')
        storageFilePath = os.path.join(storageDirectory, experimentFilePath)
        # Process file
        try:
            # NOTE(review): sibling plugins in this file create subprocesses
            # via DmSubprocess.getSubprocess(); confirm direct construction
            # is equivalent.
            p = DmSubprocess('%s %s' % (processingScript, storageFilePath))
            p.run()
            stdOut = p.getStdOut()
        except Exception as ex:
            self.logger.error('Cannot process file %s for experiment %s: %s' % (experimentFilePath, experimentName, ex))
            return
        # Prepare catalogging entry
        fileInfo2 = {}
        fileInfo2['fileName'] = os.path.basename(experimentFilePath)
        fileInfo2['experimentName'] = experimentName
        fileInfo2['processingScriptOutput'] = '%s' % stdOut.strip()
        self.fileMongoDbApi.updateOrAddExperimentFile(fileInfo2)
#######################################################################
# Testing.
if __name__ == '__main__':
    # No standalone test harness for this module.
    pass
#!/usr/bin/env python
import os
import time
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.timeUtility import TimeUtility
from dm.common.processing.plugins.fileProcessor import FileProcessor
from dm.common.mongodb.api.fileMongoDbApi import FileMongoDbApi
class SddsParameterProcessingPlugin(FileProcessor):
    """Loads an SDDS file from storage and catalogs its parameters
    (first-page values) in MongoDB."""

    # daqInfo key enabling SDDS parameter processing.
    PROCESS_SDDS_PARAMETERS_KEY = 'processSddsParameters'

    def __init__(self):
        FileProcessor.__init__(self)
        self.fileMongoDbApi = FileMongoDbApi()
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)

    def processFile(self, fileInfo):
        """Extract SDDS parameters from the stored file into a catalog entry;
        no-op unless daqInfo enables SDDS parameter processing."""
        experimentFilePath = fileInfo.get('experimentFilePath')
        experimentName = fileInfo.get('experimentName')
        daqInfo = fileInfo.get('daqInfo', {})
        processSddsParameters = daqInfo.get(self.PROCESS_SDDS_PARAMETERS_KEY)
        if not processSddsParameters:
            self.logger.debug('Ignoring file %s for experiment %s' % (experimentFilePath, experimentName))
            return
        # Fixed: log the file path (the original logged the whole fileInfo
        # dict, unlike the sibling plugins).
        self.logger.debug('Processing file %s for experiment %s' % (experimentFilePath, experimentName))
        storageDirectory = fileInfo.get('storageDirectory')
        storageFilePath = os.path.join(storageDirectory, experimentFilePath)
        # Load file
        try:
            import sddsdata
            from sdds import SDDS
            s = SDDS(0)
            # Fixed: informational message was logged at error level.
            self.logger.debug('Loading file %s for experiment %s' % (experimentFilePath, experimentName))
            s.load(storageFilePath)
        except Exception as ex:
            self.logger.error('Cannot process file %s for experiment %s: %s' % (experimentFilePath, experimentName, ex))
            return
        # Prepare catalogging entry
        fileInfo2 = {}
        fileInfo2['fileName'] = os.path.basename(experimentFilePath)
        fileInfo2['experimentName'] = experimentName
        # Record each parameter's first-page value.
        for (parameterName, parameterData) in zip(s.parameterName, s.parameterData):
            fileInfo2[parameterName] = parameterData[0]
        self.fileMongoDbApi.updateOrAddExperimentFile(fileInfo2)
        # Fixed: informational message was logged at error level.
        self.logger.debug('SDDS terminate file %s for experiment %s' % (experimentFilePath, experimentName))
        sddsdata.Terminate(0)
#######################################################################
# Testing.
if __name__ == '__main__':
    # No standalone test harness for this module.
    pass
#!/usr/bin/env python
import os
import time
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.timeUtility import TimeUtility
from dm.common.processing.plugins.fileProcessor import FileProcessor
from dm.common.utility.dmSubprocess import DmSubprocess
class SgeJobSubmissionPlugin(FileProcessor):
    """Submits an SGE (qsub) job for each processed file when the daq
    info provides an 'sgeJobScript'."""

    # daqInfo key holding the SGE job script path.
    SGE_JOB_SCRIPT_KEY = 'sgeJobScript'

    def __init__(self, sgeRoot):
        """
        :param sgeRoot: SGE installation root containing default/common/settings.sh
        """
        FileProcessor.__init__(self)
        self.sgeRoot = sgeRoot
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)

    def processFile(self, fileInfo):
        """Submit an SGE job for the stored file; no-op when no job script
        is configured in daqInfo."""
        experimentName = fileInfo.get('experimentName')
        experimentFilePath = fileInfo.get('experimentFilePath')
        daqInfo = fileInfo.get('daqInfo', {})
        sgeJobScript = daqInfo.get(self.SGE_JOB_SCRIPT_KEY)
        if not sgeJobScript:
            self.logger.debug('Ignoring file %s for experiment %s' % (experimentFilePath, experimentName))
            return
        self.logger.debug('Processing file %s for experiment %s' % (experimentFilePath, experimentName))
        storageDirectory = fileInfo.get('storageDirectory')
        storageFilePath = os.path.join(storageDirectory, experimentFilePath)
        # Process file
        try:
            # NOTE(review): sibling plugins create subprocesses via
            # DmSubprocess.getSubprocess(); confirm direct construction
            # is equivalent.
            p = DmSubprocess('. %s/default/common/settings.sh ; qsub -v DM_EXPERIMENT_NAME=%s %s %s' % (self.sgeRoot, experimentName, sgeJobScript, storageFilePath))
            p.run()
        except Exception as ex:
            self.logger.error('Cannot process file %s for experiment %s: %s' % (experimentFilePath, experimentName, ex))
            return
#######################################################################
# Testing.
if __name__ == '__main__':
    # No standalone test harness for this module.
    pass
#!/usr/bin/env python
from dm.common.utility.loggingManager import LoggingManager
class AuthorizationPrincipalAuthenticator:
    """Base class for authorization principal authenticators.

    Concrete authenticators override authenticatePrincipal(); this base
    implementation always declines to authenticate.
    """

    def __init__(self, name=None):
        # Optional identifying name for this authenticator.
        self.name = name
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)

    def getName(self):
        """Return this authenticator's name (may be None)."""
        return self.name

    def authenticatePrincipal(self, principal, password):
        """Attempt to authenticate; the base class never succeeds."""
        return None
#######################################################################
# Testing.
if __name__ == '__main__':
    # No standalone test harness for this module.
    pass
#!/usr/bin/env python
import os
from dm.common.exceptions.authorizationError import AuthorizationError
from dm.common.objects.dmObjectManager import DmObjectManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.utility.objectCache import ObjectCache
from dm.common.utility.cryptUtility import CryptUtility
from dm.common.utility.objectUtility import ObjectUtility
from dm.common.utility.singleton import Singleton
class AuthorizationPrincipalManager(DmObjectManager, Singleton):
DEFAULT_CACHE_SIZE = 10000 # number of items
DEFAULT_CACHE_OBJECT_LIFETIME = 3600 # seconds
CONFIG_SECTION_NAME = 'AuthorizationPrincipalManager'
PRINCIPAL_RETRIEVER_KEY = 'principalretriever'
PRINCIPAL_AUTHENTICATOR_KEY = 'principalauthenticator'
# Singleton instance.
__instance = None
def __init__(self):
if AuthorizationPrincipalManager.__instance:
return
AuthorizationPrincipalManager.__instance = self
DmObjectManager.__init__(self)
self.configurationManager = ConfigurationManager.getInstance()
self.principalRetriever = None
self.principalAuthenticatorList = []
self.objectCache = ObjectCache(AuthorizationPrincipalManager.DEFAULT_CACHE_SIZE, AuthorizationPrincipalManager.DEFAULT_CACHE_OBJECT_LIFETIME)
self.configure()
@classmethod
def cryptPassword(cls, cleartext):
return CryptUtility.cryptPassword(cleartext)
@classmethod
def cryptPasswordWithPbkdf2(cls, cleartext):
return CryptUtility.cryptPasswordWithPbkdf2(cleartext)
def configure(self):
configItems = self.configurationManager.getConfigItems(AuthorizationPrincipalManager.CONFIG_SECTION_NAME)
self.logger.debug('Got config items: %s' % configItems)
# Create principal retriever
principalRetriever = self.configurationManager.getConfigOption(AuthorizationPrincipalManager.CONFIG_SECTION_NAME, AuthorizationPrincipalManager.PRINCIPAL_RETRIEVER_KEY)
(moduleName,className,constructor) = self.configurationManager.getModuleClassConstructorTuple(principalRetriever, AuthorizationPrincipalManager)
self.logger.debug('Creating principal retriever class: %s' % className)
self.principalRetriever = ObjectUtility.createObjectInstance(moduleName, className, constructor)
self.logger.debug('Authorization principal retriever: %s' % (self.principalRetriever))
# Create principal authenticators
for (key,value) in configItems:
if key.startswith(AuthorizationPrincipalManager.PRINCIPAL_AUTHENTICATOR_KEY):
(moduleName,className,constructor) = self.configurationManager.getModuleClassConstructorTuple(value, AuthorizationPrincipalManager)
self.logger.debug('Creating principal authenticator class: %s' % className)
principalAuthenticator = ObjectUtility.createObjectInstance(moduleName, className, constructor)
self.addAuthorizationPrincipalAuthenticator(principalAuthenticator)
self.logger.debug('Authorization principal authenticator: %s' % (principalAuthenticator))
def addAuthorizationPrincipalAuthenticator(self, principalAuthenticator):
    """ Register an additional authenticator to try during login. """
    self.principalAuthenticatorList.append(principalAuthenticator)
def getAuthenticatedAuthorizationPrincipal(self, username, password):
    """ Get principal based on a username and password.

    Checks the cache first, then the configured retriever; each configured
    authenticator is tried in turn until one accepts the password.
    Successful authentications are cached. Returns None on failure.
    """
    principal = self.objectCache.get(username)
    if principal is None:
        # Cache miss: fall back to the principal retriever.
        principal = self.principalRetriever.getAuthorizationPrincipal(username)
    if principal is None:
        self.logger.debug('No principal for username: %s' % username)
        return
    # Try all authorization principal authenticators.
    for authenticator in self.principalAuthenticatorList:
        self.logger.debug('Attempting to authenticate %s by %s' % (username, authenticator.getName()))
        authenticatedPrincipal = authenticator.authenticatePrincipal(principal, password)
        if authenticatedPrincipal is None:
            continue
        self.logger.debug('Adding authorization principal %s to the cache, authenticated by %s' % (principal.getName(),authenticator.getName()))
        self.objectCache.put(username, authenticatedPrincipal)
        return authenticatedPrincipal
    return None
def removeAuthorizationPrincipal(self, username):
    """ Evict the cached principal for the given username, if present. """
    self.objectCache.remove(username)
#######################################################################
# Testing.
if __name__ == '__main__':
am = AuthorizationPrincipalManager.getInstance()
authPrincipal = am.getAuthorizationPrincipal('sveseli', 'sv')
print 'Auth principal: ', authPrincipal
#!/usr/bin/env python
from dm.common.constants import dmRole
from dm.common.utility.loggingManager import LoggingManager
class AuthorizationPrincipalRetriever:
    """ Base class for authorization principal retrievers. """

    def __init__(self, name=None):
        self.name = name
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)

    def getName(self):
        """ Return the retriever name. """
        return self.name

    def getAuthorizationPrincipal(self, username):
        # Subclasses override this to look up the principal.
        return None

    def setAuthorizationPrincipalSessionRole(self, principal):
        """ Set the session role: admin if the principal holds the admin
        system role, otherwise regular user. No-op for None. """
        if principal is None:
            return
        systemRoleIds = principal.get('userSystemRoleDict', {}).keys()
        if dmRole.DM_ADMIN_SYSTEM_ROLE_ID in systemRoleIds:
            principal.setSessionRole(dmRole.DM_ADMIN_SESSION_ROLE)
        else:
            principal.setSessionRole(dmRole.DM_USER_SESSION_ROLE)
#######################################################################
# Testing.
if __name__ == '__main__':
    # No standalone behavior; the class above is exercised by the services.
    pass
#!/usr/bin/env python
from dm.common.utility.cryptUtility import CryptUtility
from authorizationPrincipalAuthenticator import AuthorizationPrincipalAuthenticator
class CryptedPasswordPrincipalAuthenticator(AuthorizationPrincipalAuthenticator):
    """ Authenticator that verifies passwords against stored PBKDF2 tokens. """

    def __init__(self):
        AuthorizationPrincipalAuthenticator.__init__(self, self.__class__.__name__)

    def authenticatePrincipal(self, principal, password):
        """ Return the principal if the password matches its token, else None. """
        if principal is None:
            return None
        token = principal.getToken()
        if token is None or not len(token):
            self.logger.debug('Token is empty for %s, authentication not performed' % principal.getName())
            return None
        if CryptUtility.verifyPasswordWithPbkdf2(password, token):
            self.logger.debug('Authentication successful for %s' % principal.getName())
            return principal
        self.logger.debug('Authentication failed for %s' % principal.getName())
        return None
#######################################################################
# Testing.
if __name__ == '__main__':
    # No standalone behavior; the class above is exercised by the services.
    pass
#!/usr/bin/env python
from dm.common.constants import dmRole
from dm.common.objects.authorizationPrincipal import AuthorizationPrincipal
from dm.common.db.api.userDbApi import UserDbApi
from authorizationPrincipalRetriever import AuthorizationPrincipalRetriever
class DbPrincipalRetriever(AuthorizationPrincipalRetriever):
    """ Retrieves authorization principals from the DM user database. """

    def __init__(self):
        AuthorizationPrincipalRetriever.__init__(self, self.__class__.__name__)
        self.dbApi = UserDbApi()

    def getAuthorizationPrincipal(self, username):
        """ Look up the user and build a principal; None on lookup failure. """
        principal = None
        try:
            userInfo = self.dbApi.getUserWithPasswordByUsername(username)
            principal = AuthorizationPrincipal(name=username, token=userInfo.get('password'))
            principal.setUserSystemRoleDict(userInfo.get('userSystemRoleDict', {}))
            principal.setUserExperimentRoleDict(userInfo.get('userExperimentRoleDict', {}))
            self.setAuthorizationPrincipalSessionRole(principal)
        except Exception as ex:
            # Unknown users surface as DB exceptions; debug-log and fall through.
            self.logger.debug(ex)
        return principal
#######################################################################
# Testing.
if __name__ == '__main__':
    # No standalone behavior; the class above is exercised by the services.
    pass
#!/usr/bin/env python
from dm.common.utility.ldapUtility import LdapUtility
from authorizationPrincipalAuthenticator import AuthorizationPrincipalAuthenticator
class LdapPasswordPrincipalAuthenticator(AuthorizationPrincipalAuthenticator):
    """ Authenticator that validates credentials against an LDAP server. """

    def __init__(self, serverUrl, dnFormat):
        AuthorizationPrincipalAuthenticator.__init__(self, self.__class__.__name__)
        self.ldapUtility = LdapUtility(serverUrl, dnFormat)

    def authenticatePrincipal(self, principal, password):
        """ Return the principal if the LDAP credential check passes, else None. """
        if principal is None:
            return None
        try:
            self.logger.debug('Checking credentials for %s' % principal.getName())
            self.ldapUtility.checkCredentials(principal.getName(), password)
            return principal
        except Exception as ex:
            # Failed bind or connection error: authentication rejected.
            self.logger.debug(ex)
        return None
#######################################################################
# Testing.
if __name__ == '__main__':
    # No standalone behavior; the class above is exercised by the services.
    pass
#!/usr/bin/env python
from dm.common.constants import dmRole
from dm.common.objects.authorizationPrincipal import AuthorizationPrincipal
from dm.common.utility.cryptUtility import CryptUtility
from authorizationPrincipalRetriever import AuthorizationPrincipalRetriever
class NoOpPrincipalRetriever(AuthorizationPrincipalRetriever):
    """ Retriever that fabricates an admin principal for any username. """

    def __init__(self):
        AuthorizationPrincipalRetriever.__init__(self, self.__class__.__name__)

    def getAuthorizationPrincipal(self, username):
        """ Return a synthetic principal whose password equals the username. """
        # Set password to be the same as username
        token = CryptUtility.cryptPasswordWithPbkdf2(username)
        principal = AuthorizationPrincipal(name=username, token=token)
        principal.setUserSystemRoleDict({dmRole.DM_ADMIN_SYSTEM_ROLE_ID : []})
        principal.setUserExperimentRoleDict({})
        self.setAuthorizationPrincipalSessionRole(principal)
        return principal
#######################################################################
# Testing.
if __name__ == '__main__':
    # No standalone behavior; the class above is exercised by the services.
    pass
#!/usr/bin/env python
from dm.common.utility.loggingManager import LoggingManager
class SessionManager:
    """ Base class for single sign-on session managers.

    Subclasses override addSession()/checkSession() to provide storage.
    """

    def __init__(self, name=None):
        self.name = name
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)

    def getName(self):
        """ Return the manager name. """
        return self.name

    def addSession(self, sessionId, sessionInfo):
        # Base implementation stores nothing.
        return None

    def checkSession(self, sessionId):
        # Base implementation validates nothing.
        return None
#######################################################################
# Testing.
if __name__ == '__main__':
    # No standalone behavior; the class above is exercised by the services.
    pass
#!/usr/bin/env python
import os
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.objects.dmObjectManager import DmObjectManager
from dm.common.utility.objectUtility import ObjectUtility
from dm.common.utility.singleton import Singleton
from dm.common.exceptions.objectNotFound import ObjectNotFound
class SingleSignOnManager(DmObjectManager, Singleton):
    """ Singleton manager that delegates SSO sessions to a configured backend. """

    DEFAULT_SESSION_TIMEOUT_IN_SECONDS = 3600 # seconds
    CONFIG_SECTION_NAME = 'SingleSignOnManager'
    SESSION_MANAGER_KEY = 'sessionManager'
    SESSION_TIMEOUT_IN_SECONDS_KEY = 'sessionTimeoutInSeconds'

    # Singleton instance.
    __instance = None

    def __init__(self):
        # Guard against repeated singleton initialization.
        if SingleSignOnManager.__instance:
            return
        SingleSignOnManager.__instance = self
        DmObjectManager.__init__(self)
        self.configurationManager = ConfigurationManager.getInstance()
        self.sessionManager = None
        self.configure()

    def configure(self):
        """ Instantiate the configured session manager, if any. """
        sectionName = SingleSignOnManager.CONFIG_SECTION_NAME
        configItems = self.configurationManager.getConfigItems(sectionName)
        self.logger.debug('Got config items: %s' % configItems)
        if not configItems:
            return
        sessionTimeout = self.configurationManager.getConfigOption(sectionName, SingleSignOnManager.SESSION_TIMEOUT_IN_SECONDS_KEY, SingleSignOnManager.DEFAULT_SESSION_TIMEOUT_IN_SECONDS)
        self.logger.debug('Session timeout: %s' % sessionTimeout)
        # Create session manager
        managerSpec = self.configurationManager.getConfigOption(sectionName, SingleSignOnManager.SESSION_MANAGER_KEY)
        (moduleName, className, constructor) = self.configurationManager.getModuleClassConstructorTuple(managerSpec, SingleSignOnManager)
        self.logger.debug('Creating session manager class: %s' % className)
        self.sessionManager = ObjectUtility.createObjectInstance(moduleName, className, constructor)

    def addSession(self, sessionId, sessionInfo):
        """ Delegate session creation to the backend; None if unconfigured/failed. """
        if not self.sessionManager:
            return None
        self.logger.debug('Adding session id %s: %s' % (sessionId,sessionInfo))
        try:
            return self.sessionManager.addSession(sessionId, sessionInfo)
        except Exception as ex:
            self.logger.error(ex)
        return None

    def checkSession(self, sessionId):
        """ Delegate session validation to the backend; None if unconfigured/failed. """
        if not self.sessionManager:
            return None
        self.logger.debug('Checking session id: %s' % sessionId)
        try:
            return self.sessionManager.checkSession(sessionId)
        except ObjectNotFound as ex:
            # Missing session is an expected condition; log quietly.
            self.logger.debug(ex)
        except Exception as ex:
            self.logger.error(ex)
        return None
#######################################################################
# Testing.
if __name__ == '__main__':
am = AuthorizationPrincipalManager.getInstance()
authPrincipal = am.getAuthorizationPrincipal('sveseli', 'sv')
print 'Auth principal: ', authPrincipal
#!/usr/bin/env python
#
# Base DM controller class.
#
#######################################################################
import cherrypy
import httplib
import json
from sys import exc_info
from dm.common.utility.loggingManager import LoggingManager
from dm.common.constants import dmStatus
from dm.common.constants import dmHttpHeaders
from dm.common.exceptions.dmException import DmException
from dm.common.exceptions import dmExceptionMap
from dm.common.exceptions.internalError import InternalError
#######################################################################
class DmController(object):
    """ Base controller class for DM web service controllers.

    Provides helpers for setting DM response headers, translating
    exceptions into DM status codes, and JSON (de)serialization.
    """

    def __init__(self):
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)

    @classmethod
    def getLogger(cls):
        """ Return a logger named after the concrete controller class. """
        logger = LoggingManager.getInstance().getLogger(cls.__name__)
        return logger

    @classmethod
    def addDmResponseHeaders(cls, status=dmStatus.DM_OK, msg='Success', exceptionType=None):
        """ Set DM status/message (and optional exception type) response headers. """
        cherrypy.response.headers[dmHttpHeaders.DM_STATUS_CODE_HTTP_HEADER] = status
        cherrypy.response.headers[dmHttpHeaders.DM_STATUS_MESSAGE_HTTP_HEADER] = msg
        if exceptionType is not None:
            cherrypy.response.headers[dmHttpHeaders.DM_EXCEPTION_TYPE_HTTP_HEADER] = exceptionType

    @classmethod
    def addDmSessionRoleHeaders(cls, role):
        """ Advertise the session role via a response header. """
        cherrypy.response.headers[dmHttpHeaders.DM_SESSION_ROLE_HTTP_HEADER] = role

    @classmethod
    def addDmExceptionHeaders(cls, ex):
        """ Set exception-describing response headers for the given exception. """
        cls.handleException(ex)

    @classmethod
    def handleCpException(cls):
        """ Convert the in-flight cherrypy exception into DM response headers.

        The HTTP status is forced to OK; the error is conveyed via DM headers.
        """
        cherrypy.response.status = httplib.OK
        ex = exc_info()[1]
        # Fixed: identity comparison with None instead of '=='.
        if ex is None:
            ex = InternalError('Internal Webservice Error')
        cls.handleException(ex)

    @classmethod
    def handleException(cls, ex):
        """ Map an exception class to its DM status code and set headers. """
        exClass = ex.__class__.__name__.split('.')[-1]
        status = None
        # Only the first line of the exception text is used as the message.
        msg = ('%s' % ex).split('\n')[0]
        # Iterate (code, exception name) pairs directly instead of keys()+get().
        for (code, exName) in dmExceptionMap.DM_EXCEPTION_MAP.items():
            if exName.split('.')[-1] == exClass:
                status = code
        if not status:
            status = dmStatus.DM_ERROR
        cls.addDmResponseHeaders(status, msg, exClass)

    @classmethod
    def formatJsonResponse(cls, response):
        """ Mark the response as JSON and return it as a string. """
        cherrypy.response.headers['Content-Type'] = 'application/json'
        return '%s' % (response)

    @classmethod
    def toJson(cls, o):
        """ Serialize an object to a JSON string. """
        return json.dumps(o)

    @classmethod
    def fromJson(cls, s):
        """ Deserialize a JSON string. """
        return json.loads(s)

    @classmethod
    def listToJson(cls, dmObjectList):
        """ Serialize a list of DM objects (full dictionary reps) to JSON. """
        return json.dumps([dmObject.getDictRep(keyList='ALL') for dmObject in dmObjectList])

    @classmethod
    def getSessionUser(cls):
        """ Return the user object stored in the cherrypy session, if any. """
        return cherrypy.serving.session.get('user')

    @classmethod
    def getSessionUsername(cls):
        """ Return the authenticated username stored in the cherrypy session. """
        return cherrypy.serving.session.get('_cp_username')

    # Exception decorator for all exposed method calls
    @classmethod
    def execute(cls, func):
        """ Decorator: run func, translating exceptions into JSON error responses. """
        def decorate(*args, **kwargs):
            try:
                response = func(*args, **kwargs)
            except DmException as ex:
                cls.getLogger().error('%s' % ex)
                cls.handleException(ex)
                response = ex.getFullJsonRep()
            except Exception as ex:
                # Wrap unexpected errors as InternalError for a uniform response.
                cls.getLogger().error('%s' % ex)
                cls.handleException(ex)
                response = InternalError(ex).getFullJsonRep()
            return cls.formatJsonResponse(response)
        return decorate
#!/usr/bin/env python
#
# Base web service class.
#
####################################################################
import sys
import os
import cherrypy
from cherrypy.process import plugins
from cherrypy import server
from dm.common.constants import dmStatus
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.dmModuleManager import DmModuleManager
from dm.common.exceptions.configurationError import ConfigurationError
####################################################################
class DmRestWebServiceBase:
    """ Base class for DM REST web services.

    Subclasses supply a route mapper and typically override initDmModules(),
    getDefaultServerHost() and getDefaultServerPort(). run() parses command
    line options, reads the configuration file, configures cherrypy, and
    blocks until the server engine exits.
    """

    # Default number of cherrypy request handler threads.
    DEFAULT_N_SERVER_REQUEST_THREADS = 50
    # Socket timeout (seconds); raised to prevent early SSL terminations.
    DEFAULT_SERVER_SOCKET_TIMEOUT = 30

    # Configuration file section/options consumed by this service.
    CONFIG_SECTION_NAME = 'WebService'
    CONFIG_OPTION_NAME_LIST = [ 'serviceHost', 'servicePort',
        'sslCertFile', 'sslKeyFile', 'sslCaCertFile', 'stationName' ]

    class SignalHandler:
        """ Wrapper that stops DM modules before delegating to the
        original cherrypy signal handler. """

        def __init__(self, signal, oldSignalHandler):
            self.signal = signal
            self.oldSignalHandler = oldSignalHandler
            self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)

        def signalHandler(self):
            """ Stop DM modules first, then run the original handler. """
            self.logger.debug('%s signal handler called' % self.signal)
            DmModuleManager.getInstance().stopModules()
            self.oldSignalHandler()

    def __init__(self, routeMapper):
        self.configurationManager = ConfigurationManager.getInstance()
        self.routeMapper = routeMapper
        self.options = None  # parsed command line options (set in prepareServer)
        self.args = None     # positional command line arguments
        self.logger = None   # created after configuration is read

    def prepareOptions(self):
        """ Build and return the command line option parser. """
        from optparse import OptionParser
        p = OptionParser()
        p.add_option('-d', '--daemon', action="store_true",
            dest='daemonFlag', default=False,
            help="Run as a daemon.")
        p.add_option('-p', '--pid-file',
            dest='pidFile', default=None,
            help="Store process id in the given file.")
        p.add_option('', '--config-file',
            dest='configFile', default=None,
            help="Service configuration file.")
        p.add_option('-P', '--port',
            dest='servicePort', default=None,
            help="Service port.")
        p.add_option('-H', '--host',
            dest='serviceHost', default=None,
            help="Service host.")
        p.add_option('-C', '--ssl-ca-cert',
            dest='sslCaCertFile', default=None,
            help='SSL CA certificate path (used for client SSL certificate verification). Requires --ssl-key and --ssl-cert.')
        p.add_option('-c', '--ssl-cert',
            dest='sslCertFile', default=None,
            help='SSL certificate path. SSL operation requires both --ssl-key and --ssl-cert. Client SSL certificate verification also requires --ssl-ca-cert.')
        p.add_option('-k', '--ssl-key',
            dest='sslKeyFile', default=None,
            help='SSL key path. SSL operation requires both --ssl-key and --ssl-cert. Client SSL certificate verification also requires --ssl-ca-cert.')
        p.add_option('', '--n-server-threads',
            dest='nServerThreads', default=DmRestWebServiceBase.DEFAULT_N_SERVER_REQUEST_THREADS,
            help='Number of service request handler threads (defaut: %s).' % DmRestWebServiceBase.DEFAULT_N_SERVER_REQUEST_THREADS)
        return p

    def initDmModules(self):
        """ Hook for subclasses to register DM modules; base does nothing. """
        return None

    def getDefaultServerHost(self):
        """ Hook for subclasses to supply a default host; base returns None. """
        return None

    def getDefaultServerPort(self):
        """ Hook for subclasses to supply a default port; base returns None. """
        return None

    # Instantiate modified signal handler that stops dm modules first,
    # and then does the default action.
    def modifySignalHandlers(self, engine):
        """ Wrap every cherrypy signal handler with the DM SignalHandler. """
        pluginsSignalHandler = plugins.SignalHandler(engine)
        handlers = pluginsSignalHandler.handlers
        # Add handler for interrupt
        handlers['SIGINT'] = engine.exit
        # Modify all signal handlers
        for signal in handlers.keys():
            self.logger.debug('Modifying signal: %s' % signal)
            oldSignalHandler = handlers[signal]
            self.logger.debug('Old signal handler: %s' % oldSignalHandler)
            signalHandler = DmRestWebServiceBase.SignalHandler(signal, oldSignalHandler)
            self.logger.debug('Setting signal handler to: %s' % signalHandler.signalHandler)
            handlers[signal] = signalHandler.signalHandler
        pluginsSignalHandler.subscribe()

    def initServerLog(self):
        """ Route cherrypy error/access logs to the configured files/levels. """
        cherrypyLogLevel = self.configurationManager.getCherrypyLogLevel()
        cherrypy.log.error_log.setLevel(cherrypyLogLevel)
        cherrypy.log.error_file = self.configurationManager.getCherrypyLogFile()
        cherrypy.log.error_log.propagate = False
        cherrypy.log.access_log.setLevel(cherrypyLogLevel)
        cherrypy.log.access_file = self.configurationManager.getCherrypyAccessFile()
        cherrypy.log.access_log.propagate = False

    def updateServerConfig(self):
        """ Push host/port/thread-pool settings into the cherrypy config. """
        serviceHost = self.configurationManager.getServiceHost()
        servicePort = int(self.configurationManager.getServicePort())
        nServerThreads = int(self.options.nServerThreads)
        configDict = {
            'server.socket_host' : serviceHost,
            'server.socket_port' : servicePort,
            'server.thread_pool' : nServerThreads,
            # Console logging only when not running as a daemon.
            'log.screen' : (self.options.daemonFlag != True),
        }
        cherrypy.config.update(configDict)

    def readConfigFile(self, configFile):
        """ Load service options from the configuration file.

        NOTE(review): the configFile argument is immediately overridden by
        the command line option value — confirm the parameter is
        intentionally ignored.
        """
        configFile = self.options.configFile
        if not configFile:
            configFile = self.configurationManager.getConfigFile()
        else:
            self.configurationManager.setConfigFile(configFile)
        if not os.path.exists(configFile):
            raise ConfigurationError('Configuration file %s does not exist.' % configFile)
        # Read file and set config options
        self.configurationManager.setOptionsFromConfigFile(DmRestWebServiceBase.CONFIG_SECTION_NAME, DmRestWebServiceBase.CONFIG_OPTION_NAME_LIST, configFile)

    def readCommandLineOptions(self):
        """ Override configuration file values with command line options. """
        # This method should be called after reading config file
        # in case some options are overridden
        if self.options.sslCaCertFile != None:
            self.configurationManager.setSslCaCertFile(self.options.sslCaCertFile)
        if self.options.sslCertFile != None:
            self.configurationManager.setSslCertFile(self.options.sslCertFile)
        if self.options.sslKeyFile != None:
            self.configurationManager.setSslKeyFile(self.options.sslKeyFile)
        if self.options.serviceHost != None:
            self.configurationManager.setServiceHost(self.options.serviceHost)
        if self.options.servicePort != None:
            self.configurationManager.setServicePort(self.options.servicePort)

    def prepareServer(self):
        """ Parse options, load configuration and configure cherrypy.

        Exits the process with DM_ERROR on any setup failure.
        """
        try:
            optionParser = self.prepareOptions()
            (self.options, self.args) = optionParser.parse_args()
            # Read config file and override with command line options
            self.readConfigFile(self.options.configFile)
            self.readCommandLineOptions()
            # Turn off console log for daemon mode.
            self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
            if self.options.daemonFlag:
                LoggingManager.getInstance().setConsoleLogLevel('CRITICAL')
            dispatch = self.routeMapper.setupRoutes()
            self.logger.debug('Using route dispatch: %s' % dispatch)
            config = {
                '/' : {
                    'request.dispatch' : dispatch,
                },
            }
            # No root controller as we provided our own.
            cherrypy.tree.mount(root=None, config=config)
            self.initServerLog()
            self.updateServerConfig()
            self.logger.info('Using host %s' % self.configurationManager.getServiceHost())
            self.logger.info('Using port %s' % self.configurationManager.getServicePort())
            self.logger.debug('Using %s request handler threads' % self.options.nServerThreads)
        except Exception, ex:
            # Logger may not exist yet if setup failed early.
            if self.logger is not None:
                self.logger.exception(ex)
            else:
                import traceback
                print '\n%s' % sys.exc_info()[1]
                traceback.print_exc(file=sys.stderr)
            sys.exit(dmStatus.DM_ERROR)

    # Run server.
    def __runServer(self):
        """ Start the cherrypy engine and DM modules; block until exit.

        Returns DM_OK on clean shutdown, DM_ERROR on startup failure.
        """
        self.logger.info('Starting service')
        engine = cherrypy.engine
        # Set up Deamonization
        if self.options.daemonFlag:
            plugins.Daemonizer(engine).subscribe()
            self.logger.debug('Daemon mode: %s' % self.options.daemonFlag)
        if self.options.pidFile != None:
            plugins.PIDFile(engine, self.options.pidFile).subscribe()
            self.logger.debug('Using PID file: %s' % self.options.pidFile)
        sslCertFile = self.configurationManager.getSslCertFile()
        sslKeyFile = self.configurationManager.getSslKeyFile()
        sslCaCertFile = self.configurationManager.getSslCaCertFile()
        if sslCertFile != None and sslKeyFile != None:
            server.ssl_ca_certificate = None
            if sslCaCertFile != None:
                # NOTE(review): assigns the command line option value rather
                # than the configuration manager value logged below — confirm
                # intent for CA certs supplied only via the config file.
                server.ssl_ca_certificate = self.options.sslCaCertFile
                self.logger.info('Using SSL CA cert file: %s' % sslCaCertFile)
            server.ssl_certificate = sslCertFile
            self.logger.info('Using SSL cert file: %s' % sslCertFile)
            server.ssl_private_key = sslKeyFile
            self.logger.info('Using SSL key file: %s' % sslKeyFile)
            server.ssl_module = 'builtin'
            #server.ssl_module = 'pyopenssl'
        # Increase timeout to prevent early SSL connection terminations
        server.socket_timeout = DmRestWebServiceBase.DEFAULT_SERVER_SOCKET_TIMEOUT
        # Setup the signal handler to stop the application while running.
        if hasattr(engine, 'signal_handler'):
            self.logger.debug('Subscribing signal handler')
            engine.signal_handler.subscribe()
            self.modifySignalHandlers(engine)
        # Turn off autoreloader.
        self.logger.debug('Turning off autoreloader')
        engine.autoreload.unsubscribe()
        # Start the engine.
        try:
            self.logger.debug('Starting engine')
            engine.start()
            # Prepare dm services.
            self.logger.debug('Starting modules')
            self.initDmModules()
            DmModuleManager.getInstance().startModules()
        except Exception, ex:
            self.logger.exception('Service exiting: %s' % ex)
            DmModuleManager.getInstance().stopModules()
            return dmStatus.DM_ERROR
        self.logger.info('Service ready')
        engine.block()
        DmModuleManager.getInstance().stopModules()
        self.logger.info('Service done')
        return dmStatus.DM_OK

    # Run server instance.
    def run(self):
        """ Prepare and run the server; exits the process with the result. """
        self.prepareServer()
        sys.exit(self.__runServer())
####################################################################
# Testing
if __name__ == '__main__':
    # No standalone behavior; subclasses instantiate and run the service.
    pass