diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 44d9a57..75a5f7e 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -547,6 +547,9 @@
HIVE_ORC_WRITE_FORMAT("hive.exec.orc.write.format", null),
// Define the default ORC stripe size
HIVE_ORC_DEFAULT_STRIPE_SIZE("hive.exec.orc.default.stripe.size",
+ 64L * 1024 * 1024),
+ // Define the default file system block size for ORC
+ HIVE_ORC_DEFAULT_BLOCK_SIZE("hive.exec.orc.default.block.size",
256L * 1024 * 1024),
HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD(
"hive.exec.orc.dictionary.key.size.threshold", 0.8f),
@@ -558,6 +561,9 @@
// Define the default block padding
HIVE_ORC_DEFAULT_BLOCK_PADDING("hive.exec.orc.default.block.padding",
true),
+ // Define the tolerance for block padding. The amount of padding written
+ // will always be less than this fraction of the stripe size.
+ HIVE_ORC_BLOCK_PADDING_TOLERANCE("hive.exec.orc.block.padding.tolerance", 0.05f),
// Define the default compression codec for ORC file
HIVE_ORC_DEFAULT_COMPRESS("hive.exec.orc.default.compress", "ZLIB"),
// Define the default encoding strategy to use
diff --git conf/hive-default.xml.template conf/hive-default.xml.template
index e53df4f..ef0e40c 100644
--- conf/hive-default.xml.template
+++ conf/hive-default.xml.template
@@ -1957,13 +1957,21 @@
hive.exec.orc.default.stripe.size
- 268435456
+ 67108864
Define the default ORC stripe size.
+ hive.exec.orc.default.block.size
+ 268435456
+
+ Define the default file system block size for ORC files.
+
+
+
+
hive.exec.orc.default.row.index.stride
10000
@@ -1988,6 +1996,17 @@
+ hive.exec.orc.block.padding.tolerance
+ 0.05
+
+ Define the tolerance for block padding as a fraction of the stripe size.
+ For the defaults of 64MB ORC stripe and 256MB HDFS block, a maximum of 3.2MB (5% of 64MB) will be reserved for padding within the 256MB block.
+ In that case, if the available size within the block is more than 3.2MB, a new smaller stripe will be inserted to fit within that space.
+ This will make sure that no stripe written will cross block boundaries and cause remote reads within a node local task.
+
+
+
+
hive.exec.orc.default.compress
ZLIB
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
index 7686f82..b89b335 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
@@ -118,6 +118,7 @@ public int getMinor() {
COMPRESSION("orc.compress"),
COMPRESSION_BLOCK_SIZE("orc.compress.size"),
STRIPE_SIZE("orc.stripe.size"),
+ BLOCK_SIZE("orc.block.size"),
ROW_INDEX_STRIDE("orc.row.index.stride"),
ENABLE_INDEXES("orc.create.index"),
BLOCK_PADDING("orc.block.padding"),
@@ -218,6 +219,7 @@ public static Reader createReader(Path path,
private FileSystem fileSystemValue = null;
private ObjectInspector inspectorValue = null;
private long stripeSizeValue;
+ private long blockSizeValue;
private int rowIndexStrideValue;
private int bufferSizeValue;
private boolean blockPaddingValue;
@@ -226,6 +228,7 @@ public static Reader createReader(Path path,
private Version versionValue;
private WriterCallback callback;
private EncodingStrategy encodingStrategy;
+ private float paddingTolerance;
WriterOptions(Configuration conf) {
configuration = conf;
@@ -233,6 +236,9 @@ public static Reader createReader(Path path,
stripeSizeValue =
conf.getLong(HiveConf.ConfVars.HIVE_ORC_DEFAULT_STRIPE_SIZE.varname,
HiveConf.ConfVars.HIVE_ORC_DEFAULT_STRIPE_SIZE.defaultLongVal);
+ blockSizeValue =
+ conf.getLong(HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_SIZE.varname,
+ HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_SIZE.defaultLongVal);
rowIndexStrideValue =
conf.getInt(HiveConf.ConfVars.HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE
.varname, HiveConf.ConfVars.HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE.defaultIntVal);
@@ -262,6 +268,9 @@ public static Reader createReader(Path path,
} else {
encodingStrategy = EncodingStrategy.valueOf(enString);
}
+ paddingTolerance =
+ conf.getFloat(HiveConf.ConfVars.HIVE_ORC_BLOCK_PADDING_TOLERANCE.varname,
+ HiveConf.ConfVars.HIVE_ORC_BLOCK_PADDING_TOLERANCE.defaultFloatVal);
}
/**
@@ -284,6 +293,15 @@ public WriterOptions stripeSize(long value) {
}
/**
+ * Set the file system block size for the file. For optimal performance,
+ * set the block size to be a multiple of the stripe size.
+ */
+ public WriterOptions blockSize(long value) {
+ blockSizeValue = value;
+ return this;
+ }
+
+ /**
* Set the distance between entries in the row index. The minimum value is
* 1000 to prevent the index from overwhelming the data. If the stride is
* set to 0, no indexes will be included in the file.
@@ -321,6 +339,14 @@ public WriterOptions encodingStrategy(EncodingStrategy strategy) {
}
/**
+ * Sets the tolerance for block padding as a percentage of stripe size.
+ */
+ public WriterOptions paddingTolerance(float value) {
+ paddingTolerance = value;
+ return this;
+ }
+
+ /**
* Sets the generic compression that is used to compress the data.
*/
public WriterOptions compress(CompressionKind value) {
@@ -390,7 +416,9 @@ public static Writer createWriter(Path path,
opts.stripeSizeValue, opts.compressValue,
opts.bufferSizeValue, opts.rowIndexStrideValue,
opts.memoryManagerValue, opts.blockPaddingValue,
- opts.versionValue, opts.callback, opts.encodingStrategy);
+ opts.versionValue, opts.callback,
+ opts.encodingStrategy, opts.paddingTolerance,
+ opts.blockSizeValue);
}
/**
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index ba69246..4323fa9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -105,13 +105,15 @@
private final FileSystem fs;
private final Path path;
- private final long stripeSize;
+ private final long defaultStripeSize;
+ private long adjustedStripeSize;
private final int rowIndexStride;
private final CompressionKind compress;
private final CompressionCodec codec;
private final boolean addBlockPadding;
private final int bufferSize;
private final long blockSize;
+ private final float paddingTolerance;
// the streams that make up the current stripe
private final Map streams =
new TreeMap();
@@ -156,7 +158,9 @@
boolean addBlockPadding,
OrcFile.Version version,
OrcFile.WriterCallback callback,
- OrcFile.EncodingStrategy encodingStrategy) throws IOException {
+ OrcFile.EncodingStrategy encodingStrategy,
+ float paddingTolerance,
+ long blockSizeValue) throws IOException {
this.fs = fs;
this.path = path;
this.conf = conf;
@@ -172,12 +176,13 @@ public Writer getWriter() {
} else {
callbackContext = null;
}
- this.stripeSize = stripeSize;
+ this.adjustedStripeSize = stripeSize;
+ this.defaultStripeSize = stripeSize;
this.version = version;
this.encodingStrategy = encodingStrategy;
this.addBlockPadding = addBlockPadding;
- // pick large block size to minimize block over or under hangs
- this.blockSize = Math.min(MAX_BLOCK_SIZE, 2 * stripeSize);
+ this.blockSize = blockSizeValue;
+ this.paddingTolerance = paddingTolerance;
this.compress = compress;
this.rowIndexStride = rowIndexStride;
this.memoryManager = memoryManager;
@@ -296,7 +301,7 @@ static CompressionCodec createCodec(CompressionKind kind) {
@Override
public synchronized boolean checkMemory(double newScale) throws IOException {
- long limit = (long) Math.round(stripeSize * newScale);
+ long limit = (long) Math.round(adjustedStripeSize * newScale);
long size = estimateStripeSize();
if (LOG.isDebugEnabled()) {
LOG.debug("ORC writer " + path + " size = " + size + " limit = " +
@@ -1904,18 +1909,49 @@ private void flushStripe() throws IOException {
// Do we need to pad the file so the stripe doesn't straddle a block
// boundary?
long start = rawWriter.getPos();
- long stripeSize = indexSize + dataSize + footer.getSerializedSize();
- if (addBlockPadding &&
- stripeSize < blockSize &&
- (start % blockSize) + stripeSize > blockSize) {
+ final long currentStripeSize = indexSize + dataSize + footer.getSerializedSize();
+ final long available = blockSize - (start % blockSize);
+ final long overflow = currentStripeSize - adjustedStripeSize;
+ final float availRatio = (float) available / (float) defaultStripeSize;
+
+ if (availRatio > 0.0f && availRatio < 1.0f
+ && availRatio > paddingTolerance) {
+ // adjust default stripe size to fit into remaining space, also adjust
+ // the next stripe for correction based on the current stripe size
+ // and user specified padding tolerance. Since stripe size can overflow
+ // the default stripe size we should apply this correction to avoid
+ // writing portion of last stripe to next hdfs block.
+ float correction = overflow > 0 ? (float) overflow
+ / (float) adjustedStripeSize : 0.0f;
+
+ // correction should not be greater than user specified padding
+ // tolerance
+ correction = correction > paddingTolerance ? paddingTolerance
+ : correction;
+
+ // adjust next stripe size based on current stripe estimate correction
+ adjustedStripeSize = (long) ((1.0f - correction) * (availRatio * defaultStripeSize));
+ } else if (availRatio >= 1.0) {
+ adjustedStripeSize = defaultStripeSize;
+ }
+
+ if (availRatio < paddingTolerance && addBlockPadding) {
long padding = blockSize - (start % blockSize);
byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, padding)];
+ LOG.info(String.format("Padding ORC by %d bytes (<= %.2f * %d)",
+ padding, availRatio, defaultStripeSize));
start += padding;
while (padding > 0) {
int writeLen = (int) Math.min(padding, pad.length);
rawWriter.write(pad, 0, writeLen);
padding -= writeLen;
}
+ adjustedStripeSize = defaultStripeSize;
+ } else if (currentStripeSize < blockSize
+ && (start % blockSize) + currentStripeSize > blockSize) {
+ // even if you don't pad, reset the default stripe size when crossing a
+ // block boundary
+ adjustedStripeSize = defaultStripeSize;
}
// write out the data streams
diff --git ql/src/test/resources/orc-file-dump-dictionary-threshold.out ql/src/test/resources/orc-file-dump-dictionary-threshold.out
index 0a06481..f6e7a50 100644
--- ql/src/test/resources/orc-file-dump-dictionary-threshold.out
+++ ql/src/test/resources/orc-file-dump-dictionary-threshold.out
@@ -90,15 +90,15 @@ Stripes:
Encoding column 1: DIRECT_V2
Encoding column 2: DIRECT_V2
Encoding column 3: DIRECT_V2
- Stripe: offset: 1800000 data: 177935 rows: 1000 tail: 67 index: 813
- Stream: column 0 section ROW_INDEX start: 1800000 length 10
- Stream: column 1 section ROW_INDEX start: 1800010 length 36
- Stream: column 2 section ROW_INDEX start: 1800046 length 39
- Stream: column 3 section ROW_INDEX start: 1800085 length 728
- Stream: column 1 section DATA start: 1800813 length 4007
- Stream: column 2 section DATA start: 1804820 length 8007
- Stream: column 3 section DATA start: 1812827 length 164661
- Stream: column 3 section LENGTH start: 1977488 length 1260
+ Stripe: offset: 1751794 data: 177935 rows: 1000 tail: 67 index: 813
+ Stream: column 0 section ROW_INDEX start: 1751794 length 10
+ Stream: column 1 section ROW_INDEX start: 1751804 length 36
+ Stream: column 2 section ROW_INDEX start: 1751840 length 39
+ Stream: column 3 section ROW_INDEX start: 1751879 length 728
+ Stream: column 1 section DATA start: 1752607 length 4007
+ Stream: column 2 section DATA start: 1756614 length 8007
+ Stream: column 3 section DATA start: 1764621 length 164661
+ Stream: column 3 section LENGTH start: 1929282 length 1260
Encoding column 0: DIRECT
Encoding column 1: DIRECT_V2
Encoding column 2: DIRECT_V2
diff --git ql/src/test/resources/orc-file-dump.out ql/src/test/resources/orc-file-dump.out
index 7a5b907..f46a89b 100644
--- ql/src/test/resources/orc-file-dump.out
+++ ql/src/test/resources/orc-file-dump.out
@@ -80,30 +80,30 @@ Stripes:
Encoding column 1: DIRECT_V2
Encoding column 2: DIRECT_V2
Encoding column 3: DICTIONARY_V2
- Stripe: offset: 200000 data: 63796 rows: 5000 tail: 74 index: 123
- Stream: column 0 section ROW_INDEX start: 200000 length 10
- Stream: column 1 section ROW_INDEX start: 200010 length 35
- Stream: column 2 section ROW_INDEX start: 200045 length 39
- Stream: column 3 section ROW_INDEX start: 200084 length 39
- Stream: column 1 section DATA start: 200123 length 20029
- Stream: column 2 section DATA start: 220152 length 40035
- Stream: column 3 section DATA start: 260187 length 3574
- Stream: column 3 section LENGTH start: 263761 length 25
- Stream: column 3 section DICTIONARY_DATA start: 263786 length 133
+ Stripe: offset: 191881 data: 63796 rows: 5000 tail: 74 index: 123
+ Stream: column 0 section ROW_INDEX start: 191881 length 10
+ Stream: column 1 section ROW_INDEX start: 191891 length 35
+ Stream: column 2 section ROW_INDEX start: 191926 length 39
+ Stream: column 3 section ROW_INDEX start: 191965 length 39
+ Stream: column 1 section DATA start: 192004 length 20029
+ Stream: column 2 section DATA start: 212033 length 40035
+ Stream: column 3 section DATA start: 252068 length 3574
+ Stream: column 3 section LENGTH start: 255642 length 25
+ Stream: column 3 section DICTIONARY_DATA start: 255667 length 133
Encoding column 0: DIRECT
Encoding column 1: DIRECT_V2
Encoding column 2: DIRECT_V2
Encoding column 3: DICTIONARY_V2
- Stripe: offset: 263993 data: 12940 rows: 1000 tail: 71 index: 123
- Stream: column 0 section ROW_INDEX start: 263993 length 10
- Stream: column 1 section ROW_INDEX start: 264003 length 36
- Stream: column 2 section ROW_INDEX start: 264039 length 39
- Stream: column 3 section ROW_INDEX start: 264078 length 38
- Stream: column 1 section DATA start: 264116 length 4007
- Stream: column 2 section DATA start: 268123 length 8007
- Stream: column 3 section DATA start: 276130 length 768
- Stream: column 3 section LENGTH start: 276898 length 25
- Stream: column 3 section DICTIONARY_DATA start: 276923 length 133
+ Stripe: offset: 255874 data: 12940 rows: 1000 tail: 71 index: 123
+ Stream: column 0 section ROW_INDEX start: 255874 length 10
+ Stream: column 1 section ROW_INDEX start: 255884 length 36
+ Stream: column 2 section ROW_INDEX start: 255920 length 39
+ Stream: column 3 section ROW_INDEX start: 255959 length 38
+ Stream: column 1 section DATA start: 255997 length 4007
+ Stream: column 2 section DATA start: 260004 length 8007
+ Stream: column 3 section DATA start: 268011 length 768
+ Stream: column 3 section LENGTH start: 268779 length 25
+ Stream: column 3 section DICTIONARY_DATA start: 268804 length 133
Encoding column 0: DIRECT
Encoding column 1: DIRECT_V2
Encoding column 2: DIRECT_V2