Index: oak-segment-tar/pom.xml
===================================================================
--- oak-segment-tar/pom.xml (revision 1757664)
+++ oak-segment-tar/pom.xml (working copy)
@@ -35,6 +35,7 @@
1.5.5
+ 4.0.23.Final
@@ -51,7 +52,24 @@
- commons-math3
+
+ commons-math3,
+ netty-*
+
+
+ com.google.protobuf.*;resolution:=optional,
+ com.jcraft.jzlib.*;resolution:=optional,
+ javassist.*;resolution:=optional,
+ org.apache.tomcat.jni.*;resolution:=optional,
+ org.bouncycastle.*;resolution:=optional,
+ org.eclipse.jetty.npn.*;resolution:=optional,
+ org.jboss.marshalling.*;resolution:=optional,
+ sun.misc.*;resolution:=optional,
+ sun.nio.ch.*;resolution:=optional,
+ sun.security.util.*;resolution:=optional,
+ sun.security.x509.*;resolution:=optional,
+ *
+
@@ -60,11 +78,49 @@
maven-scr-plugin
+ org.codehaus.mojo
+ build-helper-maven-plugin
+ 1.12
+
+
+
+ reserve-network-port
+
+ process-test-resources
+
+
+ standby.server.port
+ standby.proxy.port
+
+
+
+
+
+
org.apache.maven.plugins
+ maven-surefire-plugin
+ 2.19.1
+
+
+ ${standby.server.port}
+ ${standby.proxy.port}
+
+
+ **/BulkTest.java
+ **/MBeanTest.java
+ **/FailoverIPRangeTest.java
+
+
+
+
+ org.apache.maven.plugins
maven-failsafe-plugin
+ 2.19.1
SEGMENT_TAR
+ ${standby.server.port}
+ ${standby.proxy.port}
@@ -208,6 +264,39 @@
provided
+
+
+
+ io.netty
+ netty-common
+ ${netty.version}
+ provided
+
+
+ io.netty
+ netty-buffer
+ ${netty.version}
+ provided
+
+
+ io.netty
+ netty-transport
+ ${netty.version}
+ provided
+
+
+ io.netty
+ netty-codec
+ ${netty.version}
+ provided
+
+
+ io.netty
+ netty-handler
+ ${netty.version}
+ provided
+
+
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/PropertyTemplate.java
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/PropertyTemplate.java (revision 1757664)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/PropertyTemplate.java (working copy)
@@ -31,7 +31,7 @@
* A property definition within a template (the property name, the type, and the
* index within the list of properties for the given node).
*/
-class PropertyTemplate implements Comparable {
+public class PropertyTemplate implements Comparable {
/**
* The index of this property within the list of properties in the node
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Segment.java
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Segment.java (revision 1757664)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Segment.java (working copy)
@@ -30,9 +30,12 @@
import static org.apache.jackrabbit.oak.segment.SegmentWriter.BLOCK_SIZE;
import java.io.IOException;
+import java.io.OutputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
import java.util.Arrays;
import java.util.Map;
import java.util.UUID;
@@ -584,4 +587,12 @@
}
}
+ public void writeTo(OutputStream stream) throws IOException {
+ ByteBuffer buffer = data.duplicate();
+ WritableByteChannel channel = Channels.newChannel(stream);
+ while (buffer.hasRemaining()) {
+ channel.write(buffer);
+ }
+ }
+
}
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentStoreProvider.java
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentStoreProvider.java (working copy)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentStoreProvider.java (working copy)
@@ -21,4 +21,5 @@
public interface SegmentStoreProvider {
SegmentStore getSegmentStore();
+
}
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/FailedRequestListener.java
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/FailedRequestListener.java (working copy)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/FailedRequestListener.java (working copy)
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.jackrabbit.oak.plugins.segment.standby.client;
+package org.apache.jackrabbit.oak.segment.standby.client;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/SegmentLoaderHandler.java
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/SegmentLoaderHandler.java (working copy)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/SegmentLoaderHandler.java (working copy)
@@ -16,11 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.jackrabbit.oak.plugins.segment.standby.client;
+package org.apache.jackrabbit.oak.segment.standby.client;
import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
-import static org.apache.jackrabbit.oak.plugins.segment.standby.codec.Messages.newGetBlobReq;
-import static org.apache.jackrabbit.oak.plugins.segment.standby.codec.Messages.newGetSegmentReq;
+import static org.apache.jackrabbit.oak.segment.standby.codec.Messages.newGetBlobReq;
+import static org.apache.jackrabbit.oak.segment.standby.codec.Messages.newGetSegmentReq;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
@@ -32,14 +32,14 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.jackrabbit.oak.api.Blob;
-import org.apache.jackrabbit.oak.plugins.segment.RecordId;
-import org.apache.jackrabbit.oak.plugins.segment.Segment;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeBuilder;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentNotFoundException;
-import org.apache.jackrabbit.oak.plugins.segment.standby.codec.SegmentReply;
-import org.apache.jackrabbit.oak.plugins.segment.standby.store.RemoteSegmentLoader;
-import org.apache.jackrabbit.oak.plugins.segment.standby.store.StandbyStore;
+import org.apache.jackrabbit.oak.segment.RecordId;
+import org.apache.jackrabbit.oak.segment.Segment;
+import org.apache.jackrabbit.oak.segment.SegmentNodeBuilder;
+import org.apache.jackrabbit.oak.segment.SegmentNodeState;
+import org.apache.jackrabbit.oak.segment.SegmentNotFoundException;
+import org.apache.jackrabbit.oak.segment.standby.codec.SegmentReply;
+import org.apache.jackrabbit.oak.segment.standby.store.RemoteSegmentLoader;
+import org.apache.jackrabbit.oak.segment.standby.store.StandbyStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -131,7 +131,7 @@
SegmentNodeState before = store.getHead();
SegmentNodeBuilder builder = before.builder();
- SegmentNodeState current = new SegmentNodeState(head);
+ SegmentNodeState current = store.newSegmentNodeState(head);
do {
try {
current.compareAgainstBaseState(before,
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyApplyDiff.java
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyApplyDiff.java (working copy)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyApplyDiff.java (working copy)
@@ -16,8 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.jackrabbit.oak.plugins.segment.standby.client;
+package org.apache.jackrabbit.oak.segment.standby.client;
+
import static org.apache.jackrabbit.oak.api.Type.BINARIES;
import static org.apache.jackrabbit.oak.api.Type.BINARY;
@@ -27,11 +28,11 @@
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState;
-import org.apache.jackrabbit.oak.plugins.segment.RecordId;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentBlob;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
-import org.apache.jackrabbit.oak.plugins.segment.standby.store.RemoteSegmentLoader;
+import org.apache.jackrabbit.oak.segment.RecordId;
+import org.apache.jackrabbit.oak.segment.SegmentBlob;
+import org.apache.jackrabbit.oak.segment.SegmentNodeState;
+import org.apache.jackrabbit.oak.segment.standby.store.RemoteSegmentLoader;
+import org.apache.jackrabbit.oak.segment.standby.store.StandbyStore;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
@@ -45,7 +46,7 @@
private final NodeBuilder builder;
- private final SegmentStore store;
+ private final StandbyStore store;
private final boolean hasDataStore;
@@ -60,12 +61,12 @@
*/
private final boolean logOnly;
- public StandbyApplyDiff(NodeBuilder builder, SegmentStore store,
+ public StandbyApplyDiff(NodeBuilder builder, StandbyStore store,
RemoteSegmentLoader loader) {
this(builder, store, loader, "/", false);
}
- private StandbyApplyDiff(NodeBuilder builder, SegmentStore store,
+ private StandbyApplyDiff(NodeBuilder builder, StandbyStore store,
RemoteSegmentLoader loader, String path, boolean logOnly) {
this.builder = builder;
this.store = store;
@@ -179,7 +180,7 @@
}
if (!logOnly) {
RecordId id = ((SegmentNodeState) after).getRecordId();
- builder.setChildNode(name, new SegmentNodeState(id));
+ builder.setChildNode(name, store.newSegmentNodeState(id));
}
if ("checkpoints".equals(name)) {
// if we're on the /checkpoints path, there's no need for a deep
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java (working copy)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java (working copy)
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.jackrabbit.oak.plugins.segment.standby.client;
+package org.apache.jackrabbit.oak.segment.standby.client;
import java.io.Closeable;
import java.lang.management.ManagementFactory;
@@ -44,12 +44,13 @@
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.CharsetUtil;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
-import org.apache.jackrabbit.oak.plugins.segment.standby.codec.RecordIdDecoder;
-import org.apache.jackrabbit.oak.plugins.segment.standby.jmx.ClientStandbyStatusMBean;
-import org.apache.jackrabbit.oak.plugins.segment.standby.jmx.StandbyStatusMBean;
-import org.apache.jackrabbit.oak.plugins.segment.standby.store.CommunicationObserver;
-import org.apache.jackrabbit.oak.plugins.segment.standby.store.StandbyStore;
+import org.apache.jackrabbit.oak.segment.SegmentStore;
+import org.apache.jackrabbit.oak.segment.file.FileStore;
+import org.apache.jackrabbit.oak.segment.standby.codec.RecordIdDecoder;
+import org.apache.jackrabbit.oak.segment.standby.jmx.ClientStandbyStatusMBean;
+import org.apache.jackrabbit.oak.segment.standby.jmx.StandbyStatusMBean;
+import org.apache.jackrabbit.oak.segment.standby.store.CommunicationObserver;
+import org.apache.jackrabbit.oak.segment.standby.store.StandbyStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,7 +81,7 @@
private long syncStartTimestamp;
private long syncEndTimestamp;
- public StandbyClient(String host, int port, SegmentStore store,
+ public StandbyClient(String host, int port, FileStore store,
boolean secure, int readTimeoutMs, boolean autoClean)
throws SSLException {
this.state = STATUS_INITIALIZING;
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientHandler.java
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientHandler.java (working copy)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientHandler.java (working copy)
@@ -16,21 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.jackrabbit.oak.plugins.segment.standby.client;
-import static org.apache.jackrabbit.oak.plugins.segment.standby.codec.Messages.newGetHeadReq;
+package org.apache.jackrabbit.oak.segment.standby.client;
+import static org.apache.jackrabbit.oak.segment.standby.codec.Messages.newGetHeadReq;
+
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
-import org.apache.jackrabbit.oak.plugins.segment.RecordId;
-import org.apache.jackrabbit.oak.plugins.segment.standby.codec.RecordIdDecoder;
-import org.apache.jackrabbit.oak.plugins.segment.standby.codec.ReplyDecoder;
-import org.apache.jackrabbit.oak.plugins.segment.standby.store.CommunicationObserver;
-import org.apache.jackrabbit.oak.plugins.segment.standby.store.StandbyStore;
+import org.apache.jackrabbit.oak.segment.RecordId;
+import org.apache.jackrabbit.oak.segment.standby.codec.RecordIdDecoder;
+import org.apache.jackrabbit.oak.segment.standby.codec.ReplyDecoder;
+import org.apache.jackrabbit.oak.segment.standby.store.CommunicationObserver;
+import org.apache.jackrabbit.oak.segment.standby.store.StandbyStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/BlobEncoder.java
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/BlobEncoder.java (working copy)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/BlobEncoder.java (working copy)
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.jackrabbit.oak.plugins.segment.standby.codec;
+package org.apache.jackrabbit.oak.segment.standby.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/IdArrayBasedBlob.java
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/IdArrayBasedBlob.java (working copy)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/IdArrayBasedBlob.java (working copy)
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.jackrabbit.oak.plugins.segment.standby.codec;
+package org.apache.jackrabbit.oak.segment.standby.codec;
import org.apache.jackrabbit.oak.plugins.memory.ArrayBasedBlob;
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/Messages.java
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/Messages.java (working copy)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/Messages.java (working copy)
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.jackrabbit.oak.plugins.segment.standby.codec;
+package org.apache.jackrabbit.oak.segment.standby.codec;
public class Messages {
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/RecordIdDecoder.java
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/RecordIdDecoder.java (working copy)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/RecordIdDecoder.java (working copy)
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.jackrabbit.oak.plugins.segment.standby.codec;
+package org.apache.jackrabbit.oak.segment.standby.codec;
import java.io.IOException;
@@ -25,9 +25,8 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.util.CharsetUtil;
-
-import org.apache.jackrabbit.oak.plugins.segment.RecordId;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
+import org.apache.jackrabbit.oak.segment.RecordId;
+import org.apache.jackrabbit.oak.segment.SegmentStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,7 +54,7 @@
String id = frame.toString(CharsetUtil.UTF_8);
try {
log.debug("received type {} with id {}", type, id);
- return RecordId.fromString(store.getTracker(), id);
+ return RecordId.fromString(store, id);
} catch (IllegalArgumentException e) {
log.error(e.getMessage(), e);
}
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/RecordIdEncoder.java
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/RecordIdEncoder.java (working copy)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/RecordIdEncoder.java (working copy)
@@ -17,14 +17,14 @@
* under the License.
*/
-package org.apache.jackrabbit.oak.plugins.segment.standby.codec;
+package org.apache.jackrabbit.oak.segment.standby.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.CharsetUtil;
-import org.apache.jackrabbit.oak.plugins.segment.RecordId;
+import org.apache.jackrabbit.oak.segment.RecordId;
public class RecordIdEncoder extends MessageToByteEncoder {
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ReplyDecoder.java
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ReplyDecoder.java (working copy)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ReplyDecoder.java (working copy)
@@ -17,27 +17,25 @@
* under the License.
*/
-package org.apache.jackrabbit.oak.plugins.segment.standby.codec;
+package org.apache.jackrabbit.oak.segment.standby.codec;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ReplayingDecoder;
-
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.List;
import java.util.UUID;
-import org.apache.jackrabbit.oak.plugins.segment.Segment;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
-import org.apache.jackrabbit.oak.plugins.segment.standby.codec.ReplyDecoder.DecodingState;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ReplayingDecoder;
+import org.apache.jackrabbit.oak.segment.Segment;
+import org.apache.jackrabbit.oak.segment.SegmentId;
+import org.apache.jackrabbit.oak.segment.standby.codec.ReplyDecoder.DecodingState;
+import org.apache.jackrabbit.oak.segment.standby.store.StandbyStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-
public class ReplyDecoder extends ReplayingDecoder {
public enum DecodingState {
@@ -47,12 +45,12 @@
private static final Logger log = LoggerFactory
.getLogger(ReplyDecoder.class);
- private final SegmentStore store;
+ private final StandbyStore store;
private int length = -1;
private byte type = -1;
- public ReplyDecoder(SegmentStore store) {
+ public ReplyDecoder(StandbyStore store) {
super(DecodingState.HEADER);
this.store = store;
}
@@ -128,9 +126,8 @@
Hasher hasher = Hashing.murmur3_32().newHasher();
long check = hasher.putBytes(segment).hash().padToLong();
if (hash == check) {
- SegmentId id = new SegmentId(store.getTracker(), msb, lsb);
- Segment s = new Segment(store.getTracker(), id,
- ByteBuffer.wrap(segment));
+ SegmentId id = store.newSegmentId(msb, lsb);
+ Segment s = store.newSegment(id, ByteBuffer.wrap(segment));
log.debug("received segment with id {} and size {}", id, s.size());
return s;
}
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/SegmentDecoder.java
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/SegmentDecoder.java (revision 1757664)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/SegmentDecoder.java (working copy)
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.jackrabbit.oak.plugins.segment.standby.codec;
-
-import static org.apache.jackrabbit.oak.plugins.segment.standby.codec.SegmentEncoder.EXTRA_HEADERS_LEN;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-
-import java.nio.ByteBuffer;
-import java.util.UUID;
-
-import org.apache.jackrabbit.oak.plugins.segment.Segment;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-
-@Deprecated
-public class SegmentDecoder extends LengthFieldBasedFrameDecoder {
-
- private static final Logger log = LoggerFactory
- .getLogger(SegmentDecoder.class);
-
- /**
- * the maximum possible size a header message might have
- */
- private static final int MAX_LENGHT = Segment.MAX_SEGMENT_SIZE
- + EXTRA_HEADERS_LEN;
-
- private final SegmentStore store;
-
- public SegmentDecoder(SegmentStore store) {
- super(MAX_LENGHT, 0, 4, 0, 0);
- this.store = store;
- }
-
- @Override
- protected Object decode(ChannelHandlerContext ctx, ByteBuf in)
- throws Exception {
- ByteBuf frame = (ByteBuf) super.decode(ctx, in);
- if (frame == null) {
- return null;
- }
- int len = frame.readInt();
- byte type = frame.readByte();
- long msb = frame.readLong();
- long lsb = frame.readLong();
- long hash = frame.readLong();
- byte[] segment = new byte[len - 25];
- frame.getBytes(29, segment);
- Hasher hasher = Hashing.murmur3_32().newHasher();
- long check = hasher.putBytes(segment).hash().padToLong();
- if (hash == check) {
- SegmentId id = new SegmentId(store.getTracker(), msb, lsb);
- Segment s = new Segment(store.getTracker(), id,
- ByteBuffer.wrap(segment));
- log.debug("received type {} with id {} and size {}", type, id,
- s.size());
- return s;
- }
- log.debug("received corrupted segment {}, ignoring", new UUID(msb, lsb));
- return null;
-
- }
-
- @Override
- protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer,
- int index, int length) {
- return buffer.slice(index, length);
- }
-
-}
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/SegmentEncoder.java
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/SegmentEncoder.java (working copy)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/SegmentEncoder.java (working copy)
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.jackrabbit.oak.plugins.segment.standby.codec;
+package org.apache.jackrabbit.oak.segment.standby.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
@@ -25,8 +25,8 @@
import java.io.ByteArrayOutputStream;
-import org.apache.jackrabbit.oak.plugins.segment.Segment;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
+import org.apache.jackrabbit.oak.segment.Segment;
+import org.apache.jackrabbit.oak.segment.SegmentId;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/SegmentReply.java
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/SegmentReply.java (working copy)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/SegmentReply.java (working copy)
@@ -14,9 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.jackrabbit.oak.plugins.segment.standby.codec;
+package org.apache.jackrabbit.oak.segment.standby.codec;
-import org.apache.jackrabbit.oak.plugins.segment.Segment;
+import org.apache.jackrabbit.oak.segment.Segment;
public class SegmentReply {
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/jmx/ClientStandbyStatusMBean.java
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/jmx/ClientStandbyStatusMBean.java (working copy)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/jmx/ClientStandbyStatusMBean.java (working copy)
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.jackrabbit.oak.plugins.segment.standby.jmx;
+package org.apache.jackrabbit.oak.segment.standby.jmx;
import org.apache.jackrabbit.oak.commons.jmx.Description;
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/jmx/ObservablePartnerMBean.java
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/jmx/ObservablePartnerMBean.java (working copy)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/jmx/ObservablePartnerMBean.java (working copy)
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.jackrabbit.oak.plugins.segment.standby.jmx;
+package org.apache.jackrabbit.oak.segment.standby.jmx;
import org.apache.jackrabbit.oak.commons.jmx.Description;
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/jmx/StandbyStatusMBean.java
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/jmx/StandbyStatusMBean.java (working copy)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/jmx/StandbyStatusMBean.java (working copy)
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.jackrabbit.oak.plugins.segment.standby.jmx;
+package org.apache.jackrabbit.oak.segment.standby.jmx;
import org.apache.jackrabbit.oak.commons.jmx.Description;
import javax.annotation.Nonnull;
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java (working copy)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java (working copy)
@@ -16,8 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.jackrabbit.oak.plugins.segment.standby.server;
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import java.io.Closeable;
+import java.lang.management.ManagementFactory;
+import java.security.cert.CertificateException;
+import java.util.concurrent.TimeUnit;
+
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+import javax.net.ssl.SSLException;
+
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
@@ -35,27 +47,17 @@
import io.netty.handler.ssl.util.SelfSignedCertificate;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
-
-import java.io.Closeable;
-import java.lang.management.ManagementFactory;
-import java.security.cert.CertificateException;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
-import org.apache.jackrabbit.oak.plugins.segment.standby.codec.BlobEncoder;
-import org.apache.jackrabbit.oak.plugins.segment.standby.codec.RecordIdEncoder;
-import org.apache.jackrabbit.oak.plugins.segment.standby.codec.SegmentEncoder;
-import org.apache.jackrabbit.oak.plugins.segment.standby.jmx.StandbyStatusMBean;
-import org.apache.jackrabbit.oak.plugins.segment.standby.store.CommunicationObserver;
+import org.apache.jackrabbit.oak.segment.SegmentStore;
+import org.apache.jackrabbit.oak.segment.file.FileStore;
+import org.apache.jackrabbit.oak.segment.standby.codec.BlobEncoder;
+import org.apache.jackrabbit.oak.segment.standby.codec.RecordIdEncoder;
+import org.apache.jackrabbit.oak.segment.standby.codec.SegmentEncoder;
+import org.apache.jackrabbit.oak.segment.standby.jmx.StandbyStatusMBean;
+import org.apache.jackrabbit.oak.segment.standby.store.CommunicationObserver;
+import org.apache.jackrabbit.oak.segment.standby.store.StandbyStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.management.InstanceNotFoundException;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import javax.management.StandardMBean;
-import javax.net.ssl.SSLException;
-
public class StandbyServer implements StandbyStatusMBean, Closeable {
private static final Logger log = LoggerFactory
@@ -72,23 +74,19 @@
private ChannelFuture channelFuture;
private boolean running;
- public StandbyServer(int port, final SegmentStore store)
- throws CertificateException, SSLException {
+ public StandbyServer(int port, final FileStore store) throws CertificateException, SSLException {
this(port, store, null, false);
}
- public StandbyServer(int port, final SegmentStore store, boolean secure)
- throws CertificateException, SSLException {
+ public StandbyServer(int port, final FileStore store, boolean secure) throws CertificateException, SSLException {
this(port, store, null, secure);
}
- public StandbyServer(int port, final SegmentStore store, String[] allowedClientIPRanges)
- throws CertificateException, SSLException {
+ public StandbyServer(int port, final FileStore store, String[] allowedClientIPRanges) throws CertificateException, SSLException {
this(port, store, allowedClientIPRanges, false);
}
- public StandbyServer(int port, final SegmentStore store, String[] allowedClientIPRanges, boolean secure)
- throws CertificateException, SSLException {
+ public StandbyServer(int port, final FileStore store, String[] allowedClientIPRanges, boolean secure) throws CertificateException, SSLException {
this.port = port;
if (secure) {
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServerHandler.java
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServerHandler.java (working copy)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServerHandler.java (working copy)
@@ -16,8 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.jackrabbit.oak.plugins.segment.standby.server;
+package org.apache.jackrabbit.oak.segment.standby.server;
+
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
@@ -28,15 +29,17 @@
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
-
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.api.IllegalRepositoryStateException;
-import org.apache.jackrabbit.oak.plugins.segment.RecordId;
-import org.apache.jackrabbit.oak.plugins.segment.Segment;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
-import org.apache.jackrabbit.oak.plugins.segment.standby.codec.Messages;
-import org.apache.jackrabbit.oak.plugins.segment.standby.store.CommunicationObserver;
+import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
+import org.apache.jackrabbit.oak.segment.RecordId;
+import org.apache.jackrabbit.oak.segment.Segment;
+import org.apache.jackrabbit.oak.segment.SegmentId;
+import org.apache.jackrabbit.oak.segment.file.FileStore;
+import org.apache.jackrabbit.oak.segment.standby.codec.Messages;
+import org.apache.jackrabbit.oak.segment.standby.store.CommunicationObserver;
+import org.apache.jackrabbit.oak.segment.standby.store.StandbyStore;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,12 +49,12 @@
private static final Logger log = LoggerFactory
.getLogger(StandbyServerHandler.class);
- private final SegmentStore store;
+ private final FileStore store;
private final CommunicationObserver observer;
private final String[] allowedIPRanges;
public String state;
- public StandbyServerHandler(SegmentStore store, CommunicationObserver observer, String[] allowedIPRanges) {
+ public StandbyServerHandler(FileStore store, CommunicationObserver observer, String[] allowedIPRanges) {
this.store = store;
this.observer = observer;
this.allowedIPRanges = allowedIPRanges;
@@ -154,9 +157,7 @@
for (int i = 0; i < 10; i++) {
try {
- s = store.readSegment(new SegmentId(store.getTracker(),
- uuid.getMostSignificantBits(), uuid
- .getLeastSignificantBits()));
+ s = store.readSegment(store.newSegmentId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()));
} catch (IllegalRepositoryStateException e) {
// segment not found
log.debug("waiting for segment. Got exception: " + e.getMessage());
@@ -174,7 +175,7 @@
} else if (request.startsWith(Messages.GET_BLOB)) {
String bid = request.substring(Messages.GET_BLOB.length());
log.debug("request blob id {}", bid);
- Blob b = store.readBlob(bid);
+ Blob b = readBlob(bid);
log.debug("sending blob " + bid + " to " + client);
ctx.writeAndFlush(b);
observer.didSendBinariesBytes(clientID,
@@ -203,4 +204,15 @@
log.error("Exception occurred: " + cause.getMessage(), cause);
}
}
+
+ private Blob readBlob(String blobId) {
+ BlobStore blobStore = store.getBlobStore();
+
+ if (blobStore != null) {
+ return new BlobStoreBlob(blobStore, blobId);
+ }
+
+ throw new IllegalStateException("Attempt to read external blob with blobId [" + blobId + "] without specifying BlobStore");
+ }
+
}
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java (working copy)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java (working copy)
@@ -16,11 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.jackrabbit.oak.plugins.segment.standby.store;
+package org.apache.jackrabbit.oak.segment.standby.store;
-import org.apache.jackrabbit.oak.plugins.segment.standby.jmx.StandbyStatusMBean;
-import org.apache.jackrabbit.oak.plugins.segment.standby.jmx.ObservablePartnerMBean;
+import org.apache.jackrabbit.oak.segment.standby.jmx.StandbyStatusMBean;
+import org.apache.jackrabbit.oak.segment.standby.jmx.ObservablePartnerMBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/RemoteSegmentLoader.java
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/RemoteSegmentLoader.java (working copy)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/RemoteSegmentLoader.java (working copy)
@@ -14,10 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.jackrabbit.oak.plugins.segment.standby.store;
+package org.apache.jackrabbit.oak.segment.standby.store;
import org.apache.jackrabbit.oak.api.Blob;
-import org.apache.jackrabbit.oak.plugins.segment.Segment;
+import org.apache.jackrabbit.oak.segment.Segment;
public interface RemoteSegmentLoader {
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/StandbyStore.java
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/StandbyStore.java (working copy)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/StandbyStore.java (working copy)
@@ -14,59 +14,66 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.jackrabbit.oak.plugins.segment.standby.store;
+package org.apache.jackrabbit.oak.segment.standby.store;
+
import static com.google.common.collect.Sets.newHashSet;
import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayDeque;
+import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
-import org.apache.jackrabbit.oak.api.Blob;
-import org.apache.jackrabbit.oak.plugins.segment.Segment;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentTracker;
-import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.oak.segment.RecordId;
+import org.apache.jackrabbit.oak.segment.Segment;
+import org.apache.jackrabbit.oak.segment.SegmentId;
+import org.apache.jackrabbit.oak.segment.SegmentNodeState;
+import org.apache.jackrabbit.oak.segment.SegmentStore;
+import org.apache.jackrabbit.oak.segment.file.FileStore;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class StandbyStore implements SegmentStore {
+public class StandbyStore implements SegmentStore, Closeable {
private static final Logger log = LoggerFactory.getLogger(StandbyStore.class);
- private final SegmentTracker tracker = new SegmentTracker(this);
+ private final FileStore delegate;
- private final SegmentStore delegate;
-
private RemoteSegmentLoader loader;
- public StandbyStore(SegmentStore delegate) {
+ public StandbyStore(FileStore delegate) {
this.delegate = delegate;
}
+ @Nonnull
@Override
- public SegmentTracker getTracker() {
- return tracker;
+ public SegmentId newSegmentId(long msb, long lsb) {
+ return delegate.newSegmentId(msb, lsb);
}
+ @Nonnull
@Override
- public SegmentNodeState getHead() {
- return delegate.getHead();
+ public SegmentId newBulkSegmentId() {
+ return delegate.newBulkSegmentId();
}
+ @Nonnull
@Override
- public boolean setHead(SegmentNodeState base, SegmentNodeState head) {
- return delegate.setHead(base, head);
+ public SegmentId newDataSegmentId() {
+ return delegate.newDataSegmentId();
}
@Override
@@ -74,6 +81,19 @@
return delegate.containsSegment(id);
}
+ private List getReferencedIds(Segment s) {
+ List segmentIds = new ArrayList<>();
+
+ for (int i = 0; i < s.getReferencedSegmentIdCount(); i++) {
+ UUID uuid = s.getReferencedSegmentId(i);
+ long msb = uuid.getMostSignificantBits();
+ long lsb = uuid.getLeastSignificantBits();
+ segmentIds.add(delegate.newSegmentId(msb, lsb));
+ }
+
+ return segmentIds;
+ }
+
@Override
public Segment readSegment(SegmentId sid) {
callId++;
@@ -114,7 +134,7 @@
s.size());
if (id.isDataSegmentId()) {
boolean hasPendingRefs = false;
- List refs = s.getReferencedIds();
+ List refs = getReferencedIds(s);
if (logRefs) {
log.debug("{} -> {}", id, refs);
}
@@ -181,8 +201,7 @@
}
public void persist(SegmentId in, Segment s) {
- SegmentId id = delegate.getTracker().getSegmentId(
- in.getMostSignificantBits(), in.getLeastSignificantBits());
+ SegmentId id = delegate.newSegmentId(in.getMostSignificantBits(), in.getLeastSignificantBits());
log.debug("persisting segment {} with size {}", id, s.size());
try {
ByteArrayOutputStream bout = new ByteArrayOutputStream(s.size());
@@ -220,38 +239,36 @@
delegate.close();
}
- @Override
- public Blob readBlob(String reference) {
- return delegate.readBlob(reference);
+ public long size() {
+ return delegate.getStats().getApproximateSize();
}
- @Override
- public BlobStore getBlobStore() {
- return delegate.getBlobStore();
+ public void cleanup() {
+ try {
+ delegate.flush();
+ } catch (IOException e) {
+ log.error("Error running cleanup", e);
+ }
}
- @Override
- public void gc() {
- delegate.gc();
+ public SegmentNodeState getHead() {
+ return delegate.getHead();
}
- public long size() {
- if (delegate instanceof FileStore) {
- return ((FileStore) delegate).size();
- }
- return -1;
+ public SegmentNodeState newSegmentNodeState(RecordId id) {
+ return delegate.getReader().readNode(id);
}
- public void cleanup() {
- if (delegate instanceof FileStore) {
- try {
- delegate.getTracker().getWriter().dropCache();
- ((FileStore) delegate).flush(true);
- } catch (IOException e) {
- log.error("Error running cleanup", e);
- }
- } else {
- log.warn("Delegate is not a FileStore, ignoring cleanup call");
- }
+ public boolean setHead(@Nonnull SegmentNodeState expected, @Nonnull SegmentNodeState head) {
+ return delegate.getRevisions().setHead(expected.getRecordId(), head.getRecordId());
}
+
+ public Segment newSegment(SegmentId segmentId, ByteBuffer buffer) {
+ return new Segment(delegate, delegate.getReader(), segmentId, buffer);
+ }
+
+ public BlobStore getBlobStore() {
+ return delegate.getBlobStore();
+ }
+
}
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/StandbyStoreService.java
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/StandbyStoreService.java (working copy)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/StandbyStoreService.java (working copy)
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.jackrabbit.oak.plugins.segment.standby.store;
+package org.apache.jackrabbit.oak.segment.standby.store;
import static java.lang.String.valueOf;
import static org.apache.felix.scr.annotations.ReferencePolicy.STATIC;
@@ -35,10 +35,11 @@
import org.apache.felix.scr.annotations.PropertyOption;
import org.apache.felix.scr.annotations.Reference;
import org.apache.jackrabbit.oak.commons.PropertiesUtil;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentStoreProvider;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
-import org.apache.jackrabbit.oak.plugins.segment.standby.client.StandbyClient;
-import org.apache.jackrabbit.oak.plugins.segment.standby.server.StandbyServer;
+import org.apache.jackrabbit.oak.segment.SegmentStore;
+import org.apache.jackrabbit.oak.segment.SegmentStoreProvider;
+import org.apache.jackrabbit.oak.segment.file.FileStore;
+import org.apache.jackrabbit.oak.segment.standby.client.StandbyClient;
+import org.apache.jackrabbit.oak.segment.standby.server.StandbyServer;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
@@ -91,7 +92,7 @@
@Reference(policy = STATIC, policyOption = GREEDY)
private SegmentStoreProvider storeProvider = null;
- private SegmentStore segmentStore;
+ private FileStore fileStore;
private StandbyServer primary = null;
private StandbyClient sync = null;
@@ -100,12 +101,18 @@
@Activate
private void activate(ComponentContext context) throws IOException, CertificateException {
- if (storeProvider != null) {
- segmentStore = storeProvider.getSegmentStore();
- } else {
- throw new IllegalArgumentException(
- "Missing SegmentStoreProvider service");
+ if (storeProvider == null) {
+ throw new IllegalArgumentException("Missing SegmentStoreProvider service");
}
+
+ SegmentStore segmentStore = storeProvider.getSegmentStore();
+
+ if (!(segmentStore instanceof FileStore)) {
+ throw new IllegalArgumentException("Unexpected SegmentStore implementation");
+ }
+
+ fileStore = (FileStore) segmentStore;
+
String mode = valueOf(context.getProperties().get(MODE));
if (MODE_PRIMARY.equals(mode)) {
bootstrapMaster(context);
@@ -136,7 +143,7 @@
int port = PropertiesUtil.toInteger(props.get(PORT), PORT_DEFAULT);
String[] ranges = PropertiesUtil.toStringArray(props.get(ALLOWED_CLIENT_IP_RANGES), ALLOWED_CLIENT_IP_RANGES_DEFAULT);
boolean secure = PropertiesUtil.toBoolean(props.get(SECURE), SECURE_DEFAULT);
- primary = new StandbyServer(port, segmentStore, ranges, secure);
+ primary = new StandbyServer(port, fileStore, ranges, secure);
primary.start();
log.info("started primary on port {} with allowed ip ranges {}.", port, ranges);
}
@@ -150,7 +157,7 @@
int readTimeout = PropertiesUtil.toInteger(props.get(READ_TIMEOUT), READ_TIMEOUT_DEFAULT);
boolean clean = PropertiesUtil.toBoolean(props.get(AUTO_CLEAN), AUTO_CLEAN_DEFAULT);
- sync = new StandbyClient(host, port, segmentStore, secure, readTimeout, clean);
+ sync = new StandbyClient(host, port, fileStore, secure, readTimeout, clean);
Dictionary