diff --git ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Deserializer.java ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Deserializer.java index fb5800b140..3a79e5f136 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Deserializer.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Deserializer.java @@ -29,12 +29,14 @@ import org.apache.arrow.vector.IntervalDayVector; import org.apache.arrow.vector.IntervalYearVector; import org.apache.arrow.vector.SmallIntVector; +import org.apache.arrow.vector.TimeStampMilliVector; import org.apache.arrow.vector.TimeStampNanoVector; import org.apache.arrow.vector.TinyIntVector; import org.apache.arrow.vector.VarBinaryVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.holders.NullableIntervalDayHolder; +import org.apache.arrow.vector.types.Types; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.common.type.HiveIntervalDayTime; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; @@ -130,7 +132,7 @@ public Object deserialize(Writable writable) { private void read(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo) { switch (typeInfo.getCategory()) { case PRIMITIVE: - readPrimitive(arrowVector, hiveVector, typeInfo); + readPrimitive(arrowVector, hiveVector); break; case LIST: readList(arrowVector, (ListColumnVector) hiveVector, (ListTypeInfo) typeInfo); @@ -149,15 +151,14 @@ private void read(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typ } } - private void readPrimitive(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo) { - final PrimitiveObjectInspector.PrimitiveCategory primitiveCategory = - ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory(); + private void readPrimitive(FieldVector arrowVector, ColumnVector hiveVector) { + final Types.MinorType minorType = arrowVector.getMinorType(); final int size = arrowVector.getValueCount(); hiveVector.ensureSize(size, false); - switch (primitiveCategory) { - case BOOLEAN: + switch (minorType) { + case BIT: { for (int i = 0; i < size; i++) { if (arrowVector.isNull(i)) { @@ -169,7 +170,7 @@ private void readPrimitive(FieldVector arrowVector, ColumnVector hiveVector, Typ } } break; - case BYTE: + case TINYINT: { for (int i = 0; i < size; i++) { if (arrowVector.isNull(i)) { @@ -181,7 +182,7 @@ private void readPrimitive(FieldVector arrowVector, ColumnVector hiveVector, Typ } } break; - case SHORT: + case SMALLINT: { for (int i = 0; i < size; i++) { if (arrowVector.isNull(i)) { @@ -205,7 +206,7 @@ private void readPrimitive(FieldVector arrowVector, ColumnVector hiveVector, Typ } } break; - case LONG: + case BIGINT: { for (int i = 0; i < size; i++) { if (arrowVector.isNull(i)) { @@ -217,19 +218,19 @@ private void readPrimitive(FieldVector arrowVector, ColumnVector hiveVector, Typ } } break; - case FLOAT: - { - for (int i = 0; i < size; i++) { - if (arrowVector.isNull(i)) { - VectorizedBatchUtil.setNullColIsNullValue(hiveVector, i); - } else { - hiveVector.isNull[i] = false; - ((DoubleColumnVector) hiveVector).vector[i] = ((Float4Vector) arrowVector).get(i); + case FLOAT4: + { + for (int i = 0; i < size; i++) { + if (arrowVector.isNull(i)) { + VectorizedBatchUtil.setNullColIsNullValue(hiveVector, i); + } else { + hiveVector.isNull[i] = false; + ((DoubleColumnVector) hiveVector).vector[i] = ((Float4Vector) arrowVector).get(i); + } } } - } break; - case DOUBLE: + case FLOAT8: { for (int i = 0; i < size; i++) { if (arrowVector.isNull(i)) { @@ -241,9 +242,7 @@ private void readPrimitive(FieldVector arrowVector, ColumnVector hiveVector, Typ } } break; - case STRING: case VARCHAR: - case CHAR: { for (int i = 0; i < size; i++) { if (arrowVector.isNull(i)) { @@ -255,7 +254,7 @@ private void readPrimitive(FieldVector arrowVector, ColumnVector hiveVector, Typ } } break; - case DATE: + case DATEDAY: { for (int i = 0; i < size; i++) { if (arrowVector.isNull(i)) { @@ -267,7 +266,37 @@ private void readPrimitive(FieldVector arrowVector, ColumnVector hiveVector, Typ } } break; - case TIMESTAMP: + case TIMESTAMPMILLI: + { + for (int i = 0; i < size; i++) { + if (arrowVector.isNull(i)) { + VectorizedBatchUtil.setNullColIsNullValue(hiveVector, i); + } else { + hiveVector.isNull[i] = false; + + // Time = second + sub-second + final long timeInMillis = ((TimeStampMilliVector) arrowVector).get(i); + final TimestampColumnVector timestampColumnVector = (TimestampColumnVector) hiveVector; + int subSecondInNanos = (int) ((timeInMillis % MS_PER_SECOND) * NS_PER_MS); + long second = timeInMillis / MS_PER_SECOND; + + // A nanosecond value should not be negative + if (subSecondInNanos < 0) { + + // So add one second to the negative nanosecond value to make it positive + subSecondInNanos += NS_PER_SECOND; + + // Subtract one second from the second value because we added one second, + // then subtract one more second because of the ceiling in the division. + second -= 2; + } + timestampColumnVector.time[i] = second * MS_PER_SECOND; + timestampColumnVector.nanos[i] = subSecondInNanos; + } + } + } + break; + case TIMESTAMPNANO: { for (int i = 0; i < size; i++) { if (arrowVector.isNull(i)) { @@ -297,7 +326,7 @@ private void readPrimitive(FieldVector arrowVector, ColumnVector hiveVector, Typ } } break; - case BINARY: + case VARBINARY: { for (int i = 0; i < size; i++) { if (arrowVector.isNull(i)) { @@ -322,7 +351,7 @@ private void readPrimitive(FieldVector arrowVector, ColumnVector hiveVector, Typ } } break; - case INTERVAL_YEAR_MONTH: + case INTERVALYEAR: { for (int i = 0; i < size; i++) { if (arrowVector.isNull(i)) { @@ -334,7 +363,7 @@ private void readPrimitive(FieldVector arrowVector, ColumnVector hiveVector, Typ } } break; - case INTERVAL_DAY_TIME: + case INTERVALDAY: { final IntervalDayVector intervalDayVector = (IntervalDayVector) arrowVector; final NullableIntervalDayHolder intervalDayHolder = new NullableIntervalDayHolder(); @@ -354,11 +383,8 @@ private void readPrimitive(FieldVector arrowVector, ColumnVector hiveVector, Typ } } break; - case VOID: - case TIMESTAMPLOCALTZ: - case UNKNOWN: default: - break; + throw new IllegalArgumentException(); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java index bd23011c93..6c168081df 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java @@ -30,7 +30,7 @@ import org.apache.arrow.vector.IntervalDayVector; import org.apache.arrow.vector.IntervalYearVector; import org.apache.arrow.vector.SmallIntVector; -import org.apache.arrow.vector.TimeStampNanoVector; +import org.apache.arrow.vector.TimeStampMilliVector; import org.apache.arrow.vector.TinyIntVector; import org.apache.arrow.vector.VarBinaryVector; import org.apache.arrow.vector.VarCharVector; @@ -175,7 +175,7 @@ private ArrowType toArrowType(TypeInfo typeInfo) { case DATE: return Types.MinorType.DATEDAY.getType(); case TIMESTAMP: - return Types.MinorType.TIMESTAMPNANO.getType(); + return Types.MinorType.TIMESTAMPMILLI.getType(); case BINARY: return Types.MinorType.VARBINARY.getType(); case DECIMAL: @@ -430,23 +430,13 @@ private void writePrimitive(FieldVector arrowVector, ColumnVector hiveVector, Ty break; case TIMESTAMP: { - final TimeStampNanoVector timeStampNanoVector = (TimeStampNanoVector) arrowVector; + final TimeStampMilliVector timeStampMilliVector = (TimeStampMilliVector) arrowVector; final TimestampColumnVector timestampColumnVector = (TimestampColumnVector) hiveVector; for (int i = 0; i < size; i++) { if (hiveVector.isNull[i]) { - timeStampNanoVector.setNull(i); + timeStampMilliVector.setNull(i); } else { - // Time = second + sub-second - final long secondInMillis = timestampColumnVector.getTime(i); - final long secondInNanos = (secondInMillis - secondInMillis % 1000) * NS_PER_MS; // second - final long subSecondInNanos = timestampColumnVector.getNanos(i); // sub-second - - if ((secondInMillis > 0 && secondInNanos < 0) || (secondInMillis < 0 && secondInNanos > 0)) { - // If the timestamp cannot be represented in long nanosecond, set it as a null value - timeStampNanoVector.setNull(i); - } else { - timeStampNanoVector.set(i, secondInNanos + subSecondInNanos); - } + timeStampMilliVector.set(i, timestampColumnVector.getTime(i)); } } } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java index 74f6624597..e6fd34b35c 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java @@ -108,14 +108,10 @@ private final static long NEGATIVE_TIME_IN_MS = TimeUnit.DAYS.toMillis(-9 * 365 + 31 + 3); private final static Timestamp TIMESTAMP; private final static Timestamp NEGATIVE_TIMESTAMP_WITHOUT_NANOS; - private final static Timestamp NEGATIVE_TIMESTAMP_WITH_NANOS; static { TIMESTAMP = new Timestamp(TIME_IN_MS); - TIMESTAMP.setNanos(123456789); NEGATIVE_TIMESTAMP_WITHOUT_NANOS = new Timestamp(NEGATIVE_TIME_IN_MS); - NEGATIVE_TIMESTAMP_WITH_NANOS = new Timestamp(NEGATIVE_TIME_IN_MS); - NEGATIVE_TIMESTAMP_WITH_NANOS.setNanos(123456789); } private final static Object[][] DTI_ROWS = { @@ -131,12 +127,6 @@ null, null }, - { - null, - new TimestampWritable(NEGATIVE_TIMESTAMP_WITH_NANOS), - null, - null - }, {null, null, null, null}, }; @@ -773,5 +763,4 @@ public void testListDecimal() throws SerDeException { initAndSerializeAndDeserialize(schema, toList(DECIMAL_ROWS)); } - }