commit ae4f6385e1094cb056daf9f8facbac04a4d579f3
Author: Todd Lipcon
Date:   Wed Sep 15 01:37:45 2010 -0400

    TableMapReduceUtil should always add dependency jars

    Make MR jobs ship dependency jars by default

diff --git src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
index 41748fe..1a44d27 100644
--- src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
+++ src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
@@ -29,6 +29,10 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
 
 /**
  * Utility for {@link TableMap} and {@link TableReduce}
@@ -59,6 +63,7 @@ public class TableMapReduceUtil {
     job.setMapperClass(mapper);
     FileInputFormat.addInputPaths(job, table);
     job.set(TableInputFormat.COLUMN_LIST, columns);
+    addDependencyJars(job);
   }
 
   /**
@@ -105,6 +110,7 @@ public class TableMapReduceUtil {
     } else if (partitioner != null) {
       job.setPartitionerClass(partitioner);
     }
+    addDependencyJars(job);
   }
 
   /**
@@ -181,4 +187,22 @@ public class TableMapReduceUtil {
   public static void setScannerCaching(JobConf job, int batchSize) {
     job.setInt("hbase.client.scanner.caching", batchSize);
   }
+
+  /**
+   * @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(Job)
+   */
+  public static void addDependencyJars(JobConf job) throws IOException {
+    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(
+      job,
+      org.apache.zookeeper.ZooKeeper.class,
+      com.google.common.base.Function.class,
+      job.getMapOutputKeyClass(),
+      job.getMapOutputValueClass(),
+      job.getOutputKeyClass(),
+      job.getOutputValueClass(),
+      job.getPartitionerClass(),
+      job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class),
+      job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class),
+      job.getCombinerClass());
+  }
 }
\ No newline at end of file
diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
index ff0c542..22533e1 100644
--- src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
@@ -47,9 +47,6 @@ import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.zookeeper.ZooKeeper;
-
-import com.google.common.base.Function;
 
 /**
  * Utility for {@link TableMapper} and {@link TableReducer}
@@ -81,6 +78,7 @@ public class TableMapReduceUtil {
     job.getConfiguration().set(TableInputFormat.INPUT_TABLE, table);
     job.getConfiguration().set(TableInputFormat.SCAN,
       convertScanToString(scan));
+    addDependencyJars(job);
   }
 
   /**
@@ -192,6 +190,7 @@ public class TableMapReduceUtil {
     } else if (partitioner != null) {
       job.setPartitionerClass(partitioner);
     }
+    addDependencyJars(job);
   }
 
   /**
@@ -246,10 +245,11 @@ public class TableMapReduceUtil {
   public static void addDependencyJars(Job job) throws IOException {
     try {
       addDependencyJars(job.getConfiguration(),
-          ZooKeeper.class,
-          Function.class, // Guava collections
+          org.apache.zookeeper.ZooKeeper.class,
+          com.google.common.base.Function.class,
           job.getMapOutputKeyClass(),
           job.getMapOutputValueClass(),
+          job.getInputFormatClass(),
           job.getOutputKeyClass(),
           job.getOutputValueClass(),
           job.getOutputFormatClass(),
@@ -267,38 +267,38 @@ public class TableMapReduceUtil {
    */
   public static void addDependencyJars(Configuration conf,
       Class... classes) throws IOException {
-
+
     FileSystem localFs = FileSystem.getLocal(conf);
 
     Set<String> jars = new HashSet<String>();
+
+    // Add jars that are already in the tmpjars variable
+    jars.addAll( conf.getStringCollection("tmpjars") );
+
+    // Add jars containing the specified classes
     for (Class clazz : classes) {
       if (clazz == null) continue;
-
+
       String pathStr = findContainingJar(clazz);
       if (pathStr == null) {
         LOG.warn("Could not find jar for class " + clazz +
-            " in order to ship it to the cluster.");
+                 " in order to ship it to the cluster.");
         continue;
       }
       Path path = new Path(pathStr);
       if (!localFs.exists(path)) {
         LOG.warn("Could not validate jar file " + path + " for class "
-            + clazz);
+                 + clazz);
         continue;
       }
       jars.add(path.makeQualified(localFs).toString());
     }
     if (jars.isEmpty()) return;
-
-    String tmpJars = conf.get("tmpjars");
-    if (tmpJars == null) {
-      tmpJars = StringUtils.arrayToString(jars.toArray(new String[0]));
-    } else {
-      tmpJars += "," + StringUtils.arrayToString(jars.toArray(new String[0]));
-    }
-    conf.set("tmpjars", tmpJars);
+
+    conf.set("tmpjars",
+             StringUtils.arrayToString(jars.toArray(new String[0])));
   }
-
+
   /**
    * Find a jar that contains a class of the same name, if any.
    * It will return a jar file, even if that is not the first thing
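
For context, a minimal driver sketch showing the effect of this change. It is not part of the patch: the RowCountDriver/RowCountMapper class names, the table name "mytable" and the output-path argument are made up for illustration. After this commit, initTableMapperJob() populates "tmpjars" itself (via addDependencyJars), so the HBase, ZooKeeper and Guava jars are shipped to the cluster without the job author doing anything extra.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RowCountDriver {

  /** Map-only job: emits (row key, 1) for every row scanned from the table. */
  static class RowCountMapper extends TableMapper<Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(ImmutableBytesWritable row, Result value, Context context)
        throws IOException, InterruptedException {
      context.write(new Text(Bytes.toStringBinary(row.get())), ONE);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "rowcount");
    job.setJarByClass(RowCountDriver.class);

    Scan scan = new Scan();
    // With this patch, initTableMapperJob() also calls addDependencyJars(job),
    // so the hbase, zookeeper and guava jars land in the job's "tmpjars".
    TableMapReduceUtil.initTableMapperJob("mytable", scan,
        RowCountMapper.class, Text.class, IntWritable.class, job);

    job.setNumReduceTasks(0);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileOutputFormat.setOutputPath(job, new Path(args[0]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Before this change, a driver like the one above would typically need an explicit TableMapReduceUtil.addDependencyJars(job) call (or a fat jar / classpath tweak) to avoid ClassNotFoundException on the task nodes.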