From 18074fce4c856f2438dcb6ad59f9ddc0d729c449 Mon Sep 17 00:00:00 2001
From: mswapna
Date: Thu, 31 May 2018 10:44:53 -0700
Subject: [PATCH] skip big row

---
 .../java/org/apache/hadoop/hbase/HConstants.java  |  10 ++
 .../apache/hadoop/hbase/regionserver/HRegion.java |  56 ++++++-
 .../hbase/regionserver/TestSkipBigRowScanner.java | 171 +++++++++++++++++++++
 3 files changed, 234 insertions(+), 3 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSkipBigRowScanner.java

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index e702236c30..3fa5038807 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -360,6 +360,16 @@ public final class HConstants {
    */
   public static final long TABLE_MAX_ROWSIZE_DEFAULT = 1024 * 1024 * 1024L;
 
+  /**
+   * Whether to skip rows whose size exceeds hbase.table.max.rowsize when scanning a table
+   */
+  public static final String TABLE_SKIP_BIGROWS_KEY = "hbase.scanner.skip.bigrows";
+
+  /**
+   * Default value for hbase.scanner.skip.bigrows
+   */
+  public static final boolean TABLE_SKIP_BIGROWS_DEFAULT = false;
+
   /**
    * The max number of threads used for opening and closing stores or store
    * files in parallel
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index af20f40588..155f8a1237 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -677,6 +677,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   private final MetricsRegionWrapperImpl metricsRegionWrapper;
   private final Durability durability;
   private final boolean regionStatsEnabled;
+  private final boolean doSkipBigRows;
 
   /**
    * HRegion constructor. This constructor should only be used for testing and
@@ -836,6 +837,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
 
     this.maxCellSize = conf.getLong(HBASE_MAX_CELL_SIZE_KEY, DEFAULT_MAX_CELL_SIZE);
+    this.doSkipBigRows = conf.getBoolean(HConstants.TABLE_SKIP_BIGROWS_KEY,
+        HConstants.TABLE_SKIP_BIGROWS_DEFAULT);
   }
 
   void setHTableSpecificConf() {
@@ -6291,6 +6294,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       int initialBatchProgress = scannerContext.getBatchProgress();
       long initialSizeProgress = scannerContext.getSizeProgress();
       long initialTimeProgress = scannerContext.getTimeProgress();
+      boolean initialKeepProgress = scannerContext.getKeepProgress();
 
       // The loop here is used only when at some point during the next we determine
       // that due to effects of filters or otherwise, we have an empty row in the result.
@@ -6386,7 +6390,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           }
 
           // Ok, we are good, let's try to get some results from the main heap.
-          populateResult(results, this.storeHeap, scannerContext, currentRow, offset, length);
+          try {
+            populateResult(results, this.storeHeap, scannerContext, currentRow, offset, length);
+          } catch (RowTooBigException e) {
+            if (doSkipBigRows) {
+              logAndClearStateOnBigRow(current, results, scannerContext, initialKeepProgress);
+              this.storeHeap.reseek(KeyValueUtil.createLastOnRow(current));
+              continue;
+            } else {
+              throw e;
+            }
+          }
+
 
           if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
             if (hasFilterRow) {
@@ -6447,7 +6462,17 @@
           boolean mayHaveData = joinedHeapMayHaveData(currentRow, offset, length);
           if (mayHaveData) {
             joinedContinuationRow = current;
-            populateFromJoinedHeap(results, scannerContext);
+            try {
+              populateFromJoinedHeap(results, scannerContext);
+            } catch (RowTooBigException e) {
+              if (doSkipBigRows) {
+                logAndClearStateOnBigRow(current, results, scannerContext, initialKeepProgress);
+                joinedContinuationRow = null;
+                continue;
+              } else {
+                throw e;
+              }
+            }
 
             if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
               return true;
@@ -6456,7 +6481,17 @@
         }
       } else {
         // Populating from the joined heap was stopped by limits, populate some more.
-        populateFromJoinedHeap(results, scannerContext);
+        try {
+          populateFromJoinedHeap(results, scannerContext);
+        } catch (RowTooBigException e) {
+          if (doSkipBigRows) {
+            logAndClearStateOnBigRow(current, results, scannerContext, initialKeepProgress);
+            joinedContinuationRow = null;
+            continue;
+          } else {
+            throw e;
+          }
+        }
         if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
           return true;
         }
@@ -8975,4 +9010,19 @@
   public RegionSplitPolicy getSplitPolicy() {
     return this.splitPolicy;
   }
+
+  /**
+   * Log and clear partial scan state when a row that is too big is skipped during a scan
+   */
+  private void logAndClearStateOnBigRow(Cell current, List<Cell> results, ScannerContext context,
+      boolean keepProgress) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Skipping big row during scan. Table:" + htableDescriptor.getTableName()
+          + ", rowKey:" + Bytes.toStringBinary(current.getRowArray(),
+          current.getRowOffset(), current.getRowLength()));
+    }
+
+    results.clear();
+    context.setKeepProgress(keepProgress);
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSkipBigRowScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSkipBigRowScanner.java
new file mode 100644
index 0000000000..b65e16756a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSkipBigRowScanner.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({RegionServerTests.class, MediumTests.class})
+public class TestSkipBigRowScanner {
+
+  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
+  private static int valueWidth = 2 * 1024 * 1024;
+
+  @BeforeClass
+  public static void before() throws Exception {
+    HTU.getConfiguration().setLong(HConstants.TABLE_MAX_ROWSIZE_KEY,
+        1024 * 1024L);
+    HTU.getConfiguration().setBoolean(HConstants.TABLE_SKIP_BIGROWS_KEY,
+        true);
+    HTU.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void after() throws Exception {
+    HTU.shutdownMiniCluster();
+  }
+
+
+  @Test
+  public void testSkipBigRow() throws Exception {
+    final TableName tableName = TableName.valueOf("testSkipBigRow");
+    final byte[] cf_name = Bytes.toBytes("a");
+    final byte[] col_name = Bytes.toBytes("a");
+
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor(cf_name);
+    htd.addFamily(hcd);
+
+    HTU.getHBaseAdmin().createTable(htd);
+    Table ht = HTU.getConnection().getTable(tableName);
+
+    byte[] val_large = new byte[valueWidth];
+
+    List<Put> puts = new ArrayList<>();
+    Put put = new Put(Bytes.toBytes("0"));
+    put.addColumn(cf_name, col_name, val_large);
+    puts.add(put);
+
+    put = new Put(Bytes.toBytes("1"));
+    put.addColumn(cf_name, col_name, Bytes.toBytes("small"));
+    puts.add(put);
+
+    put = new Put(Bytes.toBytes("2"));
+    put.addColumn(cf_name, col_name, val_large);
+    puts.add(put);
+
+    ht.put(puts);
+    puts.clear();
+
+    Scan scan = new Scan();
+    scan.addColumn(cf_name, col_name);
+    ResultScanner result_scanner = ht.getScanner(scan);
+    Result res;
+    long rows_count = 0;
+    // Only row "1" should be returned; rows "0" and "2" exceed the max row size and are skipped.
+    while ((res = result_scanner.next()) != null) {
+      Assert.assertEquals("1", Bytes.toString(res.getRow()));
+      rows_count++;
+    }
+
+    Assert.assertEquals(1, rows_count);
+    result_scanner.close();
+    ht.close();
+  }
+
+  @Test
+  public void testSkipRowInJoinedHeap() throws IOException {
+    final TableName tableName = TableName.valueOf("testSkipRowInJoinedHeap");
+    final byte[] essential_cf = Bytes.toBytes("essential");
+    final byte[] joined_cf = Bytes.toBytes("joined");
+    final byte[] col_name = Bytes.toBytes("a");
+    final byte[] flag_yes = Bytes.toBytes("Y");
+
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    htd.addFamily(new HColumnDescriptor(essential_cf));
+    htd.addFamily(new HColumnDescriptor(joined_cf));
+
+    HTU.getHBaseAdmin().createTable(htd);
+    Table ht = HTU.getConnection().getTable(tableName);
+
+    byte[] val_large = new byte[valueWidth];
+    List<Put> puts = new ArrayList<>();
+    Put put = new Put(Bytes.toBytes("0"));
+    put.addColumn(essential_cf, col_name, flag_yes);
+    put.addColumn(joined_cf, col_name, val_large);
+    puts.add(put);
+
+    put = new Put(Bytes.toBytes("1"));
+    put.addColumn(essential_cf, col_name, flag_yes);
+    put.addColumn(joined_cf, col_name, Bytes.toBytes("small"));
+    puts.add(put);
+
+    put = new Put(Bytes.toBytes("2"));
+    put.addColumn(essential_cf, col_name, flag_yes);
+    put.addColumn(joined_cf, col_name, val_large);
+    puts.add(put);
+
+    ht.put(puts);
+    puts.clear();
+
+    Scan scan = new Scan();
+    scan.addColumn(essential_cf, col_name);
+    scan.addColumn(joined_cf, col_name);
+
+    SingleColumnValueFilter filter = new SingleColumnValueFilter(
+        essential_cf, col_name, CompareFilter.CompareOp.EQUAL, flag_yes);
+    filter.setFilterIfMissing(true);
+    scan.setFilter(filter);
+    scan.setLoadColumnFamiliesOnDemand(true);
+
+    ResultScanner result_scanner = ht.getScanner(scan);
+    Result res;
+    long rows_count = 0;
+    // Only row "1" should be returned; the joined-family values of rows "0" and "2" are too big.
+    while ((res = result_scanner.next()) != null) {
+      Assert.assertEquals("1", Bytes.toString(res.getRow()));
+      rows_count++;
+    }
+
+    Assert.assertEquals(1, rows_count);
+    result_scanner.close();
+    ht.close();
+  }
+}
-- 
2.14.3 (Apple Git-98)
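
Not part of the patch: a minimal sketch of how the new behaviour would be switched on once the
patch is applied. The property names (HConstants.TABLE_MAX_ROWSIZE_KEY and the new
HConstants.TABLE_SKIP_BIGROWS_KEY) come from the patch above; the 1 MB limit and the helper
class name are illustrative only, and in a real deployment these values would normally live in
the region servers' hbase-site.xml rather than be set in code.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;

    // Hypothetical helper showing the server-side configuration that enables row skipping.
    public class SkipBigRowsConfigExample {
      public static Configuration create() {
        Configuration conf = HBaseConfiguration.create();
        // Rows larger than this limit normally fail the scan with RowTooBigException (default 1 GB).
        conf.setLong(HConstants.TABLE_MAX_ROWSIZE_KEY, 1024 * 1024L);
        // New key from this patch: log oversized rows at DEBUG and skip them instead of failing.
        conf.setBoolean(HConstants.TABLE_SKIP_BIGROWS_KEY, true);
        return conf;
      }
    }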