diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java index 381d97d..6fec8b7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java @@ -30,10 +30,12 @@ import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk; import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.CodedInputStream; public abstract class InStream extends InputStream { private static final Log LOG = LogFactory.getLog(InStream.class); + private static final int PROTOBUF_MESSAGE_MAX_LIMIT = 1024 << 20; // 1GB protected final String name; protected final long length; @@ -447,4 +449,26 @@ public static InStream create(String name, return new CompressedStream(name, input, length, codec, bufferSize); } } + + /** + * Creates a coded input stream (used for protobuf message parsing) over the stream built by + * {@code create}, with the higher message size limit {@code PROTOBUF_MESSAGE_MAX_LIMIT} set on it. + * @param name the name of the stream + * @param input the list of ranges of bytes for the stream; from disk or cache + * @param length the length in bytes of the stream + * @param codec the compression codec + * @param bufferSize the compression buffer size + * @return a coded input stream wrapping the created stream, with the size limit already applied + * @throws IOException propagated from {@code create} when the underlying stream cannot be built + */ + public static CodedInputStream createCodedInputStream(String name, + List input, + long length, + CompressionCodec codec, + int bufferSize) throws IOException { + InStream inStream = create(name, input, length, codec, bufferSize); + CodedInputStream codedInputStream = CodedInputStream.newInstance(inStream); + codedInputStream.setSizeLimit(PROTOBUF_MESSAGE_MAX_LIMIT); + return codedInputStream; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java index 43d2933..1910214 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java @@ -108,7 +108,7 @@ public MetadataReader(FSDataInputStream file, // read the footer ByteBuffer tailBuf = ByteBuffer.allocate(tailLength); file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), tailLength); - return OrcProto.StripeFooter.parseFrom(InStream.create("footer", + return OrcProto.StripeFooter.parseFrom(InStream.createCodedInputStream("footer", Lists.newArrayList(new BufferChunk(tailBuf, 0)), tailLength, codec, bufferSize)); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java index 6589e41..d593473 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.io.orc; import java.io.IOException; -import java.io.InputStream; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -46,7 +45,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.protobuf.CodedInputStream; -import com.google.protobuf.InvalidProtocolBufferException; public class ReaderImpl implements Reader { @@ -466,42 +464,14 @@ private static FileMetaInfo extractMetaInfoFromFooter(FileSystem fs, int footerBufferSize = footerBuffer.limit() - footerBuffer.position() - metadataSize; footerBuffer.limit(position + metadataSize); - InputStream instream = InStream.create("metadata", Lists.newArrayList( - new BufferChunk(footerBuffer, 0)), metadataSize, codec, bufferSize); - CodedInputStream in = CodedInputStream.newInstance(instream); - int msgLimit = DEFAULT_PROTOBUF_MESSAGE_LIMIT; - OrcProto.Metadata meta = null; - do { - try { - in.setSizeLimit(msgLimit); - meta = OrcProto.Metadata.parseFrom(in); - } catch (InvalidProtocolBufferException e) { - if (e.getMessage().contains("Protocol message was too large")) { - LOG.warn("Metadata 
section is larger than " + msgLimit + " bytes. Increasing the max" + - " size of the coded input stream." ); - - msgLimit = msgLimit << 1; - if (msgLimit > PROTOBUF_MESSAGE_MAX_LIMIT) { - LOG.error("Metadata section exceeds max protobuf message size of " + - PROTOBUF_MESSAGE_MAX_LIMIT + " bytes."); - throw e; - } - - // we must have failed in the middle of reading instream and instream doesn't support - // resetting the stream - instream = InStream.create("metadata", Lists.newArrayList( - new BufferChunk(footerBuffer, 0)), metadataSize, codec, bufferSize); - in = CodedInputStream.newInstance(instream); - } else { - throw e; - } - } - } while (meta == null); - this.metadata = meta; + CodedInputStream instream = InStream.createCodedInputStream("metadata", + Lists.newArrayList(new BufferChunk(footerBuffer, 0)), metadataSize, + codec, bufferSize); + this.metadata = OrcProto.Metadata.parseFrom(instream); footerBuffer.position(position + metadataSize); footerBuffer.limit(position + metadataSize + footerBufferSize); - instream = InStream.create("footer", Lists.newArrayList( + instream = InStream.createCodedInputStream("footer", Lists.newArrayList( new BufferChunk(footerBuffer, 0)), footerBufferSize, codec, bufferSize); this.footer = OrcProto.Footer.parseFrom(instream);