commit 2a5cdddb5c1186eede2d2b200644c3a736d161a5 Author: stack Date: Fri Oct 30 13:29:13 2015 -0700 HBASE-14540 Write Ahead Log Batching Optimization diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 928f4b6..fe5330f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.util.DrainBarrier; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HasThread; +import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.DefaultWALProvider; import org.apache.hadoop.hbase.wal.WAL; @@ -91,6 +92,7 @@ import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.ExceptionHandler; import com.lmax.disruptor.LifecycleAware; import com.lmax.disruptor.TimeoutException; +import com.lmax.disruptor.WaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; @@ -184,6 +186,9 @@ public class FSHLog implements WAL { */ private final RingBufferEventHandler ringBufferEventHandler; + public static final String HBASE_REGIONSERVER_WAL_DISRUPTOR_WAITSTRATEGY_KEY = + "hbase.regionserver.wal.disruptor.waitstrategy.key"; + /** * Map of {@link SyncFuture}s keyed by Handler objects. Used so we reuse SyncFutures. * TODO: Reus FSWALEntry's rather than create them anew each time as we do SyncFutures here. @@ -540,9 +545,13 @@ public class FSHLog implements WAL { this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16); // Using BlockingWaitStrategy. Stuff that is going on here takes so long it makes no sense // spinning as other strategies do. + WaitStrategy waitStrategy = null; + Class clazz = this.conf.getClass(HBASE_REGIONSERVER_WAL_DISRUPTOR_WAITSTRATEGY_KEY, + BlockingWaitStrategy.class); + waitStrategy = (WaitStrategy)ReflectionUtils.newInstance(clazz); this.disruptor = new Disruptor(RingBufferTruck.EVENT_FACTORY, preallocatedEventCount, - this.appendExecutor, ProducerType.MULTI, new BlockingWaitStrategy()); + this.appendExecutor, ProducerType.MULTI, waitStrategy); // Advance the ring buffer sequence so that it starts from 1 instead of 0, // because SyncFuture.NOT_DONE = 0. this.disruptor.getRingBuffer().next(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WaitForWaitStrategy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WaitForWaitStrategy.java new file mode 100644 index 0000000..50ed6e0 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WaitForWaitStrategy.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import com.lmax.disruptor.*; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import java.util.concurrent.locks.LockSupport; + +/** + * A Blocking Wait Strategy that will batch up writes for 2ms to limit the HBase impact on HDFS. + */ +public class WaitForWaitStrategy implements WaitStrategy { + static final Log LOG = LogFactory.getLog(WaitForWaitStrategy.class); + // 1 Millisecond Delay by default + private static long NANOS_IN_MILLIS = 1000 * 1000; + public static final long DEFAULT_NANO_SECOND_DELAY = NANOS_IN_MILLIS * 2; + private final long nanoSecondDelay; + private WaitStrategy blockingWaitStrategy; + private long lastTimestamp; + + public WaitForWaitStrategy() { + this(DEFAULT_NANO_SECOND_DELAY); + } + + public WaitForWaitStrategy(long nanoSecondDelay) { + blockingWaitStrategy = new BlockingWaitStrategy(); + this.nanoSecondDelay = nanoSecondDelay; + } + + @Override + public long waitFor(final long sequence, Sequence cursorSequence, + final Sequence dependentSequence, final SequenceBarrier barrier) + throws AlertException, InterruptedException, TimeoutException { + if (lastTimestamp == 0) LockSupport.parkNanos(nanoSecondDelay); + else { + long nextDelay = + nanoSecondDelay - (System.currentTimeMillis() - lastTimestamp) * NANOS_IN_MILLIS; + if (nextDelay > 0) LockSupport.parkNanos(nextDelay); + } + long availableSequence = + blockingWaitStrategy.waitFor(sequence, cursorSequence, dependentSequence, barrier); + lastTimestamp = System.currentTimeMillis(); + return availableSequence; + } + + @Override + public void signalAllWhenBlocking() { + blockingWaitStrategy.signalAllWhenBlocking(); + } +} \ No newline at end of file