diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java index 4e2bd6a..08f2f5e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java @@ -27,6 +27,7 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT; import java.io.IOException; +import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -268,31 +269,69 @@ public static Reader createReader(Path path, private String bloomFilterColumns; private double bloomFilterFpp; - WriterOptions(Configuration conf) { + WriterOptions(Properties tableProperties, Configuration conf) { configuration = conf; memoryManagerValue = getMemoryManager(conf); - stripeSizeValue = HiveConf.getLongVar(conf, HIVE_ORC_DEFAULT_STRIPE_SIZE); - blockSizeValue = HiveConf.getLongVar(conf, HIVE_ORC_DEFAULT_BLOCK_SIZE); - rowIndexStrideValue = HiveConf.getIntVar(conf, HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE); - bufferSizeValue = HiveConf.getIntVar(conf, HIVE_ORC_DEFAULT_BUFFER_SIZE); - blockPaddingValue = HiveConf.getBoolVar(conf, HIVE_ORC_DEFAULT_BLOCK_PADDING); - compressValue = CompressionKind.valueOf(HiveConf.getVar(conf, HIVE_ORC_DEFAULT_COMPRESS)); + + String propValue = tableProperties == null ? null + : tableProperties.getProperty(OrcTableProperties.STRIPE_SIZE.propName); + stripeSizeValue = propValue == null ? HiveConf.getLongVar(conf, HIVE_ORC_DEFAULT_STRIPE_SIZE) + : Long.parseLong(propValue); + + propValue = tableProperties == null ? null + : tableProperties.getProperty(OrcTableProperties.BLOCK_SIZE.propName); + blockSizeValue = propValue == null ? HiveConf.getLongVar(conf, HIVE_ORC_DEFAULT_BLOCK_SIZE) + : Long.parseLong(propValue); + + propValue = tableProperties == null ? null + : tableProperties.getProperty(OrcTableProperties.ROW_INDEX_STRIDE.propName); + rowIndexStrideValue = propValue == null ? 
HiveConf.getIntVar(conf, HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE) + : Integer.parseInt(propValue); + + propValue = tableProperties == null ? null + : tableProperties.getProperty(OrcTableProperties.ENABLE_INDEXES.propName); + if (propValue != null && !Boolean.parseBoolean(propValue)) { + rowIndexStrideValue = 0; + } + + propValue = tableProperties == null ? null + : tableProperties.getProperty(OrcTableProperties.COMPRESSION_BLOCK_SIZE.propName); + bufferSizeValue = propValue == null ? HiveConf.getIntVar(conf, HIVE_ORC_DEFAULT_BUFFER_SIZE) + : Integer.parseInt(propValue); + + propValue = tableProperties == null ? null + : tableProperties.getProperty(OrcTableProperties.BLOCK_PADDING.propName); + blockPaddingValue = propValue == null ? HiveConf.getBoolVar(conf, HIVE_ORC_DEFAULT_BLOCK_PADDING) + : Boolean.parseBoolean(propValue); + + propValue = tableProperties == null ? null + : tableProperties.getProperty(OrcTableProperties.COMPRESSION.propName); + compressValue = propValue == null ? CompressionKind.valueOf(HiveConf.getVar(conf, HIVE_ORC_DEFAULT_COMPRESS)) + : CompressionKind.valueOf(propValue); + + propValue = tableProperties == null ? null + : tableProperties.getProperty(OrcTableProperties.BLOOM_FILTER_COLUMNS.propName); + bloomFilterColumns = propValue; + + propValue = tableProperties == null ? null + : tableProperties.getProperty(OrcTableProperties.BLOOM_FILTER_FPP.propName); + bloomFilterFpp = propValue == null ? BloomFilterIO.DEFAULT_FPP : Double.parseDouble(propValue); + String versionName = HiveConf.getVar(conf, HIVE_ORC_WRITE_FORMAT); if (versionName == null) { versionValue = Version.CURRENT; } else { versionValue = Version.byName(versionName); } - String enString = - conf.get(HiveConf.ConfVars.HIVE_ORC_ENCODING_STRATEGY.varname); - if (enString == null) { - encodingStrategy = EncodingStrategy.SPEED; - } else { - encodingStrategy = EncodingStrategy.valueOf(enString); + + propValue = tableProperties == null ? 
null + : tableProperties.getProperty(OrcTableProperties.ENCODING_STRATEGY.propName); + if (propValue == null) { + propValue = conf.get(HiveConf.ConfVars.HIVE_ORC_ENCODING_STRATEGY.varname); } + encodingStrategy = propValue == null ? EncodingStrategy.SPEED : EncodingStrategy.valueOf(propValue); - String compString = conf - .get(HiveConf.ConfVars.HIVE_ORC_COMPRESSION_STRATEGY.varname); + String compString = conf.get(HiveConf.ConfVars.HIVE_ORC_COMPRESSION_STRATEGY.varname); if (compString == null) { compressionStrategy = CompressionStrategy.SPEED; } else { @@ -301,7 +340,6 @@ public static Reader createReader(Path path, paddingTolerance = conf.getFloat(HiveConf.ConfVars.HIVE_ORC_BLOCK_PADDING_TOLERANCE.varname, HiveConf.ConfVars.HIVE_ORC_BLOCK_PADDING_TOLERANCE.defaultFloatVal); - bloomFilterFpp = BloomFilterIO.DEFAULT_FPP; } /** @@ -444,7 +482,19 @@ WriterOptions memory(MemoryManager value) { * Create a default set of write options that can be modified. */ public static WriterOptions writerOptions(Configuration conf) { - return new WriterOptions(conf); + return new WriterOptions(null, conf); + } + + /** + * Create a set of write options based on a set of table properties and + * configuration. Properties take precedence over configuration. 
+ * @param tableProperties the properties of the table + * @param conf the configuration of the query + * @return a WriterOptions object that can be modified + */ + public static WriterOptions writerOptions(Properties tableProperties, + Configuration conf) { + return new WriterOptions(tableProperties, conf); } /** diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index bb93a92..d942640 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -253,7 +253,8 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) { writerOptions = ((OrcOptions) options).getOrcOptions(); } if (writerOptions == null) { - writerOptions = OrcFile.writerOptions(options.getConfiguration()); + writerOptions = OrcFile.writerOptions(options.getTableProperties(), + options.getConfiguration()); } writerOptions.fileSystem(fs).callback(indexBuilder); if (!options.isWritingBase()) { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java index 22030b4..973cc40 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java @@ -18,6 +18,15 @@ package org.apache.hadoop.hive.ql.io.orc; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.File; +import java.io.PrintStream; +import java.util.Properties; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -33,12 +42,6 @@ import org.apache.hadoop.mapred.Reporter; import org.junit.Test; -import java.io.DataInputStream; -import java.io.File; - 
-import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - public class TestOrcRecordUpdater { @Test @@ -180,6 +183,49 @@ public void testWriter() throws Exception { } @Test + public void testWriterTblProperties() throws Exception { + Path root = new Path(workDir, "testWriterTblProperties"); + Configuration conf = new Configuration(); + // Must use raw local because the checksummer doesn't honor flushes. + FileSystem fs = FileSystem.getLocal(conf).getRaw(); + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector + (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + Properties tblProps = new Properties(); + tblProps.setProperty("orc.compress", "SNAPPY"); + AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) + .filesystem(fs) + .bucket(10) + .writingBase(false) + .minimumTransactionId(10) + .maximumTransactionId(19) + .inspector(inspector) + .reporter(Reporter.NULL) + .finalDestination(root) + .tableProperties(tblProps); + RecordUpdater updater = new OrcRecordUpdater(root, options); + updater.insert(11, new MyRow("first")); + updater.insert(11, new MyRow("second")); + updater.insert(11, new MyRow("third")); + updater.flush(); + updater.insert(12, new MyRow("fourth")); + updater.insert(12, new MyRow("fifth")); + updater.flush(); + + PrintStream origOut = System.out; + ByteArrayOutputStream myOut = new ByteArrayOutputStream(); + System.setOut(new PrintStream(myOut)); + FileDump.main(new String[]{root.toUri().toString()}); + System.out.flush(); + String outDump = new String(myOut.toByteArray()); + assertEquals(true, outDump.contains("Compression: SNAPPY")); + System.setOut(origOut); + updater.close(false); + } + + @Test public void testUpdates() throws Exception { Path root = new Path(workDir, "testUpdates"); Configuration conf = new Configuration();