From 2be5ce9048c0d1d567e53da9b295bb1eaa8a7229 Mon Sep 17 00:00:00 2001 From: xiaowen147 Date: Sat, 11 Nov 2017 00:13:53 +0800 Subject: [PATCH] HBASE-19226 Limit the reduce tasks number of incremental load --- /* Review notes (placed in the free-form area after the '---' separator, outside every hunk): this patch introduces the setting hbase.mapreduce.hfileoutputformat.reduces.ratio (default 1D). writePartitions now computes bucketCount = (int) Math.min(Math.max(1, startKeysSize * ratio), startKeysSize), counts the sorted start keys into that many buckets via i % bucketCount, appends only the first key of each bucket after the first to the partitions file, and returns bucketCount. configurePartitioner returns that value, and the hunk at -630,12 passes it to job.setNumReduceTasks in place of startKeys.size(). The new test sets the ratio to 0.3 with 10 start keys and asserts that the returned count equals (int) (numStartKeys * ratio). NOTE(review): the ratio is not validated in the lines shown; if it were ever NaN the int cast would presumably give bucketCount 0 and i % bucketCount would divide by zero - confirm whether a guard is wanted. */ .../hadoop/hbase/mapreduce/HFileOutputFormat2.java | 39 ++++++++++++++++------ .../hbase/mapreduce/TestHFileOutputFormat2.java | 25 ++++++++++++++ 2 files changed, 54 insertions(+), 10 deletions(-) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java index d7606fc..059565b 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java @@ -172,6 +172,10 @@ public class HFileOutputFormat2 static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY = "hbase.mapreduce.use.multi.table.hfileoutputformat"; + static final String OUTPUT_REDUCES_RATIO_CONF_KEY = + "hbase.mapreduce.hfileoutputformat.reduces.ratio"; + static final double DEFAULT_OUTPUT_REDUCES_RATIO = 1D; + public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY; public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + "."; @@ -499,7 +503,7 @@ public class HFileOutputFormat2 * {@link TotalOrderPartitioner} that contains the split points in startKeys. */ @SuppressWarnings("deprecation") - private static void writePartitions(Configuration conf, Path partitionsPath, + static int writePartitions(Configuration conf, Path partitionsPath, List startKeys, boolean writeMultipleTables) throws IOException { LOG.info("Writing partition information to " + partitionsPath); if (startKeys.isEmpty()) { @@ -521,7 +525,14 @@ public class HFileOutputFormat2 "First region of table should have empty start key. 
Instead has: " + Bytes.toStringBinary(first.get())); } - sorted.remove(sorted.first()); + + int startKeysSize = sorted.size(); + double ratio = conf.getDouble(OUTPUT_REDUCES_RATIO_CONF_KEY, DEFAULT_OUTPUT_REDUCES_RATIO); + int bucketCount = (int) Math.min(Math.max(1, startKeysSize * ratio), startKeysSize); + int[] bucket = new int[bucketCount]; + for (int i = 0; i < startKeysSize; i++) { + bucket[i % bucketCount]++; + } // Write the actual file FileSystem fs = partitionsPath.getFileSystem(conf); @@ -530,12 +541,20 @@ public class HFileOutputFormat2 NullWritable.class); try { + int offset = 0; + int bucketIndex = 0; for (ImmutableBytesWritable startKey : sorted) { - writer.append(startKey, NullWritable.get()); + if (offset == bucket[bucketIndex]) { + writer.append(startKey, NullWritable.get()); + bucketIndex++; + offset = 0; + } + offset++; } } finally { writer.close(); } + return bucketCount; } /** @@ -630,12 +649,11 @@ public class HFileOutputFormat2 conf.set(OUTPUT_TABLE_NAME_CONF_KEY, StringUtils.join(allTableNames, Bytes .toString(tableSeparator))); List startKeys = getRegionStartKeys(regionLocators, writeMultipleTables); - // Use table's region boundaries for TOP split points. - LOG.info("Configuring " + startKeys.size() + " reduce partitions " + - "to match current region count for all tables"); - job.setNumReduceTasks(startKeys.size()); - configurePartitioner(job, startKeys, writeMultipleTables); + int numPartitions = configurePartitioner(job, startKeys, writeMultipleTables); + LOG.info("Configuring " + numPartitions + " reduce partitions"); + job.setNumReduceTasks(numPartitions); + // Set compression algorithms based on column families conf.set(COMPRESSION_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(compressionDetails, @@ -789,7 +807,7 @@ public class HFileOutputFormat2 * Configure job with a TotalOrderPartitioner, partitioning against * splitPoints. Cleans up the partitions file after job exists. 
*/ - static void configurePartitioner(Job job, List splitPoints, boolean + static int configurePartitioner(Job job, List splitPoints, boolean writeMultipleTables) throws IOException { Configuration conf = job.getConfiguration(); @@ -800,12 +818,13 @@ public class HFileOutputFormat2 HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID()); fs.makeQualified(partitionsPath); - writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables); + int numPartitions = writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables); fs.deleteOnExit(partitionsPath); // configure job to use it job.setPartitionerClass(TotalOrderPartitioner.class); TotalOrderPartitioner.setPartitionFile(conf, partitionsPath); + return numPartitions; } @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE") diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java index ffe3fe1..4605d72 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java @@ -36,6 +36,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Random; import java.util.Set; +import java.util.UUID; import java.util.concurrent.Callable; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -1490,5 +1491,29 @@ public class TestHFileOutputFormat2 { return null; } + + @Test + public void testWritePartitionsWithRatio() throws Exception { + + double ratio = 0.3d; + Configuration conf = new Configuration(this.util.getConfiguration()); + conf.setDouble(HFileOutputFormat2.OUTPUT_REDUCES_RATIO_CONF_KEY, ratio); + + Path partitionsPath = new Path(util.getDataTestDir(), "partitions_" + 
UUID.randomUUID()); + + int numStartKeys = 10; + List startKeys = new ArrayList<>(); + startKeys.add(new ImmutableBytesWritable(HConstants.EMPTY_BYTE_ARRAY)); + for (int i = 1; i < numStartKeys; i++) { + startKeys.add(new ImmutableBytesWritable(Bytes.toBytes(i))); + } + + try { + int numPartitions = HFileOutputFormat2.writePartitions(conf, partitionsPath, startKeys, false); + assertEquals((int) (numStartKeys * ratio), numPartitions); + } finally { + partitionsPath.getFileSystem(conf).delete(partitionsPath, true); + } + } } -- 2.9.3.windows.2