diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CancelableDiff.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CancelableDiff.java index 2ceff60402..df8f5d266b 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CancelableDiff.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CancelableDiff.java @@ -29,7 +29,7 @@ import org.apache.jackrabbit.oak.spi.state.NodeStateDiff; * Supplier}. If the {@code Supplier} returns {@code true}, the diffing process * will be canceled at the first possible occasion. */ -class CancelableDiff implements NodeStateDiff { +public class CancelableDiff implements NodeStateDiff { private final NodeStateDiff delegate; diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyDiff.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyDiff.java index e9fa6ce723..afcc729a2d 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyDiff.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyDiff.java @@ -21,16 +21,16 @@ package org.apache.jackrabbit.oak.segment.standby.client; import static org.apache.jackrabbit.oak.api.Type.BINARIES; import static org.apache.jackrabbit.oak.api.Type.BINARY; +import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE; -import java.io.InputStream; import java.io.IOException; +import java.io.InputStream; import com.google.common.base.Supplier; import org.apache.jackrabbit.oak.api.Blob; import org.apache.jackrabbit.oak.api.PropertyState; import org.apache.jackrabbit.oak.api.Type; -import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState; -import org.apache.jackrabbit.oak.segment.RecordId; +import org.apache.jackrabbit.oak.segment.CancelableDiff; import org.apache.jackrabbit.oak.segment.SegmentBlob; import org.apache.jackrabbit.oak.segment.SegmentNodeState; import org.apache.jackrabbit.oak.segment.file.FileStore; @@ -57,153 +57,134 @@ class StandbyDiff implements NodeStateDiff { private final Supplier running; - /** - * read-only traversal of the diff that has 2 properties: one is to log all - * the content changes, second is to drill down to properly level, so that - * missing binaries can be sync'ed if needed - */ - private final boolean logOnly; - StandbyDiff(NodeBuilder builder, FileStore store, StandbyClient client, Supplier running) { - this(builder, store, client, "/", false, running); + this(builder, store, client, "/", running); } - private StandbyDiff(NodeBuilder builder, FileStore store, StandbyClient client, String path, boolean logOnly, Supplier running) { + private StandbyDiff(NodeBuilder builder, FileStore store, StandbyClient client, String path, + Supplier running) { this.builder = builder; this.store = store; this.hasDataStore = store.getBlobStore() != null; this.client = client; this.path = path; - this.logOnly = logOnly; this.running = running; } - private boolean stop() { - return !running.get(); - } - @Override public boolean propertyAdded(PropertyState after) { - if (stop()) { - return false; - } - - if (!logOnly) { - builder.setProperty(after); - } - + builder.setProperty(after); return true; } @Override public boolean propertyChanged(PropertyState before, PropertyState after) { - if (stop()) { - return false; - } - - if (!logOnly) { - builder.setProperty(after); - } - + builder.setProperty(after); return true; } @Override public boolean propertyDeleted(PropertyState before) { - if (stop()) { - return false; - } - - if (!logOnly) { - builder.removeProperty(before.getName()); - } - + builder.removeProperty(before.getName()); return true; } @Override public boolean childNodeAdded(String name, NodeState after) { - return process(name, "childNodeAdded", EmptyNodeState.EMPTY_NODE, after); + SegmentNodeState processed = process(name, EMPTY_NODE, after, EMPTY_NODE.builder()); + if (processed != null) { + builder.setChildNode(name, processed); + return true; + } else { + return false; + } } @Override public boolean childNodeChanged(String name, NodeState before, NodeState after) { - return process(name, "childNodeChanged", before, after); + SegmentNodeState processed = process(name, before, after, builder.getChildNode(name)); + if (processed != null) { + builder.setChildNode(name, processed); + return true; + } else { + return false; + } } - + @Override public boolean childNodeDeleted(String name, NodeState before) { - log.trace("childNodeDeleted {}, RO:{}", path + name, logOnly); - - if (!logOnly) { - builder.getChildNode(name).remove(); - } - + builder.getChildNode(name).remove(); return true; } - private boolean process(String name, String op, NodeState before, NodeState after) { - if (stop()) { - return false; - } + public SegmentNodeState process(String name, NodeState before, NodeState after, NodeBuilder onto) { + return new StandbyDiff(onto, store, client, path + name + "/", running).diff(name, before, after); + } + SegmentNodeState diff(String name, NodeState before, NodeState after) { if (after instanceof SegmentNodeState) { - if (log.isTraceEnabled()) { - log.trace("{} {}, readonly binary check {}", op, path + name, logOnly); - } - - if (!logOnly) { - RecordId id = ((SegmentNodeState) after).getRecordId(); - builder.setChildNode(name, store.getReader().readNode(id)); - } - if ("checkpoints".equals(name)) { // if we're on the /checkpoints path, there's no need for a deep // traversal to verify binaries - return true; + return (SegmentNodeState) after; } if (!hasDataStore) { - return true; + return (SegmentNodeState) after; } // has external data store, we need a deep // traversal to verify binaries for (PropertyState propertyState : after.getProperties()) { - binaryCheck(propertyState); + fetchBinary(propertyState); } - return after.compareAgainstBaseState(before, - new StandbyDiff(builder.getChildNode(name), store, client, path + name + "/", true, running)); + boolean success = after.compareAgainstBaseState(before, new CancelableDiff(this, newCanceledSupplier())); + if (success) { + return (SegmentNodeState) after; + } else { + return null; + } + } else { + return null; } + } + + private Supplier newCanceledSupplier() { + return new Supplier() { - return false; + @Override + public Boolean get() { + return !running.get(); + } + + }; } - - private PropertyState binaryCheck(PropertyState property) { + + private PropertyState fetchBinary(PropertyState property) { Type type = property.getType(); if (type == BINARY) { - binaryCheck(property.getValue(Type.BINARY), property.getName()); + fetchBinary(property.getValue(Type.BINARY), property.getName()); } else if (type == BINARIES) { for (Blob blob : property.getValue(BINARIES)) { - binaryCheck(blob, property.getName()); + fetchBinary(blob, property.getName()); } } return property; } - private void binaryCheck(Blob b, String pName) { + private void fetchBinary(Blob b, String pName) { if (b instanceof SegmentBlob) { - binaryCheck((SegmentBlob) b, pName); + fetchBinary((SegmentBlob) b, pName); } else { log.warn("Unknown Blob {} at {}, ignoring", b.getClass().getName(), path + "#" + pName); } } - private void binaryCheck(SegmentBlob sb, String pName) { + private void fetchBinary(SegmentBlob sb, String pName) { if (sb.isExternal() && hasDataStore && sb.getReference() == null) { String blobId = sb.getBlobId(); @@ -212,14 +193,14 @@ class StandbyDiff implements NodeStateDiff { } try { - readBlob(blobId, pName); + fetchAndStoreBlob(blobId, pName); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } - - private void readBlob(String blobId, String pName) throws InterruptedException { + + private void fetchAndStoreBlob(String blobId, String pName) throws InterruptedException { InputStream in = client.getBlob(blobId); if (in == null) {