From 1453cf829cead486aef441c768330a7d1b666795 Mon Sep 17 00:00:00 2001
From: Shrijeet Paliwal
Date: Fri, 27 Jul 2012 14:45:53 -0700
Subject: [PATCH] HBASE-6468 RowCounter may return incorrect result

The RowCounter uses FirstKeyOnlyFilter regardless of whether or not the
command line argument specified a column family (or family:qualifier).
When no qualifier is specified as an argument, the scan gives the correct
result. In the other case, however, the scan instance may have been set
with columns other than the very first column in the row, causing the
scan to return nothing, as the FirstKeyOnlyFilter removes everything
else. As a fix, we do not use FirstKeyOnlyFilter when a column is
specified on the command line.
---
 .../hadoop/hbase/filter/FirstKeyOnlyFilter.java    |   15 ++-
 .../FirstKeyValueMatchingQualifiersFilter.java     |   73 +++++++++
 .../apache/hadoop/hbase/mapreduce/RowCounter.java  |   58 +++++---
 .../TestFirstKeyMatchingQualifiersFilter.java      |   73 +++++++++
 .../hadoop/hbase/mapreduce/TestRowCounter.java     |  167 ++++++++++++++++++++
 5 files changed, 362 insertions(+), 24 deletions(-)
 create mode 100644 src/main/java/org/apache/hadoop/hbase/filter/FirstKeyValueMatchingQualifiersFilter.java
 create mode 100644 src/test/java/org/apache/hadoop/hbase/filter/TestFirstKeyMatchingQualifiersFilter.java
 create mode 100644 src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java

diff --git a/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java b/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java
index 36170bf..9ccf1f5 100644
--- a/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java
+++ b/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java
@@ -38,15 +38,24 @@ public class FirstKeyOnlyFilter extends FilterBase {
   }
 
   public void reset() {
-    foundKV = false;
+    setFoundKV(false);
   }
 
   public ReturnCode filterKeyValue(KeyValue v) {
-    if(foundKV) return ReturnCode.NEXT_ROW;
-    foundKV = true;
+    if (hasFoundKV())
+      return ReturnCode.NEXT_ROW;
+    setFoundKV(true);
     return ReturnCode.INCLUDE;
   }
 
+  protected boolean hasFoundKV() {
+    return foundKV;
+  }
+
+  protected void setFoundKV(boolean foundKV) {
+    this.foundKV = foundKV;
+  }
+
   public void write(DataOutput out) throws IOException {
   }
 
diff --git a/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyValueMatchingQualifiersFilter.java b/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyValueMatchingQualifiersFilter.java
new file mode 100644
index 0000000..cf36c26
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyValueMatchingQualifiersFilter.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.filter;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+
+/**
+ * The filter looks for the given columns in KeyValue. Once there is a match
+ * for any one of the columns, it returns ReturnCode.NEXT_ROW for the
+ * remaining KeyValues in the row.
+ *
+ */
+public class FirstKeyValueMatchingQualifiersFilter extends FirstKeyOnlyFilter {
+
+  private List<byte[]> qualifiers;
+
+  /**
+   * This constructor should not be used.
+   */
+  public FirstKeyValueMatchingQualifiersFilter() {
+    qualifiers = Collections.emptyList();
+  }
+
+  /**
+   * Constructor which takes a list of columns. As soon as the first KeyValue
+   * matching any of these columns is found, the filter moves to the next row.
+   *
+   * @param qualifiers the list of columns to be matched.
+   */
+  public FirstKeyValueMatchingQualifiersFilter(List<byte[]> qualifiers) {
+    this.qualifiers = qualifiers;
+  }
+
+  public ReturnCode filterKeyValue(KeyValue v) {
+    if (hasFoundKV()) {
+      return ReturnCode.NEXT_ROW;
+    } else if (hasOneMatchingQualifier(v)) {
+      setFoundKV(true);
+    }
+    return ReturnCode.INCLUDE;
+  }
+
+  private boolean hasOneMatchingQualifier(KeyValue v) {
+    for (byte[] q : qualifiers) {
+      if (v.matchingQualifier(q)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+}
diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
index de58ad2..bb062d8 100644
--- a/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
+++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
@@ -17,15 +17,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.FirstKeyValueMatchingQualifiersFilter;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -46,25 +50,27 @@ public class RowCounter {
    * Mapper that runs the count.
    */
   static class RowCounterMapper
-  extends TableMapper<ImmutableBytesWritable, Result> {
+      extends TableMapper<ImmutableBytesWritable, Result> {
 
     /** Counter enumeration to count the actual rows. */
-    public static enum Counters {ROWS}
+    public static enum Counters {
+      ROWS
+    }
 
     /**
      * Maps the data.
-     *
-     * @param row  The current table row key.
-     * @param values  The columns.
-     * @param context  The current context.
+     * 
+     * @param row The current table row key.
+     * @param values The columns.
+     * @param context The current context.
      * @throws IOException When something is broken with the data.
      * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
-     *   org.apache.hadoop.mapreduce.Mapper.Context)
+     *      org.apache.hadoop.mapreduce.Mapper.Context)
      */
     @Override
     public void map(ImmutableBytesWritable row, Result values,
-      Context context)
-    throws IOException {
+        Context context)
+        throws IOException {
       // Count every row containing data, whether it's in qualifiers or values
       context.getCounter(Counters.ROWS).increment(1);
     }
@@ -72,14 +78,14 @@ public class RowCounter {
 
   /**
    * Sets up the actual job.
-   *
-   * @param conf  The current configuration.
-   * @param args  The command line parameters.
+   * 
+   * @param conf The current configuration.
+   * @param args The command line parameters.
    * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public static Job createSubmittableJob(Configuration conf, String[] args)
-  throws IOException {
+      throws IOException {
    String tableName = args[0];
    Job job = new Job(conf, NAME + "_" + tableName);
    job.setJarByClass(RowCounter.class);
@@ -92,30 +98,40 @@ public class RowCounter {
      }
      sb.append(args[i]);
    }
+    List<byte[]> qualifiers = new ArrayList<byte[]>();
    Scan scan = new Scan();
-    scan.setFilter(new FirstKeyOnlyFilter());
    if (sb.length() > 0) {
-      for (String columnName :sb.toString().split(" ")) {
-        String [] fields = columnName.split(":");
-        if(fields.length == 1) {
+      for (String columnName : sb.toString().split(" ")) {
+        String[] fields = columnName.split(":");
+        if (fields.length == 1) {
          scan.addFamily(Bytes.toBytes(fields[0]));
        } else {
-          scan.addColumn(Bytes.toBytes(fields[0]), Bytes.toBytes(fields[1]));
+          byte[] qualifier = Bytes.toBytes(fields[1]);
+          qualifiers.add(qualifier);
+          scan.addColumn(Bytes.toBytes(fields[0]), qualifier);
        }
      }
    }
+    // The specified column may or may not be part of the first key value of
+    // the row. Hence, do not use FirstKeyOnlyFilter if the scan has columns;
+    // use FirstKeyValueMatchingQualifiersFilter instead.
+    if (qualifiers.size() == 0) {
+      scan.setFilter(new FirstKeyOnlyFilter());
+    } else {
+      scan.setFilter(new FirstKeyValueMatchingQualifiersFilter(qualifiers));
+    }
    // Second argument is the table name.
    job.setOutputFormatClass(NullOutputFormat.class);
    TableMapReduceUtil.initTableMapperJob(tableName, scan,
-      RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
+        RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
    job.setNumReduceTasks(0);
    return job;
  }
 
  /**
   * Main entry point.
-   *
-   * @param args  The command line parameters.
+   * 
+   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
diff --git a/src/test/java/org/apache/hadoop/hbase/filter/TestFirstKeyMatchingQualifiersFilter.java b/src/test/java/org/apache/hadoop/hbase/filter/TestFirstKeyMatchingQualifiersFilter.java
new file mode 100644
index 0000000..eb5715c
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/hbase/filter/TestFirstKeyMatchingQualifiersFilter.java
@@ -0,0 +1,73 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.filter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class TestFirstKeyMatchingQualifiersFilter extends TestCase {
+  private static final byte[] ROW = Bytes.toBytes("test");
+  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("test");
+  private static final byte[] COLUMN_QUALIFIER_1 = Bytes.toBytes("foo");
+  private static final byte[] COLUMN_QUALIFIER_2 = Bytes.toBytes("foo_2");
+  private static final byte[] COLUMN_QUALIFIER_3 = Bytes.toBytes("foo_3");
+  private static final byte[] VAL_1 = Bytes.toBytes("a");
+
+  /**
+   * Test the functionality of
+   * {@link FirstKeyValueMatchingQualifiersFilter#filterKeyValue(KeyValue)}
+   *
+   * @throws Exception
+   */
+  public void testFirstKeyMatchingQualifierFilter() throws Exception {
+    List<byte[]> quals = new ArrayList<byte[]>();
+    quals.add(COLUMN_QUALIFIER_1);
+    quals.add(COLUMN_QUALIFIER_2);
+    Filter filter = new FirstKeyValueMatchingQualifiersFilter(quals);
+
+    // Match in first attempt
+    KeyValue kv;
+    kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_1, VAL_1);
+    assertTrue("includeAndSetFlag",
+        filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
+    kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1);
+    assertTrue("flagIsSetSkipToNextRow",
+        filter.filterKeyValue(kv) == Filter.ReturnCode.NEXT_ROW);
+
+    // A mismatch in first attempt and match in second attempt.
+    filter.reset();
+    kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_3, VAL_1);
+    System.out.println(filter.filterKeyValue(kv));
+    assertTrue("includeFlagIsUnset",
+        filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
+    kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_2, VAL_1);
+    assertTrue("includeAndSetFlag",
+        filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE);
+    kv = new KeyValue(ROW, COLUMN_FAMILY, COLUMN_QUALIFIER_1, VAL_1);
+    assertTrue("flagIsSetSkipToNextRow",
+        filter.filterKeyValue(kv) == Filter.ReturnCode.NEXT_ROW);
+  }
+
+}
diff --git a/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java b/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java
new file mode 100644
index 0000000..6739ec7
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java
@@ -0,0 +1,167 @@
+/**
+ * Copyright 2012 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapreduce.RowCounter.RowCounterMapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test the rowcounter map reduce job.
+ */
+public class TestRowCounter {
+  final Log LOG = LogFactory.getLog(getClass());
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private final static String TABLE_NAME = "testRowCounter";
+  private final static String COL_FAM = "col_fam";
+  private final static String COL1 = "c1";
+  private final static String COL2 = "c2";
+  private final static int TOTAL_ROWS = 10;
+  private final static int ROWS_WITH_ONE_COL = 2;
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster();
+    TEST_UTIL.startMiniMapReduceCluster();
+    HTable table = TEST_UTIL.createTable(Bytes.toBytes(TABLE_NAME),
+        Bytes.toBytes(COL_FAM));
+    writeRows(table);
+    table.close();
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+    TEST_UTIL.shutdownMiniMapReduceCluster();
+  }
+
+  /**
+   * Test a case when no column was specified in command line arguments.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testRowCounterNoColumn() throws Exception {
+    String[] args = new String[] { TABLE_NAME };
+    runRowCount(args, 10);
+  }
+
+  /**
+   * Test a case when the column specified in command line arguments is
+   * exclusive to a few of the rows.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testRowCounterExclusiveColumn() throws Exception {
+    String[] args = new String[] { TABLE_NAME, COL_FAM + ":" + COL1 };
+    runRowCount(args, 8);
+  }
+
+  /**
+   * Test a case when the column specified in command line arguments is not
+   * part of the first KV for a row.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testRowCounterHiddenColumn() throws Exception {
+    String[] args = new String[] { TABLE_NAME, COL_FAM + ":" + COL2 };
+    runRowCount(args, 10);
+  }
+
+  /**
+   * Run the RowCounter map reduce job and verify the row count.
+   *
+   * @param args
+   *          the command line arguments to be used for the rowcounter job.
+   * @param expectedCount
+   *          the expected row count (result of the map reduce job).
+   * @throws Exception
+   */
+  private void runRowCount(String[] args, int expectedCount) throws Exception {
+    GenericOptionsParser opts = new GenericOptionsParser(
+        TEST_UTIL.getConfiguration(), args);
+    Configuration conf = opts.getConfiguration();
+    args = opts.getRemainingArgs();
+    Job job = RowCounter.createSubmittableJob(conf, args);
+    job.waitForCompletion(true);
+    assertTrue(job.isSuccessful());
+    Counter counter = job.getCounters().findCounter(
+        RowCounterMapper.Counters.ROWS);
+    assertEquals(expectedCount, counter.getValue());
+  }
+
+  /**
+   * Writes TOTAL_ROWS distinct rows into the table. Some rows have
+   * two columns, some have only one.
+   *
+   * @param table
+   * @throws IOException
+   */
+  private static void writeRows(HTable table) throws IOException {
+    final byte[] family = Bytes.toBytes(COL_FAM);
+    final byte[] value = Bytes.toBytes("abcd");
+    final byte[] col1 = Bytes.toBytes(COL1);
+    final byte[] col2 = Bytes.toBytes(COL2);
+    ArrayList<Put> rowsUpdate = new ArrayList<Put>();
+    // write a few rows with two columns
+    int i = 0;
+    for (; i < TOTAL_ROWS - ROWS_WITH_ONE_COL; i++) {
+      byte[] row = Bytes.toBytes("row" + i);
+      Put put = new Put(row);
+      put.add(family, col1, value);
+      put.add(family, col2, value);
+      rowsUpdate.add(put);
+    }
+
+    // write a few rows with only one column
+    for (; i < TOTAL_ROWS; i++) {
+      byte[] row = Bytes.toBytes("row" + i);
+      Put put = new Put(row);
+      put.add(family, col2, value);
+      rowsUpdate.add(put);
+    }
+    table.put(rowsUpdate);
+  }
+}
-- 
1.7.7.5 (Apple Git-26)
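
For reviewers, here is a minimal, self-contained sketch of the contract
implemented by the new FirstKeyValueMatchingQualifiersFilter above: every
KeyValue of a row is included until one of the target qualifiers is seen,
after which the remainder of the row is skipped. This is illustrative only
and not part of the patch; the class name FilterSemanticsDemo and the
row/column literals are made up, and it assumes the patched 0.94-era HBase
classes are on the classpath. (Being placed after the signature delimiter,
it is ignored by git am.)

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.filter.Filter;
    import org.apache.hadoop.hbase.filter.FirstKeyValueMatchingQualifiersFilter;
    import org.apache.hadoop.hbase.util.Bytes;

    public class FilterSemanticsDemo {
      public static void main(String[] args) {
        byte[] row = Bytes.toBytes("row1");
        byte[] fam = Bytes.toBytes("f");
        byte[] val = Bytes.toBytes("v");

        // Count rows by the presence of qualifier "c2", as RowCounter now
        // does when given an explicit family:qualifier argument.
        List<byte[]> quals = new ArrayList<byte[]>();
        quals.add(Bytes.toBytes("c2"));
        Filter filter = new FirstKeyValueMatchingQualifiersFilter(quals);

        // "c1" is not a target qualifier: included, found-flag stays unset.
        System.out.println(filter.filterKeyValue(
            new KeyValue(row, fam, Bytes.toBytes("c1"), val))); // INCLUDE
        // "c2" matches: included, and the found-flag is set.
        System.out.println(filter.filterKeyValue(
            new KeyValue(row, fam, Bytes.toBytes("c2"), val))); // INCLUDE
        // Every later KeyValue in the same row is now skipped.
        System.out.println(filter.filterKeyValue(
            new KeyValue(row, fam, Bytes.toBytes("c3"), val))); // NEXT_ROW
      }
    }

By contrast, per the commit message, the old code always applied
FirstKeyOnlyFilter, which admits only the very first KeyValue of the row
(f:c1 here); with the scan restricted to another column the row then yields
nothing, which is why qualifier-restricted counts came back too low.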