diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 4e6dd7a9239d35639a19d2d49e17afdba71f1638..78e40d1f676699cbb6f397fb197485974bda47a0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -717,6 +717,7 @@ public SplitStrategy call() throws IOException {
    private ReaderImpl.FileMetaInfo fileMetaInfo;
    private Metadata metadata;
    private List<OrcProto.Type> types;
+   private boolean[] includedCols;
    private final boolean isOriginal;
    private final List<Long> deltas;
    private final boolean hasBase;
@@ -830,8 +831,14 @@ OrcSplit createSplit(long offset, long length,
        hosts = new String[hostList.size()];
        hostList.toArray(hosts);
      }
+
+     // scale the raw data size to split level based on the ratio of split length to file length
+     final long fileLen = file.getLen();
+     final double splitRatio = (double) length / (double) fileLen;
+     final long scaledProjSize = projColsUncompressedSize > 0 ?
+         (long) (splitRatio * projColsUncompressedSize) : fileLen;
      return new OrcSplit(file.getPath(), offset, length, hosts, fileMetaInfo,
-         isOriginal, hasBase, deltas, projColsUncompressedSize);
+         isOriginal, hasBase, deltas, scaledProjSize);
    }
 
    /**
@@ -845,11 +852,12 @@ OrcSplit createSplit(long offset, long length,
 
      // figure out which stripes we need to read
      boolean[] includeStripe = null;
+
      // we can't eliminate stripes if there are deltas because the
      // deltas may change the rows making them match the predicate.
      if (deltas.isEmpty()) {
        Reader.Options options = new Reader.Options();
-       options.include(genIncludedColumns(types, context.conf, isOriginal));
+       options.include(includedCols);
        setSearchArgument(options, types, context.conf, isOriginal);
        // only do split pruning if HIVE-8732 has been fixed in the writer
        if (options.getSearchArgument() != null &&
@@ -930,8 +938,6 @@ OrcSplit createSplit(long offset, long length,
    private void populateAndCacheStripeDetails() throws IOException {
      Reader orcReader = OrcFile.createReader(file.getPath(),
          OrcFile.readerOptions(context.conf).filesystem(fs));
-     List<String> projCols = ColumnProjectionUtils.getReadColumnNames(context.conf);
-     // TODO: produce projColsUncompressedSize from projCols
      if (fileInfo != null) {
        stripes = fileInfo.stripeInfos;
        fileMetaInfo = fileInfo.fileMetaInfo;
@@ -959,6 +965,22 @@ OrcSplit createSplit(long offset, long length,
              metadata, types, fileMetaInfo, writerVersion));
        }
      }
+     includedCols = genIncludedColumns(types, context.conf, isOriginal);
+     projColsUncompressedSize = computeProjectionSize(orcReader, includedCols, isOriginal);
+   }
+
+   private long computeProjectionSize(final Reader orcReader, final boolean[] includedCols,
+       final boolean isOriginal) {
+     final int rootIdx = getRootColumn(isOriginal);
+     List<Integer> internalColIds = Lists.newArrayList();
+     if (includedCols != null) {
+       for (int i = 0; i < includedCols.length; i++) {
+         if (includedCols[i]) {
+           internalColIds.add(rootIdx + i);
+         }
+       }
+     }
+     return orcReader.getRawDataSizeFromColIndices(internalColIds);
    }
 
    private boolean isStripeSatisfyPredicate(StripeStatistics stripeStatistics,
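A worked check of the scaling above, derived from the expected values in testProjectedColumnSize further down: each OrcSplit now reports length * (projColsUncompressedSize / fileLen) bytes of projected raw data, falling back to the file length when no projection size is available. In that test the mock file works out to roughly 418.67 projected bytes per byte of file length, so the 400-byte splits report (long) (400 * 418.67) = 167468, the 100-byte split 41867, and a single 900-byte split 376804.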
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
index 6f4f013f58324760cce3901be8223f972425c58c..7bddefc8b3e36d29a94dda452e9e1e37960de356 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
@@ -52,6 +52,13 @@
   long getRawDataSizeOfColumns(List<String> colNames);
 
   /**
+   * Get the deserialized data size of the specified column ids
+   * @param colIds - internal column ids (check orcfiledump for column ids)
+   * @return raw data size of columns
+   */
+  long getRawDataSizeFromColIndices(List<Integer> colIds);
+
+  /**
    * Get the user metadata keys.
    * @return the set of metadata keys
    */
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
index 3c0de3cea5526629d9f68af8f371d2940d44c662..a6448b6870035492879818beac1506de1effcc7c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
@@ -575,7 +575,8 @@ public long getRawDataSize() {
     return deserializedSize;
   }
 
-  private long getRawDataSizeFromColIndices(List<Integer> colIndices) {
+  @Override
+  public long getRawDataSizeFromColIndices(List<Integer> colIndices) {
     long result = 0;
     for (int colIdx : colIndices) {
       result += getRawDataSizeOfColumn(colIdx);
@@ -620,7 +621,7 @@ private long getRawDataSizeOfColumn(int colIdx) {
       case BYTE:
         return numVals * JavaDataModel.get().primitive1();
       default:
-        LOG.debug("Unknown primitive category.");
+        LOG.debug("Unknown primitive category: " + type.getKind());
         break;
     }
 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index 6cb85299c3fceb4683281ded4db2e98536253b35..0c12c8902973b682e027d280991388a11d9b8568 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -17,9 +17,6 @@
  */
 package org.apache.hadoop.hive.ql.io.orc;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Output;
-
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -108,6 +105,9 @@
 import org.junit.Test;
 import org.junit.rules.TestName;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Output;
+
 public class TestInputOutputFormat {
 
   public static String toKryo(SearchArgument sarg) {
@@ -902,14 +902,25 @@ static void fill(DataOutputBuffer out, long length) throws IOException {
     }
     fill(buffer, offset);
     footer.addTypes(OrcProto.Type.newBuilder()
-                      .setKind(OrcProto.Type.Kind.STRUCT)
-                      .addFieldNames("col1")
-                      .addSubtypes(1));
+        .setKind(OrcProto.Type.Kind.STRUCT)
+        .addFieldNames("col1")
+        .addSubtypes(1));
     footer.addTypes(OrcProto.Type.newBuilder()
         .setKind(OrcProto.Type.Kind.STRING));
     footer.setNumberOfRows(1000 * stripeLengths.length)
           .setHeaderLength(headerLen)
           .setContentLength(offset - headerLen);
+    footer.addStatistics(OrcProto.ColumnStatistics.newBuilder()
+        .setNumberOfValues(1000 * stripeLengths.length).build());
+    footer.addStatistics(OrcProto.ColumnStatistics.newBuilder()
+        .setNumberOfValues(1000 * stripeLengths.length)
+        .setStringStatistics(
+            OrcProto.StringStatistics.newBuilder()
+                .setMaximum("zzz")
+                .setMinimum("aaa")
+                .setSum(1000 * 3 * stripeLengths.length)
+                .build()
+        ).build());
     footer.build().writeTo(buffer);
     int footerEnd = buffer.getLength();
     OrcProto.PostScript ps =
@@ -1013,6 +1024,78 @@ public void testSplitGenerator() throws Exception {
   }
 
   @Test
+  public void testProjectedColumnSize() throws Exception {
+    long[] stripeSizes =
+        new long[]{200, 200, 200, 200, 100};
+    MockFileSystem fs = new MockFileSystem(conf,
+        new MockFile("mock:/a/file", 500,
+            createMockOrcFile(stripeSizes),
+            new MockBlock("host1-1", "host1-2", "host1-3"),
+            new MockBlock("host2-1", "host0", "host2-3"),
+            new MockBlock("host0", "host3-2", "host3-3"),
+            new MockBlock("host4-1", "host4-2", "host4-3"),
+            new MockBlock("host5-1", "host5-2", "host5-3")));
+    conf.setInt(OrcInputFormat.MAX_SPLIT_SIZE, 300);
+    conf.setInt(OrcInputFormat.MIN_SPLIT_SIZE, 200);
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
+    OrcInputFormat.SplitGenerator splitter =
+        new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
+            fs.getFileStatus(new Path("/a/file")), null, true,
+            new ArrayList<Long>(), true, null, null));
+    List<OrcSplit> results = splitter.call();
+    OrcSplit result = results.get(0);
+    assertEquals(3, results.size());
+    assertEquals(3, result.getStart());
+    assertEquals(400, result.getLength());
+    assertEquals(167468, result.getProjectedColumnsUncompressedSize());
+    result = results.get(1);
+    assertEquals(403, result.getStart());
+    assertEquals(400, result.getLength());
+    assertEquals(167468, result.getProjectedColumnsUncompressedSize());
+    result = results.get(2);
+    assertEquals(803, result.getStart());
+    assertEquals(100, result.getLength());
+    assertEquals(41867, result.getProjectedColumnsUncompressedSize());
+
+    // test min = 0, max = 0 generates each stripe
+    conf.setInt(OrcInputFormat.MIN_SPLIT_SIZE, 0);
+    conf.setInt(OrcInputFormat.MAX_SPLIT_SIZE, 0);
+    context = new OrcInputFormat.Context(conf);
+    splitter = new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
+        fs.getFileStatus(new Path("/a/file")), null, true,
+        new ArrayList<Long>(),
+        true, null, null));
+    results = splitter.call();
+    assertEquals(5, results.size());
+    for (int i = 0; i < stripeSizes.length; ++i) {
+      assertEquals("checking stripe " + i + " size",
+          stripeSizes[i], results.get(i).getLength());
+      if (i == stripeSizes.length - 1) {
+        assertEquals(41867, results.get(i).getProjectedColumnsUncompressedSize());
+      } else {
+        assertEquals(83734, results.get(i).getProjectedColumnsUncompressedSize());
+      }
+    }
+
+    // single split
+    conf.setInt(OrcInputFormat.MIN_SPLIT_SIZE, 100000);
+    conf.setInt(OrcInputFormat.MAX_SPLIT_SIZE, 1000);
+    context = new OrcInputFormat.Context(conf);
+    splitter = new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
+        fs.getFileStatus(new Path("/a/file")), null, true,
+        new ArrayList<Long>(),
+        true, null, null));
+    results = splitter.call();
+    assertEquals(1, results.size());
+    result = results.get(0);
+    assertEquals(3, result.getStart());
+    assertEquals(900, result.getLength());
+    assertEquals(376804, result.getProjectedColumnsUncompressedSize());
+  }
+
+  @Test
   @SuppressWarnings("unchecked,deprecation")
   public void testInOutFormat() throws Exception {
     Properties properties = new Properties();