diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 2d7489b..42f3b4e 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1206,6 +1206,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "to use dictionary or not will be retained thereafter."),
     HIVE_ORC_DEFAULT_BUFFER_SIZE("hive.exec.orc.default.buffer.size", 256 * 1024,
         "Define the default ORC buffer size, in bytes."),
+    HIVE_ORC_BASE_DELTA_RATIO("hive.exec.orc.base.delta.ratio", 8, "The ratio of base writer and\n" +
+        "delta writer in terms of STRIPE_SIZE and BUFFER_SIZE."),
     HIVE_ORC_DEFAULT_BLOCK_PADDING("hive.exec.orc.default.block.padding", true,
         "Define the default block padding, which pads stripes to the HDFS block boundaries."),
     HIVE_ORC_BLOCK_PADDING_TOLERANCE("hive.exec.orc.block.padding.tolerance", 0.05f,
diff --git orc/src/java/org/apache/orc/OrcConf.java orc/src/java/org/apache/orc/OrcConf.java
index 6fcbb72..357318d 100644
--- orc/src/java/org/apache/orc/OrcConf.java
+++ orc/src/java/org/apache/orc/OrcConf.java
@@ -40,6 +40,8 @@
       " number of rows n index entry represents.)"),
   BUFFER_SIZE("orc.compress.size", "hive.exec.orc.default.buffer.size",
       256 * 1024, "Define the default ORC buffer size, in bytes."),
+  BASE_DELTA_RATIO("orc.base.delta.ratio", "hive.exec.orc.base.delta.ratio", 8,
+      "The ratio of base writer and delta writer in terms of STRIPE_SIZE and BUFFER_SIZE."),
   BLOCK_PADDING("orc.block.padding", "hive.exec.orc.default.block.padding",
       true,
       "Define whether stripes should be padded to the HDFS block boundaries."),
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 4bf2403..e577961 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -27,6 +27,7 @@
 import org.apache.orc.impl.AcidStats;
 import org.apache.orc.impl.OrcAcidUtils;
+import org.apache.orc.OrcConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -204,19 +205,38 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) {
       flushLengths = null;
     }
     OrcFile.WriterOptions writerOptions = null;
-    if (options instanceof OrcOptions) {
-      writerOptions = ((OrcOptions) options).getOrcOptions();
-    }
-    if (writerOptions == null) {
-      writerOptions = OrcFile.writerOptions(options.getTableProperties(),
-          options.getConfiguration());
-    }
-    writerOptions.fileSystem(fs).callback(indexBuilder);
-    if (!options.isWritingBase()) {
+    // If writing delta dirs, we need to make a clone of original options, to avoid polluting it for
+    // the base writer
+    if (options.isWritingBase()) {
+      if (options instanceof OrcOptions) {
+        writerOptions = ((OrcOptions) options).getOrcOptions();
+      }
+      if (writerOptions == null) {
+        writerOptions = OrcFile.writerOptions(options.getTableProperties(),
+            options.getConfiguration());
+      }
+    } else { // delta writer
+      AcidOutputFormat.Options optionsCloneForDelta = options.clone();
+
+      if (optionsCloneForDelta instanceof OrcOptions) {
+        writerOptions = ((OrcOptions) optionsCloneForDelta).getOrcOptions();
+      }
+      if (writerOptions == null) {
+        writerOptions = OrcFile.writerOptions(optionsCloneForDelta.getTableProperties(),
+            optionsCloneForDelta.getConfiguration());
+      }
+
+      // get buffer size and stripe size for base writer
+      int baseBufferSizeValue = writerOptions.getBufferSize();
+      long baseStripeSizeValue = writerOptions.getStripeSize();
+
+      // overwrite buffer size and stripe size for delta writer, based on BASE_DELTA_RATIO
+      int ratio = (int) OrcConf.BASE_DELTA_RATIO.getLong(options.getConfiguration());
+      writerOptions.bufferSize(baseBufferSizeValue / ratio);
+      writerOptions.stripeSize(baseStripeSizeValue / ratio);
       writerOptions.blockPadding(false);
-      writerOptions.bufferSize(DELTA_BUFFER_SIZE);
-      writerOptions.stripeSize(DELTA_STRIPE_SIZE);
     }
+    writerOptions.fileSystem(fs).callback(indexBuilder);
     rowInspector = (StructObjectInspector)options.getInspector();
     writerOptions.inspector(createEventSchema(findRecId(options.getInspector(),
         options.getRecordIdColumn())));
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
index 0a61fb8..67c473e 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
@@ -197,6 +198,8 @@ public void testWriterTblProperties() throws Exception {
     }
     Properties tblProps = new Properties();
     tblProps.setProperty("orc.compress", "SNAPPY");
+    tblProps.setProperty("orc.compress.size", "8192");
+    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_ORC_BASE_DELTA_RATIO, 4);
     AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
         .filesystem(fs)
         .bucket(10)
@@ -223,6 +226,7 @@
     System.out.flush();
     String outDump = new String(myOut.toByteArray());
     assertEquals(true, outDump.contains("Compression: SNAPPY"));
+    assertEquals(true, outDump.contains("Compression size: 2048"));
     System.setOut(origOut);
     updater.close(false);
   }
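
For reviewers, here is a minimal standalone sketch (not part of the patch) of the sizing arithmetic the new delta branch in OrcRecordUpdater performs. The class name and helper methods are hypothetical; the values are taken from the patch itself: the 256 KB default buffer and default ratio of 8 come from the HiveConf/OrcConf hunks, and the 8192/4 pair mirrors the test, which is why the test expects "Compression size: 2048" in the file dump.

// DeltaSizingSketch.java -- illustrative only; class and method names are hypothetical.
public class DeltaSizingSketch {

  // Mirrors the patch: delta writer sizes are the base writer sizes divided by
  // orc.base.delta.ratio, using integer division as in OrcRecordUpdater.
  static long deltaStripeSize(long baseStripeSize, int ratio) {
    return baseStripeSize / ratio;
  }

  static int deltaBufferSize(int baseBufferSize, int ratio) {
    return baseBufferSize / ratio;
  }

  public static void main(String[] args) {
    // Default path: hive.exec.orc.default.buffer.size = 256 * 1024 and the
    // default ratio of 8 give a 32 KB delta buffer.
    System.out.println(deltaBufferSize(256 * 1024, 8)); // 32768

    // Test path: orc.compress.size = 8192 and HIVE_ORC_BASE_DELTA_RATIO = 4
    // give 2048, matching the asserted "Compression size: 2048".
    System.out.println(deltaBufferSize(8192, 4)); // 2048
  }
}

Note the design choice the sketch makes visible: because the base writer's effective buffer and stripe sizes (table properties or defaults) are read first and then scaled down, delta files track whatever sizing the base files use instead of the old fixed DELTA_BUFFER_SIZE/DELTA_STRIPE_SIZE constants.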