Skip to content
Snippets Groups Projects
Commit d98298f4 authored by sveseli's avatar sveseli
Browse files

merged from 0.8

parents 377ecd0e 396d2118
No related branches found
No related tags found
No related merge requests found
__version__ = "0.8 (2016.01.25)"
__version__ = "0.8 (2016.01.26)"
......@@ -42,6 +42,7 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
storageHost = uploadInfo['storageHost']
storageDirectory = uploadInfo['storageDirectory']
dataDirectory = uploadInfo['dataDirectory']
self.logger.debug('Upload info: %s', uploadInfo)
#(scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(storageUrl, defaultPort=self.DEFAULT_PORT)
ftpUtility = FtpUtility(storageHost, self.DEFAULT_PORT)
storageFilePathsDict = ftpUtility.getFiles(storageDirectory, {})
......@@ -83,9 +84,11 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
srcUrl = self.getSrcUrl(filePath, dataDirectory)
# Calculate checksum
(scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
ftpUtility = FtpUtility(host, port)
ftpUtility.statFile(filePath, fileInfo)
if self.localMd5Sum:
FileUtility.statFile(filePath, fileInfo)
FileUtility.getMd5Sum(filePath, fileInfo)
ftpUtility.getMd5Sum(filePath, fileInfo)
# Transfer file
self.logger.debug('Starting transfer: %s' % fileInfo)
......@@ -108,9 +111,9 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
self.logger.debug('Checksum test passed for file %s' % filePath)
# Remove file
if self.deleteOriginal:
self.logger.debug('Deleting file %s' % filePath)
OsUtility.removeFile(srcUrl)
#if self.deleteOriginal:
# self.logger.debug('Deleting file %s' % filePath)
# OsUtility.removeFile(srcUrl)
#######################################################################
# Testing.
......
......@@ -29,7 +29,7 @@ class MongoDbFileCatalogPlugin(FileProcessor):
# Prepare catalogging entry
fileInfo2 = {}
for key in ['md5Sum', 'fileSize', 'fileCreationTime', 'fileCreationTimeStamp']:
for key in ['md5Sum', 'fileSize', 'fileCreationTime', 'fileCreationTimeStamp', 'fileModificationTime', 'fileModificationTimeStamp']:
if fileInfo.has_key(key):
fileInfo2[key] = fileInfo.get(key, '')
fileLocations = fileInfo.get('fileLocations', [])
......
......@@ -54,6 +54,7 @@ class RsyncFileTransferPlugin(FileTransferPlugin):
srcUrl = self.getSrcUrl(filePath, dataDirectory)
# Calculate checksum
FileUtility.statFile(filePath, fileInfo)
if self.localMd5Sum:
FileUtility.statFile(filePath, fileInfo)
FileUtility.getMd5Sum(filePath, fileInfo)
......
#!/usr/bin/env python
import copy
import os
import time
from ftplib import FTP
from dm.common.utility.timeUtility import TimeUtility
from dm.common.utility.loggingManager import LoggingManager
import urlparse
......@@ -13,6 +16,7 @@ class FtpUtility:
self.username = username
self.password = password
self.ftpClient = None
self.mlsdFileStatDict = {}
@classmethod
def parseFtpUrl(cls, url, defaultHost=None, defaultPort=None):
......@@ -64,6 +68,16 @@ class FtpUtility:
elif type == 'file':
self.mlsdFileDict[name] = parseDict
def __parseMlsdFileStat(self, line):
# ['Type=dir', 'Modify=20151018024430', 'Size=4096', 'Perm=el', 'UNIX.mode=0775', 'UNIX.owner=sveseli', 'UNIX.uid=500', 'UNIX.group=sveseli', 'UNIX.gid=500', 'Unique=fd00-c2e3e', ' dir2\r']
parts = line.split(';')
parseDict = {}
self.__parseKeyValue(parts[0], parseDict)
self.__parseKeyValue(parts[1], parseDict)
self.__parseKeyValue(parts[2], parseDict)
name = parts[-1].strip()
self.mlsdFileStatDict[name] = parseDict
def getFiles(self, dirPath, fileDict={}):
if not self.ftpClient:
self.ftpClient = self.getFtpClient(self.host, self.port, self.username, self.password)
......@@ -79,10 +93,35 @@ class FtpUtility:
self.getFiles(dirPath2,fileDict)
return fileDict
def getMd5Sum(self, filePath, fileInfo={}):
if not self.ftpClient:
self.ftpClient = self.getFtpClient(self.host, self.port, self.username, self.password)
md5Sum = self.ftpClient.sendcmd('CKSM MD5 0 -1 %s' % filePath).split()[-1]
fileInfo['md5Sum'] = md5Sum
return md5Sum
def statFile(self, filePath, fileInfo={}):
fileName = os.path.basename(filePath)
if not self.ftpClient:
self.ftpClient = self.getFtpClient(self.host, self.port, self.username, self.password)
# Need this to be class members for the callback function
self.ftpClient.retrlines('MLSD %s' % filePath, self.__parseMlsdFileStat)
fileStatDict = self.mlsdFileStatDict.get(fileName)
if fileStatDict:
fileInfo['fileSize'] = fileStatDict.get('Size')
modifyTime = fileStatDict.get('Modify')
modifyTime = time.mktime(time.strptime(modifyTime, '%Y%m%d%H%M%S'))
fileInfo['fileModificationTime'] = modifyTime
fileInfo['fileModificationTimeStamp'] = TimeUtility.formatLocalTimeStamp(modifyTime)
del self.mlsdFileStatDict[fileName]
return fileInfo
#######################################################################
# Testing.
if __name__ == '__main__':
ftpUtility = FtpUtility('dmstorage', 2811)
files=ftpUtility.getFiles('/opt/DM/data/ESAF/e1')
ftpUtility = FtpUtility('s8dserv', 2811)
files = ftpUtility.getFiles('/export/8-id-i/test')
print files
print ftpUtility.getMd5Sum('/export/8-id-i/test/testfile01')
print ftpUtility.statFile('/export/8-id-i/test/testfile01')
......@@ -27,7 +27,7 @@ class ExperimentSessionController(DmSessionController):
if not dataDirectory:
raise InvalidRequest('Missing data directory.')
dataDirectory = Encoder.decode(dataDirectory)
if not dataDirectory.startswith(os.sep):
if not dataDirectory.startswith('/') and not dataDirectory.count('://'):
raise InvalidRequest('Data directory must be an absolute path.')
daqInfo = {}
......@@ -70,7 +70,7 @@ class ExperimentSessionController(DmSessionController):
if not dataDirectory:
raise InvalidRequest('Missing data directory.')
dataDirectory = Encoder.decode(dataDirectory)
if not dataDirectory.startswith(os.sep):
if not dataDirectory.startswith('/') and not dataDirectory.count('://'):
raise InvalidRequest('Data directory must be an absolute path.')
daqInfo = {}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment