diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
index c1064b4..0ad65c3 100644
--- hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
+++ hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
@@ -275,7 +275,7 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
     FileOutputFormat.setOutputPath(job, p);
 
     // Configure the partitioner and other things needed for HFileOutputFormat.
-    HFileOutputFormat.configureIncrementalLoad(job, table);
+    HFileOutputFormat2.configureIncrementalLoad(job, table, table);
 
     // Run the job making sure it works.
     assertEquals(true, job.waitForCompletion(true));
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
index c10359e..402381b 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
@@ -65,6 +65,7 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
 
+  @Override
   public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(
       final TaskAttemptContext context) throws IOException, InterruptedException {
     return HFileOutputFormat2.createRecordWriter(context);
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -86,7 +87,7 @@
  * Using this class as part of a MapReduce job is best done
- * using {@link #configureIncrementalLoad(Job, HTable)}.
+ * using {@link #configureIncrementalLoad(Job, Table, RegionLocator, Class)}.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
@@ -107,6 +108,7 @@ public class HFileOutputFormat2
   public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
       "hbase.mapreduce.hfileoutputformat.datablock.encoding";
 
+  @Override
   public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
       final TaskAttemptContext context) throws IOException, InterruptedException {
     return createRecordWriter(context);
@@ -114,7 +116,7 @@ public class HFileOutputFormat2
 
   static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter(
       final TaskAttemptContext context)
-      throws IOException, InterruptedException {
+      throws IOException {
 
     // Get the path of the temporary output file
     final Path outputPath = FileOutputFormat.getOutputPath(context);
@@ -155,6 +157,7 @@ public class HFileOutputFormat2
       private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
       private boolean rollRequested = false;
 
+      @Override
       public void write(ImmutableBytesWritable row, V cell)
           throws IOException {
         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
@@ -267,6 +270,7 @@ public class HFileOutputFormat2
         }
       }
 
+      @Override
       public void close(TaskAttemptContext c)
           throws IOException, InterruptedException {
         for (WriterLength wl: this.writers.values()) {
@@ -354,13 +358,35 @@ public class HFileOutputFormat2
    *
    * The user should be sure to set the map output value class to either KeyValue or Put before
    * running this function.
+   *
+   * @deprecated Use {@link #configureIncrementalLoad(Job, Table, RegionLocator)} instead.
    */
+  @Deprecated
   public static void configureIncrementalLoad(Job job, HTable table)
       throws IOException {
-    configureIncrementalLoad(job, table, HFileOutputFormat2.class);
+    configureIncrementalLoad(job, table, table);
   }
 
-  static void configureIncrementalLoad(Job job, HTable table,
+  /**
+   * Configure a MapReduce Job to perform an incremental load into the given
+   * table. This
+   * <ul>
+   *   <li>Inspects the table to configure a total order partitioner</li>
+   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
+   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
+   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
+   *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
+   *     PutSortReducer)</li>
+   * </ul>
+   * The user should be sure to set the map output value class to either KeyValue or Put before
+   * running this function.
+   */
+  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
+      throws IOException {
+    configureIncrementalLoad(job, table, regionLocator, HFileOutputFormat2.class);
+  }
+
+  static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator,
       Class<? extends OutputFormat<?, ?>> cls) throws IOException {
     Configuration conf = job.getConfiguration();
@@ -386,8 +412,8 @@ public class HFileOutputFormat2
         KeyValueSerialization.class.getName());
 
     // Use table's region boundaries for TOP split points.
-    LOG.info("Looking up current regions for table " + Bytes.toString(table.getTableName()));
-    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
+    LOG.info("Looking up current regions for table " + table.getName());
+    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator);
     LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
         "to match current region count");
     job.setNumReduceTasks(startKeys.size());
@@ -401,8 +427,7 @@ public class HFileOutputFormat2
 
     TableMapReduceUtil.addDependencyJars(job);
     TableMapReduceUtil.initCredentials(job);
-    LOG.info("Incremental table " + Bytes.toString(table.getTableName())
-        + " output configured.");
+    LOG.info("Incremental table " + table.getName() + " output configured.");
   }
 
   public static void configureIncrementalLoadMap(Job job, Table table) throws IOException {
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
index 1033dac..ef93af9 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
@@ -444,7 +444,7 @@ public class Import extends Configured implements Tool {
       FileOutputFormat.setOutputPath(job, outputDir);
       job.setMapOutputKeyClass(ImmutableBytesWritable.class);
       job.setMapOutputValueClass(KeyValue.class);
-      HFileOutputFormat.configureIncrementalLoad(job, table);
+      HFileOutputFormat2.configureIncrementalLoad(job, table, table);
       TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
           com.google.common.base.Preconditions.class);
     } else {
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
index f586523..ed3a34d 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
@@ -457,7 +457,7 @@ public class ImportTsv extends Configured implements Tool {
           job.setMapOutputValueClass(Put.class);
           job.setCombinerClass(PutCombiner.class);
         }
-        HFileOutputFormat.configureIncrementalLoad(job, table);
+        HFileOutputFormat2.configureIncrementalLoad(job, table, table);
       } else {
         if (!admin.tableExists(tableName)) {
           String errorMsg = format("Table '%s' does not exist.", tableName);
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index cf9dc56..8bec79b 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -253,7 +253,7 @@ public class WALPlayer extends Configured implements Tool {
       Path outputDir = new Path(hfileOutPath);
       FileOutputFormat.setOutputPath(job, outputDir);
       job.setMapOutputValueClass(KeyValue.class);
-      HFileOutputFormat.configureIncrementalLoad(job, table);
+      HFileOutputFormat2.configureIncrementalLoad(job, table, table);
       TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
           com.google.common.base.Preconditions.class);
     } else {
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index 6ab675f..b9ecb7c 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -337,7 +337,7 @@ public class TestHFileOutputFormat2 {
     job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
     HTable table = Mockito.mock(HTable.class);
     setupMockStartKeys(table);
-    HFileOutputFormat2.configureIncrementalLoad(job, table);
+    HFileOutputFormat2.configureIncrementalLoad(job, table, table);
     assertEquals(job.getNumReduceTasks(), 4);
   }
 
@@ -467,7 +467,7 @@ public class TestHFileOutputFormat2 {
         MutationSerialization.class.getName(), ResultSerialization.class.getName(),
         KeyValueSerialization.class.getName());
     setupRandomGeneratorMapper(job);
-    HFileOutputFormat2.configureIncrementalLoad(job, table);
+    HFileOutputFormat2.configureIncrementalLoad(job, table, table);
     FileOutputFormat.setOutputPath(job, outDir);
 
     assertFalse(util.getTestFileSystem().exists(outDir)) ;
@@ -810,7 +810,7 @@ public class TestHFileOutputFormat2 {
       Job job = new Job(conf, "testLocalMRIncrementalLoad");
      job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
       setupRandomGeneratorMapper(job);
-      HFileOutputFormat2.configureIncrementalLoad(job, table);
+      HFileOutputFormat2.configureIncrementalLoad(job, table, table);
       FileOutputFormat.setOutputPath(job, dir);
       context = createTestTaskAttemptContext(job);
       HFileOutputFormat2 hof = new HFileOutputFormat2();
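
Usage sketch (not part of the patch). For callers migrating off the deprecated HTable overload, the driver below shows what the new configureIncrementalLoad(Job, Table, RegionLocator) looks like in practice. It is a minimal, hypothetical example: the BulkLoadDriver class, the "mytable" table name, and the output-path argument are illustrative only, and it assumes the HBase 1.0-style Connection/ConnectionFactory API. The in-tree callers touched by this patch simply pass the same HTable for both parameters, since HTable implements both Table and RegionLocator.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BulkLoadDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "prepare-hfiles");
    job.setJarByClass(BulkLoadDriver.class);
    // The mapper (not shown) must emit ImmutableBytesWritable row keys and
    // KeyValue (or Put) values; per the javadoc, set the map output value
    // class before calling configureIncrementalLoad.
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);
    FileOutputFormat.setOutputPath(job, new Path(args[0]));

    TableName name = TableName.valueOf("mytable"); // hypothetical table
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Table table = conn.getTable(name);
         RegionLocator locator = conn.getRegionLocator(name)) {
      // Table supplies column family details for the HFile writers;
      // RegionLocator supplies the region start keys used as the
      // TotalOrderPartitioner's split points.
      HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
    }
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

After the job completes, the HFiles under the output path are handed to LoadIncrementalHFiles (the completebulkload tool) exactly as before; this patch does not change that step.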