From 4bdefc759a989bb504e005198e63666ce6a1768b Mon Sep 17 00:00:00 2001 From: Nick Dimiduk Date: Tue, 12 Nov 2013 17:56:50 -0800 Subject: [PATCH] HBASE-9165 [mapreduce] Modularize building dependency jars - Separate adding HBase and dependencies from adding other job dependencies, and expose it as a separate method that other projects can use (for PIG-3285). - Explicitly add hbase-server to the list of dependencies we ship with the job, for users who extend the classes we provide (see HBASE-9112). - Add integration test for addDependencyJars. - Code reuse for TestTableMapReduce. --- .../IntegrationTestTableMapReduceUtil.java | 113 ++++++++++ .../hadoop/hbase/mapred/TableMapReduceUtil.java | 14 +- .../hbase/mapreduce/IdentityTableMapper.java | 2 +- .../hadoop/hbase/mapreduce/TableMapReduceUtil.java | 44 ++-- .../hadoop/hbase/mapred/TestTableMapReduce.java | 187 ++--------------- .../hadoop/hbase/mapreduce/TestTableMapReduce.java | 170 +-------------- .../hbase/mapreduce/TestTableMapReduceBase.java | 227 +++++++++++++++++++++ .../hbase/mapreduce/TestTableMapReduceUtil.java | 42 ++-- 8 files changed, 424 insertions(+), 375 deletions(-) create mode 100644 hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableMapReduceUtil.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableMapReduceUtil.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableMapReduceUtil.java new file mode 100644 index 0000000..88e004a --- /dev/null +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableMapReduceUtil.java @@ -0,0 +1,113 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.IntegrationTestingUtility; +import org.apache.hadoop.hbase.IntegrationTests; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test that we add tmpjars correctly including the named dependencies. Runs + * as an integration test so that classpath is realistic. + */ +@Category(IntegrationTests.class) +public class IntegrationTestTableMapReduceUtil implements Configurable, Tool { + + private static IntegrationTestingUtility util; + + @BeforeClass + public static void provisionCluster() throws Exception { + if (null == util) { + util = new IntegrationTestingUtility(); + } + } + + @Before + public void skipMiniCluster() { + // test probably also works with a local cluster, but + // IntegrationTestingUtility doesn't support this concept. 
+ assumeTrue("test requires a distributed cluster.", util.isDistributedCluster()); + } + + /** + * Look for jars we expect to be on the classpath by name. + */ + @Test + public void testAddDependencyJars() throws Exception { + Job job = new Job(); + TableMapReduceUtil.addDependencyJars(job); + String tmpjars = job.getConfiguration().get("tmpjars"); + + // verify presence of modules + assertTrue(tmpjars.contains("hbase-common")); + assertTrue(tmpjars.contains("hbase-protocol")); + assertTrue(tmpjars.contains("hbase-client")); + assertTrue(tmpjars.contains("hbase-hadoop-compat")); + assertTrue(tmpjars.contains("hbase-server")); + + // verify presence of 3rd party dependencies. + assertTrue(tmpjars.contains("zookeeper")); + assertTrue(tmpjars.contains("netty")); + assertTrue(tmpjars.contains("protobuf")); + assertTrue(tmpjars.contains("guava")); + assertTrue(tmpjars.contains("htrace")); + } + + @Override + public int run(String[] args) throws Exception { + provisionCluster(); + skipMiniCluster(); + testAddDependencyJars(); + return 0; + } + + public void setConf(Configuration conf) { + if (util != null) { + throw new IllegalArgumentException( + "setConf not supported after the test has been initialized."); + } + util = new IntegrationTestingUtility(conf); + } + + @Override + public Configuration getConf() { + return util.getConfiguration(); + } + + public static void main(String[] args) throws Exception { + Configuration conf = HBaseConfiguration.create(); + IntegrationTestingUtility.setUseDistributedCluster(conf); + int status = ToolRunner.run(conf, new IntegrationTestTableMapReduceUtil(), args); + System.exit(status); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java index d1c49f0..bdcef56 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java @@ -37,12 +37,11 @@ import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; -import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.security.token.Token; import org.apache.zookeeper.KeeperException; @@ -52,7 +51,7 @@ import org.apache.zookeeper.KeeperException; @Deprecated @InterfaceAudience.Public @InterfaceStability.Stable -@SuppressWarnings("unchecked") +@SuppressWarnings({ "rawtypes", "unchecked" }) public class TableMapReduceUtil { /** @@ -297,15 +296,14 @@ public class TableMapReduceUtil { } /** - * @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(Job) + * @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job) */ public static void addDependencyJars(JobConf job) throws IOException { + org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(job); org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars( job, - org.apache.zookeeper.ZooKeeper.class, - org.jboss.netty.channel.ChannelFactory.class, - com.google.common.base.Function.class, - com.google.protobuf.Message.class, + // when making changes here, consider also mapreduce.TableMapReduceUtil + // pull job classes job.getMapOutputKeyClass(), job.getMapOutputValueClass(), job.getOutputKeyClass(), diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java index
437f053..344e605 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java @@ -45,7 +45,7 @@ extends TableMapper { * @param job The job configuration. * @throws IOException When setting up the job fails. */ - @SuppressWarnings("unchecked") + @SuppressWarnings("rawtypes") public static void initJob(String table, Scan scan, Class mapper, Job job) throws IOException { TableMapReduceUtil.initTableMapperJob(table, scan, mapper, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java index 0d2b088..4995ebd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java @@ -71,7 +71,7 @@ import com.google.protobuf.InvalidProtocolBufferException; /** * Utility for {@link TableMapper} and {@link TableReducer} */ -@SuppressWarnings("unchecked") +@SuppressWarnings({ "rawtypes", "unchecked" }) @InterfaceAudience.Public @InterfaceStability.Stable public class TableMapReduceUtil { @@ -558,24 +558,44 @@ public class TableMapReduceUtil { } /** + * Add HBase and its dependencies (only) to the job configuration. + *

+ * This is intended as a low-level API, facilitating code reuse between this + * class and its mapred counterpart. It is also of use to external tools that + * need to build a MapReduce job that interacts with HBase but want + * fine-grained control over the jars shipped to the cluster. + *

+ * @param conf The Configuration object to extend with dependencies. + * @see org.apache.hadoop.hbase.mapred.TableMapReduceUtil + * @see PIG-3285 + */ + public static void addHBaseDependencyJars(Configuration conf) throws IOException { + addDependencyJars(conf, + // explicitly pull a class from each module + org.apache.hadoop.hbase.HConstants.class, // hbase-common + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.class, // hbase-protocol + org.apache.hadoop.hbase.client.Put.class, // hbase-client + org.apache.hadoop.hbase.CompatibilityFactory.class, // hbase-hadoop-compat + org.apache.hadoop.hbase.mapreduce.TableMapper.class, // hbase-server + // pull necessary dependencies + org.apache.zookeeper.ZooKeeper.class, + org.jboss.netty.channel.ChannelFactory.class, + com.google.protobuf.Message.class, + com.google.common.collect.Lists.class, + org.cloudera.htrace.Trace.class); + } + + + /** * Add the HBase dependency jars as well as jars for any of the configured * job classes to the job configuration, so that JobClient will ship them * to the cluster and add them to the DistributedCache. 
*/ public static void addDependencyJars(Job job) throws IOException { + addHBaseDependencyJars(job.getConfiguration()); try { addDependencyJars(job.getConfiguration(), - // explicitly pull a class from each module - org.apache.hadoop.hbase.HConstants.class, // hbase-common - org.apache.hadoop.hbase.protobuf.generated.ClientProtos.class, // hbase-protocol - org.apache.hadoop.hbase.client.Put.class, // hbase-client - org.apache.hadoop.hbase.CompatibilityFactory.class, // hbase-hadoop-compat - // pull necessary dependencies - org.apache.zookeeper.ZooKeeper.class, - org.jboss.netty.channel.ChannelFactory.class, - com.google.protobuf.Message.class, - com.google.common.collect.Lists.class, - org.cloudera.htrace.Trace.class, + // when making changes here, consider also mapred.TableMapReduceUtil // pull job classes job.getMapOutputKeyClass(), job.getMapOutputValueClass(), diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java index 808e13d..8332c17 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java @@ -18,23 +18,20 @@ */ package org.apache.hadoop.hbase.mapred; +import static org.junit.Assert.assertTrue; + import java.io.File; import java.io.IOException; -import java.util.Iterator; -import java.util.Map; -import java.util.NavigableMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.LargeTests; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableInputFormat; +import org.apache.hadoop.hbase.mapreduce.TestTableMapReduceBase; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; @@ -42,101 +39,40 @@ import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.RunningJob; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; import org.junit.experimental.categories.Category; -import static org.junit.Assert.fail; -import static org.junit.Assert.assertTrue; - /** * Test Map/Reduce job over HBase tables. The map/reduce process we're testing * on our tables is simple - take every row in the table, reverse the value of * a particular cell, and write it back to the table. */ @Category(LargeTests.class) -public class TestTableMapReduce { +@SuppressWarnings("deprecation") +public class TestTableMapReduce extends TestTableMapReduceBase { private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class.getName()); - private static final HBaseTestingUtility UTIL = - new HBaseTestingUtility(); - static final byte[] MULTI_REGION_TABLE_NAME = Bytes.toBytes("mrtest"); - static final byte[] INPUT_FAMILY = Bytes.toBytes("contents"); - static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text"); - - private static final byte [][] columns = new byte [][] { - INPUT_FAMILY, - OUTPUT_FAMILY - }; - @BeforeClass - public static void beforeClass() throws Exception { - UTIL.startMiniCluster(); - HTable table = UTIL.createTable(MULTI_REGION_TABLE_NAME, new byte[][] {INPUT_FAMILY, OUTPUT_FAMILY}); - UTIL.createMultiRegions(table, INPUT_FAMILY); - UTIL.loadTable(table, INPUT_FAMILY); - UTIL.startMiniMapReduceCluster(); - } - - @AfterClass - public static void afterClass() throws Exception { - UTIL.shutdownMiniMapReduceCluster(); - UTIL.shutdownMiniCluster(); - } + protected 
Log getLog() { return LOG; } /** * Pass the given key and processed record reduce */ - public static class ProcessContentsMapper - extends MapReduceBase - implements TableMap { + public static class ProcessContentsMapper extends MapReduceBase implements + TableMap { + /** * Pass the key, and reversed value to reduce - * @param key - * @param value - * @param output - * @param reporter - * @throws IOException */ public void map(ImmutableBytesWritable key, Result value, OutputCollector output, Reporter reporter) throws IOException { - if (value.size() != 1) { - throw new IOException("There should only be one input column"); - } - Map>> - cf = value.getMap(); - if(!cf.containsKey(INPUT_FAMILY)) { - throw new IOException("Wrong input columns. Missing: '" + - Bytes.toString(INPUT_FAMILY) + "'."); - } - - // Get the original value and reverse it - - String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null)); - StringBuilder newValue = new StringBuilder(originalValue); - newValue.reverse(); - - // Now set the value to be collected - - Put outval = new Put(key.get()); - outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString())); - output.collect(key, outval); + output.collect(key, TestTableMapReduceBase.map(key, value)); } } - /** - * Test a map/reduce against a multi-region table - * @throws IOException - */ - @Test - public void testMultiRegionTable() throws IOException { - runTestOnTable(new HTable(UTIL.getConfiguration(), MULTI_REGION_TABLE_NAME)); - } - - private void runTestOnTable(HTable table) throws IOException { + @Override + protected void runTestOnTable(HTable table) throws IOException { JobConf jobConf = null; try { LOG.info("Before map/reduce startup"); @@ -162,102 +98,5 @@ public class TestTableMapReduce { } } } - - private void verify(String tableName) throws IOException { - HTable table = new HTable(UTIL.getConfiguration(), tableName); - boolean verified = false; - long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 
* 1000); - int numRetries = UTIL.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); - for (int i = 0; i < numRetries; i++) { - try { - LOG.info("Verification attempt #" + i); - verifyAttempt(table); - verified = true; - break; - } catch (NullPointerException e) { - // If here, a cell was empty. Presume its because updates came in - // after the scanner had been opened. Wait a while and retry. - LOG.debug("Verification attempt failed: " + e.getMessage()); - } - try { - Thread.sleep(pause); - } catch (InterruptedException e) { - // continue - } - } - assertTrue(verified); - } - - /** - * Looks at every value of the mapreduce output and verifies that indeed - * the values have been reversed. - * @param table Table to scan. - * @throws IOException - * @throws NullPointerException if we failed to find a cell value - */ - private void verifyAttempt(final HTable table) throws IOException, NullPointerException { - Scan scan = new Scan(); - TableInputFormat.addColumns(scan, columns); - ResultScanner scanner = table.getScanner(scan); - try { - Iterator itr = scanner.iterator(); - assertTrue(itr.hasNext()); - while(itr.hasNext()) { - Result r = itr.next(); - if (LOG.isDebugEnabled()) { - if (r.size() > 2 ) { - throw new IOException("Too many results, expected 2 got " + - r.size()); - } - } - byte[] firstValue = null; - byte[] secondValue = null; - int count = 0; - for(Cell kv : r.listCells()) { - if (count == 0) { - firstValue = CellUtil.cloneValue(kv); - } - if (count == 1) { - secondValue = CellUtil.cloneValue(kv);; - } - count++; - if (count == 2) { - break; - } - } - - - String first = ""; - if (firstValue == null) { - throw new NullPointerException(Bytes.toString(r.getRow()) + - ": first value is null"); - } - first = Bytes.toString(firstValue); - - String second = ""; - if (secondValue == null) { - throw new NullPointerException(Bytes.toString(r.getRow()) + - ": second value is null"); - } - byte[] secondReversed = new byte[secondValue.length]; - for 
(int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) { - secondReversed[i] = secondValue[j]; - } - second = Bytes.toString(secondReversed); - - if (first.compareTo(second) != 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("second key is not the reverse of first. row=" + - r.getRow() + ", first value=" + first + ", second value=" + - second); - } - fail(); - } - } - } finally { - scanner.close(); - } - } - } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java index 61dd2fb..534d673 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java @@ -58,28 +58,10 @@ import org.junit.experimental.categories.Category; * a particular cell, and write it back to the table. */ @Category(LargeTests.class) -public class TestTableMapReduce { +public class TestTableMapReduce extends TestTableMapReduceBase { private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class); - private static final HBaseTestingUtility UTIL = - new HBaseTestingUtility(); - static final byte[] MULTI_REGION_TABLE_NAME = Bytes.toBytes("mrtest"); - static final byte[] INPUT_FAMILY = Bytes.toBytes("contents"); - static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text"); - @BeforeClass - public static void beforeClass() throws Exception { - UTIL.startMiniCluster(); - HTable table = UTIL.createTable(MULTI_REGION_TABLE_NAME, new byte[][] {INPUT_FAMILY, OUTPUT_FAMILY}); - UTIL.createMultiRegions(table, INPUT_FAMILY); - UTIL.loadTable(table, INPUT_FAMILY); - UTIL.startMiniMapReduceCluster(); - } - - @AfterClass - public static void afterClass() throws Exception { - UTIL.shutdownMiniMapReduceCluster(); - UTIL.shutdownMiniCluster(); - } + protected Log getLog() { return LOG; } /** * Pass the given key and processed record reduce @@ -119,30 
+101,7 @@ public class TestTableMapReduce { } } - /** - * Test a map/reduce against a multi-region table - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - @Test - public void testMultiRegionTable() - throws IOException, InterruptedException, ClassNotFoundException { - runTestOnTable(new HTable(new Configuration(UTIL.getConfiguration()), - MULTI_REGION_TABLE_NAME)); - } - - @Test - public void testCombiner() - throws IOException, InterruptedException, ClassNotFoundException { - Configuration conf = new Configuration(UTIL.getConfiguration()); - // force use of combiner for testing purposes - conf.setInt("min.num.spills.for.combine", 1); - runTestOnTable(new HTable(conf, MULTI_REGION_TABLE_NAME)); - } - - private void runTestOnTable(HTable table) - throws IOException, InterruptedException, ClassNotFoundException { + protected void runTestOnTable(HTable table) throws IOException { Job job = null; try { LOG.info("Before map/reduce startup"); @@ -164,6 +123,10 @@ public class TestTableMapReduce { // verify map-reduce results verify(Bytes.toString(table.getTableName())); + } catch (InterruptedException e) { + throw new IOException(e); + } catch (ClassNotFoundException e) { + throw new IOException(e); } finally { table.close(); if (job != null) { @@ -172,123 +135,4 @@ public class TestTableMapReduce { } } } - - private void verify(String tableName) throws IOException { - HTable table = new HTable(new Configuration(UTIL.getConfiguration()), tableName); - boolean verified = false; - long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000); - int numRetries = UTIL.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); - for (int i = 0; i < numRetries; i++) { - try { - LOG.info("Verification attempt #" + i); - verifyAttempt(table); - verified = true; - break; - } catch (NullPointerException e) { - // If here, a cell was empty. 
Presume its because updates came in - // after the scanner had been opened. Wait a while and retry. - LOG.debug("Verification attempt failed: " + e.getMessage()); - } - try { - Thread.sleep(pause); - } catch (InterruptedException e) { - // continue - } - } - assertTrue(verified); - table.close(); - } - - /** - * Looks at every value of the mapreduce output and verifies that indeed - * the values have been reversed. - * - * @param table Table to scan. - * @throws IOException - * @throws NullPointerException if we failed to find a cell value - */ - private void verifyAttempt(final HTable table) throws IOException, NullPointerException { - Scan scan = new Scan(); - scan.addFamily(INPUT_FAMILY); - scan.addFamily(OUTPUT_FAMILY); - ResultScanner scanner = table.getScanner(scan); - try { - Iterator itr = scanner.iterator(); - assertTrue(itr.hasNext()); - while(itr.hasNext()) { - Result r = itr.next(); - if (LOG.isDebugEnabled()) { - if (r.size() > 2 ) { - throw new IOException("Too many results, expected 2 got " + - r.size()); - } - } - byte[] firstValue = null; - byte[] secondValue = null; - int count = 0; - for(Cell kv : r.listCells()) { - if (count == 0) { - firstValue = CellUtil.cloneValue(kv); - } - if (count == 1) { - secondValue = CellUtil.cloneValue(kv); - } - count++; - if (count == 2) { - break; - } - } - - String first = ""; - if (firstValue == null) { - throw new NullPointerException(Bytes.toString(r.getRow()) + - ": first value is null"); - } - first = Bytes.toString(firstValue); - - String second = ""; - if (secondValue == null) { - throw new NullPointerException(Bytes.toString(r.getRow()) + - ": second value is null"); - } - byte[] secondReversed = new byte[secondValue.length]; - for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) { - secondReversed[i] = secondValue[j]; - } - second = Bytes.toString(secondReversed); - - if (first.compareTo(second) != 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("second key is not the reverse of first. 
row=" + - Bytes.toStringBinary(r.getRow()) + ", first value=" + first + - ", second value=" + second); - } - fail(); - } - } - } finally { - scanner.close(); - } - } - - /** - * Test that we add tmpjars correctly including the ZK jar. - */ - public void testAddDependencyJars() throws Exception { - Job job = new Job(); - TableMapReduceUtil.addDependencyJars(job); - String tmpjars = job.getConfiguration().get("tmpjars"); - - System.err.println("tmpjars: " + tmpjars); - assertTrue(tmpjars.contains("zookeeper")); - assertFalse(tmpjars.contains("guava")); - - System.err.println("appending guava jar"); - TableMapReduceUtil.addDependencyJars(job.getConfiguration(), - com.google.common.base.Function.class); - tmpjars = job.getConfiguration().get("tmpjars"); - assertTrue(tmpjars.contains("guava")); - } - } - diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java new file mode 100644 index 0000000..cc0812d --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java @@ -0,0 +1,227 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.NavigableMap; + +import org.apache.commons.logging.Log; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * A base class for a test Map/Reduce job over HBase tables. The map/reduce process we're testing + * on our tables is simple - take every row in the table, reverse the value of a particular cell, + * and write it back to the table. Implements common components between mapred and mapreduce + * implementations. + */ +public abstract class TestTableMapReduceBase { + + protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + protected static final byte[] MULTI_REGION_TABLE_NAME = Bytes.toBytes("mrtest"); + protected static final byte[] INPUT_FAMILY = Bytes.toBytes("contents"); + protected static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text"); + + protected static final byte[][] columns = new byte[][] { + INPUT_FAMILY, + OUTPUT_FAMILY + }; + + /** + * Retrieve my logger instance. + */ + protected abstract Log getLog(); + + /** + * Handles API-specifics for setting up and executing the job. 
+ */ + protected abstract void runTestOnTable(HTable table) throws IOException; + + @BeforeClass + public static void beforeClass() throws Exception { + UTIL.startMiniCluster(); + HTable table = + UTIL.createTable(MULTI_REGION_TABLE_NAME, new byte[][] { INPUT_FAMILY, OUTPUT_FAMILY }); + UTIL.createMultiRegions(table, INPUT_FAMILY); + UTIL.loadTable(table, INPUT_FAMILY); + UTIL.startMiniMapReduceCluster(); + } + + @AfterClass + public static void afterClass() throws Exception { + UTIL.shutdownMiniMapReduceCluster(); + UTIL.shutdownMiniCluster(); + } + + /** + * Test a map/reduce against a multi-region table + * @throws IOException + */ + @Test + public void testMultiRegionTable() throws IOException { + runTestOnTable(new HTable(UTIL.getConfiguration(), MULTI_REGION_TABLE_NAME)); + } + + @Test + public void testCombiner() throws IOException { + Configuration conf = new Configuration(UTIL.getConfiguration()); + // force use of combiner for testing purposes + conf.setInt("min.num.spills.for.combine", 1); + runTestOnTable(new HTable(conf, MULTI_REGION_TABLE_NAME)); + } + + /** + * Implements mapper logic for use across APIs. + */ + protected static Put map(ImmutableBytesWritable key, Result value) throws IOException { + if (value.size() != 1) { + throw new IOException("There should only be one input column"); + } + Map>> + cf = value.getMap(); + if(!cf.containsKey(INPUT_FAMILY)) { + throw new IOException("Wrong input columns. 
Missing: '" + + Bytes.toString(INPUT_FAMILY) + "'."); + } + + // Get the original value and reverse it + + String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null)); + StringBuilder newValue = new StringBuilder(originalValue); + newValue.reverse(); + + // Now set the value to be collected + + Put outval = new Put(key.get()); + outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString())); + return outval; + } + + protected void verify(String tableName) throws IOException { + HTable table = new HTable(UTIL.getConfiguration(), tableName); + boolean verified = false; + long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000); + int numRetries = UTIL.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); + for (int i = 0; i < numRetries; i++) { + try { + getLog().info("Verification attempt #" + i); + verifyAttempt(table); + verified = true; + break; + } catch (NullPointerException e) { + // If here, a cell was empty. Presume its because updates came in + // after the scanner had been opened. Wait a while and retry. + getLog().debug("Verification attempt failed: " + e.getMessage()); + } + try { + Thread.sleep(pause); + } catch (InterruptedException e) { + // continue + } + } + assertTrue(verified); + } + + /** + * Looks at every value of the mapreduce output and verifies that indeed + * the values have been reversed. + * @param table Table to scan. 
+ * @throws IOException + * @throws NullPointerException if we failed to find a cell value + */ + private void verifyAttempt(final HTable table) throws IOException, NullPointerException { + Scan scan = new Scan(); + TableInputFormat.addColumns(scan, columns); + ResultScanner scanner = table.getScanner(scan); + try { + Iterator itr = scanner.iterator(); + assertTrue(itr.hasNext()); + while(itr.hasNext()) { + Result r = itr.next(); + if (getLog().isDebugEnabled()) { + if (r.size() > 2 ) { + throw new IOException("Too many results, expected 2 got " + + r.size()); + } + } + byte[] firstValue = null; + byte[] secondValue = null; + int count = 0; + for(Cell kv : r.listCells()) { + if (count == 0) { + firstValue = CellUtil.cloneValue(kv); + } + if (count == 1) { + secondValue = CellUtil.cloneValue(kv); + } + count++; + if (count == 2) { + break; + } + } + + + if (firstValue == null) { + throw new NullPointerException(Bytes.toString(r.getRow()) + + ": first value is null"); + } + String first = Bytes.toString(firstValue); + + if (secondValue == null) { + throw new NullPointerException(Bytes.toString(r.getRow()) + + ": second value is null"); + } + byte[] secondReversed = new byte[secondValue.length]; + for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) { + secondReversed[i] = secondValue[j]; + } + String second = Bytes.toString(secondReversed); + + if (first.compareTo(second) != 0) { + if (getLog().isDebugEnabled()) { + getLog().debug("second key is not the reverse of first. 
row=" + + Bytes.toStringBinary(r.getRow()) + ", first value=" + first + + ", second value=" + second); + } + fail(); + } + } + } finally { + scanner.close(); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java index 6bafacd..c6a59ac 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java @@ -15,8 +15,11 @@ package org.apache.hadoop.hbase.mapreduce; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.LongWritable; @@ -24,19 +27,15 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.junit.Test; import org.junit.experimental.categories.Category; -import static org.junit.Assert.*; /** - * Test class TableMapReduceUtil + * Test different variants of initTableMapperJob method */ - -@Category(LargeTests.class) +@Category(SmallTests.class) public class TestTableMapReduceUtil { - /** - * Test different variants ofinitTableMapperJob method - */ - @Test (timeout=600000) - public void testInitTableMapperJob() throws Exception { + + @Test + public void testInitTableMapperJob1() throws Exception { Configuration configuration = new Configuration(); Job job = new Job(configuration, "tableName"); // test @@ -48,9 +47,12 @@ public class TestTableMapReduceUtil { assertEquals(Text.class, job.getOutputValueClass()); assertNull(job.getCombinerClass()); assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE)); + } - configuration = new Configuration(); - job = 
new Job(configuration, "tableName"); + @Test + public void testInitTableMapperJob2() throws Exception { + Configuration configuration = new Configuration(); + Job job = new Job(configuration, "tableName"); TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class, Text.class, Text.class, job, false, HLogInputFormat.class); assertEquals(HLogInputFormat.class, job.getInputFormatClass()); @@ -59,9 +61,12 @@ public class TestTableMapReduceUtil { assertEquals(Text.class, job.getOutputValueClass()); assertNull(job.getCombinerClass()); assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE)); + } - configuration = new Configuration(); - job = new Job(configuration, "tableName"); + @Test + public void testInitTableMapperJob3() throws Exception { + Configuration configuration = new Configuration(); + Job job = new Job(configuration, "tableName"); TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class, Text.class, Text.class, job); assertEquals(TableInputFormat.class, job.getInputFormatClass()); @@ -70,9 +75,12 @@ public class TestTableMapReduceUtil { assertEquals(Text.class, job.getOutputValueClass()); assertNull(job.getCombinerClass()); assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE)); + } - configuration = new Configuration(); - job = new Job(configuration, "tableName"); + @Test + public void testInitTableMapperJob4() throws Exception { + Configuration configuration = new Configuration(); + Job job = new Job(configuration, "tableName"); TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class, Text.class, Text.class, job, false); assertEquals(TableInputFormat.class, job.getInputFormatClass()); @@ -82,4 +90,4 @@ public class TestTableMapReduceUtil { assertNull(job.getCombinerClass()); assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE)); } -} \ No newline at end of file +} -- 
1.8.4.2