Index: oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java =================================================================== --- oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java (revision 1676729) +++ oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java (working copy) @@ -25,8 +25,6 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.concurrent.EventExecutorGroup; -import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -97,7 +95,7 @@ } try { - store.setLoader(this); + store.preSync(this); SegmentNodeState before = store.getHead(); SegmentNodeBuilder builder = before.builder(); @@ -118,14 +116,7 @@ } log.debug("did reread locally corrupt segment " + id + " with size " + s.size()); - ByteArrayOutputStream bout = new ByteArrayOutputStream(s.size()); - try { - s.writeTo(bout); - } catch (IOException f) { - log.error("can't wrap segment to output stream", f); - throw e; - } - store.writeSegment(s.getSegmentId(), bout.toByteArray(), 0, s.size()); + store.persist(s.getSegmentId(), s); } } while(true); boolean ok = store.setHead(before, builder.getNodeState()); @@ -144,6 +135,7 @@ } } } finally { + store.postSync(); close(); } } Index: oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/StandbyStore.java =================================================================== --- oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/StandbyStore.java (revision 1676729) +++ oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/StandbyStore.java (working copy) @@ -16,11 +16,16 @@ */ package org.apache.jackrabbit.oak.plugins.segment.standby.store; +import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount; + import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayDeque; import java.util.Deque; +import java.util.HashMap; import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.jackrabbit.oak.api.Blob; @@ -34,6 +39,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + public class StandbyStore implements SegmentStore { private static final Logger log = LoggerFactory.getLogger(StandbyStore.class); @@ -73,32 +80,85 @@ Deque ids = new ArrayDeque(); ids.offer(sid); int err = 0; - Set seen = new HashSet(); + Set persisted = new HashSet(); + + Map cache = new HashMap(); + long cacheWeight = 0; + long cacheOps = 0; while (!ids.isEmpty()) { SegmentId id = ids.remove(); - if (!seen.contains(id) && !delegate.containsSegment(id)) { - log.debug("trying to read segment " + id); - Segment s = loader.readSegment(id.toString()); + if (!persisted.contains(id) && !delegate.containsSegment(id)) { + Segment s; + boolean logRefs = true; + if (cache.containsKey(id)) { + s = cache.remove(id); + cacheWeight -= s.size(); + cacheOps++; + logRefs = false; + } else { + log.debug("transferring segment {}", id); + s = loader.readSegment(id.toString()); + } + if (s != null) { - log.debug("got segment " + id + " with size " + s.size()); - ByteArrayOutputStream bout = new ByteArrayOutputStream( + log.debug("processing segment {} with size {}", id, s.size()); if (id.isDataSegmentId()) { - ids.addAll(s.getReferencedIds()); - } - try { - s.writeTo(bout); - writeSegment(id, bout.toByteArray(), 0, s.size()); - } catch (IOException e) { - throw new IllegalStateException( - "Unable to write remote segment " + id, e); + boolean hasPendingRefs = false; + List refs = s.getReferencedIds(); + if (logRefs) { + log.debug("{} -> {}", id, refs); + } + for (SegmentId nr : refs) { + // skip already persisted or self-ref + if (persisted.contains(nr) || id.equals(nr)) { + continue; + } + hasPendingRefs = true; + if (!ids.contains(nr)) { + if (nr.isBulkSegmentId()) { + // binaries first + ids.addFirst(nr); + } else { + // data segments last + ids.add(nr); + } + } + } + + if (!hasPendingRefs) { + persisted.add(id); + persist(id, s); + } else { + // persist it later, after the refs are in place + ids.add(id); + + // TODO there is a chance this might introduce + // a OOME because of the position of the current + // segment in the processing queue. putting it at + // the end of the current queue means it will stay + // in the cache until the pending queue of the + // segment's references is processed. + cache.put(id, s); + cacheWeight += s.size(); + cacheOps++; + + if (log.isDebugEnabled() && cacheOps % 500 == 0) { + log.debug( + "cache size is {} segments, weight is {}", + cache.size(), + humanReadableByteCount(cacheWeight)); + } + } + } else { + persisted.add(id); + persist(id, s); } - seen.add(id); - ids.removeAll(seen); + ids.removeAll(persisted); err = 0; } else { - log.error("could NOT read segment " + id); + log.error("could NOT read segment {}", id); if (loader.isClosed() || err == 4) { loader.close(); throw new IllegalStateException( @@ -108,14 +168,45 @@ ids.addFirst(id); } } else { - seen.add(id); + persisted.add(id); } } - log.debug("calling delegate to return segment " + sid); return delegate.readSegment(sid); } + public void persist(SegmentId in, Segment s) { + SegmentId id = delegate.getTracker().getSegmentId( + in.getMostSignificantBits(), in.getLeastSignificantBits()); + gcProtector.add(id); + log.debug("persisting segment {} with size {}", id, s.size()); + try { + ByteArrayOutputStream bout = new ByteArrayOutputStream(s.size()); + s.writeTo(bout); + writeSegment(id, bout.toByteArray(), 0, s.size()); + } catch (IOException e) { + throw new IllegalStateException("Unable to write remote segment " + + id, e); + } + } + + private Set gcProtector = new HashSet(); + private boolean sync = false; + + public void preSync(RemoteSegmentLoader loader) { + Preconditions.checkState(!sync); + this.loader = loader; + this.gcProtector = new HashSet(); + this.sync = true; + } + + public void postSync() { + Preconditions.checkState(sync); + loader = null; + gcProtector = null; + sync = false; + } + @Override public void writeSegment(SegmentId id, byte[] bytes, int offset, int length) { delegate.writeSegment(id, bytes, offset, length); @@ -141,10 +232,6 @@ delegate.gc(); } - public void setLoader(RemoteSegmentLoader loader) { - this.loader = loader; - } - public long size() { if (delegate instanceof FileStore) { try { Index: oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/StandbyTestIT.java =================================================================== --- oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/StandbyTestIT.java (revision 1676729) +++ oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/StandbyTestIT.java (working copy) @@ -18,14 +18,12 @@ */ package org.apache.jackrabbit.oak.plugins.segment.standby; -import static org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils.createTmpTargetDir; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.io.ByteArrayInputStream; -import java.io.File; import java.io.IOException; import java.util.Random; @@ -88,7 +86,7 @@ @Test public void testSyncLoop() throws Exception { final int mb = 1 * 1024 * 1024; - final int blobSize = 5 * mb; + final int blobSize = 25 * mb; final int dataNodes = 5000; FileStore primary = getPrimary(); @@ -103,11 +101,13 @@ try { - for (int i = 0; i < 10; i++) { + for (int i = 0; i < 3; i++) { String cp = store.checkpoint(Long.MAX_VALUE); cl.run(); assertEquals(primary.getHead(), secondary.getHead()); assertTrue(store.release(cp)); + cl.cleanup(); + assertTrue(secondary.size() > blobSize); } } finally {