Index: oak-segment-tar/pom.xml =================================================================== --- oak-segment-tar/pom.xml (revision 1757664) +++ oak-segment-tar/pom.xml (working copy) @@ -35,6 +35,7 @@ 1.5.5 + 4.0.23.Final @@ -51,7 +52,24 @@ - commons-math3 + + commons-math3, + netty-* + + + com.google.protobuf.*;resolution:=optional, + com.jcraft.jzlib.*;resolution:=optional, + javassist.*;resolution:=optional, + org.apache.tomcat.jni.*;resolution:=optional, + org.bouncycastle.*;resolution:=optional, + org.eclipse.jetty.npn.*;resolution:=optional, + org.jboss.marshalling.*;resolution:=optional, + sun.misc.*;resolution:=optional, + sun.nio.ch.*;resolution:=optional, + sun.security.util.*;resolution:=optional, + sun.security.x509.*;resolution:=optional, + * + @@ -60,11 +78,49 @@ maven-scr-plugin + org.codehaus.mojo + build-helper-maven-plugin + 1.12 + + + + reserve-network-port + + process-test-resources + + + standby.server.port + standby.proxy.port + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.19.1 + + + ${standby.server.port} + ${standby.proxy.port} + + + **/BulkTest.java + **/MBeanTest.java + **/FailoverIPRangeTest.java + + + + + org.apache.maven.plugins maven-failsafe-plugin + 2.19.1 SEGMENT_TAR + ${standby.server.port} + ${standby.proxy.port} @@ -208,6 +264,39 @@ provided + + + + io.netty + netty-common + ${netty.version} + provided + + + io.netty + netty-buffer + ${netty.version} + provided + + + io.netty + netty-transport + ${netty.version} + provided + + + io.netty + netty-codec + ${netty.version} + provided + + + io.netty + netty-handler + ${netty.version} + provided + + Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/PropertyTemplate.java =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/PropertyTemplate.java (revision 1757664) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/PropertyTemplate.java (working copy) @@ -31,7 +31,7 @@ * A property definition within a template (the property name, the type, and the * index within the list of properties for the given node). */ -class PropertyTemplate implements Comparable { +public class PropertyTemplate implements Comparable { /** * The index of this property within the list of properties in the node Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Segment.java =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Segment.java (revision 1757664) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Segment.java (working copy) @@ -30,9 +30,12 @@ import static org.apache.jackrabbit.oak.segment.SegmentWriter.BLOCK_SIZE; import java.io.IOException; +import java.io.OutputStream; import java.io.PrintWriter; import java.io.StringWriter; import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; import java.util.Arrays; import java.util.Map; import java.util.UUID; @@ -584,4 +587,12 @@ } } + public void writeTo(OutputStream stream) throws IOException { + ByteBuffer buffer = data.duplicate(); + WritableByteChannel channel = Channels.newChannel(stream); + while (buffer.hasRemaining()) { + channel.write(buffer); + } + } + } Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentStoreProvider.java =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentStoreProvider.java (working copy) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentStoreProvider.java (working copy) @@ -21,4 +21,5 @@ public interface SegmentStoreProvider { SegmentStore getSegmentStore(); + } Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/FailedRequestListener.java =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/FailedRequestListener.java (working copy) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/FailedRequestListener.java (working copy) @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby.client; +package org.apache.jackrabbit.oak.segment.standby.client; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/SegmentLoaderHandler.java =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/SegmentLoaderHandler.java (working copy) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/SegmentLoaderHandler.java (working copy) @@ -16,11 +16,11 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby.client; +package org.apache.jackrabbit.oak.segment.standby.client; import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount; -import static org.apache.jackrabbit.oak.plugins.segment.standby.codec.Messages.newGetBlobReq; -import static org.apache.jackrabbit.oak.plugins.segment.standby.codec.Messages.newGetSegmentReq; +import static org.apache.jackrabbit.oak.segment.standby.codec.Messages.newGetBlobReq; +import static org.apache.jackrabbit.oak.segment.standby.codec.Messages.newGetSegmentReq; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; @@ -32,14 +32,14 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.apache.jackrabbit.oak.api.Blob; -import org.apache.jackrabbit.oak.plugins.segment.RecordId; -import org.apache.jackrabbit.oak.plugins.segment.Segment; -import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeBuilder; -import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState; -import org.apache.jackrabbit.oak.plugins.segment.SegmentNotFoundException; -import org.apache.jackrabbit.oak.plugins.segment.standby.codec.SegmentReply; -import org.apache.jackrabbit.oak.plugins.segment.standby.store.RemoteSegmentLoader; -import org.apache.jackrabbit.oak.plugins.segment.standby.store.StandbyStore; +import org.apache.jackrabbit.oak.segment.RecordId; +import org.apache.jackrabbit.oak.segment.Segment; +import org.apache.jackrabbit.oak.segment.SegmentNodeBuilder; +import org.apache.jackrabbit.oak.segment.SegmentNodeState; +import org.apache.jackrabbit.oak.segment.SegmentNotFoundException; +import org.apache.jackrabbit.oak.segment.standby.codec.SegmentReply; +import org.apache.jackrabbit.oak.segment.standby.store.RemoteSegmentLoader; +import org.apache.jackrabbit.oak.segment.standby.store.StandbyStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -131,7 +131,7 @@ SegmentNodeState before = store.getHead(); SegmentNodeBuilder builder = before.builder(); - SegmentNodeState current = new SegmentNodeState(head); + SegmentNodeState current = store.newSegmentNodeState(head); do { try { current.compareAgainstBaseState(before, Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyApplyDiff.java =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyApplyDiff.java (working copy) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyApplyDiff.java (working copy) @@ -16,8 +16,9 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby.client; +package org.apache.jackrabbit.oak.segment.standby.client; + import static org.apache.jackrabbit.oak.api.Type.BINARIES; import static org.apache.jackrabbit.oak.api.Type.BINARY; @@ -27,11 +28,11 @@ import org.apache.jackrabbit.oak.api.PropertyState; import org.apache.jackrabbit.oak.api.Type; import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState; -import org.apache.jackrabbit.oak.plugins.segment.RecordId; -import org.apache.jackrabbit.oak.plugins.segment.SegmentBlob; -import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState; -import org.apache.jackrabbit.oak.plugins.segment.SegmentStore; -import org.apache.jackrabbit.oak.plugins.segment.standby.store.RemoteSegmentLoader; +import org.apache.jackrabbit.oak.segment.RecordId; +import org.apache.jackrabbit.oak.segment.SegmentBlob; +import org.apache.jackrabbit.oak.segment.SegmentNodeState; +import org.apache.jackrabbit.oak.segment.standby.store.RemoteSegmentLoader; +import org.apache.jackrabbit.oak.segment.standby.store.StandbyStore; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; import org.apache.jackrabbit.oak.spi.state.NodeState; import org.apache.jackrabbit.oak.spi.state.NodeStateDiff; @@ -45,7 +46,7 @@ private final NodeBuilder builder; - private final SegmentStore store; + private final StandbyStore store; private final boolean hasDataStore; @@ -60,12 +61,12 @@ */ private final boolean logOnly; - public StandbyApplyDiff(NodeBuilder builder, SegmentStore store, + public StandbyApplyDiff(NodeBuilder builder, StandbyStore store, RemoteSegmentLoader loader) { this(builder, store, loader, "/", false); } - private StandbyApplyDiff(NodeBuilder builder, SegmentStore store, + private StandbyApplyDiff(NodeBuilder builder, StandbyStore store, RemoteSegmentLoader loader, String path, boolean logOnly) { this.builder = builder; this.store = store; @@ -179,7 +180,7 @@ } if (!logOnly) { RecordId id = ((SegmentNodeState) after).getRecordId(); - builder.setChildNode(name, new SegmentNodeState(id)); + builder.setChildNode(name, store.newSegmentNodeState(id)); } if ("checkpoints".equals(name)) { // if we're on the /checkpoints path, there's no need for a deep Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java (working copy) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java (working copy) @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby.client; +package org.apache.jackrabbit.oak.segment.standby.client; import java.io.Closeable; import java.lang.management.ManagementFactory; @@ -44,12 +44,13 @@ import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.util.CharsetUtil; -import org.apache.jackrabbit.oak.plugins.segment.SegmentStore; -import org.apache.jackrabbit.oak.plugins.segment.standby.codec.RecordIdDecoder; -import org.apache.jackrabbit.oak.plugins.segment.standby.jmx.ClientStandbyStatusMBean; -import org.apache.jackrabbit.oak.plugins.segment.standby.jmx.StandbyStatusMBean; -import org.apache.jackrabbit.oak.plugins.segment.standby.store.CommunicationObserver; -import org.apache.jackrabbit.oak.plugins.segment.standby.store.StandbyStore; +import org.apache.jackrabbit.oak.segment.SegmentStore; +import org.apache.jackrabbit.oak.segment.file.FileStore; +import org.apache.jackrabbit.oak.segment.standby.codec.RecordIdDecoder; +import org.apache.jackrabbit.oak.segment.standby.jmx.ClientStandbyStatusMBean; +import org.apache.jackrabbit.oak.segment.standby.jmx.StandbyStatusMBean; +import org.apache.jackrabbit.oak.segment.standby.store.CommunicationObserver; +import org.apache.jackrabbit.oak.segment.standby.store.StandbyStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,7 +81,7 @@ private long syncStartTimestamp; private long syncEndTimestamp; - public StandbyClient(String host, int port, SegmentStore store, + public StandbyClient(String host, int port, FileStore store, boolean secure, int readTimeoutMs, boolean autoClean) throws SSLException { this.state = STATUS_INITIALIZING; Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientHandler.java =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientHandler.java (working copy) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientHandler.java (working copy) @@ -16,21 +16,22 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby.client; -import static org.apache.jackrabbit.oak.plugins.segment.standby.codec.Messages.newGetHeadReq; +package org.apache.jackrabbit.oak.segment.standby.client; +import static org.apache.jackrabbit.oak.segment.standby.codec.Messages.newGetHeadReq; + import java.io.Closeable; import java.util.concurrent.atomic.AtomicBoolean; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.timeout.ReadTimeoutHandler; -import org.apache.jackrabbit.oak.plugins.segment.RecordId; -import org.apache.jackrabbit.oak.plugins.segment.standby.codec.RecordIdDecoder; -import org.apache.jackrabbit.oak.plugins.segment.standby.codec.ReplyDecoder; -import org.apache.jackrabbit.oak.plugins.segment.standby.store.CommunicationObserver; -import org.apache.jackrabbit.oak.plugins.segment.standby.store.StandbyStore; +import org.apache.jackrabbit.oak.segment.RecordId; +import org.apache.jackrabbit.oak.segment.standby.codec.RecordIdDecoder; +import org.apache.jackrabbit.oak.segment.standby.codec.ReplyDecoder; +import org.apache.jackrabbit.oak.segment.standby.store.CommunicationObserver; +import org.apache.jackrabbit.oak.segment.standby.store.StandbyStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/BlobEncoder.java =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/BlobEncoder.java (working copy) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/BlobEncoder.java (working copy) @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby.codec; +package org.apache.jackrabbit.oak.segment.standby.codec; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/IdArrayBasedBlob.java =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/IdArrayBasedBlob.java (working copy) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/IdArrayBasedBlob.java (working copy) @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby.codec; +package org.apache.jackrabbit.oak.segment.standby.codec; import org.apache.jackrabbit.oak.plugins.memory.ArrayBasedBlob; Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/Messages.java =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/Messages.java (working copy) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/Messages.java (working copy) @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby.codec; +package org.apache.jackrabbit.oak.segment.standby.codec; public class Messages { Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/RecordIdDecoder.java =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/RecordIdDecoder.java (working copy) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/RecordIdDecoder.java (working copy) @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby.codec; +package org.apache.jackrabbit.oak.segment.standby.codec; import java.io.IOException; @@ -25,9 +25,8 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.util.CharsetUtil; - -import org.apache.jackrabbit.oak.plugins.segment.RecordId; -import org.apache.jackrabbit.oak.plugins.segment.SegmentStore; +import org.apache.jackrabbit.oak.segment.RecordId; +import org.apache.jackrabbit.oak.segment.SegmentStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,7 +54,7 @@ String id = frame.toString(CharsetUtil.UTF_8); try { log.debug("received type {} with id {}", type, id); - return RecordId.fromString(store.getTracker(), id); + return RecordId.fromString(store, id); } catch (IllegalArgumentException e) { log.error(e.getMessage(), e); } Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/RecordIdEncoder.java =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/RecordIdEncoder.java (working copy) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/RecordIdEncoder.java (working copy) @@ -17,14 +17,14 @@ * under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby.codec; +package org.apache.jackrabbit.oak.segment.standby.codec; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import io.netty.util.CharsetUtil; -import org.apache.jackrabbit.oak.plugins.segment.RecordId; +import org.apache.jackrabbit.oak.segment.RecordId; public class RecordIdEncoder extends MessageToByteEncoder { Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ReplyDecoder.java =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ReplyDecoder.java (working copy) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ReplyDecoder.java (working copy) @@ -17,27 +17,25 @@ * under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby.codec; +package org.apache.jackrabbit.oak.segment.standby.codec; -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.ReplayingDecoder; - import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.List; import java.util.UUID; -import org.apache.jackrabbit.oak.plugins.segment.Segment; -import org.apache.jackrabbit.oak.plugins.segment.SegmentId; -import org.apache.jackrabbit.oak.plugins.segment.SegmentStore; -import org.apache.jackrabbit.oak.plugins.segment.standby.codec.ReplyDecoder.DecodingState; +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ReplayingDecoder; +import org.apache.jackrabbit.oak.segment.Segment; +import org.apache.jackrabbit.oak.segment.SegmentId; +import org.apache.jackrabbit.oak.segment.standby.codec.ReplyDecoder.DecodingState; +import org.apache.jackrabbit.oak.segment.standby.store.StandbyStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.hash.Hasher; -import com.google.common.hash.Hashing; - public class ReplyDecoder extends ReplayingDecoder { public enum DecodingState { @@ -47,12 +45,12 @@ private static final Logger log = LoggerFactory .getLogger(ReplyDecoder.class); - private final SegmentStore store; + private final StandbyStore store; private int length = -1; private byte type = -1; - public ReplyDecoder(SegmentStore store) { + public ReplyDecoder(StandbyStore store) { super(DecodingState.HEADER); this.store = store; } @@ -128,9 +126,8 @@ Hasher hasher = Hashing.murmur3_32().newHasher(); long check = hasher.putBytes(segment).hash().padToLong(); if (hash == check) { - SegmentId id = new SegmentId(store.getTracker(), msb, lsb); - Segment s = new Segment(store.getTracker(), id, - ByteBuffer.wrap(segment)); + SegmentId id = store.newSegmentId(msb, lsb); + Segment s = store.newSegment(id, ByteBuffer.wrap(segment)); log.debug("received segment with id {} and size {}", id, s.size()); return s; } Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/SegmentDecoder.java =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/SegmentDecoder.java (revision 1757664) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/SegmentDecoder.java (working copy) @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.jackrabbit.oak.plugins.segment.standby.codec; - -import static org.apache.jackrabbit.oak.plugins.segment.standby.codec.SegmentEncoder.EXTRA_HEADERS_LEN; -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.LengthFieldBasedFrameDecoder; - -import java.nio.ByteBuffer; -import java.util.UUID; - -import org.apache.jackrabbit.oak.plugins.segment.Segment; -import org.apache.jackrabbit.oak.plugins.segment.SegmentId; -import org.apache.jackrabbit.oak.plugins.segment.SegmentStore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.hash.Hasher; -import com.google.common.hash.Hashing; - -@Deprecated -public class SegmentDecoder extends LengthFieldBasedFrameDecoder { - - private static final Logger log = LoggerFactory - .getLogger(SegmentDecoder.class); - - /** - * the maximum possible size a header message might have - */ - private static final int MAX_LENGHT = Segment.MAX_SEGMENT_SIZE - + EXTRA_HEADERS_LEN; - - private final SegmentStore store; - - public SegmentDecoder(SegmentStore store) { - super(MAX_LENGHT, 0, 4, 0, 0); - this.store = store; - } - - @Override - protected Object decode(ChannelHandlerContext ctx, ByteBuf in) - throws Exception { - ByteBuf frame = (ByteBuf) super.decode(ctx, in); - if (frame == null) { - return null; - } - int len = frame.readInt(); - byte type = frame.readByte(); - long msb = frame.readLong(); - long lsb = frame.readLong(); - long hash = frame.readLong(); - byte[] segment = new byte[len - 25]; - frame.getBytes(29, segment); - Hasher hasher = Hashing.murmur3_32().newHasher(); - long check = hasher.putBytes(segment).hash().padToLong(); - if (hash == check) { - SegmentId id = new SegmentId(store.getTracker(), msb, lsb); - Segment s = new Segment(store.getTracker(), id, - ByteBuffer.wrap(segment)); - log.debug("received type {} with id {} and size {}", type, id, - s.size()); - return s; - } - log.debug("received corrupted segment {}, ignoring", new UUID(msb, lsb)); - return null; - - } - - @Override - protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, - int index, int length) { - return buffer.slice(index, length); - } - -} Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/SegmentEncoder.java =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/SegmentEncoder.java (working copy) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/SegmentEncoder.java (working copy) @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby.codec; +package org.apache.jackrabbit.oak.segment.standby.codec; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; @@ -25,8 +25,8 @@ import java.io.ByteArrayOutputStream; -import org.apache.jackrabbit.oak.plugins.segment.Segment; -import org.apache.jackrabbit.oak.plugins.segment.SegmentId; +import org.apache.jackrabbit.oak.segment.Segment; +import org.apache.jackrabbit.oak.segment.SegmentId; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/SegmentReply.java =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/SegmentReply.java (working copy) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/SegmentReply.java (working copy) @@ -14,9 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby.codec; +package org.apache.jackrabbit.oak.segment.standby.codec; -import org.apache.jackrabbit.oak.plugins.segment.Segment; +import org.apache.jackrabbit.oak.segment.Segment; public class SegmentReply { Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/jmx/ClientStandbyStatusMBean.java =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/jmx/ClientStandbyStatusMBean.java (working copy) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/jmx/ClientStandbyStatusMBean.java (working copy) @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby.jmx; +package org.apache.jackrabbit.oak.segment.standby.jmx; import org.apache.jackrabbit.oak.commons.jmx.Description; Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/jmx/ObservablePartnerMBean.java =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/jmx/ObservablePartnerMBean.java (working copy) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/jmx/ObservablePartnerMBean.java (working copy) @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby.jmx; +package org.apache.jackrabbit.oak.segment.standby.jmx; import org.apache.jackrabbit.oak.commons.jmx.Description; Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/jmx/StandbyStatusMBean.java =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/jmx/StandbyStatusMBean.java (working copy) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/jmx/StandbyStatusMBean.java (working copy) @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby.jmx; +package org.apache.jackrabbit.oak.segment.standby.jmx; import org.apache.jackrabbit.oak.commons.jmx.Description; import javax.annotation.Nonnull; Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java (working copy) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java (working copy) @@ -16,8 +16,20 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby.server; +package org.apache.jackrabbit.oak.segment.standby.server; + +import java.io.Closeable; +import java.lang.management.ManagementFactory; +import java.security.cert.CertificateException; +import java.util.concurrent.TimeUnit; + +import javax.management.InstanceNotFoundException; +import javax.management.MBeanServer; +import javax.management.ObjectName; +import javax.management.StandardMBean; +import javax.net.ssl.SSLException; + import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -35,27 +47,17 @@ import io.netty.handler.ssl.util.SelfSignedCertificate; import io.netty.util.CharsetUtil; import io.netty.util.concurrent.Future; - -import java.io.Closeable; -import java.lang.management.ManagementFactory; -import java.security.cert.CertificateException; -import java.util.concurrent.TimeUnit; - -import org.apache.jackrabbit.oak.plugins.segment.SegmentStore; -import org.apache.jackrabbit.oak.plugins.segment.standby.codec.BlobEncoder; -import org.apache.jackrabbit.oak.plugins.segment.standby.codec.RecordIdEncoder; -import org.apache.jackrabbit.oak.plugins.segment.standby.codec.SegmentEncoder; -import org.apache.jackrabbit.oak.plugins.segment.standby.jmx.StandbyStatusMBean; -import org.apache.jackrabbit.oak.plugins.segment.standby.store.CommunicationObserver; +import org.apache.jackrabbit.oak.segment.SegmentStore; +import org.apache.jackrabbit.oak.segment.file.FileStore; +import org.apache.jackrabbit.oak.segment.standby.codec.BlobEncoder; +import org.apache.jackrabbit.oak.segment.standby.codec.RecordIdEncoder; +import org.apache.jackrabbit.oak.segment.standby.codec.SegmentEncoder; +import org.apache.jackrabbit.oak.segment.standby.jmx.StandbyStatusMBean; +import org.apache.jackrabbit.oak.segment.standby.store.CommunicationObserver; +import org.apache.jackrabbit.oak.segment.standby.store.StandbyStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.management.InstanceNotFoundException; -import javax.management.MBeanServer; -import javax.management.ObjectName; -import javax.management.StandardMBean; -import javax.net.ssl.SSLException; - public class StandbyServer implements StandbyStatusMBean, Closeable { private static final Logger log = LoggerFactory @@ -72,23 +74,19 @@ private ChannelFuture channelFuture; private boolean running; - public StandbyServer(int port, final SegmentStore store) - throws CertificateException, SSLException { + public StandbyServer(int port, final FileStore store) throws CertificateException, SSLException { this(port, store, null, false); } - public StandbyServer(int port, final SegmentStore store, boolean secure) - throws CertificateException, SSLException { + public StandbyServer(int port, final FileStore store, boolean secure) throws CertificateException, SSLException { this(port, store, null, secure); } - public StandbyServer(int port, final SegmentStore store, String[] allowedClientIPRanges) - throws CertificateException, SSLException { + public StandbyServer(int port, final FileStore store, String[] allowedClientIPRanges) throws CertificateException, SSLException { this(port, store, allowedClientIPRanges, false); } - public StandbyServer(int port, final SegmentStore store, String[] allowedClientIPRanges, boolean secure) - throws CertificateException, SSLException { + public StandbyServer(int port, final FileStore store, String[] allowedClientIPRanges, boolean secure) throws CertificateException, SSLException { this.port = port; if (secure) { Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServerHandler.java =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServerHandler.java (working copy) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServerHandler.java (working copy) @@ -16,8 +16,9 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby.server; +package org.apache.jackrabbit.oak.segment.standby.server; + import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; @@ -28,15 +29,17 @@ import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; - import org.apache.jackrabbit.oak.api.Blob; import org.apache.jackrabbit.oak.api.IllegalRepositoryStateException; -import org.apache.jackrabbit.oak.plugins.segment.RecordId; -import org.apache.jackrabbit.oak.plugins.segment.Segment; -import org.apache.jackrabbit.oak.plugins.segment.SegmentId; -import org.apache.jackrabbit.oak.plugins.segment.SegmentStore; -import org.apache.jackrabbit.oak.plugins.segment.standby.codec.Messages; -import org.apache.jackrabbit.oak.plugins.segment.standby.store.CommunicationObserver; +import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob; +import org.apache.jackrabbit.oak.segment.RecordId; +import org.apache.jackrabbit.oak.segment.Segment; +import org.apache.jackrabbit.oak.segment.SegmentId; +import org.apache.jackrabbit.oak.segment.file.FileStore; +import org.apache.jackrabbit.oak.segment.standby.codec.Messages; +import org.apache.jackrabbit.oak.segment.standby.store.CommunicationObserver; +import org.apache.jackrabbit.oak.segment.standby.store.StandbyStore; +import org.apache.jackrabbit.oak.spi.blob.BlobStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,12 +49,12 @@ private static final Logger log = LoggerFactory .getLogger(StandbyServerHandler.class); - private final SegmentStore store; + private final FileStore store; private final CommunicationObserver observer; private final String[] allowedIPRanges; public String state; - public StandbyServerHandler(SegmentStore store, CommunicationObserver observer, String[] allowedIPRanges) { + public StandbyServerHandler(FileStore store, CommunicationObserver observer, String[] allowedIPRanges) { this.store = store; this.observer = observer; this.allowedIPRanges = allowedIPRanges; @@ -154,9 +157,7 @@ for (int i = 0; i < 10; i++) { try { - s = store.readSegment(new SegmentId(store.getTracker(), - uuid.getMostSignificantBits(), uuid - .getLeastSignificantBits())); + s = store.readSegment(store.newSegmentId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits())); } catch (IllegalRepositoryStateException e) { // segment not found log.debug("waiting for segment. Got exception: " + e.getMessage()); @@ -174,7 +175,7 @@ } else if (request.startsWith(Messages.GET_BLOB)) { String bid = request.substring(Messages.GET_BLOB.length()); log.debug("request blob id {}", bid); - Blob b = store.readBlob(bid); + Blob b = readBlob(bid); log.debug("sending blob " + bid + " to " + client); ctx.writeAndFlush(b); observer.didSendBinariesBytes(clientID, @@ -203,4 +204,15 @@ log.error("Exception occurred: " + cause.getMessage(), cause); } } + + private Blob readBlob(String blobId) { + BlobStore blobStore = store.getBlobStore(); + + if (blobStore != null) { + return new BlobStoreBlob(blobStore, blobId); + } + + throw new IllegalStateException("Attempt to read external blob with blobId [" + blobId + "] without specifying BlobStore"); + } + } Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java (working copy) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java (working copy) @@ -16,11 +16,11 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby.store; +package org.apache.jackrabbit.oak.segment.standby.store; -import org.apache.jackrabbit.oak.plugins.segment.standby.jmx.StandbyStatusMBean; -import org.apache.jackrabbit.oak.plugins.segment.standby.jmx.ObservablePartnerMBean; +import org.apache.jackrabbit.oak.segment.standby.jmx.StandbyStatusMBean; +import org.apache.jackrabbit.oak.segment.standby.jmx.ObservablePartnerMBean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/RemoteSegmentLoader.java =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/RemoteSegmentLoader.java (working copy) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/RemoteSegmentLoader.java (working copy) @@ -14,10 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby.store; +package org.apache.jackrabbit.oak.segment.standby.store; import org.apache.jackrabbit.oak.api.Blob; -import org.apache.jackrabbit.oak.plugins.segment.Segment; +import org.apache.jackrabbit.oak.segment.Segment; public interface RemoteSegmentLoader { Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/StandbyStore.java =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/StandbyStore.java (working copy) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/StandbyStore.java (working copy) @@ -14,59 +14,66 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby.store; +package org.apache.jackrabbit.oak.segment.standby.store; + import static com.google.common.collect.Sets.newHashSet; import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount; import java.io.ByteArrayOutputStream; +import java.io.Closeable; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Deque; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; -import org.apache.jackrabbit.oak.api.Blob; -import org.apache.jackrabbit.oak.plugins.segment.Segment; -import org.apache.jackrabbit.oak.plugins.segment.SegmentId; -import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState; -import org.apache.jackrabbit.oak.plugins.segment.SegmentStore; -import org.apache.jackrabbit.oak.plugins.segment.SegmentTracker; -import org.apache.jackrabbit.oak.plugins.segment.file.FileStore; +import javax.annotation.Nonnull; + +import org.apache.jackrabbit.oak.segment.RecordId; +import org.apache.jackrabbit.oak.segment.Segment; +import org.apache.jackrabbit.oak.segment.SegmentId; +import org.apache.jackrabbit.oak.segment.SegmentNodeState; +import org.apache.jackrabbit.oak.segment.SegmentStore; +import org.apache.jackrabbit.oak.segment.file.FileStore; import org.apache.jackrabbit.oak.spi.blob.BlobStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class StandbyStore implements SegmentStore { +public class StandbyStore implements SegmentStore, Closeable { private static final Logger log = LoggerFactory.getLogger(StandbyStore.class); - private final SegmentTracker tracker = new SegmentTracker(this); + private final FileStore delegate; - private final SegmentStore delegate; - private RemoteSegmentLoader loader; - public StandbyStore(SegmentStore delegate) { + public StandbyStore(FileStore delegate) { this.delegate = delegate; } + @Nonnull @Override - public SegmentTracker getTracker() { - return tracker; + public SegmentId newSegmentId(long msb, long lsb) { + return delegate.newSegmentId(msb, lsb); } + @Nonnull @Override - public SegmentNodeState getHead() { - return delegate.getHead(); + public SegmentId newBulkSegmentId() { + return delegate.newBulkSegmentId(); } + @Nonnull @Override - public boolean setHead(SegmentNodeState base, SegmentNodeState head) { - return delegate.setHead(base, head); + public SegmentId newDataSegmentId() { + return delegate.newDataSegmentId(); } @Override @@ -74,6 +81,19 @@ return delegate.containsSegment(id); } + private List getReferencedIds(Segment s) { + List segmentIds = new ArrayList<>(); + + for (int i = 0; i < s.getReferencedSegmentIdCount(); i++) { + UUID uuid = s.getReferencedSegmentId(i); + long msb = uuid.getMostSignificantBits(); + long lsb = uuid.getLeastSignificantBits(); + segmentIds.add(delegate.newSegmentId(msb, lsb)); + } + + return segmentIds; + } + @Override public Segment readSegment(SegmentId sid) { callId++; @@ -114,7 +134,7 @@ s.size()); if (id.isDataSegmentId()) { boolean hasPendingRefs = false; - List refs = s.getReferencedIds(); + List refs = getReferencedIds(s); if (logRefs) { log.debug("{} -> {}", id, refs); } @@ -181,8 +201,7 @@ } public void persist(SegmentId in, Segment s) { - SegmentId id = delegate.getTracker().getSegmentId( - in.getMostSignificantBits(), in.getLeastSignificantBits()); + SegmentId id = delegate.newSegmentId(in.getMostSignificantBits(), in.getLeastSignificantBits()); log.debug("persisting segment {} with size {}", id, s.size()); try { ByteArrayOutputStream bout = new ByteArrayOutputStream(s.size()); @@ -220,38 +239,36 @@ delegate.close(); } - @Override - public Blob readBlob(String reference) { - return delegate.readBlob(reference); + public long size() { + return delegate.getStats().getApproximateSize(); } - @Override - public BlobStore getBlobStore() { - return delegate.getBlobStore(); + public void cleanup() { + try { + delegate.flush(); + } catch (IOException e) { + log.error("Error running cleanup", e); + } } - @Override - public void gc() { - delegate.gc(); + public SegmentNodeState getHead() { + return delegate.getHead(); } - public long size() { - if (delegate instanceof FileStore) { - return ((FileStore) delegate).size(); - } - return -1; + public SegmentNodeState newSegmentNodeState(RecordId id) { + return delegate.getReader().readNode(id); } - public void cleanup() { - if (delegate instanceof FileStore) { - try { - delegate.getTracker().getWriter().dropCache(); - ((FileStore) delegate).flush(true); - } catch (IOException e) { - log.error("Error running cleanup", e); - } - } else { - log.warn("Delegate is not a FileStore, ignoring cleanup call"); - } + public boolean setHead(@Nonnull SegmentNodeState expected, @Nonnull SegmentNodeState head) { + return delegate.getRevisions().setHead(expected.getRecordId(), head.getRecordId()); } + + public Segment newSegment(SegmentId segmentId, ByteBuffer buffer) { + return new Segment(delegate, delegate.getReader(), segmentId, buffer); + } + + public BlobStore getBlobStore() { + return delegate.getBlobStore(); + } + } Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/StandbyStoreService.java =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/StandbyStoreService.java (working copy) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/StandbyStoreService.java (working copy) @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby.store; +package org.apache.jackrabbit.oak.segment.standby.store; import static java.lang.String.valueOf; import static org.apache.felix.scr.annotations.ReferencePolicy.STATIC; @@ -35,10 +35,11 @@ import org.apache.felix.scr.annotations.PropertyOption; import org.apache.felix.scr.annotations.Reference; import org.apache.jackrabbit.oak.commons.PropertiesUtil; -import org.apache.jackrabbit.oak.plugins.segment.SegmentStoreProvider; -import org.apache.jackrabbit.oak.plugins.segment.SegmentStore; -import org.apache.jackrabbit.oak.plugins.segment.standby.client.StandbyClient; -import org.apache.jackrabbit.oak.plugins.segment.standby.server.StandbyServer; +import org.apache.jackrabbit.oak.segment.SegmentStore; +import org.apache.jackrabbit.oak.segment.SegmentStoreProvider; +import org.apache.jackrabbit.oak.segment.file.FileStore; +import org.apache.jackrabbit.oak.segment.standby.client.StandbyClient; +import org.apache.jackrabbit.oak.segment.standby.server.StandbyServer; import org.osgi.framework.ServiceRegistration; import org.osgi.service.component.ComponentContext; import org.slf4j.Logger; @@ -91,7 +92,7 @@ @Reference(policy = STATIC, policyOption = GREEDY) private SegmentStoreProvider storeProvider = null; - private SegmentStore segmentStore; + private FileStore fileStore; private StandbyServer primary = null; private StandbyClient sync = null; @@ -100,12 +101,18 @@ @Activate private void activate(ComponentContext context) throws IOException, CertificateException { - if (storeProvider != null) { - segmentStore = storeProvider.getSegmentStore(); - } else { - throw new IllegalArgumentException( - "Missing SegmentStoreProvider service"); + if (storeProvider == null) { + throw new IllegalArgumentException("Missing SegmentStoreProvider service"); } + + SegmentStore segmentStore = storeProvider.getSegmentStore(); + + if (!(segmentStore instanceof FileStore)) { + throw new IllegalArgumentException("Unexpected SegmentStore implementation"); + } + + fileStore = (FileStore) segmentStore; + String mode = valueOf(context.getProperties().get(MODE)); if (MODE_PRIMARY.equals(mode)) { bootstrapMaster(context); @@ -136,7 +143,7 @@ int port = PropertiesUtil.toInteger(props.get(PORT), PORT_DEFAULT); String[] ranges = PropertiesUtil.toStringArray(props.get(ALLOWED_CLIENT_IP_RANGES), ALLOWED_CLIENT_IP_RANGES_DEFAULT); boolean secure = PropertiesUtil.toBoolean(props.get(SECURE), SECURE_DEFAULT); - primary = new StandbyServer(port, segmentStore, ranges, secure); + primary = new StandbyServer(port, fileStore, ranges, secure); primary.start(); log.info("started primary on port {} with allowed ip ranges {}.", port, ranges); } @@ -150,7 +157,7 @@ int readTimeout = PropertiesUtil.toInteger(props.get(READ_TIMEOUT), READ_TIMEOUT_DEFAULT); boolean clean = PropertiesUtil.toBoolean(props.get(AUTO_CLEAN), AUTO_CLEAN_DEFAULT); - sync = new StandbyClient(host, port, segmentStore, secure, readTimeout, clean); + sync = new StandbyClient(host, port, fileStore, secure, readTimeout, clean); Dictionary dictionary = new Hashtable(); dictionary.put("scheduler.period", interval); dictionary.put("scheduler.concurrent", false); Index: oak-segment-tar/src/main/resources/OSGI-INF/metatype/metatype.properties =================================================================== --- oak-segment-tar/src/main/resources/OSGI-INF/metatype/metatype.properties (working copy) +++ oak-segment-tar/src/main/resources/OSGI-INF/metatype/metatype.properties (working copy) @@ -19,8 +19,8 @@ # suppress inspection "UnusedProperty" for whole file -org.apache.jackrabbit.oak.plugins.segment.standby.store.StandbyStoreService.name = Apache Jackrabbit Oak TarMK Cold Standby service -org.apache.jackrabbit.oak.plugins.segment.standby.store.StandbyStoreService.description = Provides continuous backups of TarMK based repositories +org.apache.jackrabbit.oak.segment.standby.store.StandbyStoreService.name = Apache Jackrabbit Oak Segment Tar Cold Standby Service +org.apache.jackrabbit.oak.segment.standby.store.StandbyStoreService.description = Provides continuous backups of repositories based on Segment Tar mode.name = Mode mode.description = TarMK Cold Standby mode (primary or standby) Index: oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/NetworkErrorProxy.java =================================================================== --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/NetworkErrorProxy.java (working copy) +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/NetworkErrorProxy.java (working copy) @@ -16,34 +16,44 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.jackrabbit.oak.plugins.segment; +package org.apache.jackrabbit.oak.segment; + +import java.util.concurrent.TimeUnit; + import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; -import io.netty.channel.*; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandler; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.TimeUnit; +public class NetworkErrorProxy { -public class NetworkErrorProxy { static final Logger log = LoggerFactory .getLogger(NetworkErrorProxy.class); private final int inboundPort; + private final int outboundPort; + private final String host; + private ChannelFuture f; private ForwardHandler fh; EventLoopGroup bossGroup = new NioEventLoopGroup(); + EventLoopGroup workerGroup = new NioEventLoopGroup(); public NetworkErrorProxy(int inboundPort, String outboundHost, int outboundPort) { @@ -68,6 +78,7 @@ b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { + @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(NetworkErrorProxy.this.fh); @@ -105,12 +116,19 @@ } class ForwardHandler extends ChannelInboundHandlerAdapter { + private final String targetHost; + private final int targetPort; + public long transferredBytes; + public int skipPosition; + public int skipBytes; + public int flipPosition; + private ChannelFuture remote; public ForwardHandler(String host, int port) { @@ -128,13 +146,13 @@ cb.channel(NioSocketChannel.class); cb.handler(new ChannelInitializer() { + @Override public void initChannel(SocketChannel ch) throws Exception { SendBackHandler sbh = new SendBackHandler(c); if (ForwardHandler.this.flipPosition >= 0) { sbh = new BitFlipHandler(c, ForwardHandler.this.flipPosition); - } - else if (ForwardHandler.this.skipBytes > 0) { + } else if (ForwardHandler.this.skipBytes > 0) { sbh = new SwallowingHandler(c, ForwardHandler.this.skipPosition, ForwardHandler.this.skipBytes); } ch.pipeline().addFirst(sbh); @@ -155,7 +173,7 @@ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { if (msg instanceof ByteBuf) { - ByteBuf bb = (ByteBuf)msg; + ByteBuf bb = (ByteBuf) msg; this.transferredBytes += (bb.writerIndex() - bb.readerIndex()); } remote.channel().write(msg); @@ -174,7 +192,9 @@ } class SendBackHandler implements ChannelInboundHandler { + private final ChannelHandlerContext target; + public long transferredBytes; public SendBackHandler(ChannelHandlerContext ctx) { @@ -199,7 +219,7 @@ public int messageSize(Object msg) { if (msg instanceof ByteBuf) { - ByteBuf bb = (ByteBuf)msg; + ByteBuf bb = (ByteBuf) msg; return (bb.writerIndex() - bb.readerIndex()); } // unknown @@ -240,7 +260,9 @@ } class SwallowingHandler extends SendBackHandler { + private int skipStartingPos; + private int nrOfBytes; public SwallowingHandler(ChannelHandlerContext ctx, int skipStartingPos, int numberOfBytes) { @@ -251,13 +273,12 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { - ByteBuf bb = (ByteBuf)msg; + ByteBuf bb = (ByteBuf) msg; if (this.nrOfBytes > 0) { if (this.transferredBytes >= this.skipStartingPos) { bb.skipBytes(this.nrOfBytes); this.nrOfBytes = 0; - } - else { + } else { this.skipStartingPos -= messageSize(msg); } } @@ -268,6 +289,7 @@ } class BitFlipHandler extends SendBackHandler { + private static final Logger log = LoggerFactory .getLogger(BitFlipHandler.class); @@ -280,11 +302,11 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { - ByteBuf bb = (ByteBuf)msg; + ByteBuf bb = (ByteBuf) msg; log.debug("FlipHandler. Got Buffer size: " + bb.readableBytes()); if (this.startingPos >= 0) { if (this.transferredBytes + bb.readableBytes() >= this.startingPos) { - int i = this.startingPos - (int)this.transferredBytes; + int i = this.startingPos - (int) this.transferredBytes; log.info("FlipHandler flips byte at offset " + (this.transferredBytes + i)); byte b = (byte) (bb.getByte(i) ^ 0x01); bb.setByte(i, b); Index: oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentTestUtils.java =================================================================== --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentTestUtils.java (working copy) +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentTestUtils.java (working copy) @@ -16,19 +16,15 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.jackrabbit.oak.plugins.segment; +package org.apache.jackrabbit.oak.segment; + import static java.io.File.createTempFile; -import static org.apache.jackrabbit.oak.plugins.segment.Segment.MAX_SEGMENT_SIZE; -import static org.apache.jackrabbit.oak.plugins.segment.Segment.RECORD_ALIGN_BITS; -import static org.junit.Assert.assertEquals; import java.io.File; import java.io.IOException; -import java.util.Random; import org.apache.jackrabbit.oak.api.CommitFailedException; -import org.apache.jackrabbit.oak.plugins.segment.file.FileStore; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; import org.apache.jackrabbit.oak.spi.commit.EmptyHook; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; @@ -36,29 +32,9 @@ public final class SegmentTestUtils { - private SegmentTestUtils() { } - - public static int newValidOffset(Random random) { - return random.nextInt(MAX_SEGMENT_SIZE >> RECORD_ALIGN_BITS) << RECORD_ALIGN_BITS; + private SegmentTestUtils() { } - public static RecordId newRecordId(SegmentTracker factory, Random random) { - SegmentId id = factory.newDataSegmentId(); - RecordId r = new RecordId(id, newValidOffset(random)); - return r; - } - - public static void assertEqualStores(File d1, File d2) throws Exception { - FileStore f1 = FileStore.builder(d1).withMaxFileSize(1).withMemoryMapping(false).build(); - FileStore f2 = FileStore.builder(d2).withMaxFileSize(1).withMemoryMapping(false).build(); - try { - assertEquals(f1.getHead(), f2.getHead()); - } finally { - f1.close(); - f2.close(); - } - } - public static void addTestContent(NodeStore store, String child) throws CommitFailedException { NodeBuilder builder = store.getRoot().builder(); Index: oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkTest.java =================================================================== --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkTest.java (working copy) +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkTest.java (working copy) @@ -16,21 +16,22 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby; -import org.apache.jackrabbit.oak.plugins.segment.NetworkErrorProxy; -import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore; -import org.apache.jackrabbit.oak.plugins.segment.standby.client.StandbyClient; -import org.apache.jackrabbit.oak.plugins.segment.standby.server.StandbyServer; +package org.apache.jackrabbit.oak.segment.standby; + +import static org.apache.jackrabbit.oak.segment.SegmentTestUtils.addTestContent; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import org.apache.jackrabbit.oak.segment.NetworkErrorProxy; +import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders; +import org.apache.jackrabbit.oak.segment.standby.client.StandbyClient; +import org.apache.jackrabbit.oak.segment.standby.server.StandbyServer; import org.apache.jackrabbit.oak.spi.state.NodeStore; import org.junit.After; import org.junit.Before; import org.junit.Test; -import static org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils.addTestContent; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; - public class BrokenNetworkTest extends TestBase { @Before @@ -119,7 +120,7 @@ p.flipByte(flipPosition); p.run(); - NodeStore store = SegmentNodeStore.builder(storeS).build(); + NodeStore store = SegmentNodeStoreBuilders.builder(storeS).build(); final StandbyServer server = new StandbyServer(port, storeS, ssl); server.start(); addTestContent(store, "server"); Index: oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BulkTest.java =================================================================== --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BulkTest.java (working copy) +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BulkTest.java (working copy) @@ -16,12 +16,23 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby; -import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore; -import org.apache.jackrabbit.oak.plugins.segment.standby.client.StandbyClient; -import org.apache.jackrabbit.oak.plugins.segment.standby.jmx.StandbyStatusMBean; -import org.apache.jackrabbit.oak.plugins.segment.standby.server.StandbyServer; +package org.apache.jackrabbit.oak.segment.standby; + +import static junit.framework.Assert.assertNotNull; +import static junit.framework.Assert.assertTrue; +import static org.junit.Assert.assertEquals; + +import java.lang.management.ManagementFactory; +import java.util.Set; + +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders; +import org.apache.jackrabbit.oak.segment.standby.client.StandbyClient; +import org.apache.jackrabbit.oak.segment.standby.jmx.StandbyStatusMBean; +import org.apache.jackrabbit.oak.segment.standby.server.StandbyServer; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; import org.apache.jackrabbit.oak.spi.commit.EmptyHook; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; @@ -30,16 +41,6 @@ import org.junit.Before; import org.junit.Test; -import javax.management.MBeanServer; -import javax.management.ObjectName; - -import java.lang.management.ManagementFactory; -import java.util.Set; - -import static junit.framework.Assert.assertNotNull; -import static junit.framework.Assert.assertTrue; -import static org.junit.Assert.assertEquals; - public class BulkTest extends TestBase { @Before @@ -97,7 +98,7 @@ private void test(int number, int minExpectedSegments, int maxExpectedSegments, long minExpectedBytes, long maxExpectedBytes, boolean useSSL) throws Exception { - NodeStore store = SegmentNodeStore.builder(storeS).build(); + NodeStore store = SegmentNodeStoreBuilders.builder(storeS).build(); NodeBuilder rootbuilder = store.getRoot().builder(); NodeBuilder b = rootbuilder.child("store"); for (int j=0; j<=number / 1000; j++) { Index: oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java =================================================================== --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java (working copy) +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java (working copy) @@ -16,8 +16,9 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby; +package org.apache.jackrabbit.oak.segment.standby; + import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -36,11 +37,12 @@ import org.apache.jackrabbit.oak.api.PropertyState; import org.apache.jackrabbit.oak.api.Type; import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore; -import org.apache.jackrabbit.oak.plugins.segment.NetworkErrorProxy; -import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore; -import org.apache.jackrabbit.oak.plugins.segment.file.FileStore; -import org.apache.jackrabbit.oak.plugins.segment.standby.client.StandbyClient; -import org.apache.jackrabbit.oak.plugins.segment.standby.server.StandbyServer; +import org.apache.jackrabbit.oak.segment.NetworkErrorProxy; +import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders; +import org.apache.jackrabbit.oak.segment.file.FileStore; +import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder; +import org.apache.jackrabbit.oak.segment.standby.client.StandbyClient; +import org.apache.jackrabbit.oak.segment.standby.server.StandbyServer; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; import org.apache.jackrabbit.oak.spi.commit.EmptyHook; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; @@ -62,12 +64,15 @@ fds.setMinRecordLength(4092); fds.init(path); DataStoreBlobStore blobStore = new DataStoreBlobStore(fds); - return FileStore.builder(d) - .withMaxFileSize(1) - .withMemoryMapping(false) - .withNoCache() - .withBlobStore(blobStore) - .build(); + return FileStoreBuilder.fileStoreBuilder(d) + .withMaxFileSize(1) + .withMemoryMapping(false) + .withNodeDeduplicationCacheSize(0) + .withSegmentCacheSize(0) + .withStringCacheSize(0) + .withTemplateCacheSize(0) + .withBlobStore(blobStore) + .build(); } protected byte[] addTestContent(NodeStore store, String child, int size) @@ -92,7 +97,7 @@ FileStore primary = getPrimary(); FileStore secondary = getSecondary(); - NodeStore store = SegmentNodeStore.builder(primary).build(); + NodeStore store = SegmentNodeStoreBuilders.builder(primary).build(); final StandbyServer server = new StandbyServer(port, primary); server.start(); byte[] data = addTestContent(store, "server", blobSize); @@ -108,8 +113,8 @@ cl.close(); } - assertTrue(primary.size() < mb); - assertTrue(secondary.size() < mb); + assertTrue(primary.getStats().getApproximateSize() < mb); + assertTrue(secondary.getStats().getApproximateSize() < mb); PropertyState ps = secondary.getHead().getChildNode("root") .getChildNode("server").getProperty("testBlob"); @@ -168,7 +173,7 @@ p.flipByte(flipPosition); p.run(); - NodeStore store = SegmentNodeStore.builder(primary).build(); + NodeStore store = SegmentNodeStoreBuilders.builder(primary).build(); final StandbyServer server = new StandbyServer(port, primary); server.start(); byte[] data = addTestContent(store, "server", blobSize); @@ -197,8 +202,8 @@ p.close(); } - assertTrue(primary.size() < mb); - assertTrue(secondary.size() < mb); + assertTrue(primary.getStats().getApproximateSize() < mb); + assertTrue(secondary.getStats().getApproximateSize() < mb); PropertyState ps = secondary.getHead().getChildNode("root") .getChildNode("server").getProperty("testBlob"); Index: oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/ExternalPrivateStoreIT.java =================================================================== --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/ExternalPrivateStoreIT.java (working copy) +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/ExternalPrivateStoreIT.java (working copy) @@ -16,15 +16,15 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby; +package org.apache.jackrabbit.oak.segment.standby; -import static org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils.createTmpTargetDir; +import static org.apache.jackrabbit.oak.segment.SegmentTestUtils.createTmpTargetDir; import java.io.File; import java.io.IOException; import org.apache.commons.io.FileUtils; -import org.apache.jackrabbit.oak.plugins.segment.file.FileStore; +import org.apache.jackrabbit.oak.segment.file.FileStore; import org.junit.After; Index: oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/ExternalSharedStoreIT.java =================================================================== --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/ExternalSharedStoreIT.java (working copy) +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/ExternalSharedStoreIT.java (working copy) @@ -16,15 +16,15 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby; +package org.apache.jackrabbit.oak.segment.standby; -import static org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils.createTmpTargetDir; +import static org.apache.jackrabbit.oak.segment.SegmentTestUtils.createTmpTargetDir; import java.io.File; import java.io.IOException; import org.apache.commons.io.FileUtils; -import org.apache.jackrabbit.oak.plugins.segment.file.FileStore; +import org.apache.jackrabbit.oak.segment.file.FileStore; import org.junit.After; public class ExternalSharedStoreIT extends DataStoreTestBase { Index: oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/FailoverIPRangeTest.java =================================================================== --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/FailoverIPRangeTest.java (working copy) +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/FailoverIPRangeTest.java (working copy) @@ -16,20 +16,21 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby; -import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore; -import org.apache.jackrabbit.oak.plugins.segment.standby.client.StandbyClient; -import org.apache.jackrabbit.oak.plugins.segment.standby.server.StandbyServer; +package org.apache.jackrabbit.oak.segment.standby; + +import static org.apache.jackrabbit.oak.segment.SegmentTestUtils.addTestContent; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders; +import org.apache.jackrabbit.oak.segment.standby.client.StandbyClient; +import org.apache.jackrabbit.oak.segment.standby.server.StandbyServer; import org.apache.jackrabbit.oak.spi.state.NodeStore; import org.junit.After; import org.junit.Before; import org.junit.Test; -import static org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils.addTestContent; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; - public class FailoverIPRangeTest extends TestBase { @Before @@ -144,7 +145,7 @@ } private void createTestWithConfig(String host, String[] ipRanges, boolean expectedToWork) throws Exception { - NodeStore store = SegmentNodeStore.builder(storeS).build(); + NodeStore store = SegmentNodeStoreBuilders.builder(storeS).build(); final StandbyServer server = new StandbyServer(port, storeS, ipRanges); server.start(); addTestContent(store, "server"); Index: oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/FailoverMultipleClientsTestIT.java =================================================================== --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/FailoverMultipleClientsTestIT.java (working copy) +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/FailoverMultipleClientsTestIT.java (working copy) @@ -16,12 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby; +package org.apache.jackrabbit.oak.segment.standby; -import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore; -import org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils; -import org.apache.jackrabbit.oak.plugins.segment.standby.client.StandbyClient; -import org.apache.jackrabbit.oak.plugins.segment.standby.server.StandbyServer; +import org.apache.jackrabbit.oak.segment.SegmentNodeStore; +import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders; +import org.apache.jackrabbit.oak.segment.SegmentTestUtils; +import org.apache.jackrabbit.oak.segment.standby.client.StandbyClient; +import org.apache.jackrabbit.oak.segment.standby.server.StandbyServer; import org.apache.jackrabbit.oak.spi.state.NodeStore; import org.junit.After; import org.junit.Before; @@ -44,7 +45,7 @@ @Test public void testMultipleClients() throws Exception { - NodeStore store = SegmentNodeStore.builder(storeS).build(); + NodeStore store = SegmentNodeStoreBuilders.builder(storeS).build(); final StandbyServer server = new StandbyServer(port, storeS); server.start(); SegmentTestUtils.addTestContent(store, "server"); Index: oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/FailoverSslTestIT.java =================================================================== --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/FailoverSslTestIT.java (working copy) +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/FailoverSslTestIT.java (working copy) @@ -16,20 +16,21 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby; -import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore; -import org.apache.jackrabbit.oak.plugins.segment.standby.client.StandbyClient; -import org.apache.jackrabbit.oak.plugins.segment.standby.server.StandbyServer; +package org.apache.jackrabbit.oak.segment.standby; + +import static org.apache.jackrabbit.oak.segment.SegmentTestUtils.addTestContent; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders; +import org.apache.jackrabbit.oak.segment.standby.client.StandbyClient; +import org.apache.jackrabbit.oak.segment.standby.server.StandbyServer; import org.apache.jackrabbit.oak.spi.state.NodeStore; import org.junit.After; import org.junit.Before; import org.junit.Test; -import static org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils.addTestContent; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; - public class FailoverSslTestIT extends TestBase { @Before @@ -45,7 +46,7 @@ @Test public void testFailoverSecure() throws Exception { - NodeStore store = SegmentNodeStore.builder(storeS).build(); + NodeStore store = SegmentNodeStoreBuilders.builder(storeS).build(); final StandbyServer server = new StandbyServer(port, storeS, true); server.start(); addTestContent(store, "server"); @@ -65,7 +66,7 @@ @Test public void testFailoverSecureServerPlainClient() throws Exception { - NodeStore store = SegmentNodeStore.builder(storeS).build(); + NodeStore store = SegmentNodeStoreBuilders.builder(storeS).build(); final StandbyServer server = new StandbyServer(port, storeS, true); server.start(); addTestContent(store, "server"); @@ -85,7 +86,7 @@ @Test public void testFailoverPlainServerSecureClient() throws Exception { - NodeStore store = SegmentNodeStore.builder(storeS).build(); + NodeStore store = SegmentNodeStoreBuilders.builder(storeS).build(); final StandbyServer server = new StandbyServer(port, storeS); server.start(); addTestContent(store, "server"); Index: oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/MBeanTest.java =================================================================== --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/MBeanTest.java (working copy) +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/MBeanTest.java (working copy) @@ -16,24 +16,28 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby; -import org.apache.jackrabbit.oak.plugins.segment.standby.client.StandbyClient; -import org.apache.jackrabbit.oak.plugins.segment.standby.jmx.StandbyStatusMBean; -import org.apache.jackrabbit.oak.plugins.segment.standby.server.StandbyServer; -import org.junit.After; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; +package org.apache.jackrabbit.oak.segment.standby; -import javax.management.MBeanServer; -import javax.management.ObjectName; +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertNotNull; +import static junit.framework.Assert.assertTrue; +import static junit.framework.Assert.fail; import java.lang.management.ManagementFactory; import java.util.Set; -import static junit.framework.Assert.*; +import javax.management.MBeanServer; +import javax.management.ObjectName; +import org.apache.jackrabbit.oak.segment.standby.client.StandbyClient; +import org.apache.jackrabbit.oak.segment.standby.jmx.StandbyStatusMBean; +import org.apache.jackrabbit.oak.segment.standby.server.StandbyServer; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + public class MBeanTest extends TestBase { @Before Index: oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/RecoverTestIT.java =================================================================== --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/RecoverTestIT.java (working copy) +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/RecoverTestIT.java (working copy) @@ -16,22 +16,21 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby; +package org.apache.jackrabbit.oak.segment.standby; -import org.apache.jackrabbit.oak.plugins.segment.DebugSegmentStore; -import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore; -import org.apache.jackrabbit.oak.plugins.segment.standby.client.StandbyClient; -import org.apache.jackrabbit.oak.plugins.segment.standby.server.StandbyServer; +import static org.apache.jackrabbit.oak.segment.SegmentTestUtils.addTestContent; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders; +import org.apache.jackrabbit.oak.segment.standby.client.StandbyClient; +import org.apache.jackrabbit.oak.segment.standby.server.StandbyServer; import org.apache.jackrabbit.oak.spi.state.NodeStore; import org.junit.After; import org.junit.Before; import org.junit.Test; -import static org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils.addTestContent; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; - public class RecoverTestIT extends TestBase { @Before @@ -45,41 +44,14 @@ } @Test - public void testBrokenConnection() throws Exception { - - NodeStore store = SegmentNodeStore.builder(storeS).build(); - DebugSegmentStore s = new DebugSegmentStore(storeS); - addTestContent(store, "server"); - storeS.flush(); - - final StandbyServer server = new StandbyServer(port, s); - s.createReadErrors = true; - server.start(); - - StandbyClient cl = newStandbyClient(storeC); - cl.run(); - - try { - assertFalse("store are not expected to be equal", storeS.getHead().equals(storeC.getHead())); - s.createReadErrors = false; - cl.run(); - assertEquals(storeS.getHead(), storeC.getHead()); - } finally { - server.close(); - cl.close(); - } - - } - - @Test public void testLocalChanges() throws Exception { - NodeStore store = SegmentNodeStore.builder(storeC).build(); + NodeStore store = SegmentNodeStoreBuilders.builder(storeC).build(); addTestContent(store, "client"); final StandbyServer server = new StandbyServer(port, storeS); server.start(); - store = SegmentNodeStore.builder(storeS).build(); + store = SegmentNodeStoreBuilders.builder(storeS).build(); addTestContent(store, "server"); storeS.flush(); Index: oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/StandbyTest.java =================================================================== --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/StandbyTest.java (working copy) +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/StandbyTest.java (working copy) @@ -16,8 +16,9 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby; +package org.apache.jackrabbit.oak.segment.standby; + import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -27,24 +28,24 @@ import java.io.IOException; import java.util.Random; +import com.google.common.io.ByteStreams; import org.apache.jackrabbit.oak.api.Blob; import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.api.PropertyState; import org.apache.jackrabbit.oak.api.Type; -import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore; -import org.apache.jackrabbit.oak.plugins.segment.file.FileStore; -import org.apache.jackrabbit.oak.plugins.segment.standby.client.StandbyClient; -import org.apache.jackrabbit.oak.plugins.segment.standby.server.StandbyServer; +import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders; +import org.apache.jackrabbit.oak.segment.file.FileStore; +import org.apache.jackrabbit.oak.segment.standby.client.StandbyClient; +import org.apache.jackrabbit.oak.segment.standby.server.StandbyServer; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; import org.apache.jackrabbit.oak.spi.commit.EmptyHook; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; import org.apache.jackrabbit.oak.spi.state.NodeStore; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; -import com.google.common.io.ByteStreams; - public class StandbyTest extends TestBase { @Before @@ -58,13 +59,14 @@ } @Test + @Ignore("OAK-4673") public void testSync() throws Exception { final int mb = 1 * 1024 * 1024; final int blobSize = 5 * mb; FileStore primary = getPrimary(); FileStore secondary = getSecondary(); - NodeStore store = SegmentNodeStore.builder(primary).build(); + NodeStore store = SegmentNodeStoreBuilders.builder(primary).build(); final StandbyServer server = new StandbyServer(port, primary); server.start(); byte[] data = addTestContent(store, "server", blobSize, 150); @@ -80,8 +82,8 @@ cl.close(); } - assertTrue(primary.size() > blobSize); - assertTrue(secondary.size() > blobSize); + assertTrue(primary.getStats().getApproximateSize() > blobSize); + assertTrue(secondary.getStats().getApproximateSize() > blobSize); PropertyState ps = secondary.getHead().getChildNode("root") .getChildNode("server").getProperty("testBlob"); Index: oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/StandbyTestIT.java =================================================================== --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/StandbyTestIT.java (working copy) +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/StandbyTestIT.java (working copy) @@ -16,8 +16,9 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby; +package org.apache.jackrabbit.oak.segment.standby; + import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -27,15 +28,16 @@ import java.io.IOException; import java.util.Random; +import com.google.common.io.ByteStreams; import org.apache.commons.io.FileUtils; import org.apache.jackrabbit.oak.api.Blob; import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.api.PropertyState; import org.apache.jackrabbit.oak.api.Type; -import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore; -import org.apache.jackrabbit.oak.plugins.segment.file.FileStore; -import org.apache.jackrabbit.oak.plugins.segment.standby.client.StandbyClient; -import org.apache.jackrabbit.oak.plugins.segment.standby.server.StandbyServer; +import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders; +import org.apache.jackrabbit.oak.segment.file.FileStore; +import org.apache.jackrabbit.oak.segment.standby.client.StandbyClient; +import org.apache.jackrabbit.oak.segment.standby.server.StandbyServer; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; import org.apache.jackrabbit.oak.spi.commit.EmptyHook; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; @@ -44,8 +46,6 @@ import org.junit.Before; import org.junit.Test; -import com.google.common.io.ByteStreams; - public class StandbyTestIT extends TestBase { @Before @@ -91,7 +91,7 @@ FileStore primary = getPrimary(); FileStore secondary = getSecondary(); - NodeStore store = SegmentNodeStore.builder(primary).build(); + NodeStore store = SegmentNodeStoreBuilders.builder(primary).build(); final StandbyServer server = new StandbyServer(port, primary); server.start(); byte[] data = addTestContent(store, "server", blobSize, dataNodes); @@ -108,7 +108,7 @@ assertEquals(primary.getHead(), secondary.getHead()); assertTrue(store.release(cp)); cl.cleanup(); - assertTrue(secondary.size() > blobSize); + assertTrue(secondary.getStats().getApproximateSize() > blobSize); } } finally { @@ -116,8 +116,8 @@ cl.close(); } - assertTrue(primary.size() > blobSize); - assertTrue(secondary.size() > blobSize); + assertTrue(primary.getStats().getApproximateSize() > blobSize); + assertTrue(secondary.getStats().getApproximateSize() > blobSize); long primaryFs = FileUtils.sizeOf(directoryS); long secondaryFs = FileUtils.sizeOf(directoryC); Index: oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/TestBase.java =================================================================== --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/TestBase.java (working copy) +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/TestBase.java (working copy) @@ -16,9 +16,11 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.jackrabbit.oak.plugins.segment.standby; -import static org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils.createTmpTargetDir; +package org.apache.jackrabbit.oak.segment.standby; + +import static org.apache.jackrabbit.oak.segment.SegmentTestUtils.createTmpTargetDir; +import static org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder; import static org.junit.Assume.assumeTrue; import java.io.File; @@ -30,8 +32,8 @@ import org.apache.jackrabbit.oak.commons.CIHelper; import org.apache.jackrabbit.oak.commons.FixturesHelper; import org.apache.jackrabbit.oak.commons.FixturesHelper.Fixture; -import org.apache.jackrabbit.oak.plugins.segment.file.FileStore; -import org.apache.jackrabbit.oak.plugins.segment.standby.client.StandbyClient; +import org.apache.jackrabbit.oak.segment.file.FileStore; +import org.apache.jackrabbit.oak.segment.standby.client.StandbyClient; import org.junit.BeforeClass; public class TestBase { @@ -80,11 +82,14 @@ } private static FileStore newFileStore(File directory) throws Exception { - return FileStore.builder(directory) - .withMaxFileSize(1) - .withMemoryMapping(false) - .withNoCache() - .build(); + return fileStoreBuilder(directory) + .withMaxFileSize(1) + .withMemoryMapping(false) + .withNodeDeduplicationCacheSize(0) + .withSegmentCacheSize(0) + .withStringCacheSize(0) + .withTemplateCacheSize(0) + .build(); } protected FileStore setupPrimary(File directory) throws Exception {