From 6167ded2d5d781b58ef645301b67b8dfde03ff31 Mon Sep 17 00:00:00 2001
From: mswapna
Date: Tue, 22 May 2018 14:52:09 -0700
Subject: [PATCH] Skip big rows during scans

---
 .../java/org/apache/hadoop/hbase/HConstants.java   |  10 ++
 .../apache/hadoop/hbase/regionserver/HRegion.java  |  59 ++++++-
 .../hbase/regionserver/TestSkipBigRowScanner.java  | 185 +++++++++++++++++++++
 3 files changed, 251 insertions(+), 3 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSkipBigRowScanner.java

diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 9241682036..02aff31841 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -364,6 +364,16 @@ public final class HConstants {
    */
   public static final long TABLE_MAX_ROWSIZE_DEFAULT = 1024 * 1024 * 1024L;
 
+  /**
+   * Whether to skip rows whose size exceeds hbase.table.max.rowsize when scanning a table
+   */
+  public static final String TABLE_SKIP_BIGROWS_KEY = "hbase.scanner.skip.bigrows";
+
+  /**
+   * Default value for hbase.scanner.skip.bigrows
+   */
+  public static final boolean TABLE_SKIP_BIGROWS_DEFAULT = false;
+
   /**
    * The max number of threads used for opening and closing stores or store
    * files in parallel
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index c0203a4905..603688bdf3 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -115,6 +115,7 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.RowTooBigException;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
@@ -673,6 +674,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   private final MetricsRegionWrapperImpl metricsRegionWrapper;
   private final Durability regionDurability;
   private final boolean regionStatsEnabled;
+  private final boolean doSkipBigRows;
   // Stores the replication scope of the various column families of the table
   // that has non-default scope
   private final NavigableMap<byte[], Integer> replicationScope = new TreeMap<>(
@@ -839,6 +841,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     this.maxCellSize = conf.getLong(HBASE_MAX_CELL_SIZE_KEY, DEFAULT_MAX_CELL_SIZE);
     this.miniBatchSize = conf.getInt(HBASE_REGIONSERVER_MINIBATCH_SIZE,
         DEFAULT_HBASE_REGIONSERVER_MINIBATCH_SIZE);
+    this.doSkipBigRows = conf.getBoolean(HConstants.TABLE_SKIP_BIGROWS_KEY,
+        HConstants.TABLE_SKIP_BIGROWS_DEFAULT);
   }
 
   void setHTableSpecificConf() {
@@ -6508,6 +6512,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       int initialBatchProgress = scannerContext.getBatchProgress();
       long initialSizeProgress = scannerContext.getDataSizeProgress();
       long initialHeapSizeProgress = scannerContext.getHeapSizeProgress();
+      boolean initialKeepProgress = scannerContext.getKeepProgress();
 
       // Used to check time limit
       LimitScope limitScope = LimitScope.BETWEEN_CELLS;
@@ -6611,7 +6616,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         }
 
         // Ok, we are good, let's try to get some results from the main heap.
-        populateResult(results, this.storeHeap, scannerContext, current);
+        try {
+          populateResult(results, this.storeHeap, scannerContext, current);
+        } catch (RowTooBigException e) {
+          if (doSkipBigRows) {
+            logAndClearStateOnBigRow(current, results, scannerContext, initialKeepProgress);
+            this.storeHeap.reseek(PrivateCellUtil.createLastOnRow(current));
+            continue;
+          } else {
+            throw e;
+          }
+        }
+
         if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
           if (hasFilterRow) {
             throw new IncompatibleFilterException(
@@ -6675,7 +6691,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             boolean mayHaveData = joinedHeapMayHaveData(current);
             if (mayHaveData) {
               joinedContinuationRow = current;
-              populateFromJoinedHeap(results, scannerContext);
+
+              try {
+                populateFromJoinedHeap(results, scannerContext);
+              } catch (RowTooBigException e) {
+                if (doSkipBigRows) {
+                  logAndClearStateOnBigRow(current, results, scannerContext, initialKeepProgress);
+                  joinedContinuationRow = null;
+                  continue;
+                } else {
+                  throw e;
+                }
+              }
 
               if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
                 return true;
@@ -6684,7 +6711,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           }
         } else {
           // Populating from the joined heap was stopped by limits, populate some more.
-          populateFromJoinedHeap(results, scannerContext);
+          try {
+            populateFromJoinedHeap(results, scannerContext);
+          } catch (RowTooBigException e) {
+            if (doSkipBigRows) {
+              logAndClearStateOnBigRow(current, results, scannerContext, initialKeepProgress);
+              joinedContinuationRow = null;
+              continue;
+            } else {
+              throw e;
+            }
+          }
+
           if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
             return true;
           }
@@ -8603,4 +8641,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     requestFlush0(tracker);
   }
 
+  /**
+   * Log and clear any partial scan state for a row that exceeds hbase.table.max.rowsize
+   */
+  private void logAndClearStateOnBigRow(Cell current, List<Cell> results, ScannerContext context,
+      boolean keepProgress) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Skipping big row during scan. Table:" + htableDescriptor.getTableName()
+          + ", rowKey:" + Bytes.toStringBinary(current.getRowArray(),
+          current.getRowOffset(), current.getRowLength()));
+    }
+
+    results.clear();
+    context.setKeepProgress(keepProgress);
+  }
+
 }
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSkipBigRowScanner.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSkipBigRowScanner.java
new file mode 100644
index 0000000000..c40723d5c4
--- /dev/null
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSkipBigRowScanner.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.CompareOperator;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({RegionServerTests.class, MediumTests.class})
+public class TestSkipBigRowScanner {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestSkipBigRowScanner.class);
+
+  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
+  private static int valueWidth = 2 * 1024 * 1024;
+
+  @BeforeClass
+  public static void before() throws Exception {
+    HTU.getConfiguration().setLong(HConstants.TABLE_MAX_ROWSIZE_KEY,
+        1024 * 1024L);
+    HTU.getConfiguration().setBoolean(HConstants.TABLE_SKIP_BIGROWS_KEY,
+        true);
+    HTU.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void after() throws Exception {
+    HTU.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testSkipBigRow() throws Exception {
+    final TableName tableName = TableName.valueOf("testSkipBigRow");
+    final byte[] cf_name = Bytes.toBytes("a");
+    final byte[] col_name = Bytes.toBytes("a");
+
+    ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(cf_name).build();
+    TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableName);
+    tableDescriptorBuilder.setColumnFamily(cfd);
+    TableDescriptor tableDescriptor = tableDescriptorBuilder.build();
+    HTU.getAdmin().createTable(tableDescriptor);
+    Table ht = HTU.getConnection().getTable(tableName);
+
+    byte[] val_large = new byte[valueWidth];
+
+    List<Put> puts = new ArrayList<>();
+    Put put = new Put(Bytes.toBytes("0"));
+    put.addColumn(cf_name, col_name, val_large);
+    puts.add(put);
+
+    put = new Put(Bytes.toBytes("1"));
+    put.addColumn(cf_name, col_name, Bytes.toBytes("small"));
+    puts.add(put);
+
+    put = new Put(Bytes.toBytes("2"));
+    put.addColumn(cf_name, col_name, val_large);
+    puts.add(put);
+
+    ht.put(puts);
+    puts.clear();
+
+    Scan scan = new Scan();
+    scan.addColumn(cf_name, col_name);
+    ResultScanner result_scanner = ht.getScanner(scan);
+    Result res;
+    long rows_count = 0;
+    // Only row "1" is small enough to be returned; the two big rows are skipped.
+    while ((res = result_scanner.next()) != null) {
+      Assert.assertEquals("1", Bytes.toString(res.getRow()));
+      rows_count++;
+    }
+
+    Assert.assertEquals(1, rows_count);
+    result_scanner.close();
+    ht.close();
+  }
+
+  @Test
+  public void testSkipRowInJoinedHeap() throws IOException {
+    final TableName tableName = TableName.valueOf("testSkipRowInJoinedHeap");
+    final byte[] essential_cf = Bytes.toBytes("essential");
+    final byte[] joined_cf = Bytes.toBytes("joined");
+    final byte[] col_name = Bytes.toBytes("a");
+    final byte[] flag_yes = Bytes.toBytes("Y");
+
+    ColumnFamilyDescriptorBuilder cfd = ColumnFamilyDescriptorBuilder.newBuilder(essential_cf);
+    ColumnFamilyDescriptor essential = cfd.build();
+    ColumnFamilyDescriptor joined = ColumnFamilyDescriptorBuilder.newBuilder(joined_cf).build();
+
+    TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableName);
+    tableDescriptorBuilder.setColumnFamily(essential);
+    tableDescriptorBuilder.setColumnFamily(joined);
+
+    TableDescriptor tableDescriptor = tableDescriptorBuilder.build();
+    HTU.getAdmin().createTable(tableDescriptor);
+    Table ht = HTU.getConnection().getTable(tableName);
+
+    byte[] val_large = new byte[valueWidth];
+    List<Put> puts = new ArrayList<>();
+    Put put = new Put(Bytes.toBytes("0"));
+    put.addColumn(essential_cf, col_name, flag_yes);
+    put.addColumn(joined_cf, col_name, val_large);
+    puts.add(put);
+
+    put = new Put(Bytes.toBytes("1"));
+    put.addColumn(essential_cf, col_name, flag_yes);
+    put.addColumn(joined_cf, col_name, Bytes.toBytes("small"));
+    puts.add(put);
+
+    put = new Put(Bytes.toBytes("2"));
+    put.addColumn(essential_cf, col_name, flag_yes);
+    put.addColumn(joined_cf, col_name, val_large);
+    puts.add(put);
+
+    ht.put(puts);
+    puts.clear();
+
+    Scan scan = new Scan();
+    scan.addColumn(essential_cf, col_name);
+    scan.addColumn(joined_cf, col_name);
+
+    SingleColumnValueFilter filter = new SingleColumnValueFilter(
+        essential_cf, col_name, CompareOperator.EQUAL, flag_yes);
+    filter.setFilterIfMissing(true);
+    scan.setFilter(filter);
+    scan.setLoadColumnFamiliesOnDemand(true);
+
+    ResultScanner result_scanner = ht.getScanner(scan);
+    Result res;
+    long rows_count = 0;
+    // Only row "1" is small enough to be returned; the two big rows are skipped.
+    while ((res = result_scanner.next()) != null) {
+      Assert.assertEquals("1", Bytes.toString(res.getRow()));
+      rows_count++;
+    }
+
+    Assert.assertEquals(1, rows_count);
+    result_scanner.close();
+    ht.close();
+  }
+}
-- 
2.14.3 (Apple Git-98)
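
Usage sketch (illustrative, not part of the diff above): both keys are read from the region server configuration, so a deployment would normally set them in hbase-site.xml on the cluster; in a test they can be set on the mini-cluster configuration before startup, exactly as the @BeforeClass method of TestSkipBigRowScanner does. The 1 MB limit and the class name below are placeholders.

import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;

// Hypothetical helper class; mirrors the @BeforeClass setup in TestSkipBigRowScanner.
public class SkipBigRowsClusterSetup {
  public static void main(String[] args) throws Exception {
    HBaseTestingUtility util = new HBaseTestingUtility();
    // Rows whose accumulated cell size exceeds this limit make the scanner throw
    // RowTooBigException (1 MB here is an illustrative threshold).
    util.getConfiguration().setLong(HConstants.TABLE_MAX_ROWSIZE_KEY, 1024 * 1024L);
    // With the flag added by this patch, HRegion catches that exception and skips
    // the offending row instead of failing the whole scan.
    util.getConfiguration().setBoolean(HConstants.TABLE_SKIP_BIGROWS_KEY, true);
    util.startMiniCluster();
    try {
      // Create tables and run scans here; over-sized rows are silently dropped
      // from the scan results.
    } finally {
      util.shutdownMiniCluster();
    }
  }
}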