From 7a85d0fb40e35913822a83e47127f498dbed9c50 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Thu, 11 Jul 2019 22:09:12 +0800 Subject: [PATCH] HBASE-22665 UT --- .../regionserver/wal/TestAsyncFSWAL.java | 128 ++++++++++++++++++ 1 file changed, 128 insertions(+) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java index 5f0f77cd41..1360d05985 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java @@ -17,18 +17,49 @@ */ package org.apache.hadoop.hbase.regionserver.wal; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.io.IOException; +import java.io.UncheckedIOException; import java.util.List; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.regionserver.LogRoller; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.SequenceId; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; +import org.junit.Test; import org.junit.experimental.categories.Category; import org.apache.hbase.thirdparty.io.netty.channel.Channel; @@ -90,4 +121,101 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL { wal.init(); return wal; } + + @Test + public void testBrokenWriter() throws Exception { + Server server = mock(Server.class); + when(server.getConfiguration()).thenReturn(CONF); + RegionServerServices services = mock(RegionServerServices.class); + TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("table")) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build(); + RegionInfo ri = RegionInfoBuilder.newBuilder(td.getTableName()).build(); + MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); + NavigableMap scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); + for (byte[] fam : td.getColumnFamilyNames()) { + scopes.put(fam, 0); + } + long timestamp = System.currentTimeMillis(); + String testName = currentTest.getMethodName(); + AtomicInteger failedCount = new AtomicInteger(0); + try (LogRoller roller = new LogRoller(server, services); + AsyncFSWAL wal = new AsyncFSWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), + testName, CONF, null, true, null, null, GROUP, CHANNEL_CLASS) { + + @Override + protected AsyncWriter createWriterInstance(Path path) throws IOException { + AsyncWriter writer = super.createWriterInstance(path); + return new AsyncWriter() { + + @Override + public void close() throws IOException { + writer.close(); + } + + @Override + public long getLength() { + return writer.getLength(); + } + + @Override + public CompletableFuture sync() { + CompletableFuture result = writer.sync(); + if (failedCount.incrementAndGet() < 1000) { + CompletableFuture future = new CompletableFuture<>(); + FutureUtils.addListener(result, + (r, e) -> future.completeExceptionally(new IOException("Inject Error"))); + return future; + } else { + return result; + } + } + + @Override + public void append(Entry entry) { + writer.append(entry); + } + }; + } + }) { + wal.init(); + roller.addWAL(wal); + roller.start(); + int numThreads = 10; + AtomicReference error = new AtomicReference<>(); + Thread[] threads = new Thread[numThreads]; + for (int i = 0; i < 10; i++) { + final int index = i; + threads[index] = new Thread("Write-Thread-" + index) { + + @Override + public void run() { + byte[] row = Bytes.toBytes("row" + index); + WALEdit cols = new WALEdit(); + cols.add(new KeyValue(row, row, row, timestamp + index, row)); + WALKeyImpl key = new WALKeyImpl(ri.getEncodedNameAsBytes(), td.getTableName(), + SequenceId.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, + HConstants.NO_NONCE, mvcc, scopes); + try { + wal.append(ri, key, cols, true); + } catch (IOException e) { + // should not happen + throw new UncheckedIOException(e); + } + try { + wal.sync(); + } catch (IOException e) { + error.set(e); + } + } + }; + } + for (Thread t : threads) { + t.start(); + } + for (Thread t : threads) { + t.join(); + } + assertNull(error.get()); + } + } } -- 2.17.1