diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java index 381d97d..6fec8b7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java @@ -30,10 +30,12 @@ import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk; import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.CodedInputStream; public abstract class InStream extends InputStream { private static final Log LOG = LogFactory.getLog(InStream.class); + private static final int PROTOBUF_MESSAGE_MAX_LIMIT = 1024 << 20; // 1GB protected final String name; protected final long length; @@ -447,4 +449,26 @@ public static InStream create(String name, return new CompressedStream(name, input, length, codec, bufferSize); } } + + /** + * Creates coded input stream (used for protobuf message parsing) with higher message size limit. + * + * @param name the name of the stream + * @param input the list of ranges of bytes for the stream; from disk or cache + * @param length the length in bytes of the stream + * @param codec the compression codec + * @param bufferSize the compression buffer size + * @return coded input stream + * @throws IOException + */ + public static CodedInputStream createCodedInputStream(String name, + List<DiskRange> input, + long length, + CompressionCodec codec, + int bufferSize) throws IOException { + InStream inStream = create(name, input, length, codec, bufferSize); + CodedInputStream codedInputStream = CodedInputStream.newInstance(inStream); + codedInputStream.setSizeLimit(PROTOBUF_MESSAGE_MAX_LIMIT); + return codedInputStream; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java index cdc0372..5aebf45 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java @@ -110,7 +110,7 @@ public MetadataReader(FSDataInputStream file, ByteBuffer tailBuf = ByteBuffer.allocate(tailLength); file.seek(offset); file.readFully(tailBuf.array(), tailBuf.arrayOffset(), tailLength); - return OrcProto.StripeFooter.parseFrom(InStream.create("footer", + return OrcProto.StripeFooter.parseFrom(InStream.createCodedInputStream("footer", Lists.newArrayList(new BufferChunk(tailBuf, 0)), tailLength, codec, bufferSize)); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java index 80cfc98..e1dc915 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.io.orc; import java.io.IOException; -import java.io.InputStream; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -48,15 +47,12 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.protobuf.CodedInputStream; -import com.google.protobuf.InvalidProtocolBufferException; public class ReaderImpl implements Reader { private static final Log LOG = LogFactory.getLog(ReaderImpl.class); private static final int DIRECTORY_SIZE_GUESS = 16 * 1024; - private static final int DEFAULT_PROTOBUF_MESSAGE_LIMIT = 64 << 20; // 64MB - private static final int PROTOBUF_MESSAGE_MAX_LIMIT = 1024 << 20; // 1GB protected final FileSystem fileSystem; protected final Path path; @@ -388,76 +384,16 @@ public static FooterInfo extractMetaInfoFromFooter( int footerSize, CompressionCodec codec, int bufferSize) throws IOException { bb.position(footerAbsPos); bb.limit(footerAbsPos + footerSize); - InputStream instream = InStream.create("footer", Lists.newArrayList( - new BufferChunk(bb, 0)), footerSize, codec, bufferSize); - CodedInputStream in = 
CodedInputStream.newInstance(instream); - int msgLimit = DEFAULT_PROTOBUF_MESSAGE_LIMIT; - OrcProto.Footer footer = null; - do { - try { - in.setSizeLimit(msgLimit); - footer = OrcProto.Footer.parseFrom(in); - } catch (InvalidProtocolBufferException e) { - if (e.getMessage().contains("Protocol message was too large")) { - LOG.warn("Footer section is larger than " + msgLimit + " bytes. Increasing the max" + - " size of the coded input stream." ); - - msgLimit = msgLimit << 1; - if (msgLimit > PROTOBUF_MESSAGE_MAX_LIMIT) { - LOG.error("Footer section exceeds max protobuf message size of " + - PROTOBUF_MESSAGE_MAX_LIMIT + " bytes."); - throw e; - } - - // we must have failed in the middle of reading instream and instream doesn't support - // resetting the stream - instream = InStream.create("footer", Lists.newArrayList( - new BufferChunk(bb, 0)), footerSize, codec, bufferSize); - in = CodedInputStream.newInstance(instream); - } else { - throw e; - } - } - } while (footer == null); - return footer; + return OrcProto.Footer.parseFrom(InStream.createCodedInputStream("footer", + Lists.newArrayList(new BufferChunk(bb, 0)), footerSize, codec, bufferSize)); } private static OrcProto.Metadata extractMetadata(ByteBuffer bb, int metadataAbsPos, int metadataSize, CompressionCodec codec, int bufferSize) throws IOException { bb.position(metadataAbsPos); bb.limit(metadataAbsPos + metadataSize); - InputStream instream = InStream.create("metadata", Lists.newArrayList( - new BufferChunk(bb, 0)), metadataSize, codec, bufferSize); - CodedInputStream in = CodedInputStream.newInstance(instream); - int msgLimit = DEFAULT_PROTOBUF_MESSAGE_LIMIT; - OrcProto.Metadata meta = null; - do { - try { - in.setSizeLimit(msgLimit); - meta = OrcProto.Metadata.parseFrom(in); - } catch (InvalidProtocolBufferException e) { - if (e.getMessage().contains("Protocol message was too large")) { - LOG.warn("Metadata section is larger than " + msgLimit + " bytes. 
Increasing the max" + - " size of the coded input stream." ); - - msgLimit = msgLimit << 1; - if (msgLimit > PROTOBUF_MESSAGE_MAX_LIMIT) { - LOG.error("Metadata section exceeds max protobuf message size of " + - PROTOBUF_MESSAGE_MAX_LIMIT + " bytes."); - throw e; - } - - // we must have failed in the middle of reading instream and instream doesn't support - // resetting the stream - instream = InStream.create("metadata", Lists.newArrayList( - new BufferChunk(bb, 0)), metadataSize, codec, bufferSize); - in = CodedInputStream.newInstance(instream); - } else { - throw e; - } - } - } while (meta == null); - return meta; + return OrcProto.Metadata.parseFrom(InStream.createCodedInputStream("metadata", + Lists.newArrayList(new BufferChunk(bb, 0)), metadataSize, codec, bufferSize)); } private static OrcProto.PostScript extractPostScript(ByteBuffer bb, Path path,