Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentId.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentId.java (revision 1605416)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentId.java (working copy)
@@ -46,14 +46,14 @@
private volatile Segment segment;
- SegmentId(SegmentTracker tracker, long msb, long lsb, Segment segment) {
+ public SegmentId(SegmentTracker tracker, long msb, long lsb, Segment segment) {
this.tracker = tracker;
this.msb = msb;
this.lsb = lsb;
this.segment = segment;
}
- SegmentId(SegmentTracker tracker, long msb, long lsb) {
+ public SegmentId(SegmentTracker tracker, long msb, long lsb) {
this(tracker, msb, lsb, null);
}
Index: oak-tarmk-failover/README.md
===================================================================
--- oak-tarmk-failover/README.md (revision 0)
+++ oak-tarmk-failover/README.md (revision 0)
@@ -0,0 +1,56 @@
+Oak TarMK Failover
+==================
+
+Failover
+-------
+
+The component should be installed when failover support is needed.
+
+The setup is expected to be: one master to one/many slaves nodes.
+The slave will periodically poll the master for the head state over http
+on a custom port, if it changed, it should pull in all the new segments.
+
+Setup in OSGi
+-------------
+
+The FailoverStoreService will be enabled if there is a config available for the component.
+Expected setup is:
+ - for the 'master' mode: mode=master (default), port=8023 (default)
+ - for the 'slave' mode: mode=slave, port=8023 (default), master.host=127.0.0.1 (default), interval=5 (default)
+
+Port is used in both configs: for master is the port where the server will be available on the host, for the slave it
+represents the port on the master it needs to connect to.
+Master host represents the master host info.
+Interval represents how often the sync thread should run, in seconds.
+
+TODO
+----
+
+ - timeout handling doesn't cover everything on both server and slave
+ - error handling on the slave still has some issues (the slave hangs)
+ - maybe enable compression of the segments over the wire
+ - maybe add a checksum to the segment encoder/decoder to verify the integrity of the transfer
+ - slave runmode could possibly be a read-only mode (no writes permitted)
+
+License
+-------
+
+(see the top-level [LICENSE.txt](../LICENSE.txt) for full license details)
+
+Collective work: Copyright 2012 The Apache Software Foundation.
+
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
Index: oak-tarmk-failover/pom.xml
===================================================================
--- oak-tarmk-failover/pom.xml (revision 0)
+++ oak-tarmk-failover/pom.xml (revision 0)
@@ -0,0 +1,150 @@
+
+
+
+
+
+ 4.0.0
+
+
+ org.apache.jackrabbit
+ oak-parent
+ 1.1-SNAPSHOT
+ ../oak-parent/pom.xml
+
+
+ oak-tarmk-failover
+ Oak TarMK Failover
+ bundle
+ Oak TarMK failover module
+
+
+
+
+
+
+
+ org.apache.felix
+ maven-bundle-plugin
+
+
+
+ com.google.protobuf.*;resolution:=optional,
+ com.jcraft.jzlib.*;resolution:=optional,
+ javassist.*;resolution:=optional,
+ org.apache.tomcat.jni.*;resolution:=optional,
+ org.bouncycastle.*;resolution:=optional,
+ org.eclipse.jetty.npn.*;resolution:=optional,
+ org.jboss.marshalling.*;resolution:=optional,
+ sun.misc.*;resolution:=optional,
+ sun.nio.ch.*;resolution:=optional,
+ sun.security.util.*;resolution:=optional,
+ sun.security.x509.*;resolution:=optional,
+ *
+
+ netty-*;inline=true
+
+
+
+
+ org.apache.felix
+ maven-scr-plugin
+
+
+
+
+
+
+
+ org.osgi
+ org.osgi.core
+ provided
+
+
+ org.osgi
+ org.osgi.compendium
+ provided
+
+
+ biz.aQute.bnd
+ bndlib
+ provided
+
+
+ org.apache.felix
+ org.apache.felix.scr.annotations
+ provided
+
+
+
+ org.apache.jackrabbit
+ oak-core
+ ${project.version}
+ provided
+
+
+
+ io.netty
+ netty-common
+ 4.0.20.Final
+ provided
+
+
+ io.netty
+ netty-buffer
+ 4.0.20.Final
+ provided
+
+
+ io.netty
+ netty-transport
+ 4.0.20.Final
+ provided
+
+
+ io.netty
+ netty-codec
+ 4.0.20.Final
+ provided
+
+
+ io.netty
+ netty-handler
+ 4.0.20.Final
+ provided
+
+
+
+
+ org.slf4j
+ slf4j-api
+
+
+
+
+ com.google.code.findbugs
+ jsr305
+
+
+
+
+ junit
+ junit
+ test
+
+
+ ch.qos.logback
+ logback-classic
+ test
+
+
+
Index: oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailedRequestListener.java
===================================================================
--- oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailedRequestListener.java (revision 0)
+++ oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailedRequestListener.java (revision 0)
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.failover.client;
+
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.util.concurrent.Promise;
+
+public class FailedRequestListener implements ChannelFutureListener {
+
+ private final Promise> promise;
+
+ public FailedRequestListener(Promise> promise) {
+ this.promise = promise;
+ }
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (!future.isSuccess()) {
+ promise.setFailure(future.cause());
+ future.channel().close();
+ } else {
+ future.channel().read();
+ }
+ }
+}
Index: oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java
===================================================================
--- oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java (revision 0)
+++ oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java (revision 0)
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.failover.client;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.string.StringEncoder;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import io.netty.util.CharsetUtil;
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
+import io.netty.util.concurrent.EventExecutorGroup;
+
+import java.io.Closeable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
+import org.apache.jackrabbit.oak.plugins.segment.failover.codec.RecordIdDecoder;
+import org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class FailoverClient implements Runnable, Closeable {
+
+ private static final Logger log = LoggerFactory
+ .getLogger(FailoverClient.class);
+
+ private final String host;
+ private final int port;
+ private int readTimeoutMs = 10000;
+
+ private final FailoverStore store;
+ private FailoverClientHandler handler;
+ private EventLoopGroup group;
+ private EventExecutorGroup executor;
+
+ public FailoverClient(String host, int port, SegmentStore store) {
+ this.host = host;
+ this.port = port;
+ this.store = new FailoverStore(store);
+ }
+
+ public void run() {
+
+ this.executor = new DefaultEventExecutorGroup(4);
+ this.handler = new FailoverClientHandler(this.store, executor);
+
+ group = new NioEventLoopGroup();
+ Bootstrap b = new Bootstrap();
+ b.group(group);
+ b.channel(NioSocketChannel.class);
+ b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, readTimeoutMs);
+ b.option(ChannelOption.TCP_NODELAY, true);
+ b.option(ChannelOption.SO_REUSEADDR, true);
+ b.option(ChannelOption.SO_KEEPALIVE, true);
+
+ b.handler(new ChannelInitializer() {
+ @Override
+ public void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline p = ch.pipeline();
+
+ // p.addLast(new LoggingHandler(LogLevel.INFO));
+ // Enable stream compression
+ // p.addLast(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
+ // p.addLast(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
+
+ // WriteTimeoutHandler & ReadTimeoutHandler
+ p.addLast("readTimeoutHandler", new ReadTimeoutHandler(
+ readTimeoutMs, TimeUnit.MILLISECONDS));
+
+ p.addLast(new StringEncoder(CharsetUtil.UTF_8));
+ p.addLast(new RecordIdDecoder(store));
+ p.addLast(executor, handler);
+ }
+ });
+ try {
+ // Start the client.
+ ChannelFuture f = b.connect(host, port).sync();
+ // Wait until the connection is closed.
+ f.channel().closeFuture().sync();
+ } catch (Exception e) {
+ log.error("Failed synchronizing state.", e);
+ } finally {
+ close();
+ }
+ }
+
+ @Override
+ public void close() {
+ if (group != null && !group.isShuttingDown()) {
+ group.shutdownGracefully(1, 2, TimeUnit.SECONDS)
+ .syncUninterruptibly();
+ }
+ if (executor != null && !executor.isShuttingDown()) {
+ executor.shutdownGracefully(1, 2, TimeUnit.SECONDS)
+ .syncUninterruptibly();
+ }
+ }
+}
Index: oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java
===================================================================
--- oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java (revision 0)
+++ oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java (revision 0)
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.failover.client;
+
+import static org.apache.jackrabbit.oak.plugins.segment.failover.codec.Messages.newGetHeadReq;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
+import io.netty.util.concurrent.EventExecutorGroup;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.util.concurrent.Promise;
+
+import java.io.Closeable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.jackrabbit.oak.plugins.segment.RecordId;
+import org.apache.jackrabbit.oak.plugins.segment.failover.codec.RecordIdDecoder;
+import org.apache.jackrabbit.oak.plugins.segment.failover.codec.SegmentDecoder;
+import org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FailoverClientHandler extends
+ SimpleChannelInboundHandler implements Closeable {
+
+ private static final Logger log = LoggerFactory
+ .getLogger(FailoverClientHandler.class);
+
+ private final FailoverStore store;
+ private final EventExecutorGroup executor;
+
+ private ChannelHandlerContext ctx;
+
+ private Promise headPromise;
+
+ public FailoverClientHandler(final FailoverStore store,
+ EventExecutorGroup executor) {
+ this.store = store;
+ this.executor = executor;
+ }
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) {
+ this.ctx = ctx;
+ sendHeadRequest();
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, RecordId msg)
+ throws Exception {
+ headPromise.setSuccess(msg);
+ };
+
+ @Override
+ public void channelReadComplete(ChannelHandlerContext ctx) {
+ ctx.flush();
+ }
+
+ private synchronized void sendHeadRequest() {
+ headPromise = ctx.executor().newPromise();
+ headPromise.addListener(new GenericFutureListener>() {
+ @Override
+ public void operationComplete(Future future) {
+ if (future.isSuccess()) {
+ try {
+ setHead(future.get());
+ } catch (Exception e) {
+ exceptionCaught(ctx, e);
+ }
+ } else {
+ exceptionCaught(ctx, future.cause());
+ }
+ }
+ });
+ ctx.writeAndFlush(newGetHeadReq()).addListener(
+ new FailedRequestListener(headPromise));
+ }
+
+ synchronized void setHead(RecordId head) {
+ headPromise = null;
+
+ if (store.getHead().getRecordId().equals(head)) {
+ // all sync'ed up
+ log.info("no changes on sync.");
+ ctx.close();
+ return;
+ }
+ ctx.pipeline().remove(RecordIdDecoder.class);
+ ctx.pipeline().remove(this);
+ ctx.pipeline().addLast(new SegmentDecoder(store));
+
+ e1 = new DefaultEventExecutorGroup(4);
+ SegmentPreLoaderHandler h1 = new SegmentPreLoaderHandler();
+ ctx.pipeline().addLast(e1, h1);
+
+ e2 = new DefaultEventExecutorGroup(4);
+ SegmentLoaderHandler h2 = new SegmentLoaderHandler(store, head, e1, e2);
+ ctx.pipeline().addLast(e2, h2);
+
+ h1.channelActive(ctx);
+ h2.channelActive(ctx);
+ }
+
+ private EventExecutorGroup e1;
+ private EventExecutorGroup e2;
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ log.error("Failed synchronizing state.", cause);
+ close();
+ }
+
+ @Override
+ public void close() {
+ ctx.close();
+ if (!executor.isShuttingDown()) {
+ executor.shutdownGracefully(1, 2, TimeUnit.SECONDS)
+ .syncUninterruptibly();
+ }
+ if (e1 != null && !e1.isShuttingDown()) {
+ e1.shutdownGracefully(1, 2, TimeUnit.SECONDS).syncUninterruptibly();
+ }
+ if (e1 != null && !e2.isShuttingDown()) {
+ e2.shutdownGracefully(1, 2, TimeUnit.SECONDS).syncUninterruptibly();
+ }
+ }
+}
Index: oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java
===================================================================
--- oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java (revision 0)
+++ oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java (revision 0)
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.failover.client;
+
+import static org.apache.jackrabbit.oak.plugins.segment.failover.codec.Messages.newGetSegmentReq;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.concurrent.EventExecutorGroup;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.jackrabbit.oak.plugins.segment.RecordId;
+import org.apache.jackrabbit.oak.plugins.segment.Segment;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeBuilder;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
+import org.apache.jackrabbit.oak.plugins.segment.failover.codec.SegmentReply;
+import org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStore;
+import org.apache.jackrabbit.oak.plugins.segment.failover.store.RemoteSegmentLoader;
+import org.apache.jackrabbit.oak.spi.state.ApplyDiff;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SegmentLoaderHandler extends ChannelInboundHandlerAdapter
+ implements RemoteSegmentLoader {
+
+ private static final Logger log = LoggerFactory
+ .getLogger(SegmentLoaderHandler.class);
+
+ private final FailoverStore store;
+ private final RecordId head;
+ private final EventExecutorGroup e1;
+ private final EventExecutorGroup e2;
+
+ private int timeoutMs = 5000;
+
+ private ChannelHandlerContext ctx;
+
+ final BlockingQueue segment = new LinkedBlockingQueue();
+
+ public SegmentLoaderHandler(final FailoverStore store, RecordId head,
+ EventExecutorGroup e1, EventExecutorGroup e2) {
+ this.store = store;
+ this.head = head;
+ this.e1 = e1;
+ this.e2 = e2;
+ }
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) {
+ this.ctx = ctx;
+ initSync();
+ }
+
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
+ throws Exception {
+ if (evt instanceof SegmentReply) {
+ segment.offer(((SegmentReply) evt).getSegment());
+ }
+ }
+
+ private void initSync() {
+ log.debug("new head id " + head);
+ long t = System.currentTimeMillis();
+
+ try {
+ store.setLoader(this);
+ SegmentNodeState before = store.getHead();
+ SegmentNodeBuilder builder = before.builder();
+
+ SegmentNodeState current = new SegmentNodeState(head);
+ current.compareAgainstBaseState(before, new ApplyDiff(builder));
+
+ boolean ok = store.setHead(before, builder.getNodeState());
+ log.info("#updated state (set head {}) in {}ms.", ok,
+ System.currentTimeMillis() - t);
+ } finally {
+ close();
+ }
+ }
+
+ @Override
+ public Segment readSegment(final SegmentId id) {
+ ctx.writeAndFlush(newGetSegmentReq(id)).addListener(reqListener);
+ return getSegment();
+ }
+
+ private final ChannelFutureListener reqListener = new ChannelFutureListener() {
+
+ @Override
+ public void operationComplete(ChannelFuture future) {
+ if (!future.isSuccess()) {
+ exceptionCaught(ctx, future.cause());
+ }
+ }
+ };
+
+ public Segment getSegment() {
+ boolean interrupted = false;
+ try {
+ for (;;) {
+ try {
+ return segment.poll(timeoutMs, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException ignore) {
+ interrupted = true;
+ }
+ }
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ log.error("Failed synchronizing state.", cause);
+ close();
+ }
+
+ @Override
+ public void close() {
+ ctx.close();
+ if (e1 != null && !e1.isShuttingDown()) {
+ e1.shutdownGracefully(1, 2, TimeUnit.SECONDS).syncUninterruptibly();
+ }
+ if (e1 != null && !e2.isShuttingDown()) {
+ e2.shutdownGracefully(1, 2, TimeUnit.SECONDS).syncUninterruptibly();
+ }
+ }
+
+}
Index: oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentPreLoaderHandler.java
===================================================================
--- oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentPreLoaderHandler.java (revision 0)
+++ oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentPreLoaderHandler.java (revision 0)
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.failover.client;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+
+import org.apache.jackrabbit.oak.plugins.segment.Segment;
+import org.apache.jackrabbit.oak.plugins.segment.failover.codec.SegmentReply;
+
+public class SegmentPreLoaderHandler extends
+ SimpleChannelInboundHandler {
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, Segment msg)
+ throws Exception {
+ ctx.fireUserEventTriggered(new SegmentReply(msg));
+ }
+
+ @Override
+ public void channelReadComplete(ChannelHandlerContext ctx) {
+ ctx.flush();
+ }
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) {
+ }
+
+}
Index: oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/Messages.java
===================================================================
--- oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/Messages.java (revision 0)
+++ oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/Messages.java (revision 0)
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.failover.codec;
+
+import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
+
+public class Messages {
+
+ public static final byte HEADER_RECORD = 0x00;
+ public static final byte HEADER_SEGMENT = 0x01;
+
+ public static final String GET_HEAD = "h";
+
+ public static final String GET_SEGMENT = "s.";
+
+ public static String newGetHeadReq() {
+ return GET_HEAD + "\r\n";
+ }
+
+ public static String newGetSegmentReq(SegmentId sid) {
+ return GET_SEGMENT + sid.toString() + "\r\n";
+ }
+}
Index: oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/RecordIdDecoder.java
===================================================================
--- oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/RecordIdDecoder.java (revision 0)
+++ oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/RecordIdDecoder.java (revision 0)
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.segment.failover.codec;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.util.CharsetUtil;
+
+import org.apache.jackrabbit.oak.plugins.segment.RecordId;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RecordIdDecoder extends LengthFieldBasedFrameDecoder {
+
+ private static final Logger log = LoggerFactory
+ .getLogger(RecordIdDecoder.class);
+
+ private final SegmentStore store;
+
+ public RecordIdDecoder(SegmentStore store) {
+ super(64, 0, 4, 0, 4);
+ this.store = store;
+ }
+
+ @Override
+ protected Object decode(ChannelHandlerContext ctx, ByteBuf in)
+ throws Exception {
+ ByteBuf frame = (ByteBuf) super.decode(ctx, in);
+ if (frame == null) {
+ return null;
+ }
+ byte type = frame.readByte();
+ frame.discardReadBytes();
+ String id = frame.toString(CharsetUtil.UTF_8);
+ try {
+ log.debug("received type {} with id {}", type, id);
+ return RecordId.fromString(store.getTracker(), id);
+ } catch (IllegalArgumentException e) {
+ log.error(e.getMessage(), e);
+ }
+ return null;
+ }
+
+ @Override
+ protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer,
+ int index, int length) {
+ return buffer.slice(index, length);
+ }
+
+}
Index: oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/RecordIdEncoder.java
===================================================================
--- oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/RecordIdEncoder.java (revision 0)
+++ oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/RecordIdEncoder.java (revision 0)
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.segment.failover.codec;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import io.netty.util.CharsetUtil;
+
+import org.apache.jackrabbit.oak.plugins.segment.RecordId;
+
+public class RecordIdEncoder extends MessageToByteEncoder {
+
+ @Override
+ protected void encode(ChannelHandlerContext ctx, RecordId msg, ByteBuf out)
+ throws Exception {
+ byte[] body = msg.toString().getBytes(CharsetUtil.UTF_8);
+ out.writeInt(body.length + 1);
+ out.writeByte(Messages.HEADER_RECORD);
+ out.writeBytes(body);
+ }
+}
Index: oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentDecoder.java
===================================================================
--- oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentDecoder.java (revision 0)
+++ oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentDecoder.java (revision 0)
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.segment.failover.codec;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+
+import org.apache.jackrabbit.oak.plugins.segment.Segment;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SegmentDecoder extends LengthFieldBasedFrameDecoder {
+
+ private static final Logger log = LoggerFactory
+ .getLogger(SegmentDecoder.class);
+
+ private final SegmentStore store;
+
+ public SegmentDecoder(SegmentStore store) {
+ super(Segment.MAX_SEGMENT_SIZE + 21, 0, 4, 0, 4);
+ this.store = store;
+ }
+
+ @Override
+ protected Object decode(ChannelHandlerContext ctx, ByteBuf in)
+ throws Exception {
+ ByteBuf frame = (ByteBuf) super.decode(ctx, in);
+ if (frame == null) {
+ return null;
+ }
+ byte type = frame.readByte();
+ long msb = frame.readLong();
+ long lsb = frame.readLong();
+ frame.discardReadBytes();
+ SegmentId id = new SegmentId(store.getTracker(), msb, lsb);
+ Segment s = new Segment(store.getTracker(), id, frame.nioBuffer());
+ log.debug("received type {} with id {} and size {}", type, id, s.size());
+ return s;
+ }
+
+ @Override
+ protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer,
+ int index, int length) {
+ return buffer.slice(index, length);
+ }
+
+}
Index: oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentEncoder.java
===================================================================
--- oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentEncoder.java (revision 0)
+++ oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentEncoder.java (revision 0)
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.segment.failover.codec;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+
+import org.apache.jackrabbit.oak.plugins.segment.Segment;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
+
+public class SegmentEncoder extends MessageToByteEncoder {
+
+ @Override
+ protected void encode(ChannelHandlerContext ctx, Segment s, ByteBuf out)
+ throws Exception {
+ SegmentId id = s.getSegmentId();
+ int len = s.size() + 17;
+ out.writeInt(len);
+ out.writeByte(Messages.HEADER_SEGMENT);
+ out.writeLong(id.getMostSignificantBits());
+ out.writeLong(id.getLeastSignificantBits());
+ ByteBufOutputStream bout = new ByteBufOutputStream(out);
+ s.writeTo(bout);
+ bout.flush();
+ bout.close();
+ }
+}
Index: oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentReply.java
===================================================================
--- oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentReply.java (revision 0)
+++ oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentReply.java (revision 0)
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.failover.codec;
+
+import org.apache.jackrabbit.oak.plugins.segment.Segment;
+
+public class SegmentReply {
+
+ private final Segment segment;
+
+ public SegmentReply(Segment segment) {
+ this.segment = segment;
+ }
+
+ public Segment getSegment() {
+ return this.segment;
+ }
+
+}
Index: oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java
===================================================================
--- oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java (revision 0)
+++ oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java (revision 0)
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.failover.server;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.LineBasedFrameDecoder;
+import io.netty.handler.codec.string.StringDecoder;
+import io.netty.util.CharsetUtil;
+
+import java.io.Closeable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
+import org.apache.jackrabbit.oak.plugins.segment.failover.codec.RecordIdEncoder;
+import org.apache.jackrabbit.oak.plugins.segment.failover.codec.SegmentEncoder;
+
+public class FailoverServer implements Closeable {
+
+ private final int port;
+
+ private final EventLoopGroup bossGroup;
+ private final EventLoopGroup workerGroup;
+ private final ServerBootstrap b;
+
+ public FailoverServer(int port, final SegmentStore store) {
+ this.port = port;
+
+ bossGroup = new NioEventLoopGroup(1);
+ workerGroup = new NioEventLoopGroup();
+
+ b = new ServerBootstrap();
+ b.group(bossGroup, workerGroup);
+ b.channel(NioServerSocketChannel.class);
+
+ b.option(ChannelOption.TCP_NODELAY, true);
+ b.option(ChannelOption.SO_REUSEADDR, true);
+ b.childOption(ChannelOption.TCP_NODELAY, true);
+ b.childOption(ChannelOption.SO_REUSEADDR, true);
+ b.childOption(ChannelOption.SO_KEEPALIVE, true);
+
+ b.childHandler(new ChannelInitializer() {
+ @Override
+ public void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline p = ch.pipeline();
+ p.addLast(new LineBasedFrameDecoder(8192));
+ p.addLast(new StringDecoder(CharsetUtil.UTF_8));
+ p.addLast(new RecordIdEncoder());
+ p.addLast(new SegmentEncoder());
+ p.addLast(new FailoverServerHandler(store));
+ }
+ });
+ }
+
+ public void start() {
+ try {
+ b.bind("127.0.0.1", port).sync().channel().closeFuture().sync();
+ } catch (InterruptedException e) {
+ close();
+ }
+ }
+
+ @Override
+ public void close() {
+ if (bossGroup != null && !bossGroup.isShuttingDown()) {
+ bossGroup.shutdownGracefully(1, 2, TimeUnit.SECONDS)
+ .syncUninterruptibly();
+ }
+ if (workerGroup != null && !workerGroup.isShuttingDown()) {
+ workerGroup.shutdownGracefully(1, 2, TimeUnit.SECONDS)
+ .syncUninterruptibly();
+ ;
+ }
+ }
+
+}
Index: oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServerHandler.java
===================================================================
--- oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServerHandler.java (revision 0)
+++ oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServerHandler.java (revision 0)
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.failover.server;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+
+import org.apache.jackrabbit.oak.plugins.segment.RecordId;
+import org.apache.jackrabbit.oak.plugins.segment.Segment;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
+import org.apache.jackrabbit.oak.plugins.segment.failover.codec.Messages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Sharable
+public class FailoverServerHandler extends SimpleChannelInboundHandler {
+
+ private static final Logger log = LoggerFactory
+ .getLogger(FailoverServerHandler.class);
+
+ private final SegmentStore store;
+
+ public FailoverServerHandler(SegmentStore store) {
+ this.store = store;
+ }
+
+ private RecordId headId() {
+ if (store != null) {
+ return store.getHead().getRecordId();
+ }
+ return null;
+ }
+
+ @Override
+ public void channelRead0(ChannelHandlerContext ctx, String request)
+ throws Exception {
+ if (Messages.GET_HEAD.equalsIgnoreCase(request)) {
+ RecordId r = headId();
+ if (r != null) {
+ ctx.writeAndFlush(r);
+ } else {
+ ctx.writeAndFlush(Unpooled.EMPTY_BUFFER);
+ }
+
+ } else if (request.startsWith(Messages.GET_SEGMENT)) {
+ String sid = request.substring(Messages.GET_SEGMENT.length());
+ log.debug("request segment id {}", sid);
+ UUID uuid = UUID.fromString(sid);
+
+ Segment s = null;
+
+ for (int i = 0; i < 3; i++) {
+ try {
+ s = store.readSegment(new SegmentId(store.getTracker(),
+ uuid.getMostSignificantBits(), uuid
+ .getLeastSignificantBits()));
+ } catch (IllegalStateException e) {
+ // segment not found
+ log.warn(e.getMessage());
+ }
+ if (s != null) {
+ break;
+ } else {
+ TimeUnit.MILLISECONDS.sleep(500);
+ }
+ }
+
+ if (s != null) {
+ ctx.writeAndFlush(s);
+ } else {
+ ctx.writeAndFlush(Unpooled.EMPTY_BUFFER);
+ }
+ } else {
+ log.warn("Unknown request {}, ignoring.", request);
+ ctx.writeAndFlush(Unpooled.EMPTY_BUFFER);
+ }
+ }
+
+ @Override
+ public void channelReadComplete(ChannelHandlerContext ctx) {
+ ctx.flush();
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ log.error(cause.getMessage(), cause);
+ ctx.close();
+ }
+}
Index: oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStore.java
===================================================================
--- oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStore.java (revision 0)
+++ oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStore.java (revision 0)
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.failover.store;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.plugins.segment.Segment;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentTracker;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+
+public class FailoverStore implements SegmentStore {
+
+ private final SegmentTracker tracker = new SegmentTracker(this);
+
+ private final SegmentStore delegate;
+
+ private RemoteSegmentLoader loader;
+
+ public FailoverStore(SegmentStore delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public SegmentTracker getTracker() {
+ return tracker;
+ }
+
+ @Override
+ public SegmentNodeState getHead() {
+ return delegate.getHead();
+ }
+
+ @Override
+ public boolean setHead(SegmentNodeState base, SegmentNodeState head) {
+ return delegate.setHead(base, head);
+ }
+
+ @Override
+ public boolean containsSegment(SegmentId id) {
+ return delegate.containsSegment(id);
+ }
+
+ @Override
+ public Segment readSegment(SegmentId sid) {
+
+ Deque ids = new ArrayDeque();
+ ids.offer(sid);
+ int err = 0;
+ Set seen = new HashSet();
+
+ while (!ids.isEmpty()) {
+ SegmentId id = ids.remove();
+ if (!seen.contains(id) && !delegate.containsSegment(id)) {
+ Segment s = loader.readSegment(id);
+ if (s != null) {
+ ByteArrayOutputStream bout = new ByteArrayOutputStream(
+ s.size());
+ if (id.isDataSegmentId()) {
+ ids.addAll(s.getReferencedIds());
+ }
+ try {
+ s.writeTo(bout);
+ writeSegment(id, bout.toByteArray(), 0, s.size());
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "Unable to write remote segment " + id, e);
+ }
+ seen.add(id);
+ ids.removeAll(seen);
+ err = 0;
+ } else {
+ if (err == 5) {
+ loader.close();
+ throw new IllegalStateException(
+ "Unable to load remote segment " + id);
+ }
+ err++;
+ ids.addFirst(id);
+ }
+ } else {
+ seen.add(id);
+ }
+ }
+
+ return delegate.readSegment(sid);
+ }
+
+ @Override
+ public void writeSegment(SegmentId id, byte[] bytes, int offset, int length) {
+ delegate.writeSegment(id, bytes, offset, length);
+ }
+
+ @Override
+ public void close() {
+ delegate.close();
+ }
+
+ @Override
+ public Blob readBlob(String reference) {
+ return delegate.readBlob(reference);
+ }
+
+ @Override
+ public BlobStore getBlobStore() {
+ return delegate.getBlobStore();
+ }
+
+ @Override
+ public void gc() {
+ delegate.gc();
+ }
+
+ public void setLoader(RemoteSegmentLoader loader) {
+ this.loader = loader;
+ }
+
+}
Index: oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java
===================================================================
--- oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java (revision 0)
+++ oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java (revision 0)
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.failover.store;
+
+import static java.lang.Integer.parseInt;
+import static java.lang.String.valueOf;
+import static org.apache.felix.scr.annotations.ReferencePolicy.STATIC;
+import static org.apache.felix.scr.annotations.ReferencePolicyOption.GREEDY;
+
+import java.io.IOException;
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.ConfigurationPolicy;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.PropertyOption;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStoreService;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
+import org.apache.jackrabbit.oak.plugins.segment.failover.client.FailoverClient;
+import org.apache.jackrabbit.oak.plugins.segment.failover.server.FailoverServer;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Component(policy = ConfigurationPolicy.REQUIRE)
+public class FailoverStoreService {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ @Property(label = "Mode", description = "TarMK Failover mode (master or slave)", options = {
+ @PropertyOption(name = "master", value = "master"),
+ @PropertyOption(name = "slave", value = "slave") }, value = "master")
+ public static final String MODE = "mode";
+
+ @Property(label = "Port", description = "TarMK Failover port", intValue = 8023)
+ public static final String PORT = "port";
+
+ @Property(label = "Master Host", description = "TarMK Failover master host (enabled for slave mode only)", value = "127.0.0.1")
+ public static final String MASTER_HOST = "master.host";
+
+ @Property(label = "Sync interval (seconds)", description = "TarMK Failover sync interval (seconds)", intValue = 5)
+ public static final String INTERVAL = "interval";
+
+ private static String MODE_MASTER = "master";
+
+ private static String MODE_SLAVE = "slave";
+
+ @Reference(policy = STATIC, policyOption = GREEDY)
+ private NodeStore store = null;
+
+ private SegmentStore segmentStore;
+
+ private FailoverServer master = null;
+ private FailoverClient sync = null;
+ private ServiceRegistration syncReg = null;
+
+ @Activate
+ private void activate(ComponentContext context) throws IOException {
+ if (store instanceof SegmentNodeStoreService) {
+ segmentStore = ((SegmentNodeStoreService) store).getSegmentStore();
+ } else {
+ throw new IllegalArgumentException(
+ "Unexpected NodeStore impl, expecting SegmentNodeStoreService, got "
+ + store.getClass());
+ }
+ String mode = valueOf(context.getProperties().get(MODE));
+ if (MODE_MASTER.equals(mode)) {
+ bootstrapMaster(context);
+ } else if (MODE_SLAVE.equals(mode)) {
+ bootstrapSlave(context);
+ } else {
+ throw new IllegalArgumentException(
+ "Unexpected 'mode' param, expecting 'master' or 'slave' got "
+ + mode);
+ }
+ }
+
+ @Deactivate
+ public synchronized void deactivate() {
+ if (master != null) {
+ master.close();
+ }
+ if (sync != null) {
+ sync.close();
+ }
+ if (syncReg != null) {
+ syncReg.unregister();
+ }
+ }
+
+ private void bootstrapMaster(ComponentContext context) {
+ Dictionary, ?> props = context.getProperties();
+ int port = parseInt(valueOf(props.get(PORT)));
+ master = new FailoverServer(port, segmentStore);
+ master.start();
+ log.info("started failover master on port {}.", port);
+ }
+
+ private void bootstrapSlave(ComponentContext context) {
+ Dictionary, ?> props = context.getProperties();
+ int port = parseInt(valueOf(props.get(PORT)));
+ long interval = parseInt(valueOf(props.get(INTERVAL)));
+ String host = valueOf(context.getProperties().get(MASTER_HOST));
+
+ sync = new FailoverClient(host, port, segmentStore);
+ Dictionary