diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 44d9a57..75a5f7e 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -547,6 +547,9 @@ HIVE_ORC_WRITE_FORMAT("hive.exec.orc.write.format", null), // Define the default ORC stripe size HIVE_ORC_DEFAULT_STRIPE_SIZE("hive.exec.orc.default.stripe.size", + 64L * 1024 * 1024), + // Define the default file system block size for ORC + HIVE_ORC_DEFAULT_BLOCK_SIZE("hive.exec.orc.default.block.size", 256L * 1024 * 1024), HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD( "hive.exec.orc.dictionary.key.size.threshold", 0.8f), @@ -558,6 +561,9 @@ // Define the default block padding HIVE_ORC_DEFAULT_BLOCK_PADDING("hive.exec.orc.default.block.padding", true), + // Define the tolerance for block padding. The total padded length will + // always be less than the specified percentage. + HIVE_ORC_BLOCK_PADDING_TOLERANCE("hive.exec.orc.block.padding.tolerance", 0.05f), // Define the default compression codec for ORC file HIVE_ORC_DEFAULT_COMPRESS("hive.exec.orc.default.compress", "ZLIB"), // Define the default encoding strategy to use diff --git a/conf/hive-default.xml.template b/conf/hive-default.xml.template index e53df4f..f23cb3c 100644 --- a/conf/hive-default.xml.template +++ b/conf/hive-default.xml.template @@ -1957,13 +1957,21 @@ hive.exec.orc.default.stripe.size - 268435456 + 67108864 Define the default ORC stripe size. + hive.exec.orc.default.block.size + 268435456 + + Define the default file system block size for ORC files. + + + + hive.exec.orc.default.row.index.stride 10000 @@ -1988,6 +1996,18 @@ + hive.exec.orc.block.padding.tolerance + 0.05 + + Define the tolerance for block padding as a percentage of stripe size. + No block will have more than this fraction of padding. + For the defaults of 64Mb stripe & 256Mb blocks, only 3.2Mb will be reserved for padding within the 256Mb block. + In that case, if the available size within the block is more than 3.2Mb, a new smaller stripe will be inserted to fit within that space. + This will make sure that no stripe written will cross block boundaries and cause remote reads within a node local task. + + + + hive.exec.orc.default.compress ZLIB diff --git a/pom.xml b/pom.xml index b5a5697..8dfc128 100644 --- a/pom.xml +++ b/pom.xml @@ -144,7 +144,7 @@ 1.0.1 1.7.5 4.0.4 - 0.4.0-incubating + 0.4.1-incubating 1.1 0.2 1.4 diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 61cc874..41cd1aa 100755 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -307,6 +307,9 @@ private void addSplitsForGroup(List dirs, TableScanOperator tableScan, Job try { List paths = Utilities.getInputPathsTez(job, mrwork); dirs = paths.toArray(new Path[paths.size()]); + for(Path d: dirs) { + System.err.println("GOPAL: " + d); + } } catch (Exception e) { throw new IOException("Could not create input files", e); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java index 7686f82..b89b335 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java @@ -118,6 +118,7 @@ public int getMinor() { COMPRESSION("orc.compress"), COMPRESSION_BLOCK_SIZE("orc.compress.size"), STRIPE_SIZE("orc.stripe.size"), + BLOCK_SIZE("orc.block.size"), ROW_INDEX_STRIDE("orc.row.index.stride"), ENABLE_INDEXES("orc.create.index"), BLOCK_PADDING("orc.block.padding"), @@ -218,6 +219,7 @@ public static Reader createReader(Path path, private FileSystem fileSystemValue = null; private ObjectInspector inspectorValue = null; private long stripeSizeValue; + private long blockSizeValue; private int rowIndexStrideValue; private int bufferSizeValue; private boolean blockPaddingValue; @@ -226,6 +228,7 @@ public static Reader createReader(Path path, private Version versionValue; private WriterCallback callback; private EncodingStrategy encodingStrategy; + private float paddingTolerance; WriterOptions(Configuration conf) { configuration = conf; @@ -233,6 +236,9 @@ public static Reader createReader(Path path, stripeSizeValue = conf.getLong(HiveConf.ConfVars.HIVE_ORC_DEFAULT_STRIPE_SIZE.varname, HiveConf.ConfVars.HIVE_ORC_DEFAULT_STRIPE_SIZE.defaultLongVal); + blockSizeValue = + conf.getLong(HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_SIZE.varname, + HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_SIZE.defaultLongVal); rowIndexStrideValue = conf.getInt(HiveConf.ConfVars.HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE .varname, HiveConf.ConfVars.HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE.defaultIntVal); @@ -262,6 +268,9 @@ public static Reader createReader(Path path, } else { encodingStrategy = EncodingStrategy.valueOf(enString); } + paddingTolerance = + conf.getFloat(HiveConf.ConfVars.HIVE_ORC_BLOCK_PADDING_TOLERANCE.varname, + HiveConf.ConfVars.HIVE_ORC_BLOCK_PADDING_TOLERANCE.defaultFloatVal); } /** @@ -284,6 +293,15 @@ public WriterOptions stripeSize(long value) { } /** + * Set the file system block size for the file. For optimal performance, + * set the block size to be multiple factors of stripe size. + */ + public WriterOptions blockSize(long value) { + blockSizeValue = value; + return this; + } + + /** * Set the distance between entries in the row index. The minimum value is * 1000 to prevent the index from overwhelming the data. If the stride is * set to 0, no indexes will be included in the file. @@ -321,6 +339,14 @@ public WriterOptions encodingStrategy(EncodingStrategy strategy) { } /** + * Sets the tolerance for block padding as a percentage of stripe size. + */ + public WriterOptions paddingTolerance(float value) { + paddingTolerance = value; + return this; + } + + /** * Sets the generic compression that is used to compress the data. */ public WriterOptions compress(CompressionKind value) { @@ -390,7 +416,9 @@ public static Writer createWriter(Path path, opts.stripeSizeValue, opts.compressValue, opts.bufferSizeValue, opts.rowIndexStrideValue, opts.memoryManagerValue, opts.blockPaddingValue, - opts.versionValue, opts.callback, opts.encodingStrategy); + opts.versionValue, opts.callback, + opts.encodingStrategy, opts.paddingTolerance, + opts.blockSizeValue); } /** diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java index ba69246..0321109 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java @@ -105,13 +105,15 @@ private final FileSystem fs; private final Path path; - private final long stripeSize; + private final long defaultStripeSize; + private long adjustedStripeSize; private final int rowIndexStride; private final CompressionKind compress; private final CompressionCodec codec; private final boolean addBlockPadding; private final int bufferSize; private final long blockSize; + private final float paddingTolerance; // the streams that make up the current stripe private final Map streams = new TreeMap(); @@ -156,7 +158,9 @@ boolean addBlockPadding, OrcFile.Version version, OrcFile.WriterCallback callback, - OrcFile.EncodingStrategy encodingStrategy) throws IOException { + OrcFile.EncodingStrategy encodingStrategy, + float paddingTolerance, + long blockSizeValue) throws IOException { this.fs = fs; this.path = path; this.conf = conf; @@ -172,12 +176,13 @@ public Writer getWriter() { } else { callbackContext = null; } - this.stripeSize = stripeSize; + this.adjustedStripeSize = stripeSize; + this.defaultStripeSize = stripeSize; this.version = version; this.encodingStrategy = encodingStrategy; this.addBlockPadding = addBlockPadding; - // pick large block size to minimize block over or under hangs - this.blockSize = Math.min(MAX_BLOCK_SIZE, 2 * stripeSize); + this.blockSize = blockSizeValue; + this.paddingTolerance = paddingTolerance; this.compress = compress; this.rowIndexStride = rowIndexStride; this.memoryManager = memoryManager; @@ -296,7 +301,7 @@ static CompressionCodec createCodec(CompressionKind kind) { @Override public synchronized boolean checkMemory(double newScale) throws IOException { - long limit = (long) Math.round(stripeSize * newScale); + long limit = (long) Math.round(adjustedStripeSize * newScale); long size = estimateStripeSize(); if (LOG.isDebugEnabled()) { LOG.debug("ORC writer " + path + " size = " + size + " limit = " + @@ -1904,10 +1909,31 @@ private void flushStripe() throws IOException { // Do we need to pad the file so the stripe doesn't straddle a block // boundary? long start = rawWriter.getPos(); - long stripeSize = indexSize + dataSize + footer.getSerializedSize(); + long currentStripeSize = indexSize + dataSize + footer.getSerializedSize(); + long available = blockSize - (start % blockSize); + long overflow = currentStripeSize - defaultStripeSize; + float availRatio = (float) available / (float) adjustedStripeSize; + if (availRatio > 0.0f && availRatio < 1.0f && + availRatio > paddingTolerance) { + // adjust default stripe size to fit into remaining space, also adjust + // the next stripe for correction based on the current stripe size + // and user specified padding tolerance. Since stripe size can overflow + // the default stripe size we should apply this correction to avoid + // writing portion of last stripe to next hdfs block. + float correction = overflow > 0 ? (float) overflow / + (float) defaultStripeSize : 0.0f; + + // correction should not be greater than user specified padding tolerance + correction = correction > paddingTolerance ? paddingTolerance : correction; + + // adjust next stripe size based on current stripe estimate correction + adjustedStripeSize = (long) ((1.0f - correction) * (availRatio * + adjustedStripeSize)); + } if (addBlockPadding && - stripeSize < blockSize && - (start % blockSize) + stripeSize > blockSize) { + currentStripeSize < blockSize && + (start % blockSize) + currentStripeSize > blockSize && + availRatio < paddingTolerance ) { long padding = blockSize - (start % blockSize); byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, padding)]; start += padding; @@ -1916,6 +1942,9 @@ private void flushStripe() throws IOException { rawWriter.write(pad, 0, writeLen); padding -= writeLen; } + + // after padding reset the default stripe size + adjustedStripeSize = defaultStripeSize; } // write out the data streams diff --git a/ql/src/test/resources/orc-file-dump-dictionary-threshold.out b/ql/src/test/resources/orc-file-dump-dictionary-threshold.out index 0a06481..f6e7a50 100644 --- a/ql/src/test/resources/orc-file-dump-dictionary-threshold.out +++ b/ql/src/test/resources/orc-file-dump-dictionary-threshold.out @@ -90,15 +90,15 @@ Stripes: Encoding column 1: DIRECT_V2 Encoding column 2: DIRECT_V2 Encoding column 3: DIRECT_V2 - Stripe: offset: 1800000 data: 177935 rows: 1000 tail: 67 index: 813 - Stream: column 0 section ROW_INDEX start: 1800000 length 10 - Stream: column 1 section ROW_INDEX start: 1800010 length 36 - Stream: column 2 section ROW_INDEX start: 1800046 length 39 - Stream: column 3 section ROW_INDEX start: 1800085 length 728 - Stream: column 1 section DATA start: 1800813 length 4007 - Stream: column 2 section DATA start: 1804820 length 8007 - Stream: column 3 section DATA start: 1812827 length 164661 - Stream: column 3 section LENGTH start: 1977488 length 1260 + Stripe: offset: 1751794 data: 177935 rows: 1000 tail: 67 index: 813 + Stream: column 0 section ROW_INDEX start: 1751794 length 10 + Stream: column 1 section ROW_INDEX start: 1751804 length 36 + Stream: column 2 section ROW_INDEX start: 1751840 length 39 + Stream: column 3 section ROW_INDEX start: 1751879 length 728 + Stream: column 1 section DATA start: 1752607 length 4007 + Stream: column 2 section DATA start: 1756614 length 8007 + Stream: column 3 section DATA start: 1764621 length 164661 + Stream: column 3 section LENGTH start: 1929282 length 1260 Encoding column 0: DIRECT Encoding column 1: DIRECT_V2 Encoding column 2: DIRECT_V2 diff --git a/ql/src/test/resources/orc-file-dump.out b/ql/src/test/resources/orc-file-dump.out index 7a5b907..f46a89b 100644 --- a/ql/src/test/resources/orc-file-dump.out +++ b/ql/src/test/resources/orc-file-dump.out @@ -80,30 +80,30 @@ Stripes: Encoding column 1: DIRECT_V2 Encoding column 2: DIRECT_V2 Encoding column 3: DICTIONARY_V2 - Stripe: offset: 200000 data: 63796 rows: 5000 tail: 74 index: 123 - Stream: column 0 section ROW_INDEX start: 200000 length 10 - Stream: column 1 section ROW_INDEX start: 200010 length 35 - Stream: column 2 section ROW_INDEX start: 200045 length 39 - Stream: column 3 section ROW_INDEX start: 200084 length 39 - Stream: column 1 section DATA start: 200123 length 20029 - Stream: column 2 section DATA start: 220152 length 40035 - Stream: column 3 section DATA start: 260187 length 3574 - Stream: column 3 section LENGTH start: 263761 length 25 - Stream: column 3 section DICTIONARY_DATA start: 263786 length 133 + Stripe: offset: 191881 data: 63796 rows: 5000 tail: 74 index: 123 + Stream: column 0 section ROW_INDEX start: 191881 length 10 + Stream: column 1 section ROW_INDEX start: 191891 length 35 + Stream: column 2 section ROW_INDEX start: 191926 length 39 + Stream: column 3 section ROW_INDEX start: 191965 length 39 + Stream: column 1 section DATA start: 192004 length 20029 + Stream: column 2 section DATA start: 212033 length 40035 + Stream: column 3 section DATA start: 252068 length 3574 + Stream: column 3 section LENGTH start: 255642 length 25 + Stream: column 3 section DICTIONARY_DATA start: 255667 length 133 Encoding column 0: DIRECT Encoding column 1: DIRECT_V2 Encoding column 2: DIRECT_V2 Encoding column 3: DICTIONARY_V2 - Stripe: offset: 263993 data: 12940 rows: 1000 tail: 71 index: 123 - Stream: column 0 section ROW_INDEX start: 263993 length 10 - Stream: column 1 section ROW_INDEX start: 264003 length 36 - Stream: column 2 section ROW_INDEX start: 264039 length 39 - Stream: column 3 section ROW_INDEX start: 264078 length 38 - Stream: column 1 section DATA start: 264116 length 4007 - Stream: column 2 section DATA start: 268123 length 8007 - Stream: column 3 section DATA start: 276130 length 768 - Stream: column 3 section LENGTH start: 276898 length 25 - Stream: column 3 section DICTIONARY_DATA start: 276923 length 133 + Stripe: offset: 255874 data: 12940 rows: 1000 tail: 71 index: 123 + Stream: column 0 section ROW_INDEX start: 255874 length 10 + Stream: column 1 section ROW_INDEX start: 255884 length 36 + Stream: column 2 section ROW_INDEX start: 255920 length 39 + Stream: column 3 section ROW_INDEX start: 255959 length 38 + Stream: column 1 section DATA start: 255997 length 4007 + Stream: column 2 section DATA start: 260004 length 8007 + Stream: column 3 section DATA start: 268011 length 768 + Stream: column 3 section LENGTH start: 268779 length 25 + Stream: column 3 section DICTIONARY_DATA start: 268804 length 133 Encoding column 0: DIRECT Encoding column 1: DIRECT_V2 Encoding column 2: DIRECT_V2