diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7f4afd9..e8e53d1 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -829,6 +829,11 @@
         "If the number of keys in a dictionary is greater than this fraction of the total number of\n" +
         "non-null rows, turn off dictionary encoding. Use 1 to always use dictionary encoding."),
     HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE("hive.exec.orc.default.row.index.stride", 10000, "Define the default ORC index stride"),
+    HIVE_ORC_DICTIONARY_CHECK_AFTER_ROWS("hive.exec.orc.dictionary.check.after.rows", 10000L,
+        "The number of rows after which the writer decides whether to keep using dictionary encoding. After\n" +
+        "adding this many rows, if the ratio of the number of keys in the dictionary to the number of non-null\n" +
+        "rows is greater than the hive.exec.orc.dictionary.key.size.threshold value, dictionary encoding is\n" +
+        "disabled for the entire column."),
     HIVE_ORC_DEFAULT_BUFFER_SIZE("hive.exec.orc.default.buffer.size", 256 * 1024, "Define the default ORC buffer size"),
     HIVE_ORC_DEFAULT_BLOCK_PADDING("hive.exec.orc.default.block.padding", true, "Define the default block padding"),
     HIVE_ORC_BLOCK_PADDING_TOLERANCE("hive.exec.orc.block.padding.tolerance", 0.05f,
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index 76b4d03..5d423ea 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -96,9 +96,6 @@
   private static final int HDFS_BUFFER_SIZE = 256 * 1024;
   private static final int MIN_ROW_INDEX_STRIDE = 1000;
 
-  // HDFS requires blocks < 2GB and multiples of 512, so pick 1.5GB
-  private static final long MAX_BLOCK_SIZE = 1536 * 1024 * 1024;
-
   // threshold above which buffer size will be automatically resized
   private static final int COLUMN_COUNT_THRESHOLD = 1000;
 
@@ -135,8 +132,6 @@
       new TreeMap();
   private final StreamFactory streamFactory = new StreamFactory();
   private final TreeWriter treeWriter;
-  private final OrcProto.RowIndex.Builder rowIndex =
-      OrcProto.RowIndex.newBuilder();
   private final boolean buildIndex;
   private final MemoryManager memoryManager;
   private final OrcFile.Version version;
@@ -1005,6 +1000,8 @@ void recordPosition(PositionRecorder recorder) throws IOException {
     private final float dictionaryKeySizeThreshold;
     private boolean useDictionaryEncoding = true;
     private boolean isDirectV2 = true;
+    private final long dictionaryCheckAfterRows;
+    private long rowsAdded;
 
     StringTreeWriter(int columnId,
                      ObjectInspector inspector,
@@ -1028,6 +1025,11 @@ void recordPosition(PositionRecorder recorder) throws IOException {
           HiveConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.varname,
           HiveConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.
              defaultFloatVal);
+      dictionaryCheckAfterRows = writer.getConfiguration().getLong(
+          HiveConf.ConfVars.HIVE_ORC_DICTIONARY_CHECK_AFTER_ROWS.varname,
+          HiveConf.ConfVars.HIVE_ORC_DICTIONARY_CHECK_AFTER_ROWS.
+             defaultLongVal);
+      rowsAdded = 0;
     }
 
     /**
@@ -1044,22 +1046,48 @@ Text getTextValue(Object obj) {
     void write(Object obj) throws IOException {
       super.write(obj);
       if (obj != null) {
+        rowsAdded++;
         Text val = getTextValue(obj);
-        rows.add(dictionary.add(val));
         indexStatistics.updateString(val);
+        if (useDictionaryEncoding) {
+          rows.add(dictionary.add(val));
+          if (rowsAdded == dictionaryCheckAfterRows && !checkDictionaryEncoding()) {
+            flushDictionary();
+          }
+        } else {
+          directStreamOutput.write(val.getBytes(), 0, val.getLength());
+          directLengthOutput.write(val.getLength());
+        }
       }
     }
 
-    @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
+    private void flushDictionary() throws IOException {
+      // dump the dictionary entries to the direct stream
+      Text text = new Text();
+      // write the values translated into the dump order
+      for (int i = 0; i < rows.size(); i++) {
+        dictionary.getText(text, rows.get(i));
+        directStreamOutput.write(text.getBytes(), 0, text.getLength());
+        directLengthOutput.write(text.getLength());
+      }
+
+      // after dumping the dictionary, clear off the entries
+      rows.clear();
+    }
+
+    private boolean checkDictionaryEncoding() {
       // Set the flag indicating whether or not to use dictionary encoding
       // based on whether or not the fraction of distinct keys over number of
       // non-null rows is less than the configured threshold
-      useDictionaryEncoding =
-        (!isDirectV2) || (rows.size() > 0 &&
-                          (float)(dictionary.size()) / rows.size() <=
-                            dictionaryKeySizeThreshold);
+      float ratio = rows.size() > 0 ? (float) (dictionary.size()) / rows.size() : 0.0f;
+      useDictionaryEncoding = !isDirectV2 || ratio <= dictionaryKeySizeThreshold;
+
+      return useDictionaryEncoding;
+    }
+
+    @Override
+    void writeStripe(OrcProto.StripeFooter.Builder builder,
+                     int requiredIndexEntries) throws IOException {
       final int[] dumpOrder = new int[dictionary.size()];
 
       if (useDictionaryEncoding) {
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestStringDictionary.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestStringDictionary.java
new file mode 100644
index 0000000..6d78c49
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestStringDictionary.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile.Version;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.Text;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+public class TestStringDictionary {
+
+  Path workDir = new Path(System.getProperty("test.tmp.dir", "target" + File.separator + "test"
+      + File.separator + "tmp"));
+
+  Configuration conf;
+  FileSystem fs;
+  Path testFilePath;
+
+  @Rule
+  public TestName testCaseName = new TestName();
+
+  @Before
+  public void openFileSystem() throws Exception {
+    conf = new Configuration();
+    fs = FileSystem.getLocal(conf);
+    testFilePath = new Path(workDir, "TestOrcFile." + testCaseName.getMethodName() + ".orc");
+    fs.delete(testFilePath, false);
+  }
+
+  @Test
+  public void testTooManyDistinct() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(Text.class,
+          ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    Writer writer = OrcFile.createWriter(
+        testFilePath,
+        OrcFile.writerOptions(conf).inspector(inspector).compress(CompressionKind.NONE)
+            .bufferSize(10000));
+    for (int i = 0; i < 20000; i++) {
+      writer.addRow(new Text(String.valueOf(i)));
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(new Text(String.valueOf(idx++)), row);
+    }
+
+    // make sure the encoding type is correct
+    for (StripeInformation stripe : reader.getStripes()) {
+      // hacky, but it does the job; this cast works as long as this test resides
+      // within the same package as the ORC reader
+      OrcProto.StripeFooter footer = ((RecordReaderImpl) rows).readStripeFooter(stripe);
+      for (int i = 0; i < footer.getColumnsCount(); ++i) {
+        OrcProto.ColumnEncoding encoding = footer.getColumns(i);
+        assertEquals(OrcProto.ColumnEncoding.Kind.DIRECT_V2, encoding.getKind());
+      }
+    }
+
+  }
+
+  @Test
+  public void testHalfDistinct() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(Text.class,
+          ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    Writer writer = OrcFile.createWriter(
+        testFilePath,
+        OrcFile.writerOptions(conf).inspector(inspector).compress(CompressionKind.NONE)
+            .bufferSize(10000));
+    Random rand = new Random(123);
+    int[] input = new int[20000];
+    for (int i = 0; i < 20000; i++) {
+      input[i] = rand.nextInt(10000);
+    }
+
+    for (int i = 0; i < 20000; i++) {
+      writer.addRow(new Text(String.valueOf(input[i])));
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(new Text(String.valueOf(input[idx++])), row);
+    }
+
+    // make sure the encoding type is correct
+    for (StripeInformation stripe : reader.getStripes()) {
+      // hacky, but it does the job; this cast works as long as this test resides
+      // within the same package as the ORC reader
+      OrcProto.StripeFooter footer = ((RecordReaderImpl) rows).readStripeFooter(stripe);
+      for (int i = 0; i < footer.getColumnsCount(); ++i) {
+        OrcProto.ColumnEncoding encoding = footer.getColumns(i);
+        assertEquals(OrcProto.ColumnEncoding.Kind.DICTIONARY_V2, encoding.getKind());
+      }
+    }
+
+  }
+
+  @Test
+  public void testTooManyDistinctV11AlwaysDictionary() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(Text.class,
+          ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    Writer writer = OrcFile.createWriter(
+        testFilePath,
+        OrcFile.writerOptions(conf).inspector(inspector).compress(CompressionKind.NONE)
+            .version(Version.V_0_11).bufferSize(10000));
+    for (int i = 0; i < 20000; i++) {
+      writer.addRow(new Text(String.valueOf(i)));
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(new Text(String.valueOf(idx++)), row);
+    }
+
+    // make sure the encoding type is correct
+    for (StripeInformation stripe : reader.getStripes()) {
+      // hacky, but it does the job; this cast works as long as this test resides
+      // within the same package as the ORC reader
+      OrcProto.StripeFooter footer = ((RecordReaderImpl) rows).readStripeFooter(stripe);
+      for (int i = 0; i < footer.getColumnsCount(); ++i) {
+        OrcProto.ColumnEncoding encoding = footer.getColumns(i);
+        assertEquals(OrcProto.ColumnEncoding.Kind.DICTIONARY, encoding.getKind());
+      }
+    }
+
+  }
+
+}
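Note (not part of the patch): a minimal sketch of how a caller might exercise the new knob when writing an ORC file directly. The two configuration keys are the ones added and referenced above, and they are read through the Configuration passed to OrcFile.writerOptions(conf) (the writer picks them up via writer.getConfiguration()). The DictionaryCheckExample class name, the output path, and the 5000 / 0.8 values are illustrative assumptions, not defaults.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Writer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.io.Text;

public class DictionaryCheckExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Re-evaluate the dictionary after 5000 rows instead of the 10000-row default, and
    // keep dictionary encoding only while distinct keys / non-null rows <= 0.8.
    // (Keys come from HiveConf in this patch; the values here are examples only.)
    conf.setLong("hive.exec.orc.dictionary.check.after.rows", 5000L);
    conf.setFloat("hive.exec.orc.dictionary.key.size.threshold", 0.8f);

    ObjectInspector inspector = ObjectInspectorFactory.getReflectionObjectInspector(
        Text.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);

    Writer writer = OrcFile.createWriter(
        new Path("/tmp/dictionary-check-example.orc"),   // illustrative output path
        OrcFile.writerOptions(conf).inspector(inspector));

    // Nearly every value is distinct, so after 5000 rows the key/row ratio exceeds
    // 0.8; the writer flushes the accumulated dictionary to the direct streams and
    // writes the remaining values directly.
    for (int i = 0; i < 20000; i++) {
      writer.addRow(new Text(String.valueOf(i)));
    }
    writer.close();
  }
}

With the default settings this is the same fallback that testTooManyDistinct above verifies by reading each stripe footer and asserting that the column encoding is DIRECT_V2.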