From 389740853fc66970a3b4f1aaf73bc802fd22f85d Mon Sep 17 00:00:00 2001 From: Shrijeet Paliwal Date: Tue, 31 Jul 2012 11:51:17 -0700 Subject: [PATCH] HBASE-6468 RowCounter may return incorrect results The RowCounter uses FirstKeyOnlyFilter regardless of whether or not the command line argument specified a column family (or family:qualifier). In case when no qualifier was specified as argument, the scan will give correct result. However in the other case the scan instance may have been set with columns other than the very first column in the row, causing scan to get nothing as the FirstKeyOnlyFilter removes everything else. As a fix we do not use FirstKeyOnlyFilter if column was specified in command line, instead we use FirstKeyValueMatchingQualifiersFilter. FirstKeyValueMatchingQualifiersFilter is a new filter which looks for the given columns in KeyValue. Once there is a match for any one of the columns, it returns ReturnCode.NEXT_ROW for remaining KeyValues in the row. Also add test case for RowCounter job. 
--- .../hadoop/hbase/filter/FirstKeyOnlyFilter.java | 15 ++ .../FirstKeyValueMatchingQualifiersFilter.java | 80 +++++++++ .../apache/hadoop/hbase/mapreduce/RowCounter.java | 17 ++- hbase-server/src/main/protobuf/Filter.proto | 3 + .../TestFirstKeyValueMatchingQualifiersFilter.java | 74 +++++++++ .../hadoop/hbase/mapreduce/TestRowCounter.java | 173 ++++++++++++++++++++ 6 files changed, 360 insertions(+), 2 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyValueMatchingQualifiersFilter.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFirstKeyValueMatchingQualifiersFilter.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java index f67bd8b..fc11e86 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java @@ -60,6 +60,21 @@ public class FirstKeyOnlyFilter extends FilterBase { return new FirstKeyOnlyFilter(); } + /** + * @return true if first KV has been found. + */ + protected boolean hasFoundKV() { + return this.foundKV; + } + + /** + * + * @param value update {@link #foundKV} flag with value. 
+ */ + protected void setFoundKV(boolean value) { + this.foundKV = value; + } + public void write(DataOutput out) throws IOException { } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyValueMatchingQualifiersFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyValueMatchingQualifiersFilter.java new file mode 100644 index 0000000..ab3eed4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyValueMatchingQualifiersFilter.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.filter; + +import java.util.Collections; +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.mapreduce.RowCounter; + +/** + * The filter looks for the given columns in KeyValue. Once there is a match for + * any one of the columns, it returns ReturnCode.NEXT_ROW for remaining + * KeyValues in the row. + *

+ * Note : It may emit KVs which do not have the given columns in them, if + * these KVs happen to occur before a KV which does have a match. Given this + * caveat, this filter is only useful for special cases like {@link RowCounter}. + *

+ */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class FirstKeyValueMatchingQualifiersFilter extends FirstKeyOnlyFilter { + + private Set qualifiers; + + /** + * This constructor should not be used. + */ + public FirstKeyValueMatchingQualifiersFilter() { + qualifiers = Collections.emptySet(); + } + + /** + * Constructor which takes a set of columns. As soon as first KeyValue + * matching any of these columns is found, filter moves to next row. + * + * @param qualifiers the set of columns to me matched. + */ + public FirstKeyValueMatchingQualifiersFilter(Set qualifiers) { + this.qualifiers = qualifiers; + } + + public ReturnCode filterKeyValue(KeyValue v) { + if (hasFoundKV()) { + return ReturnCode.NEXT_ROW; + } else if (hasOneMatchingQualifier(v)) { + setFoundKV(true); + } + return ReturnCode.INCLUDE; + } + + private boolean hasOneMatchingQualifier(KeyValue v) { + for (byte[] q : qualifiers) { + if (v.matchingQualifier(q)) { + return true; + } + } + return false; + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java index 5e91b25..350e2ca 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java @@ -20,15 +20,17 @@ package org.apache.hadoop.hbase.mapreduce; import java.io.IOException; +import java.util.Set; +import java.util.TreeSet; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.hadoop.hbase.filter.FirstKeyValueMatchingQualifiersFilter; 
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Job; @@ -114,6 +116,7 @@ public class RowCounter { job.setJarByClass(RowCounter.class); Scan scan = new Scan(); scan.setCacheBlocks(false); + Set qualifiers = new TreeSet(Bytes.BYTES_COMPARATOR); if (startKey != null && !startKey.equals("")) { scan.setStartRow(Bytes.toBytes(startKey)); } @@ -127,10 +130,20 @@ public class RowCounter { if(fields.length == 1) { scan.addFamily(Bytes.toBytes(fields[0])); } else { - scan.addColumn(Bytes.toBytes(fields[0]), Bytes.toBytes(fields[1])); + byte[] qualifier = Bytes.toBytes(fields[1]); + qualifiers.add(qualifier); + scan.addColumn(Bytes.toBytes(fields[0]), qualifier); } } } + // specified column may or may not be part of first key value for the row. + // Hence do not use FirstKeyOnlyFilter if scan has columns, instead use + // FirstKeyValueMatchingQualifiersFilter. + if (qualifiers.size() == 0) { + scan.setFilter(new FirstKeyOnlyFilter()); + } else { + scan.setFilter(new FirstKeyValueMatchingQualifiersFilter(qualifiers)); + } job.setOutputFormatClass(NullOutputFormat.class); TableMapReduceUtil.initTableMapperJob(tableName, scan, RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job); diff --git a/hbase-server/src/main/protobuf/Filter.proto b/hbase-server/src/main/protobuf/Filter.proto index d6cdf72..c665a07 100644 --- a/hbase-server/src/main/protobuf/Filter.proto +++ b/hbase-server/src/main/protobuf/Filter.proto @@ -65,6 +65,9 @@ message FamilyFilter { message FirstKeyOnlyFilter { } +message FirstKeyValueMatchingQualifiersFilter { +} + message InclusiveStopFilter { required bytes stopRowKey = 1; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFirstKeyValueMatchingQualifiersFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFirstKeyValueMatchingQualifiersFilter.java new file mode 100644 index 0000000..5ee0c9b --- /dev/null 
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFirstKeyValueMatchingQualifiersFilter.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.filter; + +import java.util.Set; +import java.util.TreeSet; + +import junit.framework.TestCase; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestFirstKeyValueMatchingQualifiersFilter extends TestCase { + private static final byte[] ROW = Bytes.toBytes("test"); + private static final byte[] COLUMN_FAMILY = Bytes.toBytes("test"); + private static final byte[] COLUMN_QUALIFIER_1 = Bytes.toBytes("foo"); + private static final byte[] COLUMN_QUALIFIER_2 = Bytes.toBytes("foo_2"); + private static final byte[] COLUMN_QUALIFIER_3 = Bytes.toBytes("foo_3"); + private static final byte[] VAL_1 = Bytes.toBytes("a"); + + /** + * Test the functionality of + * {@link FirstKeyValueMatchingQualifiersFilter#filterKeyValue(KeyValue)} + * + * @throws Exception + */ + public void testFirstKeyMatchingQualifierFilter() throws Exception { + Set quals 
= new TreeSet(Bytes.BYTES_COMPARATOR); + quals.add(COLUMN_QUALIFIER_1); + quals.add(COLUMN_QUALIFIER_2); + Filter filter = new FirstKeyValueMatchingQualifiersFilter(quals); + + // Match in first attempt + KeyValue kv; + kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_1, VAL_1); + assertTrue("includeAndSetFlag", + filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE); + kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1); + assertTrue("flagIsSetSkipToNextRow", + filter.filterKeyValue(kv) == Filter.ReturnCode.NEXT_ROW); + + // A mismatch in first attempt and match in second attempt. + filter.reset(); + kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_3, VAL_1); + System.out.println(filter.filterKeyValue(kv)); + assertTrue("includeFlagIsUnset", + filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE); + kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1); + assertTrue("includeAndSetFlag", + filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE); + kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_1, VAL_1); + assertTrue("flagIsSetSkipToNextRow", + filter.filterKeyValue(kv) == Filter.ReturnCode.NEXT_ROW); + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java new file mode 100644 index 0000000..bd68052 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.mapreduce.RowCounter.RowCounterMapper; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.GenericOptionsParser; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test the rowcounter map reduce job. 
+ */ +@Category(MediumTests.class) +public class TestRowCounter { + final Log LOG = LogFactory.getLog(getClass()); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final static String TABLE_NAME = "testRowCounter"; + private final static String COL_FAM = "col_fam"; + private final static String COL1 = "c1"; + private final static String COL2 = "c2"; + private final static int TOTAL_ROWS = 10; + private final static int ROWS_WITH_ONE_COL = 2; + + /** + * @throws java.lang.Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniCluster(); + TEST_UTIL.startMiniMapReduceCluster(); + HTable table = TEST_UTIL.createTable(Bytes.toBytes(TABLE_NAME), + Bytes.toBytes(COL_FAM)); + writeRows(table); + table.close(); + } + + /** + * @throws java.lang.Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + TEST_UTIL.shutdownMiniMapReduceCluster(); + } + + /** + * Test a case when no column was specified in command line arguments. + * + * @throws Exception + */ + @Test + public void testRowCounterNoColumn() throws Exception { + String[] args = new String[] { + TABLE_NAME + }; + runRowCount(args, 10); + } + + /** + * Test a case when the column specified in command line arguments is + * exclusive for few rows. + * + * @throws Exception + */ + @Test + public void testRowCounterExclusiveColumn() throws Exception { + String[] args = new String[] { + TABLE_NAME, COL_FAM + ":" + COL1 + }; + runRowCount(args, 8); + } + + /** + * Test a case when the column specified in command line arguments is not part + * of first KV for a row. + * + * @throws Exception + */ + @Test + public void testRowCounterHiddenColumn() throws Exception { + String[] args = new String[] { + TABLE_NAME, COL_FAM + ":" + COL2 + }; + runRowCount(args, 10); + } + + /** + * Run the RowCounter map reduce job and verify the row count. 
+ * + * @param args the command line arguments to be used for rowcounter job. + * @param expectedCount the expected row count (result of map reduce job). + * @throws Exception + */ + private void runRowCount(String[] args, int expectedCount) throws Exception { + GenericOptionsParser opts = new GenericOptionsParser( + TEST_UTIL.getConfiguration(), args); + Configuration conf = opts.getConfiguration(); + args = opts.getRemainingArgs(); + Job job = RowCounter.createSubmittableJob(conf, args); + job.waitForCompletion(true); + assertTrue(job.isSuccessful()); + Counter counter = job.getCounters().findCounter( + RowCounterMapper.Counters.ROWS); + assertEquals(expectedCount, counter.getValue()); + } + + /** + * Writes TOTAL_ROWS number of distinct rows in to the table. Few rows have + * two columns, Few have one. + * + * @param table + * @throws IOException + */ + private static void writeRows(HTable table) throws IOException { + final byte[] family = Bytes.toBytes(COL_FAM); + final byte[] value = Bytes.toBytes("abcd"); + final byte[] col1 = Bytes.toBytes(COL1); + final byte[] col2 = Bytes.toBytes(COL2); + ArrayList rowsUpdate = new ArrayList(); + // write few rows with two columns + int i = 0; + for (; i < TOTAL_ROWS - ROWS_WITH_ONE_COL; i++) { + byte[] row = Bytes.toBytes("row" + i); + Put put = new Put(row); + put.add(family, col1, value); + put.add(family, col2, value); + rowsUpdate.add(put); + } + + // write few rows with only one column + for (; i < TOTAL_ROWS; i++) { + byte[] row = Bytes.toBytes("row" + i); + Put put = new Put(row); + put.add(family, col2, value); + rowsUpdate.add(put); + } + table.put(rowsUpdate); + } +} -- 1.7.7.5 (Apple Git-26)