Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#!/usr/bin/env python
import os
from dm.common.utility.loggingManager import LoggingManager
from dm.common.utility.dmSubprocess import DmSubprocess
from dm.common.exceptions.invalidArgument import InvalidArgument
from dm.common.exceptions.invalidRequest import InvalidRequest
from fileProcessor import FileProcessor
class FileTransferPlugin(FileProcessor):
def __init__(self, command, src=None, dest=None):
FileProcessor.__init__(self)
self.src = src
self.dest = dest
self.logger = LoggingManager.getInstance().getLogger(self.__class__.__name__)
if command is None or not len(command):
raise InvalidArgument('File transfer command must be non-empty string.')
self.command = command
self.subprocess = None
def processFile(self, filePath, daqPath, experiment):
storageHost = experiment.get('storageHost')
storageDirectory = experiment.get('storageDirectory')
dest = '%s:%s' % (storageHost, storageDirectory)
# Use relative path with respect to daq directory as a source
os.chdir(daqPath)
src = os.path.relpath(filePath, daqPath)
self.start(src, dest)
def getFullCommand(self, src, dest):
return '%s %s %s' % (self.command, src, dest)
def setSrc(self, src):
self.src = src
def setDest(self, dest):
self.dest = dest
def start(self, src=None, dest=None):
# Use preconfigured source if provided source is None
fileSrc = src
if src is None:
fileSrc = self.src
# Use provided destination only if preconfigured destination is None
# Plugins may have desired destination preconfigured for all files
fileDest = self.dest
if self.dest is None:
fileDest = dest
if not fileSrc or not fileDest:
raise InvalidRequest('Both source and destination must be non-empty strings.')
self.subprocess = DmSubprocess.getSubprocess(self.getFullCommand(fileSrc, fileDest))
return self.subprocess.run()
def wait(self):
if self.subprocess:
return self.subprocess.wait()
return None
def poll(self):
if self.subprocess:
return self.subprocess.poll()
return None
def getStdOut(self):
if self.subprocess:
return self.subprocess.getStdOut()
return None
def getStdErr(self):
if self.subprocess:
return self.subprocess.getStdErr()
return None
def getExitStatus(self):
if self.subprocess:
return self.subprocess.getExitStatus()
return None
def reset(self):
self.subprocess = None
#######################################################################
# Testing.
if __name__ == '__main__':
ft = FileTransfer('rsync -arv', '/tmp/xyz', '/tmp/xyz2')
ft.start()
print 'StdOut: ', ft.getStdOut()
print 'StdErr: ', ft.getStdErr()
print 'Exit Status: ', ft.getExitStatus()