From 81b82f3d7e41aed633cd3d5311ac4bf34a41dd61 Mon Sep 17 00:00:00 2001
From: stack <stack@apache.org>
Date: Tue, 25 Nov 2014 12:04:42 -0800
Subject: [PATCH] HBASE-12514 Cleanup HFileOutputFormat legacy code (Solomon
 Duskis)

---
 .../hbase/mapreduce/IntegrationTestBulkLoad.java   |  2 +-
 .../hadoop/hbase/mapreduce/HFileOutputFormat.java  |  3 +-
 .../hadoop/hbase/mapreduce/HFileOutputFormat2.java | 43 +++++++++++++++++-----
 .../org/apache/hadoop/hbase/mapreduce/Import.java  |  2 +-
 .../apache/hadoop/hbase/mapreduce/ImportTsv.java   | 18 ++++-----
 .../apache/hadoop/hbase/mapreduce/WALPlayer.java   |  2 +-
 .../hbase/mapreduce/TestHFileOutputFormat2.java    |  6 +--
 7 files changed, 51 insertions(+), 25 deletions(-)

diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
index c1064b4..0ad65c3 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
@@ -275,7 +275,7 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
     FileOutputFormat.setOutputPath(job, p);
 
     // Configure the partitioner and other things needed for HFileOutputFormat.
-    HFileOutputFormat.configureIncrementalLoad(job, table);
+    HFileOutputFormat2.configureIncrementalLoad(job, table, table);
 
     // Run the job making sure it works.
     assertEquals(true, job.waitForCompletion(true));
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
index c10359e..402381b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
@@ -65,6 +65,7 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
+  @Override
   public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(
       final TaskAttemptContext context) throws IOException, InterruptedException {
     return HFileOutputFormat2.createRecordWriter(context);
@@ -86,7 +87,7 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
    */
   public static void configureIncrementalLoad(Job job, HTable table)
       throws IOException {
-    HFileOutputFormat2.configureIncrementalLoad(job, table, HFileOutputFormat.class);
+    HFileOutputFormat2.configureIncrementalLoad(job, table, table, HFileOutputFormat.class);
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
  * <p>
  * Using this class as part of a MapReduce job is best done
- * using {@link #configureIncrementalLoad(Job, HTable)}.
+ * using {@link #configureIncrementalLoad(Job, Table, RegionLocator, Class)}.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
@@ -107,6 +108,7 @@ public class HFileOutputFormat2
   public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
       "hbase.mapreduce.hfileoutputformat.datablock.encoding";
 
+  @Override
   public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
       final TaskAttemptContext context) throws IOException, InterruptedException {
     return createRecordWriter(context);
@@ -114,7 +116,7 @@ public class HFileOutputFormat2
   static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
       createRecordWriter(final TaskAttemptContext context)
-          throws IOException, InterruptedException {
+          throws IOException {
     // Get the path of the temporary output file
     final Path outputPath = FileOutputFormat.getOutputPath(context);
@@ -155,6 +157,7 @@ public class HFileOutputFormat2
       private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
       private boolean rollRequested = false;
 
+      @Override
       public void write(ImmutableBytesWritable row, V cell)
           throws IOException {
         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
@@ -267,6 +270,7 @@ public class HFileOutputFormat2
         }
       }
 
+      @Override
       public void close(TaskAttemptContext c)
           throws IOException, InterruptedException {
         for (WriterLength wl: this.writers.values()) {
@@ -354,13 +358,35 @@ public class HFileOutputFormat2
    *
    * The user should be sure to set the map output value class to either KeyValue or Put before
    * running this function.
+   *
+   * @deprecated Use {@link #configureIncrementalLoad(Job, Table, RegionLocator)} instead.
    */
+  @Deprecated
   public static void configureIncrementalLoad(Job job, HTable table)
       throws IOException {
-    configureIncrementalLoad(job, table, HFileOutputFormat2.class);
+    configureIncrementalLoad(job, table, table);
   }
 
-  static void configureIncrementalLoad(Job job, HTable table,
+  /**
+   * Configure a MapReduce Job to perform an incremental load into the given
+   * table. This
+   * <ul>
+   *   <li>Inspects the table to configure a total order partitioner</li>
+   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
+   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
+   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
+   *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
+   *     PutSortReducer)</li>
+   * </ul>
+   * The user should be sure to set the map output value class to either KeyValue or Put before
+   * running this function.
+   */
+  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
+      throws IOException {
+    configureIncrementalLoad(job, table, regionLocator, HFileOutputFormat2.class);
+  }
+
+  static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator,
       Class<? extends OutputFormat<?, ?>> cls) throws IOException {
     Configuration conf = job.getConfiguration();
 
@@ -386,8 +412,8 @@ public class HFileOutputFormat2
         KeyValueSerialization.class.getName());
 
     // Use table's region boundaries for TOP split points.
-    LOG.info("Looking up current regions for table " + Bytes.toString(table.getTableName()));
-    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
+    LOG.info("Looking up current regions for table " + table.getName());
+    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator);
     LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
         "to match current region count");
     job.setNumReduceTasks(startKeys.size());
@@ -401,8 +427,7 @@ public class HFileOutputFormat2
 
     TableMapReduceUtil.addDependencyJars(job);
     TableMapReduceUtil.initCredentials(job);
-    LOG.info("Incremental table " + Bytes.toString(table.getTableName())
-        + " output configured.");
+    LOG.info("Incremental table " + table.getName() + " output configured.");
   }
 
   public static void configureIncrementalLoadMap(Job job, Table table) throws IOException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
index 15540c6..853ead3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
@@ -452,7 +452,7 @@ public class Import extends Configured implements Tool {
       FileOutputFormat.setOutputPath(job, outputDir);
       job.setMapOutputKeyClass(ImmutableBytesWritable.class);
       job.setMapOutputValueClass(KeyValue.class);
-      HFileOutputFormat.configureIncrementalLoad(job, table);
+      HFileOutputFormat2.configureIncrementalLoad(job, table, table);
       TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
           com.google.common.base.Preconditions.class);
     } else {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
index b54e3ea..c181fdc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
@@ -136,7 +136,7 @@ public class ImportTsv extends Configured implements Tool {
     /**
      * @param columnsSpecification the list of columns to parser out, comma separated.
      * The row key should be the special token TsvParser.ROWKEY_COLUMN_SPEC
-     * @param separatorStr 
+     * @param separatorStr
      */
     public TsvParser(String columnsSpecification, String separatorStr) {
       // Configure separator
@@ -260,7 +260,7 @@ public class ImportTsv extends Configured implements Tool {
     public int getRowKeyLength() {
       return getColumnLength(rowKeyColumnIndex);
     }
-    
+
     public long getTimestamp(long ts) throws BadTsvLineException {
       // Return ts if HBASE_TS_KEY is not configured in column spec
       if (!hasTimestamp()) {
@@ -286,7 +286,7 @@ public class ImportTsv extends Configured implements Tool {
             getColumnLength(attrKeyColumnIndex));
       }
     }
-    
+
     public String[] getIndividualAttributes() {
       String attributes = getAttributes();
       if (attributes != null) {
@@ -295,7 +295,7 @@ public class ImportTsv extends Configured implements Tool {
         return null;
       }
     }
-    
+
     public int getAttributeKeyOffset() {
       if (hasAttributes()) {
         return getColumnOffset(attrKeyColumnIndex);
@@ -461,7 +461,7 @@ public class ImportTsv extends Configured implements Tool {
           job.setMapOutputValueClass(Put.class);
           job.setCombinerClass(PutCombiner.class);
         }
-        HFileOutputFormat.configureIncrementalLoad(job, table);
+        HFileOutputFormat2.configureIncrementalLoad(job, table, table);
       }
     } else {
       if (!admin.tableExists(tableName)) {
@@ -520,7 +520,7 @@ public class ImportTsv extends Configured implements Tool {
     if (errorMsg != null && errorMsg.length() > 0) {
       System.err.println("ERROR: " + errorMsg);
     }
-    String usage = 
+    String usage =
       "Usage: " + NAME + " -D"+ COLUMNS_CONF_KEY + "=a,b,c <tablename> <inputdir>\n" +
       "\n" +
       "Imports the given input directory of TSV data into the specified table.\n" +
@@ -607,7 +607,7 @@ public class ImportTsv extends Configured implements Tool {
           + TsvParser.TIMESTAMPKEY_COLUMN_SPEC);
       return -1;
     }
-    
+
     int attrKeysFound = 0;
     for (String col : columns) {
       if (col.equals(TsvParser.ATTRIBUTES_COLUMN_SPEC))
@@ -618,7 +618,7 @@ public class ImportTsv extends Configured implements Tool {
           + TsvParser.ATTRIBUTES_COLUMN_SPEC);
       return -1;
     }
-    
+
     // Make sure one or more columns are specified excluding rowkey and
     // timestamp key
     if (columns.length - (rowkeysFound + tskeysFound + attrKeysFound) < 1) {
@@ -633,7 +633,7 @@ public class ImportTsv extends Configured implements Tool {
     // Set it back to replace invalid timestamp (non-numeric) with current
     // system time
     getConf().setLong(TIMESTAMP_CONF_KEY, timstamp);
-    
+
     Job job = createSubmittableJob(getConf(), otherArgs);
     return job.waitForCompletion(true) ? 0 : 1;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index 26fab5a..255b6d7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -265,7 +265,7 @@ public class WALPlayer extends Configured implements Tool {
       Path outputDir = new Path(hfileOutPath);
       FileOutputFormat.setOutputPath(job, outputDir);
       job.setMapOutputValueClass(KeyValue.class);
-      HFileOutputFormat.configureIncrementalLoad(job, table);
+      HFileOutputFormat2.configureIncrementalLoad(job, table, table);
       TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
           com.google.common.base.Preconditions.class);
     } else {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index 6ab675f..b9ecb7c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -337,7 +337,7 @@ public class TestHFileOutputFormat2 {
     job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
     HTable table = Mockito.mock(HTable.class);
     setupMockStartKeys(table);
-    HFileOutputFormat2.configureIncrementalLoad(job, table);
+    HFileOutputFormat2.configureIncrementalLoad(job, table, table);
     assertEquals(job.getNumReduceTasks(), 4);
   }
 
@@ -467,7 +467,7 @@ public class TestHFileOutputFormat2 {
         MutationSerialization.class.getName(), ResultSerialization.class.getName(),
         KeyValueSerialization.class.getName());
     setupRandomGeneratorMapper(job);
-    HFileOutputFormat2.configureIncrementalLoad(job, table);
+    HFileOutputFormat2.configureIncrementalLoad(job, table, table);
     FileOutputFormat.setOutputPath(job, outDir);
 
     assertFalse(util.getTestFileSystem().exists(outDir)) ;
@@ -810,7 +810,7 @@ public class TestHFileOutputFormat2 {
     Job job = new Job(conf, "testLocalMRIncrementalLoad");
     job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
     setupRandomGeneratorMapper(job);
-    HFileOutputFormat2.configureIncrementalLoad(job, table);
+    HFileOutputFormat2.configureIncrementalLoad(job, table, table);
     FileOutputFormat.setOutputPath(job, dir);
     context = createTestTaskAttemptContext(job);
     HFileOutputFormat2 hof = new HFileOutputFormat2();
-- 
1.8.5.2 (Apple Git-48)
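
A minimal usage sketch of the new entry point, for reviewers (illustrative only, not part of the diff). It assumes the 0.99-era client API, in which HTable implements both Table and RegionLocator, which is why the converted callers above can pass the same instance twice; the class name, table name, and output path below are made up for the example.

  import java.io.IOException;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.client.HTable;
  import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

  public class BulkLoadSetup {
    public static Job createJob(Configuration conf) throws IOException {
      Job job = new Job(conf, "bulkload-example");

      // HTable still implements both Table and RegionLocator at this point,
      // so a single instance satisfies both parameters of the new overload.
      HTable table = new HTable(conf, "example_table");

      // Deprecated by this patch:
      //   HFileOutputFormat.configureIncrementalLoad(job, table);
      // Replacement: pass the Table and a RegionLocator explicitly.
      HFileOutputFormat2.configureIncrementalLoad(job, table, table);

      FileOutputFormat.setOutputPath(job, new Path("/tmp/bulkload-output"));
      return job;
    }
  }

Once Table and RegionLocator are fully split in the client API, callers would obtain the RegionLocator from the connection instead of reusing the HTable instance.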