Skip to content
Snippets Groups Projects
Commit 4c37150b authored by sveseli's avatar sveseli
Browse files

merged fixes from 0.8

parents e5f6b8dd 4d37fa61
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python
import copy
import time
from dmObject import DmObject
from dm.common.utility.dictUtility import DictUtility
......@@ -15,6 +16,7 @@ class DaqInfo(DmObject):
self['fileDict'] = self.get('fileDict', {})
def updateStatus(self):
now = time.time()
daqStatus = self.get('status', 'running')
if daqStatus == 'done':
return
......@@ -49,6 +51,9 @@ class DaqInfo(DmObject):
self['percentageProcessed'] = '%.2f' % percentageProcessed
self['percentageProcessingErrors'] = '%.2f' % percentageProcessingErrors
runTime = now - self.get('startTime')
self['runTime'] = runTime
if self.get('endTime'):
daqStatus = 'done'
self['runTime'] = self.get('endTime') - self.get('startTime')
......
......@@ -14,11 +14,12 @@ class UploadInfo(DmObject):
self['fileDict'] = self.get('fileDict', {})
def updateStatus(self):
now = time.time()
uploadStatus = self.get('status', 'running')
if uploadStatus == 'done':
return
fileDict = self.get('fileDict')
nFiles = len(fileDict)
nFiles = self.get('nFiles', len(fileDict))
nProcessedFiles = 0
nProcessingErrors = 0
processingErrors = {}
......@@ -36,22 +37,26 @@ class UploadInfo(DmObject):
if len(processingErrors):
self['processingErrors'] = processingErrors
startTime = self.get('startTime')
if startTime:
runTime = now - startTime
self['runTime'] = runTime
# need to handle 'failed' uploads
nCompletedFiles = nProcessedFiles+nProcessingErrors
if nCompletedFiles == nFiles:
uploadStatus = 'done'
if not endTime:
endTime = time.time()
endTime = now
self['endTime'] = endTime
self['endTimestamp'] = TimeUtility.formatLocalTimeStamp(endTime)
startTime = self.get('startTime')
if startTime:
runTime = endTime - startTime
self['runTime'] = runTime
self['status'] = uploadStatus
self['nProcessedFiles'] = '%s' % (nProcessedFiles)
self['nProcessingErrors'] = '%s' % (nProcessingErrors)
self['nFiles'] = '%s' % (nFiles)
#self['nFiles'] = '%s' % (nFiles)
percentageComplete = 100.0
percentageProcessed = 100.0
......
......@@ -31,13 +31,13 @@ class RsyncFileTransferPlugin(FileTransferPlugin):
lines = subprocess.getStdOut().split('\n')
pluginFilePathsDict = {}
pathBase = dataDirectory
filePaths = filePathsDict.keys()
for line in lines:
if line.endswith(os.sep):
continue
filePath = os.path.join(pathBase, line)
if filePath in filePaths:
pluginFilePathsDict[filePath] = filePathsDict.get(filePath)
filePathDict = filePathsDict.get(filePath)
if filePathDict:
pluginFilePathsDict[filePath] = filePathDict
self.logger.debug('Number of original files: %s, number of plugin files: %s', len(filePathsDict), len(pluginFilePathsDict))
return pluginFilePathsDict
......
......@@ -6,6 +6,22 @@ class DictUtility:
@classmethod
def deepCopy(cls, dict, includeKeys=[], excludeKeys=[]):
dict2 = {}
if len(includeKeys):
for key in includeKeys:
value = dict.get(key)
if value is not None:
dict2[key] = copy.deepcopy(value)
elif len(excludeKeys):
for key in dict.keys():
if key not in excludeKeys:
dict2[key] = copy.deepcopy(dict[key])
else:
dict2 = copy.deepcopy(dict)
return dict2
@classmethod
def deepCopy2(cls, dict, includeKeys=[], excludeKeys=[]):
dict2 = copy.deepcopy(dict)
if len(includeKeys):
for key in dict2.keys():
......
......@@ -8,6 +8,7 @@ import os
import time
import uuid
import copy
import threading
from dm.common.objects.experiment import Experiment
from dm.common.objects.dmObjectManager import DmObjectManager
......@@ -28,6 +29,8 @@ from fileSystemObserver import FileSystemObserver
class ExperimentSessionControllerImpl(DmObjectManager):
""" Experiment session controller implementation class. """
UPLOAD_DELAY_IN_SECONDS = 1.0
def __init__(self):
DmObjectManager.__init__(self)
self.dsExperimentApi = DsRestApiFactory.getExperimentRestApi()
......@@ -58,6 +61,8 @@ class ExperimentSessionControllerImpl(DmObjectManager):
def upload(self, experimentName, dataDirectory, daqInfo, includeFileDetails=False):
experiment = self.dsExperimentApi.getExperimentByName(experimentName)
UploadTracker.getInstance().checkForActiveUpload(experiment, dataDirectory)
experiment['daqInfo'] = daqInfo
storageDirectory = experiment.get('storageDirectory')
if storageDirectory is None:
......@@ -91,24 +96,31 @@ class ExperimentSessionControllerImpl(DmObjectManager):
raise InvalidRequest('There are no new files for upload in directory %s.' % dataDirectory)
fileDict = {}
#UploadTracker.getInstance().put(uploadId, uploadInfo)
UploadTracker.getInstance().startUpload(uploadId, uploadInfo)
uploadInfo['fileDict'] = fileDict
uploadInfo['nFiles'] = len(filePathsDict)
timer = threading.Timer(self.UPLOAD_DELAY_IN_SECONDS, self.prepareUpload, args=[uploadInfo, daqInfo, experiment, filePathsDict])
timer.start()
return uploadInfo
def prepareUpload(self, uploadInfo, daqInfo, experiment, filePathsDict):
uploadId = uploadInfo.get('id')
self.logger.debug('Preparing upload id: %s' % uploadId)
fileDict = uploadInfo.get('fileDict')
dataDirectory = uploadInfo.get('dataDirectory')
fileProcessingManager = FileProcessingManager.getInstance()
for filePath in filePathsDict.keys():
fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
fileInfo['daqInfo'] = daqInfo
fileInfo['uploadId'] = uploadId
fileUploadInfo = { 'processed' : False }
# Stat should be done by agent, not by observer.
try:
FileUtility.statFile(filePath, fileUploadInfo)
except:
# Ok, may be remote file
pass
fileDict[filePath] = fileUploadInfo
fileProcessingManager.processFile(fileInfo)
uploadInfo['fileDict'] = fileDict
#self.logger.debug('Upload info %s' % uploadInfo)
UploadTracker.getInstance().put(uploadId, uploadInfo)
return uploadInfo.scrub(includeFileDetails)
try:
fileProcessingManager.processFile(fileInfo)
except Exception, ex:
self.logger.error('Processing error: %s', ex)
self.logger.debug('Done preparing upload id: %s (total of %s files)' % (uploadId, len(filePathsDict)))
def getUploadInfo(self, id, includeFileDetails=False):
uploadInfo = UploadTracker.getInstance().get(id)
......
#!/usr/bin/env python
import os
from dm.common.objects.uploadInfo import UploadInfo
from dm.common.utility.objectTracker import ObjectTracker
from dm.common.exceptions.objectAlreadyExists import ObjectAlreadyExists
class UploadTracker(ObjectTracker):
......@@ -9,6 +11,30 @@ class UploadTracker(ObjectTracker):
objectClass = UploadInfo
cacheSize = 100
def __init__(self, *args, **kwargs):
ObjectTracker.__init__(self, args, kwargs)
self.activeUploadDict = {}
def checkForActiveUpload(self, experiment, dataDirectory):
experimentName = experiment.get('name')
dataDir = os.path.normpath(dataDirectory)
activeUploadKey = experimentName + dataDir
uploadId = self.activeUploadDict.get(activeUploadKey)
if uploadId:
uploadInfo = self.get(uploadId)
if uploadInfo is not None:
if uploadInfo.get('status') == 'running':
raise ObjectAlreadyExists('Upload id %s is already active for experiment %s in data directory %s.' % (uploadId, experimentName, dataDir))
del self.activeUploadDict[activeUploadKey]
def startUpload(self, uploadId, uploadInfo):
experimentName = uploadInfo.get('experimentName')
dataDirectory = uploadInfo.get('dataDirectory')
dataDir = os.path.normpath(dataDirectory)
activeUploadKey = experimentName + dataDir
self.activeUploadDict[activeUploadKey] = uploadId
self.put(uploadId, uploadInfo)
####################################################################
# Testing
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment