From 68926cc9fb069e7b5072d331e9e9e336bbf3aadc Mon Sep 17 00:00:00 2001
From: Shrijeet Paliwal
Date: Fri, 27 Jul 2012 14:45:53 -0700
Subject: [PATCH] HBASE-6468 RowCounter may return incorrect result

RowCounter sets a FirstKeyOnlyFilter regardless of whether the command
line arguments specify a column family (or family:qualifier). When no
qualifier is specified, the scan gives the correct result. When a
qualifier is specified, however, the scan may be restricted to columns
other than the very first column of a row, and the scan then returns
nothing because the FirstKeyOnlyFilter removes everything else. As a
fix, do not use FirstKeyOnlyFilter when a column is specified on the
command line.
---
 .../apache/hadoop/hbase/mapreduce/RowCounter.java |    8 +-
 .../hadoop/hbase/mapreduce/TestRowCounter.java    |  167 ++++++++++++++++++++
 2 files changed, 174 insertions(+), 1 deletions(-)
 create mode 100644 src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java

diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
index de58ad2..17fd871 100644
--- a/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
+++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
@@ -92,18 +92,24 @@ public class RowCounter {
       }
       sb.append(args[i]);
     }
+    boolean scanHasColumns = false;
     Scan scan = new Scan();
-    scan.setFilter(new FirstKeyOnlyFilter());
     if (sb.length() > 0) {
       for (String columnName :sb.toString().split(" ")) {
         String [] fields = columnName.split(":");
         if(fields.length == 1) {
           scan.addFamily(Bytes.toBytes(fields[0]));
         } else {
+          scanHasColumns = true;
           scan.addColumn(Bytes.toBytes(fields[0]), Bytes.toBytes(fields[1]));
         }
       }
     }
+    // The specified column may or may not be part of the first key value of
+    // a row, hence do not use FirstKeyOnlyFilter when the scan has columns.
+    if (!scanHasColumns) {
+      scan.setFilter(new FirstKeyOnlyFilter());
+    }
     // Second argument is the table name.
     job.setOutputFormatClass(NullOutputFormat.class);
     TableMapReduceUtil.initTableMapperJob(tableName, scan,
diff --git a/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java b/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java
new file mode 100644
index 0000000..6739ec7
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java
@@ -0,0 +1,167 @@
+/**
+ * Copyright 2012 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapreduce.RowCounter.RowCounterMapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test the rowcounter map reduce job.
+ */
+public class TestRowCounter {
+  final Log LOG = LogFactory.getLog(getClass());
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private final static String TABLE_NAME = "testRowCounter";
+  private final static String COL_FAM = "col_fam";
+  private final static String COL1 = "c1";
+  private final static String COL2 = "c2";
+  private final static int TOTAL_ROWS = 10;
+  private final static int ROWS_WITH_ONE_COL = 2;
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster();
+    TEST_UTIL.startMiniMapReduceCluster();
+    HTable table = TEST_UTIL.createTable(Bytes.toBytes(TABLE_NAME),
+        Bytes.toBytes(COL_FAM));
+    writeRows(table);
+    table.close();
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+    TEST_UTIL.shutdownMiniMapReduceCluster();
+  }
+
+  /**
+   * Test the case where no column is specified in the command line arguments.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testRowCounterNoColumn() throws Exception {
+    String[] args = new String[] { TABLE_NAME };
+    runRowCount(args, 10);
+  }
+
+  /**
+   * Test the case where the column specified in the command line arguments
+   * exists in only some of the rows.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testRowCounterExclusiveColumn() throws Exception {
+    String[] args = new String[] { TABLE_NAME, COL_FAM + ":" + COL1 };
+    runRowCount(args, 8);
+  }
+
+  /**
+   * Test the case where the column specified in the command line arguments is
+   * not part of the first KV for a row.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testRowCounterHiddenColumn() throws Exception {
+    String[] args = new String[] { TABLE_NAME, COL_FAM + ":" + COL2 };
+    runRowCount(args, 10);
+  }
+
+  /**
+   * Run the RowCounter map reduce job and verify the row count.
+   *
+   * @param args
+   *          the command line arguments to be used for the rowcounter job.
+   * @param expectedCount
+   *          the expected row count (result of the map reduce job).
+   * @throws Exception
+   */
+  private void runRowCount(String[] args, int expectedCount) throws Exception {
+    GenericOptionsParser opts = new GenericOptionsParser(
+        TEST_UTIL.getConfiguration(), args);
+    Configuration conf = opts.getConfiguration();
+    args = opts.getRemainingArgs();
+    Job job = RowCounter.createSubmittableJob(conf, args);
+    job.waitForCompletion(true);
+    assertTrue(job.isSuccessful());
+    Counter counter = job.getCounters().findCounter(
+        RowCounterMapper.Counters.ROWS);
+    assertEquals(expectedCount, counter.getValue());
+  }
+
+  /**
+   * Writes TOTAL_ROWS distinct rows into the table. Some rows have two
+   * columns, some have only one.
+   *
+   * @param table
+   * @throws IOException
+   */
+  private static void writeRows(HTable table) throws IOException {
+    final byte[] family = Bytes.toBytes(COL_FAM);
+    final byte[] value = Bytes.toBytes("abcd");
+    final byte[] col1 = Bytes.toBytes(COL1);
+    final byte[] col2 = Bytes.toBytes(COL2);
+    ArrayList<Put> rowsUpdate = new ArrayList<Put>();
+    // write a few rows with two columns
+    int i = 0;
+    for (; i < TOTAL_ROWS - ROWS_WITH_ONE_COL; i++) {
+      byte[] row = Bytes.toBytes("row" + i);
+      Put put = new Put(row);
+      put.add(family, col1, value);
+      put.add(family, col2, value);
+      rowsUpdate.add(put);
+    }
+
+    // write a few rows with only one column
+    for (; i < TOTAL_ROWS; i++) {
+      byte[] row = Bytes.toBytes("row" + i);
+      Put put = new Put(row);
+      put.add(family, col2, value);
+      rowsUpdate.add(put);
+    }
+    table.put(rowsUpdate);
+  }
+}
-- 
1.7.7.5 (Apple Git-26)
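
For reference, the failure mode described in the commit message can be reproduced with a plain client-side scan, outside of the map reduce job. The sketch below is illustrative only and not part of the patch: the table name "demo", family "f", and qualifiers "a" and "b" are assumptions made up for the example, while the client calls (Scan, addColumn, FirstKeyOnlyFilter) are the standard HBase client API of this era.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class FirstKeyOnlyFilterVsColumnDemo {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // Assumed table: every row has qualifiers "a" and "b" in family "f", so the
    // first KeyValue of each row is f:a (qualifiers sort lexicographically).
    HTable table = new HTable(conf, "demo");
    try {
      // Restrict the scan to f:b, a column that is NOT the first KeyValue.
      Scan scan = new Scan();
      scan.addColumn(Bytes.toBytes("f"), Bytes.toBytes("b"));
      // Before the patch, RowCounter always added this filter. It keeps only
      // the first KeyValue of each row (f:a here), which the column
      // restriction then excludes, so such rows yield nothing.
      scan.setFilter(new FirstKeyOnlyFilter());

      int rows = 0;
      ResultScanner scanner = table.getScanner(scan);
      try {
        for (Result r : scanner) {
          rows++;
        }
      } finally {
        scanner.close();
      }
      // Per HBASE-6468: 0 rows with the filter set; the true row count once
      // the filter is dropped, which is what the patched RowCounter does
      // whenever explicit columns are given.
      System.out.println("rows seen: " + rows);
    } finally {
      table.close();
    }
  }
}

The new test above exercises the same scenario end to end: testRowCounterHiddenColumn asks for COL_FAM:c2, which is never the first KeyValue of a row, and still expects all 10 rows to be counted.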