Index: hbase-server/src/test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java	(revision 0)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java	(revision 0)
@@ -0,0 +1,544 @@
+/*
+ * Copyright 2013 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This test verifies the correctness of the Per Column Family flushing strategy.
+ */
+public class TestPerColumnFamilyFlush extends TestCase {
+  private static final Log LOG =
+      LogFactory.getLog(TestPerColumnFamilyFlush.class);
+
+  HRegion region = null;
+
+  private final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private final String DIR = TEST_UTIL.getTestDir() + "/TestHRegion/";
+
+  public static final String TABLENAME_STR = "t1";
+  public static final byte[] TABLENAME = Bytes.toBytes("t1");
+  public static final byte[][] families = { Bytes.toBytes("f1"),
+      Bytes.toBytes("f2"), Bytes.toBytes("f3"), Bytes.toBytes("f4"),
+      Bytes.toBytes("f5") };
+  public static final byte[] FAMILY1 = families[0];
+  public static final byte[] FAMILY2 = families[1];
+  public static final byte[] FAMILY3 = families[2];
+
+  private void initHRegion(String callingMethod, HBaseConfiguration conf)
+      throws IOException {
+    HTableDescriptor htd = new HTableDescriptor(TABLENAME);
+    for (byte[] family : families) {
+      htd.addFamily(new HColumnDescriptor(family));
+    }
+    HRegionInfo info = new HRegionInfo(htd, null, null, false);
+    Path path = new Path(DIR + callingMethod);
+    region = HRegion.createHRegion(info, path, conf);
+  }
+
+  // A helper function to create puts.
+  Put createPut(int familyNum, int putNum) {
+    byte[] qf = Bytes.toBytes("q" + familyNum);
+    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
+    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
+    Put p = new Put(row);
+    p.add(families[familyNum - 1], qf, val);
+    return p;
+  }
+
+  // A helper function to create gets.
+  Get createGet(int familyNum, int putNum) {
+    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
+    return new Get(row);
+  }
+
+  // A helper function to verify edits.
+  void verifyEdit(int familyNum, int putNum, HTable table) throws IOException {
+    Result r = table.get(createGet(familyNum, putNum));
+    byte[] family = families[familyNum - 1];
+    byte[] qf = Bytes.toBytes("q" + familyNum);
+    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
+    assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum),
+        r.getFamilyMap(family));
+    assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum),
+        r.getFamilyMap(family).get(qf));
+    assertTrue(("Incorrect value for Put#" + putNum + " for CF# " + familyNum),
+        Arrays.equals(r.getFamilyMap(family).get(qf), val));
+  }
+
+  @Test
+  public void testSelectiveFlushWhenEnabled() throws IOException {
+    // Set up the configuration
+    HBaseConfiguration conf = new HBaseConfiguration();
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
+    conf.setBoolean(HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH, true);
+    conf.setLong(HConstants.HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE, 100 * 1024);
+
+    // Initialize the HRegion
+    initHRegion(getName(), conf);
+    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
+    for (int i = 1; i <= 1200; i++) {
+      region.put(createPut(1, i));
+      if (i <= 100) {
+        region.put(createPut(2, i));
+        if (i <= 50) {
+          region.put(createPut(3, i));
+        }
+      }
+    }
+
+    long totalMemstoreSize = region.getMemstoreSize().get();
+
+    // Find the smallest LSN for the edits of each CF.
+    long smallestSeqCF1 =
+        region.getStore(FAMILY1).getSmallestSeqNumberInMemstore();
+    long smallestSeqCF2 =
+        region.getStore(FAMILY2).getSmallestSeqNumberInMemstore();
+    long smallestSeqCF3 =
+        region.getStore(FAMILY3).getSmallestSeqNumberInMemstore();
+
+    // Find the sizes of the memstores of each CF.
+    long cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
+    long cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
+    long cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+
+    // Get the overall smallest LSN in the region's memstores.
+    long smallestSeqInRegionCurrentMemstore =
+        region.getLog().getFirstSeqWrittenInCurrentMemstoreForRegion(region);
+
+    // The overall smallest LSN in the region's memstores should be the same as
+    // the LSN of the smallest edit in CF1.
+    assertEquals(smallestSeqCF1, smallestSeqInRegionCurrentMemstore);
+
+    // Some other sanity checks.
+    assertTrue(smallestSeqCF1 < smallestSeqCF2);
+    assertTrue(smallestSeqCF2 < smallestSeqCF3);
+    assertTrue(cf1MemstoreSize > 0);
+    assertTrue(cf2MemstoreSize > 0);
+    assertTrue(cf3MemstoreSize > 0);
+
+    // The total memstore size should be the same as the sum of the sizes of
+    // memstores of CF1, CF2 and CF3.
+    assertTrue(totalMemstoreSize ==
+        (cf1MemstoreSize + cf2MemstoreSize + cf3MemstoreSize));
+
+    // Flush!
+    region.flushcache(true);
+
+    // Will use these to check if anything changed.
+    long oldCF2MemstoreSize = cf2MemstoreSize;
+    long oldCF3MemstoreSize = cf3MemstoreSize;
+
+    // Recalculate everything
+    cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
+    cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
+    cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+    totalMemstoreSize = region.getMemstoreSize().get();
+    smallestSeqInRegionCurrentMemstore = region.getLog()
+        .getFirstSeqWrittenInCurrentMemstoreForRegion(region);
+
+    // We should have cleared out only CF1, since we chose the flush thresholds
+    // and number of puts accordingly.
+    assertEquals(0, cf1MemstoreSize);
+    // Nothing should have happened to CF2, ...
+    assertTrue(cf2MemstoreSize == oldCF2MemstoreSize);
+    // ... or CF3
+    assertTrue(cf3MemstoreSize == oldCF3MemstoreSize);
+    // Now the smallest LSN in the region should be the same as the smallest
+    // LSN in the memstore of CF2.
+    assertTrue(smallestSeqInRegionCurrentMemstore == smallestSeqCF2);
+    // Of course, this should hold too.
+    assertTrue(totalMemstoreSize == (cf2MemstoreSize + cf3MemstoreSize));
+
+    // Now add more puts (mostly for CF2), so that we only flush CF2 this time.
+    for (int i = 1200; i < 2400; i++) {
+      region.put(createPut(2, i));
+      // Add only 100 puts for CF3
+      if (i - 1200 < 100) {
+        region.put(createPut(3, i));
+      }
+    }
+
+    // How much does the CF3 memstore occupy? Will be used later.
+    oldCF3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+
+    // Flush again
+    region.flushcache(true);
+
+    // Recalculate everything
+    cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
+    cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
+    cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+    totalMemstoreSize = region.getMemstoreSize().get();
+    smallestSeqInRegionCurrentMemstore = region.getLog()
+        .getFirstSeqWrittenInCurrentMemstoreForRegion(region);
+
+    // CF1 and CF2 should both be absent.
+    assertEquals(0, cf1MemstoreSize);
+    assertEquals(0, cf2MemstoreSize);
+    // CF3 shouldn't have been touched.
+    assertTrue(cf3MemstoreSize == oldCF3MemstoreSize);
+    assertTrue(totalMemstoreSize == cf3MemstoreSize);
+    assertTrue(smallestSeqInRegionCurrentMemstore == smallestSeqCF3);
+
+    // What happens when we hit the memstore limit, but we are not able to find
+    // any Column Family above the threshold?
+    // In that case, we should flush all the CFs.
+
+    // Clear the existing memstores.
+    region.flushcache(false);
+
+    // The memstore limit is 200*1024 and the column family flush threshold is
+    // 100*1024 (as configured above). We try to just hit the memstore limit
+    // with each CF's memstore being below the CF flush threshold.
+    for (int i = 1; i <= 300; i++) {
+      region.put(createPut(1, i));
+      region.put(createPut(2, i));
+      region.put(createPut(3, i));
+      region.put(createPut(4, i));
+      region.put(createPut(5, i));
+    }
+
+    region.flushcache(true);
+    // Since no CF is above the threshold, there is no specific store to
+    // flush, so we should flush all the memstores.
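+    // (This exercises the fallback path in HRegion.flushcache(): when no
+    // store crosses the per-CF threshold, the selective flush falls back to
+    // flushing every store in the region.)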
+    Assert.assertEquals(0, region.getMemstoreSize().get());
+  }
+
+  @Test
+  public void testSelectiveFlushWhenNotEnabled() throws IOException {
+    // Set up the configuration
+    HBaseConfiguration conf = new HBaseConfiguration();
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
+    conf.setBoolean(HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH, false);
+    conf.setLong(HConstants.HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE, 100 * 1024);
+
+    // Initialize the HRegion
+    initHRegion(getName(), conf);
+    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
+    for (int i = 1; i <= 1200; i++) {
+      region.put(createPut(1, i));
+      if (i <= 100) {
+        region.put(createPut(2, i));
+        if (i <= 50) {
+          region.put(createPut(3, i));
+        }
+      }
+    }
+
+    long totalMemstoreSize = region.getMemstoreSize().get();
+
+    // Find the smallest LSN for the edits of each CF.
+    long smallestSeqCF1 =
+        region.getStore(FAMILY1).getSmallestSeqNumberInMemstore();
+    long smallestSeqCF2 =
+        region.getStore(FAMILY2).getSmallestSeqNumberInMemstore();
+    long smallestSeqCF3 =
+        region.getStore(FAMILY3).getSmallestSeqNumberInMemstore();
+
+    // Find the sizes of the memstores of each CF.
+    long cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
+    long cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
+    long cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+
+    // Get the overall smallest LSN in the region's memstores.
+    long smallestSeqInRegionCurrentMemstore =
+        region.getLog().getFirstSeqWrittenInCurrentMemstoreForRegion(region);
+
+    // The overall smallest LSN in the region's memstores should be the same as
+    // the LSN of the smallest edit in CF1.
+    assertEquals(smallestSeqCF1, smallestSeqInRegionCurrentMemstore);
+
+    // Some other sanity checks.
+    assertTrue(smallestSeqCF1 < smallestSeqCF2);
+    assertTrue(smallestSeqCF2 < smallestSeqCF3);
+    assertTrue(cf1MemstoreSize > 0);
+    assertTrue(cf2MemstoreSize > 0);
+    assertTrue(cf3MemstoreSize > 0);
+
+    // The total memstore size should be the same as the sum of the sizes of
+    // memstores of CF1, CF2 and CF3.
+    assertTrue(totalMemstoreSize ==
+        (cf1MemstoreSize + cf2MemstoreSize + cf3MemstoreSize));
+
+    // Flush!
+    region.flushcache(true);
+
+    cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
+    cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
+    cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+    totalMemstoreSize = region.getMemstoreSize().get();
+    smallestSeqInRegionCurrentMemstore = region.getLog()
+        .getFirstSeqWrittenInCurrentMemstoreForRegion(region);
+
+    // Everything should have been cleared
+    assertEquals(0, cf1MemstoreSize);
+    assertEquals(0, cf2MemstoreSize);
+    assertEquals(0, cf3MemstoreSize);
+    assertEquals(0, totalMemstoreSize);
+    assertEquals(Long.MAX_VALUE, smallestSeqInRegionCurrentMemstore);
+  }
+
+  // Find the (first) region whose name starts with a particular prefix.
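+  // Returns null if no online region matches the prefix; callers assert that
+  // the lookup succeeded before using the result.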
+  private HRegion getRegionWithNameStartingWith(String regionPrefix) {
+    MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
+    List<JVMClusterUtil.RegionServerThread> rsts =
+        cluster.getRegionServerThreads();
+    for (JVMClusterUtil.RegionServerThread rst : rsts) {
+      HRegionServer hrs = rst.getRegionServer();
+      for (HRegion region : hrs.getOnlineRegions()) {
+        if (region.getRegionNameAsString().startsWith(regionPrefix)) {
+          return region;
+        }
+      }
+    }
+    return null;
+  }
+
+  @Test
+  public void testLogReplay() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setBoolean(HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH, true);
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 20000);
+    // Carefully chosen limits so that the memstore just flushes when we're done
+    conf.setLong(HConstants.HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE, 10000);
+    final int numRegionServers = 4;
+    try {
+      TEST_UTIL.startMiniCluster(numRegionServers);
+    } catch (Exception e) {
+      LOG.error("Could not start the mini cluster. Terminating.");
+      e.printStackTrace();
+      throw e;
+    }
+
+    TEST_UTIL.createTable(TABLENAME, families);
+    HTable table = new HTable(conf, TABLENAME);
+    HTableDescriptor htd = table.getTableDescriptor();
+
+    for (byte[] family : families) {
+      if (!htd.hasFamily(family)) {
+        htd.addFamily(new HColumnDescriptor(family));
+      }
+    }
+
+    // Add 80 edits for CF1, 10 for CF2 and 10 for CF3.
+    // These will all be interleaved in the log.
+    for (int i = 1; i <= 80; i++) {
+      table.put(createPut(1, i));
+      if (i <= 10) {
+        table.put(createPut(2, i));
+        table.put(createPut(3, i));
+      }
+    }
+    table.flushCommits();
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      throw e;
+    }
+
+    HRegion desiredRegion = getRegionWithNameStartingWith(TABLENAME_STR);
+    assertTrue("Could not find the region hosting our table.",
+        desiredRegion != null);
+
+    // Flush the region selectively.
+    desiredRegion.flushcache(true);
+
+    long totalMemstoreSize;
+    long cf1MemstoreSize, cf2MemstoreSize, cf3MemstoreSize;
+    totalMemstoreSize = desiredRegion.getMemstoreSize().get();
+
+    // Find the sizes of the memstores of each CF.
+    cf1MemstoreSize = desiredRegion.getStore(FAMILY1).getMemStoreSize();
+    cf2MemstoreSize = desiredRegion.getStore(FAMILY2).getMemStoreSize();
+    cf3MemstoreSize = desiredRegion.getStore(FAMILY3).getMemStoreSize();
+
+    // CF1 should have been flushed.
+    assertEquals(0, cf1MemstoreSize);
+    // CF2 and CF3 shouldn't have been flushed.
+    assertTrue(cf2MemstoreSize > 0);
+    assertTrue(cf3MemstoreSize > 0);
+    assertEquals(totalMemstoreSize, cf2MemstoreSize + cf3MemstoreSize);
+
+    // Wait for the RS report to go across to the master, so that the master
+    // is aware of which sequence ids have been flushed, before we kill the RS.
+    // If, in production, the RS dies before the report goes across, we will
+    // safely replay all the edits.
+    try {
+      Thread.sleep(2000);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      throw e;
+    }
+
+    // Abort the region server where we have the region hosted.
+    HRegionServer rs = desiredRegion.getRegionServer();
+    rs.abort("testing");
+
+    // The aborted region server's regions will be eventually assigned to some
+    // other region server, and the get RPC call (inside verifyEdit()) will
+    // retry for some time till the regions come back up.
+
+    // Verify that all the edits are safe.
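+    // (The Gets inside verifyEdit() retry until the reassigned regions come
+    // back online, so no explicit wait is needed before verification.)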
+    for (int i = 1; i <= 80; i++) {
+      verifyEdit(1, i, table);
+      if (i <= 10) {
+        verifyEdit(2, i, table);
+        verifyEdit(3, i, table);
+      }
+    }
+
+    try {
+      TEST_UTIL.shutdownMiniCluster();
+    } catch (Exception e) {
+      LOG.error("Could not shutdown the mini cluster. Terminating.");
+      e.printStackTrace();
+      throw e;
+    }
+  }
+
+  // Test Log Replay with Distributed Splitting on.
+  // In distributed log splitting, the log splitters ask the master for the
+  // last flushed sequence id for a region. This test ensures that we
+  // are doing the book-keeping correctly.
+  @Test
+  public void testLogReplayWithDistributedSplitting() throws Exception {
+    TEST_UTIL.getConfiguration().setBoolean(
+        HConstants.DISTRIBUTED_LOG_SPLITTING_KEY, true);
+    testLogReplay();
+  }
+
+  /**
+   * When a log roll is about to happen, we do a flush of the regions that will
+   * be affected by the log roll. These flushes cannot be selective flushes,
+   * otherwise we cannot roll the logs. This test ensures that we do a
+   * full flush in that scenario.
+   * @throws IOException
+   */
+  @Test
+  public void testFlushingWhenLogRolling() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setBoolean(HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH, true);
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300000);
+    conf.setLong(HConstants.HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE, 100000);
+
+    // Also, let us try real hard to get a log roll to happen.
+    // Keeping the log roll period to 2s.
+    conf.setLong("hbase.regionserver.logroll.period", 2000);
+    // Keep the block size small so that we fill up the log files very fast.
+    conf.setLong("hbase.regionserver.hlog.blocksize", 6144);
+    int maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
+
+    final int numRegionServers = 4;
+    try {
+      TEST_UTIL.startMiniCluster(numRegionServers);
+    } catch (Exception e) {
+      LOG.error("Could not start the mini cluster. Terminating.");
+      e.printStackTrace();
+      throw e;
+    }
+
+    TEST_UTIL.createTable(TABLENAME, families);
+    HTable table = new HTable(conf, TABLENAME);
+    HTableDescriptor htd = table.getTableDescriptor();
+
+    for (byte[] family : families) {
+      if (!htd.hasFamily(family)) {
+        htd.addFamily(new HColumnDescriptor(family));
+      }
+    }
+
+    HRegion desiredRegion = getRegionWithNameStartingWith(TABLENAME_STR);
+    assertTrue("Could not find the region hosting our table.",
+        desiredRegion != null);
+
+    // Add some edits. Most will be for CF1, some for CF2 and CF3.
+    for (int i = 1; i <= 10000; i++) {
+      table.put(createPut(1, i));
+      if (i <= 200) {
+        table.put(createPut(2, i));
+        table.put(createPut(3, i));
+      }
+      table.flushCommits();
+      // Keep adding until we exceed the number of log files, so that we are
+      // able to trigger the cleaning of old log files.
+      int currentNumLogFiles = desiredRegion.getLog().getNumLogFiles();
+      if (currentNumLogFiles > maxLogs) {
+        LOG.info("The number of log files is now: " + currentNumLogFiles +
+            ". Expect a log roll and memstore flush.");
+        break;
+      }
+    }
+
+    // Wait for some time till the flush caused by log rolling happens.
+    try {
+      Thread.sleep(4000);
+    } catch (InterruptedException e) {
+      throw e;
+    }
+
+    // We have artificially created the conditions for a log roll. When a
+    // log roll happens, we should flush all the column families. Testing that
+    // case here.
+
+    // Individual families should have been flushed.
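+    // (A log roll requests a non-selective flush; see the LogRoller change
+    // below. Every store's memstore should therefore be empty.)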
+    assertEquals(0, desiredRegion.getStore(FAMILY1).getMemStoreSize());
+    assertEquals(0, desiredRegion.getStore(FAMILY2).getMemStoreSize());
+    assertEquals(0, desiredRegion.getStore(FAMILY3).getMemStoreSize());
+
+    // And of course, the total memstore should also be clean.
+    assertEquals(0, desiredRegion.getMemstoreSize().get());
+  }
+}
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java	(revision 1525721)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java	(working copy)
@@ -806,7 +806,7 @@
     for (int newValue = 0; newValue < 1000; newValue++) {
       for (int row = newValue; row < newValue + 1000; row++) {
         byte[] rowBytes = Bytes.toBytes(row);
-        size += memstore.updateColumnValue(rowBytes, FAMILY, qualifier, newValue, ++ts);
+        size += memstore.updateColumnValue(rowBytes, FAMILY, qualifier, newValue, ++ts, -1L);
       }
     }
     System.out.println("Wrote " + ts + " vals");
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java	(revision 1525721)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java	(working copy)
@@ -260,9 +260,9 @@
     int storeFileNum = 4;
     for (int i = 1; i <= storeFileNum; i++) {
       LOG.info("Adding some data for the store file #"+i);
-      this.store.add(new KeyValue(row, family, qf1, i, (byte[])null));
-      this.store.add(new KeyValue(row, family, qf2, i, (byte[])null));
-      this.store.add(new KeyValue(row, family, qf3, i, (byte[])null));
+      this.store.add(new KeyValue(row, family, qf1, i, (byte[])null), -1L);
+      this.store.add(new KeyValue(row, family, qf2, i, (byte[])null), -1L);
+      this.store.add(new KeyValue(row, family, qf3, i, (byte[])null), -1L);
       flush(i);
     }
     // after flush; check the lowest time stamp
@@ -311,8 +311,8 @@
   public void testEmptyStoreFile() throws IOException {
     init(this.getName());
     // Write a store file.
-    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
-    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
+    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), -1L);
+    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), -1L);
     flush(1);
     // Now put in place an empty store file. Its a little tricky. Have to
     // do manually with hacked in sequence id.
@@ -349,12 +349,12 @@
     init(this.getName());
 
     //Put data in memstore
-    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
-    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
-    this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
-    this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null));
-    this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null));
-    this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null));
+    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), -1L);
+    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), -1L);
+    this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), -1L);
+    this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), -1L);
+    this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), -1L);
+    this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), -1L);
 
     //Get
     result = HBaseTestingUtility.getFromStoreFile(store,
@@ -372,20 +372,20 @@
     init(this.getName());
 
     //Put data in memstore
-    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
-    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
+    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), -1L);
+    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), -1L);
     //flush
     flush(1);
 
     //Add more data
-    this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
-    this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null));
+    this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), -1L);
+    this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), -1L);
     //flush
     flush(2);
 
     //Add more data
-    this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null));
-    this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null));
+    this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), -1L);
+    this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), -1L);
     //flush
     flush(3);
@@ -410,20 +410,20 @@
     init(this.getName());
 
     //Put data in memstore
-    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
-    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
+    this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), -1L);
+    this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), -1L);
     //flush
     flush(1);
 
     //Add more data
-    this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
-    this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null));
+    this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), -1L);
+    this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), -1L);
     //flush
     flush(2);
 
     //Add more data
-    this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null));
-    this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null));
+    this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), -1L);
+    this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), -1L);
 
     //Get
     result = HBaseTestingUtility.getFromStoreFile(store,
@@ -464,7 +464,8 @@
     long newValue = 3L;
     this.store.add(new KeyValue(row, family, qf1, System.currentTimeMillis(),
-        Bytes.toBytes(oldValue)));
+        Bytes.toBytes(oldValue)),
+        -1L);
 
     // snapshot the store.
     this.store.snapshot();
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java	(revision 1525721)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java	(working copy)
@@ -484,10 +484,10 @@
   }
 
   @Override
-  public long add(final KeyValue kv) {
+  public long add(final KeyValue kv, long seqNum) {
     lock.readLock().lock();
     try {
-      return this.memstore.add(kv);
+      return this.memstore.add(kv, seqNum);
     } finally {
       lock.readLock().unlock();
     }
@@ -504,10 +504,10 @@
    * @param kv
    * @return memstore size delta
    */
-  protected long delete(final KeyValue kv) {
+  protected long delete(final KeyValue kv, long seqNum) {
     lock.readLock().lock();
     try {
-      return this.memstore.delete(kv);
+      return this.memstore.delete(kv, seqNum);
     } finally {
       lock.readLock().unlock();
     }
@@ -1723,9 +1723,19 @@
 
   @Override
   public long getMemStoreSize() {
-    return this.memstore.heapSize();
+    // Use memstore.keySize() instead of heapSize(), since heapSize() gives
+    // the size of the keys plus the overhead of the backing map.
+    return this.memstore.keySize();
   }
 
+  /**
+   * A helper function to get the smallest LSN in the memstore.
+   * @return the smallest sequence number in the memstore
+   */
+  public long getSmallestSeqNumberInMemstore() {
+    return this.memstore.getSmallestSeqNumber();
+  }
+
   @Override
   public int getCompactPriority() {
     int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority();
@@ -1774,11 +1784,12 @@
    * @param f family to update
    * @param qualifier qualifier to update
    * @param newValue the new value to set into memstore
+   * @param seqNum The LSN associated with the edit.
    * @return memstore size delta
    * @throws IOException
    */
   public long updateColumnValue(byte [] row, byte [] f,
-                                byte [] qualifier, long newValue)
+                                byte [] qualifier, long newValue, long seqNum)
       throws IOException {
 
     this.lock.readLock().lock();
@@ -1789,7 +1800,8 @@
           f,
           qualifier,
           newValue,
-          now);
+          now,
+          seqNum);
     } finally {
       this.lock.readLock().unlock();
     }
@@ -1797,10 +1809,10 @@
 
   @Override
-  public long upsert(Iterable<Cell> cells, long readpoint) throws IOException {
+  public long upsert(Iterable<Cell> cells, long readpoint, long seqNum) throws IOException {
     this.lock.readLock().lock();
     try {
-      return this.memstore.upsert(cells, readpoint);
+      return this.memstore.upsert(cells, readpoint, seqNum);
     } finally {
       this.lock.readLock().unlock();
     }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java	(revision 1525721)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java	(working copy)
@@ -129,7 +129,10 @@
       if (r != null) {
         requester = this.services.getFlushRequester();
         if (requester != null) {
-          requester.requestFlush(r);
+          // If we do a selective flush, some column families might remain in
+          // the memstore for a long time, and might cause old logs to
+          // accumulate. Hence, we do not request a selective flush here.
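+          // (The second argument to requestFlush() is 'selectiveFlushRequest';
+          // passing false asks the flusher to flush every column family in
+          // the region, so that the old WAL files can be archived.)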
+          requester.requestFlush(r, false);
           scheduled = true;
         }
       }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java	(revision 1525721)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java	(working copy)
@@ -27,7 +27,6 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -35,6 +34,7 @@
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -50,12 +50,13 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Syncable;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -63,6 +64,7 @@
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HasThread;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.util.StringUtils;
 import org.cloudera.htrace.Trace;
@@ -115,6 +117,7 @@
   private final Path rootDir;
   private final Path dir;
   private final Configuration conf;
+  private final boolean perColumnFamilyFlushEnabled;
   // Listeners that are called on WAL events.
   private List<WALActionsListener> listeners =
     new CopyOnWriteArrayList<WALActionsListener>();
@@ -152,8 +155,8 @@
   /**
-   * This lock synchronizes all operations on oldestUnflushedSeqNums and oldestFlushingSeqNums,
-   * with the exception of append's putIfAbsent into oldestUnflushedSeqNums.
+   * This lock synchronizes all operations on firstSeqWrittenInCurrentMemstore
+   * and firstSeqWrittenInSnapshotMemstore.
    * We only use these to find out the low bound seqNum, or to find regions with old seqNums to
    * force flush them, so we don't care about these numbers messing with anything. */
   private final Object oldestSeqNumsLock = new Object();
@@ -166,9 +169,9 @@
   /**
    * Map of encoded region names to their most recent sequence/edit id in their memstore.
-   */
   private final ConcurrentSkipListMap<byte[], Long> oldestUnflushedSeqNums =
     new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+   */
   /**
    * Map of encoded region names to their most recent sequence/edit id in their memstore;
    * contains the regions that are currently flushing. That way we can store two numbers for
@@ -178,6 +181,13 @@
     new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
   private volatile boolean closed = false;
 
+  /*
+   * Map of regions to first sequence/edit id in their memstore.
+   */
+  private final ConcurrentNavigableMap<byte[], Long> firstSeqWrittenInCurrentMemstore =
+    new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+  private final ConcurrentNavigableMap<byte[], Long> firstSeqWrittenInSnapshotMemstore =
+    new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
 
   private final AtomicLong logSeqNum = new AtomicLong(0);
@@ -324,6 +334,9 @@
     this.oldLogDir = new Path(this.rootDir, oldLogDir);
     this.forMeta = forMeta;
     this.conf = conf;
+    this.perColumnFamilyFlushEnabled = conf.getBoolean(
+        HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH,
+        HConstants.DEFAULT_HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH);
 
     if (listeners != null) {
       for (WALActionsListener i: listeners) {
@@ -537,8 +550,7 @@
 
       // Can we delete any of the old log files?
       if (getNumLogFiles() > 0) {
-        cleanOldLogs();
-        regionsToFlush = getRegionsToForceFlush();
+        regionsToFlush = cleanOldLogs(getOldestOutstandingSeqNum());
       }
     } finally {
       this.logRollRunning = false;
@@ -548,6 +560,23 @@
     }
   }
 
+  /*
+   * @return Logs older than this id are safe to remove.
+   */
+  private long getOldestOutstandingSeqNum() {
+    long oldest = Long.MAX_VALUE;
+    synchronized (oldestSeqNumsLock) {
+      if (!this.firstSeqWrittenInCurrentMemstore.isEmpty()) {
+        oldest = Collections.min(this.firstSeqWrittenInCurrentMemstore.values());
+      }
+      if (!this.firstSeqWrittenInSnapshotMemstore.isEmpty()) {
+        oldest = Math.min(oldest,
+            Collections.min(this.firstSeqWrittenInSnapshotMemstore.values()));
+      }
+    }
+    return oldest;
+  }
+
   /**
    * This method allows subclasses to inject different writers without having to
    * extend other methods like rollWriter().
@@ -573,16 +602,7 @@
    * encoded region names to flush.
    * @throws IOException
    */
-  private void cleanOldLogs() throws IOException {
-    long oldestOutstandingSeqNum = Long.MAX_VALUE;
-    synchronized (oldestSeqNumsLock) {
-      Long oldestFlushing = (oldestFlushingSeqNums.size() > 0)
-        ? Collections.min(oldestFlushingSeqNums.values()) : Long.MAX_VALUE;
-      Long oldestUnflushed = (oldestUnflushedSeqNums.size() > 0)
-        ? Collections.min(oldestUnflushedSeqNums.values()) : Long.MAX_VALUE;
-      oldestOutstandingSeqNum = Math.min(oldestFlushing, oldestUnflushed);
-    }
-
+  private byte [][] cleanOldLogs(long oldestOutstandingSeqNum) throws IOException {
     // Get the set of all log files whose last sequence number is smaller than
     // the oldest edit's sequence number.
     TreeSet<Long> sequenceNumbers = new TreeSet<Long>(this.outputfiles.headMap(
@@ -598,39 +618,18 @@
     for (Long seq : sequenceNumbers) {
       archiveLogFile(this.outputfiles.remove(seq), seq);
     }
-  }
-
-  /**
-   * Return regions that have edits that are equal or less than a certain sequence number.
-   * Static due to some old unit test.
-   * @param walSeqNum The sequence number to compare with.
-   * @param regionsToSeqNums Encoded region names to sequence ids
-   * @return All regions whose seqNum <= walSeqNum. Null if no regions found.
-   */
-  static byte[][] findMemstoresWithEditsEqualOrOlderThan(
-      final long walSeqNum, final Map<byte[], Long> regionsToSeqNums) {
-    List<byte[]> regions = null;
-    for (Map.Entry<byte[], Long> e : regionsToSeqNums.entrySet()) {
-      if (e.getValue().longValue() <= walSeqNum) {
-        if (regions == null) regions = new ArrayList<byte[]>();
-        regions.add(e.getKey());
-      }
-    }
-    return regions == null ? null : regions
-        .toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
-  }
-
-  private byte[][] getRegionsToForceFlush() throws IOException {
     // If too many log files, figure which regions we need to flush.
+    // Transactions in these regions have not been flushed, which
+    // prevents the log roller from archiving the oldest HLog file.
+    // Flush requests
+    // will be sent to these regions after log rolling.
+    // Array is an array of encoded region names.
     byte [][] regions = null;
     int logCount = getNumLogFiles();
     if (logCount > this.maxLogs && logCount > 0) {
       // This is an array of encoded region names.
-      synchronized (oldestSeqNumsLock) {
-        regions = findMemstoresWithEditsEqualOrOlderThan(this.outputfiles.firstKey(),
-          this.oldestUnflushedSeqNums);
-      }
+      regions = findMemstoresWithEditsEqualOrOlderThan(this.outputfiles.firstKey(),
+          this.firstSeqWrittenInCurrentMemstore, this.firstSeqWrittenInSnapshotMemstore);
       if (regions != null) {
         StringBuilder sb = new StringBuilder();
         for (int i = 0; i < regions.length; i++) {
@@ -645,6 +644,37 @@
     return regions;
   }
 
+  /**
+   * Return regions that have edits that are equal or less than a certain sequence number.
+   * Static due to some old unit test.
+   * @param oldestWALseqid The sequence number to compare with.
+   * @param regionsToCurSeqids Encoded region names to sequence ids in the current memstores.
+   * @param regionsToPrevSeqids Encoded region names to sequence ids in the snapshot memstores.
+   * @return All regions whose seqNum <= oldestWALseqid. Null if no regions found.
+   */
+  static byte[][] findMemstoresWithEditsEqualOrOlderThan(
+      final long oldestWALseqid, final Map<byte[], Long> regionsToCurSeqids,
+      final Map<byte[], Long> regionsToPrevSeqids) {
+    // This method is static so it can be unit tested more easily.
+    List<byte[]> regions = new ArrayList<byte[]>();
+    for (Map.Entry<byte[], Long> e: regionsToCurSeqids.entrySet()) {
+      if (e.getValue() <= oldestWALseqid) {
+        byte [] region = e.getKey();
+        if (!regions.contains(region))
+          regions.add(region);
+      }
+    }
+    for (Map.Entry<byte[], Long> e: regionsToPrevSeqids.entrySet()) {
+      if (e.getValue() <= oldestWALseqid) {
+        byte [] region = e.getKey();
+        if (!regions.contains(region))
+          regions.add(region);
+      }
+    }
+    return regions.isEmpty() ?
+        null : regions.toArray(new byte [][] {HConstants.EMPTY_BYTE_ARRAY});
+  }
+
   /*
    * Cleans up current writer closing and adding to outputfiles.
    * Presumes we're operating inside an updateLock scope.
@@ -867,23 +897,26 @@
    * @param clusterIds that have consumed the change (for replication)
    * @param now
    * @param doSync shall we sync?
-   * @return txid of this transaction
+   * @return Pair of Longs. First is the txid of this transaction.
+   * Second is the log sequence number of the edit, used for the book-keeping
+   * that per-CF flushing of the memstore needs. -1 if the WAL is disabled.
   * @throws IOException
   */
  @SuppressWarnings("deprecation")
-  private long append(HRegionInfo info, TableName tableName, WALEdit edits, List<UUID> clusterIds,
+  private Pair<Long, Long> append(HRegionInfo info, TableName tableName, WALEdit edits,
+      List<UUID> clusterIds,
      final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore,
      RegionCoprocessorHost regionCoproHost)
    throws IOException {
-    if (edits.isEmpty()) return this.unflushedEntries.get();
+    if (edits.isEmpty()) return new Pair<Long, Long>(this.unflushedEntries.get(), -1L);
    if (this.closed) {
      throw new IOException("Cannot append; log is closed");
    }
    TraceScope traceScope = Trace.startSpan("FSHlog.append");
    try {
      long txid = 0;
+      long seqNum;
      synchronized (this.updateLock) {
-        long seqNum = obtainSeqNum();
+        seqNum = obtainSeqNum();
        // The 'lastSeqWritten' map holds the sequence number of the oldest
        // write for each region (i.e. the first edit added to the particular
        // memstore). . When the cache is flushed, the entry for the
@@ -892,7 +925,9 @@
        // Use encoded name. Its shorter, guaranteed unique and a subset of
        // actual name.
        byte [] encodedRegionName = info.getEncodedNameAsBytes();
-        if (isInMemstore) this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
+        if (isInMemstore) {
+          this.firstSeqWrittenInCurrentMemstore.putIfAbsent(encodedRegionName, seqNum);
+        }
        HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterIds);
        doWrite(info, logKey, edits, htd, regionCoproHost);
        this.numEntries.incrementAndGet();
@@ -909,14 +944,14 @@
        // sync txn to file system
        this.sync(txid);
      }
-      return txid;
+      return new Pair<Long, Long>(txid, seqNum);
    } finally {
      traceScope.close();
    }
  }

  @Override
-  public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
+  public Pair<Long, Long> appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
      List<UUID> clusterIds, final long now, HTableDescriptor htd,
      RegionCoprocessorHost regionCoproHost)
    throws IOException {
@@ -1171,6 +1206,38 @@
    return 0;
  }

+  /**
+   * This is a utility method for tests to find the sequence number of the
+   * first KV in a given region's memstore.
+   *
+   * @param region the region in question
+   * @return the first sequence number, or Long.MAX_VALUE if none is tracked
+   */
+  public long getFirstSeqWrittenInCurrentMemstoreForRegion(HRegion region) {
+    Long value = firstSeqWrittenInCurrentMemstore.get(
+        region.getRegionInfo().getEncodedNameAsBytes());
+    if (value != null) {
+      return value.longValue();
+    } else {
+      return Long.MAX_VALUE;
+    }
+  }
+
+  /**
+   * This is a utility method for tests to find the sequence number of the
+   * first KV in a given region's snapshot memstore.
+   *
+   * @param region the region in question
+   * @return the first sequence number, or Long.MAX_VALUE if none is tracked
+   */
+  public long getFirstSeqWrittenInSnapshotMemstoreForRegion(HRegion region) {
+    Long value = firstSeqWrittenInSnapshotMemstore.get(
+        region.getRegionInfo().getEncodedNameAsBytes());
+    if (value != null) {
+      return value.longValue();
+    } else {
+      return Long.MAX_VALUE;
+    }
+  }
+
  boolean canGetCurReplicas() {
    return this.getNumCurrentReplicas != null;
  }
@@ -1258,34 +1325,62 @@
  }

  /** @return the number of log files in use */
-  int getNumLogFiles() {
+  public int getNumLogFiles() {
    return outputfiles.size();
  }

  @Override
-  public Long startCacheFlush(final byte[] encodedRegionName) {
-    Long oldRegionSeqNum = null;
+  public Long startCacheFlush(final byte[] regionName) {
+    return startCacheFlush(regionName, -1L, Long.MAX_VALUE);
+  }
+
+  @Override
+  public Long startCacheFlush(final byte[] encodedRegionName, long firstSeqIdInStoresToFlush,
+      long firstSeqIdInStoresNotToFlush) {
+    long num = -1;
    if (!closeBarrier.beginOp()) {
      return null;
    }
    synchronized (oldestSeqNumsLock) {
-      oldRegionSeqNum = this.oldestUnflushedSeqNums.remove(encodedRegionName);
-      if (oldRegionSeqNum != null) {
-        Long oldValue = this.oldestFlushingSeqNums.put(encodedRegionName, oldRegionSeqNum);
-        assert oldValue == null : "Flushing map not cleaned up for "
-          + Bytes.toString(encodedRegionName);
+      if (this.firstSeqWrittenInSnapshotMemstore.containsKey(encodedRegionName)) {
+        LOG.warn("Requested a startCacheFlush while firstSeqWrittenInSnapshotMemstore still"
+            + " contains " + Bytes.toString(encodedRegionName) + " . Did the previous flush fail?"
+            + " Will try to complete it");
+      } else {
+        // If we are flushing the entire memstore, remove the entry from the
+        // current memstores.
+        if (firstSeqIdInStoresNotToFlush == Long.MAX_VALUE) {
+          Long seq = this.firstSeqWrittenInCurrentMemstore.remove(encodedRegionName);
+          if (seq != null) {
+            this.firstSeqWrittenInSnapshotMemstore.put(encodedRegionName, seq);
+          }
+          num = obtainSeqNum();
+        } else {
+          // Amongst the Stores not being flushed, what is the smallest sequence
+          // number? Put that in this map.
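+          // (Between them, firstSeqWrittenInCurrentMemstore then tracks the
+          // stores that remain unflushed, while firstSeqWrittenInSnapshotMemstore
+          // covers the stores whose snapshot is being written out.)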
+          this.firstSeqWrittenInCurrentMemstore.replace(encodedRegionName,
+              firstSeqIdInStoresNotToFlush);
+
+          // Amongst the Stores being flushed, what is the smallest sequence
+          // number? Put that in this map.
+          this.firstSeqWrittenInSnapshotMemstore.put(encodedRegionName,
+              firstSeqIdInStoresToFlush);
+
+          // During Log Replay, we can safely discard any edits that have a
+          // sequence number less than the smallest sequence id amongst the
+          // stores that we are not flushing. This might re-apply some edits
+          // which belonged to stores which are going to be flushed, but we
+          // expect these operations to be idempotent anyway, and this is
+          // simpler.
+          num = firstSeqIdInStoresNotToFlush - 1;
+        }
+      }
    }
-    if (oldRegionSeqNum == null) {
-      // TODO: if we have no oldRegionSeqNum, and WAL is not disabled, presumably either
-      // the region is already flushing (which would make this call invalid), or there
-      // were no appends after last flush, so why are we starting flush? Maybe we should
-      // assert not null, and switch to "long" everywhere. Less rigorous, but safer,
-      // alternative is telling the caller to stop. For now preserve old logic.
-      LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: ["
-        + Bytes.toString(encodedRegionName) + "]");
+    if (num == -1) {
+      num = obtainSeqNum();
    }
-    return obtainSeqNum();
+    return num;
  }

  @Override
@@ -1303,8 +1398,9 @@
    synchronized (oldestSeqNumsLock) {
      seqNumBeforeFlushStarts = this.oldestFlushingSeqNums.remove(encodedRegionName);
      if (seqNumBeforeFlushStarts != null) {
-        currentSeqNum =
+        /* currentSeqNum =
          this.oldestUnflushedSeqNums.put(encodedRegionName, seqNumBeforeFlushStarts);
+        */
      }
    }
    closeBarrier.endOp();
@@ -1381,7 +1477,7 @@

  @Override
  public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
-    Long result = oldestUnflushedSeqNums.get(encodedRegionName);
+    Long result = firstSeqWrittenInCurrentMemstore.get(encodedRegionName);
    return result == null ? HConstants.NO_SEQNUM : result.longValue();
  }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java	(revision 1525721)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java	(working copy)
@@ -38,6 +38,7 @@
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.Writable;

@@ -300,7 +301,7 @@
   * @return txid of this transaction
   * @throws IOException
   */
-  public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
+  public Pair<Long, Long> appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
      List<UUID> clusterIds, final long now, HTableDescriptor htd,
      RegionCoprocessorHost regionCoproHost) throws IOException;

@@ -337,6 +338,34 @@
  Long startCacheFlush(final byte[] encodedRegionName);

  /**
+   * WAL keeps track of the sequence numbers that were not yet flushed from memstores
+   * in order to be able to do cleanup. This method tells WAL that some region is about
+   * to flush memstore.
+   *
+   * We stash the oldest seqNum for the region, and let the next edit inserted in this
+   * region be recorded in {@link #append(HRegionInfo, TableName, WALEdit, long, HTableDescriptor)}
+   * as the new oldest seqNum.
+   * In case of flush being aborted, we put the stashed value back;
+   * in case of flush succeeding, the seqNum of that first edit after start becomes the
+   * valid oldest seqNum for this region.
+   *
+   * In case per-CF flush is enabled, we cannot simply clear the
+   * firstSeqWritten entry for the region to be flushed. There might be certain
+   * CFs whose memstores won't be flushed. Therefore, we need the first LSNs for
+   * the stores that will be flushed, and the first LSNs for the stores that won't
+   * be flushed.
+   *
+   * @param encodedRegionName Encoded region name.
+   * @param firstSeqIdInStoresToFlush The smallest LSN amongst the stores being flushed.
+   * @param firstSeqIdInStoresNotToFlush The smallest LSN amongst the stores not being flushed.
+   * @return current seqNum, to pass on to flushers (who will put it into the metadata of
+   *         the resulting file as an upper-bound seqNum for that file), or NULL if flush
+   *         should not be started.
+   */
+  Long startCacheFlush(final byte[] encodedRegionName,
+      long firstSeqIdInStoresToFlush,
+      long firstSeqIdInStoresNotToFlush);
+
  /**
   * Complete the cache flush.
   * @param encodedRegionName Encoded region name.
   */
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java	(revision 1525721)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java	(working copy)
@@ -73,6 +73,7 @@
    new HashMap<HRegion, FlushRegionEntry>();
  private AtomicBoolean wakeupPending = new AtomicBoolean();

+  private final boolean perColumnFamilyFlushEnabled;
  private final long threadWakeFrequency;
  private final HRegionServer server;
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -114,6 +115,9 @@
    this.globalMemStoreLimitLowMark = lower;
    this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", 90000);
+    this.perColumnFamilyFlushEnabled = conf.getBoolean(
+        HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH,
+        HConstants.DEFAULT_HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH);
    int handlerCount = conf.getInt("hbase.hstore.flusher.count", 1);
    this.flushHandlers = new FlushHandler[handlerCount];
    LOG.info("globalMemStoreLimit=" +
@@ -208,7 +212,7 @@
      Preconditions.checkState(regionToFlush.memstoreSize.get() > 0);

      LOG.info("Flush of region " + regionToFlush + " due to global heap pressure");
-      flushedOne = flushRegion(regionToFlush, true);
+      flushedOne = flushRegion(regionToFlush, true, false);
      if (!flushedOne) {
        LOG.info("Excluding unflushable region " + regionToFlush +
          " - trying to find a different region to flush.");
@@ -317,11 +321,15 @@
  }

  public void requestFlush(HRegion r) {
+    requestFlush(r, false);
+  }
+
+  public void requestFlush(HRegion r, boolean selectiveFlushRequest) {
    synchronized (regionsInQueue) {
      if (!regionsInQueue.containsKey(r)) {
        // This entry has no delay so it will be added at the top of the flush
        // queue. It'll come out near immediately.
-        FlushRegionEntry fqe = new FlushRegionEntry(r);
+        FlushRegionEntry fqe = new FlushRegionEntry(r, selectiveFlushRequest);
        this.regionsInQueue.put(r, fqe);
        this.flushQueue.add(fqe);
      }
@@ -332,7 +340,7 @@
    synchronized (regionsInQueue) {
      if (!regionsInQueue.containsKey(r)) {
        // This entry has some delay
-        FlushRegionEntry fqe = new FlushRegionEntry(r);
+        FlushRegionEntry fqe = new FlushRegionEntry(r, false);
        fqe.requeue(delay);
        this.regionsInQueue.put(r, fqe);
        this.flushQueue.add(fqe);
@@ -427,7 +435,7 @@
        return true;
      }
    }
-    return flushRegion(region, false);
+    return flushRegion(region, false, fqe.isSelectiveFlushRequest());
  }

  /*
@@ -437,12 +445,14 @@
   * needs to be removed from the flush queue. If false, when we were called
   * from the main flusher run loop and we got the entry to flush by calling
   * poll on the flush queue (which removed it).
-   *
+   * @param selectiveFlushRequest Do we want to selectively flush only the
+   *                              column families that dominate the memstore size?
   * @return true if the region was successfully flushed, false otherwise. If
   * false, there will be accompanying log messages explaining why the log was
   * not flushed.
   */
-  private boolean flushRegion(final HRegion region, final boolean emergencyFlush) {
+  private boolean flushRegion(final HRegion region, final boolean emergencyFlush,
+      boolean selectiveFlushRequest) {
    synchronized (this.regionsInQueue) {
      FlushRegionEntry fqe = this.regionsInQueue.remove(region);
      if (fqe != null && emergencyFlush) {
@@ -453,7 +463,7 @@
    }
    lock.readLock().lock();
    try {
-      boolean shouldCompact = region.flushcache();
+      boolean shouldCompact = region.flushcache(selectiveFlushRequest);
      // We just want to check the size
      boolean shouldSplit = region.checkSplit() != null;
      if (shouldSplit) {
@@ -607,14 +617,31 @@
    private final long createTime;
    private long whenToExpire;
    private int requeueCount = 0;
+    private boolean selectiveFlushRequest;

-    FlushRegionEntry(final HRegion r) {
+    /**
+     * @param r The region to flush
+     * @param selectiveFlushRequest Do we want to flush only the column
+     *                              families that dominate the memstore size,
+     *                              i.e., do a selective flush? If we are
+     *                              doing log rolling, then we should not do a
+     *                              selective flush.
+     */
+    FlushRegionEntry(final HRegion r, boolean selectiveFlushRequest) {
      this.region = r;
      this.createTime = System.currentTimeMillis();
      this.whenToExpire = this.createTime;
+      this.selectiveFlushRequest = selectiveFlushRequest;
    }
-
+
+    /**
+     * @return Is this a request for a selective flush?
+     */
+    public boolean isSelectiveFlushRequest() {
+      return selectiveFlushRequest;
+    }
+
+    /**
     * @param maximumWait
     * @return True if we have been delayed > maximumWait milliseconds.
     */
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(revision 1525721)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(working copy)
@@ -30,6 +30,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -72,16 +73,16 @@
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.NotServingRegionException;
-import org.apache.hadoop.hbase.RegionTooBusyException;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RegionTooBusyException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.backup.HFileArchiver;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
@@ -379,7 +380,13 @@
   long memstoreFlushSize;
   final long timestampSlop;
   final long rowProcessorTimeout;
-  private volatile long lastFlushTime;
+  // The maximum size a column family's memstore can grow to
+  // before being flushed.
+  long columnfamilyMemstoreFlushSize;
+  // Last flush time for each Store. Useful when we are flushing per column
+  // family rather than the whole region.
+  private Map<Store, Long> lastStoreFlushTimeMap =
+      new ConcurrentHashMap<Store, Long>();
+  // Should we selectively flush only the Column Families that dominate the
+  // memstore size?
+  final boolean perColumnFamilyFlushEnabled;
   final RegionServerServices rsServices;
   private RegionServerAccounting rsAccounting;
   private List<Pair<Long, Long>> recentFlushes = new ArrayList<Pair<Long, Long>>();
@@ -478,6 +485,11 @@
         .addWritableMap(htd.getValues());
     this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,
         DEFAULT_CACHE_FLUSH_INTERVAL);
+    this.columnfamilyMemstoreFlushSize = 0L;
+    this.perColumnFamilyFlushEnabled = conf.getBoolean(
+        HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH,
+        HConstants.DEFAULT_HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH);
+    LOG.debug("Per Column Family Flushing: " + perColumnFamilyFlushEnabled);
     this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
         DEFAULT_ROWLOCK_WAIT_DURATION);
@@ -554,6 +566,9 @@
     this.memstoreFlushSize = flushSize;
     this.blockingMemStoreSize = this.memstoreFlushSize *
         conf.getLong("hbase.hregion.memstore.block.multiplier", 2);
+    this.columnfamilyMemstoreFlushSize = conf.getLong(
+        HConstants.HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE,
+        HTableDescriptor.DEFAULT_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE);
   }
 
   /**
@@ -625,7 +640,10 @@
     // Initialize split policy
     this.splitPolicy = RegionSplitPolicy.create(this, conf);
 
-    this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
+    long startTime = EnvironmentEdgeManager.currentTimeMillis();
+    for (Store store : stores.values()) {
+      this.lastStoreFlushTimeMap.put(store, startTime);
+    }
     // Use maximum of log sequenceid or that which was found in stores
     // (particularly if no recovered edits, seqid will be -1).
     long nextSeqid = maxSeqId + 1;
@@ -1156,11 +1174,24 @@
     return this.fs;
   }
 
-  /** @return the last time the region was flushed */
-  public long getLastFlushTime() {
-    return this.lastFlushTime;
+  /**
+   * @return the earliest time a store in the region was flushed. All
+   *         other stores in the region were flushed either at, or after,
+   *         this time.
+   */
+  public long getMinFlushTimeForAllStores() {
+    return Collections.min(this.lastStoreFlushTimeMap.values());
   }
 
+  /**
+   * Returns the last time a particular store was flushed.
+   * @param store The store in question
+   * @return The last time this store was flushed
+   */
+  public long getLastStoreFlushTime(Store store) {
+    return this.lastStoreFlushTimeMap.get(store);
+  }
+
   //////////////////////////////////////////////////////////////////////////////
   // HRegion maintenance.
   //
@@ -1307,6 +1338,16 @@
   }
 
   /**
+   * Flush the cache, with selective flushing disabled.
+   *
+   * @return true if the region needs compacting
+   * @throws IOException
+   */
+  public boolean flushcache() throws IOException {
+    return flushcache(false);
+  }
+
+  /**
   * Flush the cache.
   *
   * When this method is called the cache will be flushed unless:
   * <ol>
   *   <li>the cache is empty</li>
   *   <li>the region is closed.</li>
   *   <li>a flush is already in progress</li>
   *   <li>writes are disabled</li>
   * </ol>
   *
   * <p>This method may block for some time, so it should not be called from a
   * time-sensitive thread.
   *
+   * @param selectiveFlushRequest If true, selectively flush the column families
+   *                              that dominate the memstore size, provided the
+   *                              feature is enabled in the configuration.
   * @return true if the region needs compacting
   *
   * @throws IOException general io exceptions
   * @throws DroppedSnapshotException Thrown when replay of hlog is required
   * because a Snapshot was not properly persisted.
   */
-  public boolean flushcache() throws IOException {
+  public boolean flushcache(boolean selectiveFlushRequest) throws IOException {
     // fail-fast instead of waiting on the lock
     if (this.closing.get()) {
       LOG.debug("Skipping flush on " + this + " because closing");
       return false;
     }
+    // If a selective flush was requested, but the per-column-family switch is
+    // off, we cannot do a selective flush.
+    if (selectiveFlushRequest && !perColumnFamilyFlushEnabled) {
+      LOG.debug("Selective flush requested, but per-column-family flushing is"
+          + " disabled; flushing all column families instead.");
+      selectiveFlushRequest = false;
+    }
     MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
     status.setStatus("Acquiring readlock on region");
     // block waiting for the lock for flushing cache
@@ -1365,8 +1415,37 @@
         return false;
       }
     }
+    Collection<Store> specificStoresToFlush = null;
     try {
-      boolean result = internalFlushcache(status);
+      // We now have to flush the memstore since it has
+      // reached the threshold; however, we might not need
+      // to flush the entire memstore. If certain
+      // column families are dominating the memstore size,
+      // we will flush just those. The second behavior only
+      // happens when selectiveFlushRequest is true.
+      boolean result;
+
+      // If it is okay to flush the memstore by selecting the
+      // column families which dominate the size, we are going
+      // to populate the specificStoresToFlush set.
+      if (selectiveFlushRequest) {
+        specificStoresToFlush = new HashSet<Store>();
+        for (Store store : stores.values()) {
+          if (shouldFlushStore(store)) {
+            specificStoresToFlush.add(store);
+            LOG.debug("Column Family: " + store.getColumnFamilyName() +
+                " of region " + this + " will be flushed");
+          }
+        }
+        // Didn't find any CFs which were above the threshold for selection.
+        if (specificStoresToFlush.size() == 0) {
+          LOG.debug("Since none of the CFs were above the size, flushing all.");
+          specificStoresToFlush = stores.values();
+        }
+      } else {
+        specificStoresToFlush = stores.values();
+      }
+
+      result = internalFlushcache(specificStoresToFlush, status);
 
       if (coprocessorHost != null) {
         status.setStatus("Running post-flush coprocessor hooks");
@@ -1397,7 +1476,7 @@
     }
     long now = EnvironmentEdgeManager.currentTimeMillis();
     //if we flushed in the recent past, we don't need to do again now
-    if ((now - getLastFlushTime() < flushCheckInterval)) {
+    if ((now - getMinFlushTimeForAllStores() < flushCheckInterval)) {
       return false;
     }
     //since we didn't flush in the recent past, flush now if certain conditions
@@ -1448,20 +1527,40 @@
    */
   protected boolean internalFlushcache(MonitoredTask status)
       throws IOException {
-    return internalFlushcache(this.log, -1, status);
+    return internalFlushcache(this.log, -1, stores.values(), status);
   }
 
+  /**
+   * See {@link #internalFlushcache(org.apache.hadoop.hbase.monitoring.MonitoredTask)}.
+   * @param storesToFlush The specific stores to flush.
+ * @param status + * @return true if the region needs compacting + * @throws IOException + */ + protected boolean internalFlushcache(Collection<Store> storesToFlush, + MonitoredTask status) + throws IOException { + return internalFlushcache(this.log, -1L, storesToFlush, status); + } + + protected boolean internalFlushcache(final HLog wal, final long myseqid, + MonitoredTask status) + throws IOException { + return internalFlushcache(wal, myseqid, stores.values(), status); + } + + /** * @param wal Null if we're NOT to go via hlog/wal. * @param myseqid The seqid to use if wal is null writing out * flush file. + * @param storesToFlush The list of stores to flush. * @param status * @return true if the region needs compacting * @throws IOException * @see #internalFlushcache(MonitoredTask) */ protected boolean internalFlushcache( - final HLog wal, final long myseqid, MonitoredTask status) + final HLog wal, final long myseqid, Collection<Store> storesToFlush, MonitoredTask status) throws IOException { if (this.rsServices != null && this.rsServices.isAborted()) { // Don't flush when server aborting, it's unsafe @@ -1471,13 +1570,26 @@ // Clear flush flag. // If nothing to flush, return and avoid logging start/stop flush. if (this.memstoreSize.get() <= 0) { + // Since there is nothing to flush, we will reset the flush times for all + // the stores. + for (Store store : stores.values()) { + lastStoreFlushTimeMap.put(store, startTime); + } return false; } if (LOG.isDebugEnabled()) { LOG.debug("Started memstore flush for " + this + ", current region memstore size " + StringUtils.humanReadableInt(this.memstoreSize.get()) + + ", and " + storesToFlush.size() + "/" + stores.size() + + " column families' memstores are being flushed." + ((wal != null)? "": "; wal is null, using passed sequenceid=" + myseqid)); + for (Store store : storesToFlush) { + LOG.debug("Flushing Column Family: " + store.getColumnFamilyName() + + " which was occupying " + + StringUtils.humanReadableInt(store.getMemStoreSize()) + + " of memstore."); + } } // Stop updates while we snapshot the memstore of all stores. We only have @@ -1495,9 +1607,28 @@ status.setStatus("Obtaining lock to block concurrent updates"); // block waiting for the lock for internal flush this.updatesLock.writeLock().lock(); - long flushsize = this.memstoreSize.get(); status.setStatus("Preparing to flush by snapshotting stores"); + long totalMemstoreSizeOfFlushableStores = 0; + + Set<Store> storesNotToFlush = new HashSet<Store>(stores.values()); + storesNotToFlush.removeAll(storesToFlush); + + // Calculate the smallest LSNs for edits in the stores that will + // be flushed and the ones which won't be. This will be used to populate + // the firstSeqWrittenInCurrentMemstore and + // firstSeqWrittenInSnapshotMemstore maps correctly. + long firstSeqIdInStoresToFlush = Long.MAX_VALUE; + for (Store store : storesToFlush) { + firstSeqIdInStoresToFlush = Math.min(firstSeqIdInStoresToFlush, + store.getSmallestSeqNumberInMemstore()); + } + + long firstSeqIdInStoresNotToFlush = Long.MAX_VALUE; + for (Store store : storesNotToFlush) { + firstSeqIdInStoresNotToFlush = Math.min(firstSeqIdInStoresNotToFlush, + store.getSmallestSeqNumberInMemstore()); + } + List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(storesToFlush.size()); long flushSeqId = -1L; try { // Record the mvcc for all transactions in progress.
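The two minima computed above tell the WAL which entries are still needed after a selective flush. A hedged numeric illustration (the LSNs and class name are invented for the example):

    public class SmallestSeqIdSketch {
      public static void main(String[] args) {
        // Invented LSNs: CF1 holds edits 10..1200, CF2 holds 11..300,
        // CF3 holds 12..150. Suppose only CF1 is selected for flushing.
        long smallestInCf1 = 10L, smallestInCf2 = 11L, smallestInCf3 = 12L;

        long firstSeqIdInStoresToFlush = smallestInCf1;          // 10
        long firstSeqIdInStoresNotToFlush =
            Math.min(smallestInCf2, smallestInCf3);              // 11

        // After CF1's snapshot is persisted, log entries below 11 are no
        // longer needed: everything at or above 11 may still have to be
        // replayed to rebuild the unflushed memstores of CF2 and CF3.
        System.out.println("WAL can be truncated below LSN "
            + firstSeqIdInStoresNotToFlush);
      }
    }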
@@ -1505,7 +1636,8 @@ mvcc.advanceMemstore(w); if (wal != null) { - Long startSeqId = wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes()); + Long startSeqId = wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes(), + firstSeqIdInStoresToFlush, firstSeqIdInStoresNotToFlush); if (startSeqId == null) { status.setStatus("Flush will not be started for [" + this.getRegionInfo().getEncodedName() + "] - WAL is going away"); @@ -1516,7 +1648,8 @@ flushSeqId = myseqid; } - for (Store s : stores.values()) { + for (Store s : storesToFlush) { + totalMemstoreSizeOfFlushableStores += s.getMemStoreSize(); storeFlushCtxs.add(s.createFlushContext(flushSeqId)); } @@ -1528,7 +1661,7 @@ this.updatesLock.writeLock().unlock(); } String s = "Finished memstore snapshotting " + this + - ", syncing WAL and waiting on mvcc, flushsize=" + flushsize; + ", syncing WAL and waiting on mvcc, flushsize=" + totalMemstoreSizeOfFlushableStores; status.setStatus(s); if (LOG.isTraceEnabled()) LOG.trace(s); @@ -1575,7 +1708,7 @@ storeFlushCtxs.clear(); // Set down the memstore size by amount of flush. - this.addAndGetGlobalMemstoreSize(-flushsize); + this.addAndGetGlobalMemstoreSize(-totalMemstoreSizeOfFlushableStores); } catch (Throwable t) { // An exception here means that the snapshot was not persisted. // The hlog needs to be replayed so its content is restored to memstore. @@ -1599,8 +1732,9 @@ } // Record latest flush time - this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis(); - + for (Store store : storesToFlush) { + this.lastStoreFlushTimeMap.put(store, startTime); + } // Update the last flushed sequence id for region if (this.rsServices != null) { completeSequenceId = flushSeqId; @@ -1615,15 +1749,13 @@ long time = EnvironmentEdgeManager.currentTimeMillis() - startTime; long memstoresize = this.memstoreSize.get(); String msg = "Finished memstore flush of ~" + - StringUtils.humanReadableInt(flushsize) + "/" + flushsize + - ", currentsize=" + - StringUtils.humanReadableInt(memstoresize) + "/" + memstoresize + + StringUtils.humanReadableInt(totalMemstoreSizeOfFlushableStores) + + ", currentsize=" + StringUtils.humanReadableInt(memstoresize) + " for region " + this + " in " + time + "ms, sequenceid=" + flushSeqId + ", compaction requested=" + compactionRequested + ((wal == null)? "; wal=null": ""); LOG.info(msg); status.setStatus(msg); - this.recentFlushes.add(new Pair<Long, Long>(time/1000, flushsize)); + this.recentFlushes.add(new Pair<Long, Long>(time/1000, totalMemstoreSizeOfFlushableStores)); return compactionRequested; } @@ -2165,7 +2297,7 @@ != OperationStatusCode.NOT_RUN) { continue; } - addedSize += applyFamilyMapToMemstore(familyMaps[i], w); + addedSize += applyFamilyMapToMemstore(familyMaps[i], w, txid); } // ------------------------------------ @@ -2562,11 +2694,12 @@ * @param familyMap Map of kvs per family * @param localizedWriteEntry The WriteEntry of the MVCC for this transaction. * If null, then this method internally creates a mvcc transaction. + * @param seqNum The log sequence number associated with the edits. * @return the additional memory usage of the memstore caused by the * new entries.
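Because only the flushed stores' bytes leave the region, the global counter is decremented by totalMemstoreSizeOfFlushableStores rather than the whole region size, as the hunks above show. A small worked example (sizes invented for illustration):

    public class MemstoreAccountingSketch {
      public static void main(String[] args) {
        // Invented sizes, in bytes: CF1 dominates the region's memstore.
        long cf1 = 120 * 1024, cf2 = 40 * 1024, cf3 = 60 * 1024;
        long regionMemstoreSize = cf1 + cf2 + cf3;        // 220 KB

        // A selective flush of CF1 only subtracts CF1's share ...
        long totalMemstoreSizeOfFlushableStores = cf1;
        regionMemstoreSize -= totalMemstoreSizeOfFlushableStores;

        // ... leaving CF2 and CF3 resident (100 KB), instead of zeroing
        // the counter as a full-region flush would.
        System.out.println(regionMemstoreSize / 1024 + " KB remain");
      }
    }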
*/ private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap, - MultiVersionConsistencyControl.WriteEntry localizedWriteEntry) { + MultiVersionConsistencyControl.WriteEntry localizedWriteEntry, long seqNum) { long size = 0; boolean freemvcc = false; @@ -2584,7 +2717,7 @@ for (Cell cell: cells) { KeyValue kv = KeyValueUtil.ensureKeyValue(cell); kv.setMvccVersion(localizedWriteEntry.getWriteNumber()); - size += store.add(kv); + size += store.add(kv, seqNum); } } } finally { @@ -2711,7 +2844,9 @@ writestate.flushRequested = true; } // Make request outside of synchronize block; HBASE-818. - this.rsServices.getFlushRequester().requestFlush(this); + // Request a selective flush of the memstore, if possible. + // This method is called by put(), delete(), etc. + this.rsServices.getFlushRequester().requestFlush(this, this.perColumnFamilyFlushEnabled); if (LOG.isDebugEnabled()) { LOG.debug("Flush requested on " + this); } @@ -2726,6 +2861,15 @@ } /** + * @param store the store to examine + * @return true if the store's memstore size is above the per-column family flush threshold + */ + private boolean shouldFlushStore(Store store) { + return store.getMemStoreSize() > this.columnfamilyMemstoreFlushSize; + } + + /** * Read the edits log put under this region by wal log splitting process. Put * the recovered edits back up into this region. * @@ -2956,9 +3100,11 @@ // Once we are over the limit, restoreEdit will keep returning true to // flush -- but don't flush until we've played all the kvs that make up // the WALEdit. - flush = restoreEdit(store, kv); + flush = restoreEdit(store, kv, key.getLogSeqNum()); editsCount++; } + // We do not want to write to the WAL again, hence the WAL + // parameter is null. if (flush) internalFlushcache(null, currentEditSeqId, status); if (coprocessorHost != null) { @@ -3026,10 +3172,11 @@ * Used by tests * @param s Store to add edit to. * @param kv KeyValue to add. + * @param seqNum The sequence number for the edit. * @return True if we should flush.
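Note the strict greater-than in shouldFlushStore(): a store sitting exactly at the threshold is not selected. A tiny boundary check (class name invented; the 16 MB value is this patch's default):

    public class ThresholdBoundarySketch {
      // Mirrors shouldFlushStore(): strictly greater-than.
      static boolean exceedsThreshold(long memstoreSize, long threshold) {
        return memstoreSize > threshold;
      }

      public static void main(String[] args) {
        long threshold = 16L * 1024 * 1024; // DEFAULT_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE

        System.out.println(exceedsThreshold(threshold, threshold));     // false: at the limit
        System.out.println(exceedsThreshold(threshold + 1, threshold)); // true: one byte over
      }
    }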
*/ - protected boolean restoreEdit(final Store s, final KeyValue kv) { - long kvSize = s.add(kv); + protected boolean restoreEdit(final Store s, final KeyValue kv, long seqNum) { + long kvSize = s.add(kv, seqNum); if (this.rsAccounting != null) { rsAccounting.addAndGetRegionReplayEditsSize(this.getRegionName(), kvSize); } @@ -4723,12 +4870,12 @@ Store store = entry.getKey(); if (store.getFamily().getMaxVersions() == 1) { // upsert if VERSIONS for this CF == 1 - size += store.upsert(entry.getValue(), getSmallestReadPoint()); + size += store.upsert(entry.getValue(), getSmallestReadPoint(), txid); } else { // otherwise keep older versions around for (Cell cell: entry.getValue()) { KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - size += store.add(kv); + size += store.add(kv, txid); } } allKVs.addAll(entry.getValue()); @@ -4871,12 +5018,12 @@ Store store = entry.getKey(); if (store.getFamily().getMaxVersions() == 1) { // upsert if VERSIONS for this CF == 1 - size += store.upsert(entry.getValue(), getSmallestReadPoint()); + size += store.upsert(entry.getValue(), getSmallestReadPoint(), txid); } else { // otherwise keep older versions around for (Cell cell : entry.getValue()) { KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - size += store.add(kv); + size += store.add(kv, txid); } } allKVs.addAll(entry.getValue()); @@ -4927,7 +5074,7 @@ public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 38 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + + 39 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + (11 * Bytes.SIZEOF_LONG) + 4 * Bytes.SIZEOF_BOOLEAN); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 1525721) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy) @@ -112,17 +112,19 @@ * across all of them. * @param cells * @param readpoint readpoint below which we can safely remove duplicate KVs + * @param seqNum The LSN associated with the edits. * @return memstore size delta * @throws IOException */ - long upsert(Iterable<Cell> cells, long readpoint) throws IOException; + long upsert(Iterable<Cell> cells, long readpoint, long seqNum) throws IOException; /** * Adds a value to the memstore * @param kv + * @param seqNum The LSN associated with the key. * @return memstore size delta */ - long add(KeyValue kv); + long add(KeyValue kv, long seqNum); /** * When was the last edit done in the memstore @@ -150,6 +152,12 @@ */ KeyValue getRowKeyAtOrBefore(final byte[] row) throws IOException; + /** + * A helper function to get the smallest LSN in the memstore.
+ * @return + */ + public long getSmallestSeqNumberInMemstore(); + FileSystem getFileSystem(); /* Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1525721) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -3690,7 +3690,7 @@ LOG.info("Flushing " + region.getRegionNameAsString()); boolean shouldFlush = true; if (request.hasIfOlderThanTs()) { - shouldFlush = region.getLastFlushTime() < request.getIfOlderThanTs(); + shouldFlush = region.getMinFlushTimeForAllStores() < request.getIfOlderThanTs(); } FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder(); if (shouldFlush) { @@ -3701,7 +3701,7 @@ } builder.setFlushed(result); } - builder.setLastFlushTime(region.getLastFlushTime()); + builder.setLastFlushTime(region.getMinFlushTimeForAllStores()); return builder.build(); } catch (IOException ie) { throw new ServiceException(ie); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java (revision 1525721) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java (working copy) @@ -30,8 +30,12 @@ * Tell the listener the cache needs to be flushed. * * @param region the HRegion requesting the cache flush + * @param selectiveFlushRequest is this a selective flush request? This means + * that if some column families are dominating + * the memstore size, only those column families + * would be flushed. */ - void requestFlush(HRegion region); + void requestFlush(HRegion region, boolean selectiveFlushRequest); /** * Tell the listener the cache needs to be flushed after a delay * Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (revision 1525721) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (working copy) @@ -78,6 +78,9 @@ // Snapshot of memstore. Made for flusher. volatile KeyValueSkipListSet snapshot; + // Smallest LSN amongst all the edits in the Memstore + volatile AtomicLong smallestSeqNumber = new AtomicLong(); + final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); final KeyValue.KVComparator comparator; @@ -122,6 +125,7 @@ this.allocator = null; this.chunkPool = null; } + this.smallestSeqNumber.set(Long.MAX_VALUE); } void dump() { @@ -134,6 +138,31 @@ } /** + * Get the smallest LSN + * @return + */ + long getSmallestSeqNumber() { + return smallestSeqNumber.get(); + } + + /** + * Update the smallest LSN + * @param seqNum + */ + void updateSmallestSeqNumber(long seqNum) { + if (seqNum < 0) { + return; + } + + // Do a Compare-and-Set instead of synchronized here. + long smallestSeqNumberVal; + do { + smallestSeqNumberVal = smallestSeqNumber.get(); + } while (!smallestSeqNumber.compareAndSet(smallestSeqNumberVal, + Math.min(smallestSeqNumberVal, seqNum))); + } + + /** * Creates a snapshot of the current memstore. 
* Snapshot must be cleared by call to {@link #clearSnapshot(SortedSet)} * To get the snapshot made by this method, use {@link #getSnapshot()} @@ -150,6 +179,8 @@ if (!this.kvset.isEmpty()) { this.snapshot = this.kvset; this.kvset = new KeyValueSkipListSet(this.comparator); + // Reset the smallest sequence number + this.smallestSeqNumber.set(Long.MAX_VALUE); this.snapshotTimeRangeTracker = this.timeRangeTracker; this.timeRangeTracker = new TimeRangeTracker(); // Reset heap to not include any keys @@ -213,15 +244,27 @@ } /** + * Write an update. + * This method should only be used by tests, since it does not specify the + * LSN for the edit. + * @param kv the KeyValue to add + * @return approximate size of the passed key and value + */ + long add(final KeyValue kv) { + return add(kv, -1L); + } + + /** * Write an update * @param kv + * @param seqNum the LSN associated with the edit * @return approximate size of the passed key and value. */ - long add(final KeyValue kv) { + long add(final KeyValue kv, long seqNum) { this.lock.readLock().lock(); try { KeyValue toAdd = maybeCloneWithAllocator(kv); - return internalAdd(toAdd); + return internalAdd(toAdd, seqNum); } finally { this.lock.readLock().unlock(); } @@ -255,10 +298,11 @@ * * Callers should ensure they already have the read lock taken */ - private long internalAdd(final KeyValue toAdd) { + private long internalAdd(final KeyValue toAdd, long seqNum) { long s = heapSizeChange(toAdd, addToKVSet(toAdd)); timeRangeTracker.includeTimestamp(toAdd); this.size.addAndGet(s); + updateSmallestSeqNumber(seqNum); return s; } @@ -314,17 +358,27 @@ } /** + * Should only be used in tests, since it does not provide a seqNum. + * @param delete the delete marker to add + * @return approximate size of the passed key and value + */ + long delete(final KeyValue delete) { + return delete(delete, -1L); + } + + /** * Write a delete * @param delete * @return approximate size of the passed key and value.
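The reset in snapshot() above matters because the tracker describes only the live kvset: once the edits move into the snapshot for flushing, the live memstore is empty and its smallest LSN must restart at "no edits". A hedged, single-threaded lifecycle sketch (LSNs invented):

    import java.util.concurrent.atomic.AtomicLong;

    public class SeqNumberLifecycleSketch {
      public static void main(String[] args) {
        AtomicLong smallestSeqNumber = new AtomicLong(Long.MAX_VALUE);

        // Edits arrive with invented LSNs 42 and 57; the minimum sticks at 42.
        smallestSeqNumber.set(Math.min(smallestSeqNumber.get(), 42L));
        smallestSeqNumber.set(Math.min(smallestSeqNumber.get(), 57L));
        System.out.println(smallestSeqNumber.get()); // 42

        // snapshot(): the kvset is swapped out for flushing, so the live
        // memstore holds nothing and the tracker is reset, exactly as the
        // added lines above do.
        smallestSeqNumber.set(Long.MAX_VALUE);
      }
    }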
*/ - long delete(final KeyValue delete) { + long delete(final KeyValue delete, long seqNum) { long s = 0; this.lock.readLock().lock(); try { KeyValue toAdd = maybeCloneWithAllocator(delete); s += heapSizeChange(toAdd, addToKVSet(toAdd)); timeRangeTracker.includeTimestamp(toAdd); + updateSmallestSeqNumber(seqNum); } finally { this.lock.readLock().unlock(); } @@ -480,13 +534,15 @@ * @param qualifier * @param newValue * @param now + * @param seqNum The LSN for the edit * @return Timestamp */ long updateColumnValue(byte[] row, byte[] family, byte[] qualifier, long newValue, - long now) { + long now, + long seqNum) { this.lock.readLock().lock(); try { KeyValue firstKv = KeyValue.createFirstOnRow( @@ -531,7 +587,7 @@ // 'now' and a 0 memstoreTS == immediately visible List<Cell> cells = new ArrayList<Cell>(1); cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue))); - return upsert(cells, 1L); + return upsert(cells, 1L, seqNum); } finally { this.lock.readLock().unlock(); } @@ -555,12 +611,12 @@ * @param readpoint readpoint below which we can safely remove duplicate KVs + * @param seqNum The LSN associated with the edits * @return change in memstore size */ - public long upsert(Iterable<Cell> cells, long readpoint) { + public long upsert(Iterable<Cell> cells, long readpoint, long seqNum) { this.lock.readLock().lock(); try { long size = 0; for (Cell cell : cells) { - size += upsert(cell, readpoint); + size += upsert(cell, readpoint, seqNum); } return size; } finally { @@ -582,7 +638,7 @@ * @param cell * @return change in size of MemStore */ - private long upsert(Cell cell, long readpoint) { + private long upsert(Cell cell, long readpoint, long seqNum) { // Add the KeyValue to the MemStore // Use the internalAdd method here since we (a) already have a lock // and (b) cannot safely use the MSLAB here without potentially // hitting OOME - see TestMemStore.testUpsertMSLAB for a // test that triggers the pathological case if we don't avoid MSLAB // here. KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - long addedSize = internalAdd(kv); + long addedSize = internalAdd(kv, seqNum); // Get the KeyValues for the row/family/qualifier regardless of timestamp.
// For this case we want to clean up any other puts @@ -958,7 +1014,7 @@ } public final static long FIXED_OVERHEAD = ClassSize.align( - ClassSize.OBJECT + (11 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG); + ClassSize.OBJECT + (12 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG); public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG + Index: hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java (revision 1525721) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java (working copy) @@ -213,6 +213,9 @@ * the contents are flushed to the store files */ public static final long DEFAULT_MEMSTORE_FLUSH_SIZE = 1024*1024*128L; + + public static final long DEFAULT_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE = + 1024*1024*16L; private final static Map<String, String> DEFAULT_VALUES = new HashMap<String, String>(); Index: hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java =================================================================== --- hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (revision 1525721) +++ hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (working copy) @@ -328,7 +328,22 @@ /** Conf key for the memstore size at which we flush the memstore */ public static final String HREGION_MEMSTORE_FLUSH_SIZE = "hbase.hregion.memstore.flush.size"; + + /** Conf key for enabling Per Column Family flushing of memstores */ + public static final String HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH = + "hbase.hregion.memstore.percolumnfamilyflush.enabled"; + /** Default value for the Per Column Family flush knob */ + public static final boolean DEFAULT_HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH = + false; + + /** + * If Per Column Family flushing is enabled, this is the size threshold + * above which a column family's memstore is selected for flushing. + */ + public static final String HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE = + "hbase.hregion.memstore.percolumnfamilyflush.flush.size"; + public static final String HREGION_EDITS_REPLAY_SKIP_ERRORS = "hbase.hregion.edits.replay.skip.errors"; Index: hbase-common/src/main/resources/hbase-default.xml =================================================================== --- hbase-common/src/main/resources/hbase-default.xml (revision 1525721) +++ hbase-common/src/main/resources/hbase-default.xml (working copy) @@ -519,6 +519,19 @@ every hbase.server.thread.wakefrequency. + <property> + <name>hbase.hregion.memstore.percolumnfamilyflush.flush.size</name> + <value>16777216</value> + <description> + If per column family flushing is turned on, then every time we hit the + total memstore limit, we find all the column families whose memstores + exceed this value and flush only them, while retaining the others in + memory. If no family's memstore exceeds this value, all the memstores + are flushed (just as usual). This value should be less than half of the + total memstore threshold (hbase.hregion.memstore.flush.size). + </description> + </property> hbase.hregion.preclose.flush.size 5242880
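The "less than half" guidance in the description can be sanity-checked against the defaults introduced here: with a 128 MB region threshold and a 16 MB per-CF threshold, a dominating family trips the per-CF limit well before the region limit. A small illustrative check (class name invented):

    public class FlushSizingSketch {
      public static void main(String[] args) {
        long regionFlushSize = 128L * 1024 * 1024; // hbase.hregion.memstore.flush.size default
        long perCfFlushSize = 16L * 1024 * 1024;   // per-CF threshold from this patch

        // If the per-CF threshold were >= half the region threshold, two
        // evenly-loaded families could hit the region limit with neither
        // family qualifying for a selective flush.
        System.out.println(perCfFlushSize < regionFlushSize / 2); // true
      }
    }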