diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index d26573e..8637142 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -550,6 +550,9 @@
// Define the default block padding
HIVE_ORC_DEFAULT_BLOCK_PADDING("hive.exec.orc.default.block.padding",
true),
+ // Define the tolerance for block padding as a decimal fraction of the stripe
+ // size (default 0.05, i.e. 5%). For a 256MB stripe, maximum padding is ~13MB.
+ HIVE_ORC_BLOCK_PADDING_TOLERANCE("hive.exec.orc.block.padding.tolerance", 0.05f),
// Define the default compression codec for ORC file
HIVE_ORC_DEFAULT_COMPRESS("hive.exec.orc.default.compress", "ZLIB"),
HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS("hive.orc.splits.include.file.footer", false),
diff --git conf/hive-default.xml.template conf/hive-default.xml.template
index 8a74e4e..f7af01e 100644
--- conf/hive-default.xml.template
+++ conf/hive-default.xml.template
@@ -1962,6 +1962,14 @@
+ hive.exec.orc.block.padding.tolerance
+ 0.05
+
+ Define the tolerance for block padding as a decimal fraction of stripe size (for example, the default value 0.05 is 5% of the stripe size).
+
+
+
+
hive.exec.orc.default.compress
ZLIB
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
index 7c542c1..cde66d3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
@@ -221,6 +221,7 @@ public static Reader createReader(Path path,
private MemoryManager memoryManagerValue;
private Version versionValue;
private WriterCallback callback;
+ private float paddingTolerance;
WriterOptions(Configuration conf) {
configuration = conf;
@@ -250,6 +251,9 @@ public static Reader createReader(Path path,
} else {
versionValue = Version.byName(versionName);
}
+ paddingTolerance =
+ conf.getFloat(HiveConf.ConfVars.HIVE_ORC_BLOCK_PADDING_TOLERANCE.varname,
+ HiveConf.ConfVars.HIVE_ORC_BLOCK_PADDING_TOLERANCE.defaultFloatVal);
}
/**
@@ -301,6 +305,14 @@ public WriterOptions blockPadding(boolean value) {
}
/**
+ * Sets the tolerance for block padding.
+ */
+ public WriterOptions paddingTolerance(float value) {
+ paddingTolerance = value;
+ return this;
+ }
+
+ /**
* Sets the generic compression that is used to compress the data.
*/
public WriterOptions compress(CompressionKind value) {
@@ -370,7 +382,7 @@ public static Writer createWriter(Path path,
opts.stripeSizeValue, opts.compressValue,
opts.bufferSizeValue, opts.rowIndexStrideValue,
opts.memoryManagerValue, opts.blockPaddingValue,
- opts.versionValue, opts.callback);
+ opts.versionValue, opts.callback, opts.paddingTolerance);
}
/**
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index b3ab96f..d3ba88a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -97,13 +97,15 @@
private final FileSystem fs;
private final Path path;
- private final long stripeSize;
+ private long adjustedStripeSize;
+ private final long defaultStripeSize;
private final int rowIndexStride;
private final CompressionKind compress;
private final CompressionCodec codec;
private final boolean addBlockPadding;
private final int bufferSize;
private final long blockSize;
+ private final float paddingTolerance;
// the streams that make up the current stripe
private final Map streams =
new TreeMap();
@@ -146,7 +148,8 @@
MemoryManager memoryManager,
boolean addBlockPadding,
OrcFile.Version version,
- OrcFile.WriterCallback callback) throws IOException {
+ OrcFile.WriterCallback callback,
+ float paddingTolerance) throws IOException {
this.fs = fs;
this.path = path;
this.conf = conf;
@@ -162,11 +165,13 @@ public Writer getWriter() {
} else {
callbackContext = null;
}
- this.stripeSize = stripeSize;
+ this.adjustedStripeSize = stripeSize;
+ this.defaultStripeSize = stripeSize;
this.version = version;
this.addBlockPadding = addBlockPadding;
// pick large block size to minimize block over or under hangs
this.blockSize = Math.min(MAX_BLOCK_SIZE, 2 * stripeSize);
+ this.paddingTolerance = paddingTolerance;
this.compress = compress;
this.bufferSize = bufferSize;
this.rowIndexStride = rowIndexStride;
@@ -211,7 +216,7 @@ static CompressionCodec createCodec(CompressionKind kind) {
@Override
public synchronized boolean checkMemory(double newScale) throws IOException {
- long limit = (long) Math.round(stripeSize * newScale);
+ long limit = (long) Math.round(adjustedStripeSize * newScale);
long size = estimateStripeSize();
if (LOG.isDebugEnabled()) {
LOG.debug("ORC writer " + path + " size = " + size + " limit = " +
@@ -1802,10 +1807,17 @@ private void flushStripe() throws IOException {
// Do we need to pad the file so the stripe doesn't straddle a block
// boundary?
long start = rawWriter.getPos();
- long stripeSize = indexSize + dataSize + footer.getSerializedSize();
+ long currentStripeSize = indexSize + dataSize + footer.getSerializedSize();
+ long available = blockSize - (start % blockSize);
+ float availRatio = (float) available / (float) adjustedStripeSize;
+ if (availRatio > 0.0f && availRatio < 1.0f && availRatio > paddingTolerance) {
+ // shrink the stripe size target so the next stripe fits into the space remaining in the current block
+ adjustedStripeSize = (long) (availRatio * adjustedStripeSize);
+ }
if (addBlockPadding &&
- stripeSize < blockSize &&
- (start % blockSize) + stripeSize > blockSize) {
+ currentStripeSize < blockSize &&
+ (start % blockSize) + currentStripeSize > blockSize &&
+ availRatio < paddingTolerance ) {
long padding = blockSize - (start % blockSize);
byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, padding)];
start += padding;
@@ -1814,6 +1826,9 @@ private void flushStripe() throws IOException {
rawWriter.write(pad, 0, writeLen);
padding -= writeLen;
}
+
+ // padding moved the writer to a block boundary, so restore the configured (default) stripe size
+ adjustedStripeSize = defaultStripeSize;
}
// write out the data streams