diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index d26573e..8637142 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -550,6 +550,9 @@ // Define the default block padding HIVE_ORC_DEFAULT_BLOCK_PADDING("hive.exec.orc.default.block.padding", true), + // Define the tolerance for block padding. By default, 5% of stripe size is + // tolerated. For 256MB stripe size, maximum padding will be ~13MB. + HIVE_ORC_BLOCK_PADDING_TOLERANCE("hive.exec.orc.block.padding.tolerance", 0.05f), // Define the default compression codec for ORC file HIVE_ORC_DEFAULT_COMPRESS("hive.exec.orc.default.compress", "ZLIB"), HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS("hive.orc.splits.include.file.footer", false), diff --git conf/hive-default.xml.template conf/hive-default.xml.template index 8a74e4e..f7af01e 100644 --- conf/hive-default.xml.template +++ conf/hive-default.xml.template @@ -1962,6 +1962,14 @@ + hive.exec.orc.block.padding.tolerance + 0.05 + + Define the tolerance for block padding. By default, 5% of stripe size is tolerated. For 256MB stripe size, maximum padding will be ~13MB. 
+ + + + hive.exec.orc.default.compress ZLIB diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java index 7c542c1..cde66d3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java @@ -221,6 +221,7 @@ public static Reader createReader(Path path, private MemoryManager memoryManagerValue; private Version versionValue; private WriterCallback callback; + private float paddingTolerance; WriterOptions(Configuration conf) { configuration = conf; @@ -250,6 +251,9 @@ public static Reader createReader(Path path, } else { versionValue = Version.byName(versionName); } + paddingTolerance = + conf.getFloat(HiveConf.ConfVars.HIVE_ORC_BLOCK_PADDING_TOLERANCE.varname, + HiveConf.ConfVars.HIVE_ORC_BLOCK_PADDING_TOLERANCE.defaultFloatVal); } /** @@ -301,6 +305,14 @@ public WriterOptions blockPadding(boolean value) { } /** + * Sets the tolerance for block padding. + */ + public WriterOptions paddingTolerance(float value) { + paddingTolerance = value; + return this; + } + + /** * Sets the generic compression that is used to compress the data. 
*/ public WriterOptions compress(CompressionKind value) { @@ -370,7 +382,7 @@ public static Writer createWriter(Path path, opts.stripeSizeValue, opts.compressValue, opts.bufferSizeValue, opts.rowIndexStrideValue, opts.memoryManagerValue, opts.blockPaddingValue, - opts.versionValue, opts.callback); + opts.versionValue, opts.callback, opts.paddingTolerance); } /** diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java index b3ab96f..d3ba88a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java @@ -97,13 +97,15 @@ private final FileSystem fs; private final Path path; - private final long stripeSize; + private long adjustedStripeSize; + private final long defaultStripeSize; private final int rowIndexStride; private final CompressionKind compress; private final CompressionCodec codec; private final boolean addBlockPadding; private final int bufferSize; private final long blockSize; + private final float paddingTolerance; // the streams that make up the current stripe private final Map streams = new TreeMap(); @@ -146,7 +148,8 @@ MemoryManager memoryManager, boolean addBlockPadding, OrcFile.Version version, - OrcFile.WriterCallback callback) throws IOException { + OrcFile.WriterCallback callback, + float paddingTolerance) throws IOException { this.fs = fs; this.path = path; this.conf = conf; @@ -162,11 +165,13 @@ public Writer getWriter() { } else { callbackContext = null; } - this.stripeSize = stripeSize; + this.adjustedStripeSize = stripeSize; + this.defaultStripeSize = stripeSize; this.version = version; this.addBlockPadding = addBlockPadding; // pick large block size to minimize block over or under hangs this.blockSize = Math.min(MAX_BLOCK_SIZE, 2 * stripeSize); + this.paddingTolerance = paddingTolerance; this.compress = compress; this.bufferSize = bufferSize; this.rowIndexStride = rowIndexStride; @@ -211,7 
+216,7 @@ static CompressionCodec createCodec(CompressionKind kind) { @Override public synchronized boolean checkMemory(double newScale) throws IOException { - long limit = (long) Math.round(stripeSize * newScale); + long limit = (long) Math.round(adjustedStripeSize * newScale); long size = estimateStripeSize(); if (LOG.isDebugEnabled()) { LOG.debug("ORC writer " + path + " size = " + size + " limit = " + @@ -1802,10 +1807,17 @@ private void flushStripe() throws IOException { // Do we need to pad the file so the stripe doesn't straddle a block // boundary? long start = rawWriter.getPos(); - long stripeSize = indexSize + dataSize + footer.getSerializedSize(); + long currentStripeSize = indexSize + dataSize + footer.getSerializedSize(); + long available = blockSize - (start % blockSize); + float availRatio = (float) available / (float) adjustedStripeSize; + if (availRatio > 0.0f && availRatio < 1.0f && availRatio > paddingTolerance) { + // adjust default stripe size to fit into remaining space + adjustedStripeSize = (long) (availRatio * adjustedStripeSize); + } if (addBlockPadding && - stripeSize < blockSize && - (start % blockSize) + stripeSize > blockSize) { + currentStripeSize < blockSize && + (start % blockSize) + currentStripeSize > blockSize && + availRatio < paddingTolerance ) { long padding = blockSize - (start % blockSize); byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, padding)]; start += padding; @@ -1814,6 +1826,9 @@ private void flushStripe() throws IOException { rawWriter.write(pad, 0, writeLen); padding -= writeLen; } + + // after padding reset the default stripe size + adjustedStripeSize = defaultStripeSize; } // write out the data streams