From 0a16180ed5dc67ab2b8bbf4df3dbbf3841fadf23 Mon Sep 17 00:00:00 2001 From: Nick Dimiduk Date: Mon, 15 Apr 2013 11:34:57 -0700 Subject: [PATCH] HBASE-8326 reuse jars produced by JarFinder Appease the elder god Surefire. TableMapReduceUtil#addDependencyJars now maintains a cache of class name to jar. The cache is populated from the entries of jar files created by JarFinder. That way, when multiple classes from the same jar are requested, that jar will not be reproduced. This reduces the overall IO and clock-on-the-wall time necessary for launching a MapReduce job that interacts with HBase. Also breaks TestTableInputFormatScan into two pieces so as to avoid tripping the 900 second test timeout. --- .../hadoop/hbase/mapreduce/TableMapReduceUtil.java | 84 +++-- .../hbase/mapreduce/TestTableInputFormatScan.java | 385 --------------------- .../hbase/mapreduce/TestTableInputFormatScan1.java | 99 ++++++ .../hbase/mapreduce/TestTableInputFormatScan2.java | 117 +++++++ .../mapreduce/TestTableInputFormatScanBase.java | 238 +++++++++++++ 5 files changed, 514 insertions(+), 409 deletions(-) delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan2.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java index 7b10b1f..9715d8d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java @@ -25,9 +25,13 @@ import java.net.URL; import 
java.net.URLDecoder; import java.util.ArrayList; import java.util.Enumeration; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.zip.ZipEntry; +import java.util.zip.ZipFile; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -545,32 +549,33 @@ public class TableMapReduceUtil { * the DistributedCache. */ public static void addDependencyJars(Configuration conf, - Class... classes) throws IOException { + Class... classes) throws IOException { FileSystem localFs = FileSystem.getLocal(conf); - Set jars = new HashSet(); - // Add jars that are already in the tmpjars variable - jars.addAll( conf.getStringCollection("tmpjars") ); + jars.addAll(conf.getStringCollection("tmpjars")); + + // add jars as we find them to a map of contents jar name so that we can avoid + // creating new jars for classes that have already been packaged. + Map packagedClasses = new HashMap(); // Add jars containing the specified classes - for (Class clazz : classes) { + for (Class clazz : classes) { if (clazz == null) continue; - String pathStr = findOrCreateJar(clazz); - if (pathStr == null) { + Path path = findOrCreateJar(clazz, localFs, packagedClasses); + if (path == null) { LOG.warn("Could not find jar for class " + clazz + " in order to ship it to the cluster."); continue; } - Path path = new Path(pathStr); if (!localFs.exists(path)) { LOG.warn("Could not validate jar file " + path + " for class " + clazz); continue; } - jars.add(path.makeQualified(localFs).toString()); + jars.add(path.toString()); } if (jars.isEmpty()) return; @@ -584,17 +589,22 @@ public class TableMapReduceUtil { * a directory in the classpath, it creates a Jar on the fly with the * contents of the directory and returns the path to that Jar. If a Jar is * created, it is created in the system temporary directory. Otherwise, - * returns an existing jar that contains a class of the same name. 
+ * returns an existing jar that contains a class of the same name. Maintains + * a mapping from jar contents to the tmp jar created. * @param my_class the class to find. + * @param fs the FileSystem with which to qualify the returned path. + * @param packagedClasses a map of class name to path. * @return a jar file that contains the class. * @throws IOException */ - private static String findOrCreateJar(Class my_class) + private static Path findOrCreateJar(Class my_class, FileSystem fs, + Map packagedClasses) throws IOException { // attempt to locate an existing jar for the class. - String jar = findContainingJar(my_class); + String jar = findContainingJar(my_class, packagedClasses); if (null == jar || jar.isEmpty()) { jar = getJar(my_class); + updateMap(jar, packagedClasses); } if (null == jar || jar.isEmpty()) { @@ -602,23 +612,45 @@ public class TableMapReduceUtil { } LOG.debug(String.format("For class %s, using jar %s", my_class.getName(), jar)); - return jar; + return new Path(jar).makeQualified(fs); } /** - * Find a jar that contains a class of the same name, if any. - * It will return a jar file, even if that is not the first thing - * on the class path that has a class with the same name. - * - * This is shamelessly copied from JobConf - * + * Add entries to packagedClasses corresponding to class files + * contained in jar. + * @param jar The jar whose content to list. + * @param packagedClasses map[class -> jar] + */ + private static void updateMap(String jar, Map packagedClasses) throws IOException { + ZipFile zip = null; + try { + zip = new ZipFile(jar); + for (Enumeration iter = zip.entries(); iter.hasMoreElements();) { + ZipEntry entry = iter.nextElement(); + if (entry.getName().endsWith("class")) { + packagedClasses.put(entry.getName(), jar); + } + } + } finally { + if (null != zip) zip.close(); + } + } + + /** + * Find a jar that contains a class of the same name, if any.
It will return + * a jar file, even if that is not the first thing on the class path that + * has a class with the same name. Looks first on the classpath and then in + * the packagedClasses map. * @param my_class the class to find. * @return a jar file that contains the class, or null. * @throws IOException */ - private static String findContainingJar(Class my_class) throws IOException { + private static String findContainingJar(Class my_class, Map packagedClasses) + throws IOException { ClassLoader loader = my_class.getClassLoader(); String class_file = my_class.getName().replaceAll("\\.", "/") + ".class"; + + // first search the classpath for (Enumeration itr = loader.getResources(class_file); itr.hasMoreElements();) { URL url = itr.nextElement(); if ("jar".equals(url.getProtocol())) { @@ -637,14 +669,18 @@ public class TableMapReduceUtil { return toReturn.replaceAll("!.*$", ""); } } + + // now look in any jars we've packaged using JarFinder + for (Map.Entry e : packagedClasses.entrySet()) { + if (e.getKey().equals(class_file)) return e.getValue(); + } return null; } /** - * Invoke 'getJar' on a JarFinder implementation. Useful for some job configuration - * contexts (HBASE-8140) and also for testing on MRv2. First check if we have - * HADOOP-9426. Lacking that, fall back to the backport. - * + * Invoke 'getJar' on a JarFinder implementation. Useful for some job + * configuration contexts (HBASE-8140) and also for testing on MRv2. First + * check if we have HADOOP-9426. Lacking that, fall back to the backport. * @param my_class the class to find. * @return a jar file that contains the class, or null. 
*/ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan.java deleted file mode 100644 index 183de9a..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan.java +++ /dev/null @@ -1,385 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.mapreduce; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.Map; -import java.util.NavigableMap; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.LargeTests; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -/** - * Tests various scan start and stop row scenarios. This is set in a scan and - * tested in a MapReduce job to see if that is handed over and done properly - * too. 
- */ -@Category(LargeTests.class) -public class TestTableInputFormatScan { - - static final Log LOG = LogFactory.getLog(TestTableInputFormatScan.class); - static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - - static final byte[] TABLE_NAME = Bytes.toBytes("scantest"); - static final byte[] INPUT_FAMILY = Bytes.toBytes("contents"); - static final String KEY_STARTROW = "startRow"; - static final String KEY_LASTROW = "stpRow"; - - private static HTable table = null; - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - // switch TIF to log at DEBUG level - TEST_UTIL.enableDebug(TableInputFormat.class); - TEST_UTIL.enableDebug(TableInputFormatBase.class); - // start mini hbase cluster - TEST_UTIL.startMiniCluster(3); - // create and fill table - table = TEST_UTIL.createTable(TABLE_NAME, INPUT_FAMILY); - TEST_UTIL.createMultiRegions(table, INPUT_FAMILY); - TEST_UTIL.loadTable(table, INPUT_FAMILY); - // start MR cluster - TEST_UTIL.startMiniMapReduceCluster(); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniMapReduceCluster(); - TEST_UTIL.shutdownMiniCluster(); - } - - /** - * Pass the key and value to reduce. - */ - public static class ScanMapper - extends TableMapper { - - /** - * Pass the key and value to reduce. - * - * @param key The key, here "aaa", "aab" etc. - * @param value The value is the same as the key. - * @param context The task context. - * @throws IOException When reading the rows fails. - */ - @Override - public void map(ImmutableBytesWritable key, Result value, - Context context) - throws IOException, InterruptedException { - if (value.size() != 1) { - throw new IOException("There should only be one input column"); - } - Map>> - cf = value.getMap(); - if(!cf.containsKey(INPUT_FAMILY)) { - throw new IOException("Wrong input columns. 
Missing: '" + - Bytes.toString(INPUT_FAMILY) + "'."); - } - String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null)); - LOG.info("map: key -> " + Bytes.toStringBinary(key.get()) + - ", value -> " + val); - context.write(key, key); - } - - } - - /** - * Checks the last and first key seen against the scanner boundaries. - */ - public static class ScanReducer - extends Reducer { - - private String first = null; - private String last = null; - - protected void reduce(ImmutableBytesWritable key, - Iterable values, Context context) - throws IOException ,InterruptedException { - int count = 0; - for (ImmutableBytesWritable value : values) { - String val = Bytes.toStringBinary(value.get()); - LOG.info("reduce: key[" + count + "] -> " + - Bytes.toStringBinary(key.get()) + ", value -> " + val); - if (first == null) first = val; - last = val; - count++; - } - } - - protected void cleanup(Context context) - throws IOException, InterruptedException { - Configuration c = context.getConfiguration(); - String startRow = c.get(KEY_STARTROW); - String lastRow = c.get(KEY_LASTROW); - LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow + "\""); - LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\""); - if (startRow != null && startRow.length() > 0) { - assertEquals(startRow, first); - } - if (lastRow != null && lastRow.length() > 0) { - assertEquals(lastRow, last); - } - } - - } - - /** - * Tests a MR scan using specific start and stop rows. - * - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - @Test - public void testScanEmptyToEmpty() - throws IOException, InterruptedException, ClassNotFoundException { - testScan(null, null, null); - } - - /** - * Tests a MR scan using specific start and stop rows. 
- * - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - @Test - public void testScanEmptyToAPP() - throws IOException, InterruptedException, ClassNotFoundException { - testScan(null, "app", "apo"); - } - - /** - * Tests a MR scan using specific start and stop rows. - * - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - @Test - public void testScanEmptyToBBA() - throws IOException, InterruptedException, ClassNotFoundException { - testScan(null, "bba", "baz"); - } - - /** - * Tests a MR scan using specific start and stop rows. - * - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - @Test - public void testScanEmptyToBBB() - throws IOException, InterruptedException, ClassNotFoundException { - testScan(null, "bbb", "bba"); - } - - /** - * Tests a MR scan using specific start and stop rows. - * - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - @Test - public void testScanEmptyToOPP() - throws IOException, InterruptedException, ClassNotFoundException { - testScan(null, "opp", "opo"); - } - - /** - * Tests a MR scan using specific start and stop rows. - * - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - @Test - public void testScanOBBToOPP() - throws IOException, InterruptedException, ClassNotFoundException { - testScan("obb", "opp", "opo"); - } - - /** - * Tests a MR scan using specific start and stop rows. - * - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - @Test - public void testScanOBBToQPP() - throws IOException, InterruptedException, ClassNotFoundException { - testScan("obb", "qpp", "qpo"); - } - - /** - * Tests a MR scan using specific start and stop rows. 
- * - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - @Test - public void testScanOPPToEmpty() - throws IOException, InterruptedException, ClassNotFoundException { - testScan("opp", null, "zzz"); - } - - /** - * Tests a MR scan using specific start and stop rows. - * - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - @Test - public void testScanYYXToEmpty() - throws IOException, InterruptedException, ClassNotFoundException { - testScan("yyx", null, "zzz"); - } - - /** - * Tests a MR scan using specific start and stop rows. - * - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - @Test - public void testScanYYYToEmpty() - throws IOException, InterruptedException, ClassNotFoundException { - testScan("yyy", null, "zzz"); - } - - /** - * Tests a MR scan using specific start and stop rows. - * - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - @Test - public void testScanYZYToEmpty() - throws IOException, InterruptedException, ClassNotFoundException { - testScan("yzy", null, "zzz"); - } - - @Test - public void testScanFromConfiguration() - throws IOException, InterruptedException, ClassNotFoundException { - testScanFromConfiguration("bba", "bbd", "bbc"); - } - - /** - * Tests an MR Scan initialized from properties set in the Configuration. - * - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - private void testScanFromConfiguration(String start, String stop, String last) - throws IOException, InterruptedException, ClassNotFoundException { - String jobName = "ScanFromConfig" + (start != null ? start.toUpperCase() : "Empty") + - "To" + (stop != null ? 
stop.toUpperCase() : "Empty"); - Configuration c = new Configuration(TEST_UTIL.getConfiguration()); - c.set(TableInputFormat.INPUT_TABLE, Bytes.toString(TABLE_NAME)); - c.set(TableInputFormat.SCAN_COLUMN_FAMILY, Bytes.toString(INPUT_FAMILY)); - c.set(KEY_STARTROW, start != null ? start : ""); - c.set(KEY_LASTROW, last != null ? last : ""); - - if (start != null) { - c.set(TableInputFormat.SCAN_ROW_START, start); - } - - if (stop != null) { - c.set(TableInputFormat.SCAN_ROW_STOP, stop); - } - - Job job = new Job(c, jobName); - job.setMapperClass(ScanMapper.class); - job.setReducerClass(ScanReducer.class); - job.setMapOutputKeyClass(ImmutableBytesWritable.class); - job.setMapOutputValueClass(ImmutableBytesWritable.class); - job.setInputFormatClass(TableInputFormat.class); - job.setNumReduceTasks(1); - FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); - assertTrue(job.waitForCompletion(true)); - } - - /** - * Tests a MR scan using specific start and stop rows. - * - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - private void testScan(String start, String stop, String last) - throws IOException, InterruptedException, ClassNotFoundException { - String jobName = "Scan" + (start != null ? start.toUpperCase() : "Empty") + - "To" + (stop != null ? stop.toUpperCase() : "Empty"); - LOG.info("Before map/reduce startup - job " + jobName); - Configuration c = new Configuration(TEST_UTIL.getConfiguration()); - Scan scan = new Scan(); - scan.addFamily(INPUT_FAMILY); - if (start != null) { - scan.setStartRow(Bytes.toBytes(start)); - } - c.set(KEY_STARTROW, start != null ? start : ""); - if (stop != null) { - scan.setStopRow(Bytes.toBytes(stop)); - } - c.set(KEY_LASTROW, last != null ? 
last : ""); - LOG.info("scan before: " + scan); - Job job = new Job(c, jobName); - TableMapReduceUtil.initTableMapperJob( - Bytes.toString(TABLE_NAME), scan, ScanMapper.class, - ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); - job.setReducerClass(ScanReducer.class); - job.setNumReduceTasks(1); // one to get final "first" and "last" key - FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); - LOG.info("Started " + job.getJobName()); - assertTrue(job.waitForCompletion(true)); - LOG.info("After map/reduce completion - job " + jobName); - } - -} - diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java new file mode 100644 index 0000000..77ea47a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java @@ -0,0 +1,99 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; + +import org.apache.hadoop.hbase.LargeTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * TestTableInputFormatScan part 1. + * @see TestTableInputFormatScanBase + */ +@Category(LargeTests.class) +public class TestTableInputFormatScan1 extends TestTableInputFormatScanBase { + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanEmptyToEmpty() + throws IOException, InterruptedException, ClassNotFoundException { + testScan(null, null, null); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanEmptyToAPP() + throws IOException, InterruptedException, ClassNotFoundException { + testScan(null, "app", "apo"); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanEmptyToBBA() + throws IOException, InterruptedException, ClassNotFoundException { + testScan(null, "bba", "baz"); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanEmptyToBBB() + throws IOException, InterruptedException, ClassNotFoundException { + testScan(null, "bbb", "bba"); + } + + /** + * Tests a MR scan using specific start and stop rows. 
+ * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanEmptyToOPP() + throws IOException, InterruptedException, ClassNotFoundException { + testScan(null, "opp", "opo"); + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan2.java new file mode 100644 index 0000000..f35bbd1 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan2.java @@ -0,0 +1,117 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; + +import org.apache.hadoop.hbase.LargeTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * TestTableInputFormatScan part 2. + * @see TestTableInputFormatScanBase + */ +@Category(LargeTests.class) +public class TestTableInputFormatScan2 extends TestTableInputFormatScanBase { + + /** + * Tests a MR scan using specific start and stop rows. 
+ * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanOBBToOPP() + throws IOException, InterruptedException, ClassNotFoundException { + testScan("obb", "opp", "opo"); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanOBBToQPP() + throws IOException, InterruptedException, ClassNotFoundException { + testScan("obb", "qpp", "qpo"); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanOPPToEmpty() + throws IOException, InterruptedException, ClassNotFoundException { + testScan("opp", null, "zzz"); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanYYXToEmpty() + throws IOException, InterruptedException, ClassNotFoundException { + testScan("yyx", null, "zzz"); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanYYYToEmpty() + throws IOException, InterruptedException, ClassNotFoundException { + testScan("yyy", null, "zzz"); + } + + /** + * Tests a MR scan using specific start and stop rows. 
+ * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanYZYToEmpty() + throws IOException, InterruptedException, ClassNotFoundException { + testScan("yzy", null, "zzz"); + } + + @Test + public void testScanFromConfiguration() + throws IOException, InterruptedException, ClassNotFoundException { + testScanFromConfiguration("bba", "bbd", "bbc"); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java new file mode 100644 index 0000000..15ea498 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java @@ -0,0 +1,238 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Map; +import java.util.NavigableMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +/** + *

+ * Tests various scan start and stop row scenarios. This is set in a scan and + * tested in a MapReduce job to see if that is handed over and done properly + * too. + *

+ *

+ * This test is broken into two parts in order to side-step the test timeout + * period of 900 seconds, as documented in HBASE-8326. + *

+ */ +public abstract class TestTableInputFormatScanBase { + + static final Log LOG = LogFactory.getLog(TestTableInputFormatScanBase.class); + static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + static final byte[] TABLE_NAME = Bytes.toBytes("scantest"); + static final byte[] INPUT_FAMILY = Bytes.toBytes("contents"); + static final String KEY_STARTROW = "startRow"; + static final String KEY_LASTROW = "stpRow"; + + private static HTable table = null; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // switch TIF to log at DEBUG level + TEST_UTIL.enableDebug(TableInputFormat.class); + TEST_UTIL.enableDebug(TableInputFormatBase.class); + // start mini hbase cluster + TEST_UTIL.startMiniCluster(3); + // create and fill table + table = TEST_UTIL.createTable(TABLE_NAME, INPUT_FAMILY); + TEST_UTIL.createMultiRegions(table, INPUT_FAMILY); + TEST_UTIL.loadTable(table, INPUT_FAMILY); + // start MR cluster + TEST_UTIL.startMiniMapReduceCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniMapReduceCluster(); + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Pass the key and value to reduce. + */ + public static class ScanMapper + extends TableMapper { + + /** + * Pass the key and value to reduce. + * + * @param key The key, here "aaa", "aab" etc. + * @param value The value is the same as the key. + * @param context The task context. + * @throws IOException When reading the rows fails. + */ + @Override + public void map(ImmutableBytesWritable key, Result value, + Context context) + throws IOException, InterruptedException { + if (value.size() != 1) { + throw new IOException("There should only be one input column"); + } + Map>> + cf = value.getMap(); + if(!cf.containsKey(INPUT_FAMILY)) { + throw new IOException("Wrong input columns. 
Missing: '" + + Bytes.toString(INPUT_FAMILY) + "'."); + } + String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null)); + LOG.info("map: key -> " + Bytes.toStringBinary(key.get()) + + ", value -> " + val); + context.write(key, key); + } + + } + + /** + * Checks the last and first key seen against the scanner boundaries. + */ + public static class ScanReducer + extends Reducer { + + private String first = null; + private String last = null; + + protected void reduce(ImmutableBytesWritable key, + Iterable values, Context context) + throws IOException ,InterruptedException { + int count = 0; + for (ImmutableBytesWritable value : values) { + String val = Bytes.toStringBinary(value.get()); + LOG.info("reduce: key[" + count + "] -> " + + Bytes.toStringBinary(key.get()) + ", value -> " + val); + if (first == null) first = val; + last = val; + count++; + } + } + + protected void cleanup(Context context) + throws IOException, InterruptedException { + Configuration c = context.getConfiguration(); + String startRow = c.get(KEY_STARTROW); + String lastRow = c.get(KEY_LASTROW); + LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow + "\""); + LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\""); + if (startRow != null && startRow.length() > 0) { + assertEquals(startRow, first); + } + if (lastRow != null && lastRow.length() > 0) { + assertEquals(lastRow, last); + } + } + + } + + /** + * Tests an MR Scan initialized from properties set in the Configuration. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + protected void testScanFromConfiguration(String start, String stop, String last) + throws IOException, InterruptedException, ClassNotFoundException { + String jobName = "ScanFromConfig" + (start != null ? start.toUpperCase() : "Empty") + + "To" + (stop != null ? 
stop.toUpperCase() : "Empty"); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + c.set(TableInputFormat.INPUT_TABLE, Bytes.toString(TABLE_NAME)); + c.set(TableInputFormat.SCAN_COLUMN_FAMILY, Bytes.toString(INPUT_FAMILY)); + c.set(KEY_STARTROW, start != null ? start : ""); + c.set(KEY_LASTROW, last != null ? last : ""); + + if (start != null) { + c.set(TableInputFormat.SCAN_ROW_START, start); + } + + if (stop != null) { + c.set(TableInputFormat.SCAN_ROW_STOP, stop); + } + + Job job = new Job(c, jobName); + job.setMapperClass(ScanMapper.class); + job.setReducerClass(ScanReducer.class); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(ImmutableBytesWritable.class); + job.setInputFormatClass(TableInputFormat.class); + job.setNumReduceTasks(1); + FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); + assertTrue(job.waitForCompletion(true)); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + protected void testScan(String start, String stop, String last) + throws IOException, InterruptedException, ClassNotFoundException { + String jobName = "Scan" + (start != null ? start.toUpperCase() : "Empty") + + "To" + (stop != null ? stop.toUpperCase() : "Empty"); + LOG.info("Before map/reduce startup - job " + jobName); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + Scan scan = new Scan(); + scan.addFamily(INPUT_FAMILY); + if (start != null) { + scan.setStartRow(Bytes.toBytes(start)); + } + c.set(KEY_STARTROW, start != null ? start : ""); + if (stop != null) { + scan.setStopRow(Bytes.toBytes(stop)); + } + c.set(KEY_LASTROW, last != null ? 
last : ""); + LOG.info("scan before: " + scan); + Job job = new Job(c, jobName); + TableMapReduceUtil.initTableMapperJob( + Bytes.toString(TABLE_NAME), scan, ScanMapper.class, + ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); + job.setReducerClass(ScanReducer.class); + job.setNumReduceTasks(1); // one to get final "first" and "last" key + FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); + LOG.info("Started " + job.getJobName()); + assertTrue(job.waitForCompletion(true)); + LOG.info("After map/reduce completion - job " + jobName); + } + +} + -- 1.8.1