From fa7f039b51d5800991da546e6628cd29d9013327 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Sun, 27 Oct 2019 22:39:01 +0800 Subject: [PATCH] HBASE-23157 UT --- .../regionserver/wal/AbstractTestFSWAL.java | 224 ++++++++++++------ 1 file changed, 151 insertions(+), 73 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java index ec3c28d3ed..3bde756030 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java @@ -30,6 +30,8 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -37,10 +39,12 @@ import java.util.NavigableMap; import java.util.Set; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -65,7 +69,10 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor; import org.apache.hadoop.hbase.regionserver.ChunkCreator; +import org.apache.hadoop.hbase.regionserver.FlushPolicy; +import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.RegionServerServices; @@ -496,93 +503,164 @@ public abstract class AbstractTestFSWAL { wal.rollWriter(); } - @Test - public void testUnflushedSeqIdTrackingWithAsyncWal() throws IOException, InterruptedException { - final String testName = currentTest.getMethodName(); - final byte[] b = Bytes.toBytes("b"); - - final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false); - final CountDownLatch holdAppend = new CountDownLatch(1); - final CountDownLatch closeFinished = new CountDownLatch(1); - final CountDownLatch putFinished = new CountDownLatch(1); - - try (AbstractFSWAL wal = newWAL(FS, FSUtils.getRootDir(CONF), testName, - HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) { - wal.init(); - wal.registerWALActionsListener(new WALActionsListener() { - @Override - public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException { - if (startHoldingForAppend.get()) { - try { - holdAppend.await(); - } catch (InterruptedException e) { - LOG.error(e.toString(), e); - } - } - } - }); - - // open a new region which uses this WAL - TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table")) - .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build(); - RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); - ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); - TEST_UTIL.createLocalHRegion(hri, htd, wal).close(); - RegionServerServices rsServices = mock(RegionServerServices.class); - when(rsServices.getServerName()).thenReturn(ServerName.valueOf("localhost:12345", 123456)); - when(rsServices.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration()); - final HRegion region = HRegion.openHRegion(TEST_UTIL.getDataTestDir(), hri, htd, wal, - TEST_UTIL.getConfiguration(), rsServices, null); - - ExecutorService exec = Executors.newFixedThreadPool(2); - - // do a regular write first because of memstore size calculation. - region.put(new Put(b).addColumn(b, b, b)); - - startHoldingForAppend.set(true); - exec.submit(new Runnable() { - @Override - public void run() { + private AbstractFSWAL createHoldingWAL(String testName, AtomicBoolean startHoldingForAppend, + CountDownLatch holdAppend) throws IOException { + AbstractFSWAL wal = newWAL(FS, FSUtils.getRootDir(CONF), testName, + HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null); + wal.init(); + wal.registerWALActionsListener(new WALActionsListener() { + @Override + public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException { + if (startHoldingForAppend.get()) { try { - region.put(new Put(b).addColumn(b, b, b).setDurability(Durability.ASYNC_WAL)); - putFinished.countDown(); - } catch (IOException e) { + holdAppend.await(); + } catch (InterruptedException e) { LOG.error(e.toString(), e); } } - }); + } + }); + return wal; + } - // give the put a chance to start - Threads.sleep(3000); + private HRegion createHoldingHRegion(Configuration conf, TableDescriptor htd, WAL wal) throws IOException { + RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); + TEST_UTIL.createLocalHRegion(hri, htd, wal).close(); + RegionServerServices rsServices = mock(RegionServerServices.class); + when(rsServices.getServerName()).thenReturn(ServerName.valueOf("localhost:12345", 123456)); + when(rsServices.getConfiguration()).thenReturn(conf); + return HRegion.openHRegion(TEST_UTIL.getDataTestDir(), hri, htd, wal, + conf, rsServices, null); + } - exec.submit(new Runnable() { - @Override - public void run() { - try { - Map closeResult = region.close(); - LOG.info("Close result:" + closeResult); - closeFinished.countDown(); - } catch (IOException e) { - LOG.error(e.toString(), e); - } - } - }); + private void doPutWithAsyncWAL(ExecutorService exec, HRegion region, Put put, + Runnable flushOrCloseRegion, AtomicBoolean startHoldingForAppend, + CountDownLatch flushOrCloseFinished, CountDownLatch holdAppend) + throws InterruptedException, IOException { + // do a regular write first because of memstore size calculation. + region.put(put); + + startHoldingForAppend.set(true); + region.put(new Put(put).setDurability(Durability.ASYNC_WAL)); + + // give the put a chance to start + Threads.sleep(3000); - // give the flush a chance to start. Flush should have got the region lock, and - // should have been waiting on the mvcc complete after this. - Threads.sleep(3000); + exec.submit(flushOrCloseRegion); - // let the append to WAL go through now that the flush already started - holdAppend.countDown(); - putFinished.await(); - closeFinished.await(); + // give the flush a chance to start. Flush should have got the region lock, and + // should have been waiting on the mvcc complete after this. + Threads.sleep(3000); + + // let the append to WAL go through now that the flush already started + holdAppend.countDown(); + flushOrCloseFinished.await(); + } + + // Testcase for HBASE-23181 + @Test + public void testUnflushedSeqIdTrackingWithAsyncWal() throws IOException, InterruptedException { + String testName = currentTest.getMethodName(); + byte[] b = Bytes.toBytes("b"); + TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table")) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build(); + + AtomicBoolean startHoldingForAppend = new AtomicBoolean(false); + CountDownLatch holdAppend = new CountDownLatch(1); + CountDownLatch closeFinished = new CountDownLatch(1); + ExecutorService exec = Executors.newFixedThreadPool(1); + AbstractFSWAL wal = createHoldingWAL(testName, startHoldingForAppend, holdAppend); + // open a new region which uses this WAL + HRegion region = createHoldingHRegion(TEST_UTIL.getConfiguration(), htd, wal); + try { + doPutWithAsyncWAL(exec, region, new Put(b).addColumn(b, b, b), () -> { + try { + Map closeResult = region.close(); + LOG.info("Close result:" + closeResult); + closeFinished.countDown(); + } catch (IOException e) { + LOG.error(e.toString(), e); + } + }, startHoldingForAppend, closeFinished, holdAppend); // now check the region's unflushed seqIds. - long seqId = wal.getEarliestMemStoreSeqNum(hri.getEncodedNameAsBytes()); + long seqId = wal.getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); assertEquals("Found seqId for the region which is already closed", HConstants.NO_SEQNUM, seqId); + } finally { + exec.shutdownNow(); + region.close(); + wal.close(); + } + } + private static final Set STORES_TO_FLUSH = + Collections.newSetFromMap(new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR)); + + // Testcase for HBASE-23157 + @Test + public void testMaxFlushedSequenceIdGoBackwards() throws IOException, InterruptedException { + String testName = currentTest.getMethodName(); + byte[] a = Bytes.toBytes("a"); + byte[] b = Bytes.toBytes("b"); + TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table")) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(a)) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build(); + + AtomicBoolean startHoldingForAppend = new AtomicBoolean(false); + CountDownLatch holdAppend = new CountDownLatch(1); + CountDownLatch flushFinished = new CountDownLatch(1); + ExecutorService exec = Executors.newFixedThreadPool(2); + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + conf.setClass(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushSpecificStoresPolicy.class, + FlushPolicy.class); + AbstractFSWAL wal = createHoldingWAL(testName, startHoldingForAppend, holdAppend); + // open a new region which uses this WAL + HRegion region = createHoldingHRegion(conf, htd, wal); + try { + Put put = new Put(a).addColumn(a, a, a).addColumn(b, b, b); + doPutWithAsyncWAL(exec, region, put, () -> { + try { + HRegion.FlushResult flushResult = region.flush(true); + LOG.info("Flush result:" + flushResult.getResult()); + LOG.info("Flush succeeded:" + flushResult.isFlushSucceeded()); + flushFinished.countDown(); + } catch (IOException e) { + LOG.error(e.toString(), e); + } + }, startHoldingForAppend, flushFinished, holdAppend); + + // get the max flushed sequence id after the first flush + long maxFlushedSeqId1 = region.getMaxFlushedSeqId(); + + region.put(put); + // this time we only flush family a + STORES_TO_FLUSH.add(a); + region.flush(false); + + // get the max flushed sequence id after the second flush + long maxFlushedSeqId2 = region.getMaxFlushedSeqId(); + // make sure that the maxFlushedSequenceId does not go backwards + assertTrue("maxFlushedSeqId1(" + maxFlushedSeqId1 + + ") is not greater than maxFlushedSeqId2(" + maxFlushedSeqId2 + ")", + maxFlushedSeqId1 < maxFlushedSeqId2); + } finally { + exec.shutdownNow(); + region.close(); wal.close(); } } + + public static final class FlushSpecificStoresPolicy extends FlushPolicy { + + @Override + public Collection selectStoresToFlush() { + if (STORES_TO_FLUSH.isEmpty()) { + return region.getStores(); + } else { + return STORES_TO_FLUSH.stream().map(region::getStore).collect(Collectors.toList()); + } + } + } } -- 2.17.1