From 801f900e0f80465e631ed2d36b10bc605d2192e4 Mon Sep 17 00:00:00 2001 From: Wang Ken Date: Wed, 13 Sep 2017 11:36:02 +0800 Subject: [PATCH 1/4] APACHE-KYLIN-2866: Enlarge the reducer number for hyperloglog statistics calculation at step FactDistinctColumnsJob Signed-off-by: Zhong --- .../org/apache/kylin/common/KylinConfigBase.java | 8 ++ .../org/apache/kylin/common/util/HadoopUtil.java | 19 ++++ .../mr/steps/FactDistinctColumnPartitioner.java | 38 +++++++- .../engine/mr/steps/FactDistinctColumnsJob.java | 20 +++- .../mr/steps/FactDistinctColumnsReducer.java | 55 ++++++----- .../kylin/engine/mr/steps/SaveStatisticsStep.java | 107 ++++++++++++++++++++- 6 files changed, 215 insertions(+), 32 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 66805df..149ff4d 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -1023,6 +1023,14 @@ abstract public class KylinConfigBase implements Serializable { return Integer.parseInt(getOptional("kylin.engine.mr.mapper-input-rows", "1000000")); } + public int getFactDistinctJobPerReducerHLLCuboidNumber() { + return Integer.parseInt(getOptional("kylin.engine.mr.fact-distinct-per-reducer-hll-cuboid-number", "100")); + } + + public int getFactDistinctJobHLLMaxReducerNumber() { + return Integer.parseInt(getOptional("kylin.engine.mr.fact-distinct-hll-max-reducer-number", "50")); + } + //UHC: ultra high cardinality columns, contain the ShardByColumns and the GlobalDictionaryColumns public int getUHCReducerCount() { return Integer.parseInt(getOptional("kylin.engine.mr.uhc-reducer-count", "5")); diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java index f242515..cafcaf2 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java @@ -160,4 +160,23 @@ public class HadoopUtil { return null; } } + + public static Path[] getFilterPath(FileSystem fs, Path baseDir, final String filter) throws IOException { + if (fs.exists(baseDir) == false) { + return null; + } + + FileStatus[] fileStatus = fs.listStatus(baseDir, new PathFilter() { + @Override + public boolean accept(Path path) { + return path.getName().startsWith(filter); + } + }); + + Path[] result = new Path[fileStatus.length]; + for (int i = 0; i < fileStatus.length; i++) { + result[i] = fileStatus[i].getPath(); + } + return result; + } } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java index 5fcfe42..7ac5d02 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java @@ -18,25 +18,55 @@ package org.apache.kylin.engine.mr.steps; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; +import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.BytesUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** */ -public class FactDistinctColumnPartitioner extends Partitioner { +public class FactDistinctColumnPartitioner extends Partitioner implements Configurable { + private static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnPartitioner.class); + + public static final String HLL_SHARD_BASE_PROPERTY_NAME = "mapreduce.partition.factdistinctcolumnpartitioner.hll.shard.base"; + + public static void setHLLShard(Configuration conf, int hllShardBase) { + conf.setInt(HLL_SHARD_BASE_PROPERTY_NAME, hllShardBase); + } + + private Configuration conf; + private int hllShardBase = 1; @Override public int getPartition(SelfDefineSortableKey skey, Text value, int numReduceTasks) { Text key = skey.getText(); if (key.getBytes()[0] == FactDistinctColumnsMapper.MARK_FOR_HLL) { - // the last reducer is for merging hll - return numReduceTasks - 1; + // the last $hllShard reducers are for merging hll + Long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG); + int shard = cuboidId.hashCode() % hllShardBase; + if (shard < 0) { + shard += hllShardBase; + } + return numReduceTasks - shard - 1; } else if (key.getBytes()[0] == FactDistinctColumnsMapper.MARK_FOR_PARTITION_COL) { // the last but one reducer is for partition col - return numReduceTasks - 2; + return numReduceTasks - hllShardBase - 1; } else { return BytesUtil.readUnsigned(key.getBytes(), 0, 1); } } + + public void setConf(Configuration conf) { + this.conf = conf; + hllShardBase = conf.getInt(HLL_SHARD_BASE_PROPERTY_NAME, 1); + logger.info("shard base for hll is " + hllShardBase); + } + + public Configuration getConf() { + return conf; + } } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java index 08dadc9..dee384f 100755 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java @@ -117,7 +117,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob { } setupMapper(segment); - setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? reducerCount + 2 : reducerCount); + setupReducer(output, segment, statistics_enabled, reducerCount); attachCubeMetadata(cube, job.getConfiguration()); @@ -136,6 +136,15 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob { } + private int getHLLShardBase(CubeSegment segment) { + int nCuboids = segment.getCuboidScheduler().getAllCuboidIds().size(); + int shardBase = (nCuboids - 1) / segment.getConfig().getFactDistinctJobPerReducerHLLCuboidNumber() + 1; + if (shardBase > segment.getConfig().getFactDistinctJobHLLMaxReducerNumber()) { + shardBase = segment.getConfig().getFactDistinctJobHLLMaxReducerNumber(); + } + return shardBase; + } + private void setupMapper(CubeSegment cubeSeg) throws IOException { IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat(); flatTableInputFormat.configureJob(job); @@ -146,7 +155,14 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob { job.setMapOutputValueClass(Text.class); } - private void setupReducer(Path output, int numberOfReducers) throws IOException { + private void setupReducer(Path output, CubeSegment cubeSeg, String statistics_enabled, int reducerCount) + throws IOException { + int numberOfReducers = reducerCount; + if ("true".equalsIgnoreCase(statistics_enabled)) { + int hllShardBase = getHLLShardBase(cubeSeg); + FactDistinctColumnPartitioner.setHLLShard(job.getConfiguration(), hllShardBase); + numberOfReducers += (1 + hllShardBase); + } job.setReducerClass(FactDistinctColumnsReducer.class); job.setPartitionerClass(FactDistinctColumnPartitioner.class); job.setNumReduceTasks(numberOfReducers); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java index 0f65a3e..a733430 100755 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@ -107,24 +107,38 @@ public class FactDistinctColumnsReducer extends KylinReducer= numberOfTasks - hllShardBase) { + // hll + isStatistics = true; + baseCuboidId = cube.getCuboidScheduler().getBaseCuboidId(); + baseCuboidRowCountInMappers = Lists.newArrayList(); + cuboidHLLMap = Maps.newHashMap(); + samplingPercentage = Integer + .parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT)); + logger.info("Reducer " + taskId + " handling stats"); + } else if (taskId == numberOfTasks - hllShardBase - 1) { + // partition col + isPartitionCol = true; + col = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef(); + if (col == null) { + logger.info("No partition col. This reducer will do nothing"); + } else { + logger.info("Reducer " + taskId + " handling partition col " + col.getIdentity()); + } } else { - logger.info("Reducer " + taskId + " handling partition col " + col.getIdentity()); + ifCol = true; } - } else { + } + if (ifCol) { // normal col col = columnList.get(reducerIdToColumnIndex.get(taskId)); Preconditions.checkNotNull(col); @@ -291,7 +305,7 @@ public class FactDistinctColumnsReducer extends KylinReducer allCuboids) throws IOException { - logger.info("Total cuboid number: \t" + allCuboids.size()); + logger.info("Cuboid number for task: " + taskId + "\t" + allCuboids.size()); logger.info("Samping percentage: \t" + samplingPercentage); logger.info("The following statistics are collected based on sampling data."); logger.info("Number of Mappers: " + baseCuboidRowCountInMappers.size()); @@ -308,11 +322,8 @@ public class FactDistinctColumnsReducer extends KylinReducer 0) { - logger.info("The mapper overlap ratio is: \t" + totalRowsBeforeMerge / grantTotal); - } + logger.info("Sum of row counts (before merge) is: \t " + totalRowsBeforeMerge); + logger.info("After merge, the row count: \t " + grantTotal); } } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java index 2196f09..f69bf67 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java @@ -19,25 +19,40 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.CubingJob; import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.engine.mr.common.CubeStatsWriter; import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; import org.apache.kylin.job.execution.ExecuteResult; +import org.apache.kylin.measure.hllc.HLLCounter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + /** * Save the cube segment statistic to Kylin metadata store */ @@ -56,14 +71,79 @@ public class SaveStatisticsStep extends AbstractExecutable { ResourceStore rs = ResourceStore.getStore(kylinConf); try { + + FileSystem fs = HadoopUtil.getWorkingFileSystem(); + Configuration hadoopConf = HadoopUtil.getCurrentConfiguration(); Path statisticsDir = new Path(CubingExecutableUtil.getStatisticsPath(this.getParams())); - FileSystem fs = HadoopUtil.getFileSystem(statisticsDir); - Path statisticsFilePath = HadoopUtil.getFilterOnlyPath(fs, statisticsDir, BatchConstants.CFG_OUTPUT_STATISTICS); - if (statisticsFilePath == null) { + Path[] statisticsFiles = HadoopUtil.getFilterPath(fs, statisticsDir, BatchConstants.CFG_OUTPUT_STATISTICS); + if (statisticsFiles == null) { throw new IOException("fail to find the statistics file in base dir: " + statisticsDir); } - FSDataInputStream is = fs.open(statisticsFilePath); + Map cuboidHLLMap = Maps.newHashMap(); + long totalRowsBeforeMerge = 0; + long grantTotal = 0; + int samplingPercentage = -1; + int mapperNumber = -1; + for (Path item : statisticsFiles) { + int pSamplingPercentage = 0; + double pMapperOverlapRatio = 0; + int pMapperNumber = 0; + long pGrantTotal = 0; + try (SequenceFile.Reader reader = new SequenceFile.Reader(hadoopConf, SequenceFile.Reader.file(item))) { + LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), hadoopConf); + BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), + hadoopConf); + while (reader.next(key, value)) { + if (key.get() == 0L) { + pSamplingPercentage = Bytes.toInt(value.getBytes()); + } else if (key.get() == -1L) { + pMapperOverlapRatio = Bytes.toDouble(value.getBytes()); + } else if (key.get() == -2L) { + pMapperNumber = Bytes.toInt(value.getBytes()); + } else { + HLLCounter hll = new HLLCounter(kylinConf.getCubeStatsHLLPrecision()); + ByteArray byteArray = new ByteArray(value.getBytes()); + hll.readRegisters(byteArray.asBuffer()); + cuboidHLLMap.put(key.get(), hll); + pGrantTotal += hll.getCountEstimate(); + } + } + totalRowsBeforeMerge += pGrantTotal * pMapperOverlapRatio; + grantTotal += pGrantTotal; + if (pMapperNumber > 0) { + if (mapperNumber < 0) { + mapperNumber = pMapperNumber; + } else { + throw new RuntimeException( + "Base cuboid has been distributed to multiple reducers at step FactDistinctColumnsReducer!!!"); + } + } + if (samplingPercentage < 0) { + samplingPercentage = pSamplingPercentage; + } else if (samplingPercentage != pSamplingPercentage) { + throw new RuntimeException( + "The sampling percentage should be same among all of the reducer of FactDistinctColumnsReducer!!!"); + } + } + } + if (samplingPercentage < 0) { + logger.warn("The sampling percentage should be set!!!"); + } + if (mapperNumber < 0) { + logger.warn("The mapper number should be set!!!"); + } + + if (logger.isDebugEnabled()) { + logMapperAndCuboidStatistics(cuboidHLLMap, samplingPercentage, mapperNumber, grantTotal, + totalRowsBeforeMerge); + } + double mapperOverlapRatio = grantTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grantTotal; + CubeStatsWriter.writeCuboidStatistics(hadoopConf, statisticsDir, cuboidHLLMap, samplingPercentage, + mapperNumber, mapperOverlapRatio); + + Path statisticsFile = new Path(statisticsDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME); + FSDataInputStream is = fs.open(statisticsFile); try { // put the statistics to metadata store String statisticsFileName = newSegment.getStatisticsResourcePath(); @@ -84,4 +164,23 @@ public class SaveStatisticsStep extends AbstractExecutable { } } + private void logMapperAndCuboidStatistics(Map cuboidHLLMap, int samplingPercentage, + int mapperNumber, long grantTotal, long totalRowsBeforeMerge) throws IOException { + logger.debug("Total cuboid number: \t" + cuboidHLLMap.size()); + logger.debug("Samping percentage: \t" + samplingPercentage); + logger.debug("The following statistics are collected based on sampling data."); + logger.debug("Number of Mappers: " + mapperNumber); + + List allCuboids = Lists.newArrayList(cuboidHLLMap.keySet()); + Collections.sort(allCuboids); + for (long i : allCuboids) { + logger.debug("Cuboid " + i + " row count is: \t " + cuboidHLLMap.get(i).getCountEstimate()); + } + + logger.debug("Sum of all the cube segments (before merge) is: \t " + totalRowsBeforeMerge); + logger.debug("After merge, the cube has row count: \t " + grantTotal); + if (grantTotal > 0) { + logger.debug("The mapper overlap ratio is: \t" + (double) totalRowsBeforeMerge / grantTotal); + } + } } -- 2.5.4 (Apple Git-61) From ad6608fd332dedfa252d337a628372f21c1042a9 Mon Sep 17 00:00:00 2001 From: Zhong Date: Thu, 28 Sep 2017 19:12:07 +0800 Subject: [PATCH 2/4] APACHE-KYLIN-2866: Enlarge the reducer number for hyperloglog statistics calculation at step CalculateStatsFromBaseCuboidJob --- .../kylin/engine/mr/common/CubeStatsWriter.java | 25 +++++++-- .../mr/steps/CalculateStatsFromBaseCuboidJob.java | 10 ++-- .../CalculateStatsFromBaseCuboidPartitioner.java | 59 ++++++++++++++++++++++ .../steps/CalculateStatsFromBaseCuboidReducer.java | 7 ++- .../mr/steps/MergeStatisticsWithOldStep.java | 29 +++++++---- 5 files changed, 111 insertions(+), 19 deletions(-) create mode 100644 engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidPartitioner.java diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java index 8f400c3..b1e59a7 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java @@ -20,7 +20,6 @@ package org.apache.kylin.engine.mr.common; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -35,6 +34,8 @@ import org.apache.kylin.common.util.Bytes; import org.apache.kylin.measure.BufferedMeasureCodec; import org.apache.kylin.measure.hllc.HLLCounter; +import com.google.common.collect.Lists; + public class CubeStatsWriter { public static void writeCuboidStatistics(Configuration conf, Path outputPath, // @@ -45,17 +46,32 @@ public class CubeStatsWriter { public static void writeCuboidStatistics(Configuration conf, Path outputPath, // Map cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio) throws IOException { Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME); + writeCuboidStatisticsInner(conf, seqFilePath, cuboidHLLMap, samplingPercentage, mapperNumber, + mapperOverlapRatio); + } + + public static void writePartialCuboidStatistics(Configuration conf, Path outputPath, // + Map cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio, + int shard) throws IOException { + Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME + "_" + shard); + writeCuboidStatisticsInner(conf, seqFilePath, cuboidHLLMap, samplingPercentage, mapperNumber, + mapperOverlapRatio); + } - List allCuboids = new ArrayList(); + private static void writeCuboidStatisticsInner(Configuration conf, Path outputFilePath, // + Map cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio) + throws IOException { + List allCuboids = Lists.newArrayList(); allCuboids.addAll(cuboidHLLMap.keySet()); Collections.sort(allCuboids); ByteBuffer valueBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE); - SequenceFile.Writer writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(seqFilePath), SequenceFile.Writer.keyClass(LongWritable.class), SequenceFile.Writer.valueClass(BytesWritable.class)); + SequenceFile.Writer writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(outputFilePath), + SequenceFile.Writer.keyClass(LongWritable.class), SequenceFile.Writer.valueClass(BytesWritable.class)); try { // mapper overlap ratio at key -1 writer.append(new LongWritable(-1), new BytesWritable(Bytes.toBytes(mapperOverlapRatio))); - + // mapper number at key -2 writer.append(new LongWritable(-2), new BytesWritable(Bytes.toBytes(mapperNumber))); @@ -72,5 +88,4 @@ public class CubeStatsWriter { IOUtils.closeQuietly(writer); } } - } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java index b60076c..8f64272 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java @@ -35,6 +35,7 @@ import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.engine.mr.common.MapReduceUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,7 +78,7 @@ public class CalculateStatsFromBaseCuboidJob extends AbstractHadoopJob { setJobClasspath(job, cube.getConfig()); setupMapper(input); - setupReducer(output, 1); + setupReducer(output, cubeSegment); attachSegmentMetadataWithDict(cubeSegment, job.getConfiguration()); @@ -101,12 +102,15 @@ public class CalculateStatsFromBaseCuboidJob extends AbstractHadoopJob { job.setMapOutputValueClass(Text.class); } - private void setupReducer(Path output, int numberOfReducers) throws IOException { + private void setupReducer(Path output, CubeSegment cubeSeg) throws IOException { + int hllShardBase = MapReduceUtil.getHLLShardBase(cubeSeg); + job.getConfiguration().setInt(BatchConstants.CFG_HLL_SHARD_BASE, hllShardBase); + job.setReducerClass(CalculateStatsFromBaseCuboidReducer.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); - job.setNumReduceTasks(numberOfReducers); + job.setNumReduceTasks(hllShardBase); FileOutputFormat.setOutputPath(job, output); job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString()); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidPartitioner.java new file mode 100644 index 0000000..70db21b --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidPartitioner.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.mr.steps; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Partitioner; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + */ +public class CalculateStatsFromBaseCuboidPartitioner extends Partitioner implements Configurable { + private static final Logger logger = LoggerFactory.getLogger(CalculateStatsFromBaseCuboidPartitioner.class); + + private Configuration conf; + private int hllShardBase = 1; + + @Override + public int getPartition(Text key, Text value, int numReduceTasks) { + Long cuboidId = Bytes.toLong(key.getBytes()); + int shard = cuboidId.hashCode() % hllShardBase; + if (shard < 0) { + shard += hllShardBase; + } + return numReduceTasks - shard - 1; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + hllShardBase = conf.getInt(BatchConstants.CFG_HLL_SHARD_BASE, 1); + logger.info("shard base for hll is " + hllShardBase); + } + + @Override + public Configuration getConf() { + return conf; + } +} diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidReducer.java index 489dac4..7210622 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidReducer.java @@ -55,6 +55,8 @@ public class CalculateStatsFromBaseCuboidReducer extends KylinReducer Date: Thu, 28 Sep 2017 17:49:42 +0800 Subject: [PATCH 3/4] APACHE-KYLIN-2866: move hll shard base config to BatchConstants --- .../main/java/org/apache/kylin/common/KylinConfigBase.java | 9 +++++---- .../org/apache/kylin/engine/mr/common/BatchConstants.java | 2 ++ .../org/apache/kylin/engine/mr/common/MapReduceUtil.java | 14 ++++++++++++++ .../engine/mr/steps/FactDistinctColumnPartitioner.java | 11 ++++------- .../kylin/engine/mr/steps/FactDistinctColumnsJob.java | 14 +++----------- .../kylin/engine/mr/steps/FactDistinctColumnsReducer.java | 8 ++++---- 6 files changed, 32 insertions(+), 26 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 149ff4d..72b0bd8 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -1023,12 +1023,13 @@ abstract public class KylinConfigBase implements Serializable { return Integer.parseInt(getOptional("kylin.engine.mr.mapper-input-rows", "1000000")); } - public int getFactDistinctJobPerReducerHLLCuboidNumber() { - return Integer.parseInt(getOptional("kylin.engine.mr.fact-distinct-per-reducer-hll-cuboid-number", "100")); + public int getHadoopJobPerReducerHLLCuboidNumber() { + return Integer.parseInt(getOptional("kylin.engine.mr.per-reducer-hll-cuboid-number", "100")); } - public int getFactDistinctJobHLLMaxReducerNumber() { - return Integer.parseInt(getOptional("kylin.engine.mr.fact-distinct-hll-max-reducer-number", "50")); + public int getHadoopJobHLLMaxReducerNumber() { + // by default multi-reducer hll calculation is disabled + return Integer.parseInt(getOptional("kylin.engine.mr.hll-max-reducer-number", "1")); } //UHC: ultra high cardinality columns, contain the ShardByColumns and the GlobalDictionaryColumns diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java index 129c6dd..50c589a 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java @@ -70,6 +70,8 @@ public interface BatchConstants { String CFG_SPARK_META_URL = "spark.meta.url"; String CFG_GLOBAL_DICT_BASE_DIR = "global.dict.base.dir"; + String CFG_HLL_SHARD_BASE = "mapreduce.partition.hll.shard.base"; + /** * command line ARGuments */ diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java index 0379f64..b249f12 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java @@ -35,6 +35,20 @@ public class MapReduceUtil { private static final Logger logger = LoggerFactory.getLogger(MapReduceUtil.class); /** + * @return reducer number for calculating hll + */ + public static int getHLLShardBase(CubeSegment segment) { + int nCuboids = segment.getCuboidScheduler().getAllCuboidIds().size(); + int shardBase = (nCuboids - 1) / segment.getConfig().getHadoopJobPerReducerHLLCuboidNumber() + 1; + + int hllMaxReducerNumber = segment.getConfig().getHadoopJobHLLMaxReducerNumber(); + if (shardBase > hllMaxReducerNumber) { + shardBase = hllMaxReducerNumber; + } + return shardBase; + } + + /** * @param cuboidScheduler specified can provide more flexibility * */ public static int getLayeredCubingReduceTaskNum(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler, diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java index 7ac5d02..141ca99 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java @@ -24,6 +24,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.BytesUtil; +import org.apache.kylin.engine.mr.common.BatchConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,12 +33,6 @@ import org.slf4j.LoggerFactory; public class FactDistinctColumnPartitioner extends Partitioner implements Configurable { private static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnPartitioner.class); - public static final String HLL_SHARD_BASE_PROPERTY_NAME = "mapreduce.partition.factdistinctcolumnpartitioner.hll.shard.base"; - - public static void setHLLShard(Configuration conf, int hllShardBase) { - conf.setInt(HLL_SHARD_BASE_PROPERTY_NAME, hllShardBase); - } - private Configuration conf; private int hllShardBase = 1; @@ -60,12 +55,14 @@ public class FactDistinctColumnPartitioner extends Partitioner segment.getConfig().getFactDistinctJobHLLMaxReducerNumber()) { - shardBase = segment.getConfig().getFactDistinctJobHLLMaxReducerNumber(); - } - return shardBase; - } - private void setupMapper(CubeSegment cubeSeg) throws IOException { IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat(); flatTableInputFormat.configureJob(job); @@ -159,8 +151,8 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob { throws IOException { int numberOfReducers = reducerCount; if ("true".equalsIgnoreCase(statistics_enabled)) { - int hllShardBase = getHLLShardBase(cubeSeg); - FactDistinctColumnPartitioner.setHLLShard(job.getConfiguration(), hllShardBase); + int hllShardBase = MapReduceUtil.getHLLShardBase(cubeSeg); + job.getConfiguration().setInt(BatchConstants.CFG_HLL_SHARD_BASE, hllShardBase); numberOfReducers += (1 + hllShardBase); } job.setReducerClass(FactDistinctColumnsReducer.class); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java index a733430..37972c0 100755 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@ -109,11 +109,11 @@ public class FactDistinctColumnsReducer extends KylinReducer= numberOfTasks - hllShardBase) { -- 2.5.4 (Apple Git-61) From 823e2f32f3b4a6b2035aded3026f0869b62e7c70 Mon Sep 17 00:00:00 2001 From: Zhong Date: Wed, 20 Dec 2017 11:13:02 +0800 Subject: [PATCH 4/4] APACHE-KYLIN-2866: refine reading statistics file by using CubeStatsReader.CubeStatsResult --- .../kylin/engine/mr/common/CubeStatsReader.java | 44 ++++++++++----- .../kylin/engine/mr/steps/SaveStatisticsStep.java | 66 ++++++++-------------- 2 files changed, 52 insertions(+), 58 deletions(-) diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java index 3d7d542..8b9b928 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java @@ -93,16 +93,15 @@ public class CubeStatsReader { File tmpSeqFile = writeTmpSeqFile(store.getResource(statsKey).inputStream); Path path = new Path(HadoopUtil.fixWindowsPath("file://" + tmpSeqFile.getAbsolutePath())); - CubeStatsResult cubeStatsResult = new CubeStatsResult(); - cubeStatsResult.initialize(path, kylinConfig.getCubeStatsHLLPrecision()); + CubeStatsResult cubeStatsResult = new CubeStatsResult(path, kylinConfig.getCubeStatsHLLPrecision()); tmpSeqFile.delete(); this.seg = cubeSegment; this.cuboidScheduler = cuboidScheduler; - this.samplingPercentage = cubeStatsResult.percentage; - this.mapperNumberOfFirstBuild = cubeStatsResult.mapperNumber; - this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.mapperOverlapRatio; - this.cuboidRowEstimatesHLL = cubeStatsResult.counterMap; + this.samplingPercentage = cubeStatsResult.getPercentage(); + this.mapperNumberOfFirstBuild = cubeStatsResult.getMapperNumber(); + this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.getMapperOverlapRatio(); + this.cuboidRowEstimatesHLL = cubeStatsResult.getCounterMap(); } /** @@ -117,15 +116,14 @@ public class CubeStatsReader { */ public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler, KylinConfig kylinConfig, Path path) throws IOException { - CubeStatsResult cubeStatsResult = new CubeStatsResult(); - cubeStatsResult.initialize(path, kylinConfig.getCubeStatsHLLPrecision()); + CubeStatsResult cubeStatsResult = new CubeStatsResult(path, kylinConfig.getCubeStatsHLLPrecision()); this.seg = cubeSegment; this.cuboidScheduler = cuboidScheduler; - this.samplingPercentage = cubeStatsResult.percentage; - this.mapperNumberOfFirstBuild = cubeStatsResult.mapperNumber; - this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.mapperOverlapRatio; - this.cuboidRowEstimatesHLL = cubeStatsResult.counterMap; + this.samplingPercentage = cubeStatsResult.getPercentage(); + this.mapperNumberOfFirstBuild = cubeStatsResult.getMapperNumber(); + this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.getMapperOverlapRatio(); + this.cuboidRowEstimatesHLL = cubeStatsResult.getCounterMap(); } private File writeTmpSeqFile(InputStream inputStream) throws IOException { @@ -331,13 +329,13 @@ public class CubeStatsReader { return new DecimalFormat("#.##").format(input); } - private class CubeStatsResult { + public static class CubeStatsResult { private int percentage = 100; private double mapperOverlapRatio = 0; private int mapperNumber = 0; - Map counterMap = Maps.newHashMap(); + private Map counterMap = Maps.newHashMap(); - void initialize(Path path, int precision) throws IOException { + public CubeStatsResult(Path path, int precision) throws IOException { Configuration hadoopConf = HadoopUtil.getCurrentConfiguration(); Option seqInput = SequenceFile.Reader.file(path); try (Reader reader = new SequenceFile.Reader(hadoopConf, seqInput)) { @@ -359,6 +357,22 @@ public class CubeStatsReader { } } } + + public int getPercentage() { + return percentage; + } + + public double getMapperOverlapRatio() { + return mapperOverlapRatio; + } + + public int getMapperNumber() { + return mapperNumber; + } + + public Map getCounterMap() { + return Collections.unmodifiableMap(counterMap); + } } public static void main(String[] args) throws IOException { diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java index f69bf67..3ea92c4 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java @@ -27,19 +27,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.util.ReflectionUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.common.util.ByteArray; -import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.CubingJob; import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.engine.mr.common.CubeStatsReader; import org.apache.kylin.engine.mr.common.CubeStatsWriter; import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil; import org.apache.kylin.job.exception.ExecuteException; @@ -86,46 +81,31 @@ public class SaveStatisticsStep extends AbstractExecutable { int samplingPercentage = -1; int mapperNumber = -1; for (Path item : statisticsFiles) { - int pSamplingPercentage = 0; - double pMapperOverlapRatio = 0; - int pMapperNumber = 0; - long pGrantTotal = 0; - try (SequenceFile.Reader reader = new SequenceFile.Reader(hadoopConf, SequenceFile.Reader.file(item))) { - LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), hadoopConf); - BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), - hadoopConf); - while (reader.next(key, value)) { - if (key.get() == 0L) { - pSamplingPercentage = Bytes.toInt(value.getBytes()); - } else if (key.get() == -1L) { - pMapperOverlapRatio = Bytes.toDouble(value.getBytes()); - } else if (key.get() == -2L) { - pMapperNumber = Bytes.toInt(value.getBytes()); - } else { - HLLCounter hll = new HLLCounter(kylinConf.getCubeStatsHLLPrecision()); - ByteArray byteArray = new ByteArray(value.getBytes()); - hll.readRegisters(byteArray.asBuffer()); - cuboidHLLMap.put(key.get(), hll); - pGrantTotal += hll.getCountEstimate(); - } - } - totalRowsBeforeMerge += pGrantTotal * pMapperOverlapRatio; - grantTotal += pGrantTotal; - if (pMapperNumber > 0) { - if (mapperNumber < 0) { - mapperNumber = pMapperNumber; - } else { - throw new RuntimeException( - "Base cuboid has been distributed to multiple reducers at step FactDistinctColumnsReducer!!!"); - } - } - if (samplingPercentage < 0) { - samplingPercentage = pSamplingPercentage; - } else if (samplingPercentage != pSamplingPercentage) { + CubeStatsReader.CubeStatsResult cubeStatsResult = new CubeStatsReader.CubeStatsResult(item, + kylinConf.getCubeStatsHLLPrecision()); + cuboidHLLMap.putAll(cubeStatsResult.getCounterMap()); + long pGrantTotal = 0L; + for (HLLCounter hll : cubeStatsResult.getCounterMap().values()) { + pGrantTotal += hll.getCountEstimate(); + } + totalRowsBeforeMerge += pGrantTotal * cubeStatsResult.getMapperOverlapRatio(); + grantTotal += pGrantTotal; + int pMapperNumber = cubeStatsResult.getMapperNumber(); + if (pMapperNumber > 0) { + if (mapperNumber < 0) { + mapperNumber = pMapperNumber; + } else { throw new RuntimeException( - "The sampling percentage should be same among all of the reducer of FactDistinctColumnsReducer!!!"); + "Base cuboid has been distributed to multiple reducers at step FactDistinctColumnsReducer!!!"); } } + int pSamplingPercentage = cubeStatsResult.getPercentage(); + if (samplingPercentage < 0) { + samplingPercentage = pSamplingPercentage; + } else if (samplingPercentage != pSamplingPercentage) { + throw new RuntimeException( + "The sampling percentage should be same among all of the reducer of FactDistinctColumnsReducer!!!"); + } } if (samplingPercentage < 0) { logger.warn("The sampling percentage should be set!!!"); -- 2.5.4 (Apple Git-61)