diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
index afb9837ce4..9dfece9d46 100644
--- itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
+++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
@@ -217,7 +217,7 @@ public void testDataTypes() throws Exception {
     //assertEquals("d", mapVal.get("c"));
     //assertEquals(Integer.valueOf(2), listVal.get(1));

-    assertEquals(Timestamp.valueOf("2012-04-22 09:00:00.123456789"), rowValues[16]);
+    assertEquals(Timestamp.valueOf("2012-04-22 09:00:00.123456"), rowValues[16]);
     assertEquals(new BigDecimal("123456789.123456"), rowValues[17]);
     assertArrayEquals("abcd".getBytes("UTF-8"), (byte[]) rowValues[18]);
     assertEquals(Date.valueOf("2013-01-01"), rowValues[19]);
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java
index b093ebbd27..470f31cf95 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java
@@ -83,9 +83,13 @@
   public static final Logger LOG = LoggerFactory.getLogger(ArrowColumnarBatchSerDe.class.getName());

   private static final String DEFAULT_ARROW_FIELD_NAME = "[DEFAULT]";
-  static final int MS_PER_SECOND = 1_000;
+  static final int MILLIS_PER_SECOND = 1_000;
+  static final int MICROS_PER_SECOND = 1_000_000;
   static final int NS_PER_SECOND = 1_000_000_000;
-  static final int NS_PER_MS = 1_000_000;
+
+  static final int NS_PER_MILLIS = NS_PER_SECOND / MILLIS_PER_SECOND;
+  static final int NS_PER_MICROS = NS_PER_SECOND / MICROS_PER_SECOND;
+  static final int MICROS_PER_MILLIS = MICROS_PER_SECOND / MILLIS_PER_SECOND;
   static final int SECOND_PER_DAY = 24 * 60 * 60;

   BufferAllocator rootAllocator;
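(Annotation, not part of the patch: the two hunks above are two sides of one change. The JDBC test's expected literal loses its last three digits because an Arrow microsecond timestamp cannot carry sub-microsecond detail, and the new constants are derived from the two base units so the conversion ratios stay consistent by construction. Below is a minimal standalone sketch of the truncation; the class name is illustrative and NS_PER_MICROS is assumed to be 1_000, matching the hunk above.)

    import java.sql.Timestamp;

    public class MicroTruncationDemo {
      static final int NS_PER_MICROS = 1_000;

      public static void main(String[] args) {
        Timestamp original = Timestamp.valueOf("2012-04-22 09:00:00.123456789");
        // 123_456_789 ns / 1_000 = 123_456 micros; the trailing 789 ns are dropped
        int microsOnlyNanos = original.getNanos() / NS_PER_MICROS * NS_PER_MICROS;
        Timestamp roundTripped = new Timestamp(original.getTime());
        roundTripped.setNanos(microsOnlyNanos);
        System.out.println(roundTripped); // 2012-04-22 09:00:00.123456
      }
    }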
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Deserializer.java ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Deserializer.java
index fb5800b140..6e09d3991f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Deserializer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Deserializer.java
@@ -29,12 +29,15 @@
 import org.apache.arrow.vector.IntervalDayVector;
 import org.apache.arrow.vector.IntervalYearVector;
 import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.TimeStampMilliVector;
 import org.apache.arrow.vector.TimeStampNanoVector;
 import org.apache.arrow.vector.TinyIntVector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.holders.NullableIntervalDayHolder;
+import org.apache.arrow.vector.types.Types;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
@@ -53,10 +56,8 @@
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
@@ -65,8 +66,10 @@
 import java.util.List;

 import static org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil.createColumnVector;
-import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.MS_PER_SECOND;
-import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.NS_PER_MS;
+import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.MICROS_PER_SECOND;
+import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.MILLIS_PER_SECOND;
+import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.NS_PER_MICROS;
+import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.NS_PER_MILLIS;
 import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.NS_PER_SECOND;
 import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.SECOND_PER_DAY;
 import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.toStructListTypeInfo;
@@ -130,7 +133,7 @@ public Object deserialize(Writable writable) {
   private void read(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo) {
     switch (typeInfo.getCategory()) {
       case PRIMITIVE:
-        readPrimitive(arrowVector, hiveVector, typeInfo);
+        readPrimitive(arrowVector, hiveVector);
         break;
       case LIST:
         readList(arrowVector, (ListColumnVector) hiveVector, (ListTypeInfo) typeInfo);
@@ -149,15 +152,14 @@ private void read(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typ
     }
   }

-  private void readPrimitive(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo) {
-    final PrimitiveObjectInspector.PrimitiveCategory primitiveCategory =
-        ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory();
+  private void readPrimitive(FieldVector arrowVector, ColumnVector hiveVector) {
+    final Types.MinorType minorType = arrowVector.getMinorType();
     final int size = arrowVector.getValueCount();
     hiveVector.ensureSize(size, false);

-    switch (primitiveCategory) {
-      case BOOLEAN:
+    switch (minorType) {
+      case BIT:
       {
         for (int i = 0; i < size; i++) {
           if (arrowVector.isNull(i)) {
@@ -169,7 +171,7 @@ private void readPrimitive(FieldVector arrowVector, ColumnVector hiveVector, Typ
         }
       }
       break;
-      case BYTE:
+      case TINYINT:
       {
         for (int i = 0; i < size; i++) {
           if (arrowVector.isNull(i)) {
@@ -181,7 +183,7 @@ private void readPrimitive(FieldVector arrowVector, ColumnVector hiveVector, Typ
         }
       }
       break;
-      case SHORT:
+      case SMALLINT:
       {
         for (int i = 0; i < size; i++) {
           if (arrowVector.isNull(i)) {
@@ -205,7 +207,7 @@ private void readPrimitive(FieldVector arrowVector, ColumnVector hiveVector, Typ
         }
       }
       break;
-      case LONG:
+      case BIGINT:
       {
         for (int i = 0; i < size; i++) {
           if (arrowVector.isNull(i)) {
@@ -217,19 +219,19 @@ private void readPrimitive(FieldVector arrowVector, ColumnVector hiveVector, Typ
         }
       }
       break;
-      case FLOAT:
-      {
-        for (int i = 0; i < size; i++) {
-          if (arrowVector.isNull(i)) {
-            VectorizedBatchUtil.setNullColIsNullValue(hiveVector, i);
-          } else {
-            hiveVector.isNull[i] = false;
-            ((DoubleColumnVector) hiveVector).vector[i] = ((Float4Vector) arrowVector).get(i);
+      case FLOAT4:
+      {
+        for (int i = 0; i < size; i++) {
+          if (arrowVector.isNull(i)) {
+            VectorizedBatchUtil.setNullColIsNullValue(hiveVector, i);
+          } else {
+            hiveVector.isNull[i] = false;
+            ((DoubleColumnVector) hiveVector).vector[i] = ((Float4Vector) arrowVector).get(i);
+          }
           }
         }
-      }
       break;
-      case DOUBLE:
+      case FLOAT8:
       {
         for (int i = 0; i < size; i++) {
           if (arrowVector.isNull(i)) {
@@ -241,9 +243,7 @@ private void readPrimitive(FieldVector arrowVector, ColumnVector hiveVector, Typ
         }
       }
       break;
-      case STRING:
       case VARCHAR:
-      case CHAR:
       {
         for (int i = 0; i < size; i++) {
           if (arrowVector.isNull(i)) {
@@ -255,7 +255,7 @@ private void readPrimitive(FieldVector arrowVector, ColumnVector hiveVector, Typ
         }
       }
       break;
-      case DATE:
+      case DATEDAY:
       {
         for (int i = 0; i < size; i++) {
           if (arrowVector.isNull(i)) {
@@ -267,7 +267,65 @@ private void readPrimitive(FieldVector arrowVector, ColumnVector hiveVector, Typ
         }
       }
       break;
-      case TIMESTAMP:
+      case TIMESTAMPMILLI:
+      {
+        for (int i = 0; i < size; i++) {
+          if (arrowVector.isNull(i)) {
+            VectorizedBatchUtil.setNullColIsNullValue(hiveVector, i);
+          } else {
+            hiveVector.isNull[i] = false;
+
+            // Time = second + sub-second
+            final long timeInMillis = ((TimeStampMilliVector) arrowVector).get(i);
+            final TimestampColumnVector timestampColumnVector = (TimestampColumnVector) hiveVector;
+            int subSecondInNanos = (int) ((timeInMillis % MILLIS_PER_SECOND) * NS_PER_MILLIS);
+            long second = timeInMillis / MILLIS_PER_SECOND;
+
+            // A nanosecond value should not be negative
+            if (subSecondInNanos < 0) {
+
+              // So add one second to the negative nanosecond value to make it positive
+              subSecondInNanos += NS_PER_SECOND;
+
+              // Subtract one second from the second value because we added one second
+              second -= 1;
+            }
+            timestampColumnVector.time[i] = second * MILLIS_PER_SECOND;
+            timestampColumnVector.nanos[i] = subSecondInNanos;
+          }
+        }
+      }
+      break;
+      case TIMESTAMPMICRO:
+      {
+        for (int i = 0; i < size; i++) {
+          if (arrowVector.isNull(i)) {
+            VectorizedBatchUtil.setNullColIsNullValue(hiveVector, i);
+          } else {
+            hiveVector.isNull[i] = false;
+
+            // Time = second + sub-second
+            final long timeInMicros = ((TimeStampMicroVector) arrowVector).get(i);
+            final TimestampColumnVector timestampColumnVector = (TimestampColumnVector) hiveVector;
+            int subSecondInNanos = (int) ((timeInMicros % MICROS_PER_SECOND) * NS_PER_MICROS);
+            long second = timeInMicros / MICROS_PER_SECOND;
+
+            // A nanosecond value should not be negative
+            if (subSecondInNanos < 0) {
+
+              // So add one second to the negative nanosecond value to make it positive
+              subSecondInNanos += NS_PER_SECOND;
+
+              // Subtract one second from the second value because we added one second
+              second -= 1;
+            }
+            timestampColumnVector.time[i] = second * MILLIS_PER_SECOND;
+            timestampColumnVector.nanos[i] = subSecondInNanos;
+          }
+        }
+      }
+      break;
+      case TIMESTAMPNANO:
       {
         for (int i = 0; i < size; i++) {
           if (arrowVector.isNull(i)) {
@@ -287,17 +345,16 @@ private void readPrimitive(FieldVector arrowVector, ColumnVector hiveVector, Typ

               // So add one second to the negative nanosecond value to make it positive
               subSecondInNanos += NS_PER_SECOND;

-              // Subtract one second from the second value because we added one second,
-              // then subtract one more second because of the ceiling in the division.
-              second -= 2;
+              // Subtract one second from the second value because we added one second
+              second -= 1;
             }
-            timestampColumnVector.time[i] = second * MS_PER_SECOND;
+            timestampColumnVector.time[i] = second * MILLIS_PER_SECOND;
             timestampColumnVector.nanos[i] = subSecondInNanos;
           }
         }
       }
       break;
-      case BINARY:
+      case VARBINARY:
       {
         for (int i = 0; i < size; i++) {
           if (arrowVector.isNull(i)) {
@@ -322,7 +379,7 @@ private void readPrimitive(FieldVector arrowVector, ColumnVector hiveVector, Typ
         }
       }
       break;
-      case INTERVAL_YEAR_MONTH:
+      case INTERVALYEAR:
       {
         for (int i = 0; i < size; i++) {
           if (arrowVector.isNull(i)) {
@@ -334,7 +391,7 @@ private void readPrimitive(FieldVector arrowVector, ColumnVector hiveVector, Typ
         }
       }
       break;
-      case INTERVAL_DAY_TIME:
+      case INTERVALDAY:
       {
         final IntervalDayVector intervalDayVector = (IntervalDayVector) arrowVector;
         final NullableIntervalDayHolder intervalDayHolder = new NullableIntervalDayHolder();
@@ -346,19 +403,16 @@ private void readPrimitive(FieldVector arrowVector, ColumnVector hiveVector, Typ
             hiveVector.isNull[i] = false;
             intervalDayVector.get(i, intervalDayHolder);
             final long seconds = intervalDayHolder.days * SECOND_PER_DAY +
-                intervalDayHolder.milliseconds / MS_PER_SECOND;
-            final int nanos = (intervalDayHolder.milliseconds % 1_000) * NS_PER_MS;
+                intervalDayHolder.milliseconds / MILLIS_PER_SECOND;
+            final int nanos = (intervalDayHolder.milliseconds % 1_000) * NS_PER_MILLIS;
             intervalDayTime.set(seconds, nanos);
             ((IntervalDayTimeColumnVector) hiveVector).set(i, intervalDayTime);
           }
         }
       }
       break;
-      case VOID:
-      case TIMESTAMPLOCALTZ:
-      case UNKNOWN:
       default:
-        break;
+        throw new IllegalArgumentException();
     }
   }
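(Annotation, not part of the patch: the three TIMESTAMP branches above share one normalization rule, and it also corrects the old nano branch, which over-compensated with `second -= 2`. Java's `/` and `%` truncate toward zero, so a pre-epoch value leaves a negative remainder that needs exactly one borrowed second, never two. A standalone sketch of the rule under those assumptions; the class name is illustrative.)

    public class NegativeEpochDemo {
      static final int MICROS_PER_SECOND = 1_000_000;
      static final int NS_PER_MICROS = 1_000;
      static final int NS_PER_SECOND = 1_000_000_000;

      public static void main(String[] args) {
        long timeInMicros = -1_500_000; // 1.5 seconds before the epoch
        int subSecondInNanos = (int) ((timeInMicros % MICROS_PER_SECOND) * NS_PER_MICROS); // -500_000_000
        long second = timeInMicros / MICROS_PER_SECOND; // -1, truncated toward zero
        if (subSecondInNanos < 0) {
          subSecondInNanos += NS_PER_SECOND; // borrow one second: 500_000_000
          second -= 1;                       // -2
        }
        // -2 s + 0.5 s = -1.5 s, with the non-negative nanos field Hive expects
        System.out.println(second + " s + " + subSecondInNanos + " ns");
      }
    }

The milli and micro branches follow from the same arithmetic; only the divisor and the nanosecond scale factor change.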
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
index bd23011c93..e6af916ce8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
@@ -30,7 +30,7 @@
 import org.apache.arrow.vector.IntervalDayVector;
 import org.apache.arrow.vector.IntervalYearVector;
 import org.apache.arrow.vector.SmallIntVector;
-import org.apache.arrow.vector.TimeStampNanoVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
 import org.apache.arrow.vector.TinyIntVector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
@@ -74,8 +74,10 @@
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ARROW_BATCH_SIZE;
 import static org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil.createColumnVector;
-import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.MS_PER_SECOND;
-import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.NS_PER_MS;
+import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.MICROS_PER_MILLIS;
+import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.MILLIS_PER_SECOND;
+import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.NS_PER_MICROS;
+import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.NS_PER_MILLIS;
 import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.SECOND_PER_DAY;
 import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.toStructListTypeInfo;
 import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.toStructListVector;
@@ -175,7 +177,8 @@ private ArrowType toArrowType(TypeInfo typeInfo) {
       case DATE:
         return Types.MinorType.DATEDAY.getType();
       case TIMESTAMP:
-        return Types.MinorType.TIMESTAMPNANO.getType();
+        // HIVE-19723: Prefer microsecond because Spark supports it
+        return Types.MinorType.TIMESTAMPMICRO.getType();
       case BINARY:
         return Types.MinorType.VARBINARY.getType();
       case DECIMAL:
@@ -430,22 +433,22 @@ private void writePrimitive(FieldVector arrowVector, ColumnVector hiveVector, Ty
         break;
       case TIMESTAMP:
       {
-        final TimeStampNanoVector timeStampNanoVector = (TimeStampNanoVector) arrowVector;
+        final TimeStampMicroVector timeStampMicroVector = (TimeStampMicroVector) arrowVector;
         final TimestampColumnVector timestampColumnVector = (TimestampColumnVector) hiveVector;
         for (int i = 0; i < size; i++) {
           if (hiveVector.isNull[i]) {
-            timeStampNanoVector.setNull(i);
+            timeStampMicroVector.setNull(i);
           } else {
             // Time = second + sub-second
             final long secondInMillis = timestampColumnVector.getTime(i);
-            final long secondInNanos = (secondInMillis - secondInMillis % 1000) * NS_PER_MS; // second
-            final long subSecondInNanos = timestampColumnVector.getNanos(i); // sub-second
+            final long secondInMicros = (secondInMillis - secondInMillis % MILLIS_PER_SECOND) * MICROS_PER_MILLIS;
+            final long subSecondInMicros = timestampColumnVector.getNanos(i) / NS_PER_MICROS;

-            if ((secondInMillis > 0 && secondInNanos < 0) || (secondInMillis < 0 && secondInNanos > 0)) {
-              // If the timestamp cannot be represented in long nanosecond, set it as a null value
-              timeStampNanoVector.setNull(i);
+            if ((secondInMillis > 0 && secondInMicros < 0) || (secondInMillis < 0 && secondInMicros > 0)) {
+              // If the timestamp cannot be represented in long microsecond, set it as a null value
+              timeStampMicroVector.setNull(i);
             } else {
-              timeStampNanoVector.set(i, secondInNanos + subSecondInNanos);
+              timeStampMicroVector.set(i, secondInMicros + subSecondInMicros);
             }
           }
         }
@@ -502,8 +505,8 @@ private void writePrimitive(FieldVector arrowVector, ColumnVector hiveVector, Ty
             final long totalSeconds = intervalDayTimeColumnVector.getTotalSeconds(i);
             final long days = totalSeconds / SECOND_PER_DAY;
             final long millis =
-                (totalSeconds - days * SECOND_PER_DAY) * MS_PER_SECOND +
-                intervalDayTimeColumnVector.getNanos(i) / NS_PER_MS;
+                (totalSeconds - days * SECOND_PER_DAY) * MILLIS_PER_SECOND +
+                intervalDayTimeColumnVector.getNanos(i) / NS_PER_MILLIS;
             intervalDayVector.set(i, (int) days, (int) millis);
           }
         }
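(Annotation, not part of the patch: on the write side, TimestampColumnVector keeps wall-clock millis in `time` and the complete sub-second part in `nanos`, so the millis must be rounded down to a whole second before the two are combined, otherwise the sub-second portion would be counted twice. A rough standalone sketch of that combination; the class and method names are illustrative.)

    public class SerializeMicrosDemo {
      static final int MILLIS_PER_SECOND = 1_000;
      static final int MICROS_PER_MILLIS = 1_000;
      static final int NS_PER_MICROS = 1_000;

      static long toEpochMicros(long timeInMillis, int nanos) {
        // Clamp millis to a whole second, then add the truncated sub-second part
        long secondInMicros = (timeInMillis - timeInMillis % MILLIS_PER_SECOND) * MICROS_PER_MILLIS;
        long subSecondInMicros = nanos / NS_PER_MICROS;
        return secondInMicros + subSecondInMicros;
      }

      public static void main(String[] args) {
        // 1.123456789 s after the epoch: time = 1_123 ms, nanos = 123_456_789
        System.out.println(toEpochMicros(1_123L, 123_456_789)); // 1123456
      }
    }

The sign-mismatch check in the hunk above then handles long overflow: if the seconds part changes sign after scaling to microseconds, the value is unrepresentable and is written as null.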
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java
index 74f6624597..ce25c3e8f9 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
 import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
 import org.apache.hadoop.hive.common.type.HiveVarchar;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -104,39 +105,29 @@
     {null, null, null},
   };

-  private final static long TIME_IN_MS = TimeUnit.DAYS.toMillis(365 + 31 + 3);
-  private final static long NEGATIVE_TIME_IN_MS = TimeUnit.DAYS.toMillis(-9 * 365 + 31 + 3);
+  private final static long TIME_IN_MILLIS = TimeUnit.DAYS.toMillis(365 + 31 + 3);
+  private final static long NEGATIVE_TIME_IN_MILLIS = TimeUnit.DAYS.toMillis(-9 * 365 + 31 + 3);
   private final static Timestamp TIMESTAMP;
   private final static Timestamp NEGATIVE_TIMESTAMP_WITHOUT_NANOS;
-  private final static Timestamp NEGATIVE_TIMESTAMP_WITH_NANOS;

   static {
-    TIMESTAMP = new Timestamp(TIME_IN_MS);
-    TIMESTAMP.setNanos(123456789);
-    NEGATIVE_TIMESTAMP_WITHOUT_NANOS = new Timestamp(NEGATIVE_TIME_IN_MS);
-    NEGATIVE_TIMESTAMP_WITH_NANOS = new Timestamp(NEGATIVE_TIME_IN_MS);
-    NEGATIVE_TIMESTAMP_WITH_NANOS.setNanos(123456789);
+    TIMESTAMP = new Timestamp(TIME_IN_MILLIS);
+    NEGATIVE_TIMESTAMP_WITHOUT_NANOS = new Timestamp(NEGATIVE_TIME_IN_MILLIS);
   }

   private final static Object[][] DTI_ROWS = {
     {
-      new DateWritable(DateWritable.millisToDays(TIME_IN_MS)),
+      new DateWritable(DateWritable.millisToDays(TIME_IN_MILLIS)),
       new TimestampWritable(TIMESTAMP),
       new HiveIntervalYearMonthWritable(new HiveIntervalYearMonth(1, 2)),
       new HiveIntervalDayTimeWritable(new HiveIntervalDayTime(1, 2, 3, 4, 5_000_000))
     },
     {
-      new DateWritable(DateWritable.millisToDays(NEGATIVE_TIME_IN_MS)),
+      new DateWritable(DateWritable.millisToDays(NEGATIVE_TIME_IN_MILLIS)),
       new TimestampWritable(NEGATIVE_TIMESTAMP_WITHOUT_NANOS),
       null,
       null
     },
-    {
-      null,
-      new TimestampWritable(NEGATIVE_TIMESTAMP_WITH_NANOS),
-      null,
-      null
-    },
     {null, null, null, null},
   };

@@ -506,6 +497,25 @@ public void testPrimitiveDTI() throws SerDeException {
     initAndSerializeAndDeserialize(schema, DTI_ROWS);
   }

+  @Test
+  public void testPrimitiveRandomTimestamp() throws SerDeException {
+    String[][] schema = {
+        {"timestamp1", "timestamp"},
+    };
+
+    int size = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_ARROW_BATCH_SIZE);
+    Random rand = new Random(294722773L);
+    Object[][] rows = new Object[size][];
+    for (int i = 0; i < size; i++) {
+      long millis = ((long) rand.nextInt(Integer.MAX_VALUE)) * 1000;
+      Timestamp timestamp = new Timestamp(rand.nextBoolean() ? millis : -millis);
+      timestamp.setNanos(rand.nextInt(1000) * 1000);
+      rows[i] = new Object[] {new TimestampWritable(timestamp)};
+    }
+
+    initAndSerializeAndDeserialize(schema, rows);
+  }
+
   @Test
   public void testPrimitiveDecimal() throws SerDeException {
     String[][] schema = {
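(Annotation, not part of the patch: the new randomized test only generates whole-second millis and whole-microsecond nanos, which is exactly the granularity that survives the nanosecond-to-microsecond round trip, and the fixed seed keeps failures reproducible. A small sketch of the invariant it relies on; this check is assumed, not asserted verbatim in the patch.)

    import java.sql.Timestamp;
    import java.util.Random;

    public class RoundTripDemo {
      public static void main(String[] args) {
        Random rand = new Random(294722773L);
        long millis = ((long) rand.nextInt(Integer.MAX_VALUE)) * 1000;
        Timestamp ts = new Timestamp(rand.nextBoolean() ? millis : -millis);
        ts.setNanos(rand.nextInt(1000) * 1000); // whole microseconds only
        // Micro truncation is lossless for values built on microsecond boundaries
        int truncated = ts.getNanos() / 1_000 * 1_000;
        System.out.println(truncated == ts.getNanos()); // true by construction
      }
    }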