diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index fd8235d652..13194ed7b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -2633,6 +2633,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // will be in advance of this sequence id. long flushedSeqId = HConstants.NO_SEQNUM; byte[] encodedRegionName = getRegionInfo().getEncodedNameAsBytes(); + // Durability.ASYNC_WAL do not wait wal sync and commit mvcc ahead, so there may be some entry buffered in wal. + // sync unflushed WAL changes + if (wal != null) { + wal.sync(); + } try { if (wal != null) { Long earliestUnflushedSequenceIdForTheRegion = diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java index 810d7df5d4..caf905f9f3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; @@ -224,4 +225,95 @@ public class TestFSHLog extends AbstractTestFSWAL { region.close(); } } + + @Test + public void testUnflushedSeqIdTrackingWithAsyncWal() throws IOException, InterruptedException { + final String name = this.name.getMethodName(); + final byte[] b = Bytes.toBytes("b"); + + final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false); + final CountDownLatch holdAppend = new CountDownLatch(1); + final CountDownLatch flushFinished = new CountDownLatch(1); + final CountDownLatch putFinished = new CountDownLatch(1); + + try (FSHLog log = + new FSHLog(FS, FSUtils.getRootDir(CONF), name, HConstants.HREGION_OLDLOGDIR_NAME, CONF, + null, true, null, null)) { + log.init(); + log.registerWALActionsListener(new WALActionsListener() { + @Override + public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) + throws IOException { + if (startHoldingForAppend.get()) { + try { + holdAppend.await(); + } catch (InterruptedException e) { + LOG.error(e.toString(), e); + } + } + } + }); + + // open a new region which uses this WAL + TableDescriptor htd = + TableDescriptorBuilder.newBuilder(TableName.valueOf(this.name.getMethodName())) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build(); + RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); + final HRegion region = TEST_UTIL.createLocalHRegion(hri, htd, log); + ExecutorService exec = Executors.newFixedThreadPool(2); + + // do a regular write first because of memstore size calculation. + region.put(new Put(b).addColumn(b, b,b)); + + startHoldingForAppend.set(true); + exec.submit(new Runnable() { + @Override + public void run() { + try { + region.put(new Put(b).addColumn(b, b,b).setDurability(Durability.ASYNC_WAL)); + putFinished.countDown(); + } catch (IOException e) { + LOG.error(e.toString(), e); + } + } + }); + + // give the put a chance to start + Threads.sleep(3000); + + exec.submit(new Runnable() { + @Override + public void run() { + try { + HRegion.FlushResult flushResult = region.flush(true); + LOG.info("Flush result:" + flushResult.getResult()); + LOG.info("Flush succeeded:" + flushResult.isFlushSucceeded()); + flushFinished.countDown(); + } catch (IOException e) { + LOG.error(e.toString(), e); + } + } + }); + + // give the flush a chance to start. Flush should have got the region lock, and + // should have been waiting on the mvcc complete after this. + Threads.sleep(3000); + + // let the append to WAL go through now that the flush already started + holdAppend.countDown(); + putFinished.await(); + flushFinished.await(); + + // check whether flush went through + assertEquals("Region did not flush?", 1, region.getStoreFileList(new byte[][]{b}).size()); + + // now check the region's unflushed seqIds. + long seqId = log.getEarliestMemStoreSeqNum(hri.getEncodedNameAsBytes()); + assertEquals("Found seqId for the region which is already flushed", + HConstants.NO_SEQNUM, seqId); + + region.close(); + } + } }