diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
index afb9837ce4..9dfece9d46 100644
--- itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
+++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
@@ -217,7 +217,7 @@ public void testDataTypes() throws Exception {
     //assertEquals("d", mapVal.get("c"));
    //assertEquals(Integer.valueOf(2), listVal.get(1));
 
-    assertEquals(Timestamp.valueOf("2012-04-22 09:00:00.123456789"), rowValues[16]);
+    assertEquals(Timestamp.valueOf("2012-04-22 09:00:00.123456"), rowValues[16]);
     assertEquals(new BigDecimal("123456789.123456"), rowValues[17]);
     assertArrayEquals("abcd".getBytes("UTF-8"), (byte[]) rowValues[18]);
     assertEquals(Date.valueOf("2013-01-01"), rowValues[19]);
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java
index b093ebbd27..470f31cf95 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java
@@ -83,9 +83,13 @@
   public static final Logger LOG = LoggerFactory.getLogger(ArrowColumnarBatchSerDe.class.getName());
 
   private static final String DEFAULT_ARROW_FIELD_NAME = "[DEFAULT]";
-  static final int MS_PER_SECOND = 1_000;
+  static final int MILLIS_PER_SECOND = 1_000;
+  static final int MICROS_PER_SECOND = 1_000_000;
   static final int NS_PER_SECOND = 1_000_000_000;
-  static final int NS_PER_MS = 1_000_000;
+
+  static final int NS_PER_MILLIS = NS_PER_SECOND / MILLIS_PER_SECOND;
+  static final int NS_PER_MICROS = NS_PER_SECOND / MICROS_PER_SECOND;
+  static final int MICROS_PER_MILLIS = MICROS_PER_SECOND / MILLIS_PER_SECOND;
   static final int SECOND_PER_DAY = 24 * 60 * 60;
 
   BufferAllocator rootAllocator;
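With both milliseconds and microseconds now in play, the old MS_PER_SECOND/NS_PER_MS names were ambiguous, so the patch renames them and derives the cross-unit factors instead of repeating literals. A standalone restatement of the resulting values (constants copied from the patch; the class name here is illustrative):

    public class ArrowTimeUnits {
      static final int MILLIS_PER_SECOND = 1_000;
      static final int MICROS_PER_SECOND = 1_000_000;
      static final int NS_PER_SECOND = 1_000_000_000;

      // Derived factors: 1_000_000, 1_000, and 1_000 respectively;
      // deriving them keeps the ratios consistent by construction.
      static final int NS_PER_MILLIS = NS_PER_SECOND / MILLIS_PER_SECOND;
      static final int NS_PER_MICROS = NS_PER_SECOND / MICROS_PER_SECOND;
      static final int MICROS_PER_MILLIS = MICROS_PER_SECOND / MILLIS_PER_SECOND;
    }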
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Deserializer.java ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Deserializer.java
index 3a79e5f136..6e09d3991f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Deserializer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Deserializer.java
@@ -29,6 +29,7 @@
 import org.apache.arrow.vector.IntervalDayVector;
 import org.apache.arrow.vector.IntervalYearVector;
 import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
 import org.apache.arrow.vector.TimeStampMilliVector;
 import org.apache.arrow.vector.TimeStampNanoVector;
 import org.apache.arrow.vector.TinyIntVector;
@@ -55,10 +56,8 @@
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
@@ -67,8 +66,10 @@
 import java.util.List;
 
 import static org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil.createColumnVector;
-import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.MS_PER_SECOND;
-import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.NS_PER_MS;
+import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.MICROS_PER_SECOND;
+import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.MILLIS_PER_SECOND;
+import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.NS_PER_MICROS;
+import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.NS_PER_MILLIS;
 import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.NS_PER_SECOND;
 import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.SECOND_PER_DAY;
 import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.toStructListTypeInfo;
@@ -277,8 +278,8 @@ private void readPrimitive(FieldVector arrowVector, ColumnVector hiveVector) {
             // Time = second + sub-second
             final long timeInMillis = ((TimeStampMilliVector) arrowVector).get(i);
             final TimestampColumnVector timestampColumnVector = (TimestampColumnVector) hiveVector;
-            int subSecondInNanos = (int) ((timeInMillis % MS_PER_SECOND) * NS_PER_MS);
-            long second = timeInMillis / MS_PER_SECOND;
+            int subSecondInNanos = (int) ((timeInMillis % MILLIS_PER_SECOND) * NS_PER_MILLIS);
+            long second = timeInMillis / MILLIS_PER_SECOND;
 
             // A nanosecond value should not be negative
             if (subSecondInNanos < 0) {
@@ -286,11 +287,39 @@ private void readPrimitive(FieldVector arrowVector, ColumnVector hiveVector) {
               // So add one second to the negative nanosecond value to make it positive
               subSecondInNanos += NS_PER_SECOND;
 
-              // Subtract one second from the second value because we added one second,
-              // then subtract one more second because of the ceiling in the division.
-              second -= 2;
+              // Subtract one second from the second value because we added one second
+              second -= 1;
             }
-            timestampColumnVector.time[i] = second * MS_PER_SECOND;
+            timestampColumnVector.time[i] = second * MILLIS_PER_SECOND;
+            timestampColumnVector.nanos[i] = subSecondInNanos;
+          }
+        }
+      }
+      break;
+      case TIMESTAMPMICRO:
+      {
+        for (int i = 0; i < size; i++) {
+          if (arrowVector.isNull(i)) {
+            VectorizedBatchUtil.setNullColIsNullValue(hiveVector, i);
+          } else {
+            hiveVector.isNull[i] = false;
+
+            // Time = second + sub-second
+            final long timeInMicros = ((TimeStampMicroVector) arrowVector).get(i);
+            final TimestampColumnVector timestampColumnVector = (TimestampColumnVector) hiveVector;
+            int subSecondInNanos = (int) ((timeInMicros % MICROS_PER_SECOND) * NS_PER_MICROS);
+            long second = timeInMicros / MICROS_PER_SECOND;
+
+            // A nanosecond value should not be negative
+            if (subSecondInNanos < 0) {
+
+              // So add one second to the negative nanosecond value to make it positive
+              subSecondInNanos += NS_PER_SECOND;
+
+              // Subtract one second from the second value because we added one second
+              second -= 1;
+            }
+            timestampColumnVector.time[i] = second * MILLIS_PER_SECOND;
             timestampColumnVector.nanos[i] = subSecondInNanos;
           }
         }
@@ -316,11 +345,10 @@ private void readPrimitive(FieldVector arrowVector, ColumnVector hiveVector) {
               // So add one second to the negative nanosecond value to make it positive
               subSecondInNanos += NS_PER_SECOND;
 
-              // Subtract one second from the second value because we added one second,
-              // then subtract one more second because of the ceiling in the division.
-              second -= 2;
+              // Subtract one second from the second value because we added one second
+              second -= 1;
             }
-            timestampColumnVector.time[i] = second * MS_PER_SECOND;
+            timestampColumnVector.time[i] = second * MILLIS_PER_SECOND;
             timestampColumnVector.nanos[i] = subSecondInNanos;
           }
         }
@@ -375,8 +403,8 @@ private void readPrimitive(FieldVector arrowVector, ColumnVector hiveVector) {
             hiveVector.isNull[i] = false;
             intervalDayVector.get(i, intervalDayHolder);
             final long seconds = intervalDayHolder.days * SECOND_PER_DAY +
-                intervalDayHolder.milliseconds / MS_PER_SECOND;
-            final int nanos = (intervalDayHolder.milliseconds % 1_000) * NS_PER_MS;
+                intervalDayHolder.milliseconds / MILLIS_PER_SECOND;
+            final int nanos = (intervalDayHolder.milliseconds % 1_000) * NS_PER_MILLIS;
             intervalDayTime.set(seconds, nanos);
             ((IntervalDayTimeColumnVector) hiveVector).set(i, intervalDayTime);
           }
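The behavioral fix in the Deserializer is the sub-second normalization: Java's integer division truncates toward zero rather than taking a ceiling, so after adding NS_PER_SECOND to a negative nano component, exactly one second must be subtracted, not two as the removed comment and code assumed. A minimal standalone sketch of the arithmetic (constant values copied from ArrowColumnarBatchSerDe; the class and method names are illustrative):

    public class NegativeSubSecondDemo {
      static final int MILLIS_PER_SECOND = 1_000;
      static final int NS_PER_SECOND = 1_000_000_000;
      static final int NS_PER_MILLIS = 1_000_000;

      // Splits an epoch-millis value into (second, non-negative nanos),
      // mirroring the TIMESTAMPMILLI branch above.
      static long[] split(long timeInMillis) {
        long second = timeInMillis / MILLIS_PER_SECOND;  // truncates toward zero
        int subSecondInNanos = (int) ((timeInMillis % MILLIS_PER_SECOND) * NS_PER_MILLIS);
        if (subSecondInNanos < 0) {
          subSecondInNanos += NS_PER_SECOND;  // make the nano component positive...
          second -= 1;                        // ...and compensate with exactly one second
        }
        return new long[] {second, subSecondInNanos};
      }

      public static void main(String[] args) {
        // -1500 ms is 1.5 s before the epoch: -2 s plus 0.5 s.
        long[] r = split(-1_500);
        System.out.println(r[0] + "s + " + r[1] + "ns");  // prints: -2s + 500000000ns
        // The removed "second -= 2" would have yielded -3s + 0.5s, off by a full second.
      }
    }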
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
index 6c168081df..e6af916ce8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
@@ -30,7 +30,7 @@
 import org.apache.arrow.vector.IntervalDayVector;
 import org.apache.arrow.vector.IntervalYearVector;
 import org.apache.arrow.vector.SmallIntVector;
-import org.apache.arrow.vector.TimeStampMilliVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
 import org.apache.arrow.vector.TinyIntVector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
@@ -74,8 +74,10 @@
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ARROW_BATCH_SIZE;
 import static org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil.createColumnVector;
-import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.MS_PER_SECOND;
-import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.NS_PER_MS;
+import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.MICROS_PER_MILLIS;
+import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.MILLIS_PER_SECOND;
+import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.NS_PER_MICROS;
+import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.NS_PER_MILLIS;
 import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.SECOND_PER_DAY;
 import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.toStructListTypeInfo;
 import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.toStructListVector;
@@ -175,7 +177,8 @@ private ArrowType toArrowType(TypeInfo typeInfo) {
       case DATE:
         return Types.MinorType.DATEDAY.getType();
       case TIMESTAMP:
-        return Types.MinorType.TIMESTAMPMILLI.getType();
+        // HIVE-19723: Prefer microsecond because Spark supports it
+        return Types.MinorType.TIMESTAMPMICRO.getType();
       case BINARY:
         return Types.MinorType.VARBINARY.getType();
       case DECIMAL:
@@ -430,13 +433,23 @@ private void writePrimitive(FieldVector arrowVector, ColumnVector hiveVector, Ty
       break;
       case TIMESTAMP:
      {
-        final TimeStampMilliVector timeStampMilliVector = (TimeStampMilliVector) arrowVector;
+        final TimeStampMicroVector timeStampMicroVector = (TimeStampMicroVector) arrowVector;
         final TimestampColumnVector timestampColumnVector = (TimestampColumnVector) hiveVector;
         for (int i = 0; i < size; i++) {
           if (hiveVector.isNull[i]) {
-            timeStampMilliVector.setNull(i);
+            timeStampMicroVector.setNull(i);
           } else {
-            timeStampMilliVector.set(i, timestampColumnVector.getTime(i));
+            // Time = second + sub-second
+            final long secondInMillis = timestampColumnVector.getTime(i);
+            final long secondInMicros = (secondInMillis - secondInMillis % MILLIS_PER_SECOND) * MICROS_PER_MILLIS;
+            final long subSecondInMicros = timestampColumnVector.getNanos(i) / NS_PER_MICROS;
+
+            if ((secondInMillis > 0 && secondInMicros < 0) || (secondInMillis < 0 && secondInMicros > 0)) {
+              // If the timestamp cannot be represented in long microsecond, set it as a null value
+              timeStampMicroVector.setNull(i);
+            } else {
+              timeStampMicroVector.set(i, secondInMicros + subSecondInMicros);
+            }
           }
         }
       }
@@ -492,8 +505,8 @@ private void writePrimitive(FieldVector arrowVector, ColumnVector hiveVector, Ty
           final long totalSeconds = intervalDayTimeColumnVector.getTotalSeconds(i);
           final long days = totalSeconds / SECOND_PER_DAY;
           final long millis =
-              (totalSeconds - days * SECOND_PER_DAY) * MS_PER_SECOND +
-              intervalDayTimeColumnVector.getNanos(i) / NS_PER_MS;
+              (totalSeconds - days * SECOND_PER_DAY) * MILLIS_PER_SECOND +
+              intervalDayTimeColumnVector.getNanos(i) / NS_PER_MILLIS;
           intervalDayVector.set(i, (int) days, (int) millis);
         }
       }
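Two details in the new TIMESTAMP branch are worth spelling out. First, TimestampColumnVector carries the full sub-second in nanos[i] while time[i] repeats it at millisecond precision, so the writer strips time[i] down to whole seconds before adding nanos/1000 microseconds; otherwise the milliseconds would be counted twice. Second, multiplying by MICROS_PER_MILLIS can overflow a signed long for extreme epochs, and the patch treats a sign flip between input and result as the overflow signal and writes null. A standalone sketch of that arithmetic (constant values copied from the patch; the class and method names are illustrative, not Hive API):

    public class MicrosOverflowDemo {
      static final int MILLIS_PER_SECOND = 1_000;
      static final int MICROS_PER_MILLIS = 1_000;
      static final int NS_PER_MICROS = 1_000;

      // Mirrors the Serializer's conversion: returns epoch microseconds,
      // or null when the seconds part no longer fits in a signed long.
      static Long toEpochMicros(long timeInMillis, int nanos) {
        long secondInMicros = (timeInMillis - timeInMillis % MILLIS_PER_SECOND) * MICROS_PER_MILLIS;
        long subSecondInMicros = nanos / NS_PER_MICROS;
        // An overflowing long multiplication wraps around; the patch flags a sign
        // change between input and result and falls back to null.
        if ((timeInMillis > 0 && secondInMicros < 0) || (timeInMillis < 0 && secondInMicros > 0)) {
          return null;
        }
        return secondInMicros + subSecondInMicros;
      }

      public static void main(String[] args) {
        // 1.123456 s: time[] = 1_123 ms, nanos[] = 123_456_000 ns -> 1_123_456 us
        System.out.println(toEpochMicros(1_123, 123_456_000));  // prints: 1123456
        // Far beyond the microsecond range of long: the multiplication wraps negative
        System.out.println(toEpochMicros(Long.MAX_VALUE, 0));   // prints: null
      }
    }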
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java
index 4b430eb6ad..e50e43e0df 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java
@@ -105,25 +105,25 @@
       {null, null, null},
   };
 
-  private final static long TIME_IN_MS = TimeUnit.DAYS.toMillis(365 + 31 + 3);
-  private final static long NEGATIVE_TIME_IN_MS = TimeUnit.DAYS.toMillis(-9 * 365 + 31 + 3);
+  private final static long TIME_IN_MILLIS = TimeUnit.DAYS.toMillis(365 + 31 + 3);
+  private final static long NEGATIVE_TIME_IN_MILLIS = TimeUnit.DAYS.toMillis(-9 * 365 + 31 + 3);
   private final static Timestamp TIMESTAMP;
   private final static Timestamp NEGATIVE_TIMESTAMP_WITHOUT_NANOS;
 
   static {
-    TIMESTAMP = new Timestamp(TIME_IN_MS);
-    NEGATIVE_TIMESTAMP_WITHOUT_NANOS = new Timestamp(NEGATIVE_TIME_IN_MS);
+    TIMESTAMP = new Timestamp(TIME_IN_MILLIS);
+    NEGATIVE_TIMESTAMP_WITHOUT_NANOS = new Timestamp(NEGATIVE_TIME_IN_MILLIS);
   }
 
   private final static Object[][] DTI_ROWS = {
       {
-          new DateWritable(DateWritable.millisToDays(TIME_IN_MS)),
+          new DateWritable(DateWritable.millisToDays(TIME_IN_MILLIS)),
           new TimestampWritable(TIMESTAMP),
           new HiveIntervalYearMonthWritable(new HiveIntervalYearMonth(1, 2)),
           new HiveIntervalDayTimeWritable(new HiveIntervalDayTime(1, 2, 3, 4, 5_000_000))
       },
       {
-          new DateWritable(DateWritable.millisToDays(NEGATIVE_TIME_IN_MS)),
+          new DateWritable(DateWritable.millisToDays(NEGATIVE_TIME_IN_MILLIS)),
           new TimestampWritable(NEGATIVE_TIMESTAMP_WITHOUT_NANOS),
           null,
           null
@@ -508,7 +508,9 @@ public void testPrimitiveRandomTimestamp() throws SerDeException {
     Object[][] rows = new Object[size][];
     for (int i = 0; i < size; i++) {
       long millis = ((long) rand.nextInt(Integer.MAX_VALUE)) * 1000;
-      rows[i] = new Object[] {new TimestampWritable(new Timestamp(millis))};
+      Timestamp timestamp = new Timestamp(rand.nextBoolean() ? millis : -millis);
+      timestamp.setNanos(rand.nextInt(1000) * 1000);
+      rows[i] = new Object[] {new TimestampWritable(timestamp)};
     }
 
     initAndSerializeAndDeserialize(schema, rows);
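The test updates follow directly from the precision change: a TIMESTAMP round-tripped through Arrow now keeps at most six fractional digits, which is why TestJdbcWithMiniLlapArrow expects .123456 rather than .123456789, and why the random-timestamp test generates only whole-microsecond nanos (rand.nextInt(1000) * 1000) plus negative epochs to exercise the normalization path. A small illustration of the truncation (a standalone sketch, not Hive test code):

    import java.sql.Timestamp;

    public class MicroTruncationDemo {
      public static void main(String[] args) {
        Timestamp t = Timestamp.valueOf("2012-04-22 09:00:00.123456789");
        // Nanoseconds below one microsecond are dropped by Arrow's TIMESTAMPMICRO encoding.
        t.setNanos(t.getNanos() / 1_000 * 1_000);  // keep whole microseconds
        System.out.println(t);  // prints: 2012-04-22 09:00:00.123456
      }
    }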