Index: src/main/java/org/apache/jackrabbit/mongomk/impl/action/FetchNodesActionNew.java =================================================================== --- src/main/java/org/apache/jackrabbit/mongomk/impl/action/FetchNodesActionNew.java (revision 1437931) +++ src/main/java/org/apache/jackrabbit/mongomk/impl/action/FetchNodesActionNew.java (working copy) @@ -21,8 +21,14 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.regex.Pattern; +import javax.annotation.CheckForNull; +import javax.annotation.Nonnull; + +import org.apache.jackrabbit.mk.api.MicroKernelException; import org.apache.jackrabbit.mongomk.impl.MongoNodeStore; import org.apache.jackrabbit.mongomk.impl.model.MongoCommit; import org.apache.jackrabbit.mongomk.impl.model.MongoNode; @@ -34,6 +40,8 @@ import com.mongodb.DBObject; import com.mongodb.QueryBuilder; +import static com.google.common.base.Preconditions.checkArgument; + /** * FIXME - This is same as FetchNodesAction except that it does not require * the list of all valid commits upfront. It also has some optimizations on how @@ -43,16 +51,27 @@ */ public class FetchNodesActionNew extends BaseAction<Map<String, MongoNode>> { + /** + * Trunk commits within this time frame are considered in doubt and are + * checked more thoroughly whether they are valid. + */ + private static final long IN_DOUBT_TIME_FRAME = 10000; + public static final int LIMITLESS_DEPTH = -1; private static final Logger LOG = LoggerFactory.getLogger(FetchNodesActionNew.class); private final Set<String> paths; - private long revisionId = -1; + private long revisionId; private String branchId; private int depth = LIMITLESS_DEPTH; /** + * Maps valid commit revisionId to the baseRevId of the commit. + */ + private final SortedMap<Long, Long> validCommits = new TreeMap<Long, Long>(); + + /** * Constructs a new {@code FetchNodesAction} to fetch a node and optionally * its descendants under the specified path.
* @@ -64,6 +83,7 @@ public FetchNodesActionNew(MongoNodeStore nodeStore, String path, int depth, long revisionId) { super(nodeStore); + checkArgument(revisionId >= 0, "revisionId must be >= 0"); paths = new HashSet<String>(); paths.add(path); this.depth = depth; @@ -80,6 +100,7 @@ */ public FetchNodesActionNew(MongoNodeStore nodeStore, Set<String> paths, long revisionId) { super(nodeStore); + checkArgument(revisionId >= 0, "revisionId must be >= 0"); this.paths = paths; this.revisionId = revisionId; } @@ -101,7 +122,7 @@ // FIXME - Should deal with multiple paths as long as depth = 0 if (paths.size() == 1 && depth == 0) { - String path = paths.toArray(new String[0])[0]; + String path = paths.iterator().next(); MongoNode node = nodeStore.getFromCache(path, branchId, revisionId); if (node != null) { Map<String, MongoNode> nodes = new HashMap<String, MongoNode>(); @@ -123,7 +144,7 @@ if (paths.size() > 1) { queryBuilder = queryBuilder.in(paths); } else { - String path = paths.toArray(new String[0])[0]; + String path = paths.iterator().next(); if (depth == 0) { queryBuilder = queryBuilder.is(path); } else { @@ -133,9 +154,7 @@ } // FIXME - This needs to be improved to not fetch all revisions of a path.
- if (revisionId > -1) { - queryBuilder = queryBuilder.and(MongoNode.KEY_REVISION_ID).lessThanEquals(revisionId); - } + queryBuilder = queryBuilder.and(MongoNode.KEY_REVISION_ID).lessThanEquals(revisionId); if (branchId == null) { DBObject query = new BasicDBObject(MongoNode.KEY_BRANCH_ID, new BasicDBObject("$exists", false)); @@ -189,9 +208,15 @@ numberOfNodesToFetch = 1; } + // make sure we read from a valid commit + MongoCommit commit = fetchCommit(revisionId); + if (commit != null) { + validCommits.put(revisionId, commit.getBaseRevisionId()); + } else { + throw new MicroKernelException("Invalid revision: " + revisionId); + } + Map<String, MongoNode> nodes = new HashMap<String, MongoNode>(); - Map<Long, MongoCommit> commits = new HashMap<Long, MongoCommit>(); - while (dbCursor.hasNext() && (numberOfNodesToFetch == -1 || nodes.size() < numberOfNodesToFetch)) { MongoNode node = (MongoNode)dbCursor.next(); String path = node.getPath(); @@ -200,47 +225,105 @@ if (nodes.containsKey(path)) { LOG.debug("Converted node @{} with path {} was not put into map" + " because a newer version is available", node.getRevisionId(), path); - continue; } else { long revisionId = node.getRevisionId(); - LOG.debug("Converting node {} (@{})", path, revisionId); - - if (!commits.containsKey(revisionId) && nodeStore.getFromCache(revisionId) == null) { - LOG.debug("Fetching commit @{}", revisionId); - FetchCommitAction action = new FetchCommitAction(nodeStore, revisionId); - try { - MongoCommit commit = action.execute(); - commits.put(revisionId, commit); - } catch (Exception e) { - LOG.debug("Node will not be converted as it is not part of a valid commit {} ({})", - path, revisionId); - continue; - } + if (isValid(node)) { + nodes.put(path, node); + LOG.debug("Converted node @{} with path {} was put into map", revisionId, path); + } else { + LOG.debug("Node will not be converted as it is not part of a valid commit {} ({})", - path, revisionId); } - nodes.put(path, node); - LOG.debug("Converted node @{} with path {} was put into map", revisionId, path); } + } + 
dbCursor.close(); + return nodes; + } - // This is for unordered revision ids. - /* - MongoNode existingNodeMongo = nodeMongos.get(path); - if (existingNodeMongo != null) { - long existingRevId = existingNodeMongo.getRevisionId(); - - if (revisionId > existingRevId) { - nodeMongos.put(path, nodeMongo); - LOG.debug("Converted nodes was put into map and replaced {} ({})", path, revisionId); + /** + * @param node the node to check. + * @return <code>true</code> if the given node is from a valid commit; + * <code>false</code> otherwise. + */ + private boolean isValid(@Nonnull MongoNode node) { + long revisionId = node.getRevisionId(); + if (!validCommits.containsKey(revisionId) && nodeStore.getFromCache(revisionId) == null) { + if (branchId == null) { + // check if the given revisionId is a valid trunk commit + return isValidTrunkCommit(revisionId); + } else { + // for branch commits we only check the failed flag and + // assume there are no concurrent branch commits + // FIXME: may need to revisit this + MongoCommit commit = fetchCommit(revisionId); + if (commit != null) { + validCommits.put(revisionId, commit.getBaseRevisionId()); } else { - LOG.debug("Converted nodes was not put into map because a newer version" - + " is available {} ({})", path, revisionId); + return false; } + } + } + return true; + } + + /** + * Checks if the given <code>revisionId</code> is from a valid trunk + * commit. + * + * @param revisionId the commit revision. + * @return whether the <code>revisionId</code> is valid. + */ + private boolean isValidTrunkCommit(long revisionId) { + if (validCommits.containsKey(revisionId)) { + return true; + } + // if there is a lower valid revision than revisionId, we + // know it is invalid + if (!validCommits.headMap(revisionId).isEmpty()) { + return false; + } + // at this point we know the validCommits does not go + // back in history far enough to know if the revisionId is valid.
+ // need to fetch base commit of oldest valid commit + long inDoubt = System.currentTimeMillis() - IN_DOUBT_TIME_FRAME; + MongoCommit commit; + do { + // base revision of the oldest known valid commit + long baseRev = validCommits.values().iterator().next(); + commit = fetchCommit(baseRev); + if (commit.getBaseRevisionId() != null) { + validCommits.put(commit.getRevisionId(), commit.getBaseRevisionId()); } else { - nodeMongos.put(path, nodeMongo); - LOG.debug("Converted node @{} with path {} was put into map", revisionId, path); + // end of commit history } - */ + if (commit.getRevisionId() == revisionId) { + return true; + } else if (commit.getRevisionId() < revisionId) { + // given revisionId is between two valid revisions -> invalid + return false; + } + } while (commit.getTimestamp() > inDoubt); + // revisionId is past in doubt time frame + // perform simple check + return fetchCommit(revisionId) != null; + } + + /** + * Fetches the commit with the given <code>revisionId</code>. + * + * @param revisionId the revisionId of a commit. + * @return the commit or <code>null</code> if the commit does not exist or + * is marked as failed. + */ + @CheckForNull + private MongoCommit fetchCommit(long revisionId) { + LOG.debug("Fetching commit @{}", revisionId); + FetchCommitAction action = new FetchCommitAction(nodeStore, revisionId); + try { + return action.execute(); + } catch (Exception e) { + // not a valid commit } - dbCursor.close(); - return nodes; + return null; } } \ No newline at end of file Index: src/test/java/org/apache/jackrabbit/mongomk/impl/command/ConcurrentConflictingCommitCommandTest.java =================================================================== --- src/test/java/org/apache/jackrabbit/mongomk/impl/command/ConcurrentConflictingCommitCommandTest.java (revision 1437931) +++ src/test/java/org/apache/jackrabbit/mongomk/impl/command/ConcurrentConflictingCommitCommandTest.java (working copy) @@ -166,7 +166,6 @@ * a child node with name 'a'" error due to previous commit.
*/ @Test - @Ignore("OAK-566") public void leakedInvalidChild() throws Exception { mk.commit("/", "+\"c\" : {}", null, null); int n = 3; Index: src/test/java/org/apache/jackrabbit/mongomk/impl/MongoMKConcurrentAddTest.java =================================================================== --- src/test/java/org/apache/jackrabbit/mongomk/impl/MongoMKConcurrentAddTest.java (revision 1437931) +++ src/test/java/org/apache/jackrabbit/mongomk/impl/MongoMKConcurrentAddTest.java (working copy) @@ -67,7 +67,6 @@ * threads do not overlap / conflict. */ @Test - @Ignore("OAK-566") public void testConcurrentAdd() throws Exception { // create workers List<Callable<String>> cs = new LinkedList<Callable<String>>();