From df3d198a1a232e618beae300df6efd97409daa79 Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Wed, 31 Jul 2019 14:18:09 +0100 Subject: [PATCH] HBASE-22539 WAL corruption due to early DBBs re-use when Durability.ASYNC_WAL is used This is a draft and may require further polishing. Solution proposed here is based on Stack's idea of ignoring ASYNC_WAL behaviour in case of DBB re-use. For now, we just check if we have a DBB in the edit, then we'll always force sync. Added test had been consistently passing with such solution. Further enhancements would be to add extra checks to only force sync if DBB re-use is in place, instead of only check for DBB as whole. --- .../org/apache/hadoop/hbase/KeyValueUtil.java | 14 ++ .../hadoop/hbase/regionserver/HRegion.java | 2 +- .../hbase/regionserver/wal/AbstractFSWAL.java | 7 + .../hbase/regionserver/wal/AsyncFSWAL.java | 1 + .../hadoop/hbase/regionserver/wal/FSHLog.java | 1 + .../hbase/regionserver/wal/WALCellCodec.java | 1 + .../java/org/apache/hadoop/hbase/wal/WAL.java | 3 + .../org/apache/hadoop/hbase/wal/WALEdit.java | 13 ++ .../hadoop/hbase/ipc/TestNettyRpcServer.java | 149 +++++++++++++++++- 9 files changed, 187 insertions(+), 4 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java index 8a17ce975d..061b1fc077 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java @@ -517,6 +517,20 @@ public class KeyValueUtil { return ", KeyValueBytesHex=" + bufferContents + ", offset=" + offset + ", length=" + length; } + public static boolean isBufferValid(byte[] buf, int offset, int length, boolean withTags) { + try{ + checkKeyValueBytes(buf, offset, length, withTags); + }catch(IllegalArgumentException e){ + String kv = Bytes.toStringBinary(buf, 0, buf.length); + if (!kv.contains("tablestate") && 
!kv.contains("regioninfo")) { + LOG.warn("Got a KV validation error while writing. Just logging it for now " + + "and allowing to continue: ", e); + return false; + } + } + return true; + } + static void checkKeyValueBytes(byte[] buf, int offset, int length, boolean withTags) { if (buf == null) { String msg = "Invalid to have null byte array in KeyValue."; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 246ffea15e..5183563a81 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -8526,7 +8526,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @throws IOException If anything goes wrong with DFS */ private void sync(long txid, Durability durability) throws IOException { - if (this.getRegionInfo().isMetaRegion()) { + if (this.getRegionInfo().isMetaRegion() || this.wal.syncAlways()) { this.wal.sync(txid); } else { switch(durability) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index af7cc0d202..697e77a732 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -1072,6 +1072,13 @@ public abstract class AbstractFSWAL implements WAL { public abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore) throws IOException; + protected boolean usingDirectByteBuffer = false; + + @Override + public boolean syncAlways(){ + return usingDirectByteBuffer; + } + protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException; protected abstract W createWriterInstance(Path path) 
/**
 * Appends the edits for the given region to the WAL via the disruptor ring
 * buffer and returns the transaction id to sync on.
 */
@Override
public long append(final RegionInfo hri, final WALKeyImpl key, final WALEdit edits,
    final boolean inMemstore) throws IOException {
  // Record whether this edit references a direct ByteBuffer so sync() can be
  // forced even under Durability.ASYNC_WAL (HBASE-22539: direct buffers may
  // be recycled by the RPC layer before an async sync completes).
  // NOTE(review): this is a plain, non-volatile, last-writer-wins field
  // shared across handler threads — a subsequent heap-backed append can
  // clear the flag before an earlier direct-buffer append has been synced,
  // and visibility to the sync path is not guaranteed. Consider a volatile
  // sticky flag or per-txid tracking; TODO confirm intended scope.
  usingDirectByteBuffer = edits.isUsingDirectByteBuffer();
  return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore,
    disruptor.getRingBuffer());
}
{ } } + @Override public Decoder getDecoder(InputStream is) { return (compression == null) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index 4fb30febf2..40df504c92 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -126,6 +126,9 @@ public interface WAL extends Closeable, WALFileLengthProvider { void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid, boolean onlyIfGreater); + default boolean syncAlways(){ + return false; + } /** * Sync what we have in the WAL. * @throws IOException diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java index 214000200c..df57c64e46 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; +import org.apache.hadoop.hbase.ByteBufferKeyValue; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -81,6 +82,8 @@ public class WALEdit implements HeapSize { private ArrayList cells = null; + private boolean usingDirectByteBuffer = false; + /** * All the Cell families in cells. Updated by {@link #add(Cell)} and * {@link #add(Map)}. 
This Set is passed to the FSWALEntry so it does not have @@ -376,6 +379,16 @@ public class WALEdit implements HeapSize { private WALEdit addCell(Cell cell) { this.cells.add(cell); + if(!usingDirectByteBuffer){ + if(cell instanceof ByteBufferKeyValue){ + usingDirectByteBuffer = ((ByteBufferKeyValue)cell).getBuffer().isDirect(); + } + } return this; } + + public boolean isUsingDirectByteBuffer() { + return usingDirectByteBuffer; + } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcServer.java index b45dd5b286..62a3531bc7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcServer.java @@ -18,20 +18,37 @@ package org.apache.hadoop.hbase.ipc; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.ByteBufferWriter; +import org.apache.hadoop.hbase.io.util.StreamUtils; +import 
@BeforeClass
public static void setupBeforeClass() throws Exception {
  TEST_UTIL = new HBaseTestingUtility();
  // Use the Netty RPC server so the test exercises Netty's pooled direct
  // ByteBuffers end-to-end — the buffers whose premature re-use HBASE-22539
  // guards against.
  TEST_UTIL.getConfiguration().set(
    RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, NettyRpcServer.class.getName());
  // Install the test WAL cell codec so WAL writes can be intercepted and
  // inspected for corruption by the new test case.
  TEST_UTIL.getConfiguration().set(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY,
    TestWalCellCodec.class.getName());
  TEST_UTIL.startMiniCluster();
}
= new CountDownLatch(10); + for (long i = 0; i < 10; i++) { + StringBuilder builder = new StringBuilder(); + for(int l=0; l