diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
index 5e577d2bf8..092e89aa3a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
@@ -25,11 +25,15 @@
+import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
+import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.Type;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.sql.Timestamp;
+import java.time.Instant;
 
 /**
  * It's column level Parquet reader which is used to read a batch of records for a column,
@@ -110,8 +112,10 @@ private void readBatchHelper(
     case DECIMAL:
       readDecimal(num, (DecimalColumnVector) column, rowId);
       break;
-    case INTERVAL_DAY_TIME:
     case TIMESTAMP:
+      readTimestamp(num, (TimestampColumnVector) column, rowId);
+      break;
+    case INTERVAL_DAY_TIME:
     default:
       throw new IOException("Unsupported type: " + type);
     }
@@ -288,6 +292,57 @@ private void readBinaries(
     }
   }
 
+  /**
+   * Reads `total` timestamp values into `c`, starting at position `rowId`.
+   */
+  private void readTimestamp(
+      int total,
+      TimestampColumnVector c,
+      int rowId) throws IOException {
+    int left = total;
+    while (left > 0) {
+      readRepetitionAndDefinitionLevels();
+      if (definitionLevel >= maxDefLevel) {
+        switch (descriptor.getType()) {
+        case INT64:
+          long seconds = 0;
+          long nanoSeconds = 0;
+          switch (type.getOriginalType()) {
+          case TIMESTAMP_MILLIS:
+            long milliSeconds = dataColumn.readLong();
+            seconds = milliSeconds / 1000L;
+            nanoSeconds = (milliSeconds - seconds * 1000L) * 1000000L;
+            break;
+          case TIMESTAMP_MICROS:
+            long microSeconds = dataColumn.readLong();
+            seconds = microSeconds / 1000000L;
+            nanoSeconds = (microSeconds - seconds * 1000000L) * 1000L;
+            break;
+          default:
+            throw new IOException("Unsupported parquet logical type: " + type.getOriginalType() + " for timestamp");
+          }
+          c.set(rowId, Timestamp.from(Instant.ofEpochSecond(seconds, nanoSeconds)));
+          break;
+        case INT96:
+          NanoTime nt = NanoTime.fromBinary(dataColumn.readBytes());
+          Timestamp ts = NanoTimeUtils.getTimestamp(nt, skipTimestampConversion);
+          c.set(rowId, ts);
+          break;
+        default:
+          throw new IOException("Unsupported parquet type: " + descriptor.getType() + " for timestamp");
+        }
+        c.isNull[rowId] = false;
+        c.isRepeating = false;
+      } else {
+        c.isNull[rowId] = true;
+        c.isRepeating = false;
+        c.noNulls = false;
+      }
+      rowId++;
+      left--;
+    }
+  }
+
   /**
    * Reads `num` values into column, decoding the values from `dictionaryIds` and `dictionary`.
    */
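For reference, here is a minimal standalone sketch of the epoch-millis/micros to (seconds, nanoseconds) split that readTimestamp performs above. The class and method names are illustrative only, not part of the patch. It also demonstrates the pitfall the patch has to avoid: in Java, ^ is bitwise XOR, so 10^3 evaluates to 9, not 1000, which is why the conversion factors must be written out as plain long literals.

import java.sql.Timestamp;
import java.time.Instant;

// Illustrative helper (hypothetical, not part of the patch): splits an epoch
// value in millis or micros into the (seconds, nanos) pair that
// Instant.ofEpochSecond and TimestampColumnVector expect.
public class EpochSplitExample {

  static Timestamp fromMillis(long millis) {
    long seconds = millis / 1000L;                       // truncating division
    long nanos = (millis - seconds * 1000L) * 1000000L;  // remainder, scaled to nanos
    // Instant.ofEpochSecond normalizes a negative nano adjustment, so
    // pre-epoch values (negative millis) still resolve correctly.
    return Timestamp.from(Instant.ofEpochSecond(seconds, nanos));
  }

  static Timestamp fromMicros(long micros) {
    long seconds = micros / 1000000L;
    long nanos = (micros - seconds * 1000000L) * 1000L;
    return Timestamp.from(Instant.ofEpochSecond(seconds, nanos));
  }

  public static void main(String[] args) {
    System.out.println(10 ^ 3);                            // prints 9: XOR, not power
    System.out.println(fromMillis(1234L).toInstant());     // 1970-01-01T00:00:01.234Z
    System.out.println(fromMicros(-1500000L).toInstant()); // 1969-12-31T23:59:58.500Z
  }
}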