diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 39c77b3..3b36d62 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3655,7 +3655,11 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "internal use only. When false, don't suppress fatal exceptions like\n" + "NullPointerException, etc so the query will fail and assure it will be noticed", true), - + HIVE_VECTORIZATION_FILESINK_ARROW_NATIVE_ENABLED( + "hive.vectorized.execution.filesink.arrow.native.enabled", false, + "This flag should be set to true to enable the native vectorization\n" + + "of queries using the Arrow SerDe and FileSink.\n" + + "The default value is false."), HIVE_TYPE_CHECK_ON_INSERT("hive.typecheck.on.insert", true, "This property has been extended to control " + "whether to check, convert, and normalize partition value to conform to its column type in " + "partition operations including but not limited to insert, such as alter, describe etc."), diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java index 280119b..98f4729 100644 --- itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java +++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java @@ -110,29 +110,10 @@ private static Connection hs2Conn = null; // This method should be called by sub-classes in a @BeforeClass initializer - public static void beforeTest(boolean useArrow) throws Exception { + public static void beforeTest(HiveConf inputConf) throws Exception { + conf = inputConf; Class.forName(MiniHS2.getJdbcDriverName()); - - String confDir = "../../data/conf/llap/"; - if (confDir != null && !confDir.isEmpty()) { - HiveConf.setHiveSiteLocation(new URL("file://"+ new File(confDir).toURI().getPath() + "/hive-site.xml")); - System.out.println("Setting hive-site: "+HiveConf.getHiveSiteLocation()); - } - - conf = new HiveConf(); - conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); - conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); - if(useArrow) { - conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true); - } else { - conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, false); - } - - conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() - + "/tez-site.xml")); - miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP); - dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", ""); kvDataFilePath = new Path(dataFileDir, "kv1.txt"); dataTypesFilePath = new Path(dataFileDir, "datatypes.txt"); @@ -141,6 +122,19 @@ public static void beforeTest(boolean useArrow) throws Exception { miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous")); } + static HiveConf defaultConf() throws Exception { + String confDir = "../../data/conf/llap/"; + if (confDir != null && !confDir.isEmpty()) { + HiveConf.setHiveSiteLocation(new URL("file://"+ new File(confDir).toURI().getPath() + "/hive-site.xml")); + System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation()); + } + HiveConf defaultConf = new HiveConf(); + defaultConf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + defaultConf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); + defaultConf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + "/tez-site.xml")); + return 
defaultConf; + } + @Before public void setUp() throws Exception { hs2Conn = getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar"); @@ -546,6 +540,8 @@ private int processQuery(String currentDatabase, String query, int numSplits, Ro rowProcessor.process(row); ++rowCount; } + //In Arrow mode this will throw an exception unless all buffers have been released + //See org.apache.hadoop.hive.llap.LlapArrowBatchRecordReader reader.close(); } LlapBaseInputFormat.close(handleId); diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java index e69c686..c02980b 100644 --- itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java +++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java @@ -28,6 +28,8 @@ import org.apache.hadoop.hive.llap.Row; import org.apache.hadoop.io.NullWritable; import org.junit.BeforeClass; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat; @@ -40,7 +42,9 @@ @BeforeClass public static void beforeTest() throws Exception { - BaseJdbcWithMiniLlap.beforeTest(true); + HiveConf conf = defaultConf(); + conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true); + BaseJdbcWithMiniLlap.beforeTest(conf); } @Override diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java index 809068f..d954d0e 100644 --- itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java +++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java @@ -25,6 +25,8 @@ import org.junit.Before; import org.junit.After; import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; /** * TestJdbcWithMiniLlap for llap Row format. */ @@ -33,7 +35,9 @@ @BeforeClass public static void beforeTest() throws Exception { - BaseJdbcWithMiniLlap.beforeTest(false); + HiveConf conf = defaultConf(); + conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, false); + BaseJdbcWithMiniLlap.beforeTest(conf); } @Override diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java new file mode 100644 index 0000000..55a2df8 --- /dev/null +++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.jdbc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertArrayEquals; +import java.math.BigDecimal; +import org.apache.hadoop.hive.common.type.Date; +import org.apache.hadoop.hive.common.type.Timestamp; +import java.util.List; +import org.apache.hadoop.hive.llap.FieldDesc; +import org.apache.hadoop.hive.llap.Row; +import org.apache.hadoop.io.NullWritable; +import org.junit.BeforeClass; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; + +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat; + +/** + * TestJdbcWithMiniLlap for the Arrow format with the native vectorized output sink. + */ +public class TestJdbcWithMiniLlapVectorArrow extends BaseJdbcWithMiniLlap { + + + @BeforeClass + public static void beforeTest() throws Exception { + HiveConf conf = defaultConf(); + conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true); + conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_FILESINK_ARROW_NATIVE_ENABLED, true); + BaseJdbcWithMiniLlap.beforeTest(conf); + } + + @Override + protected InputFormat getInputFormat() { + //For unit testing, no harm in hard-coding allocator ceiling to Long.MAX_VALUE + return new LlapArrowRowInputFormat(Long.MAX_VALUE); + } + + // Currently MAP type is not supported. Add it back when Arrow 1.0 is released. + // See: SPARK-21187 + @Override + public void testDataTypes() throws Exception { + createDataTypesTable("datatypes"); + RowCollector2 rowCollector = new RowCollector2(); + String query = "select * from datatypes"; + int rowCount = processQuery(query, 1, rowCollector); + assertEquals(3, rowCount); + + // Verify schema + String[][] colNameTypes = new String[][] { + {"datatypes.c1", "int"}, + {"datatypes.c2", "boolean"}, + {"datatypes.c3", "double"}, + {"datatypes.c4", "string"}, + {"datatypes.c5", "array"}, + {"datatypes.c6", "map"}, + {"datatypes.c7", "map"}, + {"datatypes.c8", "struct"}, + {"datatypes.c9", "tinyint"}, + {"datatypes.c10", "smallint"}, + {"datatypes.c11", "float"}, + {"datatypes.c12", "bigint"}, + {"datatypes.c13", "array>"}, + {"datatypes.c14", "map>"}, + {"datatypes.c15", "struct>"}, + {"datatypes.c16", "array,n:int>>"}, + {"datatypes.c17", "timestamp"}, + {"datatypes.c18", "decimal(16,7)"}, + {"datatypes.c19", "binary"}, + {"datatypes.c20", "date"}, + {"datatypes.c21", "varchar(20)"}, + {"datatypes.c22", "char(15)"}, + {"datatypes.c23", "binary"}, + }; + FieldDesc fieldDesc; + assertEquals(23, rowCollector.numColumns); + for (int idx = 0; idx < rowCollector.numColumns; ++idx) { + fieldDesc = rowCollector.schema.getColumns().get(idx); + assertEquals("ColName idx=" + idx, colNameTypes[idx][0], fieldDesc.getName()); + assertEquals("ColType idx=" + idx, colNameTypes[idx][1], fieldDesc.getTypeInfo().getTypeName()); + } + + // First row is all nulls + Object[] rowValues = rowCollector.rows.get(0); + for (int idx = 0; idx < rowCollector.numColumns; ++idx) { + assertEquals("idx=" + idx, null, rowValues[idx]); + } + + // Second Row + rowValues = rowCollector.rows.get(1); + assertEquals(Integer.valueOf(-1), rowValues[0]); + assertEquals(Boolean.FALSE, rowValues[1]); + assertEquals(Double.valueOf(-1.1d), rowValues[2]); + assertEquals("", rowValues[3]); + + List c5Value = (List) rowValues[4]; + assertEquals(0, c5Value.size()); + + //Map c6Value = (Map) rowValues[5]; + //assertEquals(0, c6Value.size()); + + //Map c7Value = (Map) rowValues[6]; + //assertEquals(0, c7Value.size()); + + List c8Value = (List) 
rowValues[7]; + assertEquals(null, c8Value.get(0)); + assertEquals(null, c8Value.get(1)); + assertEquals(null, c8Value.get(2)); + + assertEquals(Byte.valueOf((byte) -1), rowValues[8]); + assertEquals(Short.valueOf((short) -1), rowValues[9]); + assertEquals(Float.valueOf(-1.0f), rowValues[10]); + assertEquals(Long.valueOf(-1l), rowValues[11]); + + List c13Value = (List) rowValues[12]; + assertEquals(0, c13Value.size()); + + //Map c14Value = (Map) rowValues[13]; + //assertEquals(0, c14Value.size()); + + List c15Value = (List) rowValues[14]; + assertEquals(null, c15Value.get(0)); + assertEquals(null, c15Value.get(1)); + + //List c16Value = (List) rowValues[15]; + //assertEquals(0, c16Value.size()); + + assertEquals(null, rowValues[16]); + assertEquals(null, rowValues[17]); + assertEquals(null, rowValues[18]); + assertEquals(null, rowValues[19]); + assertEquals(null, rowValues[20]); + assertEquals(null, rowValues[21]); + assertEquals(null, rowValues[22]); + + // Third row + rowValues = rowCollector.rows.get(2); + assertEquals(Integer.valueOf(1), rowValues[0]); + assertEquals(Boolean.TRUE, rowValues[1]); + assertEquals(Double.valueOf(1.1d), rowValues[2]); + assertEquals("1", rowValues[3]); + + c5Value = (List) rowValues[4]; + assertEquals(2, c5Value.size()); + assertEquals(Integer.valueOf(1), c5Value.get(0)); + assertEquals(Integer.valueOf(2), c5Value.get(1)); + + //c6Value = (Map) rowValues[5]; + //assertEquals(2, c6Value.size()); + //assertEquals("x", c6Value.get(Integer.valueOf(1))); + //assertEquals("y", c6Value.get(Integer.valueOf(2))); + + //c7Value = (Map) rowValues[6]; + //assertEquals(1, c7Value.size()); + //assertEquals("v", c7Value.get("k")); + + c8Value = (List) rowValues[7]; + assertEquals("a", c8Value.get(0)); + assertEquals(Integer.valueOf(9), c8Value.get(1)); + assertEquals(Double.valueOf(2.2d), c8Value.get(2)); + + assertEquals(Byte.valueOf((byte) 1), rowValues[8]); + assertEquals(Short.valueOf((short) 1), rowValues[9]); + assertEquals(Float.valueOf(1.0f), rowValues[10]); + assertEquals(Long.valueOf(1l), rowValues[11]); + + c13Value = (List) rowValues[12]; + assertEquals(2, c13Value.size()); + List listVal = (List) c13Value.get(0); + assertEquals("a", listVal.get(0)); + assertEquals("b", listVal.get(1)); + listVal = (List) c13Value.get(1); + assertEquals("c", listVal.get(0)); + assertEquals("d", listVal.get(1)); + + //c14Value = (Map) rowValues[13]; + //assertEquals(2, c14Value.size()); + //Map mapVal = (Map) c14Value.get(Integer.valueOf(1)); + //assertEquals(2, mapVal.size()); + //assertEquals(Integer.valueOf(12), mapVal.get(Integer.valueOf(11))); + //assertEquals(Integer.valueOf(14), mapVal.get(Integer.valueOf(13))); + //mapVal = (Map) c14Value.get(Integer.valueOf(2)); + //assertEquals(1, mapVal.size()); + //assertEquals(Integer.valueOf(22), mapVal.get(Integer.valueOf(21))); + + c15Value = (List) rowValues[14]; + assertEquals(Integer.valueOf(1), c15Value.get(0)); + listVal = (List) c15Value.get(1); + assertEquals(2, listVal.size()); + assertEquals(Integer.valueOf(2), listVal.get(0)); + assertEquals("x", listVal.get(1)); + + //c16Value = (List) rowValues[15]; + //assertEquals(2, c16Value.size()); + //listVal = (List) c16Value.get(0); + //assertEquals(2, listVal.size()); + //mapVal = (Map) listVal.get(0); + //assertEquals(0, mapVal.size()); + //assertEquals(Integer.valueOf(1), listVal.get(1)); + //listVal = (List) c16Value.get(1); + //mapVal = (Map) listVal.get(0); + //assertEquals(2, mapVal.size()); + //assertEquals("b", mapVal.get("a")); + //assertEquals("d", 
mapVal.get("c")); + //assertEquals(Integer.valueOf(2), listVal.get(1)); + + assertEquals(Timestamp.valueOf("2012-04-22 09:00:00.123456"), rowValues[16]); + assertEquals(new BigDecimal("123456789.123456"), rowValues[17]); + assertArrayEquals("abcd".getBytes("UTF-8"), (byte[]) rowValues[18]); + assertEquals(Date.valueOf("2013-01-01"), rowValues[19]); + assertEquals("abc123", rowValues[20]); + assertEquals("abc123 ", rowValues[21]); + assertArrayEquals("X'01FF'".getBytes("UTF-8"), (byte[]) rowValues[22]); + } + +} + diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/filesink/VectorFileSinkArrowOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/filesink/VectorFileSinkArrowOperator.java new file mode 100644 index 0000000..1603703 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/filesink/VectorFileSinkArrowOperator.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector.filesink; + +import java.io.Serializable; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.CompilationOpContext; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.TerminalOperator; +import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; +import org.apache.hadoop.hive.ql.exec.vector.VectorizationOperator; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.FileSinkDesc; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.VectorDesc; +import org.apache.hadoop.hive.ql.plan.VectorFileSinkDesc; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.hive.llap.LlapOutputFormatService; +import org.apache.hadoop.hive.ql.exec.ColumnInfo; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import java.util.List; +import java.util.ArrayList; +import org.apache.hadoop.hive.ql.io.arrow.Serializer; +import static org.apache.hadoop.hive.llap.LlapOutputFormat.LLAP_OF_ID_KEY; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.logging.log4j.core.layout.AbstractStringLayout; + +/** + * Native Vectorized 
File Sink operator implementation for Arrow. + * Assumes output to LlapOutputFormatService + **/ +public class VectorFileSinkArrowOperator extends TerminalOperator + implements Serializable, VectorizationOperator { + + private static final long serialVersionUID = 1L; + + private VectorizationContext vContext; + private VectorFileSinkDesc vectorDesc; + public static final Logger LOG = LoggerFactory.getLogger(VectorFileSinkArrowOperator.class.getName()); + + // The above members are initialized by the constructor and must not be + // transient. + //--------------------------------------------------------------------------- + + private transient Serializer converter; + private transient RecordWriter recordWriter; + private transient boolean wroteData; + private transient String attemptId; + + public VectorFileSinkArrowOperator(CompilationOpContext ctx, OperatorDesc conf, + VectorizationContext vContext, VectorDesc vectorDesc) { + this(ctx); + this.conf = (FileSinkDesc) conf; + this.vContext = vContext; + this.vectorDesc = (VectorFileSinkDesc) vectorDesc; + } + + /** Kryo ctor. */ + @VisibleForTesting + public VectorFileSinkArrowOperator() { + super(); + } + + public VectorFileSinkArrowOperator(CompilationOpContext ctx) { + super(ctx); + } + + @Override + public VectorizationContext getInputVectorizationContext() { + return vContext; + } + + @Override + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); + //attemptId identifies a RecordWriter initialized by LlapOutputFormatService + this.attemptId = hconf.get(LLAP_OF_ID_KEY); + try { + //Initialize column names and types + List typeInfos = new ArrayList<>(); + List fieldNames = new ArrayList<>(); + StructObjectInspector schema = (StructObjectInspector) inputObjInspectors[0]; + for(int i = 0; i < schema.getAllStructFieldRefs().size(); i++) { + StructField structField = schema.getAllStructFieldRefs().get(i); + fieldNames.add(structField.getFieldName()); + TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromObjectInspector(structField.getFieldObjectInspector()); + typeInfos.add(typeInfo); + } + //Initialize an Arrow serializer + converter = new Serializer(hconf, attemptId, typeInfos, fieldNames); + } catch (Exception e) { + LOG.error("Unable to initialize VectorFileSinkArrowOperator"); + throw new RuntimeException(e); + } + } + + @Override + public void process(Object data, int tag) throws HiveException { + //ArrowStreamReader expects at least the schema metadata, if this op writes no data, + //we need to send the schema to close the stream gracefully + VectorizedRowBatch batch = (VectorizedRowBatch) data; + try { + if(recordWriter == null) { + recordWriter = LlapOutputFormatService.get().getWriter(this.attemptId); + } + //Convert the VectorizedRowBatch to a handle for the Arrow batch + ArrowWrapperWritable writable = converter.serializeBatch(batch, true); + //Pass the handle to the LlapOutputFormatService recordWriter + recordWriter.write(null, writable); + this.wroteData = true; + } catch(Exception e) { + LOG.error("Failed to convert VectorizedRowBatch to Arrow batch"); + throw new RuntimeException(e); + } + } + + @Override + protected void closeOp(boolean abort) throws HiveException { + try { + if(!wroteData) { + //Send a schema only batch to signal EOS with no data written + ArrowWrapperWritable writable = converter.emptyBatch(); + if(recordWriter == null) { + recordWriter = LlapOutputFormatService.get().getWriter(this.attemptId); + } + recordWriter.write(null, writable); + } + } catch(Exception 
e) { + LOG.error("Failed to write Arrow stream schema"); + throw new RuntimeException(e); + } finally { + try { + //Close the recordWriter with null Reporter + recordWriter.close(null); + } catch(Exception e) { + LOG.error("Failed to close Arrow stream"); + throw new RuntimeException(e); + } + } + } + + @Override + public VectorDesc getVectorDesc() { + return vectorDesc; + } + + @Override + public OperatorType getType() { + return OperatorType.FILESINK; + } +} + diff --git ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java index 65a889e..08e0fb2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java @@ -42,10 +42,12 @@ import org.apache.arrow.vector.types.Types; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; import org.apache.hadoop.hive.ql.exec.vector.IntervalDayTimeColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; @@ -70,6 +72,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo; import org.apache.arrow.memory.BufferAllocator; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import java.util.ArrayList; import java.util.List; @@ -87,21 +90,39 @@ import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption.WRITABLE; import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils.getTypeInfoFromObjectInspector; -class Serializer { +public class Serializer { private final int MAX_BUFFERED_ROWS; - // Schema - private final StructTypeInfo structTypeInfo; - private final int fieldSize; - // Hive columns private final VectorizedRowBatch vectorizedRowBatch; private final VectorAssignRow vectorAssignRow; private int batchSize; private BufferAllocator allocator; + private List fieldTypeInfos; + private List fieldNames; + private int fieldSize; private final NullableMapVector rootVector; + //Constructor for non-serde serialization + public Serializer(Configuration conf, String attemptId, List typeInfos, List fieldNames) { + this.fieldTypeInfos = typeInfos; + this.fieldNames = fieldNames; + long childAllocatorLimit = HiveConf.getLongVar(conf, HIVE_ARROW_BATCH_ALLOCATOR_LIMIT); + //Use per-task allocator for accounting only, no need to reserve per-task memory + long childAllocatorReservation = 0L; + //Break out accounting of direct memory per-task, so we can check no memory is leaked when task is completed + allocator = RootAllocatorFactory.INSTANCE.getRootAllocator(conf).newChildAllocator( + attemptId, + childAllocatorReservation, + childAllocatorLimit); + rootVector = NullableMapVector.empty(null, allocator); + //These last fields are unused in non-serde usage + vectorizedRowBatch = null; + vectorAssignRow = null; + MAX_BUFFERED_ROWS = 0; + } + Serializer(ArrowColumnarBatchSerDe serDe) throws SerDeException { MAX_BUFFERED_ROWS = HiveConf.getIntVar(serDe.conf, HIVE_ARROW_BATCH_SIZE); long childAllocatorLimit = 
HiveConf.getLongVar(serDe.conf, HIVE_ARROW_BATCH_ALLOCATOR_LIMIT); @@ -111,13 +132,14 @@ long childAllocatorReservation = 0L; //Break out accounting of direct memory per-task, so we can check no memory is leaked when task is completed allocator = serDe.rootAllocator.newChildAllocator( - childAllocatorName, - childAllocatorReservation, - childAllocatorLimit); + childAllocatorName, + childAllocatorReservation, + childAllocatorLimit); // Schema - structTypeInfo = (StructTypeInfo) getTypeInfoFromObjectInspector(serDe.rowObjectInspector); - List fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos(); + StructTypeInfo structTypeInfo = (StructTypeInfo) getTypeInfoFromObjectInspector(serDe.rowObjectInspector); + fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos(); + fieldNames = structTypeInfo.getAllStructFieldNames(); fieldSize = fieldTypeInfos.size(); // Init Arrow stuffs rootVector = NullableMapVector.empty(null, allocator); @@ -138,33 +160,66 @@ } } - private ArrowWrapperWritable serializeBatch() { + //Construct an emptyBatch which contains schema-only info + public ArrowWrapperWritable emptyBatch() { + rootVector.setValueCount(0); + for (int fieldIndex = 0; fieldIndex < fieldTypeInfos.size(); fieldIndex++) { + final TypeInfo fieldTypeInfo = fieldTypeInfos.get(fieldIndex); + final String fieldName = fieldNames.get(fieldIndex); + final FieldType fieldType = toFieldType(fieldTypeInfo); + final FieldVector arrowVector = rootVector.addOrGet(fieldName, fieldType, FieldVector.class); + arrowVector.setInitialCapacity(0); + arrowVector.allocateNew(); + } + VectorSchemaRoot vectorSchemaRoot = new VectorSchemaRoot(rootVector); + return new ArrowWrapperWritable(vectorSchemaRoot, allocator, rootVector); + } + + //Used for both: + //1. VectorizedRowBatch constructed by batching rows + //2. VectorizedRowBatch provided from upstream (isNative) + public ArrowWrapperWritable serializeBatch(VectorizedRowBatch vectorizedRowBatch, boolean isNative) { rootVector.setValueCount(0); for (int fieldIndex = 0; fieldIndex < vectorizedRowBatch.projectionSize; fieldIndex++) { final int projectedColumn = vectorizedRowBatch.projectedColumns[fieldIndex]; final ColumnVector hiveVector = vectorizedRowBatch.cols[projectedColumn]; - final TypeInfo fieldTypeInfo = structTypeInfo.getAllStructFieldTypeInfos().get(fieldIndex); - final String fieldName = structTypeInfo.getAllStructFieldNames().get(fieldIndex); + final TypeInfo fieldTypeInfo = fieldTypeInfos.get(fieldIndex); + final String fieldName = fieldNames.get(fieldIndex); final FieldType fieldType = toFieldType(fieldTypeInfo); + //Reuse existing FieldVector buffers + //since we always call setValue or setNull for each row + boolean fieldExists = false; + if(rootVector.getChild(fieldName) != null) { + fieldExists = true; + } final FieldVector arrowVector = rootVector.addOrGet(fieldName, fieldType, FieldVector.class); - arrowVector.setInitialCapacity(batchSize); - arrowVector.allocateNew(); - write(arrowVector, hiveVector, fieldTypeInfo, batchSize); + if(fieldExists) { + arrowVector.setValueCount(isNative ? vectorizedRowBatch.size : batchSize); + } else { + arrowVector.setInitialCapacity(isNative ? vectorizedRowBatch.size : batchSize); + arrowVector.allocateNew(); + } + write(arrowVector, hiveVector, fieldTypeInfo, isNative ? 
vectorizedRowBatch.size : batchSize, vectorizedRowBatch, isNative); + } + if(!isNative) { + //Only mutate batches that are constructed by this serde + vectorizedRowBatch.reset(); + rootVector.setValueCount(batchSize); + } else { + rootVector.setValueCount(vectorizedRowBatch.size); } - vectorizedRowBatch.reset(); - rootVector.setValueCount(batchSize); batchSize = 0; VectorSchemaRoot vectorSchemaRoot = new VectorSchemaRoot(rootVector); return new ArrowWrapperWritable(vectorSchemaRoot, allocator, rootVector); } - private FieldType toFieldType(TypeInfo typeInfo) { + private static FieldType toFieldType(TypeInfo typeInfo) { return new FieldType(true, toArrowType(typeInfo), null); } - private ArrowType toArrowType(TypeInfo typeInfo) { + private static ArrowType toArrowType(TypeInfo typeInfo) { switch (typeInfo.getCategory()) { case PRIMITIVE: switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) { @@ -218,34 +273,35 @@ private ArrowType toArrowType(TypeInfo typeInfo) { } } - private void write(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo, int size) { + private static void write(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo, int size, + VectorizedRowBatch vectorizedRowBatch, boolean isNative) { switch (typeInfo.getCategory()) { case PRIMITIVE: - writePrimitive(arrowVector, hiveVector, typeInfo, size); + writePrimitive(arrowVector, hiveVector, typeInfo, size, vectorizedRowBatch, isNative); break; case LIST: - writeList((ListVector) arrowVector, (ListColumnVector) hiveVector, (ListTypeInfo) typeInfo, size); + writeList((ListVector) arrowVector, (ListColumnVector) hiveVector, (ListTypeInfo) typeInfo, size, vectorizedRowBatch, isNative); break; case STRUCT: - writeStruct((MapVector) arrowVector, (StructColumnVector) hiveVector, (StructTypeInfo) typeInfo, size); + writeStruct((MapVector) arrowVector, (StructColumnVector) hiveVector, (StructTypeInfo) typeInfo, size, vectorizedRowBatch, isNative); break; case UNION: - writeUnion(arrowVector, hiveVector, typeInfo, size); + writeUnion(arrowVector, hiveVector, typeInfo, size, vectorizedRowBatch, isNative); break; case MAP: - writeMap((ListVector) arrowVector, (MapColumnVector) hiveVector, (MapTypeInfo) typeInfo, size); + writeMap((ListVector) arrowVector, (MapColumnVector) hiveVector, (MapTypeInfo) typeInfo, size, vectorizedRowBatch, isNative); break; default: throw new IllegalArgumentException(); - } + } } - private void writeMap(ListVector arrowVector, MapColumnVector hiveVector, MapTypeInfo typeInfo, - int size) { + private static void writeMap(ListVector arrowVector, MapColumnVector hiveVector, MapTypeInfo typeInfo, + int size, VectorizedRowBatch vectorizedRowBatch, boolean isNative) { final ListTypeInfo structListTypeInfo = toStructListTypeInfo(typeInfo); final ListColumnVector structListVector = toStructListVector(hiveVector); - write(arrowVector, structListVector, structListTypeInfo, size); + write(arrowVector, structListVector, structListTypeInfo, size, vectorizedRowBatch, isNative); final ArrowBuf validityBuffer = arrowVector.getValidityBuffer(); for (int rowIndex = 0; rowIndex < size; rowIndex++) { @@ -257,8 +313,8 @@ private void writeMap(ListVector arrowVector, MapColumnVector hiveVector, MapTyp } } - private void writeUnion(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo, - int size) { + private static void writeUnion(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo, + int size, VectorizedRowBatch vectorizedRowBatch, boolean isNative) { final 
UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo; final List objectTypeInfos = unionTypeInfo.getAllUnionObjectTypeInfos(); final UnionColumnVector hiveUnionVector = (UnionColumnVector) hiveVector; @@ -268,11 +324,11 @@ private void writeUnion(FieldVector arrowVector, ColumnVector hiveVector, TypeIn final ColumnVector hiveObjectVector = hiveObjectVectors[tag]; final TypeInfo objectTypeInfo = objectTypeInfos.get(tag); - write(arrowVector, hiveObjectVector, objectTypeInfo, size); + write(arrowVector, hiveObjectVector, objectTypeInfo, size, vectorizedRowBatch, isNative); } - private void writeStruct(MapVector arrowVector, StructColumnVector hiveVector, - StructTypeInfo typeInfo, int size) { + private static void writeStruct(MapVector arrowVector, StructColumnVector hiveVector, + StructTypeInfo typeInfo, int size, VectorizedRowBatch vectorizedRowBatch, boolean isNative) { final List fieldNames = typeInfo.getAllStructFieldNames(); final List fieldTypeInfos = typeInfo.getAllStructFieldTypeInfos(); final ColumnVector[] hiveFieldVectors = hiveVector.fields; @@ -287,7 +343,7 @@ private void writeStruct(MapVector arrowVector, StructColumnVector hiveVector, toFieldType(fieldTypeInfos.get(fieldIndex)), FieldVector.class); arrowFieldVector.setInitialCapacity(size); arrowFieldVector.allocateNew(); - write(arrowFieldVector, hiveFieldVector, fieldTypeInfo, size); + write(arrowFieldVector, hiveFieldVector, fieldTypeInfo, size, vectorizedRowBatch, isNative); } final ArrowBuf validityBuffer = arrowVector.getValidityBuffer(); @@ -300,8 +356,8 @@ private void writeStruct(MapVector arrowVector, StructColumnVector hiveVector, } } - private void writeList(ListVector arrowVector, ListColumnVector hiveVector, ListTypeInfo typeInfo, - int size) { + private static void writeList(ListVector arrowVector, ListColumnVector hiveVector, ListTypeInfo typeInfo, int size, + VectorizedRowBatch vectorizedRowBatch, boolean isNative) { final int OFFSET_WIDTH = 4; final TypeInfo elementTypeInfo = typeInfo.getListElementTypeInfo(); final ColumnVector hiveElementVector = hiveVector.child; @@ -310,7 +366,7 @@ private void writeList(ListVector arrowVector, ListColumnVector hiveVector, List arrowElementVector.setInitialCapacity(hiveVector.childCount); arrowElementVector.allocateNew(); - write(arrowElementVector, hiveElementVector, elementTypeInfo, hiveVector.childCount); + write(arrowElementVector, hiveElementVector, elementTypeInfo, hiveVector.childCount, vectorizedRowBatch, isNative); final ArrowBuf offsetBuffer = arrowVector.getOffsetBuffer(); int nextOffset = 0; @@ -327,208 +383,244 @@ private void writeList(ListVector arrowVector, ListColumnVector hiveVector, List offsetBuffer.setInt(size * OFFSET_WIDTH, nextOffset); } - private void writePrimitive(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo, - int size) { + //Handle cases for both internally constructed + //and externally provided (isNative) VectorRowBatch + private static void writePrimitive(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo, int size, + VectorizedRowBatch vectorizedRowBatch, boolean isNative) { final PrimitiveObjectInspector.PrimitiveCategory primitiveCategory = ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory(); switch (primitiveCategory) { - case BOOLEAN: - { - final BitVector bitVector = (BitVector) arrowVector; - for (int i = 0; i < size; i++) { - if (hiveVector.isNull[i]) { - bitVector.setNull(i); - } else { - bitVector.set(i, (int) ((LongColumnVector) hiveVector).vector[i]); - } - } + case BOOLEAN: + { + 
if(isNative) { + writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, boolNullSetter, boolValueSetter); + return; + } + final BitVector bitVector = (BitVector) arrowVector; + for (int i = 0; i < size; i++) { + if (hiveVector.isNull[i]) { + boolNullSetter.accept(i, arrowVector, hiveVector); + } else { + boolValueSetter.accept(i, i, arrowVector, hiveVector); } - break; - case BYTE: - { - final TinyIntVector tinyIntVector = (TinyIntVector) arrowVector; - for (int i = 0; i < size; i++) { - if (hiveVector.isNull[i]) { - tinyIntVector.setNull(i); - } else { - tinyIntVector.set(i, (byte) ((LongColumnVector) hiveVector).vector[i]); - } - } + } + } + break; + case BYTE: + { + if(isNative) { + writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, byteNullSetter, byteValueSetter); + return; + } + final TinyIntVector tinyIntVector = (TinyIntVector) arrowVector; + for (int i = 0; i < size; i++) { + if (hiveVector.isNull[i]) { + byteNullSetter.accept(i, arrowVector, hiveVector); + } else { + byteValueSetter.accept(i, i, arrowVector, hiveVector); } - break; - case SHORT: - { - final SmallIntVector smallIntVector = (SmallIntVector) arrowVector; - for (int i = 0; i < size; i++) { - if (hiveVector.isNull[i]) { - smallIntVector.setNull(i); - } else { - smallIntVector.set(i, (short) ((LongColumnVector) hiveVector).vector[i]); - } - } + } + } + break; + case SHORT: + { + if(isNative) { + writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, shortNullSetter, shortValueSetter); + return; + } + final SmallIntVector smallIntVector = (SmallIntVector) arrowVector; + for (int i = 0; i < size; i++) { + if (hiveVector.isNull[i]) { + shortNullSetter.accept(i, arrowVector, hiveVector); + } else { + shortValueSetter.accept(i, i, arrowVector, hiveVector); } - break; - case INT: - { - final IntVector intVector = (IntVector) arrowVector; - for (int i = 0; i < size; i++) { - if (hiveVector.isNull[i]) { - intVector.setNull(i); - } else { - intVector.set(i, (int) ((LongColumnVector) hiveVector).vector[i]); - } - } + } + } + break; + case INT: + { + if(isNative) { + writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, intNullSetter, intValueSetter); + return; + } + for (int i = 0; i < size; i++) { + if (hiveVector.isNull[i]) { + intNullSetter.accept(i, arrowVector, hiveVector); + } else { + intValueSetter.accept(i, i, arrowVector, hiveVector); } - break; - case LONG: - { - final BigIntVector bigIntVector = (BigIntVector) arrowVector; - for (int i = 0; i < size; i++) { - if (hiveVector.isNull[i]) { - bigIntVector.setNull(i); - } else { - bigIntVector.set(i, ((LongColumnVector) hiveVector).vector[i]); - } - } + } + } + break; + case LONG: + { + if(isNative) { + writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, longNullSetter, longValueSetter); + return; + } + final BigIntVector bigIntVector = (BigIntVector) arrowVector; + for (int i = 0; i < size; i++) { + if (hiveVector.isNull[i]) { + longNullSetter.accept(i, arrowVector, hiveVector); + } else { + longValueSetter.accept(i, i, arrowVector, hiveVector); } - break; - case FLOAT: - { - final Float4Vector float4Vector = (Float4Vector) arrowVector; - for (int i = 0; i < size; i++) { - if (hiveVector.isNull[i]) { - float4Vector.setNull(i); - } else { - float4Vector.set(i, (float) ((DoubleColumnVector) 
hiveVector).vector[i]); - } - } + } + } + break; + case FLOAT: + { + if(isNative) { + writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, floatNullSetter, floatValueSetter); + return; + } + for (int i = 0; i < size; i++) { + if (hiveVector.isNull[i]) { + floatNullSetter.accept(i, arrowVector, hiveVector); + } else { + floatValueSetter.accept(i, i, arrowVector, hiveVector); } - break; - case DOUBLE: - { - final Float8Vector float8Vector = (Float8Vector) arrowVector; - for (int i = 0; i < size; i++) { - if (hiveVector.isNull[i]) { - float8Vector.setNull(i); - } else { - float8Vector.set(i, ((DoubleColumnVector) hiveVector).vector[i]); - } - } + } + } + break; + case DOUBLE: + { + if(isNative) { + writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, doubleNullSetter, doubleValueSetter); + return; + } + final Float8Vector float8Vector = (Float8Vector) arrowVector; + for (int i = 0; i < size; i++) { + if (hiveVector.isNull[i]) { + doubleNullSetter.accept(i, arrowVector, hiveVector); + } else { + doubleValueSetter.accept(i, i, arrowVector, hiveVector); } - break; - case STRING: - case VARCHAR: - case CHAR: - { - final VarCharVector varCharVector = (VarCharVector) arrowVector; - final BytesColumnVector bytesVector = (BytesColumnVector) hiveVector; - for (int i = 0; i < size; i++) { - if (hiveVector.isNull[i]) { - varCharVector.setNull(i); - } else { - varCharVector.setSafe(i, bytesVector.vector[i], bytesVector.start[i], bytesVector.length[i]); - } - } + } + } + break; + //TODO Add CHAR padding conversion + case STRING: + case VARCHAR: + case CHAR: + { + if(isNative) { + writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, stringNullSetter, stringValueSetter); + return; + } + for (int i = 0; i < size; i++) { + if (hiveVector.isNull[i]) { + stringNullSetter.accept(i, arrowVector, hiveVector); + } else { + stringValueSetter.accept(i, i, arrowVector, hiveVector); } - break; - case DATE: - { - final DateDayVector dateDayVector = (DateDayVector) arrowVector; - for (int i = 0; i < size; i++) { - if (hiveVector.isNull[i]) { - dateDayVector.setNull(i); - } else { - dateDayVector.set(i, (int) ((LongColumnVector) hiveVector).vector[i]); - } - } + } + } + break; + case DATE: + { + if(isNative) { + writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, dateNullSetter, dateValueSetter); + return; + } + for (int i = 0; i < size; i++) { + if (hiveVector.isNull[i]) { + dateNullSetter.accept(i, arrowVector, hiveVector); + } else { + dateValueSetter.accept(i, i, arrowVector, hiveVector); } - break; - case TIMESTAMP: - { - final TimeStampMicroTZVector timeStampMicroTZVector = (TimeStampMicroTZVector) arrowVector; - final TimestampColumnVector timestampColumnVector = (TimestampColumnVector) hiveVector; - for (int i = 0; i < size; i++) { - if (hiveVector.isNull[i]) { - timeStampMicroTZVector.setNull(i); - } else { - // Time = second + sub-second - final long secondInMillis = timestampColumnVector.getTime(i); - final long secondInMicros = (secondInMillis - secondInMillis % MILLIS_PER_SECOND) * MICROS_PER_MILLIS; - final long subSecondInMicros = timestampColumnVector.getNanos(i) / NS_PER_MICROS; - - if ((secondInMillis > 0 && secondInMicros < 0) || (secondInMillis < 0 && secondInMicros > 0)) { - // If the timestamp cannot be represented in long microsecond, set it as a null value - 
timeStampMicroTZVector.setNull(i); - } else { - timeStampMicroTZVector.set(i, secondInMicros + subSecondInMicros); - } - } - } + } + } + break; + case TIMESTAMP: + { + if(isNative) { + writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, timestampNullSetter, timestampValueSetter); + return; + } + for (int i = 0; i < size; i++) { + if (hiveVector.isNull[i]) { + timestampNullSetter.accept(i, arrowVector, hiveVector); + } else { + timestampValueSetter.accept(i, i, arrowVector, hiveVector); } - break; - case BINARY: - { - final VarBinaryVector varBinaryVector = (VarBinaryVector) arrowVector; - final BytesColumnVector bytesVector = (BytesColumnVector) hiveVector; - for (int i = 0; i < size; i++) { - if (hiveVector.isNull[i]) { - varBinaryVector.setNull(i); - } else { - varBinaryVector.setSafe(i, bytesVector.vector[i], bytesVector.start[i], bytesVector.length[i]); - } - } + } + } + break; + case BINARY: + { + if(isNative) { + writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, binaryNullSetter, binaryValueSetter); + return; + } + for (int i = 0; i < size; i++) { + if (hiveVector.isNull[i]) { + binaryNullSetter.accept(i, arrowVector, hiveVector); + } else { + binaryValueSetter.accept(i, i, arrowVector, hiveVector); } - break; - case DECIMAL: - { - final DecimalVector decimalVector = (DecimalVector) arrowVector; - final int scale = decimalVector.getScale(); - for (int i = 0; i < size; i++) { - if (hiveVector.isNull[i]) { - decimalVector.setNull(i); - } else { - decimalVector.set(i, - ((DecimalColumnVector) hiveVector).vector[i].getHiveDecimal().bigDecimalValue().setScale(scale)); - } - } + } + } + break; + case DECIMAL: + { + if(isNative) { + if(hiveVector instanceof DecimalColumnVector) { + writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, decimalNullSetter, decimalValueSetter); + } else { + writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, decimalNullSetter, decimal64ValueSetter); } - break; - case INTERVAL_YEAR_MONTH: - { - final IntervalYearVector intervalYearVector = (IntervalYearVector) arrowVector; - for (int i = 0; i < size; i++) { - if (hiveVector.isNull[i]) { - intervalYearVector.setNull(i); - } else { - intervalYearVector.set(i, (int) ((LongColumnVector) hiveVector).vector[i]); - } - } + return; + } + for (int i = 0; i < size; i++) { + if (hiveVector.isNull[i]) { + decimalNullSetter.accept(i, arrowVector, hiveVector); + } else if(hiveVector instanceof DecimalColumnVector) { + decimalValueSetter.accept(i, i, arrowVector, hiveVector); + } else if(hiveVector instanceof Decimal64ColumnVector) { + decimal64ValueSetter.accept(i, i, arrowVector, hiveVector); + } else { + throw new IllegalArgumentException("Unsupported vector column type: " + hiveVector.getClass().getName()); } - break; - case INTERVAL_DAY_TIME: - { - final IntervalDayVector intervalDayVector = (IntervalDayVector) arrowVector; - final IntervalDayTimeColumnVector intervalDayTimeColumnVector = - (IntervalDayTimeColumnVector) hiveVector; - for (int i = 0; i < size; i++) { - if (hiveVector.isNull[i]) { - intervalDayVector.setNull(i); - } else { - final long totalSeconds = intervalDayTimeColumnVector.getTotalSeconds(i); - final long days = totalSeconds / SECOND_PER_DAY; - final long millis = - (totalSeconds - days * SECOND_PER_DAY) * MILLIS_PER_SECOND + - intervalDayTimeColumnVector.getNanos(i) / NS_PER_MILLIS; - 
intervalDayVector.set(i, (int) days, (int) millis); - } - } + } + } + break; + case INTERVAL_YEAR_MONTH: + { + if(isNative) { + writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, intervalYearMonthNullSetter, intervalYearMonthValueSetter); + return; + } + for (int i = 0; i < size; i++) { + if (hiveVector.isNull[i]) { + intervalYearMonthNullSetter.accept(i, arrowVector, hiveVector); + } else { + intervalYearMonthValueSetter.accept(i, i, arrowVector, hiveVector); } - break; - case VOID: - case UNKNOWN: - case TIMESTAMPLOCALTZ: - default: - throw new IllegalArgumentException(); + } + } + break; + case INTERVAL_DAY_TIME: + { + if(isNative) { + writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, intervalDayTimeNullSetter, intervalDayTimeValueSetter); + return; + } + for (int i = 0; i < size; i++) { + if (hiveVector.isNull[i]) { + intervalDayTimeNullSetter.accept(i, arrowVector, hiveVector); + } else { + intervalDayTimeValueSetter.accept(i, i, arrowVector, hiveVector); + } + } + } + break; + case VOID: + case UNKNOWN: + case TIMESTAMPLOCALTZ: + default: + throw new IllegalArgumentException(); } } @@ -536,7 +628,7 @@ ArrowWrapperWritable serialize(Object obj, ObjectInspector objInspector) { // if row is null, it means there are no more rows (closeOp()). // another case can be that the buffer is full. if (obj == null) { - return serializeBatch(); + return serializeBatch(vectorizedRowBatch, false); } List standardObjects = new ArrayList(); ObjectInspectorUtils.copyToStandardObject(standardObjects, obj, @@ -545,8 +637,218 @@ ArrowWrapperWritable serialize(Object obj, ObjectInspector objInspector) { vectorAssignRow.assignRow(vectorizedRowBatch, batchSize, standardObjects, fieldSize); batchSize++; if (batchSize == MAX_BUFFERED_ROWS) { - return serializeBatch(); + return serializeBatch(vectorizedRowBatch, false); } return null; } + + //Use a provided nullSetter and valueSetter function to populate + //fieldVector from hiveVector + private static void writeGeneric(final FieldVector fieldVector, final ColumnVector hiveVector, final int size, final boolean selectedInUse, final int[] selected, final IntAndVectorsConsumer nullSetter, final IntIntAndVectorsConsumer valueSetter) + { + final boolean[] inputIsNull = hiveVector.isNull; + final int[] sel = selected; + + if (hiveVector.isRepeating) { + if (hiveVector.noNulls || !inputIsNull[0]) { + for(int i = 0; i < size; i++) { + //Fill n rows with value in row 0 + valueSetter.accept(i, 0, fieldVector, hiveVector); + } + } else { + for(int i = 0; i < size; i++) { + //Fill n rows with NULL + nullSetter.accept(i, fieldVector, hiveVector); + } + } + return; + } + + if (hiveVector.noNulls) { + if (selectedInUse) { + for(int logical = 0; logical < size; logical++) { + final int batchIndex = sel[logical]; + //Add row batchIndex + valueSetter.accept(logical, batchIndex, fieldVector, hiveVector); + } + } else { + for(int batchIndex = 0; batchIndex < size; batchIndex++) { + //Add row batchIndex + valueSetter.accept(batchIndex, batchIndex, fieldVector, hiveVector); + } + } + } else { + if (selectedInUse) { + for(int logical = 0; logical < size; logical++) { + final int batchIndex = sel[logical]; + if (inputIsNull[batchIndex]) { + //Add NULL + nullSetter.accept(batchIndex, fieldVector, hiveVector); + } else { + //Add row batchIndex + valueSetter.accept(logical, batchIndex, fieldVector, hiveVector); + } + } + } else { + for(int batchIndex = 0; batchIndex < size; 
batchIndex++) { + if (inputIsNull[batchIndex]) { + //Add NULL + nullSetter.accept(batchIndex, fieldVector, hiveVector); + } else { + //Add row batchIndex + valueSetter.accept(batchIndex, batchIndex, fieldVector, hiveVector); + } + } + } + } + } + + //nullSetters and valueSetter for each type + + //bool + private static final IntAndVectorsConsumer boolNullSetter = (i, arrowVector, hiveVector) + -> ((BitVector) arrowVector).setNull(i); + private static final IntIntAndVectorsConsumer boolValueSetter = (i, j, arrowVector, hiveVector) + -> ((BitVector) arrowVector).set(i, (int) ((LongColumnVector) hiveVector).vector[j]); + + //byte + private static final IntAndVectorsConsumer byteNullSetter = (i, arrowVector, hiveVector) + -> ((TinyIntVector) arrowVector).setNull(i); + private static final IntIntAndVectorsConsumer byteValueSetter = (i, j, arrowVector, hiveVector) + -> ((TinyIntVector) arrowVector).set(i, (byte) ((LongColumnVector) hiveVector).vector[j]); + + //short + private static final IntAndVectorsConsumer shortNullSetter = (i, arrowVector, hiveVector) + -> ((SmallIntVector) arrowVector).setNull(i); + private static final IntIntAndVectorsConsumer shortValueSetter = (i, j, arrowVector, hiveVector) + -> ((SmallIntVector) arrowVector).set(i, (short) ((LongColumnVector) hiveVector).vector[j]); + + //int + private static final IntAndVectorsConsumer intNullSetter = (i, arrowVector, hiveVector) + -> ((IntVector) arrowVector).setNull(i); + private static final IntIntAndVectorsConsumer intValueSetter = (i, j, arrowVector, hiveVector) + -> ((IntVector) arrowVector).set(i, (int) ((LongColumnVector) hiveVector).vector[j]); + + //long + private static final IntAndVectorsConsumer longNullSetter = (i, arrowVector, hiveVector) + -> ((BigIntVector) arrowVector).setNull(i); + private static final IntIntAndVectorsConsumer longValueSetter = (i, j, arrowVector, hiveVector) + -> ((BigIntVector) arrowVector).set(i, ((LongColumnVector) hiveVector).vector[j]); + + //float + private static final IntAndVectorsConsumer floatNullSetter = (i, arrowVector, hiveVector) + -> ((Float4Vector) arrowVector).setNull(i); + private static final IntIntAndVectorsConsumer floatValueSetter = (i, j, arrowVector, hiveVector) + -> ((Float4Vector) arrowVector).set(i, (float) ((DoubleColumnVector) hiveVector).vector[j]); + + //double + private static final IntAndVectorsConsumer doubleNullSetter = (i, arrowVector, hiveVector) + -> ((Float8Vector) arrowVector).setNull(i); + private static final IntIntAndVectorsConsumer doubleValueSetter = (i, j, arrowVector, hiveVector) + -> ((Float8Vector) arrowVector).set(i, ((DoubleColumnVector) hiveVector).vector[j]); + + //string/varchar + private static final IntAndVectorsConsumer stringNullSetter = (i, arrowVector, hiveVector) + -> ((VarCharVector) arrowVector).setNull(i); + private static final IntIntAndVectorsConsumer stringValueSetter = (i, j, arrowVector, hiveVector) + -> { + BytesColumnVector bytesVector = (BytesColumnVector) hiveVector; + ((VarCharVector) arrowVector).setSafe(i, bytesVector.vector[j], bytesVector.start[j], bytesVector.length[j]); + }; + + //fixed-length CHAR + //TODO Add padding conversion + private static final IntAndVectorsConsumer charNullSetter = (i, arrowVector, hiveVector) + -> ((VarCharVector) arrowVector).setNull(i); + private static final IntIntAndVectorsConsumer charValueSetter = (i, j, arrowVector, hiveVector) + -> { + BytesColumnVector bytesVector = (BytesColumnVector) hiveVector; + ((VarCharVector) arrowVector).setSafe(i, bytesVector.vector[j], 
bytesVector.start[j], bytesVector.length[j]); + }; + + //date + private static final IntAndVectorsConsumer dateNullSetter = (i, arrowVector, hiveVector) + -> ((DateDayVector) arrowVector).setNull(i); + private static final IntIntAndVectorsConsumer dateValueSetter = (i, j, arrowVector, hiveVector) + -> ((DateDayVector) arrowVector).set(i, (int) ((LongColumnVector) hiveVector).vector[j]); + + //timestamp + private static final IntAndVectorsConsumer timestampNullSetter = (i, arrowVector, hiveVector) + -> ((TimeStampMicroTZVector) arrowVector).setNull(i); + private static final IntIntAndVectorsConsumer timestampValueSetter = (i, j, arrowVector, hiveVector) + -> { + final TimeStampMicroTZVector timeStampMicroTZVector = (TimeStampMicroTZVector) arrowVector; + final TimestampColumnVector timestampColumnVector = (TimestampColumnVector) hiveVector; + // Time = second + sub-second + final long secondInMillis = timestampColumnVector.getTime(j); + final long secondInMicros = (secondInMillis - secondInMillis % MILLIS_PER_SECOND) * MICROS_PER_MILLIS; + final long subSecondInMicros = timestampColumnVector.getNanos(j) / NS_PER_MICROS; + if ((secondInMillis > 0 && secondInMicros < 0) || (secondInMillis < 0 && secondInMicros > 0)) { + // If the timestamp cannot be represented in long microsecond, set it as a null value + timeStampMicroTZVector.setNull(i); + } else { + timeStampMicroTZVector.set(i, secondInMicros + subSecondInMicros); + } + }; + + //binary + private static final IntAndVectorsConsumer binaryNullSetter = (i, arrowVector, hiveVector) + -> ((VarBinaryVector) arrowVector).setNull(i); + private static final IntIntAndVectorsConsumer binaryValueSetter = (i, j, arrowVector, hiveVector) + -> { + BytesColumnVector bytesVector = (BytesColumnVector) hiveVector; + ((VarBinaryVector) arrowVector).setSafe(i, bytesVector.vector[j], bytesVector.start[j], bytesVector.length[j]); + }; + + //decimal and decimal64 + private static final IntAndVectorsConsumer decimalNullSetter = (i, arrowVector, hiveVector) + -> ((DecimalVector) arrowVector).setNull(i); + private static final IntIntAndVectorsConsumer decimalValueSetter = (i, j, arrowVector, hiveVector) + -> { + final DecimalVector decimalVector = (DecimalVector) arrowVector; + final int scale = decimalVector.getScale(); + decimalVector.set(i, ((DecimalColumnVector) hiveVector).vector[j].getHiveDecimal().bigDecimalValue().setScale(scale)); + }; + private static final IntIntAndVectorsConsumer decimal64ValueSetter = (i, j, arrowVector, hiveVector) + -> { + final DecimalVector decimalVector = (DecimalVector) arrowVector; + final int scale = decimalVector.getScale(); + HiveDecimalWritable decimalHolder = new HiveDecimalWritable(); + decimalHolder.setFromLongAndScale(((Decimal64ColumnVector) hiveVector).vector[j], scale); + decimalVector.set(i, decimalHolder.getHiveDecimal().bigDecimalValue().setScale(scale)); + }; + + //interval year + private static final IntAndVectorsConsumer intervalYearMonthNullSetter = (i, arrowVector, hiveVector) + -> ((IntervalYearVector) arrowVector).setNull(i); + private static IntIntAndVectorsConsumer intervalYearMonthValueSetter = (i, j, arrowVector, hiveVector) + -> ((IntervalYearVector) arrowVector).set(i, (int) ((LongColumnVector) hiveVector).vector[j]); + + //interval day + private static final IntAndVectorsConsumer intervalDayTimeNullSetter = (i, arrowVector, hiveVector) + -> ((IntervalDayVector) arrowVector).setNull(i); + private static IntIntAndVectorsConsumer intervalDayTimeValueSetter = (i, j, arrowVector, hiveVector) + -> { + final 
IntervalDayVector intervalDayVector = (IntervalDayVector) arrowVector; + final IntervalDayTimeColumnVector intervalDayTimeColumnVector = + (IntervalDayTimeColumnVector) hiveVector; + long totalSeconds = intervalDayTimeColumnVector.getTotalSeconds(j); + final long days = totalSeconds / SECOND_PER_DAY; + final long millis = + (totalSeconds - days * SECOND_PER_DAY) * MILLIS_PER_SECOND + + intervalDayTimeColumnVector.getNanos(j) / NS_PER_MILLIS; + intervalDayVector.set(i, (int) days, (int) millis); + }; + + //Used for setting null at arrowVector[i] + private interface IntAndVectorsConsumer { + void accept(int i, FieldVector arrowVector, ColumnVector hiveVector); + } + + //Used to copy value from hiveVector[j] -> arrowVector[i] + //since hiveVector might be referenced through vector.selected + private interface IntIntAndVectorsConsumer { + void accept(int i, int j, FieldVector arrowVector, ColumnVector hiveVector); + } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java index eb5b1a8..9bb104d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -60,6 +60,7 @@ import org.apache.hadoop.hive.ql.exec.spark.SparkTask; import org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor; +import org.apache.hadoop.hive.ql.exec.vector.filesink.VectorFileSinkArrowOperator; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinInnerBigOnlyLongOperator; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinInnerBigOnlyMultiKeyOperator; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinInnerBigOnlyStringOperator; @@ -4119,6 +4120,48 @@ private boolean canSpecializeReduceSink(ReduceSinkDesc desc, return true; } + private boolean checkForArrowFileSink(FileSinkDesc fileSinkDesc, + boolean isTezOrSpark, VectorizationContext vContext, + VectorFileSinkDesc vectorDesc) throws HiveException { + + // Various restrictions. + + boolean isVectorizationFileSinkArrowNativeEnabled = + HiveConf.getBoolVar(hiveConf, + HiveConf.ConfVars.HIVE_VECTORIZATION_FILESINK_ARROW_NATIVE_ENABLED); + + String engine = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE); + + String serdeClassName = fileSinkDesc.getTableInfo().getSerdeClassName(); + + boolean isOkArrowFileSink = + serdeClassName.equals("org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe") && + isVectorizationFileSinkArrowNativeEnabled && + engine.equalsIgnoreCase("tez"); + + return isOkArrowFileSink; + } + + private Operator specializeArrowFileSinkOperator( + Operator op, VectorizationContext vContext, FileSinkDesc desc, + VectorFileSinkDesc vectorDesc) throws HiveException { + + Class> opClass = VectorFileSinkArrowOperator.class; + + Operator vectorOp = null; + try { + vectorOp = OperatorFactory.getVectorOperator( + opClass, op.getCompilationOpContext(), op.getConf(), + vContext, vectorDesc); + } catch (Exception e) { + LOG.info("Vectorizer vectorizeOperator file sink class exception " + opClass.getSimpleName() + + " exception " + e); + throw new HiveException(e); + } + + return vectorOp; + } + private boolean usesVectorUDFAdaptor(VectorExpression vecExpr) { if (vecExpr == null) { return false; @@ -5145,9 +5188,20 @@ private static VectorPTFInfo createVectorPTFInfo(Operator