From 054f8bb37e32a5467b4348f284e364592dde1e1f Mon Sep 17 00:00:00 2001 From: lidongsjtu Date: Fri, 30 Oct 2015 11:28:13 +0800 Subject: [PATCH] KYLIN-1099 Support dictionary of cardinality over 10 millions --- .../apache/kylin/job/BuildIIWithStreamTest.java | 2 +- .../org/apache/kylin/cube/util/CubingUtils.java | 7 +- .../cube/inmemcubing/InMemCubeBuilderTest.java | 5 +- .../org/apache/kylin/dict/DictionaryGenerator.java | 102 ++++++--------------- .../kylin/dict/IDictionaryValueEnumerator.java | 32 +++++++ .../dict/IterableDictionaryValueEnumerator.java | 47 ++++++++++ .../dict/MultipleDictionaryValueEnumerator.java | 77 ++++++++++++++++ .../kylin/dict/TableColumnValueEnumerator.java | 75 +++++++++++++++ .../apache/kylin/dict/NumberDictionaryTest.java | 8 +- .../engine/mr/steps/MergeCuboidMapperTest.java | 10 +- .../org/apache/kylin/engine/spark/SparkCubing.java | 6 +- .../invertedindex/streaming/SliceBuilder.java | 3 +- .../invertedindex/util/IIDictionaryBuilder.java | 6 +- .../invertedindex/InvertedIndexLocalTest.java | 8 +- 14 files changed, 286 insertions(+), 102 deletions(-) create mode 100644 core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryValueEnumerator.java create mode 100644 core-dictionary/src/main/java/org/apache/kylin/dict/IterableDictionaryValueEnumerator.java create mode 100644 core-dictionary/src/main/java/org/apache/kylin/dict/MultipleDictionaryValueEnumerator.java create mode 100644 core-dictionary/src/main/java/org/apache/kylin/dict/TableColumnValueEnumerator.java diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java index 61cf262..1699aad 100644 --- a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java +++ b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java @@ -231,7 +231,7 @@ public class BuildIIWithStreamTest { } } - private void build(SliceBuilder sliceBuilder, StreamingBatch batch, HTableInterface htable) { + private void build(SliceBuilder sliceBuilder, StreamingBatch batch, HTableInterface htable) throws IOException { final Slice slice = sliceBuilder.buildSlice(batch); try { loadToHBase(htable, slice, new IIKeyValueCodec(slice.getInfo())); diff --git a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java index cd84a22..0cfd020 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java @@ -59,10 +59,7 @@ import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.cuboid.CuboidScheduler; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; -import org.apache.kylin.dict.Dictionary; -import org.apache.kylin.dict.DictionaryGenerator; -import org.apache.kylin.dict.DictionaryInfo; -import org.apache.kylin.dict.DictionaryManager; +import org.apache.kylin.dict.*; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.source.ReadableTable; import org.slf4j.Logger; @@ -176,7 +173,7 @@ public class CubingUtils { return input == null ? null : input.getBytes(); } }); - final Dictionary dict = DictionaryGenerator.buildDictionaryFromValueList(tblColRef.getType(), bytes); + final Dictionary dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(tblColRef.getType(), new IterableDictionaryValueEnumerator(bytes)); result.put(tblColRef, dict); } return result; diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java index 0604d32..e3fb30e 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java @@ -41,6 +41,7 @@ import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; import org.apache.kylin.dict.Dictionary; import org.apache.kylin.dict.DictionaryGenerator; +import org.apache.kylin.dict.IterableDictionaryValueEnumerator; import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; @@ -177,7 +178,7 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase { if (desc.getRowkey().isUseDictionary(col)) { logger.info("Building dictionary for " + col); List valueList = readValueList(flatTable, nColumns, flatTableDesc.getRowKeyColumnIndexes()[c]); - Dictionary dict = DictionaryGenerator.buildDictionaryFromValueList(col.getType(), valueList); + Dictionary dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(col.getType(), new IterableDictionaryValueEnumerator(valueList)); result.put(col, dict); } } @@ -191,7 +192,7 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase { TblColRef displayCol = func.getParameter().getColRefs().get(flatTableIdx.length - 1); logger.info("Building dictionary for " + displayCol); List valueList = readValueList(flatTable, nColumns, displayColIdx); - Dictionary dict = DictionaryGenerator.buildDictionaryFromValueList(displayCol.getType(), valueList); + Dictionary dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(displayCol.getType(), new IterableDictionaryValueEnumerator(valueList)); result.put(displayCol, dict); } diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java index c0bff8a..b56dd27 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java @@ -46,7 +46,7 @@ public class DictionaryGenerator { private static final Logger logger = LoggerFactory.getLogger(DictionaryGenerator.class); - private static final String[] DATE_PATTERNS = new String[] { "yyyy-MM-dd" }; + private static final String[] DATE_PATTERNS = new String[] { "yyyy-MM-dd", "yyyyMMdd" }; private static int getDictionaryMaxCardinality() { try { @@ -56,7 +56,7 @@ public class DictionaryGenerator { } } - public static Dictionary buildDictionaryFromValueList(DataType dataType, Iterable values) { + public static Dictionary buildDictionaryFromValueEnumerator(DataType dataType, IDictionaryValueEnumerator valueEnumerator) throws IOException { Preconditions.checkNotNull(dataType, "dataType cannot be null"); Dictionary dict; int baseId = 0; // always 0 for now @@ -66,13 +66,13 @@ public class DictionaryGenerator { // build dict, case by data type if (dataType.isDateTimeFamily()) { if (dataType.isDate()) - dict = buildDateDict(values, baseId, nSamples, samples); + dict = buildDateDict(valueEnumerator, baseId, nSamples, samples); else dict = new TimeStrDictionary(); // base ID is always 0 } else if (dataType.isNumberFamily()) { - dict = buildNumberDict(values, baseId, nSamples, samples); + dict = buildNumberDict(valueEnumerator, baseId, nSamples, samples); } else { - dict = buildStringDict(values, baseId, nSamples, samples); + dict = buildStringDict(valueEnumerator, baseId, nSamples, samples); } // log a few samples @@ -91,26 +91,8 @@ public class DictionaryGenerator { return dict; } - public static Dictionary mergeDictionaries(DataType dataType, List sourceDicts) { - - HashSet dedup = new HashSet(); - - for (DictionaryInfo info : sourceDicts) { - Dictionary dict = info.getDictionaryObject(); - int minkey = dict.getMinId(); - int maxkey = dict.getMaxId(); - byte[] buffer = new byte[dict.getSizeOfValue()]; - for (int i = minkey; i <= maxkey; ++i) { - int size = dict.getValueBytesFromId(i, buffer, 0); - dedup.add(Bytes.copy(buffer, 0, size)); - } - } - - List valueList = new ArrayList(); - valueList.addAll(dedup); - - Dictionary dict = buildDictionaryFromValueList(dataType, valueList); - return dict; + public static Dictionary mergeDictionaries(DataType dataType, List sourceDicts) throws IOException { + return buildDictionaryFromValueEnumerator(dataType, new MultipleDictionaryValueEnumerator(sourceDicts)); } public static Dictionary buildDictionary(DictionaryInfo info, ReadableTable inpTable) throws IOException { @@ -118,24 +100,29 @@ public class DictionaryGenerator { // currently all data types are casted to string to build dictionary // String dataType = info.getDataType(); - logger.debug("Building dictionary object " + JsonUtil.writeValueAsString(info)); - - ArrayList values = loadColumnValues(inpTable, info.getSourceColumnIndex()); - - Dictionary dict = buildDictionaryFromValueList(DataType.getInstance(info.getDataType()), values); + IDictionaryValueEnumerator columnValueEnumerator = null; + try { + logger.debug("Building dictionary object " + JsonUtil.writeValueAsString(info)); - return dict; + columnValueEnumerator = new TableColumnValueEnumerator(inpTable.getReader(), info.getSourceColumnIndex()); + return buildDictionaryFromValueEnumerator(DataType.getInstance(info.getDataType()), columnValueEnumerator); + } finally { + if (columnValueEnumerator != null) + columnValueEnumerator.close(); + } } - private static Dictionary buildDateDict(Iterable values, int baseId, int nSamples, ArrayList samples) { + private static Dictionary buildDateDict(IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList samples) throws IOException { final int BAD_THRESHOLD = 0; String matchPattern = null; + byte[] value; for (String ptn : DATE_PATTERNS) { matchPattern = ptn; // be optimistic int badCount = 0; SimpleDateFormat sdf = new SimpleDateFormat(ptn); - for (byte[] value : values) { + while (valueEnumerator.moveNext()) { + value = valueEnumerator.current(); if (value == null || value.length == 0) continue; @@ -161,9 +148,11 @@ public class DictionaryGenerator { throw new IllegalStateException("Unrecognized datetime value"); } - private static Dictionary buildStringDict(Iterable values, int baseId, int nSamples, ArrayList samples) { + private static Dictionary buildStringDict(IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList samples) throws IOException { TrieDictionaryBuilder builder = new TrieDictionaryBuilder(new StringBytesConverter()); - for (byte[] value : values) { + byte[] value; + while (valueEnumerator.moveNext()) { + value = valueEnumerator.current(); if (value == null) continue; String v = Bytes.toString(value); @@ -174,9 +163,11 @@ public class DictionaryGenerator { return builder.build(baseId); } - private static Dictionary buildNumberDict(Iterable values, int baseId, int nSamples, ArrayList samples) { + private static Dictionary buildNumberDict(IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList samples) throws IOException { NumberDictionaryBuilder builder = new NumberDictionaryBuilder(new StringBytesConverter()); - for (byte[] value : values) { + byte[] value; + while (valueEnumerator.moveNext()) { + value = valueEnumerator.current(); if (value == null) continue; String v = Bytes.toString(value); @@ -189,41 +180,4 @@ public class DictionaryGenerator { } return builder.build(baseId); } - - static ArrayList loadColumnValues(ReadableTable inpTable, int colIndex) throws IOException { - - TableReader reader = inpTable.getReader(); - - try { - ArrayList result = Lists.newArrayList(); - HashSet dedup = new HashSet(); - - while (reader.next()) { - String[] split = reader.getRow(); - - String colValue; - // special single column file, e.g. common_indicator.txt - if (split.length == 1) { - colValue = split[0]; - } - // normal case - else { - if (split.length <= colIndex) { - throw new ArrayIndexOutOfBoundsException("Column no. " + colIndex + " not found, line split is " + Arrays.asList(split)); - } - colValue = split[colIndex]; - } - - if (dedup.contains(colValue) == false) { - dedup.add(colValue); - result.add(Bytes.toBytes(colValue)); - } - } - return result; - - } finally { - reader.close(); - } - } - } diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryValueEnumerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryValueEnumerator.java new file mode 100644 index 0000000..ecf980a --- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryValueEnumerator.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.dict; + +import java.io.IOException; + +/** + * Created by dongli on 10/28/15. + */ +public interface IDictionaryValueEnumerator { + byte[] current() throws IOException; + + boolean moveNext() throws IOException; + + void close() throws IOException; +} diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/IterableDictionaryValueEnumerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/IterableDictionaryValueEnumerator.java new file mode 100644 index 0000000..2f15eba --- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/IterableDictionaryValueEnumerator.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.dict; + +import java.io.IOException; +import java.util.Iterator; + +/** + * Created by dongli on 10/28/15. + */ +public class IterableDictionaryValueEnumerator implements IDictionaryValueEnumerator { + Iterator iterator; + + public IterableDictionaryValueEnumerator(Iterable list) { + iterator = list.iterator(); + } + + @Override + public byte[] current() throws IOException { + return iterator.next(); + } + + @Override + public boolean moveNext() throws IOException { + return iterator.hasNext(); + } + + @Override + public void close() throws IOException { + } +} diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/MultipleDictionaryValueEnumerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/MultipleDictionaryValueEnumerator.java new file mode 100644 index 0000000..13f7394 --- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/MultipleDictionaryValueEnumerator.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.dict; + +import com.google.common.collect.Lists; +import org.apache.kylin.common.util.Bytes; + +import java.io.IOException; +import java.util.List; + +/** + * Created by dongli on 10/28/15. + */ +@SuppressWarnings("rawtypes") +public class MultipleDictionaryValueEnumerator implements IDictionaryValueEnumerator { + private int curDictIndex = 0; + private Dictionary curDict; + private int curKey; + private byte[] curValue = null; + private List dictionaryList; + + public MultipleDictionaryValueEnumerator(List dictionaryInfoList) { + dictionaryList = Lists.newArrayListWithCapacity(dictionaryInfoList.size()); + for (DictionaryInfo dictInfo : dictionaryInfoList) { + dictionaryList.add(dictInfo.getDictionaryObject()); + } + if (!dictionaryList.isEmpty()) { + curDict = dictionaryList.get(0); + curKey = curDict.getMinId(); + } + } + + @Override + public byte[] current() throws IOException { + return curValue; + } + + @Override + public boolean moveNext() throws IOException { + if (curDictIndex < dictionaryList.size() && curKey <= curDict.getMaxId()) { + byte[] buffer = new byte[curDict.getSizeOfValue()]; + int size = curDict.getValueBytesFromId(curKey, buffer, 0); + curValue = Bytes.copy(buffer, 0, size); + + if (++curKey > curDict.getMaxId()) { + if (++curDictIndex < dictionaryList.size()) { + curDict = dictionaryList.get(curDictIndex); + curKey = curDict.getMinId(); + } + } + + return true; + } + curValue = null; + return false; + } + + @Override + public void close() throws IOException { + } +} diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TableColumnValueEnumerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TableColumnValueEnumerator.java new file mode 100644 index 0000000..ab9a6ff --- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/TableColumnValueEnumerator.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.dict; + +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.source.ReadableTable; + +import java.io.IOException; +import java.util.Arrays; + +/** + * Created by dongli on 10/29/15. + */ +public class TableColumnValueEnumerator implements IDictionaryValueEnumerator { + + private ReadableTable.TableReader reader; + private int colIndex; + private byte[] colValue; + + public TableColumnValueEnumerator(ReadableTable.TableReader reader, int colIndex) { + this.reader = reader; + this.colIndex = colIndex; + } + + @Override + public boolean moveNext() throws IOException { + if (reader.next()) { + String colStrValue; + String[] split = reader.getRow(); + if (split.length == 1) { + colStrValue = split[0]; + } else { + // normal case + if (split.length <= colIndex) { + throw new ArrayIndexOutOfBoundsException("Column no. " + colIndex + " not found, line split is " + Arrays.asList(split)); + } + colStrValue = split[colIndex]; + } + + colValue = Bytes.toBytes(colStrValue); + return true; + + } else { + colValue = null; + return false; + } + } + + @Override + public void close() throws IOException { + if (reader != null) + reader.close(); + } + + @Override + public byte[] current() { + return colValue; + } +} diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java index ddfa01e..676170a 100644 --- a/core-dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java +++ b/core-dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java @@ -21,8 +21,8 @@ package org.apache.kylin.dict; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +import java.io.IOException; import java.math.BigDecimal; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -46,15 +46,15 @@ public class NumberDictionaryTest { Random rand = new Random(); @Test - public void testEmptyInput() { + public void testEmptyInput() throws IOException{ String[] ints = new String[] { "", "0", "5", "100", "13" }; - Collection intBytes = new ArrayList(); + Collection intBytes = Lists.newArrayListWithCapacity(ints.length); for (String s : ints) { intBytes.add((s == null) ? null : Bytes.toBytes(s)); } // check "" is treated as NULL, not a code of dictionary - Dictionary dict = DictionaryGenerator.buildDictionaryFromValueList(DataType.getInstance("integer"), intBytes); + Dictionary dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(DataType.getInstance("integer"), new IterableDictionaryValueEnumerator(intBytes)); assertEquals(4, dict.getSize()); final int id = ((NumberDictionary) dict).getIdFromValue(""); diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java index e060e9e..2acc751 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java @@ -34,11 +34,7 @@ import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.CubeUpdate; -import org.apache.kylin.dict.Dictionary; -import org.apache.kylin.dict.DictionaryGenerator; -import org.apache.kylin.dict.DictionaryInfo; -import org.apache.kylin.dict.DictionaryManager; -import org.apache.kylin.dict.TrieDictionary; +import org.apache.kylin.dict.*; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.DataType; import org.apache.kylin.metadata.model.TblColRef; @@ -75,7 +71,7 @@ public class MergeCuboidMapperTest extends LocalFileMetadataTestCase { List values = new ArrayList(); values.add(new byte[] { 101, 101, 101 }); values.add(new byte[] { 102, 102, 102 }); - Dictionary dict = DictionaryGenerator.buildDictionaryFromValueList(DataType.getInstance(newDictInfo.getDataType()), values); + Dictionary dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(DataType.getInstance(newDictInfo.getDataType()), new IterableDictionaryValueEnumerator(values)); dictionaryManager.trySaveNewDict(dict, newDictInfo); ((TrieDictionary) dict).dump(System.out); @@ -127,7 +123,7 @@ public class MergeCuboidMapperTest extends LocalFileMetadataTestCase { values.add(new byte[] { 99, 99, 99 }); else values.add(new byte[] { 98, 98, 98 }); - Dictionary dict = DictionaryGenerator.buildDictionaryFromValueList(DataType.getInstance(newDictInfo.getDataType()), values); + Dictionary dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(DataType.getInstance(newDictInfo.getDataType()), new IterableDictionaryValueEnumerator(values)); dictionaryManager.trySaveNewDict(dict, newDictInfo); ((TrieDictionary) dict).dump(System.out); diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java index 0e5081e..c1234e8 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java @@ -56,7 +56,7 @@ import org.apache.kylin.cube.model.*; import org.apache.kylin.cube.util.CubingUtils; import org.apache.kylin.dict.Dictionary; import org.apache.kylin.dict.DictionaryGenerator; -import org.apache.kylin.engine.mr.HadoopUtil; +import org.apache.kylin.dict.IterableDictionaryValueEnumerator; import org.apache.kylin.engine.spark.cube.BufferedCuboidWriter; import org.apache.kylin.engine.spark.cube.DefaultTupleConverter; import org.apache.kylin.engine.spark.util.IteratorUtils; @@ -165,7 +165,7 @@ public class SparkCubing extends AbstractApplication { final DataFrame frame = intermediateTable.select(column).distinct(); final Row[] rows = frame.collect(); - dictionaryMap.put(tblColRef, DictionaryGenerator.buildDictionaryFromValueList(tblColRef.getType(), new Iterable() { + dictionaryMap.put(tblColRef, DictionaryGenerator.buildDictionaryFromValueEnumerator(tblColRef.getType(), new IterableDictionaryValueEnumerator(new Iterable() { @Override public Iterator iterator() { return new Iterator() { @@ -193,7 +193,7 @@ public class SparkCubing extends AbstractApplication { } }; } - })); + }))); } final long end = System.currentTimeMillis(); CubingUtils.writeDictionary(cubeInstance.getSegmentById(segmentId), dictionaryMap, start, end); diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/streaming/SliceBuilder.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/streaming/SliceBuilder.java index ba337c8..8828f1f 100644 --- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/streaming/SliceBuilder.java +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/streaming/SliceBuilder.java @@ -32,6 +32,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.io.IOException; import java.util.List; /** @@ -50,7 +51,7 @@ public final class SliceBuilder { this.useLocalDict = useLocalDict; } - public Slice buildSlice(StreamingBatch microStreamBatch) { + public Slice buildSlice(StreamingBatch microStreamBatch) throws IOException{ final List> messages = Lists.transform(microStreamBatch.getMessages(), new Function>() { @Nullable @Override diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/util/IIDictionaryBuilder.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/util/IIDictionaryBuilder.java index 7fcbe53..2474750 100644 --- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/util/IIDictionaryBuilder.java +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/util/IIDictionaryBuilder.java @@ -34,6 +34,7 @@ package org.apache.kylin.invertedindex.util; +import java.io.IOException; import java.util.Collection; import java.util.List; @@ -41,6 +42,7 @@ import javax.annotation.Nullable; import org.apache.kylin.dict.Dictionary; import org.apache.kylin.dict.DictionaryGenerator; +import org.apache.kylin.dict.IterableDictionaryValueEnumerator; import org.apache.kylin.invertedindex.model.IIDesc; import org.apache.kylin.metadata.model.TblColRef; @@ -55,7 +57,7 @@ public final class IIDictionaryBuilder { private IIDictionaryBuilder() { } - public static Dictionary[] buildDictionary(List> table, IIDesc desc) { + public static Dictionary[] buildDictionary(List> table, IIDesc desc) throws IOException{ HashMultimap valueMap = HashMultimap.create(); final List allColumns = desc.listAllColumns(); for (List row : table) { @@ -76,7 +78,7 @@ public final class IIDictionaryBuilder { return input == null ? null : input.getBytes(); } }); - final Dictionary dict = DictionaryGenerator.buildDictionaryFromValueList(tblColRef.getType(), bytes); + final Dictionary dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(tblColRef.getType(), new IterableDictionaryValueEnumerator(bytes)); result[desc.findColumn(tblColRef)] = dict; } return result; diff --git a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java index 9ebb650..bf3b50c 100644 --- a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java +++ b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java @@ -37,6 +37,7 @@ import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.dict.Dictionary; import org.apache.kylin.dict.DictionaryGenerator; +import org.apache.kylin.dict.IterableDictionaryValueEnumerator; import org.apache.kylin.invertedindex.IIInstance; import org.apache.kylin.invertedindex.IIManager; import org.apache.kylin.invertedindex.index.CompressedValueContainer; @@ -149,7 +150,7 @@ public class InvertedIndexLocalTest extends LocalFileMetadataTestCase { dump(recordsCopy); } - private Dictionary[] buildDictionary(List> table, IIDesc desc) { + private Dictionary[] buildDictionary(List> table, IIDesc desc) throws IOException{ SetMultimap valueMap = HashMultimap.create(); Set dimensionColumns = Sets.newHashSet(); for (int i = 0; i < desc.listAllColumns().size(); i++) { @@ -165,13 +166,14 @@ public class InvertedIndexLocalTest extends LocalFileMetadataTestCase { } Dictionary[] result = new Dictionary[desc.listAllColumns().size()]; for (TblColRef tblColRef : valueMap.keys()) { - result[desc.findColumn(tblColRef)] = DictionaryGenerator.buildDictionaryFromValueList(tblColRef.getType(), Collections2.transform(valueMap.get(tblColRef), new Function() { + result[desc.findColumn(tblColRef)] = DictionaryGenerator.buildDictionaryFromValueEnumerator(tblColRef.getType(), + new IterableDictionaryValueEnumerator(Collections2.transform(valueMap.get(tblColRef), new Function() { @Nullable @Override public byte[] apply(String input) { return input.getBytes(); } - })); + }))); } return result; } -- 2.3.8 (Apple Git-58)