From 8160363c1ca8ded2df9dca73d928af57d1b22e02 Mon Sep 17 00:00:00 2001 From: sunyerui Date: Sun, 28 Feb 2016 21:02:25 +0800 Subject: [PATCH] KYLIN-1323 Improve performance of converting data to hfile --- build/conf/kylin.properties | 4 + .../org/apache/kylin/common/KylinConfigBase.java | 8 ++ .../kylin/engine/mr/common/AbstractHadoopJob.java | 2 +- .../kylin/engine/mr/common/BatchConstants.java | 1 + .../engine/mr/steps/RangeKeyDistributionJob.java | 115 ----------------- .../mr/steps/RangeKeyDistributionMapper.java | 71 ----------- .../mr/steps/RangeKeyDistributionReducer.java | 100 --------------- kylin-it/pom.xml | 3 + .../kylin/provision/BuildCubeWithEngine.java | 41 +++++++ .../kylin/storage/hbase/steps/CubeHFileJob.java | 37 +++++- .../kylin/storage/hbase/steps/HBaseMRSteps.java | 4 +- .../hbase/steps/RangeKeyDistributionJob.java | 127 +++++++++++++++++++ .../hbase/steps/RangeKeyDistributionMapper.java | 76 ++++++++++++ .../hbase/steps/RangeKeyDistributionReducer.java | 136 +++++++++++++++++++++ .../hbase/util/HBaseRegionSizeCalculator.java | 8 ++ .../hbase/steps/RangeKeyDistributionJobTest.java | 1 - .../steps/RangeKeyDistributionMapperTest.java | 1 - .../steps/RangeKeyDistributionReducerTest.java | 1 - 18 files changed, 443 insertions(+), 293 deletions(-) delete mode 100644 engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionJob.java delete mode 100644 engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionMapper.java delete mode 100644 engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionReducer.java create mode 100644 storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java create mode 100644 storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java create mode 100644 storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java diff --git a/build/conf/kylin.properties 
b/build/conf/kylin.properties index 44a282e..b220b2d 100644 --- a/build/conf/kylin.properties +++ b/build/conf/kylin.properties @@ -65,6 +65,10 @@ kylin.hbase.region.cut.small=5 kylin.hbase.region.cut.medium=10 kylin.hbase.region.cut.large=50 +# Target hfile size in GB; a smaller hfile size gives the hfile-converting MR job more reducers and makes it faster +# set to 0 or comment out this config to disable this optimization +kylin.hbase.hfile.size.gb=5 + # Enable/disable ACL check for cube query kylin.query.security.enabled=true diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 5f9983a..4875177 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -415,6 +415,14 @@ public class KylinConfigBase implements Serializable { return Integer.parseInt(getOptional("kylin.hbase.region.count.max", "500")); } + public void setHBaseHFileSizeGB(int size) { + setProperty("kylin.hbase.hfile.size.gb", String.valueOf(size)); + } + + public int getHBaseHFileSizeGB() { + return Integer.parseInt(getOptional("kylin.hbase.hfile.size.gb", "0")); + } + public int getScanThreshold() { return Integer.parseInt(getOptional("kylin.query.scan.threshold", "10000000")); } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java index 7615269..e4eee96 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java @@ -83,7 +83,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { protected static final Option OPTION_INPUT_FORMAT = OptionBuilder.withArgName("inputformat").hasArg().isRequired(false).withDescription("Input
format").create("inputformat"); protected static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Output path").create("output"); protected static final Option OPTION_NCUBOID_LEVEL = OptionBuilder.withArgName("level").hasArg().isRequired(true).withDescription("N-Cuboid build level, e.g. 1, 2, 3...").create("level"); - protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Partition file path.").create("input"); + protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Partition file path.").create("partitions"); protected static final Option OPTION_HTABLE_NAME = OptionBuilder.withArgName("htable name").hasArg().isRequired(true).withDescription("HTable name").create("htablename"); protected static final Option OPTION_STATISTICS_ENABLED = OptionBuilder.withArgName("statisticsenabled").hasArg().isRequired(false).withDescription("Statistics enabled").create("statisticsenabled"); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java index 400a3aa..6943f18 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java @@ -42,6 +42,7 @@ public interface BatchConstants { String REGION_NUMBER_MIN = "region.number.min"; String REGION_NUMBER_MAX = "region.number.max"; String REGION_SPLIT_SIZE = "region.split.size"; + String HFILE_SIZE_GB = "hfile.size.gb"; String CFG_KYLIN_LOCAL_TEMP_DIR = "/tmp/kylin/"; String CFG_KYLIN_HDFS_TEMP_DIR = "/tmp/kylin/"; diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionJob.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionJob.java deleted file mode 100644 index 5632fc1..0000000 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionJob.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ - -package org.apache.kylin.engine.mr.steps; - -import org.apache.commons.cli.Options; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; -import org.apache.hadoop.util.ToolRunner; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.engine.mr.common.AbstractHadoopJob; -import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.metadata.model.DataModelDesc; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author xjiang, ysong1 - * - */ - -public class RangeKeyDistributionJob extends AbstractHadoopJob { - protected static final Logger logger = LoggerFactory.getLogger(RangeKeyDistributionJob.class); - - /* - * (non-Javadoc) - * - * @see org.apache.hadoop.util.Tool#run(java.lang.String[]) - */ - @Override - public int run(String[] args) throws Exception { - Options options = new Options(); - - try { - options.addOption(OPTION_INPUT_PATH); - options.addOption(OPTION_OUTPUT_PATH); - options.addOption(OPTION_JOB_NAME); - options.addOption(OPTION_CUBE_NAME); - - parseOptions(options, args); - - // start job - String jobName = getOptionValue(OPTION_JOB_NAME); - job = Job.getInstance(getConf(), jobName); - - setJobClasspath(job); - - addInputDirs(getOptionValue(OPTION_INPUT_PATH), job); - - Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); - FileOutputFormat.setOutputPath(job, output); - // job.getConfiguration().set("dfs.block.size", "67108864"); - - // Mapper - job.setInputFormatClass(SequenceFileInputFormat.class); - job.setMapperClass(RangeKeyDistributionMapper.class); - 
job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(LongWritable.class); - - // Reducer - only one - job.setReducerClass(RangeKeyDistributionReducer.class); - job.setOutputFormatClass(SequenceFileOutputFormat.class); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(LongWritable.class); - job.setNumReduceTasks(1); - - this.deletePath(job.getConfiguration(), output); - - String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(); - CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); - CubeInstance cube = cubeMgr.getCube(cubeName); - DataModelDesc.RealizationCapacity cubeCapacity = cube.getDescriptor().getModel().getCapacity(); - int regionSplitSize = KylinConfig.getInstanceFromEnv().getHBaseRegionCut(cubeCapacity.toString()); - int maxRegionCount = KylinConfig.getInstanceFromEnv().getHBaseRegionCountMax(); - int minRegionCount = KylinConfig.getInstanceFromEnv().getHBaseRegionCountMin(); - - job.getConfiguration().set(BatchConstants.REGION_SPLIT_SIZE, String.valueOf(regionSplitSize)); - job.getConfiguration().set(BatchConstants.REGION_NUMBER_MAX, String.valueOf(maxRegionCount)); - job.getConfiguration().set(BatchConstants.REGION_NUMBER_MIN, String.valueOf(minRegionCount)); - - return waitForCompletion(job); - } catch (Exception e) { - printUsage(options); - throw e; - } - } - - public static void main(String[] args) throws Exception { - int exitCode = ToolRunner.run(new RangeKeyDistributionJob(), args); - System.exit(exitCode); - } - -} diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionMapper.java deleted file mode 100644 index 47cbc95..0000000 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionMapper.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.engine.mr.steps; - -import java.io.IOException; - -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.kylin.engine.mr.KylinMapper; - -/** - * @author ysong1 - * - */ -public class RangeKeyDistributionMapper extends KylinMapper { - - private static final long ONE_MEGA_BYTES = 1L * 1024L * 1024L; - - private LongWritable outputValue = new LongWritable(0); - - private long bytesRead = 0; - - private Text lastKey; - - @Override - protected void setup(Context context) throws IOException { - super.bindCurrentConfiguration(context.getConfiguration()); - } - - @Override - public void map(Text key, Text value, Context context) throws IOException, InterruptedException { - lastKey = key; - - int bytesLength = key.getLength() + value.getLength(); - bytesRead += bytesLength; - - if (bytesRead >= ONE_MEGA_BYTES) { - outputValue.set(bytesRead); - context.write(key, outputValue); - - // reset bytesRead - bytesRead = 0; - } - - } - - @Override - protected void cleanup(Context context) throws IOException, InterruptedException { - if (lastKey != null) { - outputValue.set(bytesRead); - context.write(lastKey, outputValue); - } - } - -} diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionReducer.java deleted file mode 100644 index 68be74e..0000000 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionReducer.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ - -package org.apache.kylin.engine.mr.steps; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.util.StringUtils; -import org.apache.kylin.engine.mr.KylinReducer; -import org.apache.kylin.engine.mr.common.BatchConstants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author ysong1 - * - */ -public class RangeKeyDistributionReducer extends KylinReducer { - - public static final long ONE_GIGA_BYTES = 1024L * 1024L * 1024L; - private static final Logger logger = LoggerFactory.getLogger(RangeKeyDistributionReducer.class); - - private LongWritable outputValue = new LongWritable(0); - - private int minRegionCount = 1; - private int maxRegionCount = 500; - private int cut = 10; - private long bytesRead = 0; - private List gbPoints = new ArrayList(); - - @Override - protected void setup(Context context) throws IOException { - super.bindCurrentConfiguration(context.getConfiguration()); - - if (context.getConfiguration().get(BatchConstants.REGION_SPLIT_SIZE) != null) { - cut = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_SPLIT_SIZE)); - } - - if (context.getConfiguration().get(BatchConstants.REGION_NUMBER_MIN) != null) { - minRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_NUMBER_MIN)); - } - - if (context.getConfiguration().get(BatchConstants.REGION_NUMBER_MAX) != null) { - maxRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_NUMBER_MAX)); - } - - logger.info("Chosen cut for htable is " + cut + ", max region count=" + maxRegionCount + ", min region count =" + minRegionCount); - } - - @Override - public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { - for (LongWritable v : values) { - bytesRead += v.get(); - } - - if (bytesRead >= ONE_GIGA_BYTES) { - gbPoints.add(new 
Text(key)); - bytesRead = 0; // reset bytesRead - } - } - - @Override - protected void cleanup(Context context) throws IOException, InterruptedException { - int nRegion = Math.round((float) gbPoints.size() / (float) cut); - nRegion = Math.max(minRegionCount, nRegion); - nRegion = Math.min(maxRegionCount, nRegion); - - int gbPerRegion = gbPoints.size() / nRegion; - gbPerRegion = Math.max(1, gbPerRegion); - - System.out.println(nRegion + " regions"); - System.out.println(gbPerRegion + " GB per region"); - - for (int i = gbPerRegion; i < gbPoints.size(); i += gbPerRegion) { - Text key = gbPoints.get(i); - outputValue.set(i); - System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get()); - context.write(key, outputValue); - } - } -} diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml index 6cb44a5..99b650c 100644 --- a/kylin-it/pom.xml +++ b/kylin-it/pom.xml @@ -301,6 +301,7 @@ test java + -DuseSandbox=true -Dhdp.version=${hdp.version} -DfastBuildMode=${fastBuildMode} -classpath @@ -321,6 +322,7 @@ test java + -DuseSandbox=true -Dhdp.version=${hdp.version} -DfastBuildMode=${fastBuildMode} -classpath @@ -341,6 +343,7 @@ test java + -DuseSandbox=true -Dhdp.version=${hdp.version} -DfastBuildMode=${fastBuildMode} -classpath diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java index 28808df..cfefef3 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.lang.reflect.Method; import java.text.SimpleDateFormat; import java.util.List; +import java.util.Map; import java.util.TimeZone; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -36,11 +37,14 @@ import org.apache.commons.logging.LogFactory; import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.AbstractKylinTestCase; import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.common.util.HBaseMetadataTestCase; +import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; @@ -55,6 +59,8 @@ import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.impl.threadpool.DefaultScheduler; import org.apache.kylin.job.manager.ExecutableManager; +import org.apache.kylin.metadata.model.IEngineAware; +import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator; import org.apache.kylin.storage.hbase.util.StorageCleanupJob; import org.apache.kylin.storage.hbase.util.ZookeeperJobLock; @@ -143,8 +149,10 @@ public class BuildCubeWithEngine { public void build() throws Exception { DeployUtil.prepareTestDataForNormalCubes("test_kylin_cube_with_slr_left_join_empty"); + KylinConfig.getInstanceFromEnv().setHBaseHFileSizeGB(1); testInner(); testLeft(); + KylinConfig.getInstanceFromEnv().setHBaseHFileSizeGB(0); } protected void waitForJob(String jobId) { @@ -345,6 +353,9 @@ public class BuildCubeWithEngine { DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST"); jobService.addJob(job); waitForJob(job.getId()); + if (segment.getCubeDesc().getEngineType() == IEngineAware.ID_MR_V1) { + checkHFilesInHBase(segment); + } return job.getId(); } @@ -355,4 +366,34 @@ public class BuildCubeWithEngine { return exitCode; } + private void checkHFilesInHBase(CubeSegment segment) throws IOException { + Configuration conf = 
HBaseConfiguration.create(HadoopUtil.getCurrentConfiguration()); + String tableName = segment.getStorageLocationIdentifier(); + HTable table = new HTable(conf, tableName); + HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(table); + Map sizeMap = cal.getRegionSizeMap(); + long totalSize = 0; + for (Long size : sizeMap.values()) { + totalSize += size; + } + if (totalSize == 0) { + return; + } + Map> countMap = cal.getRegionHFileCountMap(); + // check if there's region contains more than one hfile, which means the hfile config take effects + boolean hasMultiHFileRegions = false; + for (Pair count : countMap.values()) { + // check if hfile count is greater than store count + if (count.getSecond() > count.getFirst()) { + hasMultiHFileRegions = true; + break; + } + } + if (KylinConfig.getInstanceFromEnv().getHBaseHFileSizeGB() == 0 && hasMultiHFileRegions) { + throw new IOException("hfile size set to 0, but found region contains more than one hfiles"); + } else if (KylinConfig.getInstanceFromEnv().getHBaseHFileSizeGB() > 0 && !hasMultiHFileRegions) { + throw new IOException("hfile size set greater than 0, but all regions still has only one hfile"); + } + } + } \ No newline at end of file diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java index 1f0b1a0..a302daf 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java @@ -18,17 +18,24 @@ package org.apache.kylin.storage.hbase.steps; +import java.io.IOException; + import org.apache.commons.cli.Options; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import 
org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeInstance; @@ -51,11 +58,14 @@ public class CubeHFileJob extends AbstractHadoopJob { try { options.addOption(OPTION_JOB_NAME); options.addOption(OPTION_CUBE_NAME); + options.addOption(OPTION_PARTITION_FILE_PATH); options.addOption(OPTION_INPUT_PATH); options.addOption(OPTION_OUTPUT_PATH); options.addOption(OPTION_HTABLE_NAME); parseOptions(options, args); + Path partitionFilePath = new Path(getOptionValue(OPTION_PARTITION_FILE_PATH)); + Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(); @@ -82,8 +92,9 @@ public class CubeHFileJob extends AbstractHadoopJob { String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase(); HTable htable = new HTable(conf, tableName); - //Automatic config ! + // Automatic config ! 
HFileOutputFormat.configureIncrementalLoad(job, htable); + reconfigurePartitions(conf, partitionFilePath); // set block replication to 3 for hfiles conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3"); @@ -101,6 +112,30 @@ public class CubeHFileJob extends AbstractHadoopJob { } } + /** + * If a partition file for hfiles exists, use it in place of the table's region splits so that the job runs with more reducers + * @param conf the job configuration + * @param path the hfile partition file + * @throws IOException + */ + @SuppressWarnings("deprecation") + private void reconfigurePartitions(Configuration conf, Path path) throws IOException { + FileSystem fs = path.getFileSystem(conf); + if (fs.exists(path)) { + try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf)) { + int partitionCount = 0; + Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf); + Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf); + while (reader.next(key, value)) { + partitionCount++; + } + TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), path); + // The number of reduce tasks must be one more than the number of partition keys + job.setNumReduceTasks(partitionCount+1); + } + } + } + public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new CubeHFileJob(), args); System.exit(exitCode); diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java index c3bd7b5..2a21640 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java @@ -12,7 +12,6 @@ import org.apache.kylin.engine.mr.common.MapReduceExecutable; import org.apache.kylin.storage.hbase.ii.IIBulkLoadJob; import org.apache.kylin.storage.hbase.ii.IICreateHFileJob; import
org.apache.kylin.storage.hbase.ii.IICreateHTableJob; -import org.apache.kylin.engine.mr.steps.RangeKeyDistributionJob; import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.metadata.realization.IRealizationSegment; @@ -72,7 +71,7 @@ public class HBaseMRSteps extends JobBuilderSupport { StringBuilder cmd = new StringBuilder(); appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName()); appendExecCmdParameters(cmd, "segmentname", seg.getName()); - appendExecCmdParameters(cmd, "input", getRowkeyDistributionOutputPath(jobId) + "/part-r-00000"); + appendExecCmdParameters(cmd, "partitions", getRowkeyDistributionOutputPath(jobId) + "/part-r-00000"); appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(withStats)); createHtableStep.setJobParams(cmd.toString()); @@ -90,6 +89,7 @@ public class HBaseMRSteps extends JobBuilderSupport { appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc()); appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName()); + appendExecCmdParameters(cmd, "partitions", getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile"); appendExecCmdParameters(cmd, "input", inputPath); appendExecCmdParameters(cmd, "output", getHFilePath(jobId)); appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier()); diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java new file mode 100644 index 0000000..2ff7356 --- /dev/null +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.storage.hbase.steps; + +import org.apache.commons.cli.Options; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.util.ToolRunner; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.metadata.model.DataModelDesc; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author xjiang, ysong1 + * + */ + +public class RangeKeyDistributionJob extends AbstractHadoopJob { + protected static final Logger logger = LoggerFactory.getLogger(RangeKeyDistributionJob.class); + + /* + * (non-Javadoc) + * + * @see 
org.apache.hadoop.util.Tool#run(java.lang.String[]) + */ + @Override + public int run(String[] args) throws Exception { + Options options = new Options(); + + try { + options.addOption(OPTION_INPUT_PATH); + options.addOption(OPTION_OUTPUT_PATH); + options.addOption(OPTION_JOB_NAME); + options.addOption(OPTION_CUBE_NAME); + + parseOptions(options, args); + + // start job + String jobName = getOptionValue(OPTION_JOB_NAME); + job = Job.getInstance(getConf(), jobName); + + setJobClasspath(job); + + addInputDirs(getOptionValue(OPTION_INPUT_PATH), job); + + Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); + FileOutputFormat.setOutputPath(job, output); + // job.getConfiguration().set("dfs.block.size", "67108864"); + + // Mapper + job.setInputFormatClass(SequenceFileInputFormat.class); + job.setMapperClass(RangeKeyDistributionMapper.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(LongWritable.class); + + // Reducer - only one + job.setReducerClass(RangeKeyDistributionReducer.class); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(LongWritable.class); + job.setNumReduceTasks(1); + + this.deletePath(job.getConfiguration(), output); + + String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(); + CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); + CubeInstance cube = cubeMgr.getCube(cubeName); + int hfileSizeGB = KylinConfig.getInstanceFromEnv().getHBaseHFileSizeGB(); + DataModelDesc.RealizationCapacity cubeCapacity = cube.getDescriptor().getModel().getCapacity(); + int regionSplitSize = KylinConfig.getInstanceFromEnv().getHBaseRegionCut(cubeCapacity.toString()); + int maxRegionCount = KylinConfig.getInstanceFromEnv().getHBaseRegionCountMax(); + int minRegionCount = KylinConfig.getInstanceFromEnv().getHBaseRegionCountMin(); + job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString()); + 
job.getConfiguration().set(BatchConstants.HFILE_SIZE_GB, String.valueOf(hfileSizeGB)); + job.getConfiguration().set(BatchConstants.REGION_SPLIT_SIZE, String.valueOf(regionSplitSize)); + job.getConfiguration().set(BatchConstants.REGION_NUMBER_MAX, String.valueOf(maxRegionCount)); + job.getConfiguration().set(BatchConstants.REGION_NUMBER_MIN, String.valueOf(minRegionCount)); + // The partition file for hfile is a sequence file consisting of ImmutableBytesWritable and NullWritable + TableMapReduceUtil.addDependencyJars(job.getConfiguration(), ImmutableBytesWritable.class, NullWritable.class); + + // Pass the sandbox property to the mapper, to simulate a large dataset + if (System.getProperty("useSandbox") != null && System.getProperty("useSandbox").equals("true")) { + job.getConfiguration().setBoolean("useSandbox", true); + } + + return waitForCompletion(job); + } catch (Exception e) { + printUsage(options); + throw e; + } + } + + public static void main(String[] args) throws Exception { + int exitCode = ToolRunner.run(new RangeKeyDistributionJob(), args); + System.exit(exitCode); + } + +} diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java new file mode 100644 index 0000000..6f2d2bc --- /dev/null +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.storage.hbase.steps; + +import java.io.IOException; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.kylin.engine.mr.KylinMapper; + +/** + * @author ysong1 + * + */ +public class RangeKeyDistributionMapper extends KylinMapper { + + private static final long ONE_MEGA_BYTES = 1L * 1024L * 1024L; + + private LongWritable outputValue = new LongWritable(0); + + private long bytesRead = 0; + + private Text lastKey; + + private Long scaleFactorForSandbox = 1L; + + @Override + protected void setup(Context context) throws IOException { + super.bindCurrentConfiguration(context.getConfiguration()); + if (context.getConfiguration().getBoolean("useSandbox", false)) { + scaleFactorForSandbox = 1024L; + } + } + + @Override + public void map(Text key, Text value, Context context) throws IOException, InterruptedException { + lastKey = key; + + int bytesLength = key.getLength() + value.getLength(); + bytesRead += bytesLength; + + if ((bytesRead * scaleFactorForSandbox) >= ONE_MEGA_BYTES) { + outputValue.set(bytesRead * scaleFactorForSandbox); + context.write(key, outputValue); + + // reset bytesRead + bytesRead = 0; + } + + } + + @Override + protected void cleanup(Context context) throws IOException, InterruptedException { + if (lastKey != null) { + outputValue.set(bytesRead); + context.write(lastKey, outputValue); + } + } + +} diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java new file mode 100644 index 0000000..acdab62 --- /dev/null +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.storage.hbase.steps; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.util.StringUtils; +import org.apache.kylin.engine.mr.KylinReducer; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author ysong1 + * + */ +public class RangeKeyDistributionReducer extends KylinReducer { + + public static final long ONE_GIGA_BYTES = 1024L * 1024L * 1024L; + private static final Logger logger = LoggerFactory.getLogger(RangeKeyDistributionReducer.class); + + private LongWritable outputValue = new LongWritable(0); + + private int minRegionCount = 1; + private int maxRegionCount = 500; + private int cut = 10; + private int hfileSizeGB = 1; + private long bytesRead = 0; + private List gbPoints = new ArrayList(); + private String output = null; + + @Override + protected void setup(Context context) throws IOException { + super.bindCurrentConfiguration(context.getConfiguration()); + + if (context.getConfiguration().get(BatchConstants.OUTPUT_PATH) != null) { + output = context.getConfiguration().get(BatchConstants.OUTPUT_PATH); + } + + if (context.getConfiguration().get(BatchConstants.HFILE_SIZE_GB) != null) { + hfileSizeGB = Integer.valueOf(context.getConfiguration().get(BatchConstants.HFILE_SIZE_GB)); + } + + if (context.getConfiguration().get(BatchConstants.REGION_SPLIT_SIZE) != null) { + cut = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_SPLIT_SIZE)); + } + + if (context.getConfiguration().get(BatchConstants.REGION_NUMBER_MIN) != null) { + minRegionCount = 
Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_NUMBER_MIN)); + } + + if (context.getConfiguration().get(BatchConstants.REGION_NUMBER_MAX) != null) { + maxRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_NUMBER_MAX)); + } + + logger.info("Chosen cut for htable is " + cut + ", max region count=" + maxRegionCount + + ", min region count=" + minRegionCount + ", hfile size=" + hfileSizeGB); + + // add empty key at position 0 + gbPoints.add(new Text()); + } + + @Override + public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { + for (LongWritable v : values) { + bytesRead += v.get(); + } + + if (bytesRead >= ONE_GIGA_BYTES) { + gbPoints.add(new Text(key)); + bytesRead = 0; // reset bytesRead + } + } + + @Override + protected void cleanup(Context context) throws IOException, InterruptedException { + int nRegion = Math.round((float) gbPoints.size() / (float) cut); + nRegion = Math.max(minRegionCount, nRegion); + nRegion = Math.min(maxRegionCount, nRegion); + + int gbPerRegion = gbPoints.size() / nRegion; + gbPerRegion = Math.max(1, gbPerRegion); + + if (hfileSizeGB <= 0) { + hfileSizeGB = gbPerRegion; + } + int hfilePerRegion = gbPerRegion / hfileSizeGB; + hfilePerRegion = Math.max(1, hfilePerRegion); + + System.out.println(nRegion + " regions"); + System.out.println(gbPerRegion + " GB per region"); + System.out.println(hfilePerRegion + " hfile per region"); + + Path hfilePartitionFile = new Path(output + "/part-r-00000_hfile"); + try (SequenceFile.Writer hfilePartitionWriter = new SequenceFile.Writer( + hfilePartitionFile.getFileSystem(context.getConfiguration()), + context.getConfiguration(), hfilePartitionFile, ImmutableBytesWritable.class, NullWritable.class)) { + int hfileCountInOneRegion = 0; + for (int i = hfileSizeGB; i < gbPoints.size(); i += hfileSizeGB) { + hfilePartitionWriter.append(new ImmutableBytesWritable(gbPoints.get(i).getBytes()), 
NullWritable.get()); + if (++hfileCountInOneRegion >= hfilePerRegion) { + Text key = gbPoints.get(i); + outputValue.set(i); + System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get()); + context.write(key, outputValue); + + hfileCountInOneRegion = 0; + } + } + } + } +} diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java index ba0da00..346c3a2 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.kylin.common.util.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,6 +50,8 @@ public class HBaseRegionSizeCalculator { **/ private final Map sizeMap = new TreeMap(Bytes.BYTES_COMPARATOR); + private final Map> countMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); + static final String ENABLE_REGIONSIZECALCULATOR = "hbase.regionsizecalculator.enable"; /** @@ -93,6 +96,7 @@ public class HBaseRegionSizeCalculator { long regionSizeBytes = regionLoad.getStorefileSizeMB() * megaByte; sizeMap.put(regionId, regionSizeBytes); + countMap.put(regionId, new Pair<>(regionLoad.getStores(), regionLoad.getStorefiles())); // logger.info("Region " + regionLoad.getNameAsString() // + " has size " + regionSizeBytes); @@ -125,4 +129,8 @@ public class HBaseRegionSizeCalculator { public Map getRegionSizeMap() { return Collections.unmodifiableMap(sizeMap); } + + public Map> getRegionHFileCountMap() { + return Collections.unmodifiableMap(countMap); + } } diff --git 
a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJobTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJobTest.java index 7f5b24b..70e1ac7 100644 --- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJobTest.java +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJobTest.java @@ -27,7 +27,6 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.engine.mr.HadoopUtil; -import org.apache.kylin.engine.mr.steps.RangeKeyDistributionJob; import org.junit.After; import org.junit.Before; import org.junit.Test; diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapperTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapperTest.java index ca716c3..03a3cba 100644 --- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapperTest.java +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapperTest.java @@ -29,7 +29,6 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mrunit.mapreduce.MapDriver; import org.apache.hadoop.mrunit.types.Pair; -import org.apache.kylin.engine.mr.steps.RangeKeyDistributionMapper; import org.junit.Before; import org.junit.Test; diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducerTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducerTest.java index cbf0657..c027c40 100644 --- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducerTest.java +++ 
b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducerTest.java @@ -24,7 +24,6 @@ import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mrunit.mapreduce.ReduceDriver; -import org.apache.kylin.engine.mr.steps.RangeKeyDistributionReducer; import org.junit.Before; import org.junit.Test; -- 2.3.2 (Apple Git-55)