diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 33c863d404..bd8d4acf6b 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -397,6 +397,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_VALIDATE_ACLS.varname);
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_LOGGER.varname);
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_AM_USE_FQDN.varname);
+    llapDaemonVarsSetLocal.add(ConfVars.LLAP_OUTPUT_FORMAT_ARROW.varname);
   }
 
   /**
@@ -4165,6 +4166,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal
         Constants.LLAP_LOGGER_NAME_RFA,
         Constants.LLAP_LOGGER_NAME_CONSOLE),
       "logger used for llap-daemons."),
+    LLAP_OUTPUT_FORMAT_ARROW("hive.llap.output.format.arrow", true,
+      "Whether LlapOutputFormatService should output Arrow batches"),
 
     HIVE_TRIGGER_VALIDATION_INTERVAL("hive.trigger.validation.interval", "500ms",
       new TimeValidator(TimeUnit.MILLISECONDS),
diff --git ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java
new file mode 100644
index 0000000000..0032bba49b
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import java.io.IOException;
+
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
+import org.apache.hadoop.io.Writable;
+import java.nio.channels.WritableByteChannel;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LlapArrowRecordWriter<K extends Writable, V extends Writable>
+    implements RecordWriter<K, V> {
+  public static final Logger LOG = LoggerFactory.getLogger(LlapArrowRecordWriter.class);
+
+  ArrowStreamWriter arrowStreamWriter;
+  WritableByteChannel out;
+
+  public LlapArrowRecordWriter(WritableByteChannel out) {
+    this.out = out;
+  }
+
+  @Override
+  public void close(Reporter reporter) throws IOException {
+    arrowStreamWriter.close();
+  }
+
+  @Override
+  public void write(K key, V value) throws IOException {
+    ArrowWrapperWritable arrowWrapperWritable = (ArrowWrapperWritable) value;
+    if (arrowStreamWriter == null) {
+      arrowStreamWriter = new ArrowStreamWriter(arrowWrapperWritable.getVectorSchemaRoot(), null, out);
+    }
+    arrowStreamWriter.writeBatch();
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
index 30d5eb5eab..d4e8e5af30 100644
--- ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
+++ ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
@@ -51,6 +51,7 @@
 import io.netty.handler.codec.protobuf.ProtobufDecoder;
 import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
 import io.netty.handler.codec.string.StringEncoder;
 
+import org.apache.hadoop.mapred.RecordWriter;
 
 /**
@@ -198,11 +199,16 @@ private void registerReader(ChannelHandlerContext ctx, String id, byte[] tokenBy
     LOG.debug("registering socket for: " + id);
     int maxPendingWrites = HiveConf.getIntVar(conf,
         HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_MAX_PENDING_WRITES);
+    boolean useArrow = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OUTPUT_FORMAT_ARROW);
     @SuppressWarnings("rawtypes")
-    LlapRecordWriter writer = new LlapRecordWriter(id,
+    RecordWriter writer = null;
+    if(useArrow) {
+      writer = new LlapArrowRecordWriter(new WritableByteChannelAdapter(ctx, maxPendingWrites, id));
+    } else {
+      writer = new LlapRecordWriter(id,
         new ChunkedOutputStream(
-            new ChannelOutputStream(ctx, id, sendBufferSize, maxPendingWrites),
-            sendBufferSize, id));
+            new ChannelOutputStream(ctx, id, sendBufferSize, maxPendingWrites), sendBufferSize, id));
+    }
     boolean isFailed = true;
     synchronized (lock) {
       if (!writers.containsKey(id)) {
diff --git ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
new file mode 100644
index 0000000000..9c095ca2a7
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.Semaphore;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+
+public class WritableByteChannelAdapter implements WritableByteChannel {
+
+  private static final Logger LOG = LoggerFactory.getLogger(WritableByteChannelAdapter.class);
+  private ChannelHandlerContext chc;
+  private final int maxPendingWrites;
+  // This semaphore provides two functions:
+  // 1. Forces a cap on the number of outstanding async writes to channel
+  // 2. Ensures that channel isn't closed if there are any outstanding async writes
+  private final Semaphore writeResources;
+  private boolean closed = false;
+  private final String id;
+
+  private ChannelFutureListener writeListener = new ChannelFutureListener() {
+    @Override
+    public void operationComplete(ChannelFuture future) {
+      //Asynch write completed
+      //Up the semaphore
+      writeResources.release();
+
+      if (future.isCancelled()) {
+        LOG.error("Write cancelled on ID " + id);
+      } else if (!future.isSuccess()) {
+        LOG.error("Write error on ID " + id, future.cause());
+      }
+    }
+  };
+
+  private ChannelFutureListener closeListener = new ChannelFutureListener() {
+    @Override
+    public void operationComplete(ChannelFuture future) {
+      if (future.isCancelled()) {
+        LOG.error("Close cancelled on ID " + id);
+      } else if (!future.isSuccess()) {
+        LOG.error("Close failed on ID " + id, future.cause());
+      }
+    }
+  };
+
+  public WritableByteChannelAdapter(ChannelHandlerContext chc, int maxPendingWrites, String id) {
+    this.chc = chc;
+    this.maxPendingWrites = maxPendingWrites;
+    this.writeResources = new Semaphore(maxPendingWrites);
+    this.id = id;
+  }
+
+  @Override
+  public int write(ByteBuffer src) throws IOException {
+    int size = src.remaining();
+    //Down the semaphore or block until available
+    takeWriteResources(1);
+    chc.writeAndFlush(Unpooled.wrappedBuffer(src)).addListener(writeListener);
+    return size;
+  }
+
+  @Override
+  public boolean isOpen() {
+    return chc.channel().isOpen();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      throw new IOException("Already closed: " + id);
+    }
+
+    closed = true;
+    //Block until all semaphore resources are released
+    //by outstanding async writes
+    takeWriteResources(maxPendingWrites);
+
+    try {
+      chc.close().addListener(closeListener);
+    } finally {
+      chc = null;
+      closed = true;
+    }
+  }
+
+  private void takeWriteResources(int numResources) throws IOException {
+    try {
+      writeResources.acquire(numResources);
+    } catch (InterruptedException ie) {
+      throw new IOException("Interrupted while waiting for write resources for " + id);
+    }
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 01a5b4c9c3..fb5951339d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -56,6 +56,7 @@
 import org.apache.hadoop.hive.ql.io.RecordUpdater;
 import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter;
 import org.apache.hadoop.hive.ql.io.StreamingOutputFormat;
+import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveFatalException;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
@@ -1251,12 +1252,21 @@ public void closeOp(boolean abort) throws HiveException {
       // If serializer is ThriftJDBCBinarySerDe, then it buffers rows to a certain limit (hive.server2.thrift.resultset.max.fetch.size)
       // and serializes the whole batch when the buffer is full. The serialize returns null if the buffer is not full
       // (the size of buffer is kept track of in the ThriftJDBCBinarySerDe).
-      if (conf.isUsingThriftJDBCBinarySerDe()) {
+      if (conf.isUsingBatchingSerDe()) {
         try {
           recordValue = serializer.serialize(null, inputObjInspectors[0]);
           if ( null != fpaths ) {
             rowOutWriters = fpaths.outWriters;
             rowOutWriters[0].write(recordValue);
+          } else if(recordValue instanceof ArrowWrapperWritable) {
+            //Because LLAP arrow output depends on the ThriftJDBCBinarySerDe code path
+            //this is required for 0 row outputs
+            //i.e. we need to write a 0 size batch to signal EOS to the consumer
+            for (FSPaths fsp : valToPaths.values()) {
+              for(RecordWriter writer : fsp.outWriters) {
+                writer.write(recordValue);
+              }
+            }
           }
         } catch (SerDeException | IOException e) {
           throw new HiveException(e);
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index ff952b6950..5ba4b5fccb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -109,6 +109,7 @@
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.AcidUtils.Operation;
+import org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
@@ -7492,7 +7493,12 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
       fileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
       Class serdeClass = LazySimpleSerDe.class;
       if (fileFormat.equals(PlanUtils.LLAP_OUTPUT_FORMAT_KEY)) {
-        serdeClass = LazyBinarySerDe2.class;
+        boolean useArrow = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OUTPUT_FORMAT_ARROW);
+        if(useArrow) {
+          serdeClass = ArrowColumnarBatchSerDe.class;
+        } else {
+          serdeClass = LazyBinarySerDe2.class;
+        }
       }
 
       table_desc = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat,
@@ -7573,13 +7579,10 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
         ltd.setInsertOverwrite(true);
       }
     }
-    if (SessionState.get().isHiveServerQuery() &&
-        null != table_desc &&
-        table_desc.getSerdeClassName().equalsIgnoreCase(ThriftJDBCBinarySerDe.class.getName()) &&
-        HiveConf.getBoolVar(conf,HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS)) {
-      fileSinkDesc.setIsUsingThriftJDBCBinarySerDe(true);
+    if (null != table_desc && useBatchingSerializer(table_desc.getSerdeClassName())) {
+      fileSinkDesc.setIsUsingBatchingSerDe(true);
     } else {
-      fileSinkDesc.setIsUsingThriftJDBCBinarySerDe(false);
+      fileSinkDesc.setIsUsingBatchingSerDe(false);
     }
 
     Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
@@ -7614,6 +7617,17 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
     return output;
   }
 
+  private boolean useBatchingSerializer(String serdeClassName) {
+    return SessionState.get().isHiveServerQuery() &&
+       hasSetBatchSerializer(serdeClassName);
+  }
+
+  private boolean hasSetBatchSerializer(String serdeClassName) {
+    return (serdeClassName.equalsIgnoreCase(ThriftJDBCBinarySerDe.class.getName()) &&
+        HiveConf.getBoolVar(conf,HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS)) ||
+        serdeClassName.equalsIgnoreCase(ArrowColumnarBatchSerDe.class.getName());
+  }
+
   private ColsAndTypes deriveFileSinkColTypes(
       RowResolver inputRR, List field_schemas) throws SemanticException {
     ColsAndTypes result = new ColsAndTypes("", "");
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index fcb6de7d08..76504698de 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -103,9 +103,9 @@
 
   /**
    * Whether is a HiveServer query, and the destination table is
-   * indeed written using ThriftJDBCBinarySerDe
+   * indeed written using a row batching SerDe
    */
-  private boolean isUsingThriftJDBCBinarySerDe = false;
+  private boolean isUsingBatchingSerDe = false;
 
   private boolean isInsertOverwrite = false;
 
@@ -183,12 +183,12 @@ public void setHiveServerQuery(boolean isHiveServerQuery) {
     this.isHiveServerQuery = isHiveServerQuery;
  }
 
-  public boolean isUsingThriftJDBCBinarySerDe() {
-    return this.isUsingThriftJDBCBinarySerDe;
+  public boolean isUsingBatchingSerDe() {
+    return this.isUsingBatchingSerDe;
   }
 
-  public void setIsUsingThriftJDBCBinarySerDe(boolean isUsingThriftJDBCBinarySerDe) {
-    this.isUsingThriftJDBCBinarySerDe = isUsingThriftJDBCBinarySerDe;
+  public void setIsUsingBatchingSerDe(boolean isUsingBatchingSerDe) {
+    this.isUsingBatchingSerDe = isUsingBatchingSerDe;
   }
 
   @Explain(displayName = "directory", explainLevels = { Level.EXTENDED })
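
Note (not part of the patch): with hive.llap.output.format.arrow enabled, LlapArrowRecordWriter emits each ArrowWrapperWritable as a batch in Arrow's streaming IPC format over the WritableByteChannelAdapter, so a client can read the payload back with Arrow's standard stream reader. The sketch below is a minimal, hypothetical consumer; the socketStream InputStream (however the LLAP external client obtains it) and the class/variable names are assumptions for illustration only.

import java.io.InputStream;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;

public class ArrowStreamConsumerSketch {
  // Reads Arrow record batches from an already-connected stream produced by
  // LlapArrowRecordWriter. Assumes `socketStream` was obtained elsewhere.
  public static void consume(InputStream socketStream) throws Exception {
    try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
         ArrowStreamReader reader = new ArrowStreamReader(socketStream, allocator)) {
      // Each loadNextBatch() call repopulates the same VectorSchemaRoot.
      VectorSchemaRoot root = reader.getVectorSchemaRoot();
      while (reader.loadNextBatch()) {
        // A zero-row batch is what FileSinkOperator writes for empty result
        // sets to signal end-of-stream; rowCount is 0 in that case.
        int rowCount = root.getRowCount();
        System.out.println("received batch with " + rowCount + " rows");
        // ... process the column vectors in `root` here ...
      }
    }
  }
}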