diff --git pom.xml pom.xml
index 53bbd64..860f17e 100644
--- pom.xml
+++ pom.xml
@@ -402,6 +402,7 @@
1.1.1
6.1.14
4.8.1
+    <mockito-all.version>1.8.4</mockito-all.version>
1.2.15
3.3.1
@@ -498,6 +499,12 @@
      <version>${junit.version}</version>
      <scope>test</scope>
    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>${mockito-all.version}</version>
+      <scope>test</scope>
+    </dependency>
@@ -661,6 +668,10 @@
      <artifactId>junit</artifactId>
    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-math</artifactId>
      <version>${commons-math.version}</version>
diff --git src/main/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java src/main/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java
index 0a9ec4b..613aa21 100644
--- src/main/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java
+++ src/main/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java
@@ -23,6 +23,7 @@ package org.apache.hadoop.hbase.io;
import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;
+import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.io.BytesWritable;
@@ -253,4 +254,11 @@ implements WritableComparable<ImmutableBytesWritable> {
}
return results;
}
+
+  /**
+   * Returns a copy of the bytes referred to by this writable.
+   */
+  public byte[] copyBytes() {
+    return Arrays.copyOfRange(bytes, offset, offset + length);
+  }
}
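
As a quick, hypothetical illustration of the new accessor (the literal bytes and offsets below are made up): copyBytes() materializes only the offset/length window of the backing array, so the copy stays valid even if that buffer is later reused.

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

    public class CopyBytesExample {
      public static void main(String[] args) {
        byte[] backing = "prefix-rowkey-suffix".getBytes();
        // A writable that refers only to the 6 bytes of "rowkey" inside a larger buffer.
        ImmutableBytesWritable ibw = new ImmutableBytesWritable(backing, 7, 6);
        // copyBytes() returns a fresh array covering just that window.
        byte[] copy = ibw.copyBytes();
        System.out.println(new String(copy));  // rowkey
      }
    }
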
diff --git src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
index 4cbe52a..91e44dd 100644
--- src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
+++ src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
@@ -258,5 +258,11 @@ public interface HRegionInterface extends HBaseRPCProtocolVersion {
* @throws IOException e
*/
public MultiPutResponse multiPut(MultiPut puts) throws IOException;
-
+
+
+  /**
+   * Bulk load an HFile into an open region.
+   *
+   * @param hfilePath path to the HFile on the cluster filesystem
+   * @param regionName name of the region to load into
+   * @param familyName column family the HFile belongs to
+   * @throws IOException e
+   */
+ public void bulkLoadHFile(String hfilePath,
+ byte[] regionName, byte[] familyName) throws IOException;
}
diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
index 3d40695..c6ec4ae 100644
--- src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
@@ -36,6 +36,7 @@ public class Driver {
"Count rows in HBase table");
pgd.addClass(Export.NAME, Export.class, "Write table data to HDFS.");
pgd.addClass(Import.NAME, Import.class, "Import data written by Export.");
+ pgd.addClass(ImportTsv.NAME, ImportTsv.class, "Import data in TSV format.");
pgd.addClass(CopyTable.NAME, CopyTable.class,
"Export a table from local cluster to peer cluster");
pgd.driver(args);
diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
index 9c8e53e..028156b 100644
--- src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
@@ -20,24 +20,40 @@
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner;
+import org.apache.hadoop.hbase.mapreduce.hadoopbackport.InputSampler.Sampler;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.mortbay.log.Log;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* Writes HFiles. Passed KeyValues must arrive in order.
@@ -48,6 +64,8 @@ import org.mortbay.log.Log;
* @see KeyValueSortReducer
*/
public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
+ static Log LOG = LogFactory.getLog(HFileOutputFormat.class);
+
  public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(TaskAttemptContext context)
throws IOException, InterruptedException {
// Get the path of the temporary output file
@@ -86,7 +104,7 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue>
+  /**
+   * Configure the given job so its output is ready for an incremental load
+   * into the given table: one reduce partition per current region, with the
+   * partition boundaries written out for the TotalOrderPartitioner.
+   */
+  public static void configureIncrementalLoad(Job job, HTable table)
+  throws IOException {
+    Configuration conf = job.getConfiguration();
+    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
+ LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
+ "to match current region count");
+ job.setNumReduceTasks(startKeys.size());
+
+ Path partitionsPath = new Path(job.getWorkingDirectory(),
+ "partitions_" + System.currentTimeMillis());
+ LOG.info("Writing partition information to " + partitionsPath);
+
+ FileSystem fs = partitionsPath.getFileSystem(conf);
+ writePartitions(conf, partitionsPath, startKeys);
+    partitionsPath = partitionsPath.makeQualified(fs);
+ URI cacheUri;
+ try {
+ cacheUri = new URI(partitionsPath.toString() + "#" +
+ TotalOrderPartitioner.DEFAULT_PATH);
+ } catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
+ DistributedCache.addCacheFile(cacheUri, conf);
+ DistributedCache.createSymlink(conf);
+
+ LOG.info("Incremental table output configured.");
+ }
+
}
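
For orientation, a hedged sketch of wiring a job against this output format. The table name, paths, the column family "f", and the LineToPutMapper stand-in are all illustrative; the pieces taken from the patch itself are the (ImmutableBytesWritable, Put) map output, PutSortReducer, and the configureIncrementalLoad(job, table) call before submission.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
    import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class BulkPrepareJob {

      // Minimal stand-in mapper: uses the whole input line as the row key and
      // stores it again as the value of an assumed column "f:line".
      static class LineToPutMapper
          extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        @Override
        protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
          byte[] row = Bytes.toBytes(line.toString());
          Put put = new Put(row);
          put.add(Bytes.toBytes("f"), Bytes.toBytes("line"), row);
          context.write(new ImmutableBytesWritable(row), put);
        }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "mytable");            // placeholder table
        Job job = new Job(conf, "bulk-prepare");
        job.setJarByClass(BulkPrepareJob.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(LineToPutMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        job.setReducerClass(PutSortReducer.class);
        FileInputFormat.setInputPaths(job, new Path("/input"));        // placeholder paths
        FileOutputFormat.setOutputPath(job, new Path("/bulk-output"));
        // Matches reduce partitions to the table's current regions and ships the
        // partitions file through the DistributedCache for TotalOrderPartitioner.
        HFileOutputFormat.configureIncrementalLoad(job, table);
        TableMapReduceUtil.addDependencyJars(job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }
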
diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
new file mode 100644
index 0000000..aad6070
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
@@ -0,0 +1,192 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.LinkedList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+public class ImportTsv {
+ final static String NAME = "importtsv";
+
+  /**
+   * Parses a line of TSV text into a row key and a Put covering the
+   * configured columns.
+   */
+  static class TsvImporter
+  extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
+  {
+ private byte[][] families;
+ private byte[][] qualifiers;
+ private long ts;
+
+ @Override
+ protected void setup(Context context) {
+ String columnStrings[] = context.getConfiguration().getStrings("columns");
+ families = new byte[columnStrings.length][];
+ qualifiers = new byte[columnStrings.length][];
+
+ for (int i = 0; i < columnStrings.length; i++) {
+ String str = columnStrings[i];
+ String[] parts = str.split(":", 2);
+ if (parts.length == 1) {
+ families[i] = columnStrings[i].getBytes();
+ qualifiers[i] = HConstants.EMPTY_BYTE_ARRAY;
+ } else {
+ families[i] = parts[0].getBytes();
+ qualifiers[i] = parts[1].getBytes();
+ }
+ }
+
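+      // One timestamp per map task, taken at setup: every KeyValue this mapper emits shares it.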
+ ts = System.currentTimeMillis();
+ }
+ /**
+ * @param row The current table row key.
+ * @param value The columns.
+ * @param context The current context.
+ * @throws IOException When something is broken with the data.
+ * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
+ * org.apache.hadoop.mapreduce.Mapper.Context)
+ */
+ @Override
+ public void map(LongWritable offset, Text value,
+ Context context)
+ throws IOException {
+ try {
+ byte[] lineBytes = value.getBytes();
+        LinkedList<Integer> tabOffsets = new LinkedList<Integer>();
+ for (int i = 0; i < value.getLength(); i++) {
+ if (lineBytes[i] == '\t') {
+ tabOffsets.add(i);
+ }
+ }
+        tabOffsets.add(value.getLength());
+
+ int rowkeySep = tabOffsets.remove();
+ ImmutableBytesWritable rowKey =
+ new ImmutableBytesWritable(lineBytes, 0, rowkeySep);
+
+ int lastOff = rowkeySep;
+ int colIdx = 0;
+ Put put = new Put(rowKey.copyBytes());
+ for (int tabOff : tabOffsets) {
+ KeyValue kv = new KeyValue(
+ lineBytes, 0, rowkeySep, // row key
+ families[colIdx], 0, families[colIdx].length,
+ qualifiers[colIdx], 0, qualifiers[colIdx].length,
+ ts,
+ KeyValue.Type.Put,
+ lineBytes, lastOff + 1, tabOff - lastOff - 1);
+ put.add(kv);
+ lastOff = tabOff;
+ colIdx++;
+ }
+ context.write(rowKey, put);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ /**
+ * Sets up the actual job.
+ *
+ * @param conf The current configuration.
+ * @param args The command line parameters.
+ * @return The newly created job.
+ * @throws IOException When setting up the job fails.
+ */
+ public static Job createSubmittableJob(Configuration conf, String[] args)
+ throws IOException {
+ String tableName = args[0];
+ Path inputDir = new Path(args[1]);
+ Job job = new Job(conf, NAME + "_" + tableName);
+ job.setJarByClass(TsvImporter.class);
+ FileInputFormat.setInputPaths(job, inputDir);
+ job.setInputFormatClass(TextInputFormat.class);
+ job.setMapperClass(TsvImporter.class);
+
+ if (conf.getBoolean("use.hfile", false)) {
+ HTable table = new HTable(conf, tableName);
+ job.setReducerClass(PutSortReducer.class);
+ Path outputDir = new Path("hfof." + tableName);
+ FileOutputFormat.setOutputPath(job, outputDir);
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ job.setMapOutputValueClass(Put.class);
+ HFileOutputFormat.configureIncrementalLoad(job, table);
+ } else {
+ // No reducers. Just write straight to table. Call initTableReducerJob
+ // because it sets up the TableOutputFormat.
+ TableMapReduceUtil.initTableReducerJob(tableName, null, job);
+ job.setNumReduceTasks(0);
+ }
+
+ TableMapReduceUtil.addDependencyJars(job);
+ return job;
+ }
+
+ /*
+ * @param errorMsg Error message. Can be null.
+ */
+ private static void usage(final String errorMsg) {
+ if (errorMsg != null && errorMsg.length() > 0) {
+ System.err.println("ERROR: " + errorMsg);
+ }
+    System.err.println(
+      "Usage: ImportTsv -Dcolumns=a,b,c <tablename> <inputdir>\n" +
+      "Pass -Duse.hfile=true to write HFiles (under hfof.<tablename>) for a " +
+      "later bulk load instead of writing directly to the table.");
+ }
+
+ /**
+ * Main entry point.
+ *
+ * @param args The command line parameters.
+ * @throws Exception When running the job fails.
+ */
+ public static void main(String[] args) throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+ if (otherArgs.length < 2) {
+ usage("Wrong number of arguments: " + otherArgs.length);
+ System.exit(-1);
+ }
+ String columns[] = conf.getStrings("columns");
+    if (columns == null) {
+      usage("No columns specified. Please specify with -Dcolumns=...");
+      System.exit(-1);
+    }
+ Job job = createSubmittableJob(conf, otherArgs);
+ System.exit(job.waitForCompletion(true) ? 0 : 1);
+ }
+
+}
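
A worked example of the mapping TsvImporter performs, with illustrative values: under -Dcolumns=f:count,f:name, the input line "row1<TAB>42<TAB>bob" yields a Put for row key row1 carrying f:count=42 and f:name=bob, all sharing the mapper's setup-time timestamp. A hand-built equivalent:

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class TsvLineEquivalent {
      public static void main(String[] args) {
        long ts = System.currentTimeMillis();   // stands in for the timestamp taken in setup()
        byte[] row = Bytes.toBytes("row1");     // first tab-separated field is the row key
        Put put = new Put(row);
        put.add(Bytes.toBytes("f"), Bytes.toBytes("count"), ts, Bytes.toBytes("42"));
        put.add(Bytes.toBytes("f"), Bytes.toBytes("name"), ts, Bytes.toBytes("bob"));
        System.out.println(put);
      }
    }
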
diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
new file mode 100644
index 0000000..68aaf71
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -0,0 +1,148 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.ServerCallable;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class LoadIncrementalHFiles extends Configured implements Tool {
+
+ static Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
+
+ private void usage() {
+ System.err.println("usage: LoadIncrementalFiles " +
+ "/path/to/hfileoutputformat-output " +
+ "tablename");
+ }
+
+
+ private int doBulkLoad(Path hfofDir, HTable table)
+ throws TableNotFoundException, IOException
+ {
+ FileSystem fs = hfofDir.getFileSystem(getConf());
+ HConnection conn = table.getConnection();
+
+ if (!conn.isTableAvailable(table.getTableName())) {
+ throw new TableNotFoundException("Table " +
+ Bytes.toStringBinary(table.getTableName()) +
+ "is not currently available.");
+ }
+ if (!fs.exists(hfofDir)) {
+ throw new FileNotFoundException("HFileOutputFormat dir " +
+ hfofDir + " not found");
+ }
+
+ Path[] familyDirs = FileUtil.stat2Paths(fs.listStatus(hfofDir));
+ if (familyDirs == null) {
+ throw new FileNotFoundException("No families found in " + hfofDir);
+ }
+
+ // TODO maybe we want to do multifamily load atomically
+ for (Path familyDir : familyDirs) {
+ // Skip _logs, etc
+ if (familyDir.getName().startsWith("_")) continue;
+ doBulkLoadFamily(familyDir, conn, table.getTableName());
+ }
+
+ return 0;
+ }
+
+ private void doBulkLoadFamily(Path familyDir, HConnection conn, byte[] table)
+ throws IOException {
+ FileSystem fs = familyDir.getFileSystem(getConf());
+ Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));
+    if (hfiles == null) {
+      LOG.info("No hfiles in " + familyDir);
+      return;
+    }
+ byte[] family = familyDir.getName().getBytes();
+ for (Path hfile : hfiles) {
+ doLoadHFile(hfile, conn, table, family);
+ }
+ }
+
+
+ private void doLoadHFile(
+ final Path hfilePath, HConnection conn, byte[] table, final byte[] family)
+ throws IOException {
+ FileSystem fs = hfilePath.getFileSystem(getConf());
+ HFile.Reader hfr = new HFile.Reader(fs, hfilePath, null, false);
+ hfr.loadFileInfo();
+ final byte[] first = hfr.getFirstKey();
+ final byte[] last = hfr.getLastKey();
+
+ LOG.info("hfile=" + hfilePath +
+ " first=" + Bytes.toStringBinary(first) +
+ " last=" + Bytes.toStringBinary(last));
+ if (first == null || last == null) {
+ assert first == null && last == null;
+ LOG.info("hfile " + hfilePath + " has no entries, skipping");
+ return;
+ }
+
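+    // Route the call to the server hosting the region that contains the HFile's
+    // first key; getRegionServerWithRetries re-locates the region if it moves.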
+ conn.getRegionServerWithRetries(
+      new ServerCallable<Void>(conn, table, first) {
+
+ @Override
+ public Void call() throws Exception {
+ LOG.info("Going to connect to server " + server +
+ "for row " + row);
+ // TODO check range against region!
+ byte[] regionName = location.getRegionInfo().getRegionName();
+ server.bulkLoadHFile(hfilePath.toString(), regionName, family);
+ return null;
+ }
+ });
+ }
+
+
+ @Override
+ public int run(String[] args) throws Exception {
+ if (args.length != 2) {
+ usage();
+ return -1;
+ }
+
+ Path hfofDir = new Path(args[0]);
+ HTable table = new HTable(args[1]);
+
+ return doBulkLoad(hfofDir, table);
+
+ }
+
+ public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner.run(new LoadIncrementalHFiles(), args));
+ }
+
+}
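
A hedged sketch of driving the tool programmatically once the HFileOutputFormat job has finished; the output directory and table name are placeholders matching the usage string above.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
    import org.apache.hadoop.util.ToolRunner;

    public class CompleteBulkLoad {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // args: output dir of the HFileOutputFormat job, then the target table
        int exitCode = ToolRunner.run(conf, new LoadIncrementalHFiles(),
            new String[] { "/bulk-output", "mytable" });
        System.exit(exitCode);
      }
    }
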
diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
new file mode 100644
index 0000000..e32dc82
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
@@ -0,0 +1,58 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.util.List;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+
+public class PutSortReducer extends
+    Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue> {
+
+ @Override
+ protected void reduce(
+ ImmutableBytesWritable row,
+      java.lang.Iterable<Put> puts,
+      Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue>.Context context)
+ throws java.io.IOException, InterruptedException
+ {
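+    // Collect every KeyValue from the row's Puts into KeyValue.COMPARATOR order,
+    // since HFileOutputFormat requires its input KeyValues to arrive sorted.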
+    TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
+
+ for (Put p : puts) {
+      for (List<KeyValue> kvs : p.getFamilyMap().values()) {
+ for (KeyValue kv : kvs) {
+ map.add(kv.clone());
+ }
+ }
+ }
+ context.setStatus("Read " + map.getClass());
+ int index = 0;
+ for (KeyValue kv : map) {
+      context.write(row, kv);
+      index++;
+      if (index % 100 == 0)
+        context.setStatus("Wrote " + index);
+ }
+ }
+}
diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
index b332280..07d7911 100644
--- src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
@@ -24,7 +24,18 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HTable;
@@ -34,14 +45,17 @@ import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.zookeeper.ZooKeeper;
/**
* Utility for {@link TableMapper} and {@link TableReducer}
*/
@SuppressWarnings("unchecked")
public class TableMapReduceUtil {
-
+ static Log LOG = LogFactory.getLog(TableMapReduceUtil.class);
+
/**
* Use this before submitting a TableMap job. It will appropriately set up
* the job.
@@ -222,4 +236,105 @@ public class TableMapReduceUtil {
job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize);
}
+ /**
+ * Add the HBase dependency jars as well as jars for any of the configured
+ * job classes to the job configuration, so that JobClient will ship them
+ * to the cluster and add them to the DistributedCache.
+ */
+ public static void addDependencyJars(Job job) throws IOException {
+ try {
+ addDependencyJars(job.getConfiguration(),
+ ZooKeeper.class,
+ job.getMapOutputKeyClass(),
+ job.getMapOutputValueClass(),
+ job.getOutputKeyClass(),
+ job.getOutputValueClass(),
+ job.getOutputFormatClass(),
+ job.getPartitionerClass(),
+ job.getCombinerClass());
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Add the jars containing the given classes to the job's configuration
+ * such that JobClient will ship them to the cluster and add them to
+ * the DistributedCache.
+ */
+  public static void addDependencyJars(Configuration conf,
+      Class<?>... classes) throws IOException {
+
+ FileSystem localFs = FileSystem.getLocal(conf);
+
+    Set<String> jars = new HashSet<String>();
+    for (Class<?> clazz : classes) {
+ if (clazz == null) continue;
+
+ String pathStr = findContainingJar(clazz);
+ if (pathStr == null) {
+ LOG.warn("Could not find jar for class " + clazz +
+ " in order to ship it to the cluster.");
+ continue;
+ }
+ Path path = new Path(pathStr);
+ if (!localFs.exists(path)) {
+ LOG.warn("Could not validate jar file " + path + " for class "
+ + clazz);
+ continue;
+ }
+ jars.add(path.makeQualified(localFs).toString());
+ }
+ if (jars.isEmpty()) return;
+
+ String tmpJars = conf.get("tmpjars");
+ if (tmpJars == null) {
+ tmpJars = StringUtils.arrayToString(jars.toArray(new String[0]));
+ } else {
+ tmpJars += "," + StringUtils.arrayToString(jars.toArray(new String[0]));
+ }
+ conf.set("tmpjars", tmpJars);
+ }
+
+ /**
+ * Find a jar that contains a class of the same name, if any.
+ * It will return a jar file, even if that is not the first thing
+ * on the class path that has a class with the same name.
+ *
+ * This is shamelessly copied from JobConf
+ *
+ * @param my_class the class to find.
+ * @return a jar file that contains the class, or null.
+ * @throws IOException
+ */
+ private static String findContainingJar(Class my_class) {
+ ClassLoader loader = my_class.getClassLoader();
+ String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
+ try {
+ for(Enumeration itr = loader.getResources(class_file);
+ itr.hasMoreElements();) {
+ URL url = (URL) itr.nextElement();
+ if ("jar".equals(url.getProtocol())) {
+ String toReturn = url.getPath();
+ if (toReturn.startsWith("file:")) {
+ toReturn = toReturn.substring("file:".length());
+ }
+ // URLDecoder is a misnamed class, since it actually decodes
+ // x-www-form-urlencoded MIME type rather than actual
+ // URL encoding (which the file path has). Therefore it would
+ // decode +s to ' 's which is incorrect (spaces are actually
+ // either unencoded or encoded as "%20"). Replace +s first, so
+ // that they are kept sacred during the decoding process.
+ toReturn = toReturn.replaceAll("\\+", "%2B");
+ toReturn = URLDecoder.decode(toReturn, "UTF-8");
+ return toReturn.replaceAll("!.*$", "");
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ }
+
+
}
\ No newline at end of file
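
A short, hedged sketch of the two overloads in use; ExtraTaskHelper is a made-up placeholder for any application class the tasks would otherwise fail to find on their classpath.

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.mapreduce.Job;

    public class ShipExtraJars {

      // Made-up placeholder for an application-specific class used by the tasks.
      static class ExtraTaskHelper { }

      public static void main(String[] args) throws Exception {
        Job job = new Job(HBaseConfiguration.create(), "example");
        // Adds the zookeeper jar plus the jars of the job's configured key/value,
        // output format, partitioner and combiner classes to "tmpjars".
        TableMapReduceUtil.addDependencyJars(job);
        // Anything loaded reflectively by the tasks must be named explicitly:
        TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
            ExtraTaskHelper.class);
      }
    }
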
diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java
new file mode 100644
index 0000000..13be81e
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java
@@ -0,0 +1,413 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce.hadoopbackport;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Utility for collecting samples and writing a partition file for
+ * {@link TotalOrderPartitioner}.
+ *
+ * This is an identical copy of o.a.h.mapreduce.lib.partition.InputSampler
+ * from Hadoop trunk at r910774, with the exception of replacing
+ * TaskAttemptContextImpl with TaskAttemptContext.
+ */
+public class InputSampler<K,V> extends Configured implements Tool {
+
+ private static final Log LOG = LogFactory.getLog(InputSampler.class);
+
+ static int printUsage() {
+ System.out.println("sampler -r \n" +
+ " [-inFormat ]\n" +
+ " [-keyClass