diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index 5e286de..261bfb1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -278,6 +278,18 @@ public class CompactingMemStore extends AbstractMemStore {
     // Phase I: Update the pipeline
     getRegionServices().blockUpdates();
     try {
+      if (!isActiveSizeAboveInMemFlushLimit()) {
+        // Many concurrent writing threads might see the memstore size above the flush limit and
+        // each start an in-memory flush. Consider two such threads. The first one gets a chance,
+        // acquires the lock to block updates, pushes the active segment to the pipeline and
+        // releases the lock. At the same time other normal write threads keep writing cells to
+        // the memstore. One of them can take the region lock (a read lock in this case) and add
+        // cell(s) to the fresh active segment. Now the second in-memory flush thread (this
+        // thread) gets the lock, and without the check below it would go ahead and push the
+        // current active segment to the pipeline. That might be a very small segment and such an
+        // operation is really unwanted. The double check on size below avoids such a case.
+        return;
+      }
       MutableSegment active = getActive();
       if (LOG.isDebugEnabled()) {
         LOG.debug("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline, "
@@ -311,8 +323,12 @@ public class CompactingMemStore extends AbstractMemStore {
     return getRegionServices().getInMemoryCompactionPool();
   }
 
+  private boolean isActiveSizeAboveInMemFlushLimit() {
+    return getActive().getSize() > inmemoryFlushSize;
+  }
+
   private boolean shouldFlushInMemory() {
-    if(getActive().getSize() > inmemoryFlushSize) {
+    if (isActiveSizeAboveInMemFlushLimit()) {
       // size above flush threshold
       return (allowCompaction.get() && !inMemoryFlushInProgress.get());
     }
@@ -387,6 +403,16 @@ public class CompactingMemStore extends AbstractMemStore {
     allowCompaction.set(true);
   }
 
+  @VisibleForTesting
+  long getInMemoryFlushSize() {
+    return this.inmemoryFlushSize;
+  }
+
+  @VisibleForTesting
+  void setInMemoryFlushSize(long size) {
+    this.inmemoryFlushSize = size;
+  }
+
   /**
    * @param cell Find the row that comes after this one. If null, we return the
    *          first.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
index c5aae00..b7c9ca4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -169,7 +169,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     Thread.sleep(1);
     addRows(this.memstore);
     Cell closestToEmpty = ((CompactingMemStore)this.memstore).getNextRow(KeyValue.LOWESTKEY);
-    assertTrue(KeyValue.COMPARATOR.compareRows(closestToEmpty,
+    assertTrue(CellComparator.COMPARATOR.compareRows(closestToEmpty,
         new KeyValue(Bytes.toBytes(0), System.currentTimeMillis())) == 0);
     for (int i = 0; i < ROW_COUNT; i++) {
       Cell nr = ((CompactingMemStore)this.memstore).getNextRow(new KeyValue(Bytes.toBytes(i),
@@ -177,7 +177,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
       if (i + 1 == ROW_COUNT) {
         assertEquals(nr, null);
       } else {
-        assertTrue(KeyValue.COMPARATOR.compareRows(nr,
+        assertTrue(CellComparator.COMPARATOR.compareRows(nr,
            new KeyValue(Bytes.toBytes(i + 1), System.currentTimeMillis())) == 0);
       }
     }
@@ -510,6 +510,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     byte[] qf2 = Bytes.toBytes("testqualifier2");
     byte[] qf3 = Bytes.toBytes("testqualifier3");
     byte[] val = Bytes.toBytes("testval");
+    long oldVal = ((CompactingMemStore)memstore).getInMemoryFlushSize();
 
     // Setting up memstore
     memstore.add(new KeyValue(row, fam, qf1, 1, val));
@@ -518,7 +519,13 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
 
     // Creating a pipeline
     ((CompactingMemStore)memstore).disableCompaction();
-    ((CompactingMemStore)memstore).flushInMemory();
+    // Reduce the in mem flush size limit so that an in memory flush can get executed.
+    ((CompactingMemStore) memstore).setInMemoryFlushSize(memstore.getActive().getSize() - 1);
+    try {
+      ((CompactingMemStore) memstore).flushInMemory();
+    } finally {
+      ((CompactingMemStore) memstore).setInMemoryFlushSize(oldVal);
+    }
 
     // Adding value to "new" memstore
     assertEquals(0, memstore.getActive().getCellsCount());
@@ -527,14 +534,25 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     assertEquals(2, memstore.getActive().getCellsCount());
 
     // pipeline bucket 2
-    ((CompactingMemStore)memstore).flushInMemory();
+    // Reduce the in mem flush size limit so that an in memory flush can get executed.
+    ((CompactingMemStore) memstore).setInMemoryFlushSize(memstore.getActive().getSize() - 1);
+    try {
+      ((CompactingMemStore) memstore).flushInMemory();
+    } finally {
+      ((CompactingMemStore) memstore).setInMemoryFlushSize(oldVal);
+    }
 
     // opening scanner before force flushing
     List scanners = memstore.getScanners(0);
     // Shouldn't putting back the chunks to pool,since some scanners are opening
     // based on their data
     ((CompactingMemStore)memstore).enableCompaction();
-    // trigger compaction
-    ((CompactingMemStore)memstore).flushInMemory();
+    // Reduce the in mem flush size limit so that an in memory flush can get executed.
+    ((CompactingMemStore) memstore).setInMemoryFlushSize(memstore.getActive().getSize() - 1);
+    try {
+      ((CompactingMemStore) memstore).flushInMemory(); // trigger compaction
+    } finally {
+      ((CompactingMemStore) memstore).setInMemoryFlushSize(oldVal);
+    }
 
     // Adding value to "new" memstore
     assertEquals(0, memstore.getActive().getCellsCount());
@@ -586,15 +604,18 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
   public void testCompaction1Bucket() throws IOException {
 
     String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4
-
+    long oldVal = ((CompactingMemStore)memstore).getInMemoryFlushSize();
     // test 1 bucket
     addRowsByKeys(memstore, keys1);
     assertEquals(496, regionServicesForStores.getGlobalMemstoreTotalSize());
 
     long size = memstore.getFlushableSize();
-    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
-    while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) {
-      Threads.sleep(10);
+    // Reduce the in mem flush size limit so that an in memory flush can get executed.
+    ((CompactingMemStore) memstore).setInMemoryFlushSize(memstore.getActive().getSize() - 1);
+    try {
+      ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
+    } finally {
+      ((CompactingMemStore) memstore).setInMemoryFlushSize(oldVal);
     }
     assertEquals(0, memstore.getSnapshot().getCellsCount());
     assertEquals(376, regionServicesForStores.getGlobalMemstoreTotalSize());
@@ -614,15 +635,18 @@
     String[] keys1 = { "A", "A", "B", "C" };
     String[] keys2 = { "A", "B", "D" };
-
+    long oldVal = ((CompactingMemStore)memstore).getInMemoryFlushSize();
     addRowsByKeys(memstore, keys1);
     assertEquals(496, regionServicesForStores.getGlobalMemstoreTotalSize());
 
     long size = memstore.getFlushableSize();
-    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
-    while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) {
-      Threads.sleep(1000);
+    // Reduce the in mem flush size limit so that an in memory flush can get executed.
+    ((CompactingMemStore) memstore).setInMemoryFlushSize(memstore.getActive().getSize() - 1);
+    try {
+      ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
+    } finally {
+      ((CompactingMemStore) memstore).setInMemoryFlushSize(oldVal);
     }
     assertEquals(0, memstore.getSnapshot().getCellsCount());
     assertEquals(376, regionServicesForStores.getGlobalMemstoreTotalSize());
@@ -631,9 +655,12 @@
     assertEquals(752, regionServicesForStores.getGlobalMemstoreTotalSize());
 
     size = memstore.getFlushableSize();
-    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
-    while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) {
-      Threads.sleep(10);
+    // Reduce the in mem flush size limit so that an in memory flush can get executed.
+    ((CompactingMemStore) memstore).setInMemoryFlushSize(memstore.getActive().getSize() - 1);
+    try {
+      ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
+    } finally {
+      ((CompactingMemStore) memstore).setInMemoryFlushSize(oldVal);
     }
     assertEquals(0, memstore.getSnapshot().getCellsCount());
     assertEquals(496, regionServicesForStores.getGlobalMemstoreTotalSize());
@@ -654,32 +681,34 @@
     String[] keys1 = { "A", "A", "B", "C" };
     String[] keys2 = { "A", "B", "D" };
     String[] keys3 = { "D", "B", "B" };
-
+    long oldVal = ((CompactingMemStore)memstore).getInMemoryFlushSize();
     addRowsByKeys(memstore, keys1);
     assertEquals(496, region.getMemstoreSize());
 
     long size = memstore.getFlushableSize();
-    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
-
-    String tstStr = "\n\nFlushable size after first flush in memory:" + size
-        + ". Is MemmStore in compaction?:" + ((CompactingMemStore)memstore).isMemStoreFlushingInMemory();
-    while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) {
-      Threads.sleep(10);
+    // Reduce the in mem flush size limit so that an in memory flush can get executed.
+    ((CompactingMemStore) memstore).setInMemoryFlushSize(memstore.getActive().getSize() - 1);
+    try {
+      ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
+    } finally {
+      ((CompactingMemStore) memstore).setInMemoryFlushSize(oldVal);
     }
     assertEquals(0, memstore.getSnapshot().getCellsCount());
     assertEquals(376, regionServicesForStores.getGlobalMemstoreTotalSize());
 
     addRowsByKeys(memstore, keys2);
-    tstStr += " After adding second part of the keys. Memstore size: " +
-        region.getMemstoreSize() + ", Memstore Total Size: " +
-        regionServicesForStores.getGlobalMemstoreTotalSize() + "\n\n";
-
     assertEquals(752, regionServicesForStores.getGlobalMemstoreTotalSize());
 
     ((CompactingMemStore)memstore).disableCompaction();
     size = memstore.getFlushableSize();
-    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction
+    // Reduce the in mem flush size limit so that an in memory flush can get executed.
+    ((CompactingMemStore) memstore).setInMemoryFlushSize(memstore.getActive().getSize() - 1);
+    try {
+      ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
+    } finally {
+      ((CompactingMemStore) memstore).setInMemoryFlushSize(oldVal);
+    }
     assertEquals(0, memstore.getSnapshot().getCellsCount());
     assertEquals(752, regionServicesForStores.getGlobalMemstoreTotalSize());
@@ -688,9 +717,12 @@
     ((CompactingMemStore)memstore).enableCompaction();
 
     size = memstore.getFlushableSize();
-    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
-    while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) {
-      Threads.sleep(10);
+    // Reduce the in mem flush size limit so that an in memory flush can get executed.
+    ((CompactingMemStore) memstore).setInMemoryFlushSize(memstore.getActive().getSize() - 1);
+    try {
+      ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
+    } finally {
+      ((CompactingMemStore) memstore).setInMemoryFlushSize(oldVal);
     }
     assertEquals(0, memstore.getSnapshot().getCellsCount());
     assertEquals(496, regionServicesForStores.getGlobalMemstoreTotalSize());
@@ -703,8 +735,6 @@
     assertEquals(0, regionServicesForStores.getGlobalMemstoreTotalSize());
 
     memstore.clearSnapshot(snapshot.getId());
-
-    //assertTrue(tstStr, false);
   }
 
   private void addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
index 2acfd12..5682b00 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
@@ -21,8 +21,6 @@
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -58,8 +56,6 @@
  */
 @Category({ RegionServerTests.class, LargeTests.class })
 public class TestWalAndCompactingMemStoreFlush {
-
-  private static final Log LOG = LogFactory.getLog(TestWalAndCompactingMemStoreFlush.class);
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion");
   public static final TableName TABLENAME = TableName.valueOf("TestWalAndCompactingMemStoreFlush",
@@ -215,15 +211,26 @@
     // Since CF1 and CF3 should be flushed to memory (not to disk),
     // CF2 is going to be flushed to disk.
     // CF1 - nothing to compact, CF3 - should be twice compacted
-    ((CompactingMemStore) region.getStore(FAMILY1).getMemStore()).flushInMemory();
-    ((CompactingMemStore) region.getStore(FAMILY3).getMemStore()).flushInMemory();
+    CompactingMemStore cms = (CompactingMemStore) region.getStore(FAMILY1).getMemStore();
+    long oldVal = cms.getInMemoryFlushSize();
+    // Reduce the in mem flush size limit so that an in memory flush can get executed.
+    cms.setInMemoryFlushSize(cms.getActive().getSize() - 1);
+    try {
+      cms.flushInMemory();
+    } finally {
+      cms.setInMemoryFlushSize(oldVal);
+    }
+    cms = (CompactingMemStore) region.getStore(FAMILY3).getMemStore();
+    oldVal = cms.getInMemoryFlushSize();
+    // Reduce the in mem flush size limit so that an in memory flush can get executed.
+    cms.setInMemoryFlushSize(cms.getActive().getSize() - 1);
+    try {
+      cms.flushInMemory();
+    } finally {
+      cms.setInMemoryFlushSize(oldVal);
+    }
     region.flush(false);
 
-    // CF3 should be compacted so wait here to be sure the compaction is done
-    while (((CompactingMemStore) region.getStore(FAMILY3).getMemStore())
-        .isMemStoreFlushingInMemory())
-      Threads.sleep(10);
-
     // Recalculate everything
     long cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize();
     long cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
@@ -483,15 +490,24 @@
         + smallestSeqCF2PhaseIII +", the smallest sequence in CF3:" + smallestSeqCF3PhaseIII + "\n";
 
     // Flush!
-    ((CompactingMemStore) region.getStore(FAMILY1).getMemStore()).flushInMemory();
-    ((CompactingMemStore) region.getStore(FAMILY3).getMemStore()).flushInMemory();
-    // CF1 and CF3 should be compacted so wait here to be sure the compaction is done
-    while (((CompactingMemStore) region.getStore(FAMILY1).getMemStore())
-        .isMemStoreFlushingInMemory())
-      Threads.sleep(10);
-    while (((CompactingMemStore) region.getStore(FAMILY3).getMemStore())
-        .isMemStoreFlushingInMemory())
-      Threads.sleep(10);
+    CompactingMemStore cms = (CompactingMemStore) region.getStore(FAMILY1).getMemStore();
+    long oldVal = cms.getInMemoryFlushSize();
+    // Reduce the in mem flush size limit so that an in memory flush can get executed.
+    cms.setInMemoryFlushSize(cms.getActive().getSize() - 1);
+    try {
+      cms.flushInMemory();
+    } finally {
+      cms.setInMemoryFlushSize(oldVal);
+    }
+    cms = (CompactingMemStore) region.getStore(FAMILY3).getMemStore();
+    oldVal = cms.getInMemoryFlushSize();
+    // Reduce the in mem flush size limit so that an in memory flush can get executed.
+    cms.setInMemoryFlushSize(cms.getActive().getSize() - 1);
+    try {
+      cms.flushInMemory();
+    } finally {
+      cms.setInMemoryFlushSize(oldVal);
+    }
     region.flush(false);
 
     long smallestSeqInRegionCurrentMemstorePhaseIV =