diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a18a6d7ec2..c79745beb7 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4615,6 +4615,9 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "If LLAP external clients submits ORDER BY queries, force return a single split to guarantee reading\n" +
         "data out in ordered way. Setting this to false will let external clients read data out in parallel\n" +
         "losing the ordering (external clients are responsible for guaranteeing the ordering)"),
+    LLAP_EXTERNAL_CLIENT_USE_HYBRID_CALENDAR("hive.llap.external.client.use.hybrid.calendar",
+        false,
+        "Whether to use hybrid calendar for parsing of dates/timestamps."),
     LLAP_ENABLE_GRACE_JOIN_IN_LLAP("hive.llap.enable.grace.join.in.llap", false,
         "Override if grace join should be allowed to run in llap."),
diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrowBatch.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrowBatch.java
new file mode 100644
index 0000000000..f41e6f9e3d
--- /dev/null
+++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrowBatch.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.commons.collections4.MultiSet;
+import org.apache.commons.collections4.multiset.HashMultiSet;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat;
+import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
+import org.apache.hadoop.hive.llap.Row;
+import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
+import org.apache.hadoop.hive.ql.io.arrow.RootAllocatorFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * TestJdbcWithMiniLlap for Arrow format - uses batch record reader.
+ * We can obtain arrow batches and compare the results.
+ *
+ */
+public class TestJdbcWithMiniLlapVectorArrowBatch extends BaseJdbcWithMiniLlap {
+
+  private final DateTimestampTestIO dateTimestampTestIO = new DateTimestampTestIO();
+
+  @BeforeClass public static void beforeTest() throws Exception {
+    HiveConf conf = defaultConf();
+    conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true);
+    conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_FILESINK_ARROW_NATIVE_ENABLED, true);
+    BaseJdbcWithMiniLlap.beforeTest(conf);
+  }
+
+  @Override protected InputFormat<NullWritable, Row> getInputFormat() {
+    return new LlapArrowRowInputFormat(Long.MAX_VALUE);
+  }
+
+  private static class DateTimestampTestIO {
+
+    final List<String> inputDates;
+    final MultiSet<List<Object>> expectedHybridDates;
+
+    final List<String> inputTimestamps;
+    final MultiSet<List<Object>> expectedHybridTimestamps;
+
+    public DateTimestampTestIO() {
+      // date stuff
+      this.inputDates = Arrays
+          .asList("2012-02-21", "2014-02-11", "1947-02-11", "8200-02-11", "1012-02-21", "1014-02-11", "0947-02-11",
+              "0200-02-11", "0001-01-01");
+
+      expectedHybridDates = new HashMultiSet<>();
+      expectedHybridDates.add(Lists.newArrayList(15391));
+      expectedHybridDates.add(Lists.newArrayList(16112));
+      expectedHybridDates.add(Lists.newArrayList(-8360));
+      expectedHybridDates.add(Lists.newArrayList(2275502));
+      expectedHybridDates.add(Lists.newArrayList(-349846));
+      expectedHybridDates.add(Lists.newArrayList(-349125));
+      expectedHybridDates.add(Lists.newArrayList(-373597));
+      expectedHybridDates.add(Lists.newArrayList(-646439));
+      expectedHybridDates.add(Lists.newArrayList(-719164));
+
+      // timestamp stuff
+
+      inputTimestamps = Arrays.asList("2012-02-21 07:08:09.123", "2014-02-11 07:08:09.123", "1947-02-11 07:08:09.123",
+          "8200-02-11 07:08:09.123", "1012-02-21 07:15:11.123", "1014-02-11 07:15:11.123", "947-02-11 07:15:11.123",
+          "0200-02-11 07:15:11.123", "0001-01-01 00:00:00");
+      expectedHybridTimestamps = new HashMultiSet<>();
+      expectedHybridTimestamps.add(Lists.newArrayList(1329808089123000L));
+      expectedHybridTimestamps.add(Lists.newArrayList(1392102489123000L));
+      expectedHybridTimestamps.add(Lists.newArrayList(-722278310877000L));
+      expectedHybridTimestamps.add(Lists.newArrayList(196603398489123000L));
+      expectedHybridTimestamps.add(Lists.newArrayList(-30226668288877000L));
+      expectedHybridTimestamps.add(Lists.newArrayList(-30164373888877000L));
+      expectedHybridTimestamps.add(Lists.newArrayList(-32278754688877000L));
+      expectedHybridTimestamps.add(Lists.newArrayList(-55852303488877000L));
+      expectedHybridTimestamps.add(Lists.newArrayList(-62135769600000000L));
+
+    }
+
+    private String getInsertQuery(String tableName, boolean dateQuery) {
+      String format = "INSERT INTO %s VALUES %s";
+      List<String> inputs = dateQuery ? inputDates : inputTimestamps;
+      String values = inputs.stream().map(d -> String.format("('%s')", d)).collect(Collectors.joining(","));
+      return String.format(format, tableName, values);
+    }
+  }
+
+  @Test public void testDates() throws Exception {
+    String tableName = "test_dates";
+    testDateTimestampWithStorageFormat(tableName, "orc",
+        ImmutableMap.of(ConfVars.LLAP_EXTERNAL_CLIENT_USE_HYBRID_CALENDAR.toString(), "true"), true);
+    testDateTimestampWithStorageFormat(tableName, "parquet",
+        ImmutableMap.of(ConfVars.LLAP_EXTERNAL_CLIENT_USE_HYBRID_CALENDAR.toString(), "true"), true);
+  }
+
+  @Test public void testTimestamps() throws Exception {
+    String tableName = "test_timestamps";
+    testDateTimestampWithStorageFormat(tableName, "orc",
+        ImmutableMap.of(ConfVars.LLAP_EXTERNAL_CLIENT_USE_HYBRID_CALENDAR.toString(), "true"), false);
+    testDateTimestampWithStorageFormat(tableName, "parquet",
+        ImmutableMap.of(ConfVars.LLAP_EXTERNAL_CLIENT_USE_HYBRID_CALENDAR.toString(), "true"), false);
+  }
+
+  private void testDateTimestampWithStorageFormat(String tableName, String storageFormat,
+      Map<String, String> extraHiveConfs, boolean testDate) throws Exception {
+
+    String createTableQuery =
+        String.format("create table %s (d %s) stored as %s", tableName, testDate ? "date" : "timestamp", storageFormat);
+
+    executeSQL(createTableQuery, dateTimestampTestIO.getInsertQuery(tableName, testDate));
+
+    MultiSet<List<Object>> llapResult = runQueryUsingLlapArrowBatchReader("select * from " + tableName, extraHiveConfs);
+    assertEquals(testDate ? dateTimestampTestIO.expectedHybridDates : dateTimestampTestIO.expectedHybridTimestamps,
+        llapResult);
+    executeSQL("drop table " + tableName);
+  }
+
+  private MultiSet<List<Object>> executeQuerySQL(String query) throws SQLException {
+    MultiSet<List<Object>> rows = new HashMultiSet<>();
+    try (Statement stmt = hs2Conn.createStatement()) {
+      ResultSet rs = stmt.executeQuery(query);
+      int columnCount = rs.getMetaData().getColumnCount();
+      while (rs.next()) {
+        List<Object> oneRow = new ArrayList<>();
+        for (int i = 1; i <= columnCount; i++) {
+          oneRow.add(rs.getObject(i));
+        }
+        rows.add(oneRow);
+      }
+    }
+    return rows;
+  }
+
+  private void executeSQL(String query, String... moreQueries) throws SQLException {
+    try (Statement stmt = hs2Conn.createStatement()) {
+      stmt.execute(query);
+      if (moreQueries != null) {
+        for (String q : moreQueries) {
+          stmt.execute(q);
+        }
+      }
+    }
+  }
+
+  private MultiSet<List<Object>> runQueryUsingLlapArrowBatchReader(String query, Map<String, String> extraHiveConfs)
+      throws Exception {
+    String url = miniHS2.getJdbcURL();
+
+    if (extraHiveConfs != null) {
+      url = url + "?" + extraHiveConfs.entrySet().stream().map(e -> e.getKey() + "=" + e.getValue())
+          .collect(Collectors.joining(";"));
+    }
+
+    String user = System.getProperty("user.name");
+    String pwd = user;
+    String handleId = UUID.randomUUID().toString();
+
+    // Get splits
+    JobConf job = new JobConf(conf);
+    job.set(LlapBaseInputFormat.URL_KEY, url);
+    job.set(LlapBaseInputFormat.USER_KEY, user);
+    job.set(LlapBaseInputFormat.PWD_KEY, pwd);
+    job.set(LlapBaseInputFormat.QUERY_KEY, query);
+    job.set(LlapBaseInputFormat.HANDLE_ID, handleId);
+    job.set(LlapBaseInputFormat.USE_NEW_SPLIT_FORMAT, "false");
+
+    BufferAllocator allocator = RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(Long.MAX_VALUE)
+        .newChildAllocator(UUID.randomUUID().toString(), 0, Long.MAX_VALUE);
+
+    LlapBaseInputFormat llapBaseInputFormat = new LlapBaseInputFormat(true, allocator);
+    InputSplit[] splits = llapBaseInputFormat.getSplits(job, 1);
+
+    assertTrue(splits.length > 0);
+
+    MultiSet<List<Object>> queryResult = new HashMultiSet<>();
+    for (InputSplit split : splits) {
+      System.out.println("Processing split " + Arrays.toString(split.getLocations()));
+      RecordReader reader = llapBaseInputFormat.getRecordReader(split, job, null);
+      ArrowWrapperWritable wrapperWritable = new ArrowWrapperWritable();
+      while (reader.next(NullWritable.get(), wrapperWritable)) {
+        queryResult.addAll(collectResultFromArrowVector(wrapperWritable));
+      }
+      reader.close();
+    }
+    LlapBaseInputFormat.close(handleId);
+    return queryResult;
+  }
+
+  private MultiSet<List<Object>> collectResultFromArrowVector(ArrowWrapperWritable wrapperWritable) {
+    List<FieldVector> fieldVectors = wrapperWritable.getVectorSchemaRoot().getFieldVectors();
+    MultiSet<List<Object>> result = new HashMultiSet<>();
+    int valueCount = fieldVectors.get(0).getValueCount();
+    for (int recordIndex = 0; recordIndex < valueCount; recordIndex++) {
+      List<Object> row = new ArrayList<>();
+      for (FieldVector fieldVector : fieldVectors) {
+        row.add(fieldVector.getObject(recordIndex));
+      }
+      result.add(row);
+    }
+    return result;
+  }
+
+  @Override public void testLlapInputFormatEndToEnd() throws Exception {
+    // to be implemented for this reader
+  }
+
+  @Override public void testNonAsciiStrings() throws Exception {
+    // to be implemented for this reader
+  }
+
+  @Override public void testEscapedStrings() throws Exception {
+    // to be implemented for this reader
+  }
+
+  @Override public void testDataTypes() throws Exception {
+    // to be implemented for this reader
+  }
+
+  @Override public void testComplexQuery() throws Exception {
+    // to be implemented for this reader
+  }
+
+  @Override public void testKillQuery() throws Exception {
+    // to be implemented for this reader
+  }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
index 035097759a..8e6f18ef09 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
@@ -49,6 +49,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DateColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -87,6 +88,7 @@
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ARROW_BATCH_SIZE;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ARROW_BATCH_ALLOCATOR_LIMIT;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.LLAP_EXTERNAL_CLIENT_USE_HYBRID_CALENDAR;
 import static org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil.createColumnVector;
 import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.MICROS_PER_MILLIS;
 import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.MILLIS_PER_SECOND;
@@ -111,6 +113,7 @@
   private List<String> fieldNames;
   private int fieldSize;
+  private boolean useHybridCalendar;
   private final StructVector rootVector;
   private final DecimalHolder decimalHolder = new DecimalHolder();
@@ -119,6 +122,7 @@ public Serializer(Configuration conf, String attemptId, List<TypeInfo> typeInfos
     this.fieldTypeInfos = typeInfos;
     this.fieldNames = fieldNames;
     long childAllocatorLimit = HiveConf.getLongVar(conf, HIVE_ARROW_BATCH_ALLOCATOR_LIMIT);
+    this.useHybridCalendar = HiveConf.getBoolVar(conf, LLAP_EXTERNAL_CLIENT_USE_HYBRID_CALENDAR);
     //Use per-task allocator for accounting only, no need to reserve per-task memory
     long childAllocatorReservation = 0L;
     //Break out accounting of direct memory per-task, so we can check no memory is leaked when task is completed
@@ -136,6 +140,7 @@ public Serializer(Configuration conf, String attemptId, List<TypeInfo> typeInfos
   Serializer(ArrowColumnarBatchSerDe serDe) throws SerDeException {
     MAX_BUFFERED_ROWS = HiveConf.getIntVar(serDe.conf, HIVE_ARROW_BATCH_SIZE);
     long childAllocatorLimit = HiveConf.getLongVar(serDe.conf, HIVE_ARROW_BATCH_ALLOCATOR_LIMIT);
+    this.useHybridCalendar = HiveConf.getBoolVar(serDe.conf, LLAP_EXTERNAL_CLIENT_USE_HYBRID_CALENDAR);
     ArrowColumnarBatchSerDe.LOG.info("ArrowColumnarBatchSerDe max number of buffered columns: " + MAX_BUFFERED_ROWS);
     String childAllocatorName = Thread.currentThread().getName();
     //Use per-task allocator for accounting only, no need to reserve per-task memory
@@ -595,6 +600,9 @@ private void writePrimitive(FieldVector arrowVector, ColumnVector hiveVector, Ty
       break;
     case DATE:
     {
+      // Since Hive always provides data in proleptic calendar format,
+      // set the usingProlepticCalendar flag for conversion to hybrid (if required) in dateValueSetter.
+      ((DateColumnVector) hiveVector).setUsingProlepticCalendar(true);
       if(isNative) {
         writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, dateNullSetter, dateValueSetter, typeInfo);
         return;
@@ -610,6 +618,9 @@ private void writePrimitive(FieldVector arrowVector, ColumnVector hiveVector, Ty
       break;
     case TIMESTAMP:
     {
+      // Since Hive always provides data in proleptic calendar format,
+      // set the usingProlepticCalendar flag for conversion to hybrid (if required) in timestampValueSetter.
+      ((TimestampColumnVector) hiveVector).setUsingProlepticCalendar(true);
       if(isNative) {
         writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, timestampNullSetter, timestampValueSetter, typeInfo);
         return;
@@ -857,16 +868,31 @@ private static void writeGeneric(final FieldVector fieldVector, final ColumnVect
   //date
   private static final IntAndVectorsConsumer dateNullSetter = (i, arrowVector, hiveVector)
       -> ((DateDayVector) arrowVector).setNull(i);
-  private static final IntIntAndVectorsConsumer dateValueSetter = (i, j, arrowVector, hiveVector, typeInfo)
-      -> ((DateDayVector) arrowVector).set(i, (int) ((LongColumnVector) hiveVector).vector[j]);
+
+  private final IntIntAndVectorsConsumer dateValueSetter = (i, j, arrowVector, hiveVector, typeInfo) -> {
+
+    DateColumnVector dateColumnVector = (DateColumnVector) hiveVector;
+    // useHybridCalendar - means the client wants data in hybrid calendar format
+    if (useHybridCalendar && dateColumnVector.isUsingProlepticCalendar()) {
+      dateColumnVector.changeCalendar(false, true);
+    }
+
+    ((DateDayVector) arrowVector).set(i, (int) (dateColumnVector).vector[j]);
+  };
 
   //timestamp
   private static final IntAndVectorsConsumer timestampNullSetter = (i, arrowVector, hiveVector)
       -> ((TimeStampMicroTZVector) arrowVector).setNull(i);
-  private static final IntIntAndVectorsConsumer timestampValueSetter = (i, j, arrowVector, hiveVector, typeInfo)
+  private final IntIntAndVectorsConsumer timestampValueSetter = (i, j, arrowVector, hiveVector, typeInfo)
       -> {
     final TimeStampMicroTZVector timeStampMicroTZVector = (TimeStampMicroTZVector) arrowVector;
     final TimestampColumnVector timestampColumnVector = (TimestampColumnVector) hiveVector;
+
+    // useHybridCalendar - means the client wants data in hybrid calendar format
+    if (useHybridCalendar && timestampColumnVector.usingProlepticCalendar()) {
+      timestampColumnVector.changeCalendar(false, true);
+    }
+
     // Time = second + sub-second
     final long secondInMillis = timestampColumnVector.getTime(j);
     final long nanos = timestampColumnVector.getNanos(j);
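For reference, a minimal sketch of how an external LLAP client might opt into the hybrid calendar once this patch is applied. It mirrors runQueryUsingLlapArrowBatchReader in the test above; the HS2 host, credentials, class name and table/query ("my_dates") are hypothetical placeholders, not part of the patch.

import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
import org.apache.hadoop.hive.ql.io.arrow.RootAllocatorFactory;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;

import java.util.UUID;

public class HybridCalendarClientSketch {
  public static void main(String[] args) throws Exception {
    // The new flag is passed as a hive conf on the JDBC URL, just as the test does via extraHiveConfs.
    String url = "jdbc:hive2://hs2-host:10000/default?hive.llap.external.client.use.hybrid.calendar=true";
    String handleId = UUID.randomUUID().toString();

    JobConf job = new JobConf();
    job.set(LlapBaseInputFormat.URL_KEY, url);
    job.set(LlapBaseInputFormat.USER_KEY, "hive");
    job.set(LlapBaseInputFormat.PWD_KEY, "hive");
    job.set(LlapBaseInputFormat.QUERY_KEY, "select d from my_dates");
    job.set(LlapBaseInputFormat.HANDLE_ID, handleId);
    job.set(LlapBaseInputFormat.USE_NEW_SPLIT_FORMAT, "false");

    // Arrow-batch reader, as constructed in the test above.
    LlapBaseInputFormat inputFormat =
        new LlapBaseInputFormat(true, RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(Long.MAX_VALUE));
    for (InputSplit split : inputFormat.getSplits(job, 1)) {
      RecordReader reader = inputFormat.getRecordReader(split, job, null);
      ArrowWrapperWritable value = new ArrowWrapperWritable();
      while (reader.next(NullWritable.get(), value)) {
        // Date/timestamp vectors in this batch are now in the hybrid (Julian/Gregorian) calendar.
        System.out.println(value.getVectorSchemaRoot().contentToTSVString());
      }
      reader.close();
    }
    LlapBaseInputFormat.close(handleId);
  }
}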