diff --git a/pom.xml b/pom.xml
index ce3da37d2b..db54631c02 100644
--- a/pom.xml
+++ b/pom.xml
@@ -188,7 +188,7 @@
     2.0.0-M5
     4.1.17.Final
     3.10.5.Final
-    1.9.0
+    1.10.0
     0.16.0
     1.5.6
     2.5.0
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/BaseVectorizedColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/BaseVectorizedColumnReader.java
index 4a17ee494b..9ce1ba4591 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/BaseVectorizedColumnReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/BaseVectorizedColumnReader.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.io.parquet.vector;
 
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -168,7 +169,7 @@ public Void visit(DataPageV2 dataPageV2) {
     });
   }
 
-  private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount)
+  private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, int valueCount)
       throws IOException {
     this.pageValueCount = valueCount;
     this.endOfPageValueCount = valuesRead + pageValueCount;
@@ -190,7 +191,7 @@ private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int
     }
 
     try {
-      dataColumn.initFromPage(pageValueCount, bytes, offset);
+      dataColumn.initFromPage(pageValueCount, in);
     } catch (IOException e) {
       throw new IOException("could not read page in col " + descriptor, e);
     }
@@ -202,16 +203,15 @@ private void readPageV1(DataPageV1 page) {
     this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
     this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
     try {
-      byte[] bytes = page.getBytes().toByteArray();
-      LOG.debug("page size " + bytes.length + " bytes and " + pageValueCount + " records");
-      LOG.debug("reading repetition levels at 0");
-      rlReader.initFromPage(pageValueCount, bytes, 0);
-      int next = rlReader.getNextOffset();
-      LOG.debug("reading definition levels at " + next);
-      dlReader.initFromPage(pageValueCount, bytes, next);
-      next = dlReader.getNextOffset();
-      LOG.debug("reading data at " + next);
-      initDataReader(page.getValueEncoding(), bytes, next, page.getValueCount());
+      BytesInput bytes = page.getBytes();
+      LOG.debug("page size " + bytes.size() + " bytes and " + pageValueCount + " records");
+      ByteBufferInputStream in = bytes.toInputStream();
+      LOG.debug("reading repetition levels at " + in.position());
+      rlReader.initFromPage(pageValueCount, in);
+      LOG.debug("reading definition levels at " + in.position());
+      dlReader.initFromPage(pageValueCount, in);
+      LOG.debug("reading data at " + in.position());
+      initDataReader(page.getValueEncoding(), in, page.getValueCount());
     } catch (IOException e) {
       throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e);
     }
@@ -224,7 +224,7 @@ private void readPageV2(DataPageV2 page) {
     this.definitionLevelColumn = newRLEIterator(descriptor.getMaxDefinitionLevel(), page.getDefinitionLevels());
     try {
       LOG.debug("page data size " + page.getData().size() + " bytes and " + pageValueCount + " records");
-      initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0, page.getValueCount());
+      initDataReader(page.getDataEncoding(), page.getData().toInputStream(), page.getValueCount());
     } catch (IOException e) {
       throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e);
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReader.java
index 5e6802282d..61c6820db3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.io.parquet.vector;
 
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.column.Dictionary;
 
 import java.io.IOException;
@@ -31,11 +32,10 @@
   /**
    * Initialize the reader by page data.
    * @param valueCount value count
-   * @param page page data
-   * @param offset current offset
+   * @param in page data
    * @throws IOException
    */
-  void initFromPage(int valueCount, byte[] page, int offset) throws IOException;
+  void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException;
 
   /**
    * @return the next Dictionary ID from the page
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java
index 17d6e338e6..715aece592 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.column.Dictionary;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.io.api.Binary;
@@ -72,13 +73,9 @@ public DefaultParquetDataColumnReader(Dictionary dict, int length) {
       this.length = length;
     }
 
-    public void initFromPage(int i, ByteBuffer byteBuffer, int i1) throws IOException {
-      valuesReader.initFromPage(i, byteBuffer, i1);
-    }
-
     @Override
-    public void initFromPage(int valueCount, byte[] page, int offset) throws IOException {
-      this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
+    public void initFromPage(int i, ByteBufferInputStream in) throws IOException {
+      valuesReader.initFromPage(i, in);
    }
 
     @Override
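
Note on the API change driving this patch: in parquet 1.10 a page is handed to each ValuesReader as a single ByteBufferInputStream whose position advances as levels and values are consumed, replacing the byte[] + offset + getNextOffset() bookkeeping of 1.9. A minimal sketch of that flow, assuming parquet-column 1.10 on the classpath (the helper class and variable names below are hypothetical, not part of the patch):

    import java.io.IOException;
    import org.apache.parquet.bytes.ByteBufferInputStream;
    import org.apache.parquet.bytes.BytesInput;
    import org.apache.parquet.column.values.ValuesReader;

    // Hypothetical sketch mirroring readPageV1 above: one stream is threaded
    // through all three readers, so each initFromPage call leaves the stream
    // positioned at the start of the next page section.
    final class PageInitSketch {
      static void initReaders(BytesInput pageBytes, int valueCount,
          ValuesReader rlReader, ValuesReader dlReader, ValuesReader dataReader)
          throws IOException {
        ByteBufferInputStream in = pageBytes.toInputStream(); // one stream per page
        rlReader.initFromPage(valueCount, in);   // consumes repetition levels
        dlReader.initFromPage(valueCount, in);   // consumes definition levels
        dataReader.initFromPage(valueCount, in); // values begin at in.position()
      }
    }

Because the stream tracks its own position, the getNextOffset() calls disappear from readPageV1, and in.position() is used only for debug logging.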