diff --git a/bin/dm-stop-upload b/bin/dm-stop-upload
new file mode 100755
index 0000000000000000000000000000000000000000..c018a7e5fded250297e66164e5877a5b86ecef8b
--- /dev/null
+++ b/bin/dm-stop-upload
@@ -0,0 +1,17 @@
+#!/bin/sh
+
+# Run command
+
+if [ -z $DM_ROOT_DIR ]; then
+    cd `dirname $0` && myDir=`pwd`
+    setupFile=$myDir/../setup.sh
+    if [ ! -f $setupFile ]; then
+        echo "Cannot find setup file: $setupFile"
+        exit 1
+    fi
+    source $setupFile > /dev/null
+fi
+
+$DM_ROOT_DIR/src/python/dm/daq_web_service/cli/stopUploadCli.py $@
+
+
diff --git a/bin/dm-upload b/bin/dm-upload
index 909be4e6be4f99af82c2b11a0bf830d57f5d1115..7c1c14d2e78ce0798c5159359bf63f1118c551a7 100755
--- a/bin/dm-upload
+++ b/bin/dm-upload
@@ -11,7 +11,8 @@ if [ -z $DM_ROOT_DIR ]; then
     fi
     source $setupFile > /dev/null
 fi
+source dm_command_setup.sh
 
-$DM_ROOT_DIR/src/python/dm/daq_web_service/cli/uploadCli.py $@
+eval "$DM_ROOT_DIR/src/python/dm/daq_web_service/cli/uploadCli.py $DM_COMMAND_ARGS"
 
 
diff --git a/doc/RELEASE_NOTES.txt b/doc/RELEASE_NOTES.txt
index b400025ab6d99e7856c68b95950017cf85547e43..ae985c9350f51dd249613b5cb98fd87b29e3daaa 100644
--- a/doc/RELEASE_NOTES.txt
+++ b/doc/RELEASE_NOTES.txt
@@ -5,6 +5,7 @@ Release 0.8 (01/26/2016)
   processed; for uploads system can detect files that had been processed 
   already; improved handling and reporting of processing errors)
 - Source file checksum is calculated for rsync/gridftp plugins by default
+- Added dm-stop-upload command
 - Resolved globus online user authorization delay issue 
 
 Release 0.7 (12/08/2015)
diff --git a/src/python/dm/common/objects/daqInfo.py b/src/python/dm/common/objects/daqInfo.py
index 2e127fff03eb54c01185b5cbf60861357bf1c0a1..2a2af92776d35d8d861fcfaa84399e5d0035b6c3 100755
--- a/src/python/dm/common/objects/daqInfo.py
+++ b/src/python/dm/common/objects/daqInfo.py
@@ -29,10 +29,10 @@ class DaqInfo(DmObject):
         self.lock.acquire()
         try:
             self['nProcessedFiles'] = self.get('nProcessedFiles', 0) + 1
-            lastFileProcessingEndTime = self.get('lastFileProcessingEndTime', 0)
-            if processingEndTime is not None and processingEndTime > lastFileProcessingEndTime:
+            lastFileProcessedTime = self.get('lastFileProcessedTime', 0)
+            if processingEndTime is not None and processingEndTime > lastFileProcessedTime:
                 self['lastFileProcessed'] = filePath
-                self['lastFileProcessingEndTime'] = processingEndTime
+                self['lastFileProcessedTime'] = processingEndTime
         finally:
             self.lock.release()
 
@@ -43,9 +43,9 @@ class DaqInfo(DmObject):
             processingErrors[filePath] = processingError
             self['processingErrors'] = processingErrors
             self['nProcessingErrors'] = len(processingErrors)
-            lastFileProcessingEndTime = self.get('lastFileProcessingEndTime', 0)
-            if processingEndTime is not None and processingEndTime > lastFileProcessingEndTime:
-                self['lastFileProcessingEndTime'] = processingEndTime
+            lastFileProcessingErrorTime = self.get('lastFileProcessingErrorTime', 0)
+            if processingEndTime is not None and processingEndTime > lastFileProcessingErrorTime:
+                self['lastFileProcessingErrorTime'] = processingEndTime
         finally:
             self.lock.release()
 
@@ -60,10 +60,9 @@ class DaqInfo(DmObject):
         processingErrors = self.get('processingErrors', {})
         nCompletedFiles = nProcessedFiles+nProcessingErrors
 
-        startTime = self.get('startTime')
-        if startTime:
-            runTime = now - startTime
-            self['runTime'] = runTime
+        startTime = self.get('startTime', now)
+        runTime = now - startTime
+        endTime = None
 
         percentageComplete = 100.0
         percentageProcessed = 100.0
@@ -76,18 +75,21 @@ class DaqInfo(DmObject):
         self['percentageProcessed'] = '%.2f' % percentageProcessed
         self['percentageProcessingErrors'] = '%.2f' % percentageProcessingErrors
 
-	runTime = now - self.get('startTime')
-        self['runTime'] = runTime
-
         if self.get('endTime') and nCompletedFiles == nFiles:
             daqStatus = 'done'
-            lastFileProcessingEndTime = self.get('lastFileProcessingEndTime')
-            if lastFileProcessingEndTime > self['endTime']:
-                self['endTime'] = lastFileProcessingEndTime 
+            lastFileProcessingErrorTime = self.get('lastFileProcessingErrorTime')
+            lastFileProcessedTime = self.get('lastFileProcessedTime')
+            endTime = lastFileProcessedTime
+            if not endTime or lastFileProcessingErrorTime > endTime:
+                endTime = lastFileProcessingErrorTime
+            if self.get('endTime') > endTime:
+                endTime = self.get('endTime')
+
+        if endTime:
+            runTime = endTime - startTime
+            self['endTime'] = endTime
             self['endTimestamp'] = TimeUtility.formatLocalTimestamp(endTime)
-            if startTime:
-                runTime = endTime - startTime
-                self['runTime'] = runTime
+        self['runTime'] = runTime
         self['status'] = daqStatus
 
     def toDictWithOriginalKeys(self):
diff --git a/src/python/dm/common/objects/uploadInfo.py b/src/python/dm/common/objects/uploadInfo.py
index 21c9ba9198f9ebe70b6ba49b107a171f27681ea8..236d1a15079ff28ac16ac411c969ccca616ffeac 100755
--- a/src/python/dm/common/objects/uploadInfo.py
+++ b/src/python/dm/common/objects/uploadInfo.py
@@ -19,10 +19,10 @@ class UploadInfo(DmObject):
         self.lock.acquire()
         try:
             self['nProcessedFiles'] = self.get('nProcessedFiles', 0) + 1
-            lastFileProcessingEndTime = self.get('lastFileProcessingEndTime', 0)
-            if processingEndTime is not None and processingEndTime > lastFileProcessingEndTime:
+            lastFileProcessedTime = self.get('lastFileProcessedTime', 0)
+            if processingEndTime is not None and processingEndTime > lastFileProcessedTime:
                 self['lastFileProcessed'] = filePath
-                self['lastFileProcessingEndTime'] = processingEndTime
+                self['lastFileProcessedTime'] = processingEndTime
         finally:
             self.lock.release()
 
@@ -33,47 +33,76 @@ class UploadInfo(DmObject):
             processingErrors[filePath] = processingError
             self['processingErrors'] = processingErrors
             self['nProcessingErrors'] = len(processingErrors)
-            lastFileProcessingEndTime = self.get('lastFileProcessingEndTime', 0)
-            if processingEndTime is not None and processingEndTime > lastFileProcessingEndTime:
-                self['lastFileProcessingEndTime'] = processingEndTime
+            lastFileProcessingErrorTime = self.get('lastFileProcessingErrorTime', 0)
+            if processingEndTime is not None and processingEndTime > lastFileProcessingErrorTime:
+                self['lastFileProcessingErrorTime'] = processingEndTime
+        finally:
+            self.lock.release()
+
+    def fileProcessingCancelled(self, filePath, processingEndTime):
+        self.lock.acquire()
+        try:
+            self['nCancelledFiles'] = self.get('nCancelledFiles', 0) + 1
+            lastFileProcessingCancelledTime = self.get('lastFileProcessingCancelledTime', 0)
+            if processingEndTime is not None and processingEndTime > lastFileProcessingCancelledTime:
+                self['lastFileProcessingCancelledTime'] = processingEndTime
+        finally:
+            self.lock.release()
+
+    def uploadAborted(self, nCancelledFiles):
+        self.lock.acquire()
+        try:
+            self['nCancelledFiles'] = self.get('nCancelledFiles', 0) + nCancelledFiles
         finally:
             self.lock.release()
 
     def updateStatus(self):
         now = time.time()
         uploadStatus = self.get('status', 'running')
-        if uploadStatus == 'done':
+        if uploadStatus == 'done' or uploadStatus == 'aborted':
             return 
         nFiles = self.get('nFiles', 0)
         nProcessedFiles = self.get('nProcessedFiles', 0)
+        nCancelledFiles = self.get('nCancelledFiles', 0)
         nProcessingErrors = self.get('nProcessingErrors', 0)
         processingErrors = self.get('processingErrors', {})
         nCompletedFiles = nProcessedFiles+nProcessingErrors
 
-        startTime = self.get('startTime')
-        if startTime:
-            runTime = now - startTime
-            self['runTime'] = runTime
-
+        startTime = self.get('startTime', now)
+        runTime = now - startTime
+        endTime = None
         if nCompletedFiles == nFiles:
             uploadStatus = 'done'
-            endTime = self.get('lastFileProcessingEndTime', now)
+            lastFileProcessingErrorTime = self.get('lastFileProcessingErrorTime')
+            lastFileProcessedTime = self.get('lastFileProcessedTime')
+            endTime = lastFileProcessedTime
+            if not endTime or lastFileProcessingErrorTime > endTime:
+                endTime = lastFileProcessingErrorTime
+
+        if nCancelledFiles > 0 and nCancelledFiles+nCompletedFiles == nFiles:
+            uploadStatus = 'aborted'
+            endTime = self.get('lastFileProcessingCancelledTime', now)
+
+        if endTime:
+            runTime = endTime - startTime
             self['endTime'] = endTime
             self['endTimestamp'] = TimeUtility.formatLocalTimestamp(endTime)
-            if startTime:
-                runTime = endTime - startTime
-                self['runTime'] = runTime
+        self['runTime'] = runTime
         self['status'] = uploadStatus
 
         percentageComplete = 100.0
         percentageProcessed = 100.0
+        percentageCancelled = 0.0
         percentageProcessingErrors = 0.0
         if nFiles > 0:
-             percentageComplete = float(nCompletedFiles)/float(nFiles)*100.0
-             percentageProcessed = float(nProcessedFiles)/float(nFiles)*100.0
-             percentageProcessingErrors = float(nProcessingErrors)/float(nFiles)*100.0
+            percentageComplete = float(nCompletedFiles)/float(nFiles)*100.0
+            percentageProcessed = float(nProcessedFiles)/float(nFiles)*100.0
+            percentageCancelled = float(nCancelledFiles)/float(nFiles)*100.0
+            percentageProcessingErrors = float(nProcessingErrors)/float(nFiles)*100.0
         self['percentageComplete'] = '%.2f' % percentageComplete
         self['percentageProcessed'] = '%.2f' % percentageProcessed
         self['percentageProcessingErrors'] = '%.2f' % percentageProcessingErrors
+        if nCancelledFiles > 0:
+            self['percentageCancelled'] = '%.2f' % percentageCancelled
 
 
diff --git a/src/python/dm/common/processing/fileProcessingThread.py b/src/python/dm/common/processing/fileProcessingThread.py
index a132b50adff3f098db02667d12336c1a63fd5be7..999207d97fb120cd699f6a4fb09b6ab9b0c9ef85 100755
--- a/src/python/dm/common/processing/fileProcessingThread.py
+++ b/src/python/dm/common/processing/fileProcessingThread.py
@@ -20,6 +20,70 @@ class FileProcessingThread(threading.Thread):
         self.fileProcessingQueue = fileProcessingQueue
         self.logger = LoggingManager.getInstance().getLogger(name)
 
+    def processFile(self, fileInfo):
+        filePath = fileInfo.get('filePath')
+        try:
+            statusMonitor = fileInfo['statusMonitor']
+            if statusMonitor.get('status') == 'aborting':
+                self.logger.debug('File %s processing is cancelled' % (filePath))
+                endProcessingTime = time.time() 
+                statusMonitor.fileProcessingCancelled(filePath, endProcessingTime) 
+                fileInfo.clear()
+                return 
+            self.logger.debug('Starting processing file %s' % filePath)
+            fileInfo['startProcessingTime'] = time.time() 
+            nProcessors = len(self.fileProcessorKeyList)
+            processedByDict = fileInfo.get('processedByDict', {})
+            fileInfo['processedByDict'] = processedByDict
+            processorNumber = 0
+            for processorKey in self.fileProcessorKeyList: 
+                processorNumber += 1
+                processor = self.fileProcessorDict.get(processorKey)
+                processorName = '%s-%s' % (processor.__class__.__name__,processorNumber)
+
+                if processedByDict.has_key(processorName):
+                    self.logger.debug('%s has already been processed by %s ' % (fileInfo, processorName))
+                    continue
+
+                self.logger.debug('%s is about to process file %s ' % (processorName, filePath))
+                try:
+                    processor.processFile(fileInfo)
+                    processedByDict[processorName] = True
+                    self.logger.debug('%s processed file at path %s ' % (processorName, filePath))
+                    if processorNumber == nProcessors:
+                        self.logger.debug('File %s processing is complete' % (filePath))
+                        endProcessingTime = time.time() 
+                        statusMonitor.fileProcessed(filePath, endProcessingTime) 
+                        fileInfo.clear()
+                except Exception, ex:
+                    self.logger.exception(ex)
+                    processingError = '%s processing error: %s' % (processorName, str(ex))
+                    self.logger.debug(processingError)
+                    processingDict = fileInfo.get('processingDict', {})
+                    fileInfo['processingDict'] = processingDict
+                    processorDict = processingDict.get(processorName, {}) 
+                    processingDict[processorName] = processorDict
+
+                    processorDict['lastError'] = str(ex)
+                    nRetriesLeft = processorDict.get('numberOfRetriesLeft', processor.getNumberOfRetries()) 
+                    self.logger.debug('Number of %s retries left for file %s: %s' % (processorName, filePath, nRetriesLeft))
+                    processorDict['numberOfRetriesLeft'] = nRetriesLeft - 1
+                    if nRetriesLeft <= 0:
+                        endProcessingTime = time.time() 
+                        statusMonitor.fileProcessingError(filePath, processingError, endProcessingTime) 
+                        fileInfo.clear()
+                        self.logger.debug('No more %s retries left for file %s' % (processorName, filePath))
+                    else:
+                        retryWaitPeriod = processor.getRetryWaitPeriodInSeconds()
+                        self.logger.debug('%s will retry processing file %s in %s seconds' % (processorName, filePath, retryWaitPeriod))
+                        self.fileProcessingQueue.push(fileInfo, retryWaitPeriod)
+                        # Do not process this file further until 
+                        # this plugin is done
+                        break
+
+        except Exception, ex:
+            self.logger.exception(ex)
+
     def run(self):
         self.logger.debug('Starting thread: %s' % self.getName())
         while True:
@@ -29,63 +93,12 @@ class FileProcessingThread(threading.Thread):
                 break
 
             while True:
-                fileInfo = self.fileProcessingQueue.pop()
-                if fileInfo is None:
-                    break
-                filePath = fileInfo.get('filePath')
-                
                 try:
-                    statusMonitor = fileInfo['statusMonitor']
-                    fileInfo['startProcessingTime'] = time.time() 
-                    processorNumber = 0
-                    nProcessors = len(self.fileProcessorKeyList)
-                    processedByDict = fileInfo.get('processedByDict', {})
-                    fileInfo['processedByDict'] = processedByDict
-                    for processorKey in self.fileProcessorKeyList: 
-                        processorNumber += 1
-                        processor = self.fileProcessorDict.get(processorKey)
-                        processorName = '%s-%s' % (processor.__class__.__name__,processorNumber)
-
-                        if processedByDict.has_key(processorName):
-                            self.logger.debug('%s has already been processed by %s ' % (fileInfo, processorName))
-                            continue
-
-                        self.logger.debug('%s is about to process file %s ' % (processorName, filePath))
-                        try:
-                            processor.processFile(fileInfo)
-                            processedByDict[processorName] = True
-                            self.logger.debug('%s processed file at path %s ' % (processorName, filePath))
-                            if processorNumber == nProcessors:
-                                self.logger.debug('File %s processing is complete' % (filePath))
-                                endProcessingTime = time.time() 
-                                statusMonitor.fileProcessed(filePath, endProcessingTime) 
-                                fileInfo.clear()
-                        except Exception, ex:
-                            self.logger.exception(ex)
-                            processingError = '%s processing error: %s' % (processorName, str(ex))
-                            self.logger.debug(processingError)
-                            processingDict = fileInfo.get('processingDict', {})
-                            fileInfo['processingDict'] = processingDict
-                            processorDict = processingDict.get(processorName, {}) 
-                            processingDict[processorName] = processorDict
-
-                            processorDict['lastError'] = str(ex)
-                            nRetriesLeft = processorDict.get('numberOfRetriesLeft', processor.getNumberOfRetries()) 
-                            self.logger.debug('Number of %s retries left for file %s: %s' % (processorName, filePath, nRetriesLeft))
-                            processorDict['numberOfRetriesLeft'] = nRetriesLeft - 1
-                            if nRetriesLeft <= 0:
-                                endProcessingTime = time.time() 
-                                statusMonitor.fileProcessingError(filePath, processingError, endProcessingTime) 
-                                fileInfo.clear()
-                                self.logger.debug('No more %s retries left for file %s' % (processorName, filePath))
-                            else:
-                                retryWaitPeriod = processor.getRetryWaitPeriodInSeconds()
-                                self.logger.debug('%s will retry processing file %s in %s seconds' % (processorName, filePath, retryWaitPeriod))
-                                self.fileProcessingQueue.push(fileInfo, retryWaitPeriod)
-                            # Do not process this file further until 
-                            # this plugin is done
-                            break
-
+                    fileInfo = self.fileProcessingQueue.pop()
+                    if fileInfo is None:
+                        break
+                    self.logger.debug('Processing queue depth after pop: %s', self.fileProcessingQueue.getLength())
+                    self.processFile(fileInfo)
                 except Exception, ex:
                     self.logger.exception(ex)
 
diff --git a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py
index 42f90fb1f30ab3058071e727aaee66e9e6ccd047..dc6c5740b33aca1b21f97e830e581bdeb71bd319 100755
--- a/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py
+++ b/src/python/dm/common/processing/plugins/gridftpFileTransferPlugin.py
@@ -100,7 +100,8 @@ class GridftpFileTransferPlugin(FileTransferPlugin):
         # Calculate checksum
         (scheme, host, port, dirPath) = FtpUtility.parseFtpUrl(dataDirectory, defaultPort=self.DEFAULT_PORT)
         ftpUtility = FtpUtility(host, port)
-        ftpUtility.statFile(filePath, fileInfo)
+        if not fileInfo.get('fileSize'):
+            ftpUtility.statFile(filePath, fileInfo)
         if self.localMd5Sum:
             ftpUtility.getMd5Sum(filePath, fileInfo)
 
diff --git a/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py b/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py
index 7a80d0923e6b8dc0425a4505a17ec55a6c367dfb..6fd2bf9e0d62227098239d706f0d5bce24ae6d16 100755
--- a/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py
+++ b/src/python/dm/common/processing/plugins/mongoDbFileCatalogPlugin.py
@@ -20,48 +20,44 @@ class MongoDbFileCatalogPlugin(FileProcessor):
     def processFile(self, fileInfo):
         experimentFilePath = fileInfo.get('experimentFilePath')
         experimentName = fileInfo.get('experimentName')
-        self.logger.debug('Processing file %s for experiment %s' % (fileInfo, experimentName))
+        self.logger.debug('Processing file "%s" for experiment %s: %s' % (experimentFilePath, experimentName, fileInfo))
 
         daqInfo = fileInfo.get('daqInfo')
         storageDirectory = daqInfo.get('storageDirectory')
         storageHost = daqInfo.get('storageHost')
         storageUrl = daqInfo.get('storageUrl')
         storageFilePath = os.path.join(storageDirectory, experimentFilePath)
-        fileProcessingTime = time.time()
-        fileProcessingTimestamp = TimeUtility.formatLocalTimestamp(fileProcessingTime)
+        fileInfo['fileProcessingTime'] = time.time()
 
         # Prepare catalogging entry
         fileInfo2 = {}
         for key in ['md5Sum', 'fileSize']:
             if fileInfo.has_key(key):
                 fileInfo2[key] = fileInfo.get(key, '')
-        for key in ['fileCreationTime', 'fileModificationTime']:
+        for key in ['fileProcessingTime', 'fileCreationTime', 'fileModificationTime']:
             if fileInfo.has_key(key):
                 t = fileInfo.get(key, 0)
                 fileInfo2[key] = t
-		key2 = '%sstamp' % key
+                key2 = '%sstamp' % key
                 fileInfo2[key2] = TimeUtility.formatLocalTimestamp(t)
 
         fileLocations = fileInfo.get('fileLocations', [])
         fileLocations.append('%s/%s' % (storageUrl, experimentFilePath))
 
         fileInfo2['fileName'] = os.path.basename(experimentFilePath)
-        fileInfo2['storageDirectory'] = storageDirectory
-        fileInfo2['storageUrl'] = storageUrl
-        fileInfo2['storageHost'] = storageHost
-        fileInfo2['storageFilePath'] = storageFilePath
         fileInfo2['experimentFilePath'] = experimentFilePath
         fileInfo2['experimentName'] = experimentName
-        fileInfo2['fileProcessingTime'] = fileProcessingTime
-        fileInfo2['fileProcessingTimestamp'] = fileProcessingTimestamp
         fileInfo2['fileLocations'] = fileLocations
         self.logger.debug('Daq info: %s' % (daqInfo))
         fileInfo2.update(daqInfo)
         if daqInfo.has_key('id'): 
             fileInfo2['daqId'] = daqInfo.get('id')
             del fileInfo2['id']
+        for key in ['storageDirectory', 'storageUrl', 'storageHost']:
+            if fileInfo2.has_key(key):
+                del fileInfo2[key]
 
-        self.logger.debug('File %s catalog entry: %s' % (experimentFilePath, str(fileInfo2)))
+        self.logger.debug('File "%s" catalog entry: %s' % (experimentFilePath, str(fileInfo2)))
         self.fileMongoDbApi.updateOrAddExperimentFile(fileInfo2)
 
 
diff --git a/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py b/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py
index 0522362ec1ee6e25e065ee768dd9222d44503b14..4bea1d19aa6d358ae8dfa4f6316dcbf91746c869 100755
--- a/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py
+++ b/src/python/dm/common/processing/plugins/rsyncFileTransferPlugin.py
@@ -58,7 +58,8 @@ class RsyncFileTransferPlugin(FileTransferPlugin):
         srcUrl = self.getSrcUrl(filePath, dataDirectory)
 
         # Calculate checksum
-        FileUtility.statFile(filePath, fileInfo)
+        if not fileInfo.get('fileSize'):
+            FileUtility.statFile(filePath, fileInfo)
         if self.localMd5Sum:
             FileUtility.statFile(filePath, fileInfo)
             FileUtility.getMd5Sum(filePath, fileInfo)
diff --git a/src/python/dm/common/utility/ftpUtility.py b/src/python/dm/common/utility/ftpUtility.py
index 4514cf63a74d0246274a7d30f6d1e509e6dad29a..c82f5607c52ad7bc32cfcb14b86c9322013da147 100755
--- a/src/python/dm/common/utility/ftpUtility.py
+++ b/src/python/dm/common/utility/ftpUtility.py
@@ -87,7 +87,6 @@ class FtpUtility:
         if self.serverUsesUtcTime:
             modifyTime = TimeUtility.utcToLocalTime(modifyTime)
         fileStatDict['fileModificationTime'] = modifyTime
-        fileStatDict['fileModificationTimestamp'] = TimeUtility.formatLocalTimestamp(modifyTime)
         fileStatDict['fileSize'] = int(fileStatDict.get('fileSize'))
         del fileStatDict['Modify']
         del fileStatDict['Type']
@@ -134,11 +133,13 @@ class FtpUtility:
             fileSize = int(fileInfo.get('fileSize'))
             #print '%s %s %s' % (filePath, fileSize, modifyTime)
             fileDict[filePath] = {'fileSize' : fileSize, 'fileModificationTime' : modifyTime}
+        self.mlsdFileDict.clear()
         mlsdDirList = copy.copy(self.mlsdDirList)
         for d in mlsdDirList:
             dirPath2 = '%s/%s' % (dirPath,d)
             replacementDirPath2 = '%s/%s' % (replacementDirPath,d)
             self.getFiles(dirPath2, fileDict, replacementDirPath2)
+        self.mlsdDirList = []
         return fileDict
 
     def getMd5Sum(self, filePath, fileInfo={}):
diff --git a/src/python/dm/daq_web_service/api/experimentRestApi.py b/src/python/dm/daq_web_service/api/experimentRestApi.py
index 24ea8101267e1c6257f69142b4dd9dcac3ad4ef5..241dfccf0782f1af48e3e5905f966aed229c176f 100755
--- a/src/python/dm/daq_web_service/api/experimentRestApi.py
+++ b/src/python/dm/daq_web_service/api/experimentRestApi.py
@@ -64,6 +64,14 @@ class ExperimentRestApi(DaqRestApi):
         responseDict = self.sendSessionRequest(url=url, method='GET')
         return UploadInfo(responseDict)
 
+    @DaqRestApi.execute
+    def stopUpload(self, id):
+        url = '%s/experimentUploads/stopUpload/%s' % (self.getContextRoot(),id)
+        if not id:
+            raise InvalidRequest('Upload id must be provided.')
+        responseDict = self.sendSessionRequest(url=url, method='POST')
+        return UploadInfo(responseDict)
+
 #######################################################################
 # Testing.
 
diff --git a/src/python/dm/daq_web_service/cli/stopUploadCli.py b/src/python/dm/daq_web_service/cli/stopUploadCli.py
new file mode 100755
index 0000000000000000000000000000000000000000..c153555384b77b2f90e555abd74ca05edc6fe466
--- /dev/null
+++ b/src/python/dm/daq_web_service/cli/stopUploadCli.py
@@ -0,0 +1,36 @@
+#!/usr/bin/env python
+
+from daqWebServiceSessionCli import DaqWebServiceSessionCli
+from dm.daq_web_service.api.experimentRestApi import ExperimentRestApi
+from dm.common.exceptions.invalidRequest import InvalidRequest
+
+class StopUploadCli(DaqWebServiceSessionCli):
+    def __init__(self):
+        DaqWebServiceSessionCli.__init__(self, validArgCount=self.ANY_NUMBER_OF_POSITIONAL_ARGS)
+        self.addOption('', '--id', dest='id', help='Upload id.')
+
+    def checkArgs(self):
+        if self.options.id is None:
+            raise InvalidRequest('Upload id must be provided.')
+
+    def getId(self):
+        return self.options.id
+
+    def runCommand(self):
+        self.parseArgs(usage="""
+    dm-stop-upload --id=ID
+
+Description:
+    Aborts specified data upload.
+        """)
+        self.checkArgs()
+        api = ExperimentRestApi(self.getLoginUsername(), self.getLoginPassword(), self.getServiceHost(), self.getServicePort(), self.getServiceProtocol())
+        uploadInfo = api.stopUpload(self.getId())
+        print uploadInfo.getDisplayString(self.getDisplayKeys(), self.getDisplayFormat())
+
+#######################################################################
+# Run command.
+if __name__ == '__main__':
+    cli = StopUploadCli()
+    cli.run()
+
diff --git a/src/python/dm/daq_web_service/service/experimentRouteDescriptor.py b/src/python/dm/daq_web_service/service/experimentRouteDescriptor.py
index 7fc1ae82498dc43ba177451b8db12956d5e21557..04c019e41941b39133b1a1858c0c723d826b531b 100755
--- a/src/python/dm/daq_web_service/service/experimentRouteDescriptor.py
+++ b/src/python/dm/daq_web_service/service/experimentRouteDescriptor.py
@@ -63,6 +63,15 @@ class ExperimentRouteDescriptor:
                 'action' : 'getUploadInfo',
                 'method' : ['GET']
             },
+
+            # Stop upload
+            {
+                'name' : 'stopUpload',
+                'controller' : experimentSessionController,
+                'path' : '%s/experimentUploads/stopUpload/:(id)' % contextRoot,
+                'action' : 'stopUpload',
+                'method' : ['POST']
+            },
         ]
        
         return routes
diff --git a/src/python/dm/daq_web_service/service/experimentSessionController.py b/src/python/dm/daq_web_service/service/experimentSessionController.py
index 09786ff61b5aa64c87683864576546599ba1b0ac..80036d9a52ed23bab3d2bbc17ab49d3ea14064b1 100755
--- a/src/python/dm/daq_web_service/service/experimentSessionController.py
+++ b/src/python/dm/daq_web_service/service/experimentSessionController.py
@@ -89,3 +89,10 @@ class ExperimentSessionController(DmSessionController):
         self.logger.debug('Returning info for upload id %s' % id)
         return response
 
+    @cherrypy.expose
+    @DmSessionController.require(DmSessionController.isAdministrator())
+    @DmSessionController.execute
+    def stopUpload(self, id, **kwargs):
+        response = self.experimentSessionControllerImpl.stopUpload(id).getFullJsonRep()
+        self.logger.debug('Stopped upload id %s' % id)
+        return response
diff --git a/src/python/dm/daq_web_service/service/impl/daqTracker.py b/src/python/dm/daq_web_service/service/impl/daqTracker.py
index 18bd49baaf407611afc0898f214c7f4b5145b335..d538ad10fff1935baa7c07f5ee0957133cdb1388 100755
--- a/src/python/dm/daq_web_service/service/impl/daqTracker.py
+++ b/src/python/dm/daq_web_service/service/impl/daqTracker.py
@@ -30,16 +30,20 @@ class DaqTracker(ObjectTracker):
             raise ObjectAlreadyExists('DAQ is already active for experiment %s in data directory %s.' % (experimentName, dataDirectory))
         daqId = str(uuid.uuid4())
         daqInfo['id'] =  daqId 
-        daqInfo['nFiles'] =  0
-	daqInfo['nProcessedFiles'] = 0
-        daqInfo['nProcessingErrors'] = 0
         daqInfo['experimentName'] = experimentName
         daqInfo['storageDirectory'] = experiment.get('storageDirectory')
         daqInfo['storageHost'] = experiment.get('storageHost')
         daqInfo['storageUrl'] = experiment.get('storageUrl')
         daqInfo['dataDirectory'] = dataDirectory
+
+        # Create DaqInfo object with keys that we want to save with file
+        # metadata, and add other keys later
         daqInfo2 = DaqInfo(daqInfo)
 
+        daqInfo2['nFiles'] =  0
+	daqInfo2['nProcessedFiles'] = 0
+        daqInfo2['nProcessingErrors'] = 0
+
         startTime = time.time()
         daqInfo2['startTime'] = startTime
         daqInfo2['startTimestamp'] = TimeUtility.formatLocalTimestamp(startTime)
diff --git a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py
index e0c51b6cdde360a8980fb8f7bce6837364fba3a3..651e5bed4f80ed11c9da5c0901bb36bf1944d0fe 100755
--- a/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py
+++ b/src/python/dm/daq_web_service/service/impl/experimentSessionControllerImpl.py
@@ -111,13 +111,23 @@ class ExperimentSessionControllerImpl(DmObjectManager):
         self.logger.debug('Preparing upload id: %s' % uploadId)
         dataDirectory = uploadInfo.get('dataDirectory')
         fileProcessingManager = FileProcessingManager.getInstance()
-        for filePath in filePathsDict.keys():
+        nProcessedFiles = 0
+        nFiles = len(filePathsDict)
+        for (filePath,filePathDict) in filePathsDict.items():
             fileInfo = ObservedFile(filePath=filePath, dataDirectory=dataDirectory, experiment=experiment)
+            fileInfo.update(filePathDict)
             fileInfo['daqInfo'] = daqInfo
             fileInfo['uploadId'] = uploadId
             fileInfo['statusMonitor'] = uploadInfo
             try: 
-                fileProcessingManager.processFile(fileInfo)
+                if uploadInfo.get('status') != 'aborting':
+                    fileProcessingManager.processFile(fileInfo)
+                    nProcessedFiles += 1
+                else:
+                    nCancelledFiles = nFiles - nProcessedFiles
+                    uploadInfo.uploadAborted(nCancelledFiles)
+                    self.logger.debug('Upload id %s aborted, will not process %s files)' % (uploadId, nCancelledFiles))
+                    break
             except Exception, ex:
                 self.logger.error('Processing error: %s', ex)
         self.logger.debug('Done preparing upload id: %s (total of %s files)' % (uploadId, len(filePathsDict)))
@@ -129,4 +139,12 @@ class ExperimentSessionControllerImpl(DmObjectManager):
         uploadInfo.updateStatus()
         return uploadInfo
 
+    def stopUpload(self, id):
+        uploadInfo = UploadTracker.getInstance().get(id)
+        if not uploadInfo:
+            raise ObjectNotFound('Upload id %s not found.' % id)
+        uploadInfo['status'] = 'aborting'
+        uploadInfo.updateStatus()
+        return uploadInfo
+