Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentId.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentId.java (revision 1605416) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentId.java (working copy) @@ -46,14 +46,14 @@ private volatile Segment segment; - SegmentId(SegmentTracker tracker, long msb, long lsb, Segment segment) { + public SegmentId(SegmentTracker tracker, long msb, long lsb, Segment segment) { this.tracker = tracker; this.msb = msb; this.lsb = lsb; this.segment = segment; } - SegmentId(SegmentTracker tracker, long msb, long lsb) { + public SegmentId(SegmentTracker tracker, long msb, long lsb) { this(tracker, msb, lsb, null); } Index: oak-tarmk-failover/README.md =================================================================== --- oak-tarmk-failover/README.md (revision 0) +++ oak-tarmk-failover/README.md (revision 0) @@ -0,0 +1,56 @@ +Oak TarMK Failover +================== + +Failover +------- + +The component should be installed when failover support is needed. + +The setup is expected to be: one master to one/many slaves nodes. +The slave will periodically poll the master for the head state over http +on a custom port, if it changed, it should pull in all the new segments. + +Setup in OSGi +------------- + +The FailoverStoreService will be enabled if there is a config available for the component. +Expected setup is: + - for the 'master' mode: mode=master (default), port=8023 (default) + - for the 'slave' mode: mode=slave, port=8023 (default), master.host=127.0.0.1 (default), interval=5 (default) + +Port is used in both configs: for master is the port where the server will be available on the host, for the slave it +represents the port on the master it needs to connect to. +Master host represents the master host info. +Interval represents how often the sync thread should run, in seconds. + +TODO +---- + + - timeout handling doesn't cover everything on both server and slave + - error handling on the slave still has some issues (the slave hangs) + - maybe enable compression of the segments over the wire + - maybe add a checksum to the segment encoder/decoder to verify the integrity of the transfer + - slave runmode could possibly be a read-only mode (no writes permitted) + +License +------- + +(see the top-level [LICENSE.txt](../LICENSE.txt) for full license details) + +Collective work: Copyright 2012 The Apache Software Foundation. + +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + Index: oak-tarmk-failover/pom.xml =================================================================== --- oak-tarmk-failover/pom.xml (revision 0) +++ oak-tarmk-failover/pom.xml (revision 0) @@ -0,0 +1,150 @@ + + + + + + 4.0.0 + + + org.apache.jackrabbit + oak-parent + 1.1-SNAPSHOT + ../oak-parent/pom.xml + + + oak-tarmk-failover + Oak TarMK Failover + bundle + Oak TarMK failover module + + + + + + + + org.apache.felix + maven-bundle-plugin + + + + com.google.protobuf.*;resolution:=optional, + com.jcraft.jzlib.*;resolution:=optional, + javassist.*;resolution:=optional, + org.apache.tomcat.jni.*;resolution:=optional, + org.bouncycastle.*;resolution:=optional, + org.eclipse.jetty.npn.*;resolution:=optional, + org.jboss.marshalling.*;resolution:=optional, + sun.misc.*;resolution:=optional, + sun.nio.ch.*;resolution:=optional, + sun.security.util.*;resolution:=optional, + sun.security.x509.*;resolution:=optional, + * + + netty-*;inline=true + + + + + org.apache.felix + maven-scr-plugin + + + + + + + + org.osgi + org.osgi.core + provided + + + org.osgi + org.osgi.compendium + provided + + + biz.aQute.bnd + bndlib + provided + + + org.apache.felix + org.apache.felix.scr.annotations + provided + + + + org.apache.jackrabbit + oak-core + ${project.version} + provided + + + + io.netty + netty-common + 4.0.20.Final + provided + + + io.netty + netty-buffer + 4.0.20.Final + provided + + + io.netty + netty-transport + 4.0.20.Final + provided + + + io.netty + netty-codec + 4.0.20.Final + provided + + + io.netty + netty-handler + 4.0.20.Final + provided + + + + + org.slf4j + slf4j-api + + + + + com.google.code.findbugs + jsr305 + + + + + junit + junit + test + + + ch.qos.logback + logback-classic + test + + + Index: oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailedRequestListener.java =================================================================== --- oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailedRequestListener.java (revision 0) +++ oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailedRequestListener.java (revision 0) @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.segment.failover.client; + +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.util.concurrent.Promise; + +public class FailedRequestListener implements ChannelFutureListener { + + private final Promise promise; + + public FailedRequestListener(Promise promise) { + this.promise = promise; + } + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + promise.setFailure(future.cause()); + future.channel().close(); + } else { + future.channel().read(); + } + } +} Index: oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java =================================================================== --- oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java (revision 0) +++ oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java (revision 0) @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.segment.failover.client; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.string.StringEncoder; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.util.CharsetUtil; +import io.netty.util.concurrent.DefaultEventExecutorGroup; +import io.netty.util.concurrent.EventExecutorGroup; + +import java.io.Closeable; +import java.util.concurrent.TimeUnit; + +import org.apache.jackrabbit.oak.plugins.segment.SegmentStore; +import org.apache.jackrabbit.oak.plugins.segment.failover.codec.RecordIdDecoder; +import org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class FailoverClient implements Runnable, Closeable { + + private static final Logger log = LoggerFactory + .getLogger(FailoverClient.class); + + private final String host; + private final int port; + private int readTimeoutMs = 10000; + + private final FailoverStore store; + private FailoverClientHandler handler; + private EventLoopGroup group; + private EventExecutorGroup executor; + + public FailoverClient(String host, int port, SegmentStore store) { + this.host = host; + this.port = port; + this.store = new FailoverStore(store); + } + + public void run() { + + this.executor = new DefaultEventExecutorGroup(4); + this.handler = new FailoverClientHandler(this.store, executor); + + group = new NioEventLoopGroup(); + Bootstrap b = new Bootstrap(); + b.group(group); + b.channel(NioSocketChannel.class); + b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, readTimeoutMs); + b.option(ChannelOption.TCP_NODELAY, true); + b.option(ChannelOption.SO_REUSEADDR, true); + b.option(ChannelOption.SO_KEEPALIVE, true); + + b.handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + + // p.addLast(new LoggingHandler(LogLevel.INFO)); + // Enable stream compression + // p.addLast(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP)); + // p.addLast(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP)); + + // WriteTimeoutHandler & ReadTimeoutHandler + p.addLast("readTimeoutHandler", new ReadTimeoutHandler( + readTimeoutMs, TimeUnit.MILLISECONDS)); + + p.addLast(new StringEncoder(CharsetUtil.UTF_8)); + p.addLast(new RecordIdDecoder(store)); + p.addLast(executor, handler); + } + }); + try { + // Start the client. + ChannelFuture f = b.connect(host, port).sync(); + // Wait until the connection is closed. + f.channel().closeFuture().sync(); + } catch (Exception e) { + log.error("Failed synchronizing state.", e); + } finally { + close(); + } + } + + @Override + public void close() { + if (group != null && !group.isShuttingDown()) { + group.shutdownGracefully(1, 2, TimeUnit.SECONDS) + .syncUninterruptibly(); + } + if (executor != null && !executor.isShuttingDown()) { + executor.shutdownGracefully(1, 2, TimeUnit.SECONDS) + .syncUninterruptibly(); + } + } +} Index: oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java =================================================================== --- oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java (revision 0) +++ oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClientHandler.java (revision 0) @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.segment.failover.client; + +import static org.apache.jackrabbit.oak.plugins.segment.failover.codec.Messages.newGetHeadReq; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.util.concurrent.DefaultEventExecutorGroup; +import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import io.netty.util.concurrent.Promise; + +import java.io.Closeable; +import java.util.concurrent.TimeUnit; + +import org.apache.jackrabbit.oak.plugins.segment.RecordId; +import org.apache.jackrabbit.oak.plugins.segment.failover.codec.RecordIdDecoder; +import org.apache.jackrabbit.oak.plugins.segment.failover.codec.SegmentDecoder; +import org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FailoverClientHandler extends + SimpleChannelInboundHandler implements Closeable { + + private static final Logger log = LoggerFactory + .getLogger(FailoverClientHandler.class); + + private final FailoverStore store; + private final EventExecutorGroup executor; + + private ChannelHandlerContext ctx; + + private Promise headPromise; + + public FailoverClientHandler(final FailoverStore store, + EventExecutorGroup executor) { + this.store = store; + this.executor = executor; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + this.ctx = ctx; + sendHeadRequest(); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, RecordId msg) + throws Exception { + headPromise.setSuccess(msg); + }; + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + ctx.flush(); + } + + private synchronized void sendHeadRequest() { + headPromise = ctx.executor().newPromise(); + headPromise.addListener(new GenericFutureListener>() { + @Override + public void operationComplete(Future future) { + if (future.isSuccess()) { + try { + setHead(future.get()); + } catch (Exception e) { + exceptionCaught(ctx, e); + } + } else { + exceptionCaught(ctx, future.cause()); + } + } + }); + ctx.writeAndFlush(newGetHeadReq()).addListener( + new FailedRequestListener(headPromise)); + } + + synchronized void setHead(RecordId head) { + headPromise = null; + + if (store.getHead().getRecordId().equals(head)) { + // all sync'ed up + log.info("no changes on sync."); + ctx.close(); + return; + } + ctx.pipeline().remove(RecordIdDecoder.class); + ctx.pipeline().remove(this); + ctx.pipeline().addLast(new SegmentDecoder(store)); + + e1 = new DefaultEventExecutorGroup(4); + SegmentPreLoaderHandler h1 = new SegmentPreLoaderHandler(); + ctx.pipeline().addLast(e1, h1); + + e2 = new DefaultEventExecutorGroup(4); + SegmentLoaderHandler h2 = new SegmentLoaderHandler(store, head, e1, e2); + ctx.pipeline().addLast(e2, h2); + + h1.channelActive(ctx); + h2.channelActive(ctx); + } + + private EventExecutorGroup e1; + private EventExecutorGroup e2; + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + log.error("Failed synchronizing state.", cause); + close(); + } + + @Override + public void close() { + ctx.close(); + if (!executor.isShuttingDown()) { + executor.shutdownGracefully(1, 2, TimeUnit.SECONDS) + .syncUninterruptibly(); + } + if (e1 != null && !e1.isShuttingDown()) { + e1.shutdownGracefully(1, 2, TimeUnit.SECONDS).syncUninterruptibly(); + } + if (e1 != null && !e2.isShuttingDown()) { + e2.shutdownGracefully(1, 2, TimeUnit.SECONDS).syncUninterruptibly(); + } + } +} Index: oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java =================================================================== --- oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java (revision 0) +++ oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentLoaderHandler.java (revision 0) @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.segment.failover.client; + +import static org.apache.jackrabbit.oak.plugins.segment.failover.codec.Messages.newGetSegmentReq; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.util.concurrent.EventExecutorGroup; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.jackrabbit.oak.plugins.segment.RecordId; +import org.apache.jackrabbit.oak.plugins.segment.Segment; +import org.apache.jackrabbit.oak.plugins.segment.SegmentId; +import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeBuilder; +import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState; +import org.apache.jackrabbit.oak.plugins.segment.failover.codec.SegmentReply; +import org.apache.jackrabbit.oak.plugins.segment.failover.store.FailoverStore; +import org.apache.jackrabbit.oak.plugins.segment.failover.store.RemoteSegmentLoader; +import org.apache.jackrabbit.oak.spi.state.ApplyDiff; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SegmentLoaderHandler extends ChannelInboundHandlerAdapter + implements RemoteSegmentLoader { + + private static final Logger log = LoggerFactory + .getLogger(SegmentLoaderHandler.class); + + private final FailoverStore store; + private final RecordId head; + private final EventExecutorGroup e1; + private final EventExecutorGroup e2; + + private int timeoutMs = 5000; + + private ChannelHandlerContext ctx; + + final BlockingQueue segment = new LinkedBlockingQueue(); + + public SegmentLoaderHandler(final FailoverStore store, RecordId head, + EventExecutorGroup e1, EventExecutorGroup e2) { + this.store = store; + this.head = head; + this.e1 = e1; + this.e2 = e2; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + this.ctx = ctx; + initSync(); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) + throws Exception { + if (evt instanceof SegmentReply) { + segment.offer(((SegmentReply) evt).getSegment()); + } + } + + private void initSync() { + log.debug("new head id " + head); + long t = System.currentTimeMillis(); + + try { + store.setLoader(this); + SegmentNodeState before = store.getHead(); + SegmentNodeBuilder builder = before.builder(); + + SegmentNodeState current = new SegmentNodeState(head); + current.compareAgainstBaseState(before, new ApplyDiff(builder)); + + boolean ok = store.setHead(before, builder.getNodeState()); + log.info("#updated state (set head {}) in {}ms.", ok, + System.currentTimeMillis() - t); + } finally { + close(); + } + } + + @Override + public Segment readSegment(final SegmentId id) { + ctx.writeAndFlush(newGetSegmentReq(id)).addListener(reqListener); + return getSegment(); + } + + private final ChannelFutureListener reqListener = new ChannelFutureListener() { + + @Override + public void operationComplete(ChannelFuture future) { + if (!future.isSuccess()) { + exceptionCaught(ctx, future.cause()); + } + } + }; + + public Segment getSegment() { + boolean interrupted = false; + try { + for (;;) { + try { + return segment.poll(timeoutMs, TimeUnit.MILLISECONDS); + } catch (InterruptedException ignore) { + interrupted = true; + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + log.error("Failed synchronizing state.", cause); + close(); + } + + @Override + public void close() { + ctx.close(); + if (e1 != null && !e1.isShuttingDown()) { + e1.shutdownGracefully(1, 2, TimeUnit.SECONDS).syncUninterruptibly(); + } + if (e1 != null && !e2.isShuttingDown()) { + e2.shutdownGracefully(1, 2, TimeUnit.SECONDS).syncUninterruptibly(); + } + } + +} Index: oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentPreLoaderHandler.java =================================================================== --- oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentPreLoaderHandler.java (revision 0) +++ oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/SegmentPreLoaderHandler.java (revision 0) @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.segment.failover.client; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; + +import org.apache.jackrabbit.oak.plugins.segment.Segment; +import org.apache.jackrabbit.oak.plugins.segment.failover.codec.SegmentReply; + +public class SegmentPreLoaderHandler extends + SimpleChannelInboundHandler { + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Segment msg) + throws Exception { + ctx.fireUserEventTriggered(new SegmentReply(msg)); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + ctx.flush(); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + } + +} Index: oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/Messages.java =================================================================== --- oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/Messages.java (revision 0) +++ oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/Messages.java (revision 0) @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.segment.failover.codec; + +import org.apache.jackrabbit.oak.plugins.segment.SegmentId; + +public class Messages { + + public static final byte HEADER_RECORD = 0x00; + public static final byte HEADER_SEGMENT = 0x01; + + public static final String GET_HEAD = "h"; + + public static final String GET_SEGMENT = "s."; + + public static String newGetHeadReq() { + return GET_HEAD + "\r\n"; + } + + public static String newGetSegmentReq(SegmentId sid) { + return GET_SEGMENT + sid.toString() + "\r\n"; + } +} Index: oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/RecordIdDecoder.java =================================================================== --- oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/RecordIdDecoder.java (revision 0) +++ oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/RecordIdDecoder.java (revision 0) @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.plugins.segment.failover.codec; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.util.CharsetUtil; + +import org.apache.jackrabbit.oak.plugins.segment.RecordId; +import org.apache.jackrabbit.oak.plugins.segment.SegmentStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RecordIdDecoder extends LengthFieldBasedFrameDecoder { + + private static final Logger log = LoggerFactory + .getLogger(RecordIdDecoder.class); + + private final SegmentStore store; + + public RecordIdDecoder(SegmentStore store) { + super(64, 0, 4, 0, 4); + this.store = store; + } + + @Override + protected Object decode(ChannelHandlerContext ctx, ByteBuf in) + throws Exception { + ByteBuf frame = (ByteBuf) super.decode(ctx, in); + if (frame == null) { + return null; + } + byte type = frame.readByte(); + frame.discardReadBytes(); + String id = frame.toString(CharsetUtil.UTF_8); + try { + log.debug("received type {} with id {}", type, id); + return RecordId.fromString(store.getTracker(), id); + } catch (IllegalArgumentException e) { + log.error(e.getMessage(), e); + } + return null; + } + + @Override + protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, + int index, int length) { + return buffer.slice(index, length); + } + +} Index: oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/RecordIdEncoder.java =================================================================== --- oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/RecordIdEncoder.java (revision 0) +++ oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/RecordIdEncoder.java (revision 0) @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.plugins.segment.failover.codec; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; +import io.netty.util.CharsetUtil; + +import org.apache.jackrabbit.oak.plugins.segment.RecordId; + +public class RecordIdEncoder extends MessageToByteEncoder { + + @Override + protected void encode(ChannelHandlerContext ctx, RecordId msg, ByteBuf out) + throws Exception { + byte[] body = msg.toString().getBytes(CharsetUtil.UTF_8); + out.writeInt(body.length + 1); + out.writeByte(Messages.HEADER_RECORD); + out.writeBytes(body); + } +} Index: oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentDecoder.java =================================================================== --- oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentDecoder.java (revision 0) +++ oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentDecoder.java (revision 0) @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.plugins.segment.failover.codec; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; + +import org.apache.jackrabbit.oak.plugins.segment.Segment; +import org.apache.jackrabbit.oak.plugins.segment.SegmentId; +import org.apache.jackrabbit.oak.plugins.segment.SegmentStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SegmentDecoder extends LengthFieldBasedFrameDecoder { + + private static final Logger log = LoggerFactory + .getLogger(SegmentDecoder.class); + + private final SegmentStore store; + + public SegmentDecoder(SegmentStore store) { + super(Segment.MAX_SEGMENT_SIZE + 21, 0, 4, 0, 4); + this.store = store; + } + + @Override + protected Object decode(ChannelHandlerContext ctx, ByteBuf in) + throws Exception { + ByteBuf frame = (ByteBuf) super.decode(ctx, in); + if (frame == null) { + return null; + } + byte type = frame.readByte(); + long msb = frame.readLong(); + long lsb = frame.readLong(); + frame.discardReadBytes(); + SegmentId id = new SegmentId(store.getTracker(), msb, lsb); + Segment s = new Segment(store.getTracker(), id, frame.nioBuffer()); + log.debug("received type {} with id {} and size {}", type, id, s.size()); + return s; + } + + @Override + protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, + int index, int length) { + return buffer.slice(index, length); + } + +} Index: oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentEncoder.java =================================================================== --- oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentEncoder.java (revision 0) +++ oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentEncoder.java (revision 0) @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.plugins.segment.failover.codec; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; + +import org.apache.jackrabbit.oak.plugins.segment.Segment; +import org.apache.jackrabbit.oak.plugins.segment.SegmentId; + +public class SegmentEncoder extends MessageToByteEncoder { + + @Override + protected void encode(ChannelHandlerContext ctx, Segment s, ByteBuf out) + throws Exception { + SegmentId id = s.getSegmentId(); + int len = s.size() + 17; + out.writeInt(len); + out.writeByte(Messages.HEADER_SEGMENT); + out.writeLong(id.getMostSignificantBits()); + out.writeLong(id.getLeastSignificantBits()); + ByteBufOutputStream bout = new ByteBufOutputStream(out); + s.writeTo(bout); + bout.flush(); + bout.close(); + } +} Index: oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentReply.java =================================================================== --- oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentReply.java (revision 0) +++ oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentReply.java (revision 0) @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.segment.failover.codec; + +import org.apache.jackrabbit.oak.plugins.segment.Segment; + +public class SegmentReply { + + private final Segment segment; + + public SegmentReply(Segment segment) { + this.segment = segment; + } + + public Segment getSegment() { + return this.segment; + } + +} Index: oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java =================================================================== --- oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java (revision 0) +++ oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServer.java (revision 0) @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.segment.failover.server; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.LineBasedFrameDecoder; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.util.CharsetUtil; + +import java.io.Closeable; +import java.util.concurrent.TimeUnit; + +import org.apache.jackrabbit.oak.plugins.segment.SegmentStore; +import org.apache.jackrabbit.oak.plugins.segment.failover.codec.RecordIdEncoder; +import org.apache.jackrabbit.oak.plugins.segment.failover.codec.SegmentEncoder; + +public class FailoverServer implements Closeable { + + private final int port; + + private final EventLoopGroup bossGroup; + private final EventLoopGroup workerGroup; + private final ServerBootstrap b; + + public FailoverServer(int port, final SegmentStore store) { + this.port = port; + + bossGroup = new NioEventLoopGroup(1); + workerGroup = new NioEventLoopGroup(); + + b = new ServerBootstrap(); + b.group(bossGroup, workerGroup); + b.channel(NioServerSocketChannel.class); + + b.option(ChannelOption.TCP_NODELAY, true); + b.option(ChannelOption.SO_REUSEADDR, true); + b.childOption(ChannelOption.TCP_NODELAY, true); + b.childOption(ChannelOption.SO_REUSEADDR, true); + b.childOption(ChannelOption.SO_KEEPALIVE, true); + + b.childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + p.addLast(new LineBasedFrameDecoder(8192)); + p.addLast(new StringDecoder(CharsetUtil.UTF_8)); + p.addLast(new RecordIdEncoder()); + p.addLast(new SegmentEncoder()); + p.addLast(new FailoverServerHandler(store)); + } + }); + } + + public void start() { + try { + b.bind("127.0.0.1", port).sync().channel().closeFuture().sync(); + } catch (InterruptedException e) { + close(); + } + } + + @Override + public void close() { + if (bossGroup != null && !bossGroup.isShuttingDown()) { + bossGroup.shutdownGracefully(1, 2, TimeUnit.SECONDS) + .syncUninterruptibly(); + } + if (workerGroup != null && !workerGroup.isShuttingDown()) { + workerGroup.shutdownGracefully(1, 2, TimeUnit.SECONDS) + .syncUninterruptibly(); + ; + } + } + +} Index: oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServerHandler.java =================================================================== --- oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServerHandler.java (revision 0) +++ oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/server/FailoverServerHandler.java (revision 0) @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.segment.failover.server; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; + +import org.apache.jackrabbit.oak.plugins.segment.RecordId; +import org.apache.jackrabbit.oak.plugins.segment.Segment; +import org.apache.jackrabbit.oak.plugins.segment.SegmentId; +import org.apache.jackrabbit.oak.plugins.segment.SegmentStore; +import org.apache.jackrabbit.oak.plugins.segment.failover.codec.Messages; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Sharable +public class FailoverServerHandler extends SimpleChannelInboundHandler { + + private static final Logger log = LoggerFactory + .getLogger(FailoverServerHandler.class); + + private final SegmentStore store; + + public FailoverServerHandler(SegmentStore store) { + this.store = store; + } + + private RecordId headId() { + if (store != null) { + return store.getHead().getRecordId(); + } + return null; + } + + @Override + public void channelRead0(ChannelHandlerContext ctx, String request) + throws Exception { + if (Messages.GET_HEAD.equalsIgnoreCase(request)) { + RecordId r = headId(); + if (r != null) { + ctx.writeAndFlush(r); + } else { + ctx.writeAndFlush(Unpooled.EMPTY_BUFFER); + } + + } else if (request.startsWith(Messages.GET_SEGMENT)) { + String sid = request.substring(Messages.GET_SEGMENT.length()); + log.debug("request segment id {}", sid); + UUID uuid = UUID.fromString(sid); + + Segment s = null; + + for (int i = 0; i < 3; i++) { + try { + s = store.readSegment(new SegmentId(store.getTracker(), + uuid.getMostSignificantBits(), uuid + .getLeastSignificantBits())); + } catch (IllegalStateException e) { + // segment not found + log.warn(e.getMessage()); + } + if (s != null) { + break; + } else { + TimeUnit.MILLISECONDS.sleep(500); + } + } + + if (s != null) { + ctx.writeAndFlush(s); + } else { + ctx.writeAndFlush(Unpooled.EMPTY_BUFFER); + } + } else { + log.warn("Unknown request {}, ignoring.", request); + ctx.writeAndFlush(Unpooled.EMPTY_BUFFER); + } + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + ctx.flush(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + log.error(cause.getMessage(), cause); + ctx.close(); + } +} Index: oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStore.java =================================================================== --- oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStore.java (revision 0) +++ oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStore.java (revision 0) @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.segment.failover.store; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.HashSet; +import java.util.Set; + +import org.apache.jackrabbit.oak.api.Blob; +import org.apache.jackrabbit.oak.plugins.segment.Segment; +import org.apache.jackrabbit.oak.plugins.segment.SegmentId; +import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState; +import org.apache.jackrabbit.oak.plugins.segment.SegmentStore; +import org.apache.jackrabbit.oak.plugins.segment.SegmentTracker; +import org.apache.jackrabbit.oak.spi.blob.BlobStore; + +public class FailoverStore implements SegmentStore { + + private final SegmentTracker tracker = new SegmentTracker(this); + + private final SegmentStore delegate; + + private RemoteSegmentLoader loader; + + public FailoverStore(SegmentStore delegate) { + this.delegate = delegate; + } + + @Override + public SegmentTracker getTracker() { + return tracker; + } + + @Override + public SegmentNodeState getHead() { + return delegate.getHead(); + } + + @Override + public boolean setHead(SegmentNodeState base, SegmentNodeState head) { + return delegate.setHead(base, head); + } + + @Override + public boolean containsSegment(SegmentId id) { + return delegate.containsSegment(id); + } + + @Override + public Segment readSegment(SegmentId sid) { + + Deque ids = new ArrayDeque(); + ids.offer(sid); + int err = 0; + Set seen = new HashSet(); + + while (!ids.isEmpty()) { + SegmentId id = ids.remove(); + if (!seen.contains(id) && !delegate.containsSegment(id)) { + Segment s = loader.readSegment(id); + if (s != null) { + ByteArrayOutputStream bout = new ByteArrayOutputStream( + s.size()); + if (id.isDataSegmentId()) { + ids.addAll(s.getReferencedIds()); + } + try { + s.writeTo(bout); + writeSegment(id, bout.toByteArray(), 0, s.size()); + } catch (IOException e) { + throw new IllegalStateException( + "Unable to write remote segment " + id, e); + } + seen.add(id); + ids.removeAll(seen); + err = 0; + } else { + if (err == 5) { + loader.close(); + throw new IllegalStateException( + "Unable to load remote segment " + id); + } + err++; + ids.addFirst(id); + } + } else { + seen.add(id); + } + } + + return delegate.readSegment(sid); + } + + @Override + public void writeSegment(SegmentId id, byte[] bytes, int offset, int length) { + delegate.writeSegment(id, bytes, offset, length); + } + + @Override + public void close() { + delegate.close(); + } + + @Override + public Blob readBlob(String reference) { + return delegate.readBlob(reference); + } + + @Override + public BlobStore getBlobStore() { + return delegate.getBlobStore(); + } + + @Override + public void gc() { + delegate.gc(); + } + + public void setLoader(RemoteSegmentLoader loader) { + this.loader = loader; + } + +} Index: oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java =================================================================== --- oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java (revision 0) +++ oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/FailoverStoreService.java (revision 0) @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.segment.failover.store; + +import static java.lang.Integer.parseInt; +import static java.lang.String.valueOf; +import static org.apache.felix.scr.annotations.ReferencePolicy.STATIC; +import static org.apache.felix.scr.annotations.ReferencePolicyOption.GREEDY; + +import java.io.IOException; +import java.util.Dictionary; +import java.util.Hashtable; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.ConfigurationPolicy; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Property; +import org.apache.felix.scr.annotations.PropertyOption; +import org.apache.felix.scr.annotations.Reference; +import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStoreService; +import org.apache.jackrabbit.oak.plugins.segment.SegmentStore; +import org.apache.jackrabbit.oak.plugins.segment.failover.client.FailoverClient; +import org.apache.jackrabbit.oak.plugins.segment.failover.server.FailoverServer; +import org.apache.jackrabbit.oak.spi.state.NodeStore; +import org.osgi.framework.ServiceRegistration; +import org.osgi.service.component.ComponentContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Component(policy = ConfigurationPolicy.REQUIRE) +public class FailoverStoreService { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + @Property(label = "Mode", description = "TarMK Failover mode (master or slave)", options = { + @PropertyOption(name = "master", value = "master"), + @PropertyOption(name = "slave", value = "slave") }, value = "master") + public static final String MODE = "mode"; + + @Property(label = "Port", description = "TarMK Failover port", intValue = 8023) + public static final String PORT = "port"; + + @Property(label = "Master Host", description = "TarMK Failover master host (enabled for slave mode only)", value = "127.0.0.1") + public static final String MASTER_HOST = "master.host"; + + @Property(label = "Sync interval (seconds)", description = "TarMK Failover sync interval (seconds)", intValue = 5) + public static final String INTERVAL = "interval"; + + private static String MODE_MASTER = "master"; + + private static String MODE_SLAVE = "slave"; + + @Reference(policy = STATIC, policyOption = GREEDY) + private NodeStore store = null; + + private SegmentStore segmentStore; + + private FailoverServer master = null; + private FailoverClient sync = null; + private ServiceRegistration syncReg = null; + + @Activate + private void activate(ComponentContext context) throws IOException { + if (store instanceof SegmentNodeStoreService) { + segmentStore = ((SegmentNodeStoreService) store).getSegmentStore(); + } else { + throw new IllegalArgumentException( + "Unexpected NodeStore impl, expecting SegmentNodeStoreService, got " + + store.getClass()); + } + String mode = valueOf(context.getProperties().get(MODE)); + if (MODE_MASTER.equals(mode)) { + bootstrapMaster(context); + } else if (MODE_SLAVE.equals(mode)) { + bootstrapSlave(context); + } else { + throw new IllegalArgumentException( + "Unexpected 'mode' param, expecting 'master' or 'slave' got " + + mode); + } + } + + @Deactivate + public synchronized void deactivate() { + if (master != null) { + master.close(); + } + if (sync != null) { + sync.close(); + } + if (syncReg != null) { + syncReg.unregister(); + } + } + + private void bootstrapMaster(ComponentContext context) { + Dictionary props = context.getProperties(); + int port = parseInt(valueOf(props.get(PORT))); + master = new FailoverServer(port, segmentStore); + master.start(); + log.info("started failover master on port {}.", port); + } + + private void bootstrapSlave(ComponentContext context) { + Dictionary props = context.getProperties(); + int port = parseInt(valueOf(props.get(PORT))); + long interval = parseInt(valueOf(props.get(INTERVAL))); + String host = valueOf(context.getProperties().get(MASTER_HOST)); + + sync = new FailoverClient(host, port, segmentStore); + Dictionary dictionary = new Hashtable(); + dictionary.put("scheduler.period", interval); + dictionary.put("scheduler.concurrent", false); + dictionary.put("scheduler.runOn", "SINGLE"); + + syncReg = context.getBundleContext().registerService( + Runnable.class.getName(), sync, dictionary); + log.info("started failover slave sync with {}:{} at {} sec.", host, + port, interval); + } +} Index: oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/RemoteSegmentLoader.java =================================================================== --- oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/RemoteSegmentLoader.java (revision 0) +++ oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/store/RemoteSegmentLoader.java (revision 0) @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.segment.failover.store; + +import org.apache.jackrabbit.oak.plugins.segment.Segment; +import org.apache.jackrabbit.oak.plugins.segment.SegmentId; + +public interface RemoteSegmentLoader { + + Segment readSegment(SegmentId id); + + void close(); + +} Index: oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTestUtils.java =================================================================== --- oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTestUtils.java (revision 0) +++ oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTestUtils.java (revision 0) @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.segment; + +import static org.apache.jackrabbit.oak.plugins.segment.Segment.MAX_SEGMENT_SIZE; +import static org.apache.jackrabbit.oak.plugins.segment.Segment.RECORD_ALIGN_BITS; + +import java.util.Random; + +public class SegmentTestUtils { + + private SegmentTestUtils() { + + } + + public static int newValidOffset(Random random) { + return random.nextInt(MAX_SEGMENT_SIZE >> RECORD_ALIGN_BITS) << RECORD_ALIGN_BITS; + } + + public static RecordId newRecordId(SegmentTracker factory, Random random) { + SegmentId id = factory.newDataSegmentId(); + RecordId r = new RecordId(id, newValidOffset(random)); + return r; + } +} Index: oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverTest.java =================================================================== --- oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverTest.java (revision 0) +++ oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/FailoverTest.java (revision 0) @@ -0,0 +1,118 @@ +package org.apache.jackrabbit.oak.plugins.segment.failover; + +import static java.io.File.createTempFile; +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.io.IOException; + +import org.apache.commons.io.FileUtils; +import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore; +import org.apache.jackrabbit.oak.plugins.segment.failover.client.FailoverClient; +import org.apache.jackrabbit.oak.plugins.segment.failover.server.FailoverServer; +import org.apache.jackrabbit.oak.plugins.segment.file.FileStore; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.EmptyHook; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeStore; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class FailoverTest { + + private File directoryS; + + private FileStore storeS; + + private File directoryC; + + private FileStore storeC; + + @Before + public void setUp() throws IOException { + File target = new File("target"); + + // server + directoryS = createTempFile("FailoverServerTest", "dir", target); + directoryS.delete(); + directoryS.mkdir(); + storeS = new FileStore(directoryS, 1, false); + + // client + directoryC = createTempFile("FailoverClientTest", "dir", target); + directoryC.delete(); + directoryC.mkdir(); + storeC = new FileStore(directoryC, 1, false); + } + + @After + public void after() { + storeS.close(); + storeC.close(); + try { + FileUtils.deleteDirectory(directoryS); + FileUtils.deleteDirectory(directoryC); + } catch (IOException e) { + } + } + + @Test + public void testFailover() throws Exception { + + NodeStore store = new SegmentNodeStore(storeS); + final FailoverServer server = new FailoverServer(8023, storeS); + Thread s = new Thread() { + public void run() { + server.start(); + } + }; + s.start(); + addTestContent(store); + + FailoverClient cl = new FailoverClient("127.0.0.1", 8023, storeC); + cl.run(); + + try { + assertEquals(storeS.getHead(), storeC.getHead()); + } finally { + server.close(); + cl.close(); + } + + } + + private static void addTestContent(NodeStore store) + throws CommitFailedException { + NodeBuilder builder = store.getRoot().builder(); + builder.child("server").setProperty("ts", System.currentTimeMillis()); + store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + } + + static void assertEqualStores(File d1, File d2) throws IOException { + FileStore f1 = new FileStore(d1, 1, false); + FileStore f2 = new FileStore(d2, 1, false); + try { + assertEquals(f1.getHead(), f2.getHead()); + } finally { + f1.close(); + f2.close(); + } + } + + public static void main(String[] args) throws Exception { + File d = createTempFile("FailoverLiveTest", "dir", new File("target")); + d.delete(); + d.mkdir(); + FileStore s = new FileStore(d, 256, false); + FailoverClient cl = new FailoverClient("127.0.0.1", 8023, s); + try { + cl.run(); + } finally { + s.close(); + cl.close(); + } + } + +} Index: oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/CodecTest.java =================================================================== --- oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/CodecTest.java (revision 0) +++ oak-tarmk-failover/src/test/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/CodecTest.java (revision 0) @@ -0,0 +1,76 @@ +package org.apache.jackrabbit.oak.plugins.segment.failover.codec; + +import static io.netty.util.CharsetUtil.UTF_8; +import static org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils.newRecordId; +import static org.apache.jackrabbit.oak.plugins.segment.failover.codec.Messages.HEADER_RECORD; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.embedded.EmbeddedChannel; + +import java.util.Random; + +import org.apache.jackrabbit.oak.plugins.segment.RecordId; +import org.apache.jackrabbit.oak.plugins.segment.SegmentStore; +import org.apache.jackrabbit.oak.plugins.segment.SegmentTracker; +import org.apache.jackrabbit.oak.plugins.segment.memory.MemoryStore; +import org.junit.Test; + +public class CodecTest { + + @Test + public void testEncoder() throws Exception { + SegmentTracker factory = new MemoryStore().getTracker(); + int seed = new Random().nextInt(); + Random random = new Random(seed); + + RecordId r = newRecordId(factory, random); + + EmbeddedChannel channel = new EmbeddedChannel(new RecordIdEncoder()); + assertTrue(channel.writeOutbound(r)); + ByteBuf encoded = (ByteBuf) channel.readOutbound(); + + assertNotNull(encoded); + + byte[] ref = r.toString().getBytes(UTF_8); + assertEquals(ref.length + 1, encoded.readInt()); + assertEquals(HEADER_RECORD, encoded.readByte()); + byte[] data = new byte[encoded.readableBytes()]; + encoded.readBytes(data); + assertArrayEquals(ref, data); + assertFalse(encoded.isReadable()); + assertFalse(channel.finish()); + assertNull(channel.readInbound()); + } + + @Test + public void testDecoder() throws Exception { + SegmentStore store = new MemoryStore(); + SegmentTracker factory = store.getTracker(); + int seed = new Random().nextInt(); + Random random = new Random(seed); + + RecordId r = newRecordId(factory, random); + byte[] ref = r.toString().getBytes(UTF_8); + + System.out.println(r); + + EmbeddedChannel channel = new EmbeddedChannel( + new RecordIdDecoder(store)); + + ByteBuf buffer = Unpooled.buffer(); + buffer.writeInt(ref.length + 1); + buffer.writeByte(HEADER_RECORD); + buffer.writeBytes(ref); + + assertTrue(channel.writeInbound(buffer)); + RecordId response = (RecordId) channel.readInbound(); + assertEquals(r, response); + + } +} Index: oak-tarmk-failover/src/test/resources/logback-test.xml =================================================================== --- oak-tarmk-failover/src/test/resources/logback-test.xml (revision 0) +++ oak-tarmk-failover/src/test/resources/logback-test.xml (revision 0) @@ -0,0 +1,29 @@ + + + + + + %date{HH:mm:ss.SSS} %-5level %-40([%thread] %F:%L) %msg%n + + + + + + + +