From 418bc39ece1675f92fea8d744307692b82aed737 Mon Sep 17 00:00:00 2001 From: Zhong Date: Wed, 15 Aug 2018 17:40:54 +0800 Subject: [PATCH] KYLIN-3491 add a shrunken global dictionary step to improve the encoding process --- .../org/apache/kylin/common/KylinConfigBase.java | 4 + .../java/org/apache/kylin/cube/model/CubeDesc.java | 67 +++++---- .../org/apache/kylin/dict/ShrunkenDictionary.java | 159 +++++++++++++++++++++ .../kylin/dict/ShrunkenDictionaryBuilder.java | 49 +++++++ .../apache/kylin/dict/ShrunkenDictionaryTest.java | 77 ++++++++++ .../kylin/job/constant/ExecutableConstants.java | 1 + .../kylin/engine/mr/BatchCubingJobBuilder2.java | 10 ++ .../java/org/apache/kylin/engine/mr/IMRInput.java | 4 + .../apache/kylin/engine/mr/JobBuilderSupport.java | 22 +++ .../kylin/engine/mr/common/AbstractHadoopJob.java | 3 + .../kylin/engine/mr/common/BaseCuboidBuilder.java | 6 +- .../kylin/engine/mr/common/BatchConstants.java | 1 + .../engine/mr/common/DictionaryGetterUtil.java | 76 ++++++++++ .../engine/mr/steps/BaseCuboidMapperBase.java | 10 +- .../apache/kylin/engine/mr/steps/CuboidJob.java | 5 + .../mr/steps/ExtractDictionaryFromGlobalJob.java | 107 ++++++++++++++ .../steps/ExtractDictionaryFromGlobalMapper.java | 141 ++++++++++++++++++ .../kylin/engine/mr/steps/InMemCuboidJob.java | 5 + .../engine/mr/steps/InMemCuboidMapperBase.java | 15 +- .../org/apache/kylin/source/hive/HiveMRInput.java | 9 ++ .../apache/kylin/source/kafka/KafkaMRInput.java | 7 + .../kylin/storage/hbase/steps/HBaseJobSteps.java | 1 + 22 files changed, 731 insertions(+), 48 deletions(-) create mode 100644 core-dictionary/src/main/java/org/apache/kylin/dict/ShrunkenDictionary.java create mode 100644 core-dictionary/src/main/java/org/apache/kylin/dict/ShrunkenDictionaryBuilder.java create mode 100644 core-dictionary/src/test/java/org/apache/kylin/dict/ShrunkenDictionaryTest.java create mode 100644 engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DictionaryGetterUtil.java create mode 100644 engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExtractDictionaryFromGlobalJob.java create mode 100644 engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExtractDictionaryFromGlobalMapper.java diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index dbf22b5..c7da5a7 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -426,6 +426,10 @@ abstract public class KylinConfigBase implements Serializable { return Double.parseDouble(getOptional("kylin.snapshot.ext.local.cache.max-size-gb", "200")); } + public boolean isShrunkenDictFromGlobalEnabled() { + return Boolean.parseBoolean(this.getOptional("kylin.dictionary.shrunken-from-global-enabled", "false")); + } + // ============================================================================ // CUBE // ============================================================================ diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java index 5b4a134..b72f707 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java @@ -18,16 +18,28 @@ package org.apache.kylin.cube.model; -import com.fasterxml.jackson.annotation.JsonAutoDetect; -import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.google.common.base.Joiner; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import java.lang.reflect.Method; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.BitSet; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeSet; + import org.apache.commons.codec.binary.Base64; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.ArrayUtils; @@ -65,27 +77,16 @@ import org.apache.kylin.metadata.realization.RealizationType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.reflect.Method; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.BitSet; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TreeSet; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.base.Joiner; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; /** */ @@ -1469,6 +1470,10 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { } return globalDictCols; } + + public boolean isShrunkenDictFromGlobalEnabled() { + return config.isShrunkenDictFromGlobalEnabled() && !getAllGlobalDictColumns().isEmpty(); + } // UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns public List getAllUHCColumns() { diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/ShrunkenDictionary.java b/core-dictionary/src/main/java/org/apache/kylin/dict/ShrunkenDictionary.java new file mode 100644 index 0000000..35c995e --- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/ShrunkenDictionary.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.dict; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.PrintStream; +import java.util.Map; + +import org.apache.kylin.common.util.Dictionary; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; + +public class ShrunkenDictionary extends Dictionary { + + private ImmutableMap valueToIdMap; + private ImmutableMap idToValueMap; + + private int minId; + private int maxId; + private int sizeOfId; + private int sizeOfValue; + + private ValueSerializer valueSerializer; + + public ShrunkenDictionary(ValueSerializer valueSerializer) { // default constructor for Writable interface + this.valueSerializer = valueSerializer; + } + + public ShrunkenDictionary(ValueSerializer valueSerializer, int minId, int maxId, int sizeOfId, int sizeOfValue, + Map valueToIdMap) { + this.valueSerializer = valueSerializer; + + this.minId = minId; + this.maxId = maxId; + this.sizeOfId = sizeOfId; + this.sizeOfValue = sizeOfValue; + + Preconditions.checkNotNull(valueToIdMap); + this.valueToIdMap = ImmutableMap. builder().putAll(valueToIdMap).build(); + } + + @Override + public int getMinId() { + return minId; + } + + @Override + public int getMaxId() { + return maxId; + } + + @Override + public int getSizeOfId() { + return sizeOfId; + } + + @Override + public int getSizeOfValue() { + return sizeOfValue; + } + + @Override + public boolean contains(Dictionary another) { + return false; + } + + protected int getIdFromValueImpl(T value, int roundingFlag) { + Integer id = valueToIdMap.get(value); + if (id == null) { + return -1; + } + return id; + } + + protected T getValueFromIdImpl(int id) { + if (idToValueMap == null) { + idToValueMap = buildIdToValueMap(); + } + return idToValueMap.get(id); + } + + private ImmutableMap buildIdToValueMap() { + ImmutableMap.Builder idToValueMapBuilder = ImmutableMap.builder(); + for (T value : valueToIdMap.keySet()) { + idToValueMapBuilder.put(valueToIdMap.get(value), value); + } + return idToValueMapBuilder.build(); + } + + public void dump(PrintStream out) { + out.println(String.format("Total %d values for ShrunkenDictionary", valueToIdMap.size())); + } + + public void write(DataOutput out) throws IOException { + out.writeInt(minId); + out.writeInt(maxId); + out.writeInt(sizeOfId); + out.writeInt(sizeOfValue); + + out.writeInt(valueToIdMap.size()); + for (T value : valueToIdMap.keySet()) { + valueSerializer.writeValue(out, value); + out.writeInt(valueToIdMap.get(value)); + } + } + + public void readFields(DataInput in) throws IOException { + this.minId = in.readInt(); + this.maxId = in.readInt(); + this.sizeOfId = in.readInt(); + this.sizeOfValue = in.readInt(); + + int sizeValueMap = in.readInt(); + ImmutableMap.Builder valueToIdMapBuilder = ImmutableMap.builder(); + for (int i = 0; i < sizeValueMap; i++) { + T value = valueSerializer.readValue(in); + int id = in.readInt(); + valueToIdMapBuilder.put(value, id); + } + this.valueToIdMap = valueToIdMapBuilder.build(); + } + + public interface ValueSerializer { + void writeValue(DataOutput out, T value) throws IOException; + + T readValue(DataInput in) throws IOException; + } + + public static class StringValueSerializer implements ValueSerializer { + @Override + public void writeValue(DataOutput out, String value) throws IOException { + out.writeUTF(value); + } + + @Override + public String readValue(DataInput in) throws IOException { + return in.readUTF(); + } + } +} diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/ShrunkenDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/ShrunkenDictionaryBuilder.java new file mode 100644 index 0000000..ab3df5e --- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/ShrunkenDictionaryBuilder.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.dict; + +import java.util.Map; + +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.dict.ShrunkenDictionary.ValueSerializer; + +import com.google.common.collect.Maps; + +public class ShrunkenDictionaryBuilder { + + private Map valueToIdMap; + + private Dictionary fullDict; + + public ShrunkenDictionaryBuilder(Dictionary fullDict) { + this.fullDict = fullDict; + + this.valueToIdMap = Maps.newHashMap(); + } + + public void addValue(T value) { + int id = fullDict.getIdFromValue(value); + valueToIdMap.put(value, id); + } + + public ShrunkenDictionary build(ValueSerializer valueSerializer) { + return new ShrunkenDictionary<>(valueSerializer, fullDict.getMinId(), fullDict.getMaxId(), + fullDict.getSizeOfId(), fullDict.getSizeOfValue(), valueToIdMap); + } +} diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/ShrunkenDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/ShrunkenDictionaryTest.java new file mode 100644 index 0000000..7a86e5f --- /dev/null +++ b/core-dictionary/src/test/java/org/apache/kylin/dict/ShrunkenDictionaryTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.dict; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.kylin.common.util.Dictionary; +import org.junit.Assert; +import org.junit.Test; + +public class ShrunkenDictionaryTest { + + @Test + public void testStringDictionary() { + ArrayList strList = new ArrayList(); + strList.add(""); + strList.add("part"); + strList.add("par"); + strList.add("partition"); + strList.add("party"); + strList.add("parties"); + strList.add("paint"); + + TrieDictionaryBuilder dictBuilder = new TrieDictionaryBuilder<>(new StringBytesConverter()); + for (String str : strList) { + dictBuilder.addValue(str); + } + Dictionary dict = dictBuilder.build(0); + + ShrunkenDictionary.StringValueSerializer valueSerializer = new ShrunkenDictionary.StringValueSerializer(); + ShrunkenDictionaryBuilder shrunkenDictBuilder = new ShrunkenDictionaryBuilder<>(dict); + for (int i = 0; i < strList.size(); i += 2) { + shrunkenDictBuilder.addValue(strList.get(i)); + } + Dictionary shrunkenDict = shrunkenDictBuilder.build(valueSerializer); + + try { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + + shrunkenDict.write(dos); + + ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray()); + DataInputStream dis = new DataInputStream(bis); + + Dictionary dShrunkenDict = new ShrunkenDictionary<>(valueSerializer); + dShrunkenDict.readFields(dis); + + for (int i = 0; i < strList.size(); i += 2) { + String value = strList.get(i); + Assert.assertEquals(dict.getIdFromValue(value), dShrunkenDict.getIdFromValue(value)); + } + } catch (IOException e) { + } + } +} diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java index c805f8a..560293c 100644 --- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java +++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java @@ -34,6 +34,7 @@ public final class ExecutableConstants { public static final String SOURCE_RECORDS_COUNT = "source_records_count"; public static final String SOURCE_RECORDS_SIZE = "source_records_size"; + public static final String STEP_NAME_EXTRACT_DICTIONARY_FROM_GLOBAL = "Extract Dictionary from Global Dictionary"; public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary"; public static final String STEP_NAME_BUILD_UHC_DICTIONARY = "Build UHC Dictionary"; public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table"; diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java index 5498365..1695a22 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java @@ -73,6 +73,10 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { outputSide.addStepPhase2_BuildDictionary(result); + if (seg.getCubeDesc().isShrunkenDictFromGlobalEnabled()) { + result.addTask(createExtractDictionaryFromGlobalJob(jobId)); + } + // Phase 3: Build Cube addLayerCubingSteps(result, jobId, cuboidRootPath); // layer cubing, only selected algorithm will execute addInMemCubingSteps(result, jobId, cuboidRootPath); // inmem cubing, only selected algorithm will execute @@ -124,6 +128,9 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidRootPath); appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Cube_Builder_" + seg.getRealization().getName()); appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId); + if (seg.getCubeDesc().isShrunkenDictFromGlobalEnabled()) { + appendExecCmdParameters(cmd, BatchConstants.ARG_SHRUNKEN_DICT_PATH, getShrunkenDictionaryPath(jobId)); + } cubeStep.setMapReduceParams(cmd.toString()); cubeStep.setMapReduceJobClass(getInMemCuboidJob()); @@ -150,6 +157,9 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName()); appendExecCmdParameters(cmd, BatchConstants.ARG_LEVEL, "0"); appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId); + if (seg.getCubeDesc().isShrunkenDictFromGlobalEnabled()) { + appendExecCmdParameters(cmd, BatchConstants.ARG_SHRUNKEN_DICT_PATH, getShrunkenDictionaryPath(jobId)); + } baseCuboidStep.setMapReduceParams(cmd.toString()); baseCuboidStep.setMapReduceJobClass(getBaseCuboidJob()); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java index aca9853..f650321 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java @@ -20,6 +20,7 @@ package org.apache.kylin.engine.mr; import java.util.Collection; +import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; @@ -50,6 +51,9 @@ public interface IMRInput { /** Parse a mapper input object into column values. */ public Collection parseMapperInput(Object mapperInput); + + /** Get the signature for the input split*/ + public String getInputSplitSignature(InputSplit inputSplit); } /** diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java index 649b4c3..57eb5ca 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java @@ -36,6 +36,7 @@ import org.apache.kylin.engine.mr.common.MapReduceExecutable; import org.apache.kylin.engine.mr.steps.CalculateStatsFromBaseCuboidJob; import org.apache.kylin.engine.mr.steps.CreateDictionaryJob; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; +import org.apache.kylin.engine.mr.steps.ExtractDictionaryFromGlobalJob; import org.apache.kylin.engine.mr.steps.FactDistinctColumnsJob; import org.apache.kylin.engine.mr.steps.MergeDictionaryStep; import org.apache.kylin.engine.mr.steps.MergeStatisticsStep; @@ -175,6 +176,23 @@ public class JobBuilderSupport { return buildDictionaryStep; } + public MapReduceExecutable createExtractDictionaryFromGlobalJob(String jobId) { + MapReduceExecutable result = new MapReduceExecutable(); + result.setName(ExecutableConstants.STEP_NAME_EXTRACT_DICTIONARY_FROM_GLOBAL); + result.setMapReduceJobClass(ExtractDictionaryFromGlobalJob.class); + StringBuilder cmd = new StringBuilder(); + appendMapReduceParameters(cmd); + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid()); + appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, + "Kylin_Extract_Dictionary_from_Global_" + seg.getRealization().getName() + "_Step"); + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId); + appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getShrunkenDictionaryPath(jobId)); + + result.setMapReduceParams(cmd.toString()); + return result; + } + public UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(String jobId, LookupMaterializeContext lookupMaterializeContext) { final UpdateCubeInfoAfterBuildStep result = new UpdateCubeInfoAfterBuildStep(); result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO); @@ -291,6 +309,10 @@ public class JobBuilderSupport { return getRealizationRootPath(jobId) + "/fact_distinct_columns/" + BatchConstants.CFG_OUTPUT_STATISTICS; } + public String getShrunkenDictionaryPath(String jobId) { + return getRealizationRootPath(jobId) + "/dictionary_shrunken"; + } + public String getDictRootPath(String jobId) { return getRealizationRootPath(jobId) + "/dict"; } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java index 329dd56..7b507fb 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java @@ -112,6 +112,9 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { .hasArg().isRequired(true).withDescription("Partition file path.").create(BatchConstants.ARG_PARTITION); protected static final Option OPTION_HTABLE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_HTABLE_NAME) .hasArg().isRequired(true).withDescription("HTable name").create(BatchConstants.ARG_HTABLE_NAME); + protected static final Option OPTION_DICTIONARY_SHRUNKEN_PATH = OptionBuilder + .withArgName(BatchConstants.ARG_SHRUNKEN_DICT_PATH).hasArg().isRequired(false) + .withDescription("Dictionary shrunken path").create(BatchConstants.ARG_SHRUNKEN_DICT_PATH); protected static final Option OPTION_STATISTICS_OUTPUT = OptionBuilder.withArgName(BatchConstants.ARG_STATS_OUTPUT) .hasArg().isRequired(false).withDescription("Statistics output").create(BatchConstants.ARG_STATS_OUTPUT); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java index 5dd55b2..13bc688 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java @@ -74,18 +74,18 @@ public class BaseCuboidBuilder implements java.io.Serializable { measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures()); } - public BaseCuboidBuilder(KylinConfig kylinConfig, CubeDesc cubeDesc, CubeSegment cubeSegment, CubeJoinedFlatTableEnrich intermediateTableDesc) { + public BaseCuboidBuilder(KylinConfig kylinConfig, CubeDesc cubeDesc, CubeSegment cubeSegment, + CubeJoinedFlatTableEnrich intermediateTableDesc, Map> dictionaryMap) { this.kylinConfig = kylinConfig; this.cubeDesc = cubeDesc; this.cubeSegment = cubeSegment; this.intermediateTableDesc = intermediateTableDesc; + this.dictionaryMap = dictionaryMap; init(); rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid); measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures()); aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures()); - dictionaryMap = cubeSegment.buildDictionaryMap(); - } private void init() { diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java index a4a52ad..bbf0d63 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java @@ -104,6 +104,7 @@ public interface BatchConstants { String ARG_LOOKUP_SNAPSHOT_ID = "snapshotID"; String ARG_EXT_LOOKUP_SNAPSHOTS_INFO = "extlookupsnapshots"; String ARG_META_URL = "metadataUrl"; + String ARG_SHRUNKEN_DICT_PATH = "shrunkenDictPath"; /** * logger and counter diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DictionaryGetterUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DictionaryGetterUtil.java new file mode 100644 index 0000000..0895244 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DictionaryGetterUtil.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.mr.common; + +import java.io.DataInputStream; +import java.io.IOException; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.dict.ShrunkenDictionary; +import org.apache.kylin.engine.mr.MRUtil; +import org.apache.kylin.metadata.model.TblColRef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DictionaryGetterUtil { + + private static final Logger logger = LoggerFactory.getLogger(DictionaryGetterUtil.class); + + public static String getInputSplitSignature(CubeSegment cubeSegment, InputSplit inputSplit) { + return MRUtil.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat().getInputSplitSignature(inputSplit); + } + + public static Map> getDictionaryMap(CubeSegment cubeSegment, InputSplit inputSplit, + Configuration configuration) throws IOException { + Map> dictionaryMap = cubeSegment.buildDictionaryMap(); + + String shrunkenDictPath = configuration.get(BatchConstants.ARG_SHRUNKEN_DICT_PATH); + if (shrunkenDictPath == null) { + return dictionaryMap; + } + + // replace global dictionary with shrunken dictionary if possible + String inputSplitSignature = getInputSplitSignature(cubeSegment, inputSplit); + FileSystem fs = FileSystem.get(configuration); + ShrunkenDictionary.StringValueSerializer valueSerializer = new ShrunkenDictionary.StringValueSerializer(); + for (TblColRef colRef : cubeSegment.getCubeDesc().getAllGlobalDictColumns()) { + Path colShrunkenDictDir = new Path(shrunkenDictPath, colRef.getIdentity()); + Path colShrunkenDictPath = new Path(colShrunkenDictDir, inputSplitSignature); + if (!fs.exists(colShrunkenDictPath)) { + logger.warn("Shrunken dictionary for column " + colRef.getIdentity() + " in split " + + inputSplitSignature + " does not exist!!!"); + continue; + } + try (DataInputStream dis = fs.open(colShrunkenDictPath)) { + Dictionary shrunkenDict = new ShrunkenDictionary(valueSerializer); + shrunkenDict.readFields(dis); + + dictionaryMap.put(colRef, shrunkenDict); + } + } + + return dictionaryMap; + } +} diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java index 091f9a2..b5dc961 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java @@ -21,10 +21,12 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Map; import org.apache.hadoop.io.Text; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; @@ -35,6 +37,8 @@ import org.apache.kylin.engine.mr.KylinMapper; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BaseCuboidBuilder; import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.engine.mr.common.DictionaryGetterUtil; +import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,8 +70,12 @@ abstract public class BaseCuboidMapperBase extends KylinMapper> dictionaryMap = DictionaryGetterUtil.getDictionaryMap(cubeSegment, + context.getInputSplit(), context.getConfiguration()); + + baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, cubeDesc, cubeSegment, intermediateTableDesc, + dictionaryMap); } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java index b49b639..d7da2c2 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java @@ -93,6 +93,7 @@ public class CuboidJob extends AbstractHadoopJob { options.addOption(OPTION_NCUBOID_LEVEL); options.addOption(OPTION_CUBING_JOB_ID); options.addOption(OPTION_CUBOID_MODE); + options.addOption(OPTION_DICTIONARY_SHRUNKEN_PATH); parseOptions(options, args); String output = getOptionValue(OPTION_OUTPUT_PATH); @@ -118,6 +119,10 @@ public class CuboidJob extends AbstractHadoopJob { job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, cubingJobId); + String shrunkenDictPath = getOptionValue(OPTION_DICTIONARY_SHRUNKEN_PATH); + if (shrunkenDictPath != null) { + job.getConfiguration().set(BatchConstants.ARG_SHRUNKEN_DICT_PATH, shrunkenDictPath); + } logger.info("Starting: " + job.getJobName()); setJobClasspath(job, cube.getConfig()); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExtractDictionaryFromGlobalJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExtractDictionaryFromGlobalJob.java new file mode 100644 index 0000000..df61ca9 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExtractDictionaryFromGlobalJob.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.mr.steps; + +import org.apache.commons.cli.Options; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.engine.mr.IMRInput; +import org.apache.kylin.engine.mr.MRUtil; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Directly using global dictionary to encode values will bring lots of memory swapping of the slices, which will make + * the encoding process very slow. This job will change the encoding process for the raw column values to + * 1. For each data block, a mapper will generating distinct values, sort them, extract shrunken dictionary from global + * 2. For each data block, scan again to encode the raw values by the shrunken dictionary rather than the global one + */ +public class ExtractDictionaryFromGlobalJob extends AbstractHadoopJob { + protected static final Logger logger = LoggerFactory.getLogger(ExtractDictionaryFromGlobalJob.class); + + @Override + public int run(String[] args) throws Exception { + Options options = new Options(); + + try { + options.addOption(OPTION_JOB_NAME); + options.addOption(OPTION_CUBING_JOB_ID); + options.addOption(OPTION_OUTPUT_PATH); + options.addOption(OPTION_CUBE_NAME); + options.addOption(OPTION_SEGMENT_ID); + parseOptions(options, args); + + job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); + String job_id = getOptionValue(OPTION_CUBING_JOB_ID); + job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, job_id); + + String cubeName = getOptionValue(OPTION_CUBE_NAME); + String segmentID = getOptionValue(OPTION_SEGMENT_ID); + + // ---------------------------------------------------------------------------- + // add metadata to distributed cache + CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); + CubeInstance cube = cubeMgr.getCube(cubeName); + CubeSegment segment = cube.getSegmentById(segmentID); + + job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); + job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID); + + logger.info("Starting: " + job.getJobName()); + + job.getConfiguration().set("mapreduce.map.speculative", "false"); + setJobClasspath(job, cube.getConfig()); + + // Mapper + job.setMapperClass(ExtractDictionaryFromGlobalMapper.class); + + // Reducer + job.setNumReduceTasks(0); + + // Input + IMRInput.IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(segment) + .getFlatTableInputFormat(); + flatTableInputFormat.configureJob(job); + // Output + //// prevent to create zero-sized default output + LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class); + Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); + FileOutputFormat.setOutputPath(job, output); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); + deletePath(job.getConfiguration(), output); + + attachSegmentMetadataWithDict(segment, job.getConfiguration()); + return waitForCompletion(job); + } finally { + if (job != null) + cleanupTempConfFile(job.getConfiguration()); + } + } +} diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExtractDictionaryFromGlobalMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExtractDictionaryFromGlobalMapper.java new file mode 100644 index 0000000..34a5ec7 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExtractDictionaryFromGlobalMapper.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.mr.steps; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich; +import org.apache.kylin.dict.ShrunkenDictionary; +import org.apache.kylin.dict.ShrunkenDictionaryBuilder; +import org.apache.kylin.engine.EngineFactory; +import org.apache.kylin.engine.mr.IMRInput; +import org.apache.kylin.engine.mr.KylinMapper; +import org.apache.kylin.engine.mr.MRUtil; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.engine.mr.common.DictionaryGetterUtil; +import org.apache.kylin.metadata.model.TblColRef; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +public class ExtractDictionaryFromGlobalMapper extends KylinMapper { + private String cubeName; + private CubeDesc cubeDesc; + private CubeInstance cube; + private CubeSegment cubeSeg; + + private IMRInput.IMRTableInputFormat flatTableInputFormat; + private CubeJoinedFlatTableEnrich intermediateTableDesc; + + private List globalColumns; + private int[] globalColumnIndex; + private List> globalColumnValues; + private List> globalDicts; + + private String splitKey; + + @Override + protected void doSetup(Context context) throws IOException { + Configuration conf = context.getConfiguration(); + bindCurrentConfiguration(conf); + KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(); + + cubeName = conf.get(BatchConstants.CFG_CUBE_NAME); + cube = CubeManager.getInstance(config).getCube(cubeName); + cubeDesc = cube.getDescriptor(); + cubeSeg = cube.getSegmentById(conf.get(BatchConstants.CFG_CUBE_SEGMENT_ID)); + flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat(); + + intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg), cubeDesc); + + globalColumns = cubeDesc.getAllGlobalDictColumns(); + globalColumnIndex = new int[globalColumns.size()]; + globalColumnValues = Lists.newArrayListWithExpectedSize(globalColumns.size()); + globalDicts = Lists.newArrayListWithExpectedSize(globalColumns.size()); + for (int i = 0; i < globalColumns.size(); i++) { + TblColRef colRef = globalColumns.get(i); + int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef); + globalColumnIndex[i] = columnIndexOnFlatTbl; + + globalColumnValues.add(Sets. newHashSet()); + globalDicts.add(cubeSeg.getDictionary(colRef)); + } + + splitKey = DictionaryGetterUtil.getInputSplitSignature(cubeSeg, context.getInputSplit()); + } + + @Override + public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException { + Collection rowCollection = flatTableInputFormat.parseMapperInput(record); + + for (String[] row : rowCollection) { + for (int i = 0; i < globalColumnIndex.length; i++) { + String fieldValue = row[globalColumnIndex[i]]; + if (fieldValue == null) + continue; + + globalColumnValues.get(i).add(fieldValue); + } + } + } + + @Override + protected void doCleanup(Context context) throws IOException, InterruptedException { + FileSystem fs = FileSystem.get(context.getConfiguration()); + Path outputDirBase = new Path(context.getConfiguration().get(FileOutputFormat.OUTDIR)); + + ShrunkenDictionary.StringValueSerializer strValueSerializer = new ShrunkenDictionary.StringValueSerializer(); + for (int i = 0; i < globalColumns.size(); i++) { + List colDistinctValues = Lists.newArrayList(globalColumnValues.get(i)); + // sort values to accelerate the encoding process by reducing the swapping of global dictionary slices + Collections.sort(colDistinctValues); + + ShrunkenDictionaryBuilder dictBuilder = new ShrunkenDictionaryBuilder<>(globalDicts.get(i)); + for (String colValue : colDistinctValues) { + dictBuilder.addValue(colValue); + } + Dictionary shrunkenDict = dictBuilder.build(strValueSerializer); + + Path colDictDir = new Path(outputDirBase, globalColumns.get(i).getIdentity()); + if (!fs.exists(colDictDir)) { + fs.mkdirs(colDictDir); + } + try (DataOutputStream dos = fs.create(new Path(colDictDir, splitKey))) { + shrunkenDict.write(dos); + } + } + } +} diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java index b0ea7b7..f8874fe 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java @@ -70,6 +70,7 @@ public class InMemCuboidJob extends AbstractHadoopJob { options.addOption(OPTION_SEGMENT_ID); options.addOption(OPTION_OUTPUT_PATH); options.addOption(OPTION_CUBING_JOB_ID); + options.addOption(OPTION_DICTIONARY_SHRUNKEN_PATH); parseOptions(options, args); String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(); @@ -88,6 +89,10 @@ public class InMemCuboidJob extends AbstractHadoopJob { job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, cubingJobId); + String shrunkenDictPath = getOptionValue(OPTION_DICTIONARY_SHRUNKEN_PATH); + if (shrunkenDictPath != null) { + job.getConfiguration().set(BatchConstants.ARG_SHRUNKEN_DICT_PATH, shrunkenDictPath); + } logger.info("Starting: " + job.getJobName()); setJobClasspath(job, cube.getConfig()); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java index 73af138..e95ce8a 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java @@ -42,13 +42,12 @@ import org.apache.kylin.engine.mr.KylinMapper; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.common.CuboidSchedulerUtil; +import org.apache.kylin.engine.mr.common.DictionaryGetterUtil; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Maps; - /** */ public abstract class InMemCuboidMapperBase extends KylinMapper { @@ -94,17 +93,7 @@ public abstract class InMemCuboidMapperBase cubeSegment = cube.getSegmentById(segmentID); flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc); - dictionaryMap = Maps.newHashMap(); - - // dictionary - for (TblColRef col : cubeDesc.getAllColumnsHaveDictionary()) { - Dictionary dict = cubeSegment.getDictionary(col); - if (dict == null) { - logger.warn("Dictionary for " + col + " was not found."); - } - - dictionaryMap.put(col, cubeSegment.getDictionary(col)); - } + dictionaryMap = DictionaryGetterUtil.getDictionaryMap(cubeSegment, context.getInputSplit(), conf); // check memory more often if a single row is big if (cubeDesc.hasMemoryHungryMeasures()) { diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index 2e39285..33b1059 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -22,9 +22,12 @@ import java.io.IOException; import java.util.Collections; import java.util.List; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hive.hcatalog.data.HCatRecord; import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; +import org.apache.hive.hcatalog.mapreduce.HCatSplit; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.StringUtil; @@ -100,6 +103,12 @@ public class HiveMRInput extends HiveInputBase implements IMRInput { return Collections.singletonList(HiveTableReader.getRowAsStringArray((HCatRecord) mapperInput)); } + @Override + public String getInputSplitSignature(InputSplit inputSplit) { + FileSplit baseSplit = (FileSplit) ((HCatSplit) inputSplit).getBaseSplit(); + //file name(for intermediate table) + start pos + length + return baseSplit.getPath().getName() + "_" + baseSplit.getStart() + "_" + baseSplit.getLength(); + } } public static class BatchCubingInputSide implements IMRBatchCubingInputSide { diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java index a45cc63..2c95c1c 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java @@ -24,6 +24,8 @@ import java.util.List; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; @@ -100,6 +102,11 @@ public class KafkaMRInput extends KafkaInputBase implements IMRInput { return Collections.singletonList(columns); } + @Override + public String getInputSplitSignature(InputSplit inputSplit) { + FileSplit baseSplit = (FileSplit) inputSplit; + return baseSplit.getPath().getName() + "_" + baseSplit.getStart() + "_" + baseSplit.getLength(); + } } public static class BatchCubingInputSide implements IMRBatchCubingInputSide { diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java index 4fda139..105f9dc 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java @@ -240,6 +240,7 @@ public abstract class HBaseJobSteps extends JobBuilderSupport { List toDeletePaths = new ArrayList<>(); toDeletePaths.add(getFactDistinctColumnsPath(jobId)); toDeletePaths.add(getHFilePath(jobId)); + toDeletePaths.add(getShrunkenDictionaryPath(jobId)); HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep(); step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HBASE); -- 2.5.4 (Apple Git-61)