From 7c6a70827c91f14094a4b4ad87f894c32368a544 Mon Sep 17 00:00:00 2001
From: sunyerui
Date: Tue, 16 Feb 2016 15:34:52 +0800
Subject: [PATCH] KYLIN-1323 Improve performance of converting data to hfile

---
 .../java/org/apache/kylin/common/KylinConfig.java  |  5 +++
 conf/kylin.properties                              |  3 ++
 .../apache/kylin/job/constant/BatchConstants.java  |  1 +
 .../apache/kylin/job/cube/CubingJobBuilder.java    |  3 +-
 .../apache/kylin/job/hadoop/AbstractHadoopJob.java |  2 +-
 .../apache/kylin/job/hadoop/cube/CubeHFileJob.java | 45 ++++++++++++++++++++-
 .../job/hadoop/cube/RangeKeyDistributionJob.java   |  2 +
 .../hadoop/cube/RangeKeyDistributionReducer.java   | 10 ++++-
 .../kylin/job/hadoop/hbase/CreateHTableJob.java    | 12 ++++--
 .../kylin/job/hadoop/hbase/CreateHTableTest.java   |  2 +-
 10 files changed, 76 insertions(+), 9 deletions(-)

diff --git a/common/src/main/java/org/apache/kylin/common/KylinConfig.java b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 59cb86b..b3a2855 100644
--- a/common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -131,6 +131,7 @@ public class KylinConfig {
 
     public static final String HTABLE_DEFAULT_COMPRESSION_CODEC = "kylin.hbase.default.compression.codec";
 
+    public static final String HBASE_HFILE_PER_REGION = "kylin.hbase.hfile.per.region";
     public static final String HBASE_REGION_CUT_SMALL = "kylin.hbase.region.cut.small";
     public static final String HBASE_REGION_CUT_MEDIUM = "kylin.hbase.region.cut.medium";
     public static final String HBASE_REGION_CUT_LARGE = "kylin.hbase.region.cut.large";
@@ -715,6 +716,10 @@ public class KylinConfig {
         kylinConfig.setProperty(KYLIN_JOB_REMOTE_CLI_PASSWORD, v);
     }
 
+    public int getHBaseHFilePerRegion() {
+        return Integer.parseInt(getOptional(HBASE_HFILE_PER_REGION, "1"));
+    }
+
     public int getHBaseRegionCountMin() {
         return Integer.parseInt(getOptional(HBASE_REGION_COUNT_MIN, "1"));
     }
diff --git a/conf/kylin.properties b/conf/kylin.properties
index e0727ed..b2cf10b 100644
--- a/conf/kylin.properties
+++ b/conf/kylin.properties
@@ -83,6 +83,9 @@ kylin.hbase.region.cut.large=100
 kylin.hbase.region.count.min=1
 kylin.hbase.region.count.max=500
 
+# Number of HFiles per region; a larger value makes the convert-to-HFile step more concurrent and faster
+kylin.hbase.hfile.per.region=1
+
 # Enable/disable ACL check for cube query
 kylin.query.security.enabled=true
 
diff --git a/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java b/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
index 38f4a87..9360d73 100644
--- a/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
+++ b/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
@@ -47,6 +47,7 @@ public interface BatchConstants {
     String REGION_NUMBER_MIN = "region.number.min";
     String REGION_NUMBER_MAX = "region.number.max";
     String REGION_SPLIT_SIZE = "region.split.size";
+    String HFILE_PER_REGION = "hfile.per.region";
     String CUBE_CAPACITY = "cube.capacity";
 
     String CFG_KYLIN_LOCAL_TEMP_DIR = "/tmp/kylin/";
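Review note: the new knob defaults to "1", so existing deployments keep one HFile per region unless they opt in. A minimal sketch of the read path (the KylinConfig calls are the ones added above; the surrounding main() is illustrative only):

    import org.apache.kylin.common.KylinConfig;

    public class HFilePerRegionDemo {
        public static void main(String[] args) {
            // Reads "kylin.hbase.hfile.per.region", falling back to "1" when
            // unset, i.e. the pre-patch behavior of one HFile per region.
            KylinConfig config = KylinConfig.getInstanceFromEnv();
            int hfilePerRegion = config.getHBaseHFilePerRegion();
            System.out.println("hfiles per region = " + hfilePerRegion);
        }
    }
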
diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
index 80c030f..5e0875a 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
@@ -372,7 +372,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
         StringBuilder cmd = new StringBuilder();
         appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "input", getRowkeyDistributionOutputPath(seg, jobId) + "/part-r-00000");
+        appendExecCmdParameters(cmd, "partitions", getRowkeyDistributionOutputPath(seg, jobId) + "/part-r-00000");
         appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
 
         createHtableStep.setJobParams(cmd.toString());
@@ -388,6 +388,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         appendMapReduceParameters(cmd, seg);
         appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "partitions", getRowkeyDistributionOutputPath(seg, jobId) + "/part-r-00000");
         appendExecCmdParameters(cmd, "input", inputPath);
         appendExecCmdParameters(cmd, "output", getHFilePath(seg, jobId));
         appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
 
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
index 698a978..a7d107d 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
@@ -83,7 +83,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
     protected static final Option OPTION_INPUT_DELIM = OptionBuilder.withArgName("inputdelim").hasArg().isRequired(false).withDescription("Input delimeter").create("inputdelim");
     protected static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Output path").create("output");
     protected static final Option OPTION_NCUBOID_LEVEL = OptionBuilder.withArgName("level").hasArg().isRequired(true).withDescription("N-Cuboid build level, e.g. 1, 2, 3...").create("level");
-    protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Partition file path.").create("input");
+    protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Partition file path.").create("partitions");
     protected static final Option OPTION_HTABLE_NAME = OptionBuilder.withArgName("htable name").hasArg().isRequired(true).withDescription("HTable name").create("htablename");
 
     protected String name;
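Review note: OPTION_PARTITION_FILE_PATH previously reused the option name "input", which would collide with OPTION_INPUT_PATH once CubeHFileJob needs both the cuboid input and the split-point file; renaming it to "partitions" resolves that. The net effect on the two invocations, sketched with illustrative paths (the real values come from getRowkeyDistributionOutputPath() and getHFilePath(); the demo class is not part of the patch):

    import java.util.Arrays;

    public class JobArgsDemo {
        public static void main(String[] args) {
            // CreateHTableJob: the rowkey split file now arrives as "-partitions".
            String[] createHTableArgs = { "-cubename", "my_cube",
                    "-partitions", "/kylin/rowkey_stats/part-r-00000",
                    "-htablename", "MY_CUBE_SEG1" };

            // CubeHFileJob: gains "-partitions" alongside its existing "-input".
            String[] cubeHFileArgs = { "-cubename", "my_cube",
                    "-partitions", "/kylin/rowkey_stats/part-r-00000",
                    "-input", "/kylin/cuboid_data/",
                    "-output", "/kylin/hfile/",
                    "-htablename", "MY_CUBE_SEG1" };

            System.out.println(Arrays.toString(createHTableArgs));
            System.out.println(Arrays.toString(cubeHFileArgs));
        }
    }
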
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
index 3c1e4a5..4fa2224 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
@@ -18,17 +18,24 @@
 
 package org.apache.kylin.job.hadoop.cube;
 
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
 import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
@@ -38,6 +45,8 @@ import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+
 /**
  * @author George Song (ysong1)
  */
@@ -51,11 +60,14 @@ public class CubeHFileJob extends AbstractHadoopJob {
         try {
             options.addOption(OPTION_JOB_NAME);
             options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_PARTITION_FILE_PATH);
             options.addOption(OPTION_INPUT_PATH);
             options.addOption(OPTION_OUTPUT_PATH);
             options.addOption(OPTION_HTABLE_NAME);
             parseOptions(options, args);
 
+            Path partitionFilePath = new Path(getOptionValue(OPTION_PARTITION_FILE_PATH));
+
             Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
             String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
 
@@ -82,8 +94,14 @@ public class CubeHFileJob extends AbstractHadoopJob {
             String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
             HTable htable = new HTable(conf, tableName);
 
-            //Automatic config !
+            // Automatic config!
             HFileOutputFormat.configureIncrementalLoad(job, htable);
+            // Replace the table-split partition file with HFile-level splits
+            Path hfilePartitionPath = new Path(partitionFilePath + "_hfile");
+            int partitionCount = createPartitionFiles(job.getConfiguration(), partitionFilePath, hfilePartitionPath);
+            TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), hfilePartitionPath);
+            // The number of reduce tasks is one more than the number of partition keys
+            job.setNumReduceTasks(partitionCount + 1);
 
             // set block replication to 3 for hfiles
             conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");
@@ -101,6 +119,31 @@ public class CubeHFileJob extends AbstractHadoopJob {
         }
     }
 
+    private int createPartitionFiles(Configuration conf, Path path, Path partitionPath) throws IOException {
+        int count = 0;
+        SequenceFile.Reader reader = null;
+        SequenceFile.Writer writer = null;
+        try {
+            reader = new SequenceFile.Reader(path.getFileSystem(conf), path, conf);
+            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+            writer = new SequenceFile.Writer(partitionPath.getFileSystem(conf), conf, partitionPath,
+                    ImmutableBytesWritable.class, NullWritable.class);
+            while (reader.next(key, value)) {
+                writer.append(new ImmutableBytesWritable(((Text) key).copyBytes()), NullWritable.get());
+                count++;
+            }
+        } finally {
+            if (reader != null) {
+                reader.close();
+            }
+            if (writer != null) {
+                writer.close();
+            }
+        }
+        return count;
+    }
+
     public static void main(String[] args) throws Exception {
         int exitCode = ToolRunner.run(new CubeHFileJob(), args);
         System.exit(exitCode);
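Review note: HFileOutputFormat.configureIncrementalLoad() configures one reducer per region by writing the table's region boundaries into a TotalOrderPartitioner file; the hunk above swaps in a finer-grained file so that each reducer writes one HFile. TotalOrderPartitioner routes reducer i the keys between split key i-1 and split key i, so N split keys always imply N + 1 reducers — hence setNumReduceTasks(partitionCount + 1). A standalone sketch of that invariant (plain Java, no Hadoop; SplitMathDemo and bucket() are illustrative, not patch code):

    public class SplitMathDemo {
        // Unsigned lexicographic compare, like HBase Bytes.compareTo().
        static int compare(byte[] a, byte[] b) {
            int n = Math.min(a.length, b.length);
            for (int i = 0; i < n; i++) {
                int d = (a[i] & 0xff) - (b[i] & 0xff);
                if (d != 0) {
                    return d;
                }
            }
            return a.length - b.length;
        }

        // Reducer index for a rowkey: keys below splits[0] go to reducer 0,
        // keys above the last split key go to reducer splits.length.
        static int bucket(byte[] key, byte[][] splits) {
            int i = 0;
            while (i < splits.length && compare(key, splits[i]) >= 0) {
                i++;
            }
            return i;
        }

        public static void main(String[] args) {
            byte[][] splits = { { 10 }, { 20 }, { 30 } };          // N = 3 split keys
            System.out.println(bucket(new byte[] { 5 }, splits));  // 0
            System.out.println(bucket(new byte[] { 25 }, splits)); // 2
            System.out.println(bucket(new byte[] { 99 }, splits)); // 3 -> N + 1 buckets
        }
    }
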
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java
index 9c50122..b78db6a 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java
@@ -92,10 +92,12 @@ public class RangeKeyDistributionJob extends AbstractHadoopJob {
             String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
             CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
             CubeInstance cube = cubeMgr.getCube(cubeName);
+            int hFilePerRegion = KylinConfig.getInstanceFromEnv().getHBaseHFilePerRegion();
             DataModelDesc.RealizationCapacity cubeCapacity = cube.getDescriptor().getModel().getCapacity();
             int regionSplitSize = KylinConfig.getInstanceFromEnv().getHBaseRegionCut(cubeCapacity.toString());
             int maxRegionCount = KylinConfig.getInstanceFromEnv().getHBaseRegionCountMax();
             int minRegionCount = KylinConfig.getInstanceFromEnv().getHBaseRegionCountMin();
+            job.getConfiguration().set(BatchConstants.HFILE_PER_REGION, String.valueOf(hFilePerRegion));
             job.getConfiguration().set(BatchConstants.REGION_SPLIT_SIZE, String.valueOf(regionSplitSize));
             job.getConfiguration().set(BatchConstants.REGION_NUMBER_MAX, String.valueOf(maxRegionCount));
             job.getConfiguration().set(BatchConstants.REGION_NUMBER_MIN, String.valueOf(minRegionCount));
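Review note: with kylin.hbase.hfile.per.region = N, the rowkey-distribution step now emits one split key per intended HFile, while region boundaries only need every N-th of those keys — the sampling that the getSplits() change further below performs. A standalone sketch of that idea (plain Java; RegionSplitSampleDemo and sampleRegionSplits() are illustrative, not patch code):

    import java.util.ArrayList;
    import java.util.List;

    public class RegionSplitSampleDemo {
        // Keep every hfilePerRegion-th key as a region boundary; the keys
        // in between remain HFile boundaries for the TotalOrderPartitioner.
        static List<byte[]> sampleRegionSplits(List<byte[]> hfileSplits, int hfilePerRegion) {
            List<byte[]> regionSplits = new ArrayList<byte[]>();
            int count = 0;
            for (byte[] key : hfileSplits) {
                if (++count >= hfilePerRegion) {
                    regionSplits.add(key);
                    count = 0;
                }
            }
            return regionSplits;
        }

        public static void main(String[] args) {
            List<byte[]> hfileSplits = new ArrayList<byte[]>();
            for (int i = 1; i <= 6; i++) {
                hfileSplits.add(new byte[] { (byte) i });
            }
            // With 2 HFiles per region, keys 2, 4 and 6 become region cuts.
            System.out.println(sampleRegionSplits(hfileSplits, 2).size()); // 3
        }
    }
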
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
index b3ab4db..a107890 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
@@ -46,6 +46,7 @@ public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable, Text, LongWritable> {
     private int minRegionCount = 1;
     private int maxRegionCount = 500;
     private int cut = 10;
+    private int hfilePerRegion = 1;
     private long bytesRead = 0;
     private List<Text> gbPoints = new ArrayList<Text>();
 
@@ -53,6 +54,10 @@ public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable, Text, LongWritable> {
     protected void setup(Context context) throws IOException, InterruptedException {
         super.bindCurrentConfiguration(context.getConfiguration());
 
+        if (context.getConfiguration().get(BatchConstants.HFILE_PER_REGION) != null) {
+            hfilePerRegion = Integer.valueOf(context.getConfiguration().get(BatchConstants.HFILE_PER_REGION));
+        }
+
         if (context.getConfiguration().get(BatchConstants.REGION_NUMBER_MIN) != null) {
             minRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_NUMBER_MIN));
         }
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
--- a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
@@ ... @@ public byte[][] getSplits
-    public byte[][] getSplits(Configuration conf, Path path) {
+    public byte[][] getSplits(Configuration conf, Path path, int hfilePerRegion) {
         List<byte[]> rowkeyList = new ArrayList<byte[]>();
         SequenceFile.Reader reader = null;
+        int splitKeyCount = 0;
         try {
             reader = new SequenceFile.Reader(path.getFileSystem(conf), path, conf);
             Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
             Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
             while (reader.next(key, value)) {
-                rowkeyList.add(((Text) key).copyBytes());
+                if (++splitKeyCount >= hfilePerRegion) {
+                    rowkeyList.add(((Text) key).copyBytes());
+                    splitKeyCount = 0;
+                }
             }
         } catch (Exception e) {
             e.printStackTrace();
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/hbase/CreateHTableTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/hbase/CreateHTableTest.java
index 365a0d8..001bc01 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/hbase/CreateHTableTest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/hbase/CreateHTableTest.java
@@ -57,7 +57,7 @@ public class CreateHTableTest extends LocalFileMetadataTestCase {
 
         String input = "src/test/resources/partition_list/part-r-00000";
 
-        byte[][] splits = c.getSplits(conf, new Path(input));
+        byte[][] splits = c.getSplits(conf, new Path(input), 1);
 
         assertEquals(497, splits.length);
         assertArrayEquals(new byte[] { 0, 0, 0, 0, 0, 0, 15, -1, 11, 51, -45, 2 }, splits[0]);
-- 
2.3.2 (Apple Git-55)