diff --git pom.xml pom.xml
index 0a009cf..dfb6940 100644
--- pom.xml
+++ pom.xml
@@ -432,6 +432,7 @@
1.1.16.1.144.8.1
+    <mockito-all.version>1.8.4</mockito-all.version>
@@ -528,6 +529,12 @@
       <version>${junit.version}</version>
       <scope>test</scope>
+      <dependency>
+        <groupId>org.mockito</groupId>
+        <artifactId>mockito-all</artifactId>
+        <version>${mockito-all.version}</version>
+        <scope>test</scope>
+      </dependency>
@@ -592,6 +599,12 @@
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>r03</version>
+    </dependency>
+
     <groupId>org.apache.hadoop</groupId>
     <artifactId>hadoop-core</artifactId>
@@ -691,6 +704,10 @@
       <artifactId>junit</artifactId>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-math</artifactId>
       <version>${commons-math.version}</version>
diff --git src/main/java/org/apache/hadoop/hbase/HRegionInfo.java src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
index 29b0cd6..7a9af9b 100644
--- src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
+++ src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
@@ -259,11 +259,6 @@ public class HRegionInfo extends VersionedWritable implements WritableComparable
return elements;
}
- /** @return the endKey */
- public byte [] getEndKey(){
- return endKey;
- }
-
/** @return the regionId */
public long getRegionId(){
return regionId;
@@ -296,6 +291,32 @@ public class HRegionInfo extends VersionedWritable implements WritableComparable
public byte [] getStartKey(){
return startKey;
}
+
+ /** @return the endKey */
+ public byte [] getEndKey(){
+ return endKey;
+ }
+
+ /**
+ * Returns true if the given inclusive range of rows is fully contained
+ * by this region. For example, if the region is foo,a,g and this is
+ * passed ["b","c"] or ["a","c"] it will return true, but if this is passed
+ * ["b","z"] it will return false.
+ * @throws IllegalArgumentException if the range passed is invalid (ie end < start)
+ */
+ public boolean containsRange(byte[] rangeStartKey, byte[] rangeEndKey) {
+ if (Bytes.compareTo(rangeStartKey, rangeEndKey) > 0) {
+ throw new IllegalArgumentException(
+ "Invalid range: " + Bytes.toStringBinary(rangeStartKey) +
+ " > " + Bytes.toStringBinary(rangeEndKey));
+ }
+
+ boolean firstKeyInRange = Bytes.compareTo(rangeStartKey, startKey) >= 0;
+ boolean lastKeyInRange =
+ Bytes.compareTo(rangeEndKey, endKey) < 0 ||
+ Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY);
+ return firstKeyInRange && lastKeyInRange;
+ }
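+
+  // Illustrative examples only (assumes a region whose startKey is "a" and endKey is "g"):
+  //   containsRange(Bytes.toBytes("b"), Bytes.toBytes("c"))  -> true
+  //   containsRange(Bytes.toBytes("b"), Bytes.toBytes("z"))  -> false
+  //   containsRange(Bytes.toBytes("z"), Bytes.toBytes("b"))  -> throws IllegalArgumentException
+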
/** @return the tableDesc */
public HTableDescriptor getTableDesc(){
diff --git src/main/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java src/main/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java
index 0a9ec4b..10d4607 100644
--- src/main/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java
+++ src/main/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java
@@ -23,6 +23,7 @@ package org.apache.hadoop.hbase.io;
import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;
+import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.io.BytesWritable;
@@ -253,4 +254,11 @@ implements WritableComparable {
}
return results;
}
+
+ /**
+ * Returns a copy of the bytes referred to by this writable
+ */
+ public byte[] copyBytes() {
+    return Arrays.copyOfRange(bytes, offset, offset + length);
+ }
}
diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index b912a85..870cf98 100644
--- src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -462,7 +462,7 @@ public class HFile {
throw new NullPointerException("Key nor value may be null");
}
if (checkPrefix &&
- Bytes.toString(k).toLowerCase().startsWith(FileInfo.RESERVED_PREFIX)) {
+ Bytes.startsWith(k, FileInfo.RESERVED_PREFIX_BYTES)) {
throw new IOException("Keys with a " + FileInfo.RESERVED_PREFIX +
" are reserved");
}
@@ -1069,6 +1069,8 @@ public class HFile {
/**
* @return First key in the file. May be null if file has no entries.
+ * Note that this is not the first rowkey, but rather the byte form of
+ * the first KeyValue.
*/
public byte [] getFirstKey() {
if (blockIndex == null) {
@@ -1076,6 +1078,17 @@ public class HFile {
}
return this.blockIndex.isEmpty()? null: this.blockIndex.blockKeys[0];
}
+
+ /**
+ * @return the first row key, or null if the file is empty.
+ * TODO move this to StoreFile after Ryan's patch goes in
+ * to eliminate KeyValue here
+ */
+ public byte[] getFirstRowKey() {
+ byte[] firstKey = getFirstKey();
+ if (firstKey == null) return null;
+ return KeyValue.createKeyValueFromKey(firstKey).getRow();
+ }
/**
* @return number of KV entries in this HFile
@@ -1089,6 +1102,8 @@ public class HFile {
/**
* @return Last key in the file. May be null if file has no entries.
+ * Note that this is not the last rowkey, but rather the byte form of
+ * the last KeyValue.
*/
public byte [] getLastKey() {
if (!isFileInfoLoaded()) {
@@ -1098,6 +1113,17 @@ public class HFile {
}
/**
+ * @return the last row key, or null if the file is empty.
+ * TODO move this to StoreFile after Ryan's patch goes in
+ * to eliminate KeyValue here
+ */
+ public byte[] getLastRowKey() {
+ byte[] lastKey = getLastKey();
+ if (lastKey == null) return null;
+ return KeyValue.createKeyValueFromKey(lastKey).getRow();
+ }
+
+ /**
* @return number of K entries in this HFile's filter. Returns KV count if no filter.
*/
public int getFilterEntries() {
@@ -1664,6 +1690,7 @@ public class HFile {
*/
   static class FileInfo extends HbaseMapWritable<byte [], byte []> {
static final String RESERVED_PREFIX = "hfile.";
+ static final byte[] RESERVED_PREFIX_BYTES = Bytes.toBytes(RESERVED_PREFIX);
static final byte [] LASTKEY = Bytes.toBytes(RESERVED_PREFIX + "LASTKEY");
static final byte [] AVG_KEY_LEN =
Bytes.toBytes(RESERVED_PREFIX + "AVG_KEY_LEN");
@@ -1679,6 +1706,15 @@ public class HFile {
super();
}
}
+
+ /**
+ * Return true if the given file info key is reserved for internal
+ * use by HFile.
+ */
+ public static boolean isReservedFileInfoKey(byte[] key) {
+ return Bytes.startsWith(key, FileInfo.RESERVED_PREFIX_BYTES);
+ }
+
/**
* Get names of supported compression algorithms. The names are acceptable by
@@ -1861,5 +1897,4 @@ public class HFile {
e.printStackTrace();
}
}
-
}
diff --git src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
index 4cbe52a..be2e5f8 100644
--- src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
+++ src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
@@ -258,5 +258,10 @@ public interface HRegionInterface extends HBaseRPCProtocolVersion {
* @throws IOException e
*/
public MultiPutResponse multiPut(MultiPut puts) throws IOException;
-
+
+ /**
+ * Bulk load an HFile into an open region
+ */
+ public void bulkLoadHFile(String hfilePath,
+ byte[] regionName, byte[] familyName) throws IOException;
}
diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
index 3d40695..c6ec4ae 100644
--- src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
@@ -36,6 +36,7 @@ public class Driver {
"Count rows in HBase table");
pgd.addClass(Export.NAME, Export.class, "Write table data to HDFS.");
pgd.addClass(Import.NAME, Import.class, "Import data written by Export.");
+ pgd.addClass(ImportTsv.NAME, ImportTsv.class, "Import data in TSV format.");
pgd.addClass(CopyTable.NAME, CopyTable.class,
"Export a table from local cluster to peer cluster");
pgd.driver(args);
diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
index 9c8e53e..2a272af 100644
--- src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
@@ -20,24 +20,40 @@
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.mortbay.log.Log;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.google.common.base.Preconditions;
/**
* Writes HFiles. Passed KeyValues must arrive in order.
@@ -48,7 +64,9 @@ import org.mortbay.log.Log;
* @see KeyValueSortReducer
*/
 public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
-  public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(TaskAttemptContext context)
+  static Log LOG = LogFactory.getLog(HFileOutputFormat.class);
+
+  public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(final TaskAttemptContext context)
throws IOException, InterruptedException {
// Get the path of the temporary output file
final Path outputPath = FileOutputFormat.getOutputPath(context);
@@ -86,7 +104,7 @@ public class HFileOutputFormat extends FileOutputFormat
+    TreeSet<ImmutableBytesWritable> sorted =
+      new TreeSet<ImmutableBytesWritable>(startKeys);
+
+ ImmutableBytesWritable first = sorted.first();
+ Preconditions.checkArgument(
+ first.equals(HConstants.EMPTY_BYTE_ARRAY),
+ "First region of table should have empty start key. Instead has: %s",
+ Bytes.toStringBinary(first.get()));
+ sorted.remove(first);
+
+ // Write the actual file
+ FileSystem fs = partitionsPath.getFileSystem(conf);
+ SequenceFile.Writer writer = SequenceFile.createWriter(fs,
+ conf, partitionsPath, ImmutableBytesWritable.class, NullWritable.class);
+
+ try {
+ for (ImmutableBytesWritable startKey : sorted) {
+ writer.append(startKey, NullWritable.get());
+ }
+ } finally {
+ writer.close();
+ }
+ }
+
+ /**
+ * Configure a MapReduce Job to perform an incremental load into the given
+   * table. This
+   * <ul>
+   *   <li>Inspects the table to configure a total order partitioner</li>
+   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
+   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
+   *   <li>Sets the output key/value class to match HFileOutputFormat's requirements</li>
+   *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
+   *     PutSortReducer)</li>
+   * </ul>
+   * The user should be sure to set the map output value class to either KeyValue or Put before
+   * running this function.
+   */
+ public static void configureIncrementalLoad(Job job, HTable table) throws IOException {
+ Configuration conf = job.getConfiguration();
+ job.setPartitionerClass(TotalOrderPartitioner.class);
+ job.setOutputKeyClass(ImmutableBytesWritable.class);
+ job.setOutputValueClass(KeyValue.class);
+ job.setOutputFormatClass(HFileOutputFormat.class);
+
+ // Based on the configured map output class, set the correct reducer to properly
+ // sort the incoming values.
+ // TODO it would be nice to pick one or the other of these formats.
+ if (KeyValue.class.equals(job.getMapOutputValueClass())) {
+ job.setReducerClass(KeyValueSortReducer.class);
+ } else if (Put.class.equals(job.getMapOutputValueClass())) {
+ job.setReducerClass(PutSortReducer.class);
+ } else {
+ LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
+ }
+
+ LOG.info("Looking up current regions for table " + table);
+    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
+ LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
+ "to match current region count");
+ job.setNumReduceTasks(startKeys.size());
+
+ Path partitionsPath = new Path(job.getWorkingDirectory(),
+ "partitions_" + System.currentTimeMillis());
+ LOG.info("Writing partition information to " + partitionsPath);
+
+ FileSystem fs = partitionsPath.getFileSystem(conf);
+ writePartitions(conf, partitionsPath, startKeys);
+ partitionsPath.makeQualified(fs);
+ URI cacheUri;
+ try {
+ cacheUri = new URI(partitionsPath.toString() + "#" +
+ TotalOrderPartitioner.DEFAULT_PATH);
+ } catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
+ DistributedCache.addCacheFile(cacheUri, conf);
+ DistributedCache.createSymlink(conf);
+
+ LOG.info("Incremental table output configured.");
+ }
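+
+  // Minimal usage sketch; the mapper class, table name, and paths below are
+  // illustrative assumptions:
+  //
+  //   Job job = new Job(conf, "prepare-bulk-load");
+  //   job.setMapperClass(MyMapper.class);   // emits ImmutableBytesWritable keys and Put (or KeyValue) values
+  //   job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+  //   job.setMapOutputValueClass(Put.class);
+  //   FileOutputFormat.setOutputPath(job, new Path("/tmp/bulk-out"));
+  //   HFileOutputFormat.configureIncrementalLoad(job, new HTable(conf, "mytable"));
+  //   job.waitForCompletion(true);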
+
}
diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
new file mode 100644
index 0000000..03026f0
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
@@ -0,0 +1,200 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.LinkedList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+/**
+ * Tool to import data from a TSV file.
+ * @see ImportTsv#usage(String)
+ */
+public class ImportTsv {
+ final static String NAME = "importtsv";
+
+ /**
+ * Write table content out to files in hdfs.
+ */
+ static class TsvImporter
+    extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
+ {
+ /**
+ * Column families and qualifiers mapped to the TSV columns
+ */
+ private byte[][] families;
+ private byte[][] qualifiers;
+
+ /** Timestamp for all inserted rows */
+ private long ts;
+
+ @Override
+ protected void setup(Context context) {
+ String columnStrings[] = context.getConfiguration().getStrings("columns");
+ families = new byte[columnStrings.length][];
+ qualifiers = new byte[columnStrings.length][];
+
+ for (int i = 0; i < columnStrings.length; i++) {
+ String str = columnStrings[i];
+ String[] parts = str.split(":", 2);
+ if (parts.length == 1) {
+ families[i] = columnStrings[i].getBytes();
+ qualifiers[i] = HConstants.EMPTY_BYTE_ARRAY;
+ } else {
+ families[i] = parts[0].getBytes();
+ qualifiers[i] = parts[1].getBytes();
+ }
+ }
+
+ ts = System.currentTimeMillis();
+ }
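+
+    // Example (illustrative column specs): -Dcolumns=info:name,info:age yields
+    // families {info, info} and qualifiers {name, age}; a spec with no ':' such
+    // as "info" uses family "info" with an empty qualifier.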
+
+ /**
+ * Convert a line of TSV text into an HBase table row.
+ */
+ @Override
+ public void map(LongWritable offset, Text value,
+ Context context)
+ throws IOException {
+ try {
+ byte[] lineBytes = value.getBytes();
+        LinkedList<Integer> tabOffsets = new LinkedList<Integer>();
+ for (int i = 0; i < value.getLength(); i++) {
+ if (lineBytes[i] == '\t') {
+ tabOffsets.add(i);
+ }
+ }
+        tabOffsets.add(value.getLength());
+
+ int rowkeySep = tabOffsets.remove();
+ ImmutableBytesWritable rowKey =
+ new ImmutableBytesWritable(lineBytes, 0, rowkeySep);
+
+ int lastOff = rowkeySep;
+ int colIdx = 0;
+ Put put = new Put(rowKey.copyBytes());
+ for (int tabOff : tabOffsets) {
+ KeyValue kv = new KeyValue(
+ lineBytes, 0, rowkeySep, // row key
+ families[colIdx], 0, families[colIdx].length,
+ qualifiers[colIdx], 0, qualifiers[colIdx].length,
+ ts,
+ KeyValue.Type.Put,
+ lineBytes, lastOff + 1, tabOff - lastOff - 1);
+ put.add(kv);
+ lastOff = tabOff;
+ colIdx++;
+ }
+ context.write(rowKey, put);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ /**
+ * Sets up the actual job.
+ *
+ * @param conf The current configuration.
+ * @param args The command line parameters.
+ * @return The newly created job.
+ * @throws IOException When setting up the job fails.
+ */
+ public static Job createSubmittableJob(Configuration conf, String[] args)
+ throws IOException {
+ String tableName = args[0];
+ Path inputDir = new Path(args[1]);
+ Job job = new Job(conf, NAME + "_" + tableName);
+ job.setJarByClass(TsvImporter.class);
+ FileInputFormat.setInputPaths(job, inputDir);
+ job.setInputFormatClass(TextInputFormat.class);
+ job.setMapperClass(TsvImporter.class);
+
+ String hfileOutPath = conf.get("bulk.output");
+ if (hfileOutPath != null) {
+ HTable table = new HTable(conf, tableName);
+ job.setReducerClass(PutSortReducer.class);
+ Path outputDir = new Path(hfileOutPath);
+ FileOutputFormat.setOutputPath(job, outputDir);
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ job.setMapOutputValueClass(Put.class);
+ HFileOutputFormat.configureIncrementalLoad(job, table);
+ } else {
+ // No reducers. Just write straight to table. Call initTableReducerJob
+ // to set up the TableOutputFormat.
+ TableMapReduceUtil.initTableReducerJob(tableName, null, job);
+ job.setNumReduceTasks(0);
+ }
+
+ TableMapReduceUtil.addDependencyJars(job);
+ return job;
+ }
+
+ /*
+ * @param errorMsg Error message. Can be null.
+ */
+ private static void usage(final String errorMsg) {
+ if (errorMsg != null && errorMsg.length() > 0) {
+ System.err.println("ERROR: " + errorMsg);
+ }
+ System.err.println(
+ "Usage: ImportTsv -Dcolumns=a,b,c ");
+ System.err.println("");
+ System.err.println("For bulk load output, pass -Dbulk.output=/path/to/output");
+ }
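+
+  // Illustrative invocations (the jar name, table name, and paths are assumptions):
+  //
+  //   hadoop jar hbase-<version>.jar importtsv -Dcolumns=f1:c1,f1:c2 mytable /user/me/input
+  //
+  // or, to write HFiles for a later bulk load instead of Puts to the live table:
+  //
+  //   hadoop jar hbase-<version>.jar importtsv -Dcolumns=f1:c1,f1:c2 \
+  //       -Dbulk.output=/user/me/hfile-output mytable /user/me/input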
+
+ /**
+ * Main entry point.
+ *
+ * @param args The command line parameters.
+ * @throws Exception When running the job fails.
+ */
+ public static void main(String[] args) throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+ if (otherArgs.length < 2) {
+ usage("Wrong number of arguments: " + otherArgs.length);
+ System.exit(-1);
+ }
+ String columns[] = conf.getStrings("columns");
+ if (columns == null) {
+ usage("No columns specified. Please specify with -Dcolumns=...");
+ }
+ Job job = createSubmittableJob(conf, otherArgs);
+ System.exit(job.waitForCompletion(true) ? 0 : 1);
+ }
+
+}
diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
new file mode 100644
index 0000000..491377f
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -0,0 +1,319 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.ServerCallable;
+import org.apache.hadoop.hbase.io.HalfStoreFileReader;
+import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.io.Reference.Range;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Tool to load the output of HFileOutputFormat into an existing table.
+ * @see usage()
+ */
+public class LoadIncrementalHFiles extends Configured implements Tool {
+
+ static Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
+
+ public LoadIncrementalHFiles(Configuration conf) {
+ super(conf);
+ }
+
+ public LoadIncrementalHFiles() {
+ super();
+ }
+
+
+ private void usage() {
+ System.err.println("usage: LoadIncrementalFiles " +
+ "/path/to/hfileoutputformat-output " +
+ "tablename");
+ }
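+
+  // Programmatic usage sketch (the path and table name are illustrative):
+  //
+  //   LoadIncrementalHFiles loader = new LoadIncrementalHFiles(HBaseConfiguration.create());
+  //   loader.doBulkLoad(new Path("/user/me/hfile-output"), new HTable("mytable"));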
+
+ /**
+   * Represents an HFile waiting to be loaded. A queue is used
+ * in this class in order to support the case where a region has
+ * split during the process of the load. When this happens,
+ * the HFile is split into two physical parts across the new
+ * region boundary, and each part is added back into the queue.
+ * The import process finishes when the queue is empty.
+ */
+ private static class LoadQueueItem {
+ final byte[] family;
+ final Path hfilePath;
+
+ public LoadQueueItem(byte[] family, Path hfilePath) {
+ this.family = family;
+ this.hfilePath = hfilePath;
+ }
+ }
+
+ /**
+ * Walk the given directory for all HFiles, and return a Queue
+ * containing all such files.
+ */
+  private Deque<LoadQueueItem> discoverLoadQueue(Path hfofDir)
+ throws IOException {
+ FileSystem fs = hfofDir.getFileSystem(getConf());
+
+ if (!fs.exists(hfofDir)) {
+ throw new FileNotFoundException("HFileOutputFormat dir " +
+ hfofDir + " not found");
+ }
+
+ FileStatus[] familyDirStatuses = fs.listStatus(hfofDir);
+ if (familyDirStatuses == null) {
+ throw new FileNotFoundException("No families found in " + hfofDir);
+ }
+
+    Deque<LoadQueueItem> ret = new LinkedList<LoadQueueItem>();
+ for (FileStatus stat : familyDirStatuses) {
+ if (!stat.isDir()) {
+ LOG.warn("Skipping non-directory " + stat.getPath());
+ continue;
+ }
+ Path familyDir = stat.getPath();
+ // Skip _logs, etc
+ if (familyDir.getName().startsWith("_")) continue;
+ byte[] family = familyDir.getName().getBytes();
+ Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));
+ for (Path hfile : hfiles) {
+ if (hfile.getName().startsWith("_")) continue;
+ ret.add(new LoadQueueItem(family, hfile));
+ }
+ }
+ return ret;
+ }
+
+ /**
+ * Perform a bulk load of the given directory into the given
+ * pre-existing table.
+ * @param hfofDir the directory that was provided as the output path
+ * of a job using HFileOutputFormat
+ * @param table the table to load into
+ * @throws TableNotFoundException if table does not yet exist
+ */
+ public void doBulkLoad(Path hfofDir, HTable table)
+ throws TableNotFoundException, IOException
+ {
+ HConnection conn = table.getConnection();
+
+ if (!conn.isTableAvailable(table.getTableName())) {
+ throw new TableNotFoundException("Table " +
+ Bytes.toStringBinary(table.getTableName()) +
+ "is not currently available.");
+ }
+
+    Deque<LoadQueueItem> queue = null;
+ try {
+ queue = discoverLoadQueue(hfofDir);
+ while (!queue.isEmpty()) {
+ LoadQueueItem item = queue.remove();
+ tryLoad(item, conn, table.getTableName(), queue);
+ }
+ } finally {
+ if (queue != null && !queue.isEmpty()) {
+ StringBuilder err = new StringBuilder();
+ err.append("-------------------------------------------------\n");
+ err.append("Bulk load aborted with some files not yet loaded:\n");
+ err.append("-------------------------------------------------\n");
+ for (LoadQueueItem q : queue) {
+ err.append(" ").append(q.hfilePath).append('\n');
+ }
+ LOG.error(err);
+ }
+ }
+ }
+
+ /**
+ * Attempt to load the given load queue item into its target region server.
+ * If the hfile boundary no longer fits into a region, physically splits
+ * the hfile such that the new bottom half will fit, and adds the two
+ * resultant hfiles back into the load queue.
+ */
+ private void tryLoad(final LoadQueueItem item,
+ HConnection conn, final byte[] table,
+      final Deque<LoadQueueItem> queue)
+ throws IOException {
+ final Path hfilePath = item.hfilePath;
+ final FileSystem fs = hfilePath.getFileSystem(getConf());
+ HFile.Reader hfr = new HFile.Reader(fs, hfilePath, null, false);
+ final byte[] first, last;
+ try {
+ hfr.loadFileInfo();
+ first = hfr.getFirstRowKey();
+ last = hfr.getLastRowKey();
+ } finally {
+ hfr.close();
+ }
+
+ LOG.info("Trying to load hfile=" + hfilePath +
+ " first=" + Bytes.toStringBinary(first) +
+ " last=" + Bytes.toStringBinary(last));
+ if (first == null || last == null) {
+ assert first == null && last == null;
+ LOG.info("hfile " + hfilePath + " has no entries, skipping");
+ return;
+ }
+
+ // We use a '_' prefix which is ignored when walking directory trees
+ // above.
+ final Path tmpDir = new Path(item.hfilePath.getParent(), "_tmp");
+
+ conn.getRegionServerWithRetries(
+      new ServerCallable<Void>(conn, table, first) {
+ @Override
+ public Void call() throws Exception {
+ LOG.debug("Going to connect to server " + server +
+ "for row " + Bytes.toStringBinary(row));
+ HRegionInfo hri = location.getRegionInfo();
+ if (!hri.containsRange(first, last)) {
+ LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
+ "region. Splitting...");
+
+ HColumnDescriptor familyDesc = hri.getTableDesc().getFamily(item.family);
+ Path botOut = new Path(tmpDir, hri.getEncodedName() + ".bottom");
+ Path topOut = new Path(tmpDir, hri.getEncodedName() + ".top");
+ splitStoreFile(getConf(), hfilePath, familyDesc, hri.getEndKey(),
+ botOut, topOut);
+
+ // Add these back at the *front* of the queue, so there's a lower
+ // chance that the region will just split again before we get there.
+ queue.addFirst(new LoadQueueItem(item.family, botOut));
+ queue.addFirst(new LoadQueueItem(item.family, topOut));
+ LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
+ return null;
+ }
+
+ byte[] regionName = location.getRegionInfo().getRegionName();
+ server.bulkLoadHFile(hfilePath.toString(), regionName, item.family);
+ return null;
+ }
+ });
+ }
+
+ /**
+ * Split a storefile into a top and bottom half, maintaining
+ * the metadata, recreating bloom filters, etc.
+ */
+ static void splitStoreFile(
+ Configuration conf, Path inFile,
+ HColumnDescriptor familyDesc, byte[] splitKey,
+ Path bottomOut, Path topOut) throws IOException
+ {
+ // Open reader with no block cache, and not in-memory
+ Reference topReference = new Reference(splitKey, Range.top);
+ Reference bottomReference = new Reference(splitKey, Range.bottom);
+
+ copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
+ copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
+ }
+
+ /**
+ * Copy half of an HFile into a new HFile.
+ */
+ private static void copyHFileHalf(
+ Configuration conf, Path inFile, Path outFile, Reference reference,
+ HColumnDescriptor familyDescriptor)
+ throws IOException {
+ FileSystem fs = inFile.getFileSystem(conf);
+ HalfStoreFileReader halfReader = null;
+ HFile.Writer halfWriter = null;
+ try {
+ halfReader = new HalfStoreFileReader(fs, inFile, null, reference);
+      Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();
+
+ int blocksize = familyDescriptor.getBlocksize();
+ Algorithm compression = familyDescriptor.getCompression();
+ BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
+
+ halfWriter = new StoreFile.Writer(
+ fs, outFile, blocksize, compression, conf, KeyValue.COMPARATOR,
+ bloomFilterType, 0);
+ HFileScanner scanner = halfReader.getScanner(false, false);
+ scanner.seekTo();
+ do {
+ KeyValue kv = scanner.getKeyValue();
+ halfWriter.append(kv);
+ } while (scanner.next());
+
+      for (Map.Entry<byte[], byte[]> entry : fileInfo.entrySet()) {
+ if (shouldCopyHFileMetaKey(entry.getKey())) {
+ halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
+ }
+ }
+ } finally {
+ if (halfWriter != null) halfWriter.close();
+ if (halfReader != null) halfReader.close();
+ }
+ }
+
+ private static boolean shouldCopyHFileMetaKey(byte[] key) {
+ return !HFile.isReservedFileInfoKey(key);
+ }
+
+
+ @Override
+ public int run(String[] args) throws Exception {
+ if (args.length != 2) {
+ usage();
+ return -1;
+ }
+
+ Path hfofDir = new Path(args[0]);
+ HTable table = new HTable(args[1]);
+
+ doBulkLoad(hfofDir, table);
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new LoadIncrementalHFiles(), args);
+ }
+
+}
diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
new file mode 100644
index 0000000..5fb3e83
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.util.List;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * Emits sorted Puts.
+ * Reads in all Puts from passed Iterator, sorts them, then emits
+ * Puts in sorted order. If lots of columns per row, it will use lots of
+ * memory sorting.
+ * @see HFileOutputFormat
+ * @see KeyValueSortReducer
+ */
+public class PutSortReducer extends
+    Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue> {
+
+ @Override
+ protected void reduce(
+ ImmutableBytesWritable row,
+      java.lang.Iterable<Put> puts,
+      Reducer<ImmutableBytesWritable, Put,
+              ImmutableBytesWritable, KeyValue>.Context context)
+ throws java.io.IOException, InterruptedException
+ {
+    TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
+
+ for (Put p : puts) {
+      for (List<KeyValue> kvs : p.getFamilyMap().values()) {
+ for (KeyValue kv : kvs) {
+ map.add(kv.clone());
+ }
+ }
+ }
+ context.setStatus("Read " + map.getClass());
+ int index = 0;
+ for (KeyValue kv : map) {
+      context.write(row, kv);
+      index++;
+ if (index > 0 && index % 100 == 0)
+ context.setStatus("Wrote " + index);
+ }
+ }
+}
diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java
index af3d588..0cda76e 100644
--- src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java
@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Partitioner;
@@ -44,15 +45,55 @@ import org.apache.hadoop.mapreduce.Partitioner;
*/
 public class SimpleTotalOrderPartitioner<VALUE> extends Partitioner<ImmutableBytesWritable, VALUE>
implements Configurable {
- private final Log LOG = LogFactory.getLog(this.getClass());
+ private final static Log LOG = LogFactory.getLog(SimpleTotalOrderPartitioner.class);
+
+ @Deprecated
public static final String START = "hbase.simpletotalorder.start";
+ @Deprecated
public static final String END = "hbase.simpletotalorder.end";
+
+ static final String START_BASE64 = "hbase.simpletotalorder.start.base64";
+ static final String END_BASE64 = "hbase.simpletotalorder.end.base64";
+
private Configuration c;
private byte [] startkey;
private byte [] endkey;
private byte [][] splits;
private int lastReduces = -1;
+ public static void setStartKey(Configuration conf, byte[] startKey) {
+ conf.set(START_BASE64, Base64.encodeBytes(startKey));
+ }
+
+ public static void setEndKey(Configuration conf, byte[] endKey) {
+ conf.set(END_BASE64, Base64.encodeBytes(endKey));
+ }
+
+ @SuppressWarnings("deprecation")
+ static byte[] getStartKey(Configuration conf) {
+ return getKeyFromConf(conf, START_BASE64, START);
+ }
+
+ @SuppressWarnings("deprecation")
+ static byte[] getEndKey(Configuration conf) {
+ return getKeyFromConf(conf, END_BASE64, END);
+ }
+
+ private static byte[] getKeyFromConf(Configuration conf,
+ String base64Key, String deprecatedKey) {
+ String encoded = conf.get(base64Key);
+ if (encoded != null) {
+ return Base64.decode(encoded);
+ }
+ String oldStyleVal = conf.get(deprecatedKey);
+ if (oldStyleVal == null) {
+ return null;
+ }
+ LOG.warn("Using deprecated configuration " + deprecatedKey +
+ " - please use static accessor methods instead.");
+ return Bytes.toBytes(oldStyleVal);
+ }
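+
+  // Usage sketch (the keys shown are illustrative): configure the key range before
+  // submitting the job, e.g.
+  //   SimpleTotalOrderPartitioner.setStartKey(conf, Bytes.toBytes("aaa"));
+  //   SimpleTotalOrderPartitioner.setEndKey(conf, Bytes.toBytes("zzz"));
+  // Base64-encoding the values lets arbitrary binary keys survive the round trip
+  // through the Configuration, which the deprecated plain-string START/END could not.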
+
@Override
public int getPartition(final ImmutableBytesWritable key, final VALUE value,
final int reduces) {
@@ -87,10 +128,12 @@ implements Configurable {
@Override
public void setConf(Configuration conf) {
this.c = conf;
- String startStr = this.c.get(START);
- String endStr = this.c.get(END);
- LOG.info("startkey=" + startStr + ", endkey=" + endStr);
- this.startkey = Bytes.toBytes(startStr);
- this.endkey = Bytes.toBytes(endStr);
+ this.startkey = getStartKey(conf);
+ this.endkey = getEndKey(conf);
+ if (startkey == null || endkey == null) {
+ throw new RuntimeException(this.getClass() + " not configured");
+ }
+ LOG.info("startkey=" + Bytes.toStringBinary(startkey) +
+ ", endkey=" + Bytes.toStringBinary(endkey));
}
}
\ No newline at end of file
diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
index 07d7911..22a8c8d 100644
--- src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
@@ -49,6 +49,8 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.zookeeper.ZooKeeper;
+import com.google.common.base.Function;
+
/**
* Utility for {@link TableMapper} and {@link TableReducer}
*/
@@ -245,6 +247,7 @@ public class TableMapReduceUtil {
try {
addDependencyJars(job.getConfiguration(),
ZooKeeper.class,
+ Function.class, // Guava collections
job.getMapOutputKeyClass(),
job.getMapOutputValueClass(),
job.getOutputKeyClass(),
diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java
new file mode 100644
index 0000000..13be81e
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java
@@ -0,0 +1,413 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce.hadoopbackport;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Utility for collecting samples and writing a partition file for
+ * {@link TotalOrderPartitioner}.
+ *
+ * This is an identical copy of o.a.h.mapreduce.lib.partition.InputSampler
+ * from Hadoop trunk at r910774, with the exception of replacing
+ * TaskAttemptContextImpl with TaskAttemptContext.
+ */
+public class InputSampler<K,V> extends Configured implements Tool {
+
+ private static final Log LOG = LogFactory.getLog(InputSampler.class);
+
+ static int printUsage() {
+ System.out.println("sampler -r \n" +
+ " [-inFormat ]\n" +
+ " [-keyClass