diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index df7a559383..10433cebcf 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -397,6 +397,7 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_VALIDATE_ACLS.varname);
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_LOGGER.varname);
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_AM_USE_FQDN.varname);
+    llapDaemonVarsSetLocal.add(ConfVars.LLAP_OUTPUT_FORMAT_ARROW.varname);
   }

   /**
@@ -4101,6 +4102,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         Constants.LLAP_LOGGER_NAME_RFA,
         Constants.LLAP_LOGGER_NAME_CONSOLE),
       "logger used for llap-daemons."),
+    LLAP_OUTPUT_FORMAT_ARROW("hive.llap.output.format.arrow", false,
+      "Whether LlapOutputFormatService should output Arrow batches"),

     HIVE_TRIGGER_VALIDATION_INTERVAL("hive.trigger.validation.interval", "500ms",
       new TimeValidator(TimeUnit.MILLISECONDS),
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 01a5b4c9c3..c28719023e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -56,6 +56,7 @@
 import org.apache.hadoop.hive.ql.io.RecordUpdater;
 import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter;
 import org.apache.hadoop.hive.ql.io.StreamingOutputFormat;
+import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveFatalException;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
@@ -1257,6 +1258,15 @@ public void closeOp(boolean abort) throws HiveException {
       if ( null != fpaths ) {
         rowOutWriters = fpaths.outWriters;
         rowOutWriters[0].write(recordValue);
+      } else if (recordValue instanceof ArrowWrapperWritable) {
+        // Because LLAP Arrow output depends on the ThriftJDBCBinarySerDe code path,
+        // this is required for 0-row outputs,
+        // i.e. we need to write a 0-size batch to signal EOS to the consumer.
+        for (FSPaths fsp : valToPaths.values()) {
+          for (RecordWriter writer : fsp.outWriters) {
+            writer.write(recordValue);
+          }
+        }
       }
     } catch (SerDeException | IOException e) {
       throw new HiveException(e);
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index fad0e5c24a..b379c7df6d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -129,6 +129,7 @@
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.AcidUtils.Operation;
+import org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
@@ -7491,7 +7492,12 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
       fileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
       Class serdeClass = LazySimpleSerDe.class;
       if (fileFormat.equals(PlanUtils.LLAP_OUTPUT_FORMAT_KEY)) {
-        serdeClass = LazyBinarySerDe2.class;
+        boolean useArrow = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OUTPUT_FORMAT_ARROW);
+        if (useArrow) {
+          serdeClass = ArrowColumnarBatchSerDe.class;
+        } else {
+          serdeClass = LazyBinarySerDe2.class;
+        }
       }

       table_desc = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat,
@@ -7573,10 +7579,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
       }
     }

-    if (SessionState.get().isHiveServerQuery() &&
-        null != table_desc &&
-        table_desc.getSerdeClassName().equalsIgnoreCase(ThriftJDBCBinarySerDe.class.getName()) &&
-        HiveConf.getBoolVar(conf,HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS)) {
+    if (null != table_desc && useBatchingSerializer(table_desc.getSerdeClassName())) {
       fileSinkDesc.setIsUsingThriftJDBCBinarySerDe(true);
     } else {
       fileSinkDesc.setIsUsingThriftJDBCBinarySerDe(false);
@@ -7614,6 +7617,17 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
     return output;
   }

+  private boolean useBatchingSerializer(String serdeClassName) {
+    return SessionState.get().isHiveServerQuery() &&
+        hasSetBatchSerializer(serdeClassName);
+  }
+
+  private boolean hasSetBatchSerializer(String serdeClassName) {
+    return (serdeClassName.equalsIgnoreCase(ThriftJDBCBinarySerDe.class.getName()) &&
+        HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS)) ||
+        serdeClassName.equalsIgnoreCase(ArrowColumnarBatchSerDe.class.getName());
+  }
+
   private ColsAndTypes deriveFileSinkColTypes(
       RowResolver inputRR, List<FieldSchema> field_schemas) throws SemanticException {
     ColsAndTypes result = new ColsAndTypes("", "");
diff --git ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
index 30d5eb5..fe39cce 100644
--- ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
+++ ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
@@ -51,6 +51,7 @@
 import io.netty.handler.codec.protobuf.ProtobufDecoder;
 import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
 import io.netty.handler.codec.string.StringEncoder;
+import org.apache.hadoop.mapred.RecordWriter;


 /**
@@ -198,11 +199,16 @@ private void registerReader(ChannelHandlerContext ctx, String id, byte[] tokenBy
     LOG.debug("registering socket for: " + id);
     int maxPendingWrites = HiveConf.getIntVar(conf,
         HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_MAX_PENDING_WRITES);
+    boolean useArrow = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OUTPUT_FORMAT_ARROW);
     @SuppressWarnings("rawtypes")
-    LlapRecordWriter writer = new LlapRecordWriter(id,
-        new ChunkedOutputStream(
-            new ChannelOutputStream(ctx, id, sendBufferSize, maxPendingWrites),
-            sendBufferSize, id));
+    RecordWriter writer = null;
+    if (useArrow) {
+      writer = new LlapArrowRecordWriter(new WritableByteChannelAdapter(ctx, maxPendingWrites, id));
+    } else {
+      writer = new LlapRecordWriter(id,
+          new ChunkedOutputStream(
+              new ChannelOutputStream(ctx, id, sendBufferSize, maxPendingWrites), sendBufferSize, id));
+    }
     boolean isFailed = true;
     synchronized (lock) {
       if (!writers.containsKey(id)) {
diff --git ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
new file mode 100644
index 0000000000..eeebfb21c5
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.Semaphore;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+
+public class WritableByteChannelAdapter implements WritableByteChannel {
+
+  private static final Logger LOG = LoggerFactory.getLogger(WritableByteChannelAdapter.class);
+  private ChannelHandlerContext chc;
+  private final int maxPendingWrites;
+  private final Semaphore writeResources;
+  private boolean closed = false;
+  private final String id;
+
+  private ChannelFutureListener writeListener = new ChannelFutureListener() {
+    @Override
+    public void operationComplete(ChannelFuture future) {
+      writeResources.release();
+
+      if (future.isCancelled()) {
+        LOG.error("Write cancelled on ID " + id);
+      } else if (!future.isSuccess()) {
+        LOG.error("Write error on ID " + id, future.cause());
+      }
+    }
+  };
+
+  private ChannelFutureListener closeListener = new ChannelFutureListener() {
+    @Override
+    public void operationComplete(ChannelFuture future) {
+      if (future.isCancelled()) {
+        LOG.error("Close cancelled on ID " + id);
+      } else if (!future.isSuccess()) {
+        LOG.error("Close failed on ID " + id, future.cause());
+      }
+    }
+  };
+
+  public WritableByteChannelAdapter(ChannelHandlerContext chc, int maxPendingWrites, String id) {
+    this.chc = chc;
+    this.maxPendingWrites = maxPendingWrites;
+    this.writeResources = new Semaphore(maxPendingWrites);
+    this.id = id;
+  }
+
+  @Override
+  public int write(ByteBuffer src) throws IOException {
+    int size = src.remaining();
+    takeWriteResources(1);
+    chc.writeAndFlush(Unpooled.wrappedBuffer(src)).addListener(writeListener);
+    return size;
+  }
+
+  @Override
+  public boolean isOpen() {
+    return chc.channel().isOpen();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      throw new IOException("Already closed: " + id);
+    }
+
+    closed = true;
+
+    takeWriteResources(maxPendingWrites);
+
+    try {
+      chc.close().addListener(closeListener);
+    } finally {
+      chc = null;
+      closed = true;
+    }
+  }
+
+  private void takeWriteResources(int numResources) throws IOException {
+    try {
+      writeResources.acquire(numResources);
+    } catch (InterruptedException ie) {
+      throw new IOException("Interrupted while waiting for write resources for " + id);
+    }
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java
new file mode 100644
index 0000000000..5a2d8fe908
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import java.io.IOException;
+
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
+import org.apache.hadoop.io.Writable;
+import java.nio.channels.WritableByteChannel;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LlapArrowRecordWriter<K extends Writable, V extends Writable>
+    implements RecordWriter<K, V> {
+  public static final Logger LOG = LoggerFactory.getLogger(LlapArrowRecordWriter.class);
+
+  ArrowStreamWriter arrowStreamWriter;
+  WritableByteChannel out;
+
+  public LlapArrowRecordWriter(WritableByteChannel out) {
+    this.out = out;
+  }
+
+  @Override
+  public void close(Reporter reporter) throws IOException {
+    arrowStreamWriter.close();
+  }
+
+  @Override
+  public void write(K key, V value) throws IOException {
+    ArrowWrapperWritable arrowWrapperWritable = (ArrowWrapperWritable) value;
+    if (arrowStreamWriter == null) {
+      arrowStreamWriter = new ArrowStreamWriter(arrowWrapperWritable.getVectorSchemaRoot(), null, out);
+    }
+    arrowStreamWriter.writeBatch();
+  }
+}
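
Note (illustrative, not part of the patch): when hive.llap.output.format.arrow is enabled, the bytes a client reads from LlapOutputFormatService form a standard Arrow IPC stream, since LlapArrowRecordWriter simply drives an ArrowStreamWriter over the socket channel, and the 0-size batch written by FileSinkOperator for empty results arrives as an ordinary batch with zero rows. A minimal consumer sketch using stock Arrow APIs follows; the class name and the `in` InputStream (however the socket is obtained) are assumptions for illustration only:

import java.io.InputStream;

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;

// Hypothetical consumer of the stream produced by LlapArrowRecordWriter/ArrowStreamWriter.
public class ArrowOutputConsumerSketch {
  public static void consume(InputStream in) throws Exception {
    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
         ArrowStreamReader reader = new ArrowStreamReader(in, allocator)) {
      // The reader reuses a single VectorSchemaRoot; each loadNextBatch() refills it.
      VectorSchemaRoot root = reader.getVectorSchemaRoot();
      // loadNextBatch() returns false at end-of-stream (after the writer closes);
      // an empty result set still yields a batch, just with getRowCount() == 0.
      while (reader.loadNextBatch()) {
        System.out.println("Read batch with " + root.getRowCount() + " rows");
      }
    }
  }
}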