commit 1a325cc54e9ed0efafe8216434e9eeb776ed3b46
Author: Todd Lipcon
Date:   Thu May 20 23:14:01 2010 -0700

    HBASE-2588. Provide a utility function to ship HBase dependency jars
    to the cluster in classpath

    Fix for tmpjars handling

    Unit test for HBASE-2588

diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
index b332280..07d7911 100644
--- src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
@@ -24,7 +24,18 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTable;
@@ -34,14 +45,17 @@ import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.zookeeper.ZooKeeper;
 
 /**
  * Utility for {@link TableMapper} and {@link TableReducer}
  */
 @SuppressWarnings("unchecked")
 public class TableMapReduceUtil {
-
+  static Log LOG = LogFactory.getLog(TableMapReduceUtil.class);
+
   /**
    * Use this before submitting a TableMap job. It will appropriately set up
    * the job.
@@ -222,4 +236,105 @@ public class TableMapReduceUtil {
     job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize);
   }
 
+  /**
+   * Add the HBase dependency jars as well as jars for any of the configured
+   * job classes to the job configuration, so that JobClient will ship them
+   * to the cluster and add them to the DistributedCache.
+   */
+  public static void addDependencyJars(Job job) throws IOException {
+    try {
+      addDependencyJars(job.getConfiguration(),
+          ZooKeeper.class,
+          job.getMapOutputKeyClass(),
+          job.getMapOutputValueClass(),
+          job.getOutputKeyClass(),
+          job.getOutputValueClass(),
+          job.getOutputFormatClass(),
+          job.getPartitionerClass(),
+          job.getCombinerClass());
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Add the jars containing the given classes to the job's configuration
+   * such that JobClient will ship them to the cluster and add them to
+   * the DistributedCache.
+   */
+  public static void addDependencyJars(Configuration conf,
+      Class<?>... classes) throws IOException {
+
+    FileSystem localFs = FileSystem.getLocal(conf);
+
+    Set<String> jars = new HashSet<String>();
+    for (Class<?> clazz : classes) {
+      if (clazz == null) continue;
+
+      String pathStr = findContainingJar(clazz);
+      if (pathStr == null) {
+        LOG.warn("Could not find jar for class " + clazz +
+            " in order to ship it to the cluster.");
+        continue;
+      }
+      Path path = new Path(pathStr);
+      if (!localFs.exists(path)) {
+        LOG.warn("Could not validate jar file " + path + " for class " +
+            clazz);
+        continue;
+      }
+      jars.add(path.makeQualified(localFs).toString());
+    }
+    if (jars.isEmpty()) return;
+
+    String tmpJars = conf.get("tmpjars");
+    if (tmpJars == null) {
+      tmpJars = StringUtils.arrayToString(jars.toArray(new String[0]));
+    } else {
+      tmpJars += "," + StringUtils.arrayToString(jars.toArray(new String[0]));
+    }
+    conf.set("tmpjars", tmpJars);
+  }
+
+  /**
+   * Find a jar that contains a class of the same name, if any.
+   * It will return a jar file, even if that is not the first thing
+   * on the class path that has a class with the same name.
+   *
+   * This is shamelessly copied from JobConf.
+   *
+   * @param my_class the class to find.
+   * @return a jar file that contains the class, or null.
+   * @throws IOException
+   */
+  private static String findContainingJar(Class<?> my_class) {
+    ClassLoader loader = my_class.getClassLoader();
+    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
+    try {
+      for (Enumeration itr = loader.getResources(class_file);
+          itr.hasMoreElements();) {
+        URL url = (URL) itr.nextElement();
+        if ("jar".equals(url.getProtocol())) {
+          String toReturn = url.getPath();
+          if (toReturn.startsWith("file:")) {
+            toReturn = toReturn.substring("file:".length());
+          }
+          // URLDecoder is a misnamed class, since it actually decodes
+          // x-www-form-urlencoded MIME type rather than actual
+          // URL encoding (which the file path has). Therefore it would
+          // decode +s to ' 's, which is incorrect (spaces are actually
+          // either unencoded or encoded as "%20"). Replace +s first, so
+          // that they are kept sacred during the decoding process.
+          toReturn = toReturn.replaceAll("\\+", "%2B");
+          toReturn = URLDecoder.decode(toReturn, "UTF-8");
+          return toReturn.replaceAll("!.*$", "");
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return null;
+  }
+
+}
\ No newline at end of file
diff --git src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
index d76c75e..875ebf7 100644
--- src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
+++ src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.filter.CompareFilter;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Hash;
@@ -567,6 +568,7 @@ public class PerformanceEvaluation implements HConstants {
     job.setOutputFormatClass(TextOutputFormat.class);
     TextOutputFormat.setOutputPath(job, new Path(inputDir,"outputs"));
 
+    TableMapReduceUtil.addDependencyJars(job);
     job.waitForCompletion(true);
   }
 
diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
index d8b11db..3093718 100644
--- src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
+++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
@@ -242,4 +242,17 @@ public class TestTableMapReduce extends MultiRegionTable {
       scanner.close();
     }
   }
+
+  /**
+   * Test that we add tmpjars correctly, including the ZooKeeper jar.
+   */
+  public void testAddDependencyJars() throws Exception {
+    Job job = new Job();
+    TableMapReduceUtil.addDependencyJars(job);
+    String tmpjars = job.getConfiguration().get("tmpjars");
+
+    System.err.println("tmpjars: " + tmpjars);
+    assertTrue(tmpjars.contains("zookeeper"));
+    assertTrue(tmpjars.contains("guava"));
+  }
 }
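
For context, the intended call pattern is the one PerformanceEvaluation now uses: configure the job, then call addDependencyJars(job) immediately before submission. The sketch below is a minimal, illustrative driver and is not part of the patch; the table name "mytable" is hypothetical, and it assumes the pre-existing initTableMapperJob() overload and the IdentityTableMapper that already live in the same package.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.mapreduce.Job;

    public class ExampleDriver {
      public static void main(String[] args) throws Exception {
        // HBaseConfiguration extends Configuration and picks up hbase-site.xml.
        Configuration conf = new HBaseConfiguration();
        Job job = new Job(conf, "example-scan");
        job.setJarByClass(ExampleDriver.class);

        // Pre-existing helper: wires up the table input format, the mapper,
        // and the scan. "mytable" is a placeholder table name.
        TableMapReduceUtil.initTableMapperJob("mytable", new Scan(),
            IdentityTableMapper.class, ImmutableBytesWritable.class,
            Result.class, job);

        // New in this patch: locate the jars containing ZooKeeper and the
        // job's configured classes, and append them to "tmpjars" so that
        // JobClient ships them to the cluster's DistributedCache.
        TableMapReduceUtil.addDependencyJars(job);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }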
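
The block comment in findContainingJar() about URLDecoder is worth a concrete illustration. The standalone demo below (illustrative only; the jar path is made up) shows why '+' must be escaped to "%2B" before a file path is run through URLDecoder.decode():

    import java.net.URLDecoder;

    public class PlusDecodeDemo {
      public static void main(String[] args) throws Exception {
        // Made-up path containing a literal '+' and a %20-encoded space.
        String raw = "/opt/lib+extras/local%20jars/zookeeper-3.3.1.jar";

        // Naive decode: '+' is treated as an encoded space, silently
        // corrupting the path.
        System.out.println(URLDecoder.decode(raw, "UTF-8"));
        // -> /opt/lib extras/local jars/zookeeper-3.3.1.jar   (wrong)

        // The patch's approach: protect '+' as %2B first, then decode.
        String guarded = raw.replaceAll("\\+", "%2B");
        System.out.println(URLDecoder.decode(guarded, "UTF-8"));
        // -> /opt/lib+extras/local jars/zookeeper-3.3.1.jar   (correct)
      }
    }

The final replaceAll("!.*$", "") in findContainingJar() then strips the intra-jar suffix from URLs of the form jar:file:/path/foo.jar!/com/Example.class, leaving only the jar's filesystem path for the tmpjars list.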