Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Source

Select target project
No results found

Target

Select target project
  • DM/dm-docs
  • hammonds/dm-docs
  • hparraga/dm-docs
3 results
Show changes
Showing 1211 additions and 0 deletions
#!/usr/bin/env python

from dmObject import DmObject

class FileMetadata(DmObject):

    DEFAULT_KEY_LIST = [ 'id', 'fileName', 'experimentName', 'locationList', 'filePath', 'fileSize', 'md5Sum', 'experimentFilePath' ]

    def __init__(self, dict):
        DmObject.__init__(self, dict)

#!/usr/bin/env python

from dmObject import DmObject

class LdapUserInfo(DmObject):

    DEFAULT_KEY_LIST = [ 'userDn', 'userAttrs' ]

    def __init__(self, dict):
        DmObject.__init__(self, dict)

    def getUserPassword(self):
        return self['userAttrs']['userPassword'][0]

#!/usr/bin/env python

import time
import os
import urlparse

from dmObject import DmObject

class ObservedFile(DmObject):

    DEFAULT_KEY_LIST = [ 'filePath', 'lastUpdateTime' ]

    def __init__(self, dict={}, filePath=None, dataDirectory=None, experiment=None):
        DmObject.__init__(self, dict)
        if filePath:
            self['filePath'] = filePath
        if dataDirectory:
            self['dataDirectory'] = dataDirectory
            if filePath:
                # Data directory may be given as a URL; use only its path
                # component to derive the experiment-relative file path
                parseResult = urlparse.urlparse(dataDirectory)
                self['experimentFilePath'] = os.path.relpath(filePath, parseResult.path)
        if experiment:
            self['experimentName'] = experiment.get('name')
            self['storageHost'] = experiment.get('storageHost')
            self['storageDirectory'] = experiment.get('storageDirectory')

    def setLastUpdateTimeToNow(self):
        self['lastUpdateTime'] = time.time()

    def getLastUpdateTime(self):
        return self.get('lastUpdateTime')

    def getFilePath(self):
        return self.get('filePath')

    def getDataDirectory(self):
        return self.get('dataDirectory')

####################################################################
# Testing

if __name__ == '__main__':
    of = ObservedFile(filePath='tmp/xyz')
    print of
    of.setLastUpdateTimeToNow()
    print of

#!/usr/bin/env python

from dmObject import DmObject

class PluginInfo(DmObject):

    DEFAULT_KEY_LIST = [ 'id', 'name', 'dependsOn' ]

    def __init__(self, dict):
        DmObject.__init__(self, dict)

#!/usr/bin/env python

from dmObject import DmObject

class RoleType(DmObject):

    DEFAULT_KEY_LIST = [ 'id', 'name', 'description' ]

    def __init__(self, dict):
        DmObject.__init__(self, dict)

#!/usr/bin/env python

import time

from dmObject import DmObject

class Storage(DmObject):

    DEFAULT_KEY_LIST = [ 'id', 'name', 'description', 'defaultScheme' ]

    def __init__(self, dict={}):
        DmObject.__init__(self, dict)

####################################################################
# Testing

if __name__ == '__main__':
    pass

#!/usr/bin/env python

from dmObject import DmObject

class SystemRoleType(DmObject):

    DEFAULT_KEY_LIST = [ 'id', 'name', 'description' ]

    def __init__(self, dict):
        DmObject.__init__(self, dict)

#!/usr/bin/env python

import time
import threading

from dmObject import DmObject
from dm.common.constants import dmProcessingStatus
from dm.common.utility.dictUtility import DictUtility
from dm.common.utility.timeUtility import TimeUtility

class UploadInfo(DmObject):

    DEFAULT_KEY_LIST = [ 'id', 'experimentName', 'dataDirectory', 'status', 'nProcessedFiles', 'nProcessingErrors', 'nCancelledFiles', 'nCompletedFiles', 'nWaitingFiles', 'nFiles', 'percentageComplete', 'percentageProcessed', 'percentageProcessingErrors', 'percentageCancelled', 'startTime', 'endTime', 'runTime', 'startTimestamp', 'endTimestamp', 'errorMessage' ]

    def __init__(self, dict={}):
        DmObject.__init__(self, dict)
        self.lock = threading.RLock()

    def fileProcessed(self, filePath, processingEndTime):
        self.lock.acquire()
        try:
            self['nProcessedFiles'] = self.get('nProcessedFiles', 0) + 1
            lastFileProcessedTime = self.get('lastFileProcessedTime', 0)
            if processingEndTime is not None and processingEndTime > lastFileProcessedTime:
                self['lastFileProcessed'] = filePath
                self['lastFileProcessedTime'] = processingEndTime
        finally:
            self.lock.release()

    def fileProcessingError(self, filePath, processingError, processingEndTime):
        self.lock.acquire()
        try:
            self['nProcessingErrors'] = self.get('nProcessingErrors', 0) + 1
            processingErrors = self.get('processingErrors', {})
            processingErrors[filePath] = processingError
            self['processingErrors'] = processingErrors
            lastFileProcessingErrorTime = self.get('lastFileProcessingErrorTime', 0)
            if processingEndTime is not None and processingEndTime > lastFileProcessingErrorTime:
                self['lastFileProcessingErrorTime'] = processingEndTime
        finally:
            self.lock.release()

    def fileProcessingCancelled(self, filePath, processingEndTime):
        self.lock.acquire()
        try:
            self['nCancelledFiles'] = self.get('nCancelledFiles', 0) + 1
            lastFileProcessingCancelledTime = self.get('lastFileProcessingCancelledTime', 0)
            if processingEndTime is not None and processingEndTime > lastFileProcessingCancelledTime:
                self['lastFileProcessingCancelledTime'] = processingEndTime
        finally:
            self.lock.release()

    def uploadAborted(self, nCancelledFiles):
        self.lock.acquire()
        try:
            self['nCancelledFiles'] = self.get('nCancelledFiles', 0) + nCancelledFiles
        finally:
            self.lock.release()

    def updateStatus(self):
        now = time.time()
        uploadStatus = self.get('status', dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING)
        if uploadStatus in dmProcessingStatus.DM_INACTIVE_PROCESSING_STATUS_LIST:
            return
        nFiles = self.get('nFiles', 0)
        nProcessedFiles = self.get('nProcessedFiles', 0)
        nCancelledFiles = self.get('nCancelledFiles', 0)
        nProcessingErrors = self.get('nProcessingErrors', 0)
        processingErrors = self.get('processingErrors', {})
        nCompletedFiles = nProcessedFiles+nProcessingErrors
        self['nCompletedFiles'] = nCompletedFiles
        nWaitingFiles = nFiles-nCompletedFiles-nCancelledFiles
        self['nWaitingFiles'] = nWaitingFiles
        startTime = self.get('startTime', now)
        runTime = now - startTime
        endTime = None
        if nCompletedFiles == nFiles and uploadStatus != dmProcessingStatus.DM_PROCESSING_STATUS_PENDING:
            uploadStatus = 'done'
            if nProcessingErrors:
                uploadStatus = 'failed'
            lastFileProcessingErrorTime = self.get('lastFileProcessingErrorTime')
            lastFileProcessedTime = self.get('lastFileProcessedTime')
            endTime = lastFileProcessedTime
            if not endTime or lastFileProcessingErrorTime > endTime:
                endTime = lastFileProcessingErrorTime
        if nCancelledFiles > 0 and nCancelledFiles+nCompletedFiles == nFiles:
            uploadStatus = dmProcessingStatus.DM_PROCESSING_STATUS_ABORTED
            endTime = self.get('lastFileProcessingCancelledTime', now)
        if endTime:
            runTime = endTime - startTime
            self['endTime'] = endTime
            self['endTimestamp'] = TimeUtility.formatLocalTimestamp(endTime)
        self['runTime'] = runTime
        self['status'] = uploadStatus
        percentageComplete = 100.0
        percentageProcessed = 100.0
        percentageCancelled = 0.0
        percentageProcessingErrors = 0.0
        if nFiles > 0:
            percentageComplete = float(nCompletedFiles)/float(nFiles)*100.0
            percentageProcessed = float(nProcessedFiles)/float(nFiles)*100.0
            percentageCancelled = float(nCancelledFiles)/float(nFiles)*100.0
            percentageProcessingErrors = float(nProcessingErrors)/float(nFiles)*100.0
        self['percentageComplete'] = '%.2f' % percentageComplete
        self['percentageProcessed'] = '%.2f' % percentageProcessed
        self['percentageProcessingErrors'] = '%.2f' % percentageProcessingErrors
        if nCancelledFiles > 0:
            self['percentageCancelled'] = '%.2f' % percentageCancelled
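
####################################################################
# Testing
# (Added sketch, not part of the original file: exercises the status
# bookkeeping above with made-up file paths and counts, following the
# Testing convention used by the other modules in this changeset.)

if __name__ == '__main__':
    ui = UploadInfo({'nFiles' : 2, 'startTime' : time.time()})
    ui.fileProcessed('/tmp/a', time.time())
    ui.fileProcessingError('/tmp/b', 'checksum mismatch', time.time())
    ui.updateStatus()
    # Expect status 'failed', nCompletedFiles 2, percentageProcessed '50.00'
    print ui
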
#!/usr/bin/env python

from dmObject import DmObject

class UserExperimentRole(DmObject):

    DEFAULT_KEY_LIST = [ 'userId', 'experimentId', 'roleTypeId' ]

    def __init__(self, dict):
        DmObject.__init__(self, dict)

#!/usr/bin/env python

from dmObject import DmObject

class UserInfo(DmObject):

    DEFAULT_KEY_LIST = [ 'id', 'username', 'firstName', 'lastName', 'middleName', 'email', 'description' ]

    def __init__(self, dict):
        DmObject.__init__(self, dict)

#!/usr/bin/env python

from dmObject import DmObject

class UserSystemRole(DmObject):

    DEFAULT_KEY_LIST = [ 'userId', 'experimentStationId', 'roleTypeId' ]

    def __init__(self, dict):
        DmObject.__init__(self, dict)

#!/usr/bin/env python

import threading
import time
import os

from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.configurationManager import ConfigurationManager
from dm.common.utility.objectUtility import ObjectUtility
from dm.common.utility.valueUtility import ValueUtility
from dm.common.utility.timeBasedProcessingQueue import TimeBasedProcessingQueue
from dm.common.utility.singleton import Singleton
from fileProcessingThread import FileProcessingThread

class FileProcessingManager(threading.Thread,Singleton):

    CONFIG_SECTION_NAME = 'FileProcessingManager'
    NUMBER_OF_PROCESSING_THREADS_KEY = 'numberofprocessingthreads'
    DEFAULT_NUMBER_OF_RETRIES_KEY = 'defaultnumberofretries'
    DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY = 'defaultretrywaitperiodinseconds'
    STAT_UTILITY_KEY = 'statutility'
    FILE_PROCESSOR_KEY = 'fileprocessor'

    # Singleton.
    __instanceLock = threading.RLock()
    __instance = None

    def __init__(self):
        FileProcessingManager.__instanceLock.acquire()
        try:
            if FileProcessingManager.__instance:
                return
            FileProcessingManager.__instance = self
            self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
            self.logger.debug('Initializing')
            self.lock = threading.RLock()
            self.fileProcessingThreadDict = {}
            self.eventFlag = threading.Event()
            self.fileProcessorDict = {}
            self.fileProcessorKeyList = []
            self.fileProcessingQueue = TimeBasedProcessingQueue()
            self.__configure()
            self.logger.debug('Initialization complete')
        finally:
            FileProcessingManager.__instanceLock.release()

    def __configure(self):
        cm = ConfigurationManager.getInstance()
        configItems = cm.getConfigItems(FileProcessingManager.CONFIG_SECTION_NAME)
        self.logger.debug('Got config items: %s' % configItems)
        self.nProcessingThreads = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.NUMBER_OF_PROCESSING_THREADS_KEY))
        self.defaultNumberOfRetries = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.DEFAULT_NUMBER_OF_RETRIES_KEY))
        self.defaultRetryWaitPeriodInSeconds = int(cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS_KEY))
        statUtility = cm.getConfigOption(FileProcessingManager.CONFIG_SECTION_NAME, FileProcessingManager.STAT_UTILITY_KEY)
        if statUtility:
            (statUtilityModuleName,statUtilityClassName,statUtilityConstructor) = cm.getModuleClassConstructorTuple(statUtility)
        # Create processors
        for (key,value) in configItems:
            if key.startswith(FileProcessingManager.FILE_PROCESSOR_KEY):
                (moduleName,className,constructor) = cm.getModuleClassConstructorTuple(value)
                self.logger.debug('Creating file processor instance of class %s' % className)
                fileProcessor = ObjectUtility.createObjectInstance(moduleName, className, constructor)
                self.logger.debug('Configuring file processor %s' % fileProcessor)
                fileProcessor.setNumberOfRetries(self.defaultNumberOfRetries)
                fileProcessor.setRetryWaitPeriodInSeconds(self.defaultRetryWaitPeriodInSeconds)
                statUtilityObject = None
                if statUtility:
                    statUtilityObject = ObjectUtility.createObjectInstance(statUtilityModuleName, statUtilityClassName, statUtilityConstructor)
                    self.logger.debug('Using stat utility object %s' % str(statUtilityObject))
                fileProcessor.statUtility = statUtilityObject
                fileProcessor.configure()
                self.fileProcessorDict[key] = fileProcessor
        self.fileProcessorKeyList = self.fileProcessorDict.keys()
        self.fileProcessorKeyList.sort()
        self.logger.debug('File processor keys: %s' % self.fileProcessorKeyList)
        # Assign processor names
        processorNumber = 0
        for processorKey in self.fileProcessorKeyList:
            processorNumber += 1
            processor = self.fileProcessorDict.get(processorKey)
            processorName = '%s-%s' % (processor.__class__.__name__,processorNumber)
            processor.name = processorName
        # Correct processor dependencies
        for processorKey in self.fileProcessorKeyList:
            self.logger.debug('Determining dependencies for processor %s' % (processorKey))
            processor = self.fileProcessorDict.get(processorKey)
            dependsOn = []
            for depProcessorKey in processor.dependsOn:
                depProcessor = self.fileProcessorDict.get(depProcessorKey.lower())
                if depProcessor:
                    dependsOn.append(depProcessor.name)
            processor.dependsOn = dependsOn
            self.logger.debug('Processor %s depends on: %s' % (processor.name, processor.dependsOn))
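
    # A plausible configuration section consumed by __configure() above; the
    # option names correspond to the lowercased *_KEY constants, and each
    # fileProcessorN value is parsed by getModuleClassConstructorTuple() into
    # a (module, class, constructor) tuple. The exact layout and module paths
    # are an assumed illustration, not taken from this changeset:
    #
    #   [FileProcessingManager]
    #   numberOfProcessingThreads=3
    #   defaultNumberOfRetries=1
    #   defaultRetryWaitPeriodInSeconds=60
    #   fileProcessor1=dm.common.processing.plugins.rsyncFileTransferPlugin.RsyncFileTransferPlugin(deleteOriginal=False)
    #   fileProcessor2=dm.common.processing.plugins.mongoDbFileCatalogPlugin.MongoDbFileCatalogPlugin()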

    # Remove hidden files from dictionary of files to be processed
    def removeHiddenFilesFromProcessing(self, filePathsDict, uploadInfo):
        if ValueUtility.toBoolean(uploadInfo.get('processHiddenFiles')):
            del uploadInfo['processHiddenFiles']
            return filePathsDict
        self.logger.debug('Checking for hidden files')
        nRemoved = 0
        for (filePath,filePathDict) in filePathsDict.items():
            fileName = os.path.basename(filePath)
            if fileName.startswith('.'):
                self.logger.debug('File path %s is hidden file, will not process it' % filePath)
                del filePathsDict[filePath]
                nRemoved += 1
        self.logger.debug('Removed %s hidden files, %s candidates remaining' % (nRemoved, len(filePathsDict)))
        return filePathsDict

    # Each plugin calculates list of files that need to be processed
    # Final result is union of all plugins
    def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
        if ValueUtility.toBoolean(uploadInfo.get('reprocessFiles')):
            del uploadInfo['reprocessFiles']
            return filePathsDict
        self.logger.debug('Checking files with processor plugins')
        checkedFilePathsDict = {}
        for processorKey in self.fileProcessorKeyList:
            processor = self.fileProcessorDict.get(processorKey)
            # Processor will return list of files it must process
            pluginFilePathsDict = processor.checkUploadFilesForProcessing(filePathsDict, uploadInfo)
            if len(pluginFilePathsDict):
                checkedFilePathsDict.update(pluginFilePathsDict)
        self.logger.debug('There are %s processing candidates remaining' % (len(checkedFilePathsDict)))
        return checkedFilePathsDict

    def processFile(self, fileInfo):
        self.fileProcessingQueue.push(fileInfo)
        self.eventFlag.set()

    def appendFileProcessor(self, fileProcessor):
        key = fileProcessor.__class__.__name__
        self.logger.debug('Adding file processor: %s' % key)
        self.fileProcessorDict[key] = fileProcessor
        self.fileProcessorKeyList.append(key)
        self.logger.debug('File processor keys: %s' % self.fileProcessorKeyList)

    def start(self):
        self.lock.acquire()
        try:
            self.logger.debug('Starting file processing threads')
            for i in range(0, self.nProcessingThreads):
                tName = 'FileProcessingThread-%s' % i
                t = FileProcessingThread(tName, self, self.fileProcessorDict, self.fileProcessorKeyList, self.fileProcessingQueue)
                t.start()
                self.fileProcessingThreadDict[tName] = t
        finally:
            self.lock.release()

    def stop(self):
        self.logger.debug('Stopping file processing threads')
        for (tName, t) in self.fileProcessingThreadDict.items():
            t.stop()
        self.lock.acquire()
        try:
            self.eventFlag.set()
        finally:
            self.lock.release()
        for (tName, t) in self.fileProcessingThreadDict.items():
            t.join()

    def setEvent(self):
        self.lock.acquire()
        try:
            self.eventFlag.set()
        finally:
            self.lock.release()

    def clearEvent(self):
        self.lock.acquire()
        try:
            self.eventFlag.clear()
        finally:
            self.lock.release()

    def waitOnEvent(self, timeoutInSeconds=None):
        self.eventFlag.wait(timeoutInSeconds)

####################################################################
# Testing

if __name__ == '__main__':
    fp = FileProcessingManager.getInstance()
    print fp
    #fp.start()
    #time.sleep(30)
    #fp.stop()

#!/usr/bin/env python

import threading
import time

from dm.common.constants import dmProcessingStatus
from dm.common.utility.loggingManager import LoggingManager

class FileProcessingThread(threading.Thread):

    THREAD_EVENT_TIMEOUT_IN_SECONDS = 10.0

    def __init__ (self, name, fileProcessingManager, fileProcessorDict, fileProcessorKeyList, fileProcessingQueue):
        threading.Thread.__init__(self)
        self.setName(name)
        self.exitFlag = False
        self.fileProcessingManager = fileProcessingManager
        self.fileProcessorDict = fileProcessorDict
        self.fileProcessorKeyList = fileProcessorKeyList
        self.fileProcessingQueue = fileProcessingQueue
        self.logger = LoggingManager.getInstance().getLogger(name)

    def processFile(self, fileInfo):
        if not len(self.fileProcessorKeyList):
            return
        filePath = fileInfo.get('filePath')
        if not filePath:
            self.logger.warn('Refusing to process empty file path')
            return
        try:
            statusMonitor = fileInfo.get('statusMonitor')
            if statusMonitor and statusMonitor.get('status') == dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING:
                self.logger.debug('File %s processing is cancelled' % (filePath))
                endProcessingTime = time.time()
                statusMonitor.fileProcessingCancelled(filePath, endProcessingTime)
                statusMonitor.updateStatus()
                return
            self.logger.debug('Starting to process file %s' % filePath)
            startProcessingTime = fileInfo.get('startProcessingTime', time.time())
            fileInfo['startProcessingTime'] = startProcessingTime
            nProcessors = len(self.fileProcessorKeyList)
            processedByDict = fileInfo.get('processedByDict', {})
            fileInfo['processedByDict'] = processedByDict
            skipPlugins = fileInfo.get('skipPlugins', [])
            processorNumber = 0
            for processorKey in self.fileProcessorKeyList:
                processorNumber += 1
                processor = self.fileProcessorDict.get(processorKey)
                processorName = processor.name
                if processorName in skipPlugins:
                    self.logger.debug('%s will be skipped by %s ' % (filePath, processorName))
                    continue
                if processedByDict.has_key(processorName):
                    self.logger.debug('%s has already been processed by %s ' % (filePath, processorName))
                    continue
                self.logger.debug('%s is about to process file %s ' % (processorName, filePath))
                try:
                    processor.processFile(fileInfo)
                    processedByDict[processorName] = True
                    self.logger.debug('%s processed file at path %s ' % (processorName, filePath))
                    if processorNumber == nProcessors:
                        self.logger.debug('File %s processing is complete' % (filePath))
                        endProcessingTime = time.time()
                        if statusMonitor:
                            statusMonitor.fileProcessed(filePath, endProcessingTime)
                            statusMonitor.updateStatus()
                except Exception, ex:
                    self.logger.exception(ex)
                    processingError = '%s processing error: %s' % (processorName, str(ex))
                    self.logger.debug(processingError)
                    processingDict = fileInfo.get('processingDict', {})
                    fileInfo['processingDict'] = processingDict
                    processorDict = processingDict.get(processorName, {})
                    processingDict[processorName] = processorDict
                    processorDict['lastError'] = str(ex)
                    nRetriesLeft = processorDict.get('numberOfRetriesLeft', processor.getNumberOfRetries())
                    self.logger.debug('Number of %s retries left for file %s: %s' % (processorName, filePath, nRetriesLeft))
                    processorDict['numberOfRetriesLeft'] = nRetriesLeft - 1
                    if nRetriesLeft <= 0:
                        endProcessingTime = time.time()
                        if statusMonitor:
                            statusMonitor.fileProcessingError(filePath, processingError, endProcessingTime)
                            statusMonitor.updateStatus()
                        self.logger.debug('No more %s retries left for file %s, remaining plugins will not process it' % (processorName, filePath))
                        return
                    else:
                        retryWaitPeriod = processor.getRetryWaitPeriodInSeconds()
                        self.logger.debug('%s may retry processing file %s after at least %s seconds' % (processorName, filePath, retryWaitPeriod))
                        self.fileProcessingQueue.push(fileInfo, retryWaitPeriod)
                        # Do not process this file further until
                        # this plugin is done
                        break
        except Exception, ex:
            self.logger.exception(ex)

    def run(self):
        self.logger.debug('Starting thread: %s' % self.getName())
        while True:
            self.fileProcessingManager.clearEvent()
            if self.exitFlag:
                self.logger.debug('Exit flag is set')
                break
            while True:
                try:
                    fileInfo = self.fileProcessingQueue.pop()
                    if fileInfo is None:
                        break
                    self.logger.debug('Processing queue depth after pop: %s', self.fileProcessingQueue.getLength())
                    self.processFile(fileInfo)
                except Exception, ex:
                    self.logger.exception(ex)
            self.fileProcessingManager.waitOnEvent(self.THREAD_EVENT_TIMEOUT_IN_SECONDS)
        self.logger.debug('%s is done' % self.getName())

    def stop(self):
        self.exitFlag = True

####################################################################
# Testing

if __name__ == '__main__':
    pass

#!/usr/bin/env python

import abc

class FileProcessor:

    DEFAULT_NUMBER_OF_RETRIES = 0
    DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS = 60

    def __init__(self, dependsOn=[]):
        self.configDict = {}
        self.processorName = self.__class__.__name__
        self.dependsOn = dependsOn
        self.statUtility = None

    @abc.abstractmethod
    def processDirectory(self, directoryInfo):
        return NotImplemented

    @abc.abstractmethod
    def processFile(self, fileInfo):
        return NotImplemented

    def replaceTemplates(self, inputString, fileInfo):
        experimentName = fileInfo.get('experimentName', '')
        outputString = inputString.replace('EXPERIMENT_NAME', experimentName)
        return outputString

    def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
        return {}

    def configure(self):
        # Use this method for processor configuration
        pass

    def setConfigKeyValue(self, key, value):
        self.configDict[key] = value

    def getConfigKeyValue(self, key):
        return self.configDict.get(key)

    def setNumberOfRetries(self, nRetries):
        self.configDict['numberOfRetries'] = nRetries

    def getNumberOfRetries(self):
        return self.configDict.get('numberOfRetries', self.DEFAULT_NUMBER_OF_RETRIES)

    def setRetryWaitPeriodInSeconds(self, waitPeriod):
        self.configDict['retryWaitPeriodInSeconds'] = waitPeriod

    def getRetryWaitPeriodInSeconds(self):
        return self.configDict.get('retryWaitPeriodInSeconds', self.DEFAULT_RETRY_WAIT_PERIOD_IN_SECONDS)
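
####################################################################
# Testing
# (Added sketch, not part of the original file: a hypothetical minimal
# processor showing the interface a plugin must satisfy. processFile()
# receives the fileInfo dictionary built by FileProcessingThread; an
# exception raised from it triggers the manager's retry machinery.)

if __name__ == '__main__':
    class PrintingFileProcessor(FileProcessor):
        def processFile(self, fileInfo):
            print 'Processing %s' % fileInfo.get('filePath')
        def processDirectory(self, directoryInfo):
            pass

    p = PrintingFileProcessor()
    p.setNumberOfRetries(1)
    p.processFile({'filePath' : '/tmp/xyz'})
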
#!/usr/bin/env python

import os

from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.dmSubprocess import DmSubprocess
from dm.common.utility.osUtility import OsUtility
from dm.common.exceptions.invalidArgument import InvalidArgument
from dm.common.exceptions.invalidRequest import InvalidRequest
from fileProcessor import FileProcessor

class FileTransferPlugin(FileProcessor):

    def __init__(self, command, src=None, dest=None, dependsOn=[]):
        FileProcessor.__init__(self, dependsOn)
        self.src = src
        self.dest = dest
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
        if command is None or not len(command):
            raise InvalidArgument('File transfer command must be non-empty string.')
        self.command = command
        self.subprocess = None

    def checkUploadFilesForProcessing(self, filePaths, uploadInfo):
        pass

    def processFile(self, fileInfo):
        filePath = fileInfo.get('filePath')
        dataDirectory = fileInfo.get('dataDirectory')
        storageHost = fileInfo.get('storageHost')
        storageDirectory = fileInfo.get('storageDirectory')
        destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory)
        srcUrl = self.getSrcUrl(filePath, dataDirectory)
        self.start(src=srcUrl, dest=destUrl, templateInfo=fileInfo)

    def getSrcUrl(self, filePath, dataDirectory):
        srcUrl = '%s/./%s' % (dataDirectory, os.path.relpath(filePath, dataDirectory))
        return srcUrl

    def getDestUrl(self, filePath, dataDirectory, storageHost, storageDirectory):
        if self.dest:
            destUrl = '%s/' % (self.dest)
        else:
            destUrl = '%s:%s/' % (storageHost, storageDirectory)
        return destUrl

    def getSrcDirUrl(self, dataDirectory):
        return '%s/' % dataDirectory

    def getDestDirUrl(self, dataDirectory, storageHost, storageDirectory):
        if self.dest:
            destUrl = '%s/' % (self.dest)
        else:
            destUrl = '%s:%s/' % (storageHost, storageDirectory)
        return destUrl

    def getFullCommand(self, src, dest, command=None):
        if command:
            return '%s "%s" "%s"' % (command, src, dest)
        else:
            return '%s "%s" "%s"' % (self.command, src, dest)

    def setSrc(self, src):
        self.src = src

    def setDest(self, dest):
        self.dest = dest

    def start(self, src=None, dest=None, command=None, templateInfo={}, cwd=None):
        # Use preconfigured source if provided source is None
        fileSrc = src
        if src is None:
            fileSrc = self.src
        fileDest = dest
        if dest is None:
            fileDest = self.dest
        # If destination is local, attempt to create it
        if self.dest is not None and self.dest.find(':') < 0:
            destDir = self.replaceTemplates(self.dest, templateInfo)
            try:
                OsUtility.createDir(destDir)
            except Exception, ex:
                self.logger.warn('Transfer may fail due to failure to create destination directory %s: %s' % (destDir, str(ex)))
        fileSrc = self.replaceTemplates(fileSrc, templateInfo)
        fileDest = self.replaceTemplates(fileDest, templateInfo)
        if not fileSrc or not fileDest:
            raise InvalidRequest('Both source and destination must be non-empty strings.')
        self.subprocess = DmSubprocess.getSubprocess(self.getFullCommand(fileSrc, fileDest, command), cwd=cwd)
        return self.subprocess.run()

    def wait(self):
        if self.subprocess:
            return self.subprocess.wait()
        return None

    def poll(self):
        if self.subprocess:
            return self.subprocess.poll()
        return None

    def getStdOut(self):
        if self.subprocess:
            return self.subprocess.getStdOut()
        return None

    def getStdErr(self):
        if self.subprocess:
            return self.subprocess.getStdErr()
        return None

    def getExitStatus(self):
        if self.subprocess:
            return self.subprocess.getExitStatus()
        return None

    def reset(self):
        self.subprocess = None

#######################################################################
# Testing.

if __name__ == '__main__':
    ft = FileTransferPlugin('rsync -arv', '/tmp/xyz', '/tmp/xyz2')
    ft.start()
    print 'StdOut: ', ft.getStdOut()
    print 'StdErr: ', ft.getStdErr()
    print 'Exit Status: ', ft.getExitStatus()

#!/usr/bin/env python

import os
import copy

from fileTransferPlugin import FileTransferPlugin
from dm.common.utility.fileUtility import FileUtility
from dm.common.utility.ftpUtility import FtpUtility
from dm.common.utility.sftpUtility import SftpUtility
from dm.common.exceptions.fileProcessingError import FileProcessingError
from dm.common.utility.dmSubprocess import DmSubprocess
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory

class GridftpFileTransferPlugin(FileTransferPlugin):

    #DEFAULT_COMMAND = 'globus-url-copy -c -cd -r -tcp-bs 512K -p 3 -sync -sync-level 2'
    #DIRECTORY_TRANSFER_COMMAND = 'globus-url-copy -c -cd -r -tcp-bs 512K -p 8 -sync -sync-level 2'
    DEFAULT_COMMAND = 'globus-url-copy -r -cd -sync -sync-level 2 -fast -bs 2M -tcp-bs 2M -p 8'
    DIRECTORY_TRANSFER_COMMAND = 'globus-url-copy -r -cd -sync -sync-level 2 -fast -bs 2M -tcp-bs 2M -p 8 -cc 8'
    DEFAULT_PORT = 2811

    def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, directoryTransferCommand=DIRECTORY_TRANSFER_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True, dependsOn=[]):
        FileTransferPlugin.__init__(self, command, src, dest, dependsOn=dependsOn)
        self.dsFileApi = DsRestApiFactory.getFileRestApi()
        self.localMd5Sum = localMd5Sum
        self.remoteMd5Sum = remoteMd5Sum
        self.deleteOriginal = deleteOriginal
        self.directoryTransferCommand = directoryTransferCommand
        self.pluginMustProcessFiles = pluginMustProcessFiles

    def replaceSpecialCharacters(self, url):
        replacementMap = {
            '#' : '%23',
            ' ' : '%20',
            '~' : '%7E',
        }
        for (original, replacement) in replacementMap.items():
            url = url.replace(original,replacement)
        return url

    def getSrcUrl(self, filePath, dataDirectory):
        (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
        if scheme:
            srcUrl = '%s://%s:%s/%s' % (scheme, host, port, filePath)
        elif self.src is None:
            srcUrl = 'file://%s' % filePath
        else:
            srcUrl = '%s/%s' % (self.src,filePath)
        return self.replaceSpecialCharacters(srcUrl)

    def getDestUrl(self, filePath, dataDirectory, storageHost, storageDirectory):
        (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
        dirName = os.path.dirname(os.path.relpath(filePath, dirPath)).strip()
        fileName = os.path.basename(filePath)
        if self.dest:
            destUrl = '%s/%s/%s' % (self.dest, dirName, fileName)
        else:
            destUrl = 'sshftp://%s/%s/%s/%s' % (storageHost, storageDirectory, dirName, fileName)
        return self.replaceSpecialCharacters(destUrl)

    def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
        if not self.pluginMustProcessFiles:
            return {}
        storageHost = uploadInfo['storageHost']
        storageDirectory = uploadInfo['storageDirectory']
        dataDirectory = uploadInfo['dataDirectory']
        self.logger.debug('Upload info: %s' % uploadInfo)
        # Original data directory may contain host/port
        (scheme, host, port, replacementDirPath) = FtpUtility.parseFtpUrl(dataDirectory)
        self.logger.debug('Replacement dir path: %s' % replacementDirPath)
        self.logger.debug('Number of original files: %s' % len(filePathsDict))
        self.logger.debug('Looking for existing files in %s' % storageDirectory)
        ftpUtility = SftpUtility(storageHost)
        storageFilePathsDict = ftpUtility.getFiles(storageDirectory, {}, replacementDirPath)
        self.logger.debug('There are %s files in %s' % (len(storageFilePathsDict), storageDirectory))
        # Remove file from plugin dict if we do not need to transfer it
        for (filePath,storageFilePathDict) in storageFilePathsDict.items():
            filePathDict = filePathsDict.get(filePath)
            if filePathDict is None:
                # We are not attempting to transfer this file
                # No need to change plugin file dict
                continue
            # Check size
            fSize = filePathDict.get('fileSize')
            sfSize = storageFilePathDict.get('fileSize')
            if fSize is None or sfSize is None or fSize != sfSize:
                # Sizes differ, need to transfer file
                continue
            # Sizes are the same, check modify time
            mTime = filePathDict.get('fileModificationTime')
            smTime = storageFilePathDict.get('fileModificationTime')
            if not mTime or not smTime or mTime > smTime:
                # Source time is later than storage time, need to transfer file
                continue
            # No need to transfer file
            del filePathsDict[filePath]
        self.logger.debug('Number of files that require processing: %s' % len(filePathsDict))
        return filePathsDict

    def processFile(self, fileInfo):
        filePath = fileInfo.get('filePath')
        dataDirectory = fileInfo.get('dataDirectory')
        experimentFilePath = fileInfo.get('experimentFilePath')
        experimentName = fileInfo.get('experimentName')
        storageHost = fileInfo.get('storageHost')
        storageDirectory = fileInfo.get('storageDirectory')
        destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory)
        srcUrl = self.getSrcUrl(filePath, dataDirectory)
        # Calculate checksum
        statUtility = self.statUtility
        if not statUtility:
            (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(srcUrl, defaultPort=self.DEFAULT_PORT)
            statUtility = FtpUtility(host, port)
        if not fileInfo.get('fileSize'):
            statUtility.statFile(filePath, fileInfo)
        if self.localMd5Sum:
            statUtility.getMd5Sum(filePath, fileInfo)
        # Transfer file
        self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl))
        self.start(src=srcUrl, dest=destUrl, templateInfo=fileInfo)
        # Get remote checksum
        if self.remoteMd5Sum:
            fileInfo2 = {}
            fileInfo2['experimentFilePath'] = experimentFilePath
            fileInfo2['experimentName'] = experimentName
            fileMetadata = self.dsFileApi.statFile(experimentFilePath, experimentName, fileInfo2)
            # If we have not done md5 locally, update file info
            if not self.localMd5Sum:
                fileInfo['md5Sum'] = fileMetadata.get('md5Sum')
            # Verify checksum
            if fileMetadata.get('md5Sum') != fileInfo.get('md5Sum'):
                self.logger.error('Checksum mismatch for file: %s' % filePath)
                raise FileProcessingError('Checksum mismatch detected for file: %s' % filePath)
            self.logger.debug('Checksum test passed for file %s' % filePath)
        # Remove file
        #if self.deleteOriginal:
        #    self.logger.debug('Deleting file %s' % filePath)
        #    OsUtility.removeFile(srcUrl)

    def getSrcDirUrl(self, dataDirectory):
        (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
        if scheme:
            srcUrl = '%s/' % (dataDirectory)
        elif self.src is None:
            srcUrl = 'file://%s/' % (dataDirectory)
        else:
            srcUrl = '%s/%s/' % (self.src,dataDirectory)
        return srcUrl

    def getDestDirUrl(self, dataDirectory, storageHost, storageDirectory):
        if self.dest:
            destUrl = '%s/' % (self.dest)
        else:
            destUrl = 'sshftp://%s/%s/' % (storageHost, storageDirectory)
        return destUrl

    def processDirectory(self, directoryInfo):
        uploadInfo = directoryInfo.get('uploadInfo')
        dataDirectory = uploadInfo.get('dataDirectory')
        experimentName = uploadInfo.get('experimentName')
        storageHost = uploadInfo.get('storageHost')
        storageDirectory = uploadInfo.get('storageDirectory')
        destUrl = self.getDestDirUrl(dataDirectory, storageHost, storageDirectory)
        srcUrl = self.getSrcDirUrl(dataDirectory)
        # Transfer directory
        self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl))
        self.start(src=srcUrl, dest=destUrl, command=self.directoryTransferCommand, templateInfo=uploadInfo)

#######################################################################
# Testing.

if __name__ == '__main__':
    ft = GridftpFileTransferPlugin('/tmp/xyz', '/tmp/xyz2')
    ft.start()
    print 'StdOut: ', ft.getStdOut()
    print 'StdErr: ', ft.getStdErr()
    print 'Exit Status: ', ft.getExitStatus()

#!/usr/bin/env python

import os
import time

from dm.common.constants import dmProcessingStatus
from dm.common.utility.loggingManager import LoggingManager
from dm.common.objects.observedFile import ObservedFile
from dm.common.utility.timeUtility import TimeUtility
from dm.common.processing.plugins.fileProcessor import FileProcessor
from dm.common.mongodb.api.fileMongoDbApi import FileMongoDbApi
from dm.common.utility.dmSubprocess import DmSubprocess

class MongoDbFileCatalogPlugin(FileProcessor):

    DEFAULT_HDF5_METADATA_COMMAND = None

    def __init__(self, hdf5MetadataCommand=DEFAULT_HDF5_METADATA_COMMAND, md5Sum=False, dependsOn=[]):
        FileProcessor.__init__(self, dependsOn=dependsOn)
        self.fileMongoDbApi = FileMongoDbApi()
        self.hdf5MetadataCommand = hdf5MetadataCommand
        self.md5Sum = md5Sum
        self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)

    def processStat(self, filePath, fileInfo={}):
        if not self.statUtility:
            return fileInfo
        if not fileInfo.has_key('fileSize'):
            # Stat results are stored into the given fileInfo dictionary
            self.statUtility.statFile(filePath, fileInfo)
        return fileInfo

    def processMd5Sum(self, filePath, fileInfo={}):
        if not self.md5Sum or not self.statUtility:
            return fileInfo
        if not fileInfo.has_key('md5Sum'):
            fileInfo['md5Sum'] = self.statUtility.getMd5Sum(filePath)
        return fileInfo

    def processHdf5Metadata(self, filePath, fileInfo={}):
        if not self.hdf5MetadataCommand:
            return fileInfo
        experimentName = fileInfo.get('experimentName', '')
        if not filePath.endswith('.h5'):
            return fileInfo
        command = '%s %s' % (self.hdf5MetadataCommand, filePath)
        subprocess = DmSubprocess.getSubprocess(command)
        subprocess.run()
        stdout = subprocess.getStdOut().replace('\n', ';')
        parts = stdout.split(';')
        for part in parts:
            keyValue = part.split('=')
            key = keyValue[0]
            if not len(key):
                continue
            value = ''
            if len(keyValue) > 1:
                value = '='.join(keyValue[1:])
            if not fileInfo.has_key(key):
                fileInfo[key] = value
            else:
                self.logger.warn('Key %s already exists for file %s (experiment: %s)' % (key, filePath, experimentName))
        return fileInfo
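
    # Note (added): processHdf5Metadata() assumes the configured command
    # prints key=value pairs separated by newlines or semicolons, e.g.:
    #   detector=pilatus;exposureTime=0.1
    # Each new key is merged into the file's catalog entry; keys already
    # present in fileInfo are kept and a warning is logged instead.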

    def processFile(self, fileInfo):
        filePath = fileInfo.get('filePath')
        experimentFilePath = fileInfo.get('experimentFilePath')
        experimentName = fileInfo.get('experimentName')
        self.logger.debug('Processing file "%s" for experiment %s' % (experimentFilePath, experimentName))
        daqInfo = fileInfo.get('daqInfo')
        storageDirectory = daqInfo.get('storageDirectory')
        storageHost = daqInfo.get('storageHost')
        storageUrl = daqInfo.get('storageUrl')
        storageFilePath = os.path.join(storageDirectory, experimentFilePath)
        fileInfo['fileProcessingTime'] = time.time()
        # Prepare cataloging entry
        fileInfo2 = {}
        for key in ['md5Sum', 'fileSize']:
            if fileInfo.has_key(key):
                fileInfo2[key] = fileInfo.get(key, '')
        for key in ['fileProcessingTime', 'fileCreationTime', 'fileModificationTime']:
            if fileInfo.has_key(key):
                t = fileInfo.get(key, 0)
                fileInfo2[key] = t
                key2 = '%sstamp' % key
                fileInfo2[key2] = TimeUtility.formatLocalTimestamp(t)
        fileLocations = fileInfo.get('fileLocations', [])
        fileLocations.append('%s/%s' % (storageUrl, experimentFilePath))
        fileInfo2['fileName'] = os.path.basename(experimentFilePath)
        fileInfo2['experimentFilePath'] = experimentFilePath
        fileInfo2['experimentName'] = experimentName
        fileInfo2['fileLocations'] = fileLocations
        self.logger.debug('Daq info: %s' % (daqInfo))
        fileInfo2.update(daqInfo)
        if daqInfo.has_key('id'):
            fileInfo2['daqId'] = daqInfo.get('id')
            del fileInfo2['id']
        for key in ['storageDirectory', 'storageUrl', 'storageHost']:
            if fileInfo2.has_key(key):
                del fileInfo2[key]
        self.processStat(filePath, fileInfo2)
        self.processHdf5Metadata(filePath, fileInfo2)
        self.processMd5Sum(filePath, fileInfo2)
        self.logger.debug('File "%s" catalog entry: %s' % (experimentFilePath, str(fileInfo2)))
        self.fileMongoDbApi.updateOrAddExperimentFile(fileInfo2)

    def processDirectory(self, directoryInfo):
        uploadInfo = directoryInfo.get('uploadInfo')
        daqInfo = directoryInfo.get('daqInfo')
        experiment = directoryInfo.get('experiment')
        filePathsDict = directoryInfo.get('filePathsDict')
        uploadId = uploadInfo.get('id')
        dataDirectory = uploadInfo.get('dataDirectory')
        nProcessedFiles = 0
        nFiles = len(filePathsDict)
        for (filePath,filePathDict) in filePathsDict.items():
            fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
            fileInfo.update(filePathDict)
            fileInfo['daqInfo'] = daqInfo
            fileInfo['uploadId'] = uploadId
            if uploadInfo.get('status') != dmProcessingStatus.DM_PROCESSING_STATUS_ABORTING:
                self.processFile(fileInfo)
                nProcessedFiles += 1
            else:
                nCancelledFiles = nFiles - nProcessedFiles
                self.logger.debug('Upload id %s aborted, will not process %s files' % (uploadId, nCancelledFiles))
                processingInfo = uploadInfo.get('processingInfo')
                processingInfo[self.name]['status'] = 'aborted'
                break

#######################################################################
# Testing.

if __name__ == '__main__':
    pass

#!/usr/bin/env python

import os

from fileTransferPlugin import FileTransferPlugin
from dm.common.utility.osUtility import OsUtility
from dm.common.utility.fileUtility import FileUtility
from dm.common.exceptions.fileProcessingError import FileProcessingError
from dm.common.utility.dmSubprocess import DmSubprocess
from dm.ds_web_service.api.dsRestApiFactory import DsRestApiFactory

class RsyncFileTransferPlugin(FileTransferPlugin):

    DEFAULT_COMMAND = 'rsync -arvlPR --'
    DIRECTORY_TRANSFER_COMMAND = 'rsync -arvlP --'
    DRY_RUN_COMMAND = 'rsync -arvlP --dry-run --'

    def __init__(self, src=None, dest=None, command=DEFAULT_COMMAND, localMd5Sum=True, remoteMd5Sum=False, deleteOriginal=False, pluginMustProcessFiles=True, dependsOn=[]):
        FileTransferPlugin.__init__(self, command, src, dest, dependsOn=dependsOn)
        self.dsFileApi = DsRestApiFactory.getFileRestApi()
        self.localMd5Sum = localMd5Sum
        self.remoteMd5Sum = remoteMd5Sum
        self.deleteOriginal = deleteOriginal
        self.pluginMustProcessFiles = pluginMustProcessFiles

    def checkUploadFilesForProcessing(self, filePathsDict, uploadInfo):
        if not self.pluginMustProcessFiles:
            return {}
        storageDirectory = uploadInfo['storageDirectory']
        storageHost = uploadInfo['storageHost']
        dataDirectory = uploadInfo['dataDirectory']
        dryRunCommand = '%s %s/ %s:%s' % (self.DRY_RUN_COMMAND, dataDirectory, storageHost, storageDirectory)
        subprocess = DmSubprocess.getSubprocess(dryRunCommand)
        subprocess.run()
        lines = subprocess.getStdOut().split('\n')
        pluginFilePathsDict = {}
        pathBase = dataDirectory
        for line in lines:
            if line.endswith(os.sep):
                continue
            filePath = os.path.join(pathBase, line)
            filePathDict = filePathsDict.get(filePath)
            if filePathDict:
                pluginFilePathsDict[filePath] = filePathDict
        self.logger.debug('Number of original files: %s, number of plugin files: %s' % (len(filePathsDict), len(pluginFilePathsDict)))
        return pluginFilePathsDict
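
    # Note (added): the dry-run output also contains rsync header and summary
    # lines (e.g. 'sending incremental file list'); those never match a key
    # in filePathsDict, so the lookup above silently discards them and only
    # genuine transfer candidates end up in pluginFilePathsDict.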

    def processFile(self, fileInfo):
        filePath = fileInfo.get('filePath')
        dataDirectory = fileInfo.get('dataDirectory')
        experimentFilePath = fileInfo.get('experimentFilePath')
        experimentName = fileInfo.get('experimentName')
        storageHost = fileInfo.get('storageHost')
        storageDirectory = fileInfo.get('storageDirectory')
        destUrl = self.getDestUrl(filePath, dataDirectory, storageHost, storageDirectory)
        # Use relative path with respect to data directory as a source
        srcUrl = self.getSrcUrl(filePath, dataDirectory)
        # Calculate checksum
        if not fileInfo.get('fileSize'):
            FileUtility.statFile(filePath, fileInfo)
        if self.localMd5Sum:
            FileUtility.getMd5Sum(filePath, fileInfo)
        # Transfer file
        self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl))
        self.start(src=srcUrl, dest=destUrl, templateInfo=fileInfo, cwd=dataDirectory)
        # Get remote checksum
        if self.remoteMd5Sum:
            fileInfo2 = {}
            fileInfo2['experimentFilePath'] = experimentFilePath
            fileInfo2['experimentName'] = experimentName
            fileMetadata = self.dsFileApi.statFile(experimentFilePath, experimentName, fileInfo2)
            # If we have not done md5 locally, update file info
            if not self.localMd5Sum:
                fileInfo['md5Sum'] = fileMetadata.get('md5Sum')
            # Verify checksum
            if fileMetadata.get('md5Sum') != fileInfo.get('md5Sum'):
                self.logger.error('Checksum mismatch for file: %s' % filePath)
                raise FileProcessingError('Checksum mismatch detected for file: %s' % filePath)
            self.logger.debug('Checksum test passed for file %s' % filePath)
        # Remove file
        if self.deleteOriginal:
            self.logger.debug('Deleting file %s' % filePath)
            OsUtility.removeFile(filePath)

    def processDirectory(self, directoryInfo):
        uploadInfo = directoryInfo.get('uploadInfo')
        dataDirectory = uploadInfo.get('dataDirectory')
        experimentName = uploadInfo.get('experimentName')
        storageHost = uploadInfo.get('storageHost')
        storageDirectory = uploadInfo.get('storageDirectory')
        destUrl = self.getDestDirUrl(dataDirectory, storageHost, storageDirectory)
        srcUrl = self.getSrcDirUrl(dataDirectory)
        # Transfer directory
        self.logger.debug('Starting transfer: %s -> %s' % (srcUrl, destUrl))
        self.start(src=srcUrl, dest=destUrl, command=self.DIRECTORY_TRANSFER_COMMAND, templateInfo=uploadInfo, cwd=dataDirectory)

#######################################################################
# Testing.

if __name__ == '__main__':
    ft = RsyncFileTransferPlugin(command='rsync -arvlPR --exclude "*.svn" --exclude "*.pyc"')
    # checkUploadFilesForProcessing() expects a dictionary keyed by file path
    ft.checkUploadFilesForProcessing({'/home/sveseli/Work/DM/dev/src/python/dm/common/processing/plugins/fileProcessor.py' : {}}, {'storageDirectory' : '/opt/DM/dev', 'storageHost' : 'dm@dmstorage', 'dataDirectory' : '/home/sveseli/Work/DM/dev/src'})