From ef4767693437cb1563977309a27cb547188f9a74 Mon Sep 17 00:00:00 2001
From: Toshihiro Suzuki
Date: Mon, 19 Mar 2018 13:28:33 +0900
Subject: [PATCH] HBASE-20219 An error occurs when scanning with reversed=true and loadColumnFamiliesOnDemand=true

---
 .../regionserver/ReversedRegionScannerImpl.java    |  18 ++-
 .../hbase/regionserver/TestJoinedScanners.java     | 180 ++++++++++++++-------
 2 files changed, 131 insertions(+), 67 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java
index 0ae8fac..3295249 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java
@@ -19,16 +19,16 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.PrivateCellUtil;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * ReversibleRegionScannerImpl extends from RegionScannerImpl, and is used to
@@ -51,11 +51,15 @@ class ReversedRegionScannerImpl extends RegionScannerImpl {
   @Override
   protected void initializeKVHeap(List<KeyValueScanner> scanners,
       List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
-    this.storeHeap = new ReversedKeyValueHeap(scanners, comparator);
-    if (!joinedScanners.isEmpty()) {
-      this.joinedHeap = new ReversedKeyValueHeap(joinedScanners,
-          comparator);
-    }
+    // HBASE-20219: When loadColumnFamiliesOnDemand is true, joinedHeap.requestSeek() can be
+    // called when loading data for not essential families. However, ReversedKeyValueHeap doesn't
+    // support requestSeek(), so we don't set joinedHeap here and we set scanners and
+    // joinedScanners to storeHeap, meaning we don't use the loadColumnFamiliesOnDemand
+    // optimization in this case even when loadColumnFamiliesOnDemand is true.
+    List<KeyValueScanner> allScanners = new ArrayList<>(scanners.size() + joinedScanners.size());
+    allScanners.addAll(scanners);
+    allScanners.addAll(joinedScanners);
+    this.storeHeap = new ReversedKeyValueHeap(allScanners, comparator);
   }
 
   @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestJoinedScanners.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestJoinedScanners.java
index 8e84f8a..6d46b11 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestJoinedScanners.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestJoinedScanners.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 import org.apache.commons.cli.CommandLine;
@@ -27,23 +28,30 @@ import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
+import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.filter.CompareFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
@@ -52,6 +60,10 @@ import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Test performance improvement of joined scanners optimization:
  * https://issues.apache.org/jira/browse/HBASE-5416
@@ -66,13 +78,12 @@ public class TestJoinedScanners {
 
   private static final Logger LOG = LoggerFactory.getLogger(TestJoinedScanners.class);
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-  private static final String DIR = TEST_UTIL.getDataTestDir("TestJoinedScanners").toString();
 
   private static final byte[] cf_essential = Bytes.toBytes("essential");
   private static final byte[] cf_joined = Bytes.toBytes("joined");
   private static final byte[] col_name = Bytes.toBytes("a");
   private static final byte[] flag_yes = Bytes.toBytes("Y");
-  private static final byte[] flag_no  = Bytes.toBytes("N");
+  private static final byte[] flag_no = Bytes.toBytes("N");
 
   private static DataBlockEncoding blockEncoding = DataBlockEncoding.FAST_DIFF;
   private static int selectionRatio = 30;
@@ -81,79 +92,78 @@ public class TestJoinedScanners {
   @Rule
   public TestName name = new TestName();
 
-  @Test
-  public void testJoinedScanners() throws Exception {
-    String dataNodeHosts[] = new String[] { "host1", "host2", "host3" };
-    int regionServersCount = 3;
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    final int DEFAULT_BLOCK_SIZE = 1024 * 1024;
+    TEST_UTIL.getConfiguration().setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE);
+    TEST_UTIL.getConfiguration().setInt("dfs.replication", 1);
+    TEST_UTIL.getConfiguration().setLong("hbase.hregion.max.filesize", 322122547200L);
 
-    HBaseTestingUtility htu = new HBaseTestingUtility();
+    String dataNodeHosts[] = new String[] {"host1", "host2", "host3"};
+    int regionServersCount = 3;
+    TEST_UTIL.startMiniCluster(1, regionServersCount, dataNodeHosts);
+  }
 
-    final int DEFAULT_BLOCK_SIZE = 1024*1024;
-    htu.getConfiguration().setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE);
-    htu.getConfiguration().setInt("dfs.replication", 1);
-    htu.getConfiguration().setLong("hbase.hregion.max.filesize", 322122547200L);
-    MiniHBaseCluster cluster = null;
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
 
-    try {
-      cluster = htu.startMiniCluster(1, regionServersCount, dataNodeHosts);
-      byte [][] families = {cf_essential, cf_joined};
+  @Test
+  public void testJoinedScanners() throws Exception {
+    byte[][] families = {cf_essential, cf_joined};
 
-      final TableName tableName = TableName.valueOf(name.getMethodName());
-      HTableDescriptor desc = new HTableDescriptor(tableName);
-      for(byte[] family : families) {
-        HColumnDescriptor hcd = new HColumnDescriptor(family);
-        hcd.setDataBlockEncoding(blockEncoding);
-        desc.addFamily(hcd);
-      }
-      htu.getAdmin().createTable(desc);
-      Table ht = htu.getConnection().getTable(tableName);
+    final TableName tableName = TableName.valueOf(name.getMethodName());
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    for (byte[] family : families) {
+      HColumnDescriptor hcd = new HColumnDescriptor(family);
+      hcd.setDataBlockEncoding(blockEncoding);
+      desc.addFamily(hcd);
+    }
+    TEST_UTIL.getAdmin().createTable(desc);
+    Table ht = TEST_UTIL.getConnection().getTable(tableName);
 
-      long rows_to_insert = 1000;
-      int insert_batch = 20;
-      long time = System.nanoTime();
-      Random rand = new Random(time);
+    long rows_to_insert = 1000;
+    int insert_batch = 20;
+    long time = System.nanoTime();
+    Random rand = new Random(time);
 
-      LOG.info("Make " + Long.toString(rows_to_insert) + " rows, total size = "
-        + Float.toString(rows_to_insert * valueWidth / 1024 / 1024) + " MB");
+    LOG.info("Make " + Long.toString(rows_to_insert) + " rows, total size = " + Float
+      .toString(rows_to_insert * valueWidth / 1024 / 1024) + " MB");
 
-      byte [] val_large = new byte[valueWidth];
+    byte[] val_large = new byte[valueWidth];
 
-      List<Put> puts = new ArrayList<>();
+    List<Put> puts = new ArrayList<>();
 
-      for (long i = 0; i < rows_to_insert; i++) {
-        Put put = new Put(Bytes.toBytes(Long.toString (i)));
-        if (rand.nextInt(100) <= selectionRatio) {
-          put.addColumn(cf_essential, col_name, flag_yes);
-        } else {
-          put.addColumn(cf_essential, col_name, flag_no);
-        }
-        put.addColumn(cf_joined, col_name, val_large);
-        puts.add(put);
-        if (puts.size() >= insert_batch) {
-          ht.put(puts);
-          puts.clear();
-        }
+    for (long i = 0; i < rows_to_insert; i++) {
+      Put put = new Put(Bytes.toBytes(Long.toString(i)));
+      if (rand.nextInt(100) <= selectionRatio) {
+        put.addColumn(cf_essential, col_name, flag_yes);
+      } else {
+        put.addColumn(cf_essential, col_name, flag_no);
       }
-      if (!puts.isEmpty()) {
+      put.addColumn(cf_joined, col_name, val_large);
+      puts.add(put);
+      if (puts.size() >= insert_batch) {
         ht.put(puts);
         puts.clear();
       }
+    }
+    if (!puts.isEmpty()) {
+      ht.put(puts);
+      puts.clear();
+    }
 
-      LOG.info("Data generated in "
-        + Double.toString((System.nanoTime() - time) / 1000000000.0) + " seconds");
+    LOG.info("Data generated in "
+      + Double.toString((System.nanoTime() - time) / 1000000000.0) + " seconds");
 
-      boolean slow = true;
-      for (int i = 0; i < 10; ++i) {
-        runScanner(ht, slow);
-        slow = !slow;
-      }
-
-      ht.close();
-    } finally {
-      if (cluster != null) {
-        htu.shutdownMiniCluster();
-      }
+    boolean slow = true;
+    for (int i = 0; i < 10; ++i) {
+      runScanner(ht, slow);
+      slow = !slow;
     }
+
+    ht.close();
   }
 
   private void runScanner(Table table, boolean slow) throws Exception {
@@ -224,4 +234,54 @@ public class TestJoinedScanners {
     TestJoinedScanners test = new TestJoinedScanners();
     test.testJoinedScanners();
   }
+
+  @Test
+  public void testWithReverseScan() throws Exception {
+    final byte[] FAM1 = Bytes.toBytes("fam1");
+    final byte[] FAM2 = Bytes.toBytes("fam2");
+    final byte[] COL1 = Bytes.toBytes("col1");
+    final byte[] COL2 = Bytes.toBytes("col2");
+    final byte[] VAL1 = Bytes.toBytes("val1");
+    final byte[] VAL2 = Bytes.toBytes("val2");
+
+    try (Connection con = TEST_UTIL.getConnection(); Admin admin = con.getAdmin()) {
+      TableName tableName = TableName.valueOf(name.getMethodName());
+
+      TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAM1))
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAM2))
+        .build();
+      admin.createTable(tableDescriptor);
+
+      try (Table table = con.getTable(tableName)) {
+        Put put = new Put(Bytes.toBytes("row1"));
+        put.addColumn(FAM1, COL1, VAL1);
+        put.addColumn(FAM2, COL2, VAL2);
+        table.put(put);
+
+        put = new Put(Bytes.toBytes("row2"));
+        put.addColumn(FAM2, COL2, Bytes.toBytes("val"));
+        table.put(put);
+
+        SingleColumnValueFilter filter = new SingleColumnValueFilter(FAM1, COL1,
+          CompareOperator.EQUAL, VAL1);
+        filter.setFilterIfMissing(true);
+
+        Scan scan = new Scan();
+        scan.addFamily(FAM1);
+        scan.addFamily(FAM2);
+        scan.setFilter(filter);
+        scan.setReversed(true);
+        scan.setLoadColumnFamiliesOnDemand(true);
+
+        try (ResultScanner scanner = table.getScanner(scan)) {
+          Result next = scanner.next();
+          assertNotNull(next);
+          assertFalse(next.isEmpty());
+          assertTrue(Arrays.equals(VAL1, next.getValue(FAM1, COL1)));
+          assertTrue(Arrays.equals(VAL2, next.getValue(FAM2, COL2)));
+        }
+      }
+    }
+  }
 }
-- 
2.10.1 (Apple Git-78)
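
Note for reviewers: the failing combination is easy to hit from ordinary client code. When a row passes the essential-family filter, RegionScannerImpl asks the joined heap to requestSeek() the non-essential families, and ReversedKeyValueHeap does not support requestSeek(), so any reversed scan that also enables loadColumnFamiliesOnDemand fails before this patch. The new testWithReverseScan() above covers this inside the mini cluster; the sketch below is a rough standalone client-side reproduction. The class name and the table "t1" with families "essential"/"joined" and flag column "a" are illustrative assumptions (mirroring TestJoinedScanners), not names taken from the patch.

import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class ReversedOnDemandScanRepro {
  public static void main(String[] args) throws Exception {
    // Hypothetical names: an existing table "t1" with an "essential" and a "joined" family,
    // where the essential family holds a one-byte flag column "a".
    TableName tableName = TableName.valueOf("t1");
    byte[] essential = Bytes.toBytes("essential");
    byte[] joined = Bytes.toBytes("joined");
    byte[] col = Bytes.toBytes("a");

    try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
        Table table = conn.getTable(tableName)) {
      // Filter only on the essential family, so the joined family becomes a candidate for
      // lazy (on-demand) loading.
      SingleColumnValueFilter filter =
          new SingleColumnValueFilter(essential, col, CompareOperator.EQUAL, Bytes.toBytes("Y"));
      filter.setFilterIfMissing(true);

      Scan scan = new Scan();
      scan.addFamily(essential);
      scan.addFamily(joined);
      scan.setFilter(filter);
      // The combination exercised by this patch: a reversed scan (ReversedRegionScannerImpl)
      // plus on-demand loading of non-essential column families.
      scan.setReversed(true);
      scan.setLoadColumnFamiliesOnDemand(true);

      // Without the fix, the first row that passes the filter makes the region server call
      // requestSeek() on the joined heap, which ReversedKeyValueHeap rejects, and the scan fails.
      try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result result : scanner) {
          System.out.println(result);
        }
      }
    }
  }
}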