record = recordsCursor.next();
+ bytesWritable.set(record);
+ consumedRecords += 1;
+ readBytes += record.serializedValueSize();
+ return true;
+ }
+ return false;
+ }
+
+ @Override public NullWritable createKey() {
+ return NullWritable.get();
+ }
+
+ @Override public KafkaRecordWritable createValue() {
+ return new KafkaRecordWritable();
+ }
+
+ @Override public long getPos() throws IOException {
+ return consumedRecords;
+ }
+
+ @Override public boolean nextKeyValue() throws IOException {
+ currentWritableValue = new KafkaRecordWritable();
+ if (next(NullWritable.get(), currentWritableValue)) {
+ return true;
+ }
+ currentWritableValue = null;
+ return false;
+ }
+
+ @Override public NullWritable getCurrentKey() throws IOException, InterruptedException {
+ return NullWritable.get();
+ }
+
+ @Override public KafkaRecordWritable getCurrentValue() throws IOException, InterruptedException {
+ return Preconditions.checkNotNull(currentWritableValue);
+ }
+
+ @Override public float getProgress() throws IOException {
+ if (consumedRecords == 0) {
+ return 0f;
+ }
+ if (consumedRecords >= totalNumberRecords) {
+ return 1f;
+ }
+ return consumedRecords * 1.0f / totalNumberRecords;
+ }
+
+ @Override public void close() throws IOException {
+ if (!started) {
+ return;
+ }
+ LOG.trace("total read bytes [{}]", readBytes);
+ if (consumer != null) {
+ consumer.wakeup();
+ }
+ closer.close();
+ }
+}
diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java
new file mode 100644
index 0000000000..7d5754d81b
--- /dev/null
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableList;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Iterator over Kafka records of a single topic partition, with an inclusive start offset and an exclusive end offset.
+ *
+ * If {@code startOffset} is not null the iterator will seek to that offset,
+ * else if {@code startOffset} is null it will seek to the beginning, see
+ * {@link org.apache.kafka.clients.consumer.Consumer#seekToBeginning(java.util.Collection)}.
+ *
+ * When provided with an end offset it will return records up to the record with offset == endOffset - 1,
+ * else if the end offset is null it will read up to the current end, see
+ * {@link org.apache.kafka.clients.consumer.Consumer#endOffsets(java.util.Collection)}.
+ *
+ * The current implementation of this Iterator will throw an exception if it can not poll up to endOffset - 1.
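+ *
+ * <p>Illustrative usage (a sketch; the consumer properties, topic name, offsets and poll timeout are placeholders):
+ * <pre>{@code
+ * Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProperties);
+ * KafkaRecordIterator it = new KafkaRecordIterator(consumer, new TopicPartition("my_topic", 0), 0L, 100L, 5000L);
+ * while (it.hasNext()) {
+ *   ConsumerRecord<byte[], byte[]> record = it.next();
+ *   // process record.value()
+ * }
+ * }</pre>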
+ */
+public class KafkaRecordIterator implements Iterator<ConsumerRecord<byte[], byte[]>> {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordIterator.class);
+
+ private final Consumer<byte[], byte[]> consumer;
+ private final TopicPartition topicPartition;
+ private long endOffset;
+ private long startOffset;
+ private final long pollTimeoutMs;
+ private final Stopwatch stopwatch = Stopwatch.createUnstarted();
+ private ConsumerRecords<byte[], byte[]> records;
+ private long currentOffset;
+ private ConsumerRecord<byte[], byte[]> nextRecord;
+ private boolean hasMore = true;
+ private final boolean started;
+
+ // The Kafka consumer poll method returns an iterator of records.
+ private Iterator<ConsumerRecord<byte[], byte[]>> consumerRecordIterator = null;
+
+ /**
+ * @param consumer functional kafka consumer
+ * @param topicPartition kafka topic partition
+ * @param startOffset requested start position; if null will read from the beginning
+ * @param endOffset requested end position; if null will read up to the current last offset
+ * @param pollTimeoutMs poll time out in ms
+ */
+ public KafkaRecordIterator(Consumer<byte[], byte[]> consumer,
+ TopicPartition topicPartition,
+ @Nullable Long startOffset,
+ @Nullable Long endOffset,
+ long pollTimeoutMs) {
+ this.consumer = Preconditions.checkNotNull(consumer, "Consumer can not be null");
+ this.topicPartition = Preconditions.checkNotNull(topicPartition, "Topic partition can not be null");
+ this.pollTimeoutMs = pollTimeoutMs;
+ Preconditions.checkState(this.pollTimeoutMs > 0, "poll timeout has to be a positive number");
+ this.startOffset = startOffset == null ? -1L : startOffset;
+ this.endOffset = endOffset == null ? -1L : endOffset;
+ assignAndSeek();
+ this.started = true;
+ }
+
+ public KafkaRecordIterator(Consumer<byte[], byte[]> consumer, TopicPartition tp, long pollTimeoutMs) {
+ this(consumer, tp, null, null, pollTimeoutMs);
+ }
+
+ private void assignAndSeek() {
+ // assign topic partition to consumer
+ final List<TopicPartition> topicPartitionList = ImmutableList.of(topicPartition);
+ if (LOG.isTraceEnabled()) {
+ stopwatch.reset().start();
+ }
+
+ consumer.assign(topicPartitionList);
+ // compute offsets and seek to start
+ if (startOffset > -1) {
+ LOG.info("Seeking to offset [{}] of topic partition [{}]", startOffset, topicPartition);
+ consumer.seek(topicPartition, startOffset);
+ } else {
+ LOG.info("Seeking to beginning of topic partition [{}]", topicPartition);
+ // seekToBeginning is lazy, thus we need to call position() or poll(0)
+ this.consumer.seekToBeginning(Collections.singleton(topicPartition));
+ startOffset = consumer.position(topicPartition);
+ }
+ if (endOffset == -1) {
+ this.endOffset = consumer.endOffsets(topicPartitionList).get(topicPartition);
+ LOG.info("EndOffset set to {}", endOffset);
+ }
+ currentOffset = consumer.position(topicPartition);
+ Preconditions.checkState(this.endOffset >= currentOffset,
+ "End offset [%s] need to be greater than start offset [%s]",
+ this.endOffset,
+ currentOffset);
+ LOG.info("Kafka Iterator ready, assigned TopicPartition [{}]; startOffset [{}]; endOffset [{}]",
+ topicPartition,
+ currentOffset,
+ this.endOffset);
+ if (LOG.isTraceEnabled()) {
+ stopwatch.stop();
+ LOG.trace("Time to assign and seek [{}] ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ /*
+ Poll more records if
+ Initial poll case -> (records == null)
+ OR
+ Need to poll at least one more record (currentOffset + 1 < endOffset) AND consumerRecordIterator is empty (!hasMore)
+ */
+ if (!hasMore && currentOffset + 1 < endOffset || records == null) {
+ pollRecords();
+ findNext();
+ }
+ return hasMore;
+ }
+
+ /**
+ * Poll more records, or fail with a {@link TimeoutException} if no records are returned before reaching the target end offset.
+ */
+ private void pollRecords() {
+ if (LOG.isTraceEnabled()) {
+ stopwatch.reset().start();
+ }
+ Preconditions.checkArgument(started);
+ records = consumer.poll(pollTimeoutMs);
+ if (LOG.isTraceEnabled()) {
+ stopwatch.stop();
+ LOG.trace("Pulled [{}] records in [{}] ms", records.count(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
+ }
+ // Fail if we can not poll within one lap of pollTimeoutMs.
+ if (records.isEmpty() && currentOffset < endOffset) {
+ throw new TimeoutException(String.format("Current offset: [%s]-TopicPartition:[%s], target End offset:[%s]."
+ + "Consumer returned 0 record due to exhausted poll timeout [%s]ms, try increasing[%s]",
+ currentOffset,
+ topicPartition.toString(),
+ endOffset,
+ pollTimeoutMs,
+ KafkaStorageHandler.HIVE_KAFKA_POLL_TIMEOUT));
+ }
+ consumerRecordIterator = records.iterator();
+ }
+
+ @Override public ConsumerRecord<byte[], byte[]> next() {
+ ConsumerRecord<byte[], byte[]> value = nextRecord;
+ Preconditions.checkState(value.offset() < endOffset);
+ findNext();
+ return Preconditions.checkNotNull(value);
+ }
+
+ /**
+ * Find the next element in the batch of records returned by the previous poll, or set hasMore to false so that the
+ * next call to {@link KafkaRecordIterator#hasNext()} will poll for more.
+ */
+ private void findNext() {
+ if (consumerRecordIterator.hasNext()) {
+ nextRecord = consumerRecordIterator.next();
+ hasMore = true;
+ if (nextRecord.offset() < endOffset) {
+ currentOffset = nextRecord.offset();
+ return;
+ }
+ }
+ hasMore = false;
+ nextRecord = null;
+ }
+
+}
diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordWritable.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordWritable.java
new file mode 100644
index 0000000000..b6b8d39131
--- /dev/null
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordWritable.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * Writable implementation of Kafka ConsumerRecord.
+ * Serialized in the form
+ * kafkaRecordTimestamp(long) | kafkaPartition (int) | recordOffset (long) | value.size (int) | value (byte [])
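+ *
+ * For example (illustrative sizes): a record with a 16-byte value serializes to 8 + 4 + 8 + 4 + 16 = 40 bytes.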
+ */
+public class KafkaRecordWritable implements Writable {
+
+ private int partition;
+ private long offset;
+ private long timestamp;
+ private byte[] value;
+
+ public static KafkaRecordWritable fromKafkaRecord(ConsumerRecord<byte[], byte[]> consumerRecord) {
+ return new KafkaRecordWritable(consumerRecord.partition(),
+ consumerRecord.offset(),
+ consumerRecord.timestamp(),
+ consumerRecord.value());
+ }
+
+ public void set(ConsumerRecord<byte[], byte[]> consumerRecord) {
+ this.partition = consumerRecord.partition();
+ this.timestamp = consumerRecord.timestamp();
+ this.offset = consumerRecord.offset();
+ this.value = consumerRecord.value();
+ }
+
+ private KafkaRecordWritable(int partition, long offset, long timestamp, byte[] value) {
+ this.partition = partition;
+ this.offset = offset;
+ this.timestamp = timestamp;
+ this.value = value;
+ }
+
+ public KafkaRecordWritable() {
+ }
+
+ @Override public void write(DataOutput dataOutput) throws IOException {
+ dataOutput.writeLong(timestamp);
+ dataOutput.writeInt(partition);
+ dataOutput.writeLong(offset);
+ dataOutput.writeInt(value.length);
+ dataOutput.write(value);
+ }
+
+ @Override public void readFields(DataInput dataInput) throws IOException {
+ timestamp = dataInput.readLong();
+ partition = dataInput.readInt();
+ offset = dataInput.readLong();
+ int size = dataInput.readInt();
+ if (size > 0) {
+ value = new byte[size];
+ dataInput.readFully(value);
+ } else {
+ value = new byte[0];
+ }
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public byte[] getValue() {
+ return value;
+ }
+
+ @Override public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof KafkaRecordWritable)) {
+ return false;
+ }
+ KafkaRecordWritable that = (KafkaRecordWritable) o;
+ return getPartition() == that.getPartition()
+ && getOffset() == that.getOffset()
+ && getTimestamp() == that.getTimestamp()
+ && Arrays.equals(getValue(), that.getValue());
+ }
+
+ @Override public int hashCode() {
+
+ int result = Objects.hash(getPartition(), getOffset(), getTimestamp());
+ result = 31 * result + Arrays.hashCode(getValue());
+ return result;
+ }
+
+}
diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaScanTrimmer.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaScanTrimmer.java
new file mode 100644
index 0000000000..e90c486786
--- /dev/null
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaScanTrimmer.java
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBaseCompare;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToBinary;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToChar;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToDate;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToDecimal;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUnixTimeStamp;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUtcTimestamp;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToVarchar;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+
+/**
+ * Kafka range trimmer: takes a full Kafka scan and prunes it based on a filter expression.
+ * It is a best-effort trimmer; it can not replace the filter itself, and filtering still takes place in the Hive executor.
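+ *
+ * For example (illustrative): a filter such as {@code __partition = 2 AND __offset >= 100 AND __offset < 200}
+ * trims the scan to offsets [100, 200) of partition 2 only.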
+ */
+public class KafkaScanTrimmer {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaScanTrimmer.class);
+ private final Map<TopicPartition, KafkaPullerInputSplit> fullHouse;
+ private final KafkaConsumer<byte[], byte[]> kafkaConsumer;
+
+ /**
+ * @param fullHouse initial full scan to be pruned, this is a map of Topic partition to input split.
+ * @param kafkaConsumer kafka consumer used to pull offsets for time filter if needed
+ */
+ public KafkaScanTrimmer(Map<TopicPartition, KafkaPullerInputSplit> fullHouse, KafkaConsumer<byte[], byte[]> kafkaConsumer) {
+ this.fullHouse = fullHouse;
+ this.kafkaConsumer = kafkaConsumer;
+ }
+
+ /**
+ * This might block due to calls like
+ * {@link org.apache.kafka.clients.consumer.KafkaConsumer#offsetsForTimes(java.util.Map)}.
+ *
+ * @param filterExpression filter expression to be used for pruning the scan
+ *
+ * @return a trimmed version of the full scan based on the filter expression
+ */
+ public Map<TopicPartition, KafkaPullerInputSplit> computeOptimizedScan(ExprNodeGenericFuncDesc filterExpression) {
+ Map<TopicPartition, KafkaPullerInputSplit> optimizedScan = parse(filterExpression);
+
+ if (LOG.isDebugEnabled()) {
+ if (optimizedScan != null) {
+ LOG.debug("Optimized scan:");
+ optimizedScan.forEach((tp, input) -> LOG.info(
+ "Topic-[{}] Partition-[{}] - Split startOffset [{}] :-> endOffset [{}]",
+ tp.topic(),
+ tp.partition(),
+ input.getStartOffset(),
+ input.getEndOffset()));
+ } else {
+ LOG.debug("No optimization thus using full scan ");
+ fullHouse.forEach((tp, input) -> LOG.info(
+ "Topic-[{}] Partition-[{}] - Split startOffset [{}] :-> endOffset [{}]",
+ tp.topic(),
+ tp.partition(),
+ input.getStartOffset(),
+ input.getEndOffset()));
+ }
+ }
+ return optimizedScan == null ? fullHouse : optimizedScan;
+ }
+
+ /**
+ * @param expression filter to parse and trim the full scan
+ *
+ * @return Map of optimized kafka range scans or null if it is impossible to optimize.
+ */
+ @Nullable private Map<TopicPartition, KafkaPullerInputSplit> parse(ExprNodeDesc expression) {
+ if (expression.getClass() != ExprNodeGenericFuncDesc.class) {
+ return null;
+ }
+ // get the kind of expression
+ ExprNodeGenericFuncDesc expr = (ExprNodeGenericFuncDesc) expression;
+ Class<?> op = expr.getGenericUDF().getClass();
+
+ // handle the logical operators
+ if (FunctionRegistry.isOpOr(expr)) {
+ return pushOrOp(expr);
+ }
+ if (FunctionRegistry.isOpAnd(expr)) {
+ return pushAndOp(expr);
+ }
+
+ if (op == GenericUDFOPGreaterThan.class) {
+ return pushLeaf(expr, PredicateLeaf.Operator.LESS_THAN_EQUALS, true);
+ } else if (op == GenericUDFOPEqualOrGreaterThan.class) {
+ return pushLeaf(expr, PredicateLeaf.Operator.LESS_THAN, true);
+ } else if (op == GenericUDFOPLessThan.class) {
+ return pushLeaf(expr, PredicateLeaf.Operator.LESS_THAN, false);
+ } else if (op == GenericUDFOPEqualOrLessThan.class) {
+ return pushLeaf(expr, PredicateLeaf.Operator.LESS_THAN_EQUALS, false);
+ } else if (op == GenericUDFOPEqual.class) {
+ return pushLeaf(expr, PredicateLeaf.Operator.EQUALS, false);
+ // otherwise, we didn't understand it, so bailout
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * @param expr leaf node to push
+ * @param operator operator
+ * @param negation true if it is a negation, this is used to represent:
+ * GenericUDFOPGreaterThan and GenericUDFOPEqualOrGreaterThan
+ * using PredicateLeaf.Operator.LESS_THAN and PredicateLeaf.Operator.LESS_THAN_EQUALS
+ *
+ * @return leaf scan or null if can not figure out push down
+ */
+ @Nullable private Map<TopicPartition, KafkaPullerInputSplit> pushLeaf(ExprNodeGenericFuncDesc expr,
+ PredicateLeaf.Operator operator,
+ boolean negation) {
+ if (expr.getChildren().size() != 2) {
+ return null;
+ }
+ GenericUDF genericUDF = expr.getGenericUDF();
+ if (!(genericUDF instanceof GenericUDFBaseCompare)) {
+ return null;
+ }
+ ExprNodeDesc expr1 = expr.getChildren().get(0);
+ ExprNodeDesc expr2 = expr.getChildren().get(1);
+ // We may need to peel off the GenericUDFBridge that is added by CBO or user
+ if (expr1.getTypeInfo().equals(expr2.getTypeInfo())) {
+ expr1 = getColumnExpr(expr1);
+ expr2 = getColumnExpr(expr2);
+ }
+
+ ExprNodeDesc[] extracted = ExprNodeDescUtils.extractComparePair(expr1, expr2);
+ if (extracted == null || (extracted.length > 2)) {
+ return null;
+ }
+
+ ExprNodeColumnDesc columnDesc;
+ ExprNodeConstantDesc constantDesc;
+ final boolean flip;
+
+ if (extracted[0] instanceof ExprNodeColumnDesc) {
+ columnDesc = (ExprNodeColumnDesc) extracted[0];
+ constantDesc = (ExprNodeConstantDesc) extracted[1];
+ flip = false;
+
+ } else {
+ flip = true;
+ columnDesc = (ExprNodeColumnDesc) extracted[1];
+ constantDesc = (ExprNodeConstantDesc) extracted[0];
+ }
+
+ if (columnDesc.getColumn().equals(KafkaStorageHandler.PARTITION_COLUMN)) {
+ return buildScanFormPartitionPredicate(fullHouse,
+ operator,
+ ((Number) constantDesc.getValue()).intValue(),
+ flip,
+ negation);
+
+ }
+ if (columnDesc.getColumn().equals(KafkaStorageHandler.OFFSET_COLUMN)) {
+ return buildScanFromOffsetPredicate(fullHouse,
+ operator,
+ ((Number) constantDesc.getValue()).longValue(),
+ flip,
+ negation);
+ }
+
+ if (columnDesc.getColumn().equals(KafkaStorageHandler.TIMESTAMP_COLUMN)) {
+ long timestamp = ((Number) constantDesc.getValue()).longValue();
+ return buildScanForTimesPredicate(fullHouse, operator, timestamp, flip, negation, kafkaConsumer);
+ }
+ return null;
+ }
+
+ /**
+ * Trim kafka scan using a leaf binary predicate on partition column.
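+ *
+ * For example (an illustrative walk-through): a filter {@code __partition > 5} reaches this method as operator
+ * LESS_THAN_EQUALS with negation=true, and the resulting predicate keeps only partitions greater than 5.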
+ *
+ * @param fullScan kafka full scan to be optimized
+ * @param operator predicate operator, equal, lessThan or lessThanEqual
+ * @param partitionConst partition constant value
+ * @param flip true if the positions of column and constant are flipped (the default assumption is column OP constant)
+ * @param negation true if the expression is a negation of the original expression
+ *
+ * @return filtered kafka scan
+ */
+ @VisibleForTesting protected static Map<TopicPartition, KafkaPullerInputSplit> buildScanFormPartitionPredicate(
+ Map<TopicPartition, KafkaPullerInputSplit> fullScan,
+ PredicateLeaf.Operator operator,
+ int partitionConst,
+ boolean flip,
+ boolean negation) {
+ final Predicate<TopicPartition> predicate;
+ final Predicate<TopicPartition> intermediatePredicate;
+ switch (operator) {
+ case EQUALS:
+ predicate = topicPartition -> topicPartition != null && topicPartition.partition() == partitionConst;
+ break;
+ case LESS_THAN:
+ intermediatePredicate =
+ flip ?
+ topicPartition -> topicPartition != null && partitionConst < topicPartition.partition() :
+ topicPartition -> topicPartition != null && topicPartition.partition() < partitionConst;
+
+ predicate = negation ? intermediatePredicate.negate() : intermediatePredicate;
+ break;
+ case LESS_THAN_EQUALS:
+ intermediatePredicate =
+ flip ?
+ topicPartition -> topicPartition != null && partitionConst <= topicPartition.partition() :
+ topicPartition -> topicPartition != null && topicPartition.partition() <= partitionConst;
+
+ predicate = negation ? intermediatePredicate.negate() : intermediatePredicate;
+ break;
+ default:
+ //Default to select * for unknown cases
+ predicate = topicPartition -> true;
+ }
+
+ ImmutableMap.Builder<TopicPartition, KafkaPullerInputSplit> builder = ImmutableMap.builder();
+ // Filter full scan based on predicate
+ fullScan.entrySet()
+ .stream()
+ .filter(entry -> predicate.test(entry.getKey()))
+ .forEach(entry -> builder.put(entry.getKey(), entry.getValue().clone()));
+ return builder.build();
+ }
+
+ /**
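+ * For example (an illustrative walk-through): {@code __offset >= 100} reaches this method as operator LESS_THAN
+ * with negation=true and yields a start bound of 100, while {@code __offset < 200} yields an end bound of 200.
+ *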
+ * @param fullScan full kafka scan to be pruned
+ * @param operator operator kind
+ * @param offsetConst offset constant value
+ * @param flip true if the positions of constant and column are flipped (the default assumption is COLUMN OP CONSTANT)
+ * @param negation true if the expression is a negation of the original expression
+ *
+ * @return optimized kafka scan
+ */
+ @VisibleForTesting protected static Map<TopicPartition, KafkaPullerInputSplit> buildScanFromOffsetPredicate(
+ Map<TopicPartition, KafkaPullerInputSplit> fullScan,
+ PredicateLeaf.Operator operator,
+ long offsetConst,
+ boolean flip,
+ boolean negation) {
+ final boolean isEndBound;
+ final long startOffset;
+ final long endOffset;
+
+ isEndBound = flip == negation;
+ switch (operator) {
+ case LESS_THAN_EQUALS:
+ if (isEndBound) {
+ startOffset = -1;
+ endOffset = negation ? offsetConst : offsetConst + 1;
+ } else {
+ endOffset = -1;
+ startOffset = negation ? offsetConst + 1 : offsetConst;
+ }
+ break;
+ case EQUALS:
+ startOffset = offsetConst;
+ endOffset = offsetConst + 1;
+ break;
+ case LESS_THAN:
+ if (isEndBound) {
+ endOffset = negation ? offsetConst + 1 : offsetConst;
+ startOffset = -1;
+ } else {
+ endOffset = -1;
+ startOffset = negation ? offsetConst : offsetConst + 1;
+ }
+ break;
+ default:
+ // default to select *
+ startOffset = -1;
+ endOffset = -1;
+ }
+
+ final Map<TopicPartition, KafkaPullerInputSplit> newScan = new HashMap<>();
+
+ fullScan.forEach((tp, existingInputSplit) -> {
+ final KafkaPullerInputSplit newInputSplit;
+ if (startOffset != -1 && endOffset == -1) {
+ newInputSplit = new KafkaPullerInputSplit(tp.topic(),
+ tp.partition(),
+ // @TODO make sure that this is okay
+ // if the user asks for a start offset > max offset, replace it with the last offset
+ Math.min(startOffset, existingInputSplit.getEndOffset()),
+ existingInputSplit.getEndOffset(),
+ existingInputSplit.getPath());
+ } else if (endOffset != -1 && startOffset == -1) {
+ newInputSplit = new KafkaPullerInputSplit(tp.topic(), tp.partition(), existingInputSplit.getStartOffset(),
+ //@TODO check this: if the user asks for a non-existing end offset, ignore it and position the head at the start
+ Math.max(endOffset, existingInputSplit.getStartOffset()), existingInputSplit.getPath());
+ } else if (endOffset == startOffset + 1) {
+ if (startOffset < existingInputSplit.getStartOffset() || startOffset >= existingInputSplit.getEndOffset()) {
+ newInputSplit = new KafkaPullerInputSplit(tp.topic(), tp.partition(),
+ //@TODO check with the team what to do if the requested offset is out of range;
+ // here we are seeking to the last offset
+ existingInputSplit.getEndOffset(), existingInputSplit.getEndOffset(), existingInputSplit.getPath());
+ } else {
+ newInputSplit =
+ new KafkaPullerInputSplit(tp.topic(),
+ tp.partition(),
+ startOffset,
+ endOffset,
+ existingInputSplit.getPath());
+ }
+
+ } else {
+ newInputSplit =
+ new KafkaPullerInputSplit(tp.topic(),
+ tp.partition(),
+ existingInputSplit.getStartOffset(),
+ existingInputSplit.getEndOffset(),
+ existingInputSplit.getPath());
+ }
+
+ newScan.put(tp, KafkaPullerInputSplit.intersectRange(newInputSplit, existingInputSplit));
+ });
+
+ return newScan;
+ }
+
+ @Nullable protected static Map<TopicPartition, KafkaPullerInputSplit> buildScanForTimesPredicate(
+ Map<TopicPartition, KafkaPullerInputSplit> fullHouse,
+ PredicateLeaf.Operator operator,
+ long timestamp,
+ boolean flip,
+ boolean negation,
+ KafkaConsumer<byte[], byte[]> consumer) {
+ long
+ increment =
+ (flip && operator == PredicateLeaf.Operator.LESS_THAN
+ || negation && operator == PredicateLeaf.Operator.LESS_THAN_EQUALS) ? 1L : 0L;
+ // only accepted cases are timestamp_column [ > ; >= ; = ]constant
+ if (operator == PredicateLeaf.Operator.EQUALS || flip ^ negation) {
+ final Map<TopicPartition, Long> timePartitionsMap = Maps.toMap(fullHouse.keySet(), tp -> timestamp + increment);
+ try {
+ // Based on Kafka docs, null will be returned for a partition
+ // if the message format version in that partition is before 0.10.0
+ Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp = consumer.offsetsForTimes(timePartitionsMap);
+ final Map<TopicPartition, KafkaPullerInputSplit> newScan = Maps.toMap(fullHouse.keySet(), tp -> {
+ KafkaPullerInputSplit existing = fullHouse.get(tp);
+ OffsetAndTimestamp foundOffsetAndTime = offsetAndTimestamp.get(tp);
+ // Null when the filter doesn't match or the field doesn't exist (i.e. old broker), thus return an empty scan.
+ final long startOffset = foundOffsetAndTime == null ? existing.getEndOffset() : foundOffsetAndTime.offset();
+ return new KafkaPullerInputSplit(tp.topic(),
+ tp.partition(),
+ startOffset,
+ existing.getEndOffset(),
+ existing.getPath());
+ });
+ return newScan;
+ } catch (Exception e) {
+ LOG.error("Error while looking up offsets for time", e);
+ //Bailout when can not figure out offsets for times.
+ return null;
+ }
+
+ }
+ return null;
+ }
+
+ /**
+ * @param expr And expression to be parsed
+ *
+ * @return either full scan or an optimized sub scan.
+ */
+ private Map<TopicPartition, KafkaPullerInputSplit> pushAndOp(ExprNodeGenericFuncDesc expr) {
+ Map<TopicPartition, KafkaPullerInputSplit> currentScan = new HashMap<>();
+
+ fullHouse.forEach((tp, input) -> currentScan.put(tp, KafkaPullerInputSplit.copyOf(input)));
+
+ for (ExprNodeDesc child : expr.getChildren()) {
+ Map<TopicPartition, KafkaPullerInputSplit> scan = parse(child);
+ if (scan != null) {
+ Set<TopicPartition> currentKeys = ImmutableSet.copyOf(currentScan.keySet());
+ currentKeys.stream().forEach(key -> {
+ KafkaPullerInputSplit newSplit = scan.get(key);
+ KafkaPullerInputSplit oldSplit = currentScan.get(key);
+ currentScan.remove(key);
+ if (newSplit != null) {
+ KafkaPullerInputSplit intersectionSplit = KafkaPullerInputSplit.intersectRange(newSplit, oldSplit);
+ if (intersectionSplit != null) {
+ currentScan.put(key, intersectionSplit);
+ }
+ }
+ });
+
+ }
+ }
+ return currentScan;
+ }
+
+ @Nullable private Map<TopicPartition, KafkaPullerInputSplit> pushOrOp(ExprNodeGenericFuncDesc expr) {
+ final Map<TopicPartition, KafkaPullerInputSplit> currentScan = new HashMap<>();
+ for (ExprNodeDesc child : expr.getChildren()) {
+ Map<TopicPartition, KafkaPullerInputSplit> scan = parse(child);
+ if (scan == null) {
+ // if any of the children is unknown bailout
+ return null;
+ }
+
+ scan.forEach((tp, input) -> {
+ KafkaPullerInputSplit existingSplit = currentScan.get(tp);
+ currentScan.put(tp, KafkaPullerInputSplit.unionRange(input, existingSplit == null ? input : existingSplit));
+ });
+ }
+ return currentScan;
+ }
+
+ private static ExprNodeDesc getColumnExpr(ExprNodeDesc expr) {
+ if (expr instanceof ExprNodeColumnDesc) {
+ return expr;
+ }
+ ExprNodeGenericFuncDesc funcDesc = null;
+ if (expr instanceof ExprNodeGenericFuncDesc) {
+ funcDesc = (ExprNodeGenericFuncDesc) expr;
+ }
+ if (null == funcDesc) {
+ return expr;
+ }
+ GenericUDF udf = funcDesc.getGenericUDF();
+ // check if it's a simple cast expression.
+ if ((udf instanceof GenericUDFBridge
+ || udf instanceof GenericUDFToBinary
+ || udf instanceof GenericUDFToChar
+ || udf instanceof GenericUDFToVarchar
+ || udf instanceof GenericUDFToDecimal
+ || udf instanceof GenericUDFToDate
+ || udf instanceof GenericUDFToUnixTimeStamp
+ || udf instanceof GenericUDFToUtcTimestamp) && funcDesc.getChildren().size() == 1 && funcDesc.getChildren()
+ .get(0) instanceof ExprNodeColumnDesc) {
+ return expr.getChildren().get(0);
+ }
+ return expr;
+ }
+
+}
diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
new file mode 100644
index 0000000000..ca26045e8a
--- /dev/null
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider;
+import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Hive Kafka storage handler that allows users to query a stream of records from a Kafka topic.
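+ *
+ * <p>Table properties (illustrative): {@code kafka.topic} and {@code kafka.bootstrap.servers} are required,
+ * {@code kafka.serde.class} is optional, and any property prefixed with {@code kafka.consumer.} is passed to the
+ * Kafka consumer with the prefix stripped (e.g. {@code kafka.consumer.max.poll.records}).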
+ */
+public class KafkaStorageHandler implements HiveStorageHandler {
+
+ public static final String TIMESTAMP_COLUMN = "__timestamp";
+ public static final String PARTITION_COLUMN = "__partition";
+ public static final String OFFSET_COLUMN = "__offset";
+ public static final String SERDE_CLASS_NAME = "kafka.serde.class";
+ public static final String HIVE_KAFKA_TOPIC = "kafka.topic";
+ public static final String CONSUMER_CONFIGURATION_PREFIX = "kafka.consumer";
+ public static final String HIVE_KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
+ public static final String HIVE_KAFKA_POLL_TIMEOUT = "hive.kafka.poll.timeout.ms";
+ public static final long DEFAULT_CONSUMER_POLL_TIMEOUT_MS = 5000L; // 5 seconds
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaStorageHandler.class);
+
+ Configuration configuration;
+
+ @Override public Class<? extends InputFormat> getInputFormatClass() {
+ return KafkaPullerInputFormat.class;
+ }
+
+ @Override public Class<? extends OutputFormat> getOutputFormatClass() {
+ return NullOutputFormat.class;
+ }
+
+ @Override public Class<? extends AbstractSerDe> getSerDeClass() {
+ return GenericKafkaSerDe.class;
+ }
+
+ @Override public HiveMetaHook getMetaHook() {
+ return null;
+ }
+
+ @Override public HiveAuthorizationProvider getAuthorizationProvider() throws HiveException {
+ return new DefaultHiveAuthorizationProvider();
+ }
+
+ @Override public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
+ jobProperties.put(HIVE_KAFKA_TOPIC,
+ Preconditions.checkNotNull(tableDesc.getProperties().getProperty(HIVE_KAFKA_TOPIC),
+ "kafka topic missing set table property->" + HIVE_KAFKA_TOPIC));
+ LOG.debug("Table properties: Kafka Topic {}", tableDesc.getProperties().getProperty(HIVE_KAFKA_TOPIC));
+ jobProperties.put(HIVE_KAFKA_BOOTSTRAP_SERVERS,
+ Preconditions.checkNotNull(tableDesc.getProperties().getProperty(HIVE_KAFKA_BOOTSTRAP_SERVERS),
+ "Broker address missing set table property->" + HIVE_KAFKA_BOOTSTRAP_SERVERS));
+ LOG.debug("Table properties: Kafka broker {}", tableDesc.getProperties().getProperty(HIVE_KAFKA_BOOTSTRAP_SERVERS));
+ jobProperties.put(SERDE_CLASS_NAME,
+ tableDesc.getProperties().getProperty(SERDE_CLASS_NAME, KafkaJsonSerDe.class.getName()));
+
+ LOG.info("Table properties: SerDe class name {}", jobProperties.get(SERDE_CLASS_NAME));
+
+ //set extra properties
+ tableDesc.getProperties()
+ .entrySet()
+ .stream()
+ .filter(objectObjectEntry -> objectObjectEntry.getKey()
+ .toString()
+ .toLowerCase()
+ .startsWith(CONSUMER_CONFIGURATION_PREFIX))
+ .forEach(entry -> {
+ String key = entry.getKey().toString().substring(CONSUMER_CONFIGURATION_PREFIX.length() + 1);
+ String value = entry.getValue().toString();
+ jobProperties.put(key, value);
+ LOG.info("Setting extra job properties: key [{}] -> value [{}]", key, value);
+
+ });
+ }
+
+ @Override public void configureInputJobCredentials(TableDesc tableDesc, Map<String, String> secrets) {
+
+ }
+
+ @Override public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
+
+ }
+
+ @Override public void configureTableJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
+ configureInputJobProperties(tableDesc, jobProperties);
+ }
+
+ @Override public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
+ Map<String, String> properties = new HashMap<>();
+ configureInputJobProperties(tableDesc, properties);
+ properties.forEach((key, value) -> jobConf.set(key, value));
+ try {
+ KafkaStreamingUtils.copyDependencyJars(jobConf, KafkaStorageHandler.class);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override public void setConf(Configuration configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override public Configuration getConf() {
+ return configuration;
+ }
+
+ @Override public String toString() {
+ return "org.apache.hadoop.hive.kafka.KafkaStorageHandler";
+ }
+}
diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStreamingUtils.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStreamingUtils.java
new file mode 100644
index 0000000000..132db9880d
--- /dev/null
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStreamingUtils.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+
+/**
+ * Utilities class.
+ */
+public final class KafkaStreamingUtils {
+
+ private KafkaStreamingUtils() {
+ }
+
+ /**
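+ * Build the default consumer properties and overlay any {@code kafka.consumer.}-prefixed entries from the job
+ * configuration with the prefix stripped. For example (illustrative), setting {@code kafka.consumer.request.timeout.ms}
+ * overrides the default {@code request.timeout.ms} set below.
+ *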
+ * @param configuration Job configs
+ *
+ * @return default consumer properties
+ */
+ public static Properties consumerProperties(Configuration configuration) {
+ final Properties props = new Properties();
+ // those are very important to set to avoid long blocking
+ props.setProperty("request.timeout.ms", "10001");
+ props.setProperty("fetch.max.wait.ms", "10000");
+ props.setProperty("session.timeout.ms", "10000");
+ // we are managing the commit offset
+ props.setProperty("enable.auto.commit", "false");
+ // we are seeking in the stream so no reset
+ props.setProperty("auto.offset.reset", "none");
+ String brokerEndPoint = configuration.get(KafkaStorageHandler.HIVE_KAFKA_BOOTSTRAP_SERVERS);
+ props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerEndPoint);
+ props.setProperty(KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ props.setProperty(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ // user can always override stuff
+ final Map<String, String> kafkaProperties =
+ configuration.getValByRegex("^" + KafkaStorageHandler.CONSUMER_CONFIGURATION_PREFIX + "\\..*");
+ for (Map.Entry<String, String> entry : kafkaProperties.entrySet()) {
+ props.setProperty(entry.getKey().substring(KafkaStorageHandler.CONSUMER_CONFIGURATION_PREFIX.length() + 1),
+ entry.getValue());
+ }
+ return props;
+ }
+
+ public static void copyDependencyJars(Configuration conf, Class<?>... classes) throws IOException {
+ Set<String> jars = new HashSet<>();
+ FileSystem localFs = FileSystem.getLocal(conf);
+ jars.addAll(conf.getStringCollection("tmpjars"));
+ jars.addAll(Arrays.asList(classes).stream().filter(aClass -> aClass != null)
+ .map(clazz -> {
+ String path = Utilities.jarFinderGetJar(clazz);
+ if (path == null) {
+ throw new RuntimeException("Could not find jar for class "
+ + clazz
+ + " in order to ship it to the cluster.");
+ }
+ try {
+ if (!localFs.exists(new Path(path))) {
+ throw new RuntimeException("Could not validate jar file " + path + " for class " + clazz);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return path;
+ }).collect(Collectors.toList()));
+
+ if (jars.isEmpty()) {
+ return;
+ }
+ conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()])));
+ }
+}
diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/package-info.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/package-info.java
new file mode 100644
index 0000000000..8a0d8fd0b0
--- /dev/null
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package info file.
+ */
+
+package org.apache.hadoop.hive.kafka;
diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaPullerInputSplitTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaPullerInputSplitTest.java
new file mode 100644
index 0000000000..be26986818
--- /dev/null
+++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaPullerInputSplitTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Kafka Hadoop InputSplit Test.
+ */
+public class KafkaPullerInputSplitTest {
+ private String topic = "my_topic";
+ private KafkaPullerInputSplit expectedInputSplit;
+
+ public KafkaPullerInputSplitTest() {
+ this.expectedInputSplit = new KafkaPullerInputSplit(this.topic, 1, 50L, 56L, new Path("/tmp"));
+ }
+
+ @Test public void testWriteRead() throws IOException {
+ DataOutput output = new DataOutputBuffer();
+ this.expectedInputSplit.write(output);
+ KafkaPullerInputSplit kafkaPullerInputSplit = new KafkaPullerInputSplit();
+ DataInput input = new DataInputBuffer();
+ ((DataInputBuffer) input).reset(((DataOutputBuffer) output).getData(), 0, ((DataOutputBuffer) output).getLength());
+ kafkaPullerInputSplit.readFields(input);
+ Assert.assertEquals(this.expectedInputSplit, kafkaPullerInputSplit);
+ }
+
+ @Test public void andRangeOverLapping() {
+ KafkaPullerInputSplit kafkaPullerInputSplit = new KafkaPullerInputSplit("test-topic", 2, 10, 400, new Path("/tmp"));
+
+ KafkaPullerInputSplit kafkaPullerInputSplit2 = new KafkaPullerInputSplit("test-topic", 2, 3, 200, new Path("/tmp"));
+
+ Assert.assertEquals(new KafkaPullerInputSplit("test-topic", 2, 10, 200, new Path("/tmp")),
+ KafkaPullerInputSplit.intersectRange(kafkaPullerInputSplit, kafkaPullerInputSplit2));
+
+ }
+
+ @Test public void andRangeNonOverLapping() {
+ KafkaPullerInputSplit kafkaPullerInputSplit = new KafkaPullerInputSplit("test-topic", 2, 10, 400, new Path("/tmp"));
+
+ KafkaPullerInputSplit
+ kafkaPullerInputSplit2 =
+ new KafkaPullerInputSplit("test-topic", 2, 550, 700, new Path("/tmp"));
+
+ Assert.assertEquals(null, KafkaPullerInputSplit.intersectRange(kafkaPullerInputSplit, kafkaPullerInputSplit2));
+
+ }
+
+ @Test public void orRange() {
+ KafkaPullerInputSplit
+ kafkaPullerInputSplit =
+ new KafkaPullerInputSplit("test-topic", 2, 300, 400, new Path("/tmp"));
+
+ KafkaPullerInputSplit kafkaPullerInputSplit2 = new KafkaPullerInputSplit("test-topic", 2, 3, 600, new Path("/tmp"));
+
+ Assert.assertEquals(kafkaPullerInputSplit2,
+ KafkaPullerInputSplit.unionRange(kafkaPullerInputSplit, kafkaPullerInputSplit2));
+
+ KafkaPullerInputSplit
+ kafkaPullerInputSplit3 =
+ new KafkaPullerInputSplit("test-topic", 2, 700, 6000, new Path("/tmp"));
+
+ Assert.assertEquals(new KafkaPullerInputSplit("test-topic", 2, 300, 6000, new Path("/tmp")),
+ KafkaPullerInputSplit.unionRange(kafkaPullerInputSplit, kafkaPullerInputSplit3));
+ }
+
+ @Test public void copyOf() {
+ KafkaPullerInputSplit
+ kafkaPullerInputSplit =
+ new KafkaPullerInputSplit("test-topic", 2, 300, 400, new Path("/tmp"));
+
+ KafkaPullerInputSplit copyOf = KafkaPullerInputSplit.copyOf(kafkaPullerInputSplit);
+ Assert.assertEquals(kafkaPullerInputSplit, copyOf);
+ Assert.assertTrue(kafkaPullerInputSplit != copyOf);
+ }
+
+ @Test public void testClone() {
+ KafkaPullerInputSplit
+ kafkaPullerInputSplit =
+ new KafkaPullerInputSplit("test-topic", 2, 300, 400, new Path("/tmp"));
+
+ KafkaPullerInputSplit clone = kafkaPullerInputSplit.clone();
+ Assert.assertEquals(kafkaPullerInputSplit, clone);
+ Assert.assertTrue(clone != kafkaPullerInputSplit);
+
+ }
+
+ @Test public void testSlice() {
+ KafkaPullerInputSplit
+ kafkaPullerInputSplit =
+ new KafkaPullerInputSplit("test-topic", 2, 300, 400, new Path("/tmp"));
+ List<KafkaPullerInputSplit> kafkaPullerInputSplitList = KafkaPullerInputSplit.slice(14, kafkaPullerInputSplit);
+ Assert.assertEquals(kafkaPullerInputSplitList.stream()
+ .mapToLong(kafkaPullerInputSplit1 -> kafkaPullerInputSplit1.getEndOffset()
+ - kafkaPullerInputSplit1.getStartOffset())
+ .sum(), kafkaPullerInputSplit.getEndOffset() - kafkaPullerInputSplit.getStartOffset());
+ Assert.assertTrue(kafkaPullerInputSplitList.stream()
+ .filter(kafkaPullerInputSplit1 -> kafkaPullerInputSplit.getStartOffset()
+ == kafkaPullerInputSplit1.getStartOffset())
+ .count() == 1);
+ Assert.assertTrue(kafkaPullerInputSplitList.stream()
+ .filter(kafkaPullerInputSplit1 -> kafkaPullerInputSplit.getEndOffset() == kafkaPullerInputSplit1.getEndOffset())
+ .count() == 1);
+
+ }
+}
diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java
new file mode 100644
index 0000000000..41c31a47b4
--- /dev/null
+++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.collect.ImmutableList;
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import kafka.zk.EmbeddedZookeeper;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+/**
+ * Kafka Iterator Tests.
+ */
+public class KafkaRecordIteratorTest {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordIteratorTest.class);
+ private static final String TOPIC = "my_test_topic";
+ private static final List<ConsumerRecord<byte[], byte[]>> RECORDS = new ArrayList<>();
+ private static final int RECORD_NUMBER = 100;
+ private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, 0);
+ public static final byte[] KEY_BYTES = "KEY".getBytes(Charset.forName("UTF-8"));
+ private static ZkUtils zkUtils;
+ private static ZkClient zkClient;
+ private static KafkaProducer<byte[], byte[]> producer;
+ private static KafkaServer kafkaServer;
+ private static String zkConnect;
+ private KafkaConsumer<byte[], byte[]> consumer = null;
+ private KafkaRecordIterator kafkaRecordIterator = null;
+ private Configuration conf = new Configuration();
+ private static EmbeddedZookeeper zkServer;
+
+ public KafkaRecordIteratorTest() {
+ }
+
+ @BeforeClass public static void setupCluster() throws IOException, InterruptedException {
+ LOG.info("init embedded Zookeeper");
+ zkServer = new EmbeddedZookeeper();
+ zkConnect = "127.0.0.1:" + zkServer.port();
+ zkClient = new ZkClient(zkConnect, 3000, 3000, ZKStringSerializer$.MODULE$);
+ zkUtils = ZkUtils.apply(zkClient, false);
+ LOG.info("init kafka broker");
+ Properties brokerProps = new Properties();
+ brokerProps.setProperty("zookeeper.connect", zkConnect);
+ brokerProps.setProperty("broker.id", "0");
+ brokerProps.setProperty("log.dir", Files.createTempDirectory("kafka-log-dir-").toAbsolutePath().toString());
+ brokerProps.setProperty("listeners", "PLAINTEXT://127.0.0.1:9092");
+ brokerProps.setProperty("offsets.TOPIC.replication.factor", "1");
+ KafkaConfig config = new KafkaConfig(brokerProps);
+ Time mock = new MockTime();
+ kafkaServer = TestUtils.createServer(config, mock);
+ kafkaServer.startup();
+ LOG.info("Creating kafka TOPIC [{}]", TOPIC);
+ AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+ setupProducer();
+ sendData();
+ }
+
+ @Before public void setUp() {
+ LOG.info("setting up consumer");
+ this.setupConsumer();
+ this.kafkaRecordIterator = null;
+ }
+
+ @Test public void testHasNextAbsoluteStartEnd() {
+ this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, 0L, (long) RECORDS.size(), 100L);
+ this.compareIterator(RECORDS, this.kafkaRecordIterator);
+ }
+
+ @Test public void testHasNextGivenStartEnd() {
+ this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, 2L, 4L, 100L);
+ this.compareIterator(RECORDS.stream()
+ .filter((consumerRecord) -> consumerRecord.offset() >= 2L && consumerRecord.offset() < 4L)
+ .collect(Collectors.toList()), this.kafkaRecordIterator);
+ }
+
+ @Test public void testHasNextNoOffsets() {
+ this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, 100L);
+ this.compareIterator(RECORDS, this.kafkaRecordIterator);
+ }
+
+ @Test public void testHasNextLastRecord() {
+ long startOffset = (long) (RECORDS.size() - 1);
+ long lastOffset = (long) RECORDS.size();
+ this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, startOffset, lastOffset, 100L);
+ this.compareIterator(RECORDS.stream()
+ .filter((consumerRecord) -> consumerRecord.offset() >= startOffset && consumerRecord.offset() < lastOffset)
+ .collect(Collectors.toList()), this.kafkaRecordIterator);
+ }
+
+ @Test public void testHasNextFirstRecord() {
+ this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, 0L, 1L, 100L);
+ this.compareIterator(RECORDS.stream()
+ .filter((consumerRecord) -> consumerRecord.offset() >= 0L && consumerRecord.offset() < 1L)
+ .collect(Collectors.toList()), this.kafkaRecordIterator);
+ }
+
+ @Test public void testHasNextNoStart() {
+ this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, (Long) null, 10L, 100L);
+ this.compareIterator(RECORDS.stream()
+ .filter((consumerRecord) -> consumerRecord.offset() >= 0L && consumerRecord.offset() < 10L)
+ .collect(Collectors.toList()), this.kafkaRecordIterator);
+ }
+
+ @Test public void testHasNextNoEnd() {
+ long lastOffset = (long) RECORDS.size();
+ this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, 5L, (Long) null, 100L);
+ this.compareIterator(RECORDS.stream()
+ .filter((consumerRecord) -> consumerRecord.offset() >= 5L && consumerRecord.offset() < lastOffset)
+ .collect(Collectors.toList()), this.kafkaRecordIterator);
+ }
+
+ @Test public void testRecordReader() throws IOException {
+ InputSplit inputSplits = new KafkaPullerInputSplit(TOPIC, 0, 0L, 50L, null);
+ KafkaPullerRecordReader recordReader = new KafkaPullerRecordReader((KafkaPullerInputSplit) inputSplits, this.conf);
+ List<KafkaRecordWritable> serRecords =
+ RECORDS.stream().map((aRecord) -> KafkaRecordWritable.fromKafkaRecord(aRecord)).collect(Collectors.toList());
+
+ for (int i = 0; i < 50; ++i) {
+ KafkaRecordWritable record = new KafkaRecordWritable();
+ Assert.assertTrue(recordReader.next((NullWritable) null, record));
+ Assert.assertEquals(serRecords.get(i), record);
+ }
+
+ recordReader.close();
+ recordReader = new KafkaPullerRecordReader();
+ TaskAttemptContext context = new TaskAttemptContextImpl(this.conf, new TaskAttemptID());
+ recordReader.initialize(new KafkaPullerInputSplit(TOPIC, 0, 50L, 100L, null), context);
+
+ for (int i = 50; i < 100; ++i) {
+ KafkaRecordWritable record = new KafkaRecordWritable();
+ Assert.assertTrue(recordReader.next(null, record));
+ Assert.assertEquals(serRecords.get(i), record);
+ }
+
+ recordReader.close();
+ }
+
+ @Test(expected = TimeoutException.class) public void testPullingBeyondLimit() {
+ this.kafkaRecordIterator =
+ new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, 0L, 101L, 100L);
+ this.compareIterator(RECORDS, this.kafkaRecordIterator);
+ }
+
+ @Test(expected = IllegalStateException.class) public void testPullingStartGreaterThanEnd() {
+ this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, 10L, 1L, 100L);
+ this.compareIterator(RECORDS, this.kafkaRecordIterator);
+ }
+
+ @Test(expected = TimeoutException.class) public void testPullingFromEmptyTopic() {
+ this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, new TopicPartition("noHere", 0), 0L, 100L, 100L);
+ this.compareIterator(RECORDS, this.kafkaRecordIterator);
+ }
+
+ @Test(expected = TimeoutException.class) public void testPullingFromEmptyPartition() {
+ this.kafkaRecordIterator =
+ new KafkaRecordIterator(this.consumer, new TopicPartition(TOPIC, 1), 0L, 100L, 100L);
+ this.compareIterator(RECORDS, this.kafkaRecordIterator);
+ }
+
+ @Test public void testStartIsEqualEnd() {
+ this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, 10L, 10L, 100L);
+ this.compareIterator(ImmutableList.of(), this.kafkaRecordIterator);
+ }
+
+ @Test public void testStartIsTheLastOffset() {
+ this.kafkaRecordIterator =
+ new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, (long) RECORD_NUMBER, (long) RECORD_NUMBER, 100L);
+ this.compareIterator(ImmutableList.of(), this.kafkaRecordIterator);
+ }
+
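+ // Drains the iterator and asserts topic, partition, offset, key and value match the expected records in order.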
+ private void compareIterator(List<ConsumerRecord<byte[], byte[]>> expected,
+ Iterator<ConsumerRecord<byte[], byte[]>> kafkaRecordIterator) {
+ expected.stream().forEachOrdered((expectedRecord) -> {
+ Assert.assertTrue("record with offset " + expectedRecord.offset(), kafkaRecordIterator.hasNext());
+ ConsumerRecord record = kafkaRecordIterator.next();
+ Assert.assertEquals(TOPIC, record.topic());
+ Assert.assertEquals(0, record.partition());
+ Assert.assertEquals("Offsets not matching", expectedRecord.offset(), record.offset());
+ byte[] binaryExpectedValue = expectedRecord.value();
+ byte[] binaryExpectedKey = expectedRecord.key();
+ byte[] binaryValue = (byte[]) record.value();
+ byte[] binaryKey = (byte[]) record.key();
+ Assert.assertArrayEquals(binaryExpectedValue, binaryValue);
+ Assert.assertArrayEquals(binaryExpectedKey, binaryKey);
+ });
+ Assert.assertFalse(kafkaRecordIterator.hasNext());
+ }
+
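+ // Byte-array producer pointed at the embedded broker on 127.0.0.1:9092, used to seed the test topic.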
+ private static void setupProducer() {
+ LOG.info("Setting up kafka producer");
+ Properties producerProps = new Properties();
+ producerProps.setProperty("bootstrap.servers", "127.0.0.1:9092");
+ producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+ producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+ producerProps.setProperty("max.block.ms", "10000");
+ producer = new KafkaProducer(producerProps);
+ LOG.info("kafka producer started");
+ }
+
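+ // Consumer with auto-commit disabled and auto.offset.reset=none so positions are always set explicitly; the broker address is also propagated to the job Configuration.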
+ private void setupConsumer() {
+ Properties consumerProps = new Properties();
+ consumerProps.setProperty("enable.auto.commit", "false");
+ consumerProps.setProperty("auto.offset.reset", "none");
+ consumerProps.setProperty("bootstrap.servers", "127.0.0.1:9092");
+ this.conf.set("kafka.bootstrap.servers", "127.0.0.1:9092");
+ consumerProps.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
+ consumerProps.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
+ consumerProps.setProperty("request.timeout.ms", "3002");
+ consumerProps.setProperty("fetch.max.wait.ms", "3001");
+ consumerProps.setProperty("session.timeout.ms", "3001");
+ consumerProps.setProperty("metadata.max.age.ms", "100");
+ this.consumer = new KafkaConsumer(consumerProps);
+ }
+
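+ // Produces RECORD_NUMBER key/value pairs into partition 0 and mirrors them into RECORDS as the expected fixture.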
+ private static void sendData() throws InterruptedException {
+ LOG.info("Sending {} records", RECORD_NUMBER);
+ RECORDS.clear();
+ for (int i = 0; i < RECORD_NUMBER; ++i) {
+
+ final byte[] value = ("VALUE-" + Integer.toString(i)).getBytes(Charset.forName("UTF-8"));
+ //noinspection unchecked
+ producer.send(new ProducerRecord(TOPIC, 0, 0L, KEY_BYTES, value));
+
+ //noinspection unchecked
+ RECORDS.add(new ConsumerRecord(TOPIC, 0, (long) i, 0L, null, 0L, 0, 0, KEY_BYTES, value));
+ }
+
+ producer.close();
+ }
+
+ @After public void tearDown() {
+ this.kafkaRecordIterator = null;
+ if (this.consumer != null) {
+ this.consumer.close();
+ }
+ }
+
+ @AfterClass public static void tearDownCluster() {
+ if (kafkaServer != null) {
+ kafkaServer.shutdown();
+ kafkaServer.zkUtils().close();
+ kafkaServer.awaitShutdown();
+ }
+ zkServer.shutdown();
+ zkClient.close();
+ zkUtils.close();
+ }
+}
diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordWritableTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordWritableTest.java
new file mode 100644
index 0000000000..e28c924a9a
--- /dev/null
+++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordWritableTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * Test class for {@link KafkaRecordWritable}.
+ */
+public class KafkaRecordWritableTest {
+ public KafkaRecordWritableTest() {
+ }
+
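+ // Round-trips a KafkaRecordWritable through write()/readFields() and checks equality.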
+ @Test public void testWriteReadFields() throws IOException {
+ ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("topic", 0, 3L, "key".getBytes(), "value".getBytes());
+ KafkaRecordWritable kafkaRecordWritable = KafkaRecordWritable.fromKafkaRecord(record);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream w = new DataOutputStream(baos);
+ kafkaRecordWritable.write(w);
+ w.flush();
+
+ ByteArrayInputStream input = new ByteArrayInputStream(baos.toByteArray());
+ DataInputStream inputStream = new DataInputStream(input);
+ KafkaRecordWritable actualKafkaRecordWritable = new KafkaRecordWritable();
+ actualKafkaRecordWritable.readFields(inputStream);
+ Assert.assertEquals(kafkaRecordWritable, actualKafkaRecordWritable);
+ }
+}
diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaScanTrimmerTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaScanTrimmerTest.java
new file mode 100644
index 0000000000..c7e0e1f3ee
--- /dev/null
+++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaScanTrimmerTest.java
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Test class for the Kafka scan trimmer {@link KafkaScanTrimmer}.
+ */
+public class KafkaScanTrimmerTest {
+ private static final Path PATH = new Path("/tmp");
+
+ private ExprNodeDesc zeroInt = ConstantExprBuilder.build(0);
+ private ExprNodeDesc threeInt = ConstantExprBuilder.build(3);
+ private ExprNodeDesc thirtyLong = ConstantExprBuilder.build(30L);
+ private ExprNodeDesc thirtyFiveLong = ConstantExprBuilder.build(35L);
+ private ExprNodeDesc seventyFiveLong = ConstantExprBuilder.build(75L);
+ private ExprNodeDesc fortyLong = ConstantExprBuilder.build(40L);
+
+ private ExprNodeDesc
+ partitionColumn =
+ new ExprNodeColumnDesc(TypeInfoFactory.intTypeInfo, KafkaStorageHandler.PARTITION_COLUMN, null, false);
+ private ExprNodeDesc
+ offsetColumn =
+ new ExprNodeColumnDesc(TypeInfoFactory.longTypeInfo, KafkaStorageHandler.OFFSET_COLUMN, null, false);
+
+ private String topic = "my_topic";
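+ // Full-scan fixture: one input split per partition covering each partition's complete offset range.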
+ private Map<TopicPartition, KafkaPullerInputSplit>
+ fullHouse =
+ ImmutableMap.of(new TopicPartition(topic, 0),
+ new KafkaPullerInputSplit(topic, 0, 0, 45, PATH),
+ new TopicPartition(topic, 1),
+ new KafkaPullerInputSplit(topic, 1, 5, 1005, PATH),
+ new TopicPartition(topic, 2),
+ new KafkaPullerInputSplit(topic, 2, 9, 100, PATH),
+ new TopicPartition(topic, 3),
+ new KafkaPullerInputSplit(topic, 3, 0, 100, PATH));
+
+ @Test public void computeOptimizedScanPartitionBinaryOpFilter() {
+ KafkaScanTrimmer kafkaScanTrimmer = new KafkaScanTrimmer(fullHouse, null);
+ int partitionId = 2;
+ ExprNodeDesc constant = ConstantExprBuilder.build(partitionId);
+ final List<ExprNodeDesc> children = Lists.newArrayList(partitionColumn, constant);
+
+ ExprNodeGenericFuncDesc node = eq(children);
+ assertNotNull(node);
+
+ Map<TopicPartition, KafkaPullerInputSplit>
+ actual =
+ kafkaScanTrimmer.computeOptimizedScan(SerializationUtilities
+ .deserializeExpression(SerializationUtilities.serializeExpression(node)));
+ Map<TopicPartition, KafkaPullerInputSplit> expected = Maps.filterValues(fullHouse, tp -> Objects.requireNonNull(tp).getPartition() == partitionId);
+ Assert.assertEquals(expected, actual);
+
+ ExprNodeGenericFuncDesc lessNode = lessThan(children);
+ assertNotNull(lessNode);
+ actual =
+ kafkaScanTrimmer.computeOptimizedScan(SerializationUtilities
+ .deserializeExpression(SerializationUtilities.serializeExpression(lessNode)));
+ expected = Maps.filterValues(fullHouse, tp -> Objects.requireNonNull(tp).getPartition() < partitionId);
+ Assert.assertEquals(expected, actual);
+
+ ExprNodeGenericFuncDesc lessEqNode = lessThanEq(children);
+
+ assertNotNull(lessEqNode);
+ actual =
+ kafkaScanTrimmer.computeOptimizedScan(SerializationUtilities
+ .deserializeExpression(SerializationUtilities.serializeExpression(lessEqNode)));
+ expected = Maps.filterValues(fullHouse, tp -> Objects.requireNonNull(tp).getPartition() <= partitionId);
+ Assert.assertEquals(expected, actual);
+
+ }
+
+ @Test public void computeOptimizedScanFalseFilter() {
+ KafkaScanTrimmer kafkaScanTrimmer = new KafkaScanTrimmer(fullHouse, null);
+ ExprNodeGenericFuncDesc
+ falseFilter =
+ and(Lists.newArrayList(eq(Lists.newArrayList(partitionColumn, zeroInt)),
+ eq(Lists.newArrayList(partitionColumn, threeInt))));
+
+ assertNotNull(falseFilter);
+ Map<TopicPartition, KafkaPullerInputSplit>
+ actual =
+ kafkaScanTrimmer.computeOptimizedScan(SerializationUtilities
+ .deserializeExpression(SerializationUtilities.serializeExpression(falseFilter)));
+ Assert.assertTrue(actual.isEmpty());
+
+ ExprNodeGenericFuncDesc
+ falseFilter2 =
+ and(Lists.newArrayList(eq(Lists.newArrayList(offsetColumn, thirtyFiveLong)),
+ eq(Lists.newArrayList(offsetColumn, fortyLong))));
+
+ assertNotNull(falseFilter2);
+ actual =
+ kafkaScanTrimmer.computeOptimizedScan(SerializationUtilities
+ .deserializeExpression(SerializationUtilities.serializeExpression(falseFilter2)));
+ Assert.assertTrue(actual.isEmpty());
+
+ ExprNodeGenericFuncDesc filter3 = or(Lists.newArrayList(falseFilter, falseFilter2));
+
+ assertNotNull(filter3);
+ actual =
+ kafkaScanTrimmer.computeOptimizedScan(SerializationUtilities
+ .deserializeExpression(SerializationUtilities.serializeExpression(filter3)));
+ Assert.assertTrue(actual.isEmpty());
+
+ ExprNodeGenericFuncDesc
+ filter4 =
+ and(Lists.newArrayList(filter3, eq(Lists.newArrayList(partitionColumn, zeroInt))));
+ assertNotNull(filter4);
+ actual =
+ kafkaScanTrimmer.computeOptimizedScan(SerializationUtilities
+ .deserializeExpression(SerializationUtilities.serializeExpression(filter4)));
+ Assert.assertTrue(actual.isEmpty());
+ }
+
+ @Test public void computeOptimizedScanOrAndCombinedFilter() {
+ KafkaScanTrimmer kafkaScanTrimmer = new KafkaScanTrimmer(fullHouse, null);
+ // partition = 0 and 30 <= offset < 35 or partition = 3 and 35 <= offset < 75 or (partition = 0 and offset = 40)
+
+ ExprNodeGenericFuncDesc
+ part1 =
+ and(Lists.newArrayList(greaterThanEq(Lists.newArrayList(offsetColumn, thirtyLong)),
+ eq(Lists.newArrayList(partitionColumn, zeroInt)),
+ lessThan(Lists.newArrayList(offsetColumn, thirtyFiveLong))));
+
+ ExprNodeGenericFuncDesc
+ part2 =
+ and(Lists.newArrayList(greaterThanEq(Lists.newArrayList(offsetColumn, thirtyFiveLong)),
+ eq(Lists.newArrayList(partitionColumn, threeInt)),
+ lessThan(Lists.newArrayList(offsetColumn, seventyFiveLong))));
+
+ ExprNodeGenericFuncDesc
+ part3 =
+ and(Lists.newArrayList(eq(Lists.newArrayList(offsetColumn, fortyLong)),
+ eq(Lists.newArrayList(partitionColumn, zeroInt))));
+
+ ExprNodeGenericFuncDesc orExpression = or(Lists.newArrayList(part1, part2, part3));
+
+ assertNotNull(orExpression);
+ Map<TopicPartition, KafkaPullerInputSplit>
+ actual =
+ kafkaScanTrimmer.computeOptimizedScan(SerializationUtilities
+ .deserializeExpression(SerializationUtilities.serializeExpression(orExpression)));
+ TopicPartition tpZero = new TopicPartition(topic, 0);
+ TopicPartition tpThree = new TopicPartition(topic, 3);
+ KafkaPullerInputSplit split1 = new KafkaPullerInputSplit(topic, 0, 30, 41, PATH);
+ KafkaPullerInputSplit split2 = new KafkaPullerInputSplit(topic, 3, 35, 75, PATH);
+
+ Map<TopicPartition, KafkaPullerInputSplit> expected = ImmutableMap.of(tpZero, split1, tpThree, split2);
+ Assert.assertEquals(expected, actual);
+
+ }
+
+ @Test public void computeOptimizedScanPartitionOrAndCombinedFilter() {
+ KafkaScanTrimmer kafkaScanTrimmer = new KafkaScanTrimmer(fullHouse, null);
+
+ // partition = 1 or (partition > 2 and partition <= 3)
+ ExprNodeGenericFuncDesc eq = eq(Lists.newArrayList(partitionColumn, ConstantExprBuilder.build(1)));
+ ExprNodeGenericFuncDesc lessEq = lessThanEq(Lists.newArrayList(partitionColumn, ConstantExprBuilder.build(3)));
+ ExprNodeGenericFuncDesc greater = greaterThan(Lists.newArrayList(partitionColumn, ConstantExprBuilder.build(2)));
+ ExprNodeGenericFuncDesc orNode = or(Lists.newArrayList(and(Lists.newArrayList(lessEq, greater)), eq));
+
+ Map<TopicPartition, KafkaPullerInputSplit>
+ actual =
+ kafkaScanTrimmer.computeOptimizedScan(SerializationUtilities
+ .deserializeExpression(SerializationUtilities.serializeExpression(orNode)));
+ Map<TopicPartition, KafkaPullerInputSplit>
+ expected =
+ Maps.filterValues(fullHouse, tp -> Objects.requireNonNull(tp).getPartition() == 1 || tp.getPartition() == 3);
+ Assert.assertEquals(expected, actual);
+ assertNotNull(orNode);
+ }
+
+ @Test public void buildScanFormPartitionPredicateEq() {
+ Map<TopicPartition, KafkaPullerInputSplit>
+ actual =
+ KafkaScanTrimmer.buildScanFormPartitionPredicate(fullHouse, PredicateLeaf.Operator.EQUALS, 3, false, false);
+ TopicPartition topicPartition = new TopicPartition(topic, 3);
+ Assert.assertEquals(fullHouse.get(topicPartition), actual.get(topicPartition));
+ }
+
+ @Test public void buildScanFormPartitionPredicateLess() {
+ // partitionConst < partitionColumn (flip true)
+ int partitionConst = 2;
+ Map<TopicPartition, KafkaPullerInputSplit>
+ actual =
+ KafkaScanTrimmer.buildScanFormPartitionPredicate(fullHouse,
+ PredicateLeaf.Operator.LESS_THAN,
+ partitionConst,
+ true,
+ false);
+
+ Map<TopicPartition, KafkaPullerInputSplit>
+ expected =
+ Maps.filterEntries(fullHouse, entry -> Objects.requireNonNull(entry).getKey().partition() > partitionConst);
+ Assert.assertEquals(expected, actual);
+ Assert.assertFalse(actual.isEmpty());
+
+ // partitionConst >= partitionColumn (flip true, negation true)
+ actual =
+ KafkaScanTrimmer.buildScanFormPartitionPredicate(fullHouse,
+ PredicateLeaf.Operator.LESS_THAN,
+ partitionConst,
+ true,
+ true);
+
+ expected =
+ Maps.filterEntries(fullHouse, entry -> partitionConst >= Objects.requireNonNull(entry).getKey().partition());
+ Assert.assertEquals(expected, actual);
+ Assert.assertFalse(actual.isEmpty());
+
+ // partitionColumn >= partitionConst (negation true)
+ actual =
+ KafkaScanTrimmer.buildScanFormPartitionPredicate(fullHouse,
+ PredicateLeaf.Operator.LESS_THAN,
+ partitionConst,
+ false,
+ true);
+
+ expected =
+ Maps.filterEntries(fullHouse, entry -> Objects.requireNonNull(entry).getKey().partition() >= partitionConst);
+ Assert.assertEquals(expected, actual);
+ Assert.assertFalse(actual.isEmpty());
+
+ // partitionColumn < partitionConst (no flip, no negation)
+ actual =
+ KafkaScanTrimmer.buildScanFormPartitionPredicate(fullHouse,
+ PredicateLeaf.Operator.LESS_THAN,
+ partitionConst,
+ false,
+ false);
+
+ expected =
+ Maps.filterEntries(fullHouse, entry -> Objects.requireNonNull(entry).getKey().partition() < partitionConst);
+ Assert.assertEquals(expected, actual);
+ Assert.assertFalse(actual.isEmpty());
+ }
+
+ @Test public void buildScanFormPartitionPredicateLessEq() {
+ // partitionConst <= partitionColumn (flip true)
+ int partitionConst = 2;
+ Map<TopicPartition, KafkaPullerInputSplit>
+ actual =
+ KafkaScanTrimmer.buildScanFormPartitionPredicate(fullHouse,
+ PredicateLeaf.Operator.LESS_THAN_EQUALS,
+ partitionConst,
+ true,
+ false);
+
+ Map<TopicPartition, KafkaPullerInputSplit>
+ expected =
+ Maps.filterEntries(fullHouse, entry -> Objects.requireNonNull(entry).getKey().partition() >= partitionConst);
+ Assert.assertEquals(expected, actual);
+ Assert.assertFalse(actual.isEmpty());
+
+ // partitionConst > partitionColumn (flip true, negation true)
+ actual =
+ KafkaScanTrimmer.buildScanFormPartitionPredicate(fullHouse,
+ PredicateLeaf.Operator.LESS_THAN_EQUALS,
+ partitionConst,
+ true,
+ true);
+
+ expected =
+ Maps.filterEntries(fullHouse, entry -> partitionConst > Objects.requireNonNull(entry).getKey().partition());
+ Assert.assertEquals(expected, actual);
+ Assert.assertFalse(actual.isEmpty());
+
+ // partitionColumn > partitionConst (negation true)
+ actual =
+ KafkaScanTrimmer.buildScanFormPartitionPredicate(fullHouse,
+ PredicateLeaf.Operator.LESS_THAN_EQUALS,
+ partitionConst,
+ false,
+ true);
+
+ expected =
+ Maps.filterEntries(fullHouse, entry -> Objects.requireNonNull(entry).getKey().partition() > partitionConst);
+ Assert.assertEquals(expected, actual);
+ Assert.assertFalse(actual.isEmpty());
+
+ // partitionColumn <= partitionConst (no flip, no negation)
+ actual =
+ KafkaScanTrimmer.buildScanFormPartitionPredicate(fullHouse,
+ PredicateLeaf.Operator.LESS_THAN_EQUALS,
+ partitionConst,
+ false,
+ false);
+
+ expected =
+ Maps.filterEntries(fullHouse, entry -> Objects.requireNonNull(entry).getKey().partition() <= partitionConst);
+ Assert.assertEquals(expected, actual);
+ Assert.assertFalse(actual.isEmpty());
+ }
+
+ @Test public void buildScanFromOffsetPredicateEq() {
+ long constantOffset = 30;
+ Map<TopicPartition, KafkaPullerInputSplit>
+ actual =
+ KafkaScanTrimmer.buildScanFromOffsetPredicate(fullHouse,
+ PredicateLeaf.Operator.EQUALS,
+ constantOffset,
+ false,
+ false);
+ Map<TopicPartition, KafkaPullerInputSplit>
+ expected =
+ Maps.transformValues(fullHouse,
+ entry -> new KafkaPullerInputSplit(Objects.requireNonNull(entry).getTopic(),
+ entry.getPartition(),
+ constantOffset,
+ constantOffset + 1,
+ entry.getPath()));
+
+ Assert.assertEquals(expected, actual);
+
+ // seek to end if offset is out of reach
+ actual =
+ KafkaScanTrimmer.buildScanFromOffsetPredicate(fullHouse, PredicateLeaf.Operator.EQUALS, 3000000L, false, false);
+ expected =
+ Maps.transformValues(fullHouse,
+ entry -> new KafkaPullerInputSplit(Objects.requireNonNull(entry).getTopic(),
+ entry.getPartition(),
+ entry.getEndOffset(),
+ entry.getEndOffset(),
+ entry.getPath()));
+ Assert.assertEquals(expected, actual);
+
+ // seek to end if the requested offset precedes the split start, otherwise read the single record at offset 0
+ actual = KafkaScanTrimmer.buildScanFromOffsetPredicate(fullHouse, PredicateLeaf.Operator.EQUALS, 0L, false, false);
+
+ expected =
+ Maps.transformValues(fullHouse,
+ entry -> new KafkaPullerInputSplit(Objects.requireNonNull(entry).getTopic(),
+ entry.getPartition(),
+ entry.getStartOffset() > 0 ? entry.getEndOffset() : 0,
+ entry.getStartOffset() > 0 ? entry.getEndOffset() : 1,
+ entry.getPath()));
+ Assert.assertEquals(expected, actual);
+
+ }
+
+ @Test public void buildScanFromOffsetPredicateLess() {
+ long constantOffset = 50;
+ // columnOffset < constant
+ Map<TopicPartition, KafkaPullerInputSplit>
+ actual =
+ KafkaScanTrimmer.buildScanFromOffsetPredicate(fullHouse,
+ PredicateLeaf.Operator.LESS_THAN,
+ constantOffset,
+ false,
+ false);
+
+ Map<TopicPartition, KafkaPullerInputSplit>
+ expected =
+ Maps.transformValues(fullHouse,
+ entry -> new KafkaPullerInputSplit(Objects.requireNonNull(entry).getTopic(),
+ entry.getPartition(),
+ entry.getStartOffset(),
+ Math.min(constantOffset, entry.getEndOffset()),
+ entry.getPath()));
+ Assert.assertEquals(expected, actual);
+
+ // columnOffset > constant
+ actual =
+ KafkaScanTrimmer.buildScanFromOffsetPredicate(fullHouse,
+ PredicateLeaf.Operator.LESS_THAN,
+ constantOffset,
+ true,
+ false);
+
+ expected =
+ Maps.transformValues(fullHouse,
+ entry -> new KafkaPullerInputSplit(Objects.requireNonNull(entry).getTopic(),
+ entry.getPartition(),
+ Math.min(entry.getEndOffset(), Math.max(entry.getStartOffset(), constantOffset + 1)),
+ entry.getEndOffset(),
+ entry.getPath()));
+ Assert.assertEquals(expected, actual);
+
+ // columnOffset >= constant
+ actual =
+ KafkaScanTrimmer.buildScanFromOffsetPredicate(fullHouse,
+ PredicateLeaf.Operator.LESS_THAN,
+ constantOffset,
+ false,
+ true);
+
+ expected =
+ Maps.transformValues(fullHouse,
+ entry -> new KafkaPullerInputSplit(Objects.requireNonNull(entry).getTopic(),
+ entry.getPartition(),
+ Math.min(entry.getEndOffset(), Math.max(entry.getStartOffset(), constantOffset)),
+ entry.getEndOffset(),
+ entry.getPath()));
+ Assert.assertEquals(expected, actual);
+
+ // columnOffset <= constant
+ actual =
+ KafkaScanTrimmer.buildScanFromOffsetPredicate(fullHouse,
+ PredicateLeaf.Operator.LESS_THAN,
+ constantOffset,
+ true,
+ true);
+
+ expected =
+ Maps.transformValues(fullHouse,
+ entry -> new KafkaPullerInputSplit(Objects.requireNonNull(entry).getTopic(),
+ entry.getPartition(),
+ entry.getStartOffset(),
+ Math.min(constantOffset + 1, entry.getEndOffset()),
+ entry.getPath()));
+ Assert.assertEquals(expected, actual);
+
+ }
+
+ @Test public void buildScanFromOffsetPredicateLessEq() {
+ long constantOffset = 50;
+ // columnOffset <= constant
+ Map<TopicPartition, KafkaPullerInputSplit>
+ actual =
+ KafkaScanTrimmer.buildScanFromOffsetPredicate(fullHouse,
+ PredicateLeaf.Operator.LESS_THAN_EQUALS,
+ constantOffset,
+ false,
+ false);
+
+ Map<TopicPartition, KafkaPullerInputSplit>
+ expected =
+ Maps.transformValues(fullHouse,
+ entry -> new KafkaPullerInputSplit(Objects.requireNonNull(entry).getTopic(),
+ entry.getPartition(),
+ entry.getStartOffset(),
+ Math.min(constantOffset + 1, entry.getEndOffset()),
+ entry.getPath()));
+ Assert.assertEquals(expected, actual);
+
+ // columnOffset >= constant
+ actual =
+ KafkaScanTrimmer.buildScanFromOffsetPredicate(fullHouse,
+ PredicateLeaf.Operator.LESS_THAN_EQUALS,
+ constantOffset,
+ true,
+ false);
+
+ expected =
+ Maps.transformValues(fullHouse,
+ entry -> new KafkaPullerInputSplit(Objects.requireNonNull(entry).getTopic(),
+ entry.getPartition(),
+ Math.min(entry.getEndOffset(), Math.max(entry.getStartOffset(), constantOffset)),
+ entry.getEndOffset(),
+ entry.getPath()));
+ Assert.assertEquals(expected, actual);
+
+ // columnOffset > constant
+ actual =
+ KafkaScanTrimmer.buildScanFromOffsetPredicate(fullHouse,
+ PredicateLeaf.Operator.LESS_THAN_EQUALS,
+ constantOffset,
+ false,
+ true);
+
+ expected =
+ Maps.transformValues(fullHouse,
+ entry -> new KafkaPullerInputSplit(Objects.requireNonNull(entry).getTopic(),
+ entry.getPartition(),
+ Math.min(entry.getEndOffset(), Math.max(entry.getStartOffset(), constantOffset + 1)),
+ entry.getEndOffset(),
+ entry.getPath()));
+ Assert.assertEquals(expected, actual);
+
+ // columnOffset < constant
+ actual =
+ KafkaScanTrimmer.buildScanFromOffsetPredicate(fullHouse,
+ PredicateLeaf.Operator.LESS_THAN_EQUALS,
+ constantOffset,
+ true,
+ true);
+
+ expected =
+ Maps.transformValues(fullHouse,
+ entry -> new KafkaPullerInputSplit(Objects.requireNonNull(entry).getTopic(),
+ entry.getPartition(),
+ entry.getStartOffset(),
+ Math.min(constantOffset, entry.getEndOffset()),
+ entry.getPath()));
+ Assert.assertEquals(expected, actual);
+ }
+
+ private static class ConstantExprBuilder {
+ static ExprNodeDesc build(long constant) {
+ return new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, constant);
+ }
+
+ static ExprNodeDesc build(int constant) {
+ return new ExprNodeConstantDesc(TypeInfoFactory.intTypeInfo, constant);
+ }
+ }
+
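+ // Helpers building the ExprNodeGenericFuncDesc trees that stand in for the filter expressions Hive pushes down.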
+ private static ExprNodeGenericFuncDesc or(List<ExprNodeDesc> children) {
+ return new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, new GenericUDFOPOr(), children);
+ }
+
+ private static ExprNodeGenericFuncDesc and(List<ExprNodeDesc> children) {
+ return new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, new GenericUDFOPAnd(), children);
+ }
+
+ private static ExprNodeGenericFuncDesc eq(List<ExprNodeDesc> children) {
+ return new ExprNodeGenericFuncDesc(children.get(0).getTypeInfo(), new GenericUDFOPEqual(), children);
+ }
+
+ private static ExprNodeGenericFuncDesc lessThan(List<ExprNodeDesc> children) {
+ return new ExprNodeGenericFuncDesc(children.get(0).getTypeInfo(), new GenericUDFOPLessThan(), children);
+ }
+
+ private static ExprNodeGenericFuncDesc lessThanEq(List<ExprNodeDesc> children) {
+ return new ExprNodeGenericFuncDesc(children.get(0).getTypeInfo(), new GenericUDFOPEqualOrLessThan(), children);
+ }
+
+ private static ExprNodeGenericFuncDesc greaterThan(List<ExprNodeDesc> children) {
+ return new ExprNodeGenericFuncDesc(children.get(0).getTypeInfo(), new GenericUDFOPGreaterThan(), children);
+ }
+
+ private static ExprNodeGenericFuncDesc greaterThanEq(List<ExprNodeDesc> children) {
+ return new ExprNodeGenericFuncDesc(children.get(0).getTypeInfo(), new GenericUDFOPEqualOrGreaterThan(), children);
+ }
+}
diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaStreamingUtilsTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaStreamingUtilsTest.java
new file mode 100644
index 0000000000..28c532f611
--- /dev/null
+++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaStreamingUtilsTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Properties;
+
+/**
+ * Test class for the Kafka streaming utilities {@link KafkaStreamingUtils}.
+ */
+public class KafkaStreamingUtilsTest {
+ public KafkaStreamingUtilsTest() {
+ }
+
+ @Test public void testConsumerProperties() {
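+ // Properties with the "kafka.consumer." prefix are expected to land in the consumer config with the prefix stripped.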
+ Configuration configuration = new Configuration();
+ configuration.set("kafka.bootstrap.servers", "localhost:9090");
+ configuration.set("kafka.consumer.fetch.max.wait.ms", "40");
+ configuration.set("kafka.consumer.my.new.wait.ms", "400");
+ Properties properties = KafkaStreamingUtils.consumerProperties(configuration);
+ Assert.assertEquals("localhost:9090", properties.getProperty("bootstrap.servers"));
+ Assert.assertEquals("40", properties.getProperty("fetch.max.wait.ms"));
+ Assert.assertEquals("400", properties.getProperty("my.new.wait.ms"));
+ }
+}
diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/package-info.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/package-info.java
new file mode 100644
index 0000000000..8a0d8fd0b0
--- /dev/null
+++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package info file.
+ */
+
+package org.apache.hadoop.hive.kafka;
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
index 0f1d5eea4c..ffdd340fde 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
@@ -18,44 +18,9 @@
package org.apache.hadoop.hive.llap.cli;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Collection;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hive.llap.LlapUtil;
-import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
-import org.apache.hadoop.hive.llap.daemon.impl.LlapConstants;
-import org.apache.hadoop.hive.llap.daemon.impl.StaticPermanentFunctionChecker;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
-import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
-import org.apache.hadoop.registry.client.binding.RegistryUtils;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -64,8 +29,14 @@
import org.apache.hadoop.hive.common.CompressionUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.LlapUtil;
import org.apache.hadoop.hive.llap.cli.LlapOptionsProcessor.LlapOptions;
+import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
+import org.apache.hadoop.hive.llap.daemon.impl.LlapConstants;
+import org.apache.hadoop.hive.llap.daemon.impl.StaticPermanentFunctionChecker;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.io.api.impl.LlapInputFormat;
+import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
import org.apache.hadoop.hive.metastore.api.Function;
import org.apache.hadoop.hive.metastore.api.ResourceUri;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -77,22 +48,50 @@
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
import org.eclipse.jetty.rewrite.handler.Rule;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
public class LlapServiceDriver {
protected static final Logger LOG = LoggerFactory.getLogger(LlapServiceDriver.class.getName());
- private static final String[] DEFAULT_AUX_CLASSES = new String[] {
- "org.apache.hive.hcatalog.data.JsonSerDe","org.apache.hadoop.hive.druid.DruidStorageHandler",
- "org.apache.hive.storage.jdbc.JdbcStorageHandler", "org.apache.commons.dbcp.BasicDataSourceFactory",
- "org.apache.commons.pool.impl.GenericObjectPool"
- };
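+ // Kafka storage handler is included in the default aux classes packaged for LLAP daemons.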
+ private static final String[]
+ DEFAULT_AUX_CLASSES =
+ new String[] { "org.apache.hive.hcatalog.data.JsonSerDe", "org.apache.hadoop.hive.druid.DruidStorageHandler",
+ "org.apache.hive.storage.jdbc.JdbcStorageHandler", "org.apache.commons.dbcp.BasicDataSourceFactory",
+ "org.apache.commons.pool.impl.GenericObjectPool", "org.apache.hadoop.hive.kafka.KafkaStorageHandler" };
private static final String HBASE_SERDE_CLASS = "org.apache.hadoop.hive.hbase.HBaseSerDe";
private static final String[] NEEDED_CONFIGS = LlapDaemonConfiguration.DAEMON_CONFIGS;
private static final String[] OPTIONAL_CONFIGS = LlapDaemonConfiguration.SSL_DAEMON_CONFIGS;
diff --git packaging/pom.xml packaging/pom.xml
index 5c859acfad..0f0037bd61 100644
--- packaging/pom.xml
+++ packaging/pom.xml
@@ -213,6 +213,11 @@
<artifactId>hive-druid-handler</artifactId>
<version>${project.version}</version>