diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7f4afd9..e8e53d1 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -829,6 +829,11 @@
         "If the number of keys in a dictionary is greater than this fraction of the total number of\n" +
         "non-null rows, turn off dictionary encoding. Use 1 to always use dictionary encoding."),
     HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE("hive.exec.orc.default.row.index.stride", 10000, "Define the default ORC index stride"),
+    HIVE_ORC_DICTIONARY_CHECK_AFTER_ROWS("hive.exec.orc.dictionary.check.after.rows", 10000L,
+        "The number of rows after which the writer decides whether to keep using dictionary encoding.\n" +
+        "After this many rows have been added, if the ratio of the number of keys in the dictionary to the\n" +
+        "total number of non-null rows is greater than hive.exec.orc.dictionary.key.size.threshold,\n" +
+        "dictionary encoding will be disabled for the entire column."),
     HIVE_ORC_DEFAULT_BUFFER_SIZE("hive.exec.orc.default.buffer.size", 256 * 1024, "Define the default ORC buffer size"),
     HIVE_ORC_DEFAULT_BLOCK_PADDING("hive.exec.orc.default.block.padding", true, "Define the default block padding"),
     HIVE_ORC_BLOCK_PADDING_TOLERANCE("hive.exec.orc.block.padding.tolerance", 0.05f,
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index 76b4d03..22f60b5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -96,9 +96,6 @@
   private static final int HDFS_BUFFER_SIZE = 256 * 1024;
   private static final int MIN_ROW_INDEX_STRIDE = 1000;
 
-  // HDFS requires blocks < 2GB and multiples of 512, so pick 1.5GB
-  private static final long MAX_BLOCK_SIZE = 1536 * 1024 * 1024;
-
   // threshold above which buffer size will be automatically resized
   private static final int COLUMN_COUNT_THRESHOLD = 1000;
 
@@ -135,8 +132,6 @@
       new TreeMap<String, ByteString>();
   private final StreamFactory streamFactory = new StreamFactory();
   private final TreeWriter treeWriter;
-  private final OrcProto.RowIndex.Builder rowIndex =
-      OrcProto.RowIndex.newBuilder();
   private final boolean buildIndex;
   private final MemoryManager memoryManager;
   private final OrcFile.Version version;
@@ -678,7 +673,7 @@ void writeStripe(OrcProto.StripeFooter.Builder builder,
       if (rowIndexStream != null) {
         if (rowIndex.getEntryCount() != requiredIndexEntries) {
           throw new IllegalArgumentException("Column has wrong number of " +
-              "index entries found: " + rowIndexEntry + " expected: " +
+              "index entries found: " + rowIndex.getEntryCount() + " expected: " +
               requiredIndexEntries);
         }
         rowIndex.build().writeTo(rowIndexStream);
@@ -1005,6 +1000,10 @@ void recordPosition(PositionRecorder recorder) throws IOException {
     private final float dictionaryKeySizeThreshold;
     private boolean useDictionaryEncoding = true;
     private boolean isDirectV2 = true;
+    private final int rowIndexStride;
+    private boolean doneDictionaryCheck;
+    private final long dictionaryCheckAfterRows;
+    private long rowsAdded;
 
     StringTreeWriter(int columnId,
                      ObjectInspector inspector,
@@ -1028,6 +1027,13 @@ void recordPosition(PositionRecorder recorder) throws IOException {
           HiveConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.varname,
           HiveConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.
               defaultFloatVal);
+      rowIndexStride = writer.getRowIndexStride();
+      dictionaryCheckAfterRows = writer.getConfiguration().getLong(
+          HiveConf.ConfVars.HIVE_ORC_DICTIONARY_CHECK_AFTER_ROWS.varname,
+          HiveConf.ConfVars.HIVE_ORC_DICTIONARY_CHECK_AFTER_ROWS.
+              defaultLongVal);
+      doneDictionaryCheck = false;
+      rowsAdded = 0;
     }
 
     /**
@@ -1045,21 +1051,81 @@ void write(Object obj) throws IOException {
       super.write(obj);
       if (obj != null) {
         Text val = getTextValue(obj);
-        rows.add(dictionary.add(val));
+        if (useDictionaryEncoding) {
+          rows.add(dictionary.add(val));
+          // after HIVE_ORC_DICTIONARY_CHECK_AFTER_ROWS rows have been added,
+          // check whether dictionary encoding should be kept or not
+          if (rowsAdded == dictionaryCheckAfterRows &&
+              !checkDictionaryEncoding()) {
+            flushDictionary();
+          }
+        } else {
+          // record the start positions of the direct streams
+          if (rowIndexStride > 0 && rowsAdded % rowIndexStride == 0) {
+            directStreamOutput.getPosition(rowIndexPosition);
+            directLengthOutput.getPosition(rowIndexPosition);
+          }
+
+          // write data and length
+          directStreamOutput.write(val.getBytes(), 0, val.getLength());
+          directLengthOutput.write(val.getLength());
+        }
         indexStatistics.updateString(val);
       }
+      rowsAdded++;
+    }
+
+    private boolean checkDictionaryEncoding() {
+      if (!doneDictionaryCheck) {
+        // Set the flag indicating whether or not to use dictionary encoding
+        // based on whether or not the fraction of distinct keys over number of
+        // non-null rows is less than the configured threshold
+        float ratio = rows.size() > 0 ? (float) (dictionary.size()) / rows.size() : 0.0f;
+        useDictionaryEncoding = !isDirectV2 || ratio <= dictionaryKeySizeThreshold;
+        doneDictionaryCheck = true;
+      }
+      return useDictionaryEncoding;
     }
 
     @Override
     void writeStripe(OrcProto.StripeFooter.Builder builder,
                      int requiredIndexEntries) throws IOException {
-      // Set the flag indicating whether or not to use dictionary encoding
-      // based on whether or not the fraction of distinct keys over number of
-      // non-null rows is less than the configured threshold
-      useDictionaryEncoding =
-        (!isDirectV2) || (rows.size() > 0 &&
-                          (float)(dictionary.size()) / rows.size() <=
-                            dictionaryKeySizeThreshold);
+      // if the stripe has fewer rows than dictionaryCheckAfterRows, the
+      // dictionary check will not have happened yet, so do it here as well.
+      checkDictionaryEncoding();
+
+      if (useDictionaryEncoding) {
+        flushDictionary();
+      }
+
+      if (!useDictionaryEncoding) {
+        // flush out any leftover entries from the dictionary
+        if (rows.size() > 0) {
+          flushDictionary();
+        }
+
+        // suppress the stream for every stripe if dictionary is disabled
+        stringOutput.suppress();
+      }
+
+      // we need to build the rowindex before calling super, since it
+      // writes it out.
+      super.writeStripe(builder, requiredIndexEntries);
+      stringOutput.flush();
+      lengthOutput.flush();
+      rowOutput.flush();
+      directStreamOutput.flush();
+      directLengthOutput.flush();
+      // reset all of the fields to be ready for the next stripe.
+      dictionary.clear();
+      savedRowIndex.clear();
+      rowIndexValueCount.clear();
+      recordPosition(rowIndexPosition);
+      rowIndexValueCount.add(0L);
+      rowsAdded = 0;
+    }
+
+    private void flushDictionary() throws IOException {
       final int[] dumpOrder = new int[dictionary.size()];
 
       if (useDictionaryEncoding) {
@@ -1113,21 +1179,7 @@ public void visit(StringRedBlackTree.VisitorContext context
             }
           }
         }
-      // we need to build the rowindex before calling super, since it
-      // writes it out.
-      super.writeStripe(builder, requiredIndexEntries);
-      stringOutput.flush();
-      lengthOutput.flush();
-      rowOutput.flush();
-      directStreamOutput.flush();
-      directLengthOutput.flush();
-      // reset all of the fields to be ready for the next stripe.
-      dictionary.clear();
       rows.clear();
-      savedRowIndex.clear();
-      rowIndexValueCount.clear();
-      recordPosition(rowIndexPosition);
-      rowIndexValueCount.add(0L);
     }
 
     @Override
@@ -1165,10 +1217,18 @@ void createRowIndexEntry() throws IOException {
       OrcProto.RowIndexEntry.Builder rowIndexEntry = getRowIndexEntry();
       rowIndexEntry.setStatistics(indexStatistics.serialize());
       indexStatistics.reset();
-      savedRowIndex.add(rowIndexEntry.build());
+      OrcProto.RowIndexEntry base = rowIndexEntry.build();
+      savedRowIndex.add(base);
       rowIndexEntry.clear();
       recordPosition(rowIndexPosition);
-      rowIndexValueCount.add(Long.valueOf(rows.size()));
+      if (useDictionaryEncoding) {
+        rowIndexValueCount.add(Long.valueOf(rows.size()));
+      } else {
+        // dictionary encoding is disabled: the positions were already recorded
+        // by write(), so we now have all the information needed to create the
+        // row index entry.
+        getRowIndex().addEntry(base);
+      }
     }
 
     @Override
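Reviewer note, not part of the patch: the sketch below restates, as standalone Java, the decision that StringTreeWriter.checkDictionaryEncoding() makes once hive.exec.orc.dictionary.check.after.rows rows have been added. The class and method names are invented for illustration, and the 0.8 threshold used in main() is only an example value; the ratio rule and the 0.11 behaviour are taken from the hunks above.

// Illustrative only: this class does not exist in Hive.
public class DictionaryCheckSketch {

  /**
   * Returns true when dictionary encoding should be kept for a string column.
   *
   * @param distinctKeys     number of keys currently in the dictionary
   * @param nonNullRows      number of non-null rows added so far
   * @param keySizeThreshold value of hive.exec.orc.dictionary.key.size.threshold
   * @param isDirectV2       true when the writer may fall back to DIRECT_V2 (ORC 0.12 files)
   */
  static boolean keepDictionary(int distinctKeys, int nonNullRows,
                                float keySizeThreshold, boolean isDirectV2) {
    if (!isDirectV2) {
      // ORC 0.11 has no DIRECT_V2 fallback, so the dictionary is always kept
      return true;
    }
    float ratio = nonNullRows > 0 ? (float) distinctKeys / nonNullRows : 0.0f;
    return ratio <= keySizeThreshold;
  }

  public static void main(String[] args) {
    // all 20000 values distinct: ratio 1.0 exceeds the example threshold of 0.8,
    // so the column falls back to DIRECT_V2 (TestStringDictionary.testTooManyDistinct)
    System.out.println(keepDictionary(20000, 20000, 0.8f, true));
    // 20000 rows drawn from 10000 values: ratio 0.5 stays below 0.8, so the
    // dictionary is kept (TestStringDictionary.testHalfDistinct)
    System.out.println(keepDictionary(10000, 20000, 0.8f, true));
  }
}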
assertTrue("stripe " + i + " is too long at " + stripe.getDataLength(), + stripe.getDataLength() < 5000); + } + // with HIVE-7832, the dictionaries will be disabled after writing the first + // stripe as there are too many distinct values. Hence only 3 stripes as + // compared to 25 stripes in version 0.11 (above test case) + assertEquals(3, i); + assertEquals(2500, reader.getNumberOfRows()); + } + + @Test public void testPredicatePushdown() throws Exception { ObjectInspector inspector; synchronized (TestOrcFile.class) { diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestStringDictionary.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestStringDictionary.java new file mode 100644 index 0000000..5eb984a --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestStringDictionary.java @@ -0,0 +1,260 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.orc; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.io.orc.OrcFile.Version; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.io.Text; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +public class TestStringDictionary { + + Path workDir = new Path(System.getProperty("test.tmp.dir", "target" + File.separator + "test" + + File.separator + "tmp")); + + Configuration conf; + FileSystem fs; + Path testFilePath; + + @Rule + public TestName testCaseName = new TestName(); + + @Before + public void openFileSystem() throws Exception { + conf = new Configuration(); + fs = FileSystem.getLocal(conf); + testFilePath = new Path(workDir, "TestOrcFile." 
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestStringDictionary.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestStringDictionary.java
new file mode 100644
index 0000000..5eb984a
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestStringDictionary.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile.Version;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.Text;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+public class TestStringDictionary {
+
+  Path workDir = new Path(System.getProperty("test.tmp.dir", "target" + File.separator + "test"
+      + File.separator + "tmp"));
+
+  Configuration conf;
+  FileSystem fs;
+  Path testFilePath;
+
+  @Rule
+  public TestName testCaseName = new TestName();
+
+  @Before
+  public void openFileSystem() throws Exception {
+    conf = new Configuration();
+    fs = FileSystem.getLocal(conf);
+    testFilePath = new Path(workDir, "TestOrcFile."
+        + testCaseName.getMethodName() + ".orc");
+    fs.delete(testFilePath, false);
+  }
+
+  @Test
+  public void testTooManyDistinct() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(Text.class,
+          ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    Writer writer = OrcFile.createWriter(
+        testFilePath,
+        OrcFile.writerOptions(conf).inspector(inspector).compress(CompressionKind.NONE)
+            .bufferSize(10000));
+    for (int i = 0; i < 20000; i++) {
+      writer.addRow(new Text(String.valueOf(i)));
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(new Text(String.valueOf(idx++)), row);
+    }
+
+    // make sure the encoding type is correct
+    for (StripeInformation stripe : reader.getStripes()) {
+      // hacky, but it does the job: this cast will work as long as this test
+      // resides in the same package as the ORC reader
+      OrcProto.StripeFooter footer = ((RecordReaderImpl) rows).readStripeFooter(stripe);
+      for (int i = 0; i < footer.getColumnsCount(); ++i) {
+        OrcProto.ColumnEncoding encoding = footer.getColumns(i);
+        assertEquals(OrcProto.ColumnEncoding.Kind.DIRECT_V2, encoding.getKind());
+      }
+    }
+  }
+
+  @Test
+  public void testHalfDistinct() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(Text.class,
+          ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    Writer writer = OrcFile.createWriter(
+        testFilePath,
+        OrcFile.writerOptions(conf).inspector(inspector).compress(CompressionKind.NONE)
+            .bufferSize(10000));
+    Random rand = new Random(123);
+    int[] input = new int[20000];
+    for (int i = 0; i < 20000; i++) {
+      input[i] = rand.nextInt(10000);
+    }
+
+    for (int i = 0; i < 20000; i++) {
+      writer.addRow(new Text(String.valueOf(input[i])));
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(new Text(String.valueOf(input[idx++])), row);
+    }
+
+    // make sure the encoding type is correct
+    for (StripeInformation stripe : reader.getStripes()) {
+      // hacky, but it does the job: this cast will work as long as this test
+      // resides in the same package as the ORC reader
+      OrcProto.StripeFooter footer = ((RecordReaderImpl) rows).readStripeFooter(stripe);
+      for (int i = 0; i < footer.getColumnsCount(); ++i) {
+        OrcProto.ColumnEncoding encoding = footer.getColumns(i);
+        assertEquals(OrcProto.ColumnEncoding.Kind.DICTIONARY_V2, encoding.getKind());
+      }
+    }
+  }
+
+  @Test
+  public void testTooManyDistinctOldWay() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(Text.class,
+          ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    // setting a value of -1 defers the dictionary check until just before the stripe is written
+    conf.setLong(ConfVars.HIVE_ORC_DICTIONARY_CHECK_AFTER_ROWS.varname, -1L);
+    Writer writer = OrcFile.createWriter(
+        testFilePath,
+        OrcFile.writerOptions(conf).inspector(inspector).compress(CompressionKind.NONE)
+            .bufferSize(10000));
+    for (int i = 0; i < 20000; i++) {
+      writer.addRow(new Text(String.valueOf(i)));
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(new Text(String.valueOf(idx++)), row);
+    }
+
+    // make sure the encoding type is correct
+    for (StripeInformation stripe : reader.getStripes()) {
+      // hacky, but it does the job: this cast will work as long as this test
+      // resides in the same package as the ORC reader
+      OrcProto.StripeFooter footer = ((RecordReaderImpl) rows).readStripeFooter(stripe);
+      for (int i = 0; i < footer.getColumnsCount(); ++i) {
+        OrcProto.ColumnEncoding encoding = footer.getColumns(i);
+        assertEquals(OrcProto.ColumnEncoding.Kind.DIRECT_V2, encoding.getKind());
+      }
+    }
+  }
+
+  @Test
+  public void testHalfDistinctOldWay() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(Text.class,
+          ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    // setting a value of -1 defers the dictionary check until just before the stripe is written
+    conf.setLong(ConfVars.HIVE_ORC_DICTIONARY_CHECK_AFTER_ROWS.varname, -1L);
+    Writer writer = OrcFile.createWriter(
+        testFilePath,
+        OrcFile.writerOptions(conf).inspector(inspector).compress(CompressionKind.NONE)
+            .bufferSize(10000));
+    Random rand = new Random(123);
+    int[] input = new int[20000];
+    for (int i = 0; i < 20000; i++) {
+      input[i] = rand.nextInt(10000);
+    }
+
+    for (int i = 0; i < 20000; i++) {
+      writer.addRow(new Text(String.valueOf(input[i])));
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(new Text(String.valueOf(input[idx++])), row);
+    }
+
+    // make sure the encoding type is correct
+    for (StripeInformation stripe : reader.getStripes()) {
+      // hacky, but it does the job: this cast will work as long as this test
+      // resides in the same package as the ORC reader
+      OrcProto.StripeFooter footer = ((RecordReaderImpl) rows).readStripeFooter(stripe);
+      for (int i = 0; i < footer.getColumnsCount(); ++i) {
+        OrcProto.ColumnEncoding encoding = footer.getColumns(i);
+        assertEquals(OrcProto.ColumnEncoding.Kind.DICTIONARY_V2, encoding.getKind());
+      }
+    }
+  }
+
+  @Test
+  public void testTooManyDistinctV11AlwaysDictionary() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(Text.class,
+          ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    Writer writer = OrcFile.createWriter(
+        testFilePath,
+        OrcFile.writerOptions(conf).inspector(inspector).compress(CompressionKind.NONE)
+            .version(Version.V_0_11).bufferSize(10000));
+    for (int i = 0; i < 20000; i++) {
+      writer.addRow(new Text(String.valueOf(i)));
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(new Text(String.valueOf(idx++)), row);
+    }
+
+    // make sure the encoding type is correct
+    for (StripeInformation stripe : reader.getStripes()) {
+      // hacky, but it does the job: this cast will work as long as this test
+      // resides in the same package as the ORC reader
+      OrcProto.StripeFooter footer = ((RecordReaderImpl) rows).readStripeFooter(stripe);
+      for (int i = 0; i < footer.getColumnsCount(); ++i) {
+        OrcProto.ColumnEncoding encoding = footer.getColumns(i);
+        assertEquals(OrcProto.ColumnEncoding.Kind.DICTIONARY, encoding.getKind());
+      }
+    }
+
+  }
+
+}
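Usage note, not part of the patch: a sketch of how a client could tune the new check through the Configuration behind OrcFile.writerOptions(conf), following the pattern used by TestStringDictionary above. The class name, output path, and the 5000-row and 0.5 values are assumptions made up for this example.

// Usage illustration only; it is not included in the patch.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Writer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.io.Text;

public class DictionaryCheckUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // run the dictionary check after 5000 rows instead of the 10000-row default;
    // a value of -1 defers it until the stripe is written, as in the "OldWay" tests
    conf.setLong(ConfVars.HIVE_ORC_DICTIONARY_CHECK_AFTER_ROWS.varname, 5000L);

    // example threshold: drop the dictionary once more than half of the
    // non-null rows are distinct
    conf.setFloat(ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.varname, 0.5f);

    ObjectInspector inspector = ObjectInspectorFactory.getReflectionObjectInspector(
        Text.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);

    Writer writer = OrcFile.createWriter(new Path("/tmp/dictionary-check-sketch.orc"),
        OrcFile.writerOptions(conf).inspector(inspector).compress(CompressionKind.NONE));
    for (int i = 0; i < 20000; i++) {
      // every value is distinct, so the check disables the dictionary for this column
      writer.addRow(new Text(String.valueOf(i)));
    }
    writer.close();
  }
}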