From 7fea7b37162be87f75e1924738efaa816099d0e8 Mon Sep 17 00:00:00 2001
From: Vikas Saurabh <vsaurabh@adobe.com>
Date: Thu, 9 Jul 2015 07:13:59 +0530
Subject: [PATCH] OAK-3087 update removeDescendantsAndSelf api to remove
 documents upwards breadth wise from a calculated maxDepth under the given
 path

---
 oak-run/src/main/js/oak-mongo.js | 87 +++++++++++++++++++++++++++++-----------
 1 file changed, 64 insertions(+), 23 deletions(-)

diff --git a/oak-run/src/main/js/oak-mongo.js b/oak-run/src/main/js/oak-mongo.js
index 050be40..31fb0c1 100644
--- a/oak-run/src/main/js/oak-mongo.js
+++ b/oak-run/src/main/js/oak-mongo.js
@@ -222,37 +222,46 @@ var oak = (function(global){
      * @memberof oak
      * @method removeDescendantsAndSelf
      * @param {string} path the path of the subtree to remove.
+     * @param {number} optional parameter to suggest max depth for removal of descendants.
+     *                 If this parameter is specified, the method would move upwards (breadth-wise)
+     *                 from max depth to the top path (unlike its default breadth first behavior).
+     *                 This is useful because it doesn't leave orphan documents in case the operation is
+     *                 killed mid-way.
      */
     api.removeDescendantsAndSelf = function(path) {
+        var maxDepth = findMaxDepthUnderPath(path);
+        var moveUpwards = true;
+
         var count = 0;
-        var depth = pathDepth(path);
-        var id = depth + ":" + path;
-        // current node at path
-        var result = db.nodes.remove({_id: id});
-        count += result.nRemoved;
-        // might be a long path
-        result = db.nodes.remove(longPathQuery(path));
-        count += result.nRemoved;
+        var minDepth = pathDepth(path);
+        var depth;
+        var result;
+
         // descendants
         var prefix = path + "/";
-        depth++;
-        while (true) {
+        var removeDepth = function () {
+            var onePathCnt = 0;
             result = db.nodes.remove(longPathFilter(depth, prefix));
-            count += result.nRemoved;
-            result = db.nodes.remove({_id: pathFilter(depth++, prefix)});
-            count += result.nRemoved;
-            if (result.nRemoved == 0) {
-                break;
-            }
+            onePathCnt += result.nRemoved;
+            result = db.nodes.remove({_id: pathFilter(depth, prefix)});
+            onePathCnt += result.nRemoved;
+            return onePathCnt;
         }
-        // descendants further down the hierarchy with long path
-        while (true) {
-            result = db.nodes.remove(longPathFilter(depth++, prefix));
-            if (result.nRemoved == 0) {
-                break;
-            }
-            count += result.nRemoved;
+
+        for (depth = maxDepth; depth > minDepth; depth--) {
+            var onePathCnt = removeDepth();
+            count += onePathCnt;
         }
+
+        //Delete the document at path itself
+        var id = minDepth + ":" + path;
+        // current node at path
+        result = db.nodes.remove({_id: id});
+        count += result.nRemoved;
+        // might be a long path
+        result = db.nodes.remove(longPathQuery(path));
+        count += result.nRemoved;
+
         return {nRemoved : count};
     };
 
@@ -975,6 +984,38 @@ var oak = (function(global){
         return s.replace(/[-\/\\^$*+?.()|[\]{}]/g, '\\$&');
     };
 
+    var findMaxDepthUnderPath = function(path) {
+        var lowDepth = pathDepth(path);
+        var jump = 1000;
+        while (true) {
+            var doc = db.nodes.findOne({_id: pathFilter(lowDepth+jump, path)});
+            if (doc != null) {
+                lowDepth += jump;
+                jump *= 2;
+            } else {
+                break;
+            }
+        }
+
+        var highDepth = lowDepth + jump;
+        while (highDepth > lowDepth) {
+            var midDepth = Math.floor((highDepth + lowDepth)/2);
+            if (midDepth == lowDepth) {
+                break;
+            }
+
+            var doc = db.nodes.findOne({_id: pathFilter(midDepth, path)});
+
+            if (doc == null) {
+                highDepth = midDepth;
+            } else {
+                lowDepth = midDepth;
+            }
+        }
+
+        return lowDepth;
+    }
+
     var getDocAndHierarchyQuery = function (path) {
         var paths = getHierarchyPaths(path);
 
-- 
2.1.4

