From 3d20f416c5d426e35d105e3441115571cc6cfcf4 Mon Sep 17 00:00:00 2001
From: xiaowen147
Date: Sat, 11 Nov 2017 17:31:09 +0800
Subject: [PATCH] HBASE-19226 Limit the reduce tasks number of incremental load

---
 .../hadoop/hbase/mapreduce/HFileOutputFormat2.java | 48 +++++++++++++++++-----
 .../hbase/mapreduce/TestHFileOutputFormat2.java    | 43 +++++++++++++++++++
 2 files changed, 81 insertions(+), 10 deletions(-)

diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index d7606fc..f0d0d94 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -172,6 +172,15 @@ public class HFileOutputFormat2
   static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
       "hbase.mapreduce.use.multi.table.hfileoutputformat";
 
+  /**
+   * This config adjusts the number of reduce tasks for the MapReduce job of an
+   * incremental load into a table. It is a ratio applied to the region count
+   * and must be between 0 and 1.
+   */
+  static final String OUTPUT_REDUCES_RATIO_CONF_KEY =
+      "hbase.mapreduce.hfileoutputformat.reduces.ratio";
+  static final float DEFAULT_OUTPUT_REDUCES_RATIO = 1.0f;
+
   public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
   public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";
 
@@ -499,7 +508,7 @@ public class HFileOutputFormat2
    * {@link TotalOrderPartitioner} that contains the split points in startKeys.
    */
   @SuppressWarnings("deprecation")
-  private static void writePartitions(Configuration conf, Path partitionsPath,
+  static int writePartitions(Configuration conf, Path partitionsPath,
       List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
     LOG.info("Writing partition information to " + partitionsPath);
     if (startKeys.isEmpty()) {
@@ -521,7 +530,18 @@ public class HFileOutputFormat2
           "First region of table should have empty start key. Instead has: "
               + Bytes.toStringBinary(first.get()));
     }
-    sorted.remove(sorted.first());
+
+    int startKeysSize = sorted.size();
+    float ratio = conf.getFloat(OUTPUT_REDUCES_RATIO_CONF_KEY, DEFAULT_OUTPUT_REDUCES_RATIO);
+    if (ratio > DEFAULT_OUTPUT_REDUCES_RATIO) {
+      LOG.warn("The value of " + OUTPUT_REDUCES_RATIO_CONF_KEY + " is larger than "
+          + DEFAULT_OUTPUT_REDUCES_RATIO + ", using the default value");
+    }
+    int bucketCount = Math.min((int) Math.max(1, startKeysSize * ratio), startKeysSize);
+    int[] bucket = new int[bucketCount];
+    for (int i = 0; i < startKeysSize; i++) {
+      bucket[i % bucketCount]++;
+    }
 
     // Write the actual file
     FileSystem fs = partitionsPath.getFileSystem(conf);
@@ -530,12 +550,20 @@ public class HFileOutputFormat2
         NullWritable.class);
 
     try {
+      int offset = 0;
+      int bucketIndex = 0;
       for (ImmutableBytesWritable startKey : sorted) {
-        writer.append(startKey, NullWritable.get());
+        if (offset == bucket[bucketIndex]) {
+          writer.append(startKey, NullWritable.get());
+          bucketIndex++;
+          offset = 0;
+        }
+        offset++;
       }
     } finally {
       writer.close();
     }
+    return bucketCount;
   }
 
   /**
@@ -630,12 +658,11 @@ public class HFileOutputFormat2
     conf.set(OUTPUT_TABLE_NAME_CONF_KEY, StringUtils.join(allTableNames, Bytes
             .toString(tableSeparator)));
     List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocators, writeMultipleTables);
-    // Use table's region boundaries for TOP split points.
- LOG.info("Configuring " + startKeys.size() + " reduce partitions " + - "to match current region count for all tables"); - job.setNumReduceTasks(startKeys.size()); - configurePartitioner(job, startKeys, writeMultipleTables); + int numPartitions = configurePartitioner(job, startKeys, writeMultipleTables); + LOG.info("Configuring " + numPartitions + " reduce partitions"); + job.setNumReduceTasks(numPartitions); + // Set compression algorithms based on column families conf.set(COMPRESSION_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(compressionDetails, @@ -789,7 +816,7 @@ public class HFileOutputFormat2 * Configure job with a TotalOrderPartitioner, partitioning against * splitPoints. Cleans up the partitions file after job exists. */ - static void configurePartitioner(Job job, List splitPoints, boolean + static int configurePartitioner(Job job, List splitPoints, boolean writeMultipleTables) throws IOException { Configuration conf = job.getConfiguration(); @@ -800,12 +827,13 @@ public class HFileOutputFormat2 HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID()); fs.makeQualified(partitionsPath); - writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables); + int numPartitions = writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables); fs.deleteOnExit(partitionsPath); // configure job to use it job.setPartitionerClass(TotalOrderPartitioner.class); TotalOrderPartitioner.setPartitionFile(conf, partitionsPath); + return numPartitions; } @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE") diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java index ffe3fe1..fffe319 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java @@ -36,6 +36,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Random; import java.util.Set; +import java.util.UUID; import java.util.concurrent.Callable; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -99,6 +100,7 @@ import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.RecordWriter; @@ -1490,5 +1492,46 @@ public class TestHFileOutputFormat2 { return null; } + + @Test + public void testWritePartitionsWithRatio() throws Exception { + + float ratio = 0.3f; + Configuration conf = new Configuration(this.util.getConfiguration()); + conf.setFloat(HFileOutputFormat2.OUTPUT_REDUCES_RATIO_CONF_KEY, ratio); + + Path partitionsPath = new Path(util.getDataTestDir(), "partitions_" + UUID.randomUUID()); + + int numStartKeys = 10; + List startKeys = new ArrayList<>(); + startKeys.add(new ImmutableBytesWritable(HConstants.EMPTY_BYTE_ARRAY)); + for (int i = 1; i < numStartKeys; i++) { + startKeys.add(new ImmutableBytesWritable(Bytes.toBytes(i))); + } + + FileSystem fs = partitionsPath.getFileSystem(conf); + try { + int numPartitions = HFileOutputFormat2.writePartitions(conf, 
+          false);
+      assertEquals((int) (numStartKeys * ratio), numPartitions);
+
+      List<ImmutableBytesWritable> partitions = new ArrayList<>();
+      @SuppressWarnings("deprecation")
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, partitionsPath, conf);
+      ImmutableBytesWritable key = new ImmutableBytesWritable();
+      while (reader.next(key)) {
+        partitions.add(new ImmutableBytesWritable(key.copyBytes()));
+      }
+      reader.close();
+
+      assertEquals((int) (numStartKeys * ratio) - 1, partitions.size());
+      assertEquals(new ImmutableBytesWritable(Bytes.toBytes(4)), partitions.get(0));
+      assertEquals(new ImmutableBytesWritable(Bytes.toBytes(7)), partitions.get(1));
+
+    } finally {
+      fs.delete(partitionsPath, true);
+      fs.close();
+    }
+  }
 }
-- 
2.9.3.windows.2
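
Notes (not part of the patch):

The split-point selection added to writePartitions can be traced with a standalone
sketch. This is illustrative code only, not HBase code; the class and method names
below are invented, and the arithmetic simply mirrors the new bucket logic. For 10
region start keys and a ratio of 0.3 it prints [4, 7], matching the assertions in
testWritePartitionsWithRatio: three reduce partitions covering 4, 3, and 3 regions.

import java.util.ArrayList;
import java.util.List;

public class ReducesRatioSketch {

  /**
   * Return the 0-based indexes (into the sorted start-key list) that
   * writePartitions would emit as TotalOrderPartitioner split points.
   */
  static List<Integer> computeSplitPointIndexes(int startKeysSize, float ratio) {
    // Same arithmetic as the patch: at least 1 bucket, at most one per region.
    int bucketCount = Math.min((int) Math.max(1, startKeysSize * ratio), startKeysSize);

    // bucket[i] = how many consecutive regions reduce partition i covers; the
    // round-robin increment spreads any remainder across the first buckets.
    int[] bucket = new int[bucketCount];
    for (int i = 0; i < startKeysSize; i++) {
      bucket[i % bucketCount]++;
    }

    // Walk the sorted start keys and record one split point per bucket
    // boundary; bucketCount buckets yield bucketCount - 1 split points.
    List<Integer> splitPointIndexes = new ArrayList<>();
    int offset = 0;
    int bucketIndex = 0;
    for (int i = 0; i < startKeysSize; i++) {
      if (offset == bucket[bucketIndex]) {
        splitPointIndexes.add(i);
        bucketIndex++;
        offset = 0;
      }
      offset++;
    }
    return splitPointIndexes;
  }

  public static void main(String[] args) {
    // Mirrors testWritePartitionsWithRatio: 10 regions at ratio 0.3 give
    // 3 reduce partitions with split points at start keys 4 and 7.
    System.out.println(computeSplitPointIndexes(10, 0.3f)); // prints [4, 7]
  }
}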
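
For completeness, a minimal sketch of how a job would opt in to the ratio, assuming
the usual incremental-load setup (creation of conf, table, and regionLocator is
elided here):

// Cap the reducers at roughly 30% of the region count. configureIncrementalLoad()
// calls configurePartitioner(), which with this patch derives the reduce task
// count from writePartitions() instead of using one reducer per region.
Job job = Job.getInstance(conf, "incremental-load");
job.getConfiguration().setFloat("hbase.mapreduce.hfileoutputformat.reduces.ratio", 0.3f);
HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);

With the default ratio of 1.0 the behavior is unchanged: one reduce task per region.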