diff --git itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index 5924d06371..b5d2386245 100644 --- itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -627,7 +627,9 @@ private void setupMiniCluster(HadoopShims shims, String confDir) throws druidCluster.start(); } - if(clusterType == MiniClusterType.kafka || clusterType == MiniClusterType.druidKafka) { + if (clusterType == MiniClusterType.kafka + || clusterType == MiniClusterType.druidKafka + || clusterType == MiniClusterType.druidLocal) { kafkaCluster = new SingleNodeKafkaCluster("kafka", logDir + "/kafka-cluster", setup.zkPort diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaInputFormat.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaInputFormat.java index c401df9850..c4a4a6ffa3 100644 --- kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaInputFormat.java +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaInputFormat.java @@ -22,6 +22,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.SerializationUtilities; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedSupport; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.io.NullWritable; @@ -56,7 +59,7 @@ * Records will be returned as bytes array. */ @SuppressWarnings("WeakerAccess") public class KafkaInputFormat extends InputFormat - implements org.apache.hadoop.mapred.InputFormat { + implements org.apache.hadoop.mapred.InputFormat, VectorizedInputFormatInterface { private static final Logger LOG = LoggerFactory.getLogger(KafkaInputFormat.class); @@ -190,6 +193,10 @@ @Override public RecordReader getRecordReader(InputSplit inputSplit, JobConf jobConf, Reporter reporter) { + if (Utilities.getIsVectorized(jobConf)) { + //noinspection unchecked + return (RecordReader) new VectorizedKafkaRecordReader((KafkaInputSplit) inputSplit, jobConf); + } return new KafkaRecordReader((KafkaInputSplit) inputSplit, jobConf); } @@ -205,4 +212,8 @@ TaskAttemptContext taskAttemptContext) { return new KafkaRecordReader(); } + + @Override public VectorizedSupport.Support[] getSupportedFeatures() { + return new VectorizedSupport.Support[0]; + } } diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java index 2225f19a4d..74614dea91 100644 --- kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java @@ -41,7 +41,15 @@ * * Notes: * The user of this class has to provide a functional Kafka Consumer and then has to clean it afterward. - * The user of this class is responsible for thread safety if the provided consumer is shared across threads. + * + * Iterator position is tied to the position of the consumer, therefore the consumer CAN NOT BE SHARED between threads.
+ * + * New records are polled only after the currently buffered records have been consumed by the iterator via: + * {@link org.apache.hadoop.hive.kafka.KafkaRecordIterator#next()} + * + * org.apache.hadoop.hive.kafka.KafkaRecordIterator#hasNext() throws PollTimeoutException when a Kafka consumer poll + * returns 0 records and the consumer position has not reached the requested endOffset. + * Such an exception is retryable and can be transient, so a retry may succeed. * */ class KafkaRecordIterator implements Iterator> { @@ -78,13 +86,16 @@ /** * Kafka record Iterator pulling from a single {@code topicPartition} an inclusive {@code requestedStartOffset}, * up to exclusive {@code requestedEndOffset}. + * Iterator position is tied to the position of the consumer, therefore the consumer can not be shared between threads. * * This iterator can block on polling up to a designated timeout. * * If no record is returned by brokers after poll timeout duration such case will be considered as an exception. * Although the timeout exception it is a retryable exception, therefore users of this class can retry if needed. * - * @param consumer Functional kafka consumer, user must initialize this and close it. + * Other than the Kafka consumer, no resource cleanup is needed. + * + * @param consumer Functional kafka consumer, user must initialize and close it. * @param topicPartition Target Kafka topic partition. * @param requestedStartOffset Requested start offset position, if NULL iterator will seek to beginning using: * {@link Consumer#seekToBeginning(java.util.Collection)}. diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordReader.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordReader.java index 7f8353c9f0..3651cf09f8 100644 --- kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordReader.java +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordReader.java @@ -157,7 +157,7 @@ private synchronized void initialize(KafkaInputSplit inputSplit, Configuration j /** * Empty iterator for empty splits when startOffset == endOffset, this is added to avoid clumsy if condition. */ - private static final class EmptyIterator implements Iterator> { + static final class EmptyIterator implements Iterator> { @Override public boolean hasNext() { return false; } diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java index 6b2ca1056e..d3cd45150b 100644 --- kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java @@ -20,7 +20,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; -import com.google.common.collect.Lists; import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; @@ -89,8 +88,9 @@ * Object Inspector of original row plus metadata.
*/ private ObjectInspector objectInspector; - private final List columnNames = Lists.newArrayList(); + private final List columnNames = new ArrayList<>(); private BytesConverter bytesConverter; + private int metadataStartIndex; @Override public void initialize(@Nullable Configuration conf, Properties tbl) throws SerDeException { //This method is called before {@link org.apache.hadoop.hive.kafka.KafkaStorageHandler.preCreateTable} @@ -124,7 +124,7 @@ .collect(Collectors.toList())); inspectors.addAll(MetadataColumn.KAFKA_METADATA_INSPECTORS); objectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors); - + metadataStartIndex = columnNames.size() - MetadataColumn.values().length; // Setup Read and Write Path From/To Kafka if (delegateSerDe.getSerializedClass() == Text.class) { bytesConverter = new TextBytesConverter(); @@ -153,18 +153,19 @@ } StructObjectInspector structObjectInspector = (StructObjectInspector) objInspector; List data = structObjectInspector.getStructFieldsDataAsList(obj); + int firstMetadataColumnIndex = data.size() - MetadataColumn.values().length; if (delegateSerializerOI == null) { //@TODO check if i can cache this if it is the same. delegateSerializerOI = - new SubStructObjectInspector(structObjectInspector, data.size() - MetadataColumn.values().length); + new SubStructObjectInspector(structObjectInspector, firstMetadataColumnIndex); } // We always append the metadata columns to the end of the row. - final List row = data.subList(0, data.size() - MetadataColumn.values().length); - //@TODO @FIXME use column names instead of actual positions that can be hard to read and review - Object key = data.get(data.size() - MetadataColumn.KAFKA_METADATA_COLUMN_NAMES.size()); - Object partition = data.get(data.size() - MetadataColumn.KAFKA_METADATA_COLUMN_NAMES.size() + 1); - Object offset = data.get(data.size() - MetadataColumn.KAFKA_METADATA_COLUMN_NAMES.size() + 2); - Object timestamp = data.get(data.size() - MetadataColumn.KAFKA_METADATA_COLUMN_NAMES.size() + 3); + final List row = data.subList(0, firstMetadataColumnIndex); + + Object key = data.get(firstMetadataColumnIndex); + Object partition = data.get(firstMetadataColumnIndex + 1); + Object offset = data.get(firstMetadataColumnIndex + 2); + Object timestamp = data.get(firstMetadataColumnIndex + 3); if (PrimitiveObjectInspectorUtils.getLong(offset, MetadataColumn.OFFSET.getObjectInspector()) != -1) { LOG.error("Can not insert values into `__offset` column, has to be [-1]"); @@ -197,15 +198,24 @@ } @Override public Object deserialize(Writable blob) throws SerDeException { - KafkaWritable record = (KafkaWritable) blob; - final Object row = delegateSerDe.deserialize(bytesConverter.getWritable(record.getValue())); - return columnNames.stream().map(name -> { - final MetadataColumn metadataColumn = MetadataColumn.forName(name); - if (metadataColumn != null) { - return record.getHiveWritable(metadataColumn); - } - return delegateDeserializerOI.getStructFieldData(row, delegateDeserializerOI.getStructFieldRef(name)); - }).collect(Collectors.toList()); + return deserializeKWritable((KafkaWritable) blob); + } + + ArrayList deserializeKWritable(KafkaWritable kafkaWritable) throws SerDeException { + ArrayList resultRow = new ArrayList<>(columnNames.size()); + final Object row = delegateSerDe.deserialize(bytesConverter.getWritable(kafkaWritable.getValue())); + //first add the value payload elements + + for (int i = 0; i < metadataStartIndex; i++) { + 
resultRow.add(delegateDeserializerOI.getStructFieldData(row, + delegateDeserializerOI.getStructFieldRef(columnNames.get(i)))); + } + //add the metadata columns + for (int i = metadataStartIndex; i < columnNames.size(); i++) { + final MetadataColumn metadataColumn = MetadataColumn.forName(columnNames.get(i)); + resultRow.add(kafkaWritable.getHiveWritable(metadataColumn)); + } + return resultRow; } @Override public ObjectInspector getObjectInspector() { @@ -361,6 +371,7 @@ private SubStructObjectInspector(StructObjectInspector baseOI, int toIndex) { private static class TextBytesConverter implements BytesConverter { final private Text text = new Text(); + @Override public byte[] getBytes(Text writable) { //@TODO There is no reason to decode then encode the string to bytes really //@FIXME this issue with CTRL-CHAR ^0 added by Text at the end of string and Json serd does not like that. diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java index 678e190b3f..b0db0ad055 100644 --- kafka-handler/src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java @@ -57,13 +57,10 @@ private static final String ACTION_ABORT = "WriterId [{}] lost record from Topic [{}], delivery Semantic [{}] -> ACTION=ABORT, ERROR caused by [{}]"; - private static final String - ACTION_CARRY_ON = - "WriterId [{}], lost record from Topic [{}], delivery Semantic [{}] -> ACTION=CARRY-ON"; private final String topic; private final String writerId; - private final KafkaOutputFormat.WriteSemantic writeSemantic = KafkaOutputFormat.WriteSemantic.AT_LEAST_ONCE;; + private final KafkaOutputFormat.WriteSemantic writeSemantic = KafkaOutputFormat.WriteSemantic.AT_LEAST_ONCE; private final KafkaProducer producer; private final Callback callback; private final AtomicReference sendExceptionRef = new AtomicReference<>(); diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/VectorizedKafkaRecordReader.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/VectorizedKafkaRecordReader.java new file mode 100644 index 0000000000..294085610e --- /dev/null +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/VectorizedKafkaRecordReader.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.kafka; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.vector.VectorAssignRow; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Properties; + +/** + * Vectorized Kafka record reader. + */ +class VectorizedKafkaRecordReader implements RecordReader { + private static final Logger LOG = LoggerFactory.getLogger(VectorizedKafkaRecordReader.class); + + private final KafkaConsumer consumer; + private final Iterator> recordsCursor; + private long totalNumberRecords = 0L; + private long consumedRecords = 0L; + private long readBytes = 0L; + private final VectorizedRowBatchCtx rbCtx; + /** + * actual projected columns needed by the query, this can be empty in case of query like: select count(*) from src; + */ + private final int[] projectedColumns; + + /** + * underlying row deserializer. + */ + private final KafkaSerDe serDe; + + private final VectorAssignRow vectorAssignRow = new VectorAssignRow(); + + private final KafkaWritable kafkaWritable = new KafkaWritable(); + + VectorizedKafkaRecordReader(KafkaInputSplit inputSplit, Configuration jobConf) { + // VectorBatch Context initializing + this.rbCtx = Utilities.getVectorizedRowBatchCtx(jobConf); + if (rbCtx.getDataColumnNums() != null) { + projectedColumns = rbCtx.getDataColumnNums(); + } else { + // case all the columns are selected + projectedColumns = new int[rbCtx.getRowColumnTypeInfos().length]; + for (int i = 0; i < projectedColumns.length; i++) { + projectedColumns[i] = i; + } + } + + // row parser and row assigner initializing + serDe = createAndInitializeSerde(jobConf); + try { + vectorAssignRow.init((StructObjectInspector) serDe.getObjectInspector()); + } catch (HiveException e) { + throw new RuntimeException(e); + } + + // Kafka iterator initializing + long startOffset = inputSplit.getStartOffset(); + long endOffset = inputSplit.getEndOffset(); + TopicPartition topicPartition = new TopicPartition(inputSplit.getTopic(), inputSplit.getPartition()); + Preconditions.checkState(startOffset >= 0 && startOffset <= endOffset, + "Start [%s] has to be positive and Less than or equal to End [%s]", + startOffset, + endOffset); + totalNumberRecords += endOffset - startOffset; + final Properties properties = KafkaUtils.consumerProperties(jobConf); + consumer = new KafkaConsumer<>(properties); + long pollTimeout = jobConf.getLong(KafkaTableProperties.KAFKA_POLL_TIMEOUT.getName(), -1); + LOG.debug("Consumer poll timeout [{}] ms", pollTimeout); + this.recordsCursor = + startOffset == endOffset ? 
+ new KafkaRecordReader.EmptyIterator() : + new KafkaRecordIterator(consumer, topicPartition, startOffset, endOffset, pollTimeout); + } + + @Override public boolean next(NullWritable nullWritable, VectorizedRowBatch vectorizedRowBatch) throws IOException { + vectorizedRowBatch.reset(); + try { + return readNextBatch(vectorizedRowBatch, recordsCursor) > 0; + } catch (SerDeException e) { + throw new IOException("Serde exception", e); + } + } + + @Override public NullWritable createKey() { + return NullWritable.get(); + } + + @Override public VectorizedRowBatch createValue() { + return rbCtx.createVectorizedRowBatch(); + } + + @Override public long getPos() throws IOException { + return -1; + } + + @Override public float getProgress() { + if (consumedRecords == 0) { + return 0f; + } + if (consumedRecords >= totalNumberRecords) { + return 1f; + } + return consumedRecords * 1.0f / totalNumberRecords; + } + + @Override public void close() { + LOG.trace("total read bytes [{}]", readBytes); + if (consumer != null) { + consumer.wakeup(); + consumer.close(); + } + } + + private int readNextBatch(VectorizedRowBatch vectorizedRowBatch, + Iterator> recordIterator) throws SerDeException { + int rowsCount = 0; + while (recordIterator.hasNext() && rowsCount < vectorizedRowBatch.getMaxSize()) { + ConsumerRecord kRecord = recordIterator.next(); + kafkaWritable.set(kRecord); + readBytes += kRecord.serializedKeySize() + kRecord.serializedValueSize(); + if (projectedColumns.length > 0) { + ArrayList row = serDe.deserializeKWritable(kafkaWritable); + for (int i : projectedColumns) { + vectorAssignRow.assignRowColumn(vectorizedRowBatch, rowsCount, i, row.get(i)); + } + } + rowsCount++; + } + vectorizedRowBatch.size = rowsCount; + consumedRecords += rowsCount; + return rowsCount; + } + + private static KafkaSerDe createAndInitializeSerde(Configuration jobConf) { + KafkaSerDe serDe = new KafkaSerDe(); + MapWork mapWork = Preconditions.checkNotNull(Utilities.getMapWork(jobConf), "Map work is null"); + Properties + properties = + mapWork.getPartitionDescs() + .stream() + .map(partitionDesc -> partitionDesc.getTableDesc().getProperties()) + .findAny() + .orElseThrow(() -> new RuntimeException("Can not find table property at the map work")); + try { + serDe.initialize(jobConf, properties, null); + } catch (SerDeException e) { + throw new RuntimeException("Can not initialized the serde", e); + } + return serDe; + } +} diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/SimpleKafkaWriterTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/SimpleKafkaWriterTest.java index 8a9bbc7f66..b7b0641a8b 100644 --- kafka-handler/src/test/org/apache/hadoop/hive/kafka/SimpleKafkaWriterTest.java +++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/SimpleKafkaWriterTest.java @@ -30,15 +30,12 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.charset.Charset; import java.time.Duration; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Properties; @@ -50,7 +47,7 @@ /** * Test class for Kafka simple writer. 
*/ -@RunWith(Parameterized.class) public class SimpleKafkaWriterTest { +public class SimpleKafkaWriterTest { private static final Logger LOG = LoggerFactory.getLogger(SimpleKafkaWriterTest.class); private static final int RECORD_NUMBER = 17384; @@ -63,17 +60,8 @@ return new KafkaWritable(0, (long) number, value, KEY_BYTES); }).collect(Collectors.toList()); private final Configuration conf = new Configuration(); - private final KafkaOutputFormat.WriteSemantic writeSemantic; private KafkaConsumer consumer; - public SimpleKafkaWriterTest(KafkaOutputFormat.WriteSemantic writeSemantic) { - this.writeSemantic = writeSemantic; - } - - @Parameterized.Parameters public static Iterable data() { - return Arrays.asList(new Object[][] {{KafkaOutputFormat.WriteSemantic.AT_LEAST_ONCE}}); - } - @BeforeClass public static void setupCluster() throws Throwable { KAFKA_BROKER_RESOURCE.before(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index e3e329f89b..65d49cb6c1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -2287,7 +2287,7 @@ public static void copyTableJobPropertiesToConf(TableDesc tbl, JobConf job) thro } /** - * Copies the storage handler proeprites configured for a table descriptor to a runtime job + * Copies the storage handler properties configured for a table descriptor to a runtime job * configuration. This differs from {@link #copyTablePropertiesToConf(org.apache.hadoop.hive.ql.plan.TableDesc, org.apache.hadoop.mapred.JobConf)} * in that it does not allow parameters already set in the job to override the values from the * table. This is important for setting the config up for reading, diff --git ql/src/test/queries/clientpositive/kafka_storage_handler.q ql/src/test/queries/clientpositive/kafka_storage_handler.q index e6cd276f95..782b4e0b0c 100644 --- ql/src/test/queries/clientpositive/kafka_storage_handler.q +++ ql/src/test/queries/clientpositive/kafka_storage_handler.q @@ -1,4 +1,4 @@ -SET hive.vectorized.execution.enabled=false; +SET hive.vectorized.execution.enabled=true; CREATE EXTERNAL TABLE kafka_table (`__time` timestamp , `page` string, `user` string, `language` string, @@ -150,16 +150,6 @@ FROM kafka_table_2; Select count(*) FROM kafka_table_2; -CREATE EXTERNAL TABLE wiki_kafka_avro_table_1 -STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' -TBLPROPERTIES -("kafka.topic" = "wiki_kafka_avro_table", -"kafka.bootstrap.servers"="localhost:9092", -"kafka.serde.class"="org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe"); - -SELECT * FROM wiki_kafka_avro_table_1; -SELECT COUNT (*) from wiki_kafka_avro_table_1; - CREATE EXTERNAL TABLE wiki_kafka_avro_table STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' TBLPROPERTIES @@ -232,7 +222,7 @@ TBLPROPERTIES describe extended wiki_kafka_avro_table; -select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`, +select cast (`__timestamp` as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table; select count(*) from wiki_kafka_avro_table; @@ -241,7 +231,7 @@ select count(distinct `user`) from wiki_kafka_avro_table; select sum(deltabucket), min(commentlength) from wiki_kafka_avro_table; -select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__timestamp` as 
kafka_record_ts_long, +select cast (`__timestamp` as timestamp) as kafka_record_ts, `__timestamp` as kafka_record_ts_long, `__partition`, `__key`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table where `__timestamp` > 1534750625090; @@ -297,4 +287,13 @@ values ('test4',-5, -4.999,'key-2',null ,-1,1536449552291); insert into table kafka_table_csv (c_name,c_int, c_float, `__key`, `__partition`, `__offset`, `__timestamp`) values ('test5',-15, -14.9996666, 'key-3' ,null ,-1,1536449552284); -select * from kafka_table_csv; \ No newline at end of file +select * from kafka_table_csv; +select distinct `__key`, c_name from kafka_table_csv; + +SET hive.vectorized.execution.enabled=false ; +explain extended select distinct `__offset`, cast(`__timestamp` as timestamp ) , `__key` from wiki_kafka_avro_table; +select distinct `__offset`, cast(`__timestamp` as timestamp ) , `__key` from wiki_kafka_avro_table; + +SET hive.vectorized.execution.enabled=true ; +explain extended select distinct `__offset`, cast(`__timestamp` as timestamp ) , `__key` from wiki_kafka_avro_table; +select distinct `__offset`, cast(`__timestamp` as timestamp ) , `__key` from wiki_kafka_avro_table; diff --git ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out index 8ea2aa9d3a..ef7160ddce 100644 --- ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out +++ ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out @@ -607,52 +607,6 @@ POSTHOOK: type: QUERY POSTHOOK: Input: default@kafka_table_2 POSTHOOK: Output: hdfs://### HDFS PATH ### 10 -PREHOOK: query: CREATE EXTERNAL TABLE wiki_kafka_avro_table_1 -STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' -TBLPROPERTIES -("kafka.topic" = "wiki_kafka_avro_table", -"kafka.bootstrap.servers"="localhost:9092", -"kafka.serde.class"="org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe") -PREHOOK: type: CREATETABLE -PREHOOK: Output: database:default -PREHOOK: Output: default@wiki_kafka_avro_table_1 -POSTHOOK: query: CREATE EXTERNAL TABLE wiki_kafka_avro_table_1 -STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' -TBLPROPERTIES -("kafka.topic" = "wiki_kafka_avro_table", -"kafka.bootstrap.servers"="localhost:9092", -"kafka.serde.class"="org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe") -POSTHOOK: type: CREATETABLE -POSTHOOK: Output: database:default -POSTHOOK: Output: default@wiki_kafka_avro_table_1 -PREHOOK: query: SELECT * FROM wiki_kafka_avro_table_1 -PREHOOK: type: QUERY -PREHOOK: Input: default@wiki_kafka_avro_table_1 -PREHOOK: Output: hdfs://### HDFS PATH ### -POSTHOOK: query: SELECT * FROM wiki_kafka_avro_table_1 -POSTHOOK: type: QUERY -POSTHOOK: Input: default@wiki_kafka_avro_table_1 -POSTHOOK: Output: hdfs://### HDFS PATH ### -key-0 0 0 1534736225090 -key-1 0 1 1534739825090 -key-2 0 2 1534743425090 -key-3 0 3 1534747025090 -key-4 0 4 1534750625090 -key-5 0 5 1534754225090 -key-6 0 6 1534757825090 -key-7 0 7 1534761425090 -key-8 0 8 1534765025090 -key-9 0 9 1534768625090 -key-10 0 10 1534772225090 -PREHOOK: query: SELECT COUNT (*) from wiki_kafka_avro_table_1 -PREHOOK: type: QUERY -PREHOOK: Input: default@wiki_kafka_avro_table_1 -PREHOOK: Output: hdfs://### HDFS PATH ### -POSTHOOK: query: SELECT COUNT (*) from wiki_kafka_avro_table_1 -POSTHOOK: type: QUERY -POSTHOOK: Input: default@wiki_kafka_avro_table_1 -POSTHOOK: Output: hdfs://### HDFS PATH ### -11 PREHOOK: query: 
CREATE EXTERNAL TABLE wiki_kafka_avro_table STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' TBLPROPERTIES @@ -827,12 +781,12 @@ __timestamp bigint from deserializer #### A masked pattern was here #### StorageHandlerInfo Partition(topic = wiki_kafka_avro_table, partition = 0, leader = 1, replicas = [1], isr = [1], offlineReplicas = []) [start offset = [0], end offset = [11]] -PREHOOK: query: select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`, +PREHOOK: query: select cast (`__timestamp` as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table PREHOOK: type: QUERY PREHOOK: Input: default@wiki_kafka_avro_table PREHOOK: Output: hdfs://### HDFS PATH ### -POSTHOOK: query: select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`, +POSTHOOK: query: select cast (`__timestamp` as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table POSTHOOK: type: QUERY POSTHOOK: Input: default@wiki_kafka_avro_table @@ -875,13 +829,13 @@ POSTHOOK: type: QUERY POSTHOOK: Input: default@wiki_kafka_avro_table POSTHOOK: Output: hdfs://### HDFS PATH ### 5522.000000000001 0 -PREHOOK: query: select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__timestamp` as kafka_record_ts_long, +PREHOOK: query: select cast (`__timestamp` as timestamp) as kafka_record_ts, `__timestamp` as kafka_record_ts_long, `__partition`, `__key`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table where `__timestamp` > 1534750625090 PREHOOK: type: QUERY PREHOOK: Input: default@wiki_kafka_avro_table PREHOOK: Output: hdfs://### HDFS PATH ### -POSTHOOK: query: select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__timestamp` as kafka_record_ts_long, +POSTHOOK: query: select cast (`__timestamp` as timestamp) as kafka_record_ts, `__timestamp` as kafka_record_ts_long, `__partition`, `__key`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table where `__timestamp` > 1534750625090 POSTHOOK: type: QUERY @@ -1101,3 +1055,659 @@ test1 5 4.999 key 0 0 1536449552290 test2 15 14.999666 NULL 0 1 1536449552285 test4 -5 -4.999 key-2 0 3 1536449552291 test5 -15 -14.9996666 key-3 0 5 1536449552284 +PREHOOK: query: select distinct `__key`, c_name from kafka_table_csv +PREHOOK: type: QUERY +PREHOOK: Input: default@kafka_table_csv +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select distinct `__key`, c_name from kafka_table_csv +POSTHOOK: type: QUERY +POSTHOOK: Input: default@kafka_table_csv +POSTHOOK: Output: hdfs://### HDFS PATH ### +key test1 +key-2 test4 +key-3 test5 +NULL test2 +PREHOOK: query: explain extended select distinct `__offset`, cast(`__timestamp` as timestamp ) , `__key` from wiki_kafka_avro_table +PREHOOK: type: QUERY +PREHOOK: Input: default@wiki_kafka_avro_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: explain extended select distinct `__offset`, cast(`__timestamp` as timestamp ) , `__key` from wiki_kafka_avro_table +POSTHOOK: type: QUERY +POSTHOOK: Input: default@wiki_kafka_avro_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +OPTIMIZED SQL: SELECT `__offset` AS `$f0`, 
CAST(`__timestamp` AS TIMESTAMP(9)) AS `$f1`, `__key` AS `$f2` +FROM `default`.`wiki_kafka_avro_table` +GROUP BY `__offset`, CAST(`__timestamp` AS TIMESTAMP(9)), `__key` +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: wiki_kafka_avro_table + Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: NONE + GatherStats: false + Select Operator + expressions: __offset (type: bigint), CAST( __timestamp AS TIMESTAMP) (type: timestamp), __key (type: binary) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: NONE + Group By Operator + keys: _col0 (type: bigint), _col1 (type: timestamp), _col2 (type: binary) + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: bigint), _col1 (type: timestamp), _col2 (type: binary) + null sort order: aaa + sort order: +++ + Map-reduce partition columns: _col0 (type: bigint), _col1 (type: timestamp), _col2 (type: binary) + Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: NONE + tag: -1 + auto parallelism: true + Execution mode: llap + LLAP IO: no inputs + Path -> Alias: + hdfs://### HDFS PATH ### [wiki_kafka_avro_table] + Path -> Partition: + hdfs://### HDFS PATH ### + Partition + base file name: wiki_kafka_avro_table + input format: org.apache.hadoop.hive.kafka.KafkaInputFormat + output format: org.apache.hadoop.hive.kafka.KafkaOutputFormat + properties: + EXTERNAL TRUE + avro.schema.literal { + "type" : "record", + "name" : "Wikipedia", + "namespace" : "org.apache.hive.kafka", + "version": "1", + "fields" : [ { + "name" : "isrobot", + "type" : "boolean" + }, { + "name" : "channel", + "type" : "string" + }, { + "name" : "timestamp", + "type" : "string" + }, { + "name" : "flags", + "type" : "string" + }, { + "name" : "isunpatrolled", + "type" : "boolean" + }, { + "name" : "page", + "type" : "string" + }, { + "name" : "diffurl", + "type" : "string" + }, { + "name" : "added", + "type" : "long" + }, { + "name" : "comment", + "type" : "string" + }, { + "name" : "commentlength", + "type" : "long" + }, { + "name" : "isnew", + "type" : "boolean" + }, { + "name" : "isminor", + "type" : "boolean" + }, { + "name" : "delta", + "type" : "long" + }, { + "name" : "isanonymous", + "type" : "boolean" + }, { + "name" : "user", + "type" : "string" + }, { + "name" : "deltabucket", + "type" : "double" + }, { + "name" : "deleted", + "type" : "long" + }, { + "name" : "namespace", + "type" : "string" + } ] +} + bucket_count -1 + bucketing_version 2 + column.name.delimiter , + columns isrobot,channel,timestamp,flags,isunpatrolled,page,diffurl,added,comment,commentlength,isnew,isminor,delta,isanonymous,user,deltabucket,deleted,namespace,__key,__partition,__offset,__timestamp + columns.comments 'from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from 
deserializer','from deserializer' + columns.types boolean:string:string:string:boolean:string:string:bigint:string:bigint:boolean:boolean:bigint:boolean:string:double:bigint:string:binary:int:bigint:bigint +#### A masked pattern was here #### + hive.kafka.max.retries 6 + hive.kafka.metadata.poll.timeout.ms 30000 + hive.kafka.optimistic.commit false + hive.kafka.poll.timeout.ms 5000 + kafka.bootstrap.servers localhost:9092 + kafka.serde.class org.apache.hadoop.hive.serde2.avro.AvroSerDe + kafka.topic wiki_kafka_avro_table + kafka.write.semantic AT_LEAST_ONCE +#### A masked pattern was here #### + location hdfs://### HDFS PATH ### + name default.wiki_kafka_avro_table + numFiles 0 + numRows 0 + rawDataSize 0 + serialization.ddl struct wiki_kafka_avro_table { bool isrobot, string channel, string timestamp, string flags, bool isunpatrolled, string page, string diffurl, i64 added, string comment, i64 commentlength, bool isnew, bool isminor, i64 delta, bool isanonymous, string user, double deltabucket, i64 deleted, string namespace, binary __key, i32 __partition, i64 __offset, i64 __timestamp} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.kafka.KafkaSerDe + storage_handler org.apache.hadoop.hive.kafka.KafkaStorageHandler + totalSize 0 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.kafka.KafkaSerDe + + input format: org.apache.hadoop.hive.kafka.KafkaInputFormat + jobProperties: + hive.kafka.max.retries 6 + hive.kafka.metadata.poll.timeout.ms 30000 + hive.kafka.optimistic.commit false + hive.kafka.poll.timeout.ms 5000 + kafka.bootstrap.servers localhost:9092 + kafka.serde.class org.apache.hadoop.hive.serde2.avro.AvroSerDe + kafka.topic wiki_kafka_avro_table + kafka.write.semantic AT_LEAST_ONCE + output format: org.apache.hadoop.hive.kafka.KafkaOutputFormat + properties: + EXTERNAL TRUE + avro.schema.literal { + "type" : "record", + "name" : "Wikipedia", + "namespace" : "org.apache.hive.kafka", + "version": "1", + "fields" : [ { + "name" : "isrobot", + "type" : "boolean" + }, { + "name" : "channel", + "type" : "string" + }, { + "name" : "timestamp", + "type" : "string" + }, { + "name" : "flags", + "type" : "string" + }, { + "name" : "isunpatrolled", + "type" : "boolean" + }, { + "name" : "page", + "type" : "string" + }, { + "name" : "diffurl", + "type" : "string" + }, { + "name" : "added", + "type" : "long" + }, { + "name" : "comment", + "type" : "string" + }, { + "name" : "commentlength", + "type" : "long" + }, { + "name" : "isnew", + "type" : "boolean" + }, { + "name" : "isminor", + "type" : "boolean" + }, { + "name" : "delta", + "type" : "long" + }, { + "name" : "isanonymous", + "type" : "boolean" + }, { + "name" : "user", + "type" : "string" + }, { + "name" : "deltabucket", + "type" : "double" + }, { + "name" : "deleted", + "type" : "long" + }, { + "name" : "namespace", + "type" : "string" + } ] +} + bucket_count -1 + bucketing_version 2 + column.name.delimiter , + columns isrobot,channel,timestamp,flags,isunpatrolled,page,diffurl,added,comment,commentlength,isnew,isminor,delta,isanonymous,user,deltabucket,deleted,namespace,__key,__partition,__offset,__timestamp + columns.comments 'from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from 
deserializer','from deserializer','from deserializer','from deserializer' + columns.types boolean:string:string:string:boolean:string:string:bigint:string:bigint:boolean:boolean:bigint:boolean:string:double:bigint:string:binary:int:bigint:bigint +#### A masked pattern was here #### + hive.kafka.max.retries 6 + hive.kafka.metadata.poll.timeout.ms 30000 + hive.kafka.optimistic.commit false + hive.kafka.poll.timeout.ms 5000 + kafka.bootstrap.servers localhost:9092 + kafka.serde.class org.apache.hadoop.hive.serde2.avro.AvroSerDe + kafka.topic wiki_kafka_avro_table + kafka.write.semantic AT_LEAST_ONCE +#### A masked pattern was here #### + location hdfs://### HDFS PATH ### + name default.wiki_kafka_avro_table + numFiles 0 + numRows 0 + rawDataSize 0 + serialization.ddl struct wiki_kafka_avro_table { bool isrobot, string channel, string timestamp, string flags, bool isunpatrolled, string page, string diffurl, i64 added, string comment, i64 commentlength, bool isnew, bool isminor, i64 delta, bool isanonymous, string user, double deltabucket, i64 deleted, string namespace, binary __key, i32 __partition, i64 __offset, i64 __timestamp} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.kafka.KafkaSerDe + storage_handler org.apache.hadoop.hive.kafka.KafkaStorageHandler + totalSize 0 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.kafka.KafkaSerDe + name: default.wiki_kafka_avro_table + name: default.wiki_kafka_avro_table + Truncated Path -> Alias: + /wiki_kafka_avro_table [wiki_kafka_avro_table] + Reducer 2 + Execution mode: llap + Needs Tagging: false + Reduce Operator Tree: + Group By Operator + keys: KEY._col0 (type: bigint), KEY._col1 (type: timestamp), KEY._col2 (type: binary) + mode: mergepartial + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + GlobalTableId: 0 + directory: hdfs://### HDFS PATH ### + NumFilesPerFileSink: 1 + Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: NONE + Stats Publishing Key Prefix: hdfs://### HDFS PATH ### + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns _col0,_col1,_col2 + columns.types bigint:timestamp:binary + escape.delim \ + hive.serialization.extend.additional.nesting.levels true + serialization.escape.crlf true + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + TotalFiles: 1 + GatherStats: false + MultiFileSpray: false + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select distinct `__offset`, cast(`__timestamp` as timestamp ) , `__key` from wiki_kafka_avro_table +PREHOOK: type: QUERY +PREHOOK: Input: default@wiki_kafka_avro_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select distinct `__offset`, cast(`__timestamp` as timestamp ) , `__key` from wiki_kafka_avro_table +POSTHOOK: type: QUERY +POSTHOOK: Input: default@wiki_kafka_avro_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +0 2018-08-20 03:37:05.09 key-0 +1 2018-08-20 04:37:05.09 key-1 +3 2018-08-20 06:37:05.09 key-3 +4 2018-08-20 07:37:05.09 key-4 +5 2018-08-20 08:37:05.09 key-5 +7 2018-08-20 10:37:05.09 key-7 +8 2018-08-20 11:37:05.09 key-8 +9 2018-08-20 12:37:05.09 key-9 +10 2018-08-20 13:37:05.09 key-10 +11 2018-08-20 
03:37:05.09 key-0 +13 2018-08-20 05:37:05.09 key-2 +15 2018-08-20 07:37:05.09 key-4 +17 2018-08-20 09:37:05.09 key-6 +19 2018-08-20 11:37:05.09 key-8 +21 2018-08-20 13:37:05.09 key-10 +2 2018-08-20 05:37:05.09 key-2 +6 2018-08-20 09:37:05.09 key-6 +12 2018-08-20 04:37:05.09 key-1 +14 2018-08-20 06:37:05.09 key-3 +16 2018-08-20 08:37:05.09 key-5 +18 2018-08-20 10:37:05.09 key-7 +20 2018-08-20 12:37:05.09 key-9 +PREHOOK: query: explain extended select distinct `__offset`, cast(`__timestamp` as timestamp ) , `__key` from wiki_kafka_avro_table +PREHOOK: type: QUERY +PREHOOK: Input: default@wiki_kafka_avro_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: explain extended select distinct `__offset`, cast(`__timestamp` as timestamp ) , `__key` from wiki_kafka_avro_table +POSTHOOK: type: QUERY +POSTHOOK: Input: default@wiki_kafka_avro_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +OPTIMIZED SQL: SELECT `__offset` AS `$f0`, CAST(`__timestamp` AS TIMESTAMP(9)) AS `$f1`, `__key` AS `$f2` +FROM `default`.`wiki_kafka_avro_table` +GROUP BY `__offset`, CAST(`__timestamp` AS TIMESTAMP(9)), `__key` +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: wiki_kafka_avro_table + Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: NONE + GatherStats: false + Select Operator + expressions: __offset (type: bigint), CAST( __timestamp AS TIMESTAMP) (type: timestamp), __key (type: binary) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: NONE + Group By Operator + keys: _col0 (type: bigint), _col1 (type: timestamp), _col2 (type: binary) + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: bigint), _col1 (type: timestamp), _col2 (type: binary) + null sort order: aaa + sort order: +++ + Map-reduce partition columns: _col0 (type: bigint), _col1 (type: timestamp), _col2 (type: binary) + Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: NONE + tag: -1 + auto parallelism: true + Execution mode: vectorized, llap + LLAP IO: no inputs + Path -> Alias: + hdfs://### HDFS PATH ### [wiki_kafka_avro_table] + Path -> Partition: + hdfs://### HDFS PATH ### + Partition + base file name: wiki_kafka_avro_table + input format: org.apache.hadoop.hive.kafka.KafkaInputFormat + output format: org.apache.hadoop.hive.kafka.KafkaOutputFormat + properties: + EXTERNAL TRUE + avro.schema.literal { + "type" : "record", + "name" : "Wikipedia", + "namespace" : "org.apache.hive.kafka", + "version": "1", + "fields" : [ { + "name" : "isrobot", + "type" : "boolean" + }, { + "name" : "channel", + "type" : "string" + }, { + "name" : "timestamp", + "type" : "string" + }, { + "name" : "flags", + "type" : "string" + }, { + "name" : "isunpatrolled", + "type" : "boolean" + }, { + "name" : "page", + "type" : "string" + }, { + "name" : "diffurl", + "type" : "string" + }, { + "name" : "added", + "type" : "long" + }, { + "name" : "comment", + "type" : "string" + }, { + "name" : "commentlength", + "type" : "long" + }, { + "name" : "isnew", + "type" : "boolean" + }, { + "name" : "isminor", + "type" : "boolean" + }, { + "name" : "delta", + "type" : 
"long" + }, { + "name" : "isanonymous", + "type" : "boolean" + }, { + "name" : "user", + "type" : "string" + }, { + "name" : "deltabucket", + "type" : "double" + }, { + "name" : "deleted", + "type" : "long" + }, { + "name" : "namespace", + "type" : "string" + } ] +} + bucket_count -1 + bucketing_version 2 + column.name.delimiter , + columns isrobot,channel,timestamp,flags,isunpatrolled,page,diffurl,added,comment,commentlength,isnew,isminor,delta,isanonymous,user,deltabucket,deleted,namespace,__key,__partition,__offset,__timestamp + columns.comments 'from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer' + columns.types boolean:string:string:string:boolean:string:string:bigint:string:bigint:boolean:boolean:bigint:boolean:string:double:bigint:string:binary:int:bigint:bigint +#### A masked pattern was here #### + hive.kafka.max.retries 6 + hive.kafka.metadata.poll.timeout.ms 30000 + hive.kafka.optimistic.commit false + hive.kafka.poll.timeout.ms 5000 + kafka.bootstrap.servers localhost:9092 + kafka.serde.class org.apache.hadoop.hive.serde2.avro.AvroSerDe + kafka.topic wiki_kafka_avro_table + kafka.write.semantic AT_LEAST_ONCE +#### A masked pattern was here #### + location hdfs://### HDFS PATH ### + name default.wiki_kafka_avro_table + numFiles 0 + numRows 0 + rawDataSize 0 + serialization.ddl struct wiki_kafka_avro_table { bool isrobot, string channel, string timestamp, string flags, bool isunpatrolled, string page, string diffurl, i64 added, string comment, i64 commentlength, bool isnew, bool isminor, i64 delta, bool isanonymous, string user, double deltabucket, i64 deleted, string namespace, binary __key, i32 __partition, i64 __offset, i64 __timestamp} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.kafka.KafkaSerDe + storage_handler org.apache.hadoop.hive.kafka.KafkaStorageHandler + totalSize 0 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.kafka.KafkaSerDe + + input format: org.apache.hadoop.hive.kafka.KafkaInputFormat + jobProperties: + hive.kafka.max.retries 6 + hive.kafka.metadata.poll.timeout.ms 30000 + hive.kafka.optimistic.commit false + hive.kafka.poll.timeout.ms 5000 + kafka.bootstrap.servers localhost:9092 + kafka.serde.class org.apache.hadoop.hive.serde2.avro.AvroSerDe + kafka.topic wiki_kafka_avro_table + kafka.write.semantic AT_LEAST_ONCE + output format: org.apache.hadoop.hive.kafka.KafkaOutputFormat + properties: + EXTERNAL TRUE + avro.schema.literal { + "type" : "record", + "name" : "Wikipedia", + "namespace" : "org.apache.hive.kafka", + "version": "1", + "fields" : [ { + "name" : "isrobot", + "type" : "boolean" + }, { + "name" : "channel", + "type" : "string" + }, { + "name" : "timestamp", + "type" : "string" + }, { + "name" : "flags", + "type" : "string" + }, { + "name" : "isunpatrolled", + "type" : "boolean" + }, { + "name" : "page", + "type" : "string" + }, { + "name" : "diffurl", + "type" : "string" + }, { + "name" : "added", + "type" : "long" + }, { + "name" : "comment", + "type" : "string" + }, { + "name" : "commentlength", + "type" : "long" + }, { + "name" : "isnew", + "type" : "boolean" + }, { + "name" : "isminor", + "type" : "boolean" + }, { + 
"name" : "delta", + "type" : "long" + }, { + "name" : "isanonymous", + "type" : "boolean" + }, { + "name" : "user", + "type" : "string" + }, { + "name" : "deltabucket", + "type" : "double" + }, { + "name" : "deleted", + "type" : "long" + }, { + "name" : "namespace", + "type" : "string" + } ] +} + bucket_count -1 + bucketing_version 2 + column.name.delimiter , + columns isrobot,channel,timestamp,flags,isunpatrolled,page,diffurl,added,comment,commentlength,isnew,isminor,delta,isanonymous,user,deltabucket,deleted,namespace,__key,__partition,__offset,__timestamp + columns.comments 'from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer' + columns.types boolean:string:string:string:boolean:string:string:bigint:string:bigint:boolean:boolean:bigint:boolean:string:double:bigint:string:binary:int:bigint:bigint +#### A masked pattern was here #### + hive.kafka.max.retries 6 + hive.kafka.metadata.poll.timeout.ms 30000 + hive.kafka.optimistic.commit false + hive.kafka.poll.timeout.ms 5000 + kafka.bootstrap.servers localhost:9092 + kafka.serde.class org.apache.hadoop.hive.serde2.avro.AvroSerDe + kafka.topic wiki_kafka_avro_table + kafka.write.semantic AT_LEAST_ONCE +#### A masked pattern was here #### + location hdfs://### HDFS PATH ### + name default.wiki_kafka_avro_table + numFiles 0 + numRows 0 + rawDataSize 0 + serialization.ddl struct wiki_kafka_avro_table { bool isrobot, string channel, string timestamp, string flags, bool isunpatrolled, string page, string diffurl, i64 added, string comment, i64 commentlength, bool isnew, bool isminor, i64 delta, bool isanonymous, string user, double deltabucket, i64 deleted, string namespace, binary __key, i32 __partition, i64 __offset, i64 __timestamp} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.kafka.KafkaSerDe + storage_handler org.apache.hadoop.hive.kafka.KafkaStorageHandler + totalSize 0 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.kafka.KafkaSerDe + name: default.wiki_kafka_avro_table + name: default.wiki_kafka_avro_table + Truncated Path -> Alias: + /wiki_kafka_avro_table [wiki_kafka_avro_table] + Reducer 2 + Execution mode: vectorized, llap + Needs Tagging: false + Reduce Operator Tree: + Group By Operator + keys: KEY._col0 (type: bigint), KEY._col1 (type: timestamp), KEY._col2 (type: binary) + mode: mergepartial + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + GlobalTableId: 0 + directory: hdfs://### HDFS PATH ### + NumFilesPerFileSink: 1 + Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: NONE + Stats Publishing Key Prefix: hdfs://### HDFS PATH ### + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns _col0,_col1,_col2 + columns.types bigint:timestamp:binary + escape.delim \ + hive.serialization.extend.additional.nesting.levels true + serialization.escape.crlf true + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + TotalFiles: 1 + GatherStats: false + MultiFileSpray: false + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select distinct `__offset`, cast(`__timestamp` as timestamp ) , `__key` from wiki_kafka_avro_table +PREHOOK: type: QUERY +PREHOOK: Input: default@wiki_kafka_avro_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select distinct `__offset`, cast(`__timestamp` as timestamp ) , `__key` from wiki_kafka_avro_table +POSTHOOK: type: QUERY +POSTHOOK: Input: default@wiki_kafka_avro_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +0 2018-08-20 03:37:05.09 key-0 +1 2018-08-20 04:37:05.09 key-1 +3 2018-08-20 06:37:05.09 key-3 +4 2018-08-20 07:37:05.09 key-4 +5 2018-08-20 08:37:05.09 key-5 +7 2018-08-20 10:37:05.09 key-7 +8 2018-08-20 11:37:05.09 key-8 +9 2018-08-20 12:37:05.09 key-9 +10 2018-08-20 13:37:05.09 key-10 +11 2018-08-20 03:37:05.09 key-0 +13 2018-08-20 05:37:05.09 key-2 +15 2018-08-20 07:37:05.09 key-4 +17 2018-08-20 09:37:05.09 key-6 +19 2018-08-20 11:37:05.09 key-8 +21 2018-08-20 13:37:05.09 key-10 +2 2018-08-20 05:37:05.09 key-2 +6 2018-08-20 09:37:05.09 key-6 +12 2018-08-20 04:37:05.09 key-1 +14 2018-08-20 06:37:05.09 key-3 +16 2018-08-20 08:37:05.09 key-5 +18 2018-08-20 10:37:05.09 key-7 +20 2018-08-20 12:37:05.09 key-9