record = recordsCursor.next();
+ bytesWritable.set(record);
+ consumedRecords += 1;
+ readBytes += record.serializedValueSize();
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public NullWritable createKey()
+ {
+ return NullWritable.get();
+ }
+
+ @Override
+ public KafkaRecordWritable createValue()
+ {
+ return new KafkaRecordWritable();
+ }
+
+ @Override
+ public long getPos() throws IOException
+ {
+ return consumedRecords;
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException
+ {
+ currentWritableValue = new KafkaRecordWritable();
+ if (next(NullWritable.get(), currentWritableValue)) {
+ return true;
+ }
+ currentWritableValue = null;
+ return false;
+ }
+
+ @Override
+ public NullWritable getCurrentKey() throws IOException, InterruptedException
+ {
+ return NullWritable.get();
+ }
+
+ @Override
+ public KafkaRecordWritable getCurrentValue() throws IOException, InterruptedException
+ {
+ return Preconditions.checkNotNull(currentWritableValue);
+ }
+
+ @Override
+ public float getProgress() throws IOException
+ {
+ if (consumedRecords == 0) {
+ return 0f;
+ }
+ if (consumedRecords >= totalNumberRecords) {
+ return 1f;
+ }
+ return consumedRecords * 1.0f / totalNumberRecords;
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ if (!started) {
+ return;
+ }
+ log.trace("total read bytes [{}]", readBytes);
+ if (consumer != null) {
+ consumer.wakeup();
+ }
+ closer.close();
+ }
+}
diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java
new file mode 100644
index 0000000000..d120b663ec
--- /dev/null
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableList;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Iterator over Kafka records that reads from a single topic partition, with an inclusive start and exclusive end offset.
+ *
+ * If {@code startOffset} is not null the iterator seeks to that offset,
+ * otherwise it seeks to the beginning, see {@link org.apache.kafka.clients.consumer.Consumer#seekToBeginning(Collection)}.
+ *
+ * When provided with an end offset it returns records up to the record with offset == endOffset - 1,
+ * otherwise it reads up to the current end, see {@link org.apache.kafka.clients.consumer.Consumer#endOffsets(Collection)}.
+ *
+ * The current implementation throws an exception if it cannot poll up to endOffset - 1.
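+ *
+ * An illustrative usage sketch (the consumer, topic name, offsets and poll timeout below are placeholders):
+ * <pre>{@code
+ * KafkaRecordIterator it = new KafkaRecordIterator(consumer, new TopicPartition("topic", 0), 0L, 100L, 5000L);
+ * while (it.hasNext()) {
+ *   ConsumerRecord<byte[], byte[]> record = it.next();
+ *   // process record.value() ...
+ * }
+ * }</pre>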
+ */
+public class KafkaRecordIterator implements Iterator<ConsumerRecord<byte[], byte[]>>
+{
+ private static final Logger log = LoggerFactory.getLogger(KafkaRecordIterator.class);
+
+ private final Consumer<byte[], byte[]> consumer;
+ private final TopicPartition topicPartition;
+ private long endOffset;
+ private long startOffset;
+ private final long pollTimeoutMs;
+ private final Stopwatch stopwatch = Stopwatch.createUnstarted();
+ private ConsumerRecords<byte[], byte[]> records;
+ private long currentOffset;
+ private ConsumerRecord<byte[], byte[]> nextRecord;
+ private boolean hasMore = true;
+
+ //The Kafka consumer poll method returns an iterator of records.
+ private Iterator<ConsumerRecord<byte[], byte[]>> consumerRecordIterator = null;
+
+ /**
+ * @param consumer functional Kafka consumer
+ * @param topicPartition Kafka topic partition
+ * @param startOffset start position of the stream
+ * @param endOffset requested end position; if null, reads up to the current last offset
+ * @param pollTimeoutMs poll timeout in ms
+ */
+ public KafkaRecordIterator(
+ Consumer<byte[], byte[]> consumer, TopicPartition topicPartition,
+ @Nullable Long startOffset, @Nullable Long endOffset, long pollTimeoutMs
+ )
+ {
+ this.consumer = Preconditions.checkNotNull(consumer, "Consumer cannot be null");
+ this.topicPartition = Preconditions.checkNotNull(topicPartition, "Topic partition cannot be null");
+ this.pollTimeoutMs = pollTimeoutMs;
+ Preconditions.checkState(this.pollTimeoutMs > 0, "poll timeout has to be a positive number");
+ this.startOffset = startOffset == null ? -1L : startOffset;
+ this.endOffset = endOffset == null ? -1L : endOffset;
+ }
+
+ public KafkaRecordIterator(
+ Consumer<byte[], byte[]> consumer, TopicPartition tp, long pollTimeoutMs
+ )
+ {
+ this(consumer, tp, null, null, pollTimeoutMs);
+ }
+
+ private void assignAndSeek()
+ {
+ // assign topic partition to consumer
+ final List<TopicPartition> topicPartitionList = ImmutableList.of(topicPartition);
+ if (log.isTraceEnabled()) {
+ stopwatch.reset().start();
+ }
+
+ consumer.assign(topicPartitionList);
+ // compute offsets and seek to start
+ if (startOffset > -1) {
+ log.info("Seeking to offset [{}] of topic partition [{}]", startOffset, topicPartition);
+ consumer.seek(topicPartition, startOffset);
+ } else {
+ log.info("Seeking to beginning of topic partition [{}]", topicPartition);
+ // seekToBeginning is lazy thus need to call position() or poll(0)
+ this.consumer.seekToBeginning(Collections.singleton(topicPartition));
+ startOffset = consumer.position(topicPartition);
+ }
+ if (endOffset == -1) {
+ this.endOffset = consumer.endOffsets(topicPartitionList).get(topicPartition);
+ log.debug("EndOffset is {}", endOffset);
+ }
+ currentOffset = consumer.position(topicPartition);
+ Preconditions.checkState(this.endOffset >= currentOffset,
+ "End offset [%s] need to be greater than start offset [%s]", this.endOffset, currentOffset
+ );
+ log.info("Kafka Iterator ready, assigned TopicPartition [{}]; startOffset [{}]; endOffset [{}]", topicPartition,
+ currentOffset, this.endOffset
+ );
+ if (log.isTraceEnabled()) {
+ stopwatch.stop();
+ log.trace("Time to assign and seek [{}] ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
+ }
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ if (records == null) {
+ assignAndSeek();
+ }
+ //Initial poll, OR we need to poll at least one more record since currentOffset + 1 < endOffset
+ if (records == null || (!hasMore && currentOffset + 1 < endOffset)) {
+ pollRecords();
+ findNext();
+ }
+ return hasMore;
+ }
+
+ private void pollRecords()
+ {
+ if (log.isTraceEnabled()) {
+ stopwatch.reset().start();
+ }
+ records = consumer.poll(pollTimeoutMs);
+ if (log.isTraceEnabled()) {
+ stopwatch.stop();
+ log.trace("Pulled [{}] records in [{}] ms", records.count(),
+ stopwatch.elapsed(TimeUnit.MILLISECONDS)
+ );
+ }
+ // Fail if we cannot poll within one lap of pollTimeoutMs.
+ Preconditions.checkState(
+ !records.isEmpty() || currentOffset == endOffset,
+ "Current offset: [%s]-TopicPartition:[%s], target End offset:[%s]."
+ + " Consumer returned 0 records due to an exhausted poll timeout of [%s] ms; try increasing [%s]",
+ currentOffset,
+ topicPartition.toString(),
+ endOffset,
+ pollTimeoutMs,
+ KafkaStorageHandler.HIVE_KAFKA_POLL_TIMEOUT
+ );
+ consumerRecordIterator = records.iterator();
+ }
+
+ @Override
+ public ConsumerRecord<byte[], byte[]> next()
+ {
+ ConsumerRecord<byte[], byte[]> value = nextRecord;
+ Preconditions.checkState(value.offset() < endOffset);
+ findNext();
+ return Preconditions.checkNotNull(value);
+ }
+
+ private void findNext()
+ {
+ if (consumerRecordIterator.hasNext()) {
+ nextRecord = consumerRecordIterator.next();
+ hasMore = true;
+ if (nextRecord.offset() < endOffset) {
+ currentOffset = nextRecord.offset();
+ return;
+ }
+ }
+ hasMore = false;
+ nextRecord = null;
+ }
+
+}
diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordWritable.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordWritable.java
new file mode 100644
index 0000000000..08cf53be64
--- /dev/null
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordWritable.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * Writable implementation of a Kafka ConsumerRecord.
+ * Serialized in the form:
+ * kafkaRecordTimestamp (long) | kafkaPartition (int) | recordOffset (long) | value.size (int) | value (byte[])
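+ *
+ * For example, a record with an 11 byte value serializes to 8 + 4 + 8 + 4 + 11 = 35 bytes.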
+ */
+public class KafkaRecordWritable implements Writable {
+
+ private int partition;
+ private long offset;
+ private long timestamp;
+ private byte [] value;
+
+ public static KafkaRecordWritable fromKafkaRecord(ConsumerRecord<byte[], byte[]> consumerRecord) {
+ return new KafkaRecordWritable(consumerRecord.partition(), consumerRecord.offset(),
+ consumerRecord.timestamp(), consumerRecord.value()
+ );
+ }
+
+ public void set(ConsumerRecord<byte[], byte[]> consumerRecord) {
+ this.partition = consumerRecord.partition();
+ this.timestamp = consumerRecord.timestamp();
+ this.offset = consumerRecord.offset();
+ this.value = consumerRecord.value();
+ }
+
+ private KafkaRecordWritable(int partition, long offset, long timestamp, byte[] value) {
+ this.partition = partition;
+ this.offset = offset;
+ this.timestamp = timestamp;
+ this.value = value;
+ }
+
+ public KafkaRecordWritable() {
+ }
+
+ @Override public void write(DataOutput dataOutput) throws IOException {
+ dataOutput.writeLong(timestamp);
+ dataOutput.writeInt(partition);
+ dataOutput.writeLong(offset);
+ dataOutput.writeInt(value.length);
+ dataOutput.write(value);
+ }
+
+ @Override public void readFields(DataInput dataInput) throws IOException {
+ timestamp = dataInput.readLong();
+ partition = dataInput.readInt();
+ offset = dataInput.readLong();
+ int size = dataInput.readInt();
+ if (size > 0) {
+ value = new byte[size];
+ dataInput.readFully(value);
+ } else {
+ value = new byte[0];
+ }
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public byte[] getValue() {
+ return value;
+ }
+
+ @Override public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof KafkaRecordWritable)) {
+ return false;
+ }
+ KafkaRecordWritable that = (KafkaRecordWritable) o;
+ return getPartition() == that.getPartition() && getOffset() == that.getOffset()
+ && getTimestamp() == that.getTimestamp() && Arrays.equals(getValue(), that.getValue());
+ }
+
+ @Override public int hashCode() {
+
+ int result = Objects.hash(getPartition(), getOffset(), getTimestamp());
+ result = 31 * result + Arrays.hashCode(getValue());
+ return result;
+ }
+
+}
diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaScanTrimmer.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaScanTrimmer.java
new file mode 100644
index 0000000000..4ce82b5939
--- /dev/null
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaScanTrimmer.java
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBaseCompare;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToBinary;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToChar;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToDate;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToDecimal;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUnixTimeStamp;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUtcTimestamp;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToVarchar;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+
+/**
+ * Kafka range trimmer: takes a full Kafka scan and prunes it based on a filter expression.
+ * It is a best-effort trimmer and does not replace the filter itself; filtering still takes place in the Hive executor.
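+ *
+ * For example, a filter such as {@code __partition = 1 AND __offset >= 5} trims the scan to the split of
+ * partition 1 starting at offset 5, intersected with the split's existing range.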
+ */
+public class KafkaScanTrimmer
+{
+ private static final Logger log = LoggerFactory.getLogger(KafkaScanTrimmer.class);
+ private final Map<TopicPartition, KafkaPullerInputSplit> fullHouse;
+ private final KafkaConsumer<byte[], byte[]> kafkaConsumer;
+
+
+ /**
+ * @param fullHouse initial full scan to be pruned; a map of topic partition to input split
+ * @param kafkaConsumer Kafka consumer used to look up offsets for a time-based filter if needed
+ */
+ public KafkaScanTrimmer(
+ Map<TopicPartition, KafkaPullerInputSplit> fullHouse,
+ KafkaConsumer<byte[], byte[]> kafkaConsumer
+ )
+ {
+ this.fullHouse = fullHouse;
+ this.kafkaConsumer = kafkaConsumer;
+ }
+
+ /**
+ * This might block due to calls like:
+ * org.apache.kafka.clients.consumer.KafkaConsumer#offsetsForTimes(java.util.Map)
+ *
+ * @param filterExpression filter expression to be used for pruning scan
+ *
+ * @return pruned subset of the full scan based on the filter expression
+ */
+ public Map<TopicPartition, KafkaPullerInputSplit> computeOptimizedScan(ExprNodeGenericFuncDesc filterExpression)
+ {
+ Map<TopicPartition, KafkaPullerInputSplit> optimizedScan = parse(filterExpression);
+
+ if (log.isDebugEnabled()) {
+ if (optimizedScan != null) {
+ log.debug("Optimized scan:");
+ optimizedScan.forEach((tp, input) -> log.debug(
+ "Topic-[{}] Partition-[{}] - Split startOffset [{}] :-> endOffset [{}]",
+ tp.topic(),
+ tp.partition(),
+ input.getStartOffset(), input.getEndOffset()
+ ));
+ } else {
+ log.debug("No optimization thus using full scan ");
+ fullHouse.forEach((tp, input) -> log.info(
+ "Topic-[{}] Partition-[{}] - Split startOffset [{}] :-> endOffset [{}]",
+ tp.topic(),
+ tp.partition(),
+ input.getStartOffset(), input.getEndOffset()
+ ));
+ }
+ }
+ return optimizedScan == null ? fullHouse : optimizedScan;
+ }
+
+ /**
+ * @param expression filter to parse and trim the full scan
+ *
+ * @return Map of optimized kafka range scans or null if it is impossible to optimize.
+ */
+ @Nullable
+ private Map<TopicPartition, KafkaPullerInputSplit> parse(ExprNodeDesc expression)
+ {
+ if (expression.getClass() != ExprNodeGenericFuncDesc.class) {
+ return null;
+ }
+ // get the kind of expression
+ ExprNodeGenericFuncDesc expr = (ExprNodeGenericFuncDesc) expression;
+ Class<?> op = expr.getGenericUDF().getClass();
+
+ // handle the logical operators
+ if (FunctionRegistry.isOpOr(expr)) {
+ return pushOrOp(expr);
+ }
+ if (FunctionRegistry.isOpAnd(expr)) {
+ return pushAndOp(expr);
+ }
+
+ if (op == GenericUDFOPGreaterThan.class) {
+ return pushLeaf(expr, PredicateLeaf.Operator.LESS_THAN_EQUALS, true);
+ } else if (op == GenericUDFOPEqualOrGreaterThan.class) {
+ return pushLeaf(expr, PredicateLeaf.Operator.LESS_THAN, true);
+ } else if (op == GenericUDFOPLessThan.class) {
+ return pushLeaf(expr, PredicateLeaf.Operator.LESS_THAN, false);
+ } else if (op == GenericUDFOPEqualOrLessThan.class) {
+ return pushLeaf(expr, PredicateLeaf.Operator.LESS_THAN_EQUALS, false);
+ } else if (op == GenericUDFOPEqual.class) {
+ return pushLeaf(expr, PredicateLeaf.Operator.EQUALS, false);
+ // otherwise, we didn't understand it, so bailout
+ } else {
+ return null;
+ }
+ }
+
+
+ /**
+ * @param expr leaf node to push
+ * @param operator operator
+ * @param negation true if this is a negation; used to represent
+ * GenericUDFOPGreaterThan as a negated PredicateLeaf.Operator.LESS_THAN_EQUALS
+ * and GenericUDFOPEqualOrGreaterThan as a negated PredicateLeaf.Operator.LESS_THAN
+ *
+ * @return leaf scan, or null if the push-down cannot be determined
+ */
+ @Nullable
+ private Map<TopicPartition, KafkaPullerInputSplit> pushLeaf(
+ ExprNodeGenericFuncDesc expr, PredicateLeaf.Operator operator, boolean negation
+ )
+ {
+ if (expr.getChildren().size() != 2) {
+ return null;
+ }
+ GenericUDF genericUDF = expr.getGenericUDF();
+ if (!(genericUDF instanceof GenericUDFBaseCompare)) {
+ return null;
+ }
+ ExprNodeDesc expr1 = expr.getChildren().get(0);
+ ExprNodeDesc expr2 = expr.getChildren().get(1);
+ // We may need to peel off the GenericUDFBridge that is added by CBO or user
+ if (expr1.getTypeInfo().equals(expr2.getTypeInfo())) {
+ expr1 = getColumnExpr(expr1);
+ expr2 = getColumnExpr(expr2);
+ }
+
+ ExprNodeDesc[] extracted = ExprNodeDescUtils.extractComparePair(expr1, expr2);
+ if (extracted == null || (extracted.length > 2)) {
+ return null;
+ }
+
+ ExprNodeColumnDesc columnDesc;
+ ExprNodeConstantDesc constantDesc;
+ final boolean flip;
+
+ if (extracted[0] instanceof ExprNodeColumnDesc) {
+ columnDesc = (ExprNodeColumnDesc) extracted[0];
+ constantDesc = (ExprNodeConstantDesc) extracted[1];
+ flip = false;
+
+ } else {
+ flip = true;
+ columnDesc = (ExprNodeColumnDesc) extracted[1];
+ constantDesc = (ExprNodeConstantDesc) extracted[0];
+ }
+
+ if (columnDesc.getColumn().equals(KafkaStorageHandler.__PARTITION)) {
+ return buildScanFormPartitionPredicate(
+ fullHouse,
+ operator,
+ ((Number) constantDesc.getValue()).intValue(),
+ flip,
+ negation
+ );
+
+ }
+ if (columnDesc.getColumn().equals(KafkaStorageHandler.__OFFSET)) {
+ return buildScanFromOffsetPredicate(
+ fullHouse,
+ operator,
+ ((Number) constantDesc.getValue()).longValue(),
+ flip,
+ negation
+ );
+ }
+
+ if (columnDesc.getColumn().equals(KafkaStorageHandler.__TIMESTAMP)) {
+ long timestamp = ((Number) constantDesc.getValue()).longValue();
+ return buildScanForTimesPredicate(fullHouse, operator, timestamp, flip, negation, kafkaConsumer);
+ }
+ return null;
+ }
+
+
+ /**
+ * Trim kafka scan using a leaf binary predicate on partition column
+ *
+ * @param fullScan kafka full scan to be optimized
+ * @param operator predicate operator, equal, lessThan or lessThanEqual
+ * @param partitionConst partition constant value
+ * @param flip true if the positions of column and constant are flipped (the default assumes COLUMN OP CONSTANT)
+ * @param negation true if the expression is a negation of the original expression
+ *
+ * @return filtered kafka scan
+ */
+ @VisibleForTesting
+ protected static Map<TopicPartition, KafkaPullerInputSplit> buildScanFormPartitionPredicate(
+ Map<TopicPartition, KafkaPullerInputSplit> fullScan,
+ PredicateLeaf.Operator operator, int partitionConst, boolean flip, boolean negation
+ )
+ {
+ final Predicate<TopicPartition> predicate;
+ final Predicate<TopicPartition> intermediatePredicate;
+ switch (operator) {
+ case EQUALS:
+ predicate = topicPartition -> topicPartition != null && topicPartition.partition() == partitionConst;
+ break;
+ case LESS_THAN:
+ intermediatePredicate = flip
+ ? topicPartition -> topicPartition != null
+ && partitionConst < topicPartition.partition()
+ : topicPartition -> topicPartition != null
+ && topicPartition.partition() < partitionConst;
+
+ predicate = negation ? intermediatePredicate.negate() : intermediatePredicate;
+ break;
+ case LESS_THAN_EQUALS:
+ intermediatePredicate = flip ? topicPartition -> topicPartition != null
+ && partitionConst
+ <= topicPartition.partition()
+ : topicPartition -> topicPartition != null
+ && topicPartition.partition() <= partitionConst;
+
+ predicate = negation ? intermediatePredicate.negate() : intermediatePredicate;
+ break;
+ default:
+ //Default to select * for unknown cases
+ predicate = topicPartition -> true;
+ }
+
+ ImmutableMap.Builder<TopicPartition, KafkaPullerInputSplit> builder = ImmutableMap.builder();
+ // Filter full scan based on predicate
+ fullScan.entrySet()
+ .stream()
+ .filter(entry -> predicate.test(entry.getKey()))
+ .forEach(entry -> builder.put(entry.getKey(), entry.getValue().clone()));
+ return builder.build();
+ }
+
+ /**
+ * @param fullScan full kafka scan to be pruned
+ * @param operator operator kind
+ * @param offsetConst offset constant value
+ * @param flip true if the positions of constant and column were flipped (the default assumes COLUMN OP CONSTANT)
+ * @param negation true if the expression is a negation of the original expression
+ *
+ * @return optimized kafka scan
+ */
+ @VisibleForTesting
+ protected static Map<TopicPartition, KafkaPullerInputSplit> buildScanFromOffsetPredicate(
+ final Map<TopicPartition, KafkaPullerInputSplit> fullScan,
+ PredicateLeaf.Operator operator, long offsetConst, boolean flip, boolean negation
+ )
+ {
+ final boolean isEndBound;
+ final long startOffset;
+ final long endOffset;
+
+ // The predicate bounds the end of the range when column and constant are in the default
+ // position (COLUMN OP CONSTANT) and not negated, or when they are both flipped and negated.
+ isEndBound = (flip == negation);
+ switch (operator) {
+ case LESS_THAN_EQUALS:
+ if (isEndBound) {
+ startOffset = -1;
+ endOffset = negation ? offsetConst : offsetConst + 1;
+ } else {
+ endOffset = -1;
+ startOffset = negation ? offsetConst + 1 : offsetConst;
+ }
+ break;
+ case EQUALS:
+ startOffset = offsetConst;
+ endOffset = offsetConst + 1;
+ break;
+ case LESS_THAN:
+ if (isEndBound) {
+ endOffset = negation ? offsetConst + 1 : offsetConst;
+ startOffset = -1;
+ } else {
+ endOffset = -1;
+ startOffset = negation ? offsetConst : offsetConst + 1;
+ }
+ break;
+ default:
+ // default to select *
+ startOffset = -1;
+ endOffset = -1;
+ }
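+ // For example, __offset < 10 (flip=false, negation=false) yields startOffset=-1, endOffset=10 (an end bound),
+ // while its negation __offset >= 10 yields startOffset=10, endOffset=-1 (a start bound).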
+
+ final Map<TopicPartition, KafkaPullerInputSplit> newScan = new HashMap<>();
+
+ fullScan.forEach((tp, existingInputSplit) -> {
+ final KafkaPullerInputSplit newInputSplit;
+ if (startOffset != -1 && endOffset == -1) {
+ newInputSplit = new KafkaPullerInputSplit(
+ tp.topic(),
+ tp.partition(),
+ // @TODO make sure that this is okay
+ //if the user asks for a start offset greater than the max offset, it will be replaced with the last offset
+ Math.min(startOffset, existingInputSplit.getEndOffset()),
+ existingInputSplit.getEndOffset(),
+ existingInputSplit.getPath()
+ );
+ } else if (endOffset != -1 && startOffset == -1) {
+ newInputSplit = new KafkaPullerInputSplit(
+ tp.topic(),
+ tp.partition(),
+ existingInputSplit.getStartOffset(),
+ //@TODO check this: if the user asks for a non-existing end offset, ignore it and position the head at the start
+ Math.max(endOffset, existingInputSplit.getStartOffset()),
+ existingInputSplit.getPath()
+ );
+ } else if (endOffset == startOffset + 1) {
+ if (startOffset < existingInputSplit.getStartOffset() || startOffset >= existingInputSplit.getEndOffset()) {
+ newInputSplit = new KafkaPullerInputSplit(
+ tp.topic(),
+ tp.partition(),
+ //@TODO check with the team what to do if the requested offset is out of range;
+ // here we seek to the last offset
+ existingInputSplit.getEndOffset(),
+ existingInputSplit.getEndOffset(),
+ existingInputSplit.getPath()
+ );
+ } else {
+ newInputSplit = new KafkaPullerInputSplit(
+ tp.topic(),
+ tp.partition(),
+ startOffset,
+ endOffset,
+ existingInputSplit.getPath()
+ );
+ }
+
+ } else {
+ newInputSplit = new KafkaPullerInputSplit(
+ tp.topic(),
+ tp.partition(),
+ existingInputSplit.getStartOffset(),
+ existingInputSplit.getEndOffset(),
+ existingInputSplit.getPath()
+ );
+ }
+
+ newScan.put(tp, KafkaPullerInputSplit.intersectRange(newInputSplit, existingInputSplit));
+ });
+
+ return newScan;
+ }
+
+ @Nullable
+ protected static Map<TopicPartition, KafkaPullerInputSplit> buildScanForTimesPredicate(
+ final Map<TopicPartition, KafkaPullerInputSplit> fullHouse,
+ PredicateLeaf.Operator operator, long timestamp, boolean flip, boolean negation, KafkaConsumer<byte[], byte[]> consumer
+ )
+ {
+ long increment = (flip && operator == PredicateLeaf.Operator.LESS_THAN
+ || negation && operator == PredicateLeaf.Operator.LESS_THAN_EQUALS) ? 1L : 0L;
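+ // e.g. __timestamp > 1000 looks up offsetsForTimes(1001), while __timestamp >= 1000 looks up offsetsForTimes(1000)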
+ // only accepted cases are timestamp_column [ > ; >= ; = ]constant
+ if (operator == PredicateLeaf.Operator.EQUALS || flip ^ negation) {
+ final Map<TopicPartition, Long> timePartitionsMap = Maps.toMap(fullHouse.keySet(), tp -> timestamp + increment);
+ try {
+ // Based on Kafka docs
+ // null will be returned for a partition if its message format version is before 0.10.0
+ Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp = consumer.offsetsForTimes(timePartitionsMap);
+ final Map<TopicPartition, KafkaPullerInputSplit> newScan = Maps.toMap(fullHouse.keySet(), tp -> {
+ KafkaPullerInputSplit existing = fullHouse.get(tp);
+ OffsetAndTimestamp foundOffsetAndTime = offsetAndTimestamp.get(tp);
+ //Null when the filter doesn't match or the field does not exist (i.e. old broker), thus return an empty scan.
+ final long startOffset = foundOffsetAndTime == null ? existing.getEndOffset() : foundOffsetAndTime.offset();
+ return new KafkaPullerInputSplit(
+ tp.topic(),
+ tp.partition(),
+ startOffset,
+ existing.getEndOffset(),
+ existing.getPath()
+ );
+ });
+ return newScan;
+ }
+ catch (Exception e) {
+ log.error("Error while looking up offsets for time", e);
+ //Bail out when offsets for times cannot be determined.
+ return null;
+ }
+
+ }
+ return null;
+ }
+
+ /**
+ * @param expr And expression to be parsed
+ *
+ * @return either full scan or an optimized sub scan.
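+ *
+ * For example, {@code __offset >= 5 AND __offset < 10} intersects the two child scans into the range [5, 10),
+ * further intersected with each split's existing range.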
+ */
+ private Map<TopicPartition, KafkaPullerInputSplit> pushAndOp(ExprNodeGenericFuncDesc expr)
+ {
+ Map<TopicPartition, KafkaPullerInputSplit> currentScan = new HashMap<>();
+
+ fullHouse.forEach((tp, input) -> currentScan.put(
+ tp,
+ KafkaPullerInputSplit.copyOf(input)
+ ));
+
+ for (ExprNodeDesc child : expr.getChildren()) {
+ Map<TopicPartition, KafkaPullerInputSplit> scan = parse(child);
+ if (scan != null) {
+ Set<TopicPartition> currentKeys = ImmutableSet.copyOf(currentScan.keySet());
+ currentKeys.stream().forEach(key -> {
+ KafkaPullerInputSplit newSplit = scan.get(key);
+ KafkaPullerInputSplit oldSplit = currentScan.get(key);
+ currentScan.remove(key);
+ if (newSplit != null) {
+ KafkaPullerInputSplit intersectionSplit = KafkaPullerInputSplit.intersectRange(newSplit, oldSplit);
+ if (intersectionSplit != null) {
+ currentScan.put(key, intersectionSplit);
+ }
+ }
+ });
+
+ }
+ }
+ return currentScan;
+ }
+
+ @Nullable
+ private Map<TopicPartition, KafkaPullerInputSplit> pushOrOp(ExprNodeGenericFuncDesc expr)
+ {
+ final Map<TopicPartition, KafkaPullerInputSplit> currentScan = new HashMap<>();
+ for (ExprNodeDesc child : expr.getChildren()) {
+ Map<TopicPartition, KafkaPullerInputSplit> scan = parse(child);
+ if (scan == null) {
+ // if any of the children is unknown bailout
+ return null;
+ }
+
+ scan.forEach((tp, input) -> {
+ KafkaPullerInputSplit existingSplit = currentScan.get(tp);
+ currentScan.put(tp, KafkaPullerInputSplit.unionRange(input, existingSplit == null ? input : existingSplit));
+ });
+ }
+ return currentScan;
+ }
+
+ private static ExprNodeDesc getColumnExpr(ExprNodeDesc expr)
+ {
+ if (expr instanceof ExprNodeColumnDesc) {
+ return expr;
+ }
+ ExprNodeGenericFuncDesc funcDesc = null;
+ if (expr instanceof ExprNodeGenericFuncDesc) {
+ funcDesc = (ExprNodeGenericFuncDesc) expr;
+ }
+ if (null == funcDesc) {
+ return expr;
+ }
+ GenericUDF udf = funcDesc.getGenericUDF();
+ // check if it's a simple cast expression.
+ if ((udf instanceof GenericUDFBridge || udf instanceof GenericUDFToBinary
+ || udf instanceof GenericUDFToChar || udf instanceof GenericUDFToVarchar
+ || udf instanceof GenericUDFToDecimal || udf instanceof GenericUDFToDate
+ || udf instanceof GenericUDFToUnixTimeStamp || udf instanceof GenericUDFToUtcTimestamp)
+ && funcDesc.getChildren().size() == 1
+ && funcDesc.getChildren().get(0) instanceof ExprNodeColumnDesc) {
+ return expr.getChildren().get(0);
+ }
+ return expr;
+ }
+
+}
diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
new file mode 100644
index 0000000000..0639707e73
--- /dev/null
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider;
+import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class KafkaStorageHandler implements HiveStorageHandler
+{
+
+ public static final String __TIMESTAMP = "__timestamp";
+ public static final String __PARTITION = "__partition";
+ public static final String __OFFSET = "__offset";
+ public static final String SERDE_CLASS_NAME = "kafka.serde.class";
+ public static final String HIVE_KAFKA_TOPIC = "kafka.topic";
+ public static final String CONSUMER_CONFIGURATION_PREFIX = "kafka.consumer";
+ public static final String HIVE_KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
+ public static final String HIVE_KAFKA_POLL_TIMEOUT = "hive.kafka.poll.timeout.ms";
+ public static final long DEFAULT_CONSUMER_POLL_TIMEOUT_MS = 5000L; // 5 seconds
+
+ private static final Logger log = LoggerFactory.getLogger(KafkaStorageHandler.class);
+
+ Configuration configuration;
+
+ @Override
+ public Class<? extends InputFormat> getInputFormatClass()
+ {
+ return KafkaPullerInputFormat.class;
+ }
+
+ @Override
+ public Class<? extends OutputFormat> getOutputFormatClass()
+ {
+ return NullOutputFormat.class;
+ }
+
+ @Override
+ public Class<? extends AbstractSerDe> getSerDeClass()
+ {
+ return GenericKafkaSerDe.class;
+ }
+
+ @Override
+ public HiveMetaHook getMetaHook()
+ {
+ return null;
+ }
+
+ @Override
+ public HiveAuthorizationProvider getAuthorizationProvider() throws HiveException
+ {
+ return new DefaultHiveAuthorizationProvider();
+ }
+
+ @Override
+ public void configureInputJobProperties(
+ TableDesc tableDesc,
+ Map<String, String> jobProperties
+ )
+ {
+ jobProperties.put(HIVE_KAFKA_TOPIC, Preconditions
+ .checkNotNull(
+ tableDesc.getProperties().getProperty(HIVE_KAFKA_TOPIC),
+ "kafka topic missing set table property->" + HIVE_KAFKA_TOPIC
+ ));
+ log.debug("Table properties: Kafka Topic {}", tableDesc.getProperties().getProperty(HIVE_KAFKA_TOPIC));
+ jobProperties.put(HIVE_KAFKA_BOOTSTRAP_SERVERS, Preconditions
+ .checkNotNull(
+ tableDesc.getProperties().getProperty(HIVE_KAFKA_BOOTSTRAP_SERVERS),
+ "Broker address missing set table property->" + HIVE_KAFKA_BOOTSTRAP_SERVERS
+ ));
+ log.debug("Table properties: Kafka broker {}", tableDesc.getProperties().getProperty(HIVE_KAFKA_BOOTSTRAP_SERVERS));
+ jobProperties.put(
+ SERDE_CLASS_NAME,
+ tableDesc.getProperties().getProperty(SERDE_CLASS_NAME, KafkaJsonSerDe.class.getName())
+ );
+
+ log.info("Table properties: SerDe class name {}", jobProperties.get(SERDE_CLASS_NAME));
+
+ //set extra properties
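+ // e.g. a table property "kafka.consumer.max.poll.records" (an illustrative key) is forwarded
+ // to the consumer as "max.poll.records"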
+ tableDesc.getProperties()
+ .entrySet()
+ .stream()
+ .filter(
+ objectObjectEntry -> objectObjectEntry.getKey()
+ .toString()
+ .toLowerCase()
+ .startsWith(CONSUMER_CONFIGURATION_PREFIX))
+ .forEach(entry -> {
+ String key = entry.getKey().toString().substring(CONSUMER_CONFIGURATION_PREFIX.length() + 1);
+ String value = entry.getValue().toString();
+ jobProperties.put(key, value);
+ log.info("Setting extra job properties: key [{}] -> value [{}]", key, value);
+
+ });
+ }
+
+ @Override
+ public void configureInputJobCredentials(
+ TableDesc tableDesc,
+ Map<String, String> secrets
+ )
+ {
+
+ }
+
+ @Override
+ public void configureOutputJobProperties(
+ TableDesc tableDesc,
+ Map<String, String> jobProperties
+ )
+ {
+
+ }
+
+ @Override
+ public void configureTableJobProperties(
+ TableDesc tableDesc,
+ Map<String, String> jobProperties
+ )
+ {
+ configureInputJobProperties(tableDesc, jobProperties);
+ }
+
+ @Override
+ public void configureJobConf(TableDesc tableDesc, JobConf jobConf)
+ {
+ Map<String, String> properties = new HashMap<>();
+ configureInputJobProperties(tableDesc, properties);
+ properties.forEach((key, value) -> jobConf.set(key, value));
+ try {
+ KafkaStreamingUtils.copyDependencyJars(jobConf, KafkaStorageHandler.class);
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void setConf(Configuration configuration)
+ {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public Configuration getConf()
+ {
+ return configuration;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "org.apache.hadoop.hive.kafka.KafkaStorageHandler";
+ }
+}
diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStreamingUtils.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStreamingUtils.java
new file mode 100644
index 0000000000..e5c83f935e
--- /dev/null
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStreamingUtils.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+
+
+/**
+ * Utilities class
+ */
+public class KafkaStreamingUtils {
+
+ private KafkaStreamingUtils() {}
+
+ /**
+ * @param configuration Job configs
+ *
+ * @return default consumer properties
+ */
+ public static Properties consumerProperties(Configuration configuration) {
+ final Properties props = new Properties();
+ // these are important to set in order to avoid long blocking
+ props.setProperty("request.timeout.ms", "10001");
+ props.setProperty("fetch.max.wait.ms", "10000");
+ props.setProperty("session.timeout.ms", "10000");
+ // we manage the commit offsets ourselves
+ props.setProperty("enable.auto.commit", "false");
+ // we seek in the stream, so no reset
+ props.setProperty("auto.offset.reset", "none");
+ String brokerEndPoint = configuration.get(KafkaStorageHandler.HIVE_KAFKA_BOOTSTRAP_SERVERS);
+ props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerEndPoint);
+ props.setProperty(KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ props.setProperty(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ // the user can always override these defaults
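+ // e.g. "kafka.consumer.fetch.max.wait.ms" set as a table/job property (an illustrative key)
+ // replaces the "fetch.max.wait.ms" default above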
+ final Map<String, String> kafkaProperties =
+ configuration.getValByRegex("^" + KafkaStorageHandler.CONSUMER_CONFIGURATION_PREFIX + "\\..*");
+ for (Map.Entry<String, String> entry : kafkaProperties.entrySet()) {
+ props.setProperty(entry.getKey().substring(
+ KafkaStorageHandler.CONSUMER_CONFIGURATION_PREFIX.length() + 1),
+ entry.getValue()
+ );
+ }
+ return props;
+ }
+
+ public static void copyDependencyJars(Configuration conf, Class<?>... classes) throws IOException
+ {
+ Set<String> jars = new HashSet<>();
+ FileSystem localFs = FileSystem.getLocal(conf);
+ jars.addAll(conf.getStringCollection("tmpjars"));
+ jars.addAll(Arrays.asList(classes).stream().filter(aClass -> aClass != null).map(clazz -> {
+ String path = Utilities.jarFinderGetJar(clazz);
+ if (path == null) {
+ throw new RuntimeException(
+ "Could not find jar for class " + clazz + " in order to ship it to the cluster.");
+ }
+ try {
+ if (!localFs.exists(new Path(path))) {
+ throw new RuntimeException("Could not validate jar file " + path + " for class " + clazz);
+ }
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return path;
+ }).collect(Collectors.toList()));
+
+ if (jars.isEmpty()) {
+ return;
+ }
+ conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()])));
+ }
+}
diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaPullerInputSplitTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaPullerInputSplitTest.java
new file mode 100644
index 0000000000..aa3aba7e3e
--- /dev/null
+++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaPullerInputSplitTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+public class KafkaPullerInputSplitTest
+{
+ private String topic = "my_topic";
+ private KafkaPullerInputSplit expectedInputSplit;
+
+ public KafkaPullerInputSplitTest()
+ {
+ this.expectedInputSplit = new KafkaPullerInputSplit(this.topic, 1, 50L, 56L,
+ new Path("/tmp")
+ );
+ }
+
+ @Test
+ public void testWriteRead() throws IOException
+ {
+ DataOutput output = new DataOutputBuffer();
+ this.expectedInputSplit.write(output);
+ KafkaPullerInputSplit kafkaPullerInputSplit = new KafkaPullerInputSplit();
+ DataInput input = new DataInputBuffer();
+ ((DataInputBuffer) input).reset(((DataOutputBuffer) output).getData(), 0, ((DataOutputBuffer) output).getLength());
+ kafkaPullerInputSplit.readFields(input);
+ Assert.assertEquals(this.expectedInputSplit, kafkaPullerInputSplit);
+ }
+
+
+ @Test
+ public void andRangeOverLapping()
+ {
+ KafkaPullerInputSplit kafkaPullerInputSplit = new KafkaPullerInputSplit(
+ "test-topic",
+ 2,
+ 10,
+ 400,
+ new Path("/tmp")
+ );
+
+ KafkaPullerInputSplit kafkaPullerInputSplit2 = new KafkaPullerInputSplit(
+ "test-topic",
+ 2,
+ 3,
+ 200,
+ new Path("/tmp")
+ );
+
+
+ Assert.assertEquals(
+ new KafkaPullerInputSplit("test-topic", 2, 10, 200, new Path("/tmp")),
+ KafkaPullerInputSplit.intersectRange(kafkaPullerInputSplit, kafkaPullerInputSplit2)
+ );
+
+
+ }
+
+
+ @Test
+ public void andRangeNonOverLapping()
+ {
+ KafkaPullerInputSplit kafkaPullerInputSplit = new KafkaPullerInputSplit(
+ "test-topic",
+ 2,
+ 10,
+ 400,
+ new Path("/tmp")
+ );
+
+ KafkaPullerInputSplit kafkaPullerInputSplit2 = new KafkaPullerInputSplit(
+ "test-topic",
+ 2,
+ 550,
+ 700,
+ new Path("/tmp")
+ );
+
+
+ Assert.assertEquals(null, KafkaPullerInputSplit.intersectRange(kafkaPullerInputSplit, kafkaPullerInputSplit2));
+
+
+ }
+
+ @Test
+ public void orRange()
+ {
+ KafkaPullerInputSplit kafkaPullerInputSplit = new KafkaPullerInputSplit(
+ "test-topic",
+ 2,
+ 300,
+ 400,
+ new Path("/tmp")
+ );
+
+
+ KafkaPullerInputSplit kafkaPullerInputSplit2 = new KafkaPullerInputSplit(
+ "test-topic",
+ 2,
+ 3,
+ 600,
+ new Path("/tmp")
+ );
+
+
+ Assert.assertEquals(
+ kafkaPullerInputSplit2,
+ KafkaPullerInputSplit.unionRange(kafkaPullerInputSplit, kafkaPullerInputSplit2)
+ );
+
+ KafkaPullerInputSplit kafkaPullerInputSplit3 = new KafkaPullerInputSplit(
+ "test-topic",
+ 2,
+ 700,
+ 6000,
+ new Path("/tmp")
+ );
+
+
+ Assert.assertEquals(new KafkaPullerInputSplit(
+ "test-topic",
+ 2,
+ 300,
+ 6000,
+ new Path("/tmp")
+ ), KafkaPullerInputSplit.unionRange(kafkaPullerInputSplit, kafkaPullerInputSplit3));
+ }
+
+
+ @Test
+ public void copyOf()
+ {
+ KafkaPullerInputSplit kafkaPullerInputSplit = new KafkaPullerInputSplit(
+ "test-topic",
+ 2,
+ 300,
+ 400,
+ new Path("/tmp")
+ );
+
+ KafkaPullerInputSplit copyOf = KafkaPullerInputSplit.copyOf(kafkaPullerInputSplit);
+ Assert.assertEquals(kafkaPullerInputSplit, copyOf);
+ Assert.assertTrue(kafkaPullerInputSplit != copyOf);
+ }
+
+ @Test
+ public void testClone()
+ {
+ KafkaPullerInputSplit kafkaPullerInputSplit = new KafkaPullerInputSplit(
+ "test-topic",
+ 2,
+ 300,
+ 400,
+ new Path("/tmp")
+ );
+
+ KafkaPullerInputSplit clone = kafkaPullerInputSplit.clone();
+ Assert.assertEquals(kafkaPullerInputSplit, clone);
+ Assert.assertTrue(clone != kafkaPullerInputSplit);
+
+ }
+
+ @Test
+ public void testSlice()
+ {
+ KafkaPullerInputSplit kafkaPullerInputSplit = new KafkaPullerInputSplit(
+ "test-topic",
+ 2,
+ 300,
+ 400,
+ new Path("/tmp")
+ );
+ List<KafkaPullerInputSplit> kafkaPullerInputSplitList = KafkaPullerInputSplit.slice(14, kafkaPullerInputSplit);
+ Assert.assertEquals(
+ kafkaPullerInputSplitList.stream()
+ .mapToLong(kafkaPullerInputSplit1 -> kafkaPullerInputSplit1.getEndOffset()
+ - kafkaPullerInputSplit1.getStartOffset())
+ .sum(),
+ kafkaPullerInputSplit.getEndOffset() - kafkaPullerInputSplit.getStartOffset()
+ );
+ Assert.assertTrue(kafkaPullerInputSplitList.stream()
+ .filter(kafkaPullerInputSplit1 -> kafkaPullerInputSplit.getStartOffset()
+ == kafkaPullerInputSplit1.getStartOffset())
+ .count() == 1);
+ Assert.assertTrue(kafkaPullerInputSplitList.stream()
+ .filter(kafkaPullerInputSplit1 -> kafkaPullerInputSplit.getEndOffset()
+ == kafkaPullerInputSplit1.getEndOffset())
+ .count() == 1);
+
+ }
+}
diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java
new file mode 100644
index 0000000000..bc3141c718
--- /dev/null
+++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.collect.ImmutableList;
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import kafka.zk.EmbeddedZookeeper;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+public class KafkaRecordIteratorTest
+{
+ private static final Logger log = LoggerFactory.getLogger(KafkaRecordIteratorTest.class);
+ private static final String topic = "my_topic2";
+ private static final List<ConsumerRecord<byte[], byte[]>> RECORDS = new ArrayList<>();
+ private static final int RECORD_NUMBER = 100;
+ private static final TopicPartition TOPIC_PARTITION = new TopicPartition("my_topic2", 0);
+ public static final byte[] KEY_BYTES = "KEY".getBytes(Charset.forName("UTF-8"));
+ private static ZkUtils zkUtils;
+ private static ZkClient zkClient;
+ private static KafkaProducer<byte[], byte[]> producer;
+ private static KafkaServer kafkaServer;
+ private static String zkConnect;
+ private KafkaConsumer<byte[], byte[]> consumer = null;
+ private KafkaRecordIterator kafkaRecordIterator = null;
+ private Configuration conf = new Configuration();
+
+
+ public KafkaRecordIteratorTest()
+ {
+ }
+
+ @BeforeClass
+ public static void setupCluster() throws IOException, InterruptedException
+ {
+ log.info("init embedded Zookeeper");
+ EmbeddedZookeeper zkServer = new EmbeddedZookeeper();
+ zkConnect = "127.0.0.1:" + zkServer.port();
+ zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
+ zkUtils = ZkUtils.apply(zkClient, false);
+ log.info("init kafka broker");
+ Properties brokerProps = new Properties();
+ brokerProps.setProperty("zookeeper.connect", zkConnect);
+ brokerProps.setProperty("broker.id", "0");
+ brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
+ brokerProps.setProperty("listeners", "PLAINTEXT://127.0.0.1:9092");
+ brokerProps.setProperty("offsets.topic.replication.factor", "1");
+ brokerProps.setProperty("log.retention.ms", "1000000");
+ KafkaConfig config = new KafkaConfig(brokerProps);
+ Time mock = new MockTime();
+ kafkaServer = TestUtils.createServer(config, mock);
+ log.info("Creating kafka topic [{}]", "my_topic2");
+ AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+ setupProducer();
+ sendData();
+ }
+
+ @Before
+ public void setUp()
+ {
+ log.info("setting up consumer");
+ this.setupConsumer();
+ this.kafkaRecordIterator = null;
+ }
+
+ @Test
+ public void testHasNextAbsoluteStartEnd()
+ {
+ this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, 0L, (long) RECORDS.size(), 100L);
+ this.compareIterator(RECORDS, this.kafkaRecordIterator);
+ }
+
+ @Test
+ public void testHasNextGivenStartEnd()
+ {
+ long startOffset = 2L;
+ long lastOffset = 4L;
+ this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, 2L, 4L, 100L);
+ this.compareIterator((List) RECORDS.stream().filter((consumerRecord) -> {
+ return consumerRecord.offset() >= 2L && consumerRecord.offset() < 4L;
+ }).collect(Collectors.toList()), this.kafkaRecordIterator);
+ }
+
+ @Test
+ public void testHasNextNoOffsets()
+ {
+ this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, 100L);
+ this.compareIterator(RECORDS, this.kafkaRecordIterator);
+ }
+
+ @Test
+ public void testHasNextLastRecord()
+ {
+ long startOffset = (long) (RECORDS.size() - 1);
+ long lastOffset = (long) RECORDS.size();
+ this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, startOffset, lastOffset, 100L);
+ this.compareIterator((List) RECORDS.stream().filter((consumerRecord) -> {
+ return consumerRecord.offset() >= startOffset && consumerRecord.offset() < lastOffset;
+ }).collect(Collectors.toList()), this.kafkaRecordIterator);
+ }
+
+ @Test
+ public void testHasNextFirstRecord()
+ {
+ long startOffset = 0L;
+ long lastOffset = 1L;
+ this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, 0L, 1L, 100L);
+ this.compareIterator((List) RECORDS.stream().filter((consumerRecord) -> {
+ return consumerRecord.offset() >= 0L && consumerRecord.offset() < 1L;
+ }).collect(Collectors.toList()), this.kafkaRecordIterator);
+ }
+
+ @Test
+ public void testHasNextNoStart()
+ {
+ long startOffset = 0L;
+ long lastOffset = 10L;
+ this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, (Long) null, 10L, 100L);
+ this.compareIterator((List) RECORDS.stream().filter((consumerRecord) -> {
+ return consumerRecord.offset() >= 0L && consumerRecord.offset() < 10L;
+ }).collect(Collectors.toList()), this.kafkaRecordIterator);
+ }
+
+ @Test
+ public void testHasNextNoEnd()
+ {
+ long startOffset = 5L;
+ long lastOffset = (long) RECORDS.size();
+ this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, 5L, (Long) null, 100L);
+ this.compareIterator((List) RECORDS.stream().filter((consumerRecord) -> {
+ return consumerRecord.offset() >= 5L && consumerRecord.offset() < lastOffset;
+ }).collect(Collectors.toList()), this.kafkaRecordIterator);
+ }
+
+ @Test
+ public void testRecordReader() throws IOException, InterruptedException
+ {
+ InputSplit inputSplits = new KafkaPullerInputSplit("my_topic2", 0, 0L, 50L, null);
+ KafkaPullerRecordReader recordReader = new KafkaPullerRecordReader((KafkaPullerInputSplit) inputSplits, this.conf);
+ List<KafkaRecordWritable> serRecords = (List) RECORDS.stream().map((recordx) -> {
+ return KafkaRecordWritable.fromKafkaRecord(recordx);
+ }).collect(Collectors.toList());
+
+ for (int i = 0; i < 50; ++i) {
+ KafkaRecordWritable record = new KafkaRecordWritable();
+ Assert.assertTrue(recordReader.next((NullWritable) null, record));
+ Assert.assertEquals(serRecords.get(i), record);
+ }
+
+ recordReader.close();
+ recordReader = new KafkaPullerRecordReader();
+ TaskAttemptContext context = new TaskAttemptContextImpl(this.conf, new TaskAttemptID());
+ recordReader.initialize(new KafkaPullerInputSplit("my_topic2", 0, 50L, 100L, null), context);
+
+ for (int i = 50; i < 100; ++i) {
+ KafkaRecordWritable record = new KafkaRecordWritable();
+ Assert.assertTrue(recordReader.next((NullWritable) null, record));
+ Assert.assertEquals(serRecords.get(i), record);
+ }
+
+ recordReader.close();
+ }
+
+ @Test(
+ expected = IllegalStateException.class
+ )
+ public void testPullingBeyondLimit()
+ {
+ this.kafkaRecordIterator = new KafkaRecordIterator(
+ this.consumer,
+ TOPIC_PARTITION,
+ 0L,
+ (long) RECORDS.size() + 1L,
+ 100L
+ );
+ this.compareIterator(RECORDS, this.kafkaRecordIterator);
+ }
+
+ @Test(
+ expected = IllegalStateException.class
+ )
+ public void testPullingStartGreaterThanEnd()
+ {
+ this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, 10L, 1L, 100L);
+ this.compareIterator(RECORDS, this.kafkaRecordIterator);
+ }
+
+ @Test(
+ expected = IllegalStateException.class
+ )
+ public void testPullingFromEmptyTopic()
+ {
+ this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, new TopicPartition("noHere", 0), 0L, 100L, 100L);
+ this.compareIterator(RECORDS, this.kafkaRecordIterator);
+ }
+
+ @Test(
+ expected = IllegalStateException.class
+ )
+ public void testPullingFromEmptyPartition()
+ {
+ this.kafkaRecordIterator = new KafkaRecordIterator(
+ this.consumer,
+ new TopicPartition("my_topic2", 1),
+ 0L,
+ 100L,
+ 100L
+ );
+ this.compareIterator(RECORDS, this.kafkaRecordIterator);
+ }
+
+ @Test
+ public void testStartIsEqualEnd()
+ {
+ this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, 10L, 10L, 100L);
+ this.compareIterator(ImmutableList.of(), this.kafkaRecordIterator);
+ }
+
+
+ @Test
+ public void testStartIsTheLastOffset()
+ {
+ this.kafkaRecordIterator = new KafkaRecordIterator(
+ this.consumer,
+ TOPIC_PARTITION,
+ new Long(RECORD_NUMBER),
+ new Long(RECORD_NUMBER),
+ 100L
+ );
+ this.compareIterator(ImmutableList.of(), this.kafkaRecordIterator);
+ }
+
+ private void compareIterator(
+ List<ConsumerRecord<byte[], byte[]>> expected,
+ Iterator<ConsumerRecord<byte[], byte[]>> kafkaRecordIterator
+ )
+ {
+ expected.stream().forEachOrdered(expectedRecord -> {
+ Assert.assertTrue("record with offset " + expectedRecord.offset(), kafkaRecordIterator.hasNext());
+ ConsumerRecord<byte[], byte[]> record = kafkaRecordIterator.next();
+ Assert.assertEquals(topic, record.topic());
+ Assert.assertEquals(0, record.partition());
+ Assert.assertEquals("Offsets not matching", expectedRecord.offset(), record.offset());
+ byte[] expectedValue = expectedRecord.value();
+ byte[] expectedKey = expectedRecord.key();
+ Assert.assertArrayEquals(expectedValue, record.value());
+ Assert.assertArrayEquals(expectedKey, record.key());
+ });
+ Assert.assertFalse(kafkaRecordIterator.hasNext());
+ }
+
+ private static void setupProducer()
+ {
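+ // Byte-array serializers for key and value; max.block.ms bounds how long send() may block if the broker is unreachable.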
+ log.info("Setting up kafka producer");
+ Properties producerProps = new Properties();
+ producerProps.setProperty("bootstrap.servers", "127.0.0.1:9092");
+ producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+ producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+ producerProps.setProperty("max.block.ms", "10000");
+ producer = new KafkaProducer(producerProps);
+ log.info("kafka producer started");
+ }
+
+ private void setupConsumer()
+ {
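+ // auto.offset.reset=none means an out-of-range seek raises an error instead of silently resetting; short timeouts keep failures from stalling the run.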
+ Properties consumerProps = new Properties();
+ consumerProps.setProperty("enable.auto.commit", "false");
+ consumerProps.setProperty("auto.offset.reset", "none");
+ consumerProps.setProperty("bootstrap.servers", "127.0.0.1:9092");
+ this.conf.set("kafka.bootstrap.servers", "127.0.0.1:9092");
+ consumerProps.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
+ consumerProps.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
+ consumerProps.setProperty("request.timeout.ms", "3002");
+ consumerProps.setProperty("fetch.max.wait.ms", "3001");
+ consumerProps.setProperty("session.timeout.ms", "3001");
+ consumerProps.setProperty("metadata.max.age.ms", "100");
+ this.consumer = new KafkaConsumer(consumerProps);
+ }
+
+ private static void sendData() throws InterruptedException
+ {
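+ // Publish RECORD_NUMBER records (key KEY_BYTES, value "VALUE-" + i) to partition 0 and mirror them in RECORDS for the assertions.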
+ log.info("Sending {} records", RECORD_NUMBER);
+ RECORDS.clear();
+ for (int i = 0; i < RECORD_NUMBER; ++i) {
+
+ final byte[] value = ("VALUE-" + Integer.toString(i)).getBytes(Charset.forName("UTF-8"));
+ //noinspection unchecked
+ producer.send(new ProducerRecord(
+ topic,
+ 0,
+ 0L,
+ KEY_BYTES,
+ value
+ ));
+
+ //noinspection unchecked
+ RECORDS.add(new ConsumerRecord(
+ topic,
+ 0,
+ (long) i,
+ 0L,
+ (TimestampType) null,
+ 0L,
+ 0,
+ 0,
+ KEY_BYTES,
+ value
+ ));
+ }
+
+ producer.close();
+ }
+
+ @After
+ public void tearDown()
+ {
+ this.kafkaRecordIterator = null;
+ if (this.consumer != null) {
+ this.consumer.close();
+ }
+ }
+
+ @AfterClass
+ public static void tearDownCluster()
+ {
+ if (kafkaServer != null) {
+ kafkaServer.shutdown();
+ kafkaServer.awaitShutdown();
+ }
+
+ zkClient.close();
+ zkUtils.close();
+ }
+}
diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordWritableTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordWritableTest.java
new file mode 100644
index 0000000000..1327eb3311
--- /dev/null
+++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordWritableTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public class KafkaRecordWritableTest {
+ public KafkaRecordWritableTest() {
+ }
+
+ @Test
+ public void testWriteReadFields() throws IOException {
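+ // Serialize the writable to a byte stream, read it back, and expect an equal instance.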
+ ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("topic", 0, 3L, "key".getBytes(), "value".getBytes());
+ KafkaRecordWritable kafkaRecordWritable = KafkaRecordWritable.fromKafkaRecord(record);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream w = new DataOutputStream(baos);
+ kafkaRecordWritable.write(w);
+ w.flush();
+
+ ByteArrayInputStream input = new ByteArrayInputStream(baos.toByteArray());
+ DataInputStream inputStream = new DataInputStream(input);
+ KafkaRecordWritable actualKafkaRecordWritable = new KafkaRecordWritable();
+ actualKafkaRecordWritable.readFields(inputStream);
+ Assert.assertEquals(kafkaRecordWritable, actualKafkaRecordWritable);
+ }
+}
diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaScanTrimmerTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaScanTrimmerTest.java
new file mode 100644
index 0000000000..232bfcbb96
--- /dev/null
+++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaScanTrimmerTest.java
@@ -0,0 +1,730 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.junit.Assert.assertNotNull;
+
+public class KafkaScanTrimmerTest
+{
+ private static final Path PATH = new Path("/tmp");
+
+ private ExprNodeDesc zeroInt = ConstantExprBuilder.build(0);
+ private ExprNodeDesc threeInt = ConstantExprBuilder.build(3);
+ private ExprNodeDesc thirtyLong = ConstantExprBuilder.build(30L);
+ private ExprNodeDesc thirtyFiveLong = ConstantExprBuilder.build(35L);
+ private ExprNodeDesc seventyFiveLong = ConstantExprBuilder.build(75L);
+ private ExprNodeDesc fortyLong = ConstantExprBuilder.build(40L);
+
+ private ExprNodeDesc partitionColumn = new ExprNodeColumnDesc(
+ TypeInfoFactory.intTypeInfo,
+ KafkaStorageHandler.__PARTITION,
+ null,
+ false
+ );
+ private ExprNodeDesc offsetColumn = new ExprNodeColumnDesc(
+ TypeInfoFactory.longTypeInfo,
+ KafkaStorageHandler.__OFFSET,
+ null,
+ false
+ );
+ /*private ExprNodeDesc timestampColumn = new ExprNodeColumnDesc(
+ TypeInfoFactory.longTypeInfo,
+ KafkaJsonSerDe.__TIMESTAMP,
+ null,
+ false
+ );*/
+
+ private String topic = "my_topic";
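+ // Baseline splits covering each partition's full available offset range; the trimmer should only ever narrow these.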
+ private Map<TopicPartition, KafkaPullerInputSplit> fullHouse = ImmutableMap.of(
+ new TopicPartition(topic, 0),
+ new KafkaPullerInputSplit(
+ topic,
+ 0,
+ 0,
+ 45,
+ PATH
+ ),
+ new TopicPartition(topic, 1),
+ new KafkaPullerInputSplit(
+ topic,
+ 1,
+ 5,
+ 1005,
+ PATH
+ ),
+ new TopicPartition(topic, 2),
+ new KafkaPullerInputSplit(
+ topic,
+ 2,
+ 9,
+ 100,
+ PATH
+ ),
+ new TopicPartition(topic, 3),
+ new KafkaPullerInputSplit(
+ topic,
+ 3,
+ 0,
+ 100,
+ PATH
+ )
+ );
+
+ @Test
+ public void computeOptimizedScanPartitionBinaryOpFilter()
+ {
+ KafkaScanTrimmer kafkaScanTrimmer = new KafkaScanTrimmer(fullHouse, null);
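+ // Predicates on the partition column keep or drop whole splits; offset ranges stay untouched.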
+ int partitionId = 2;
+ ExprNodeDesc constant = ConstantExprBuilder.build(partitionId);
+ final List<ExprNodeDesc> children = Lists.newArrayList(partitionColumn, constant);
+
+ ExprNodeGenericFuncDesc node = EQ(children);
+ assertNotNull(node);
+
+ Map actual = kafkaScanTrimmer.computeOptimizedScan(SerializationUtilities.deserializeExpression(
+ SerializationUtilities.serializeExpression(node)));
+ Map expected = Maps.filterValues(fullHouse, tp -> Objects.requireNonNull(tp).getPartition() == partitionId);
+ Assert.assertEquals(expected, actual);
+
+ ExprNodeGenericFuncDesc lessNode = LESS_THAN(children);
+ assertNotNull(lessNode);
+ actual = kafkaScanTrimmer.computeOptimizedScan(SerializationUtilities.deserializeExpression(SerializationUtilities.serializeExpression(
+ lessNode)));
+ expected = Maps.filterValues(fullHouse, tp -> Objects.requireNonNull(tp).getPartition() < partitionId);
+ Assert.assertEquals(expected, actual);
+
+
+ ExprNodeGenericFuncDesc lessEqNode = LESS_THAN_EQ(children);
+
+ assertNotNull(lessEqNode);
+ actual = kafkaScanTrimmer.computeOptimizedScan(SerializationUtilities.deserializeExpression(SerializationUtilities.serializeExpression(
+ lessEqNode)));
+ expected = Maps.filterValues(fullHouse, tp -> Objects.requireNonNull(tp).getPartition() <= partitionId);
+ Assert.assertEquals(expected, actual);
+
+ }
+
+
+ @Test
+ public void computeOptimizedScanFalseFilter()
+ {
+ KafkaScanTrimmer kafkaScanTrimmer = new KafkaScanTrimmer(fullHouse, null);
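+ // partition = 0 AND partition = 3 (and offset = 35 AND offset = 40) can never be satisfied, so the scan trims to empty.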
+ ExprNodeGenericFuncDesc falseFilter = AND(Lists.newArrayList(
+ EQ(Lists.newArrayList(partitionColumn, zeroInt)),
+ EQ(Lists.newArrayList(partitionColumn, threeInt))
+ ));
+
+ assertNotNull(falseFilter);
+ Map actual = kafkaScanTrimmer.computeOptimizedScan(SerializationUtilities.deserializeExpression(
+ SerializationUtilities.serializeExpression(falseFilter)));
+ Assert.assertTrue(actual.isEmpty());
+
+ ExprNodeGenericFuncDesc falseFilter2 = AND(Lists.newArrayList(
+ EQ(Lists.newArrayList(offsetColumn, thirtyFiveLong)),
+ EQ(Lists.newArrayList(offsetColumn, fortyLong))
+ ));
+
+ assertNotNull(falseFilter2);
+ actual = kafkaScanTrimmer.computeOptimizedScan(SerializationUtilities.deserializeExpression(
+ SerializationUtilities.serializeExpression(falseFilter2)));
+ Assert.assertTrue(actual.isEmpty());
+
+ ExprNodeGenericFuncDesc filter3 = OR(Lists.newArrayList(falseFilter, falseFilter2));
+
+ assertNotNull(filter3);
+ actual = kafkaScanTrimmer.computeOptimizedScan(SerializationUtilities.deserializeExpression(
+ SerializationUtilities.serializeExpression(filter3)));
+ Assert.assertTrue(actual.isEmpty());
+
+ ExprNodeGenericFuncDesc filter4 = AND(Lists.newArrayList(
+ filter3,
+ EQ(Lists.newArrayList(partitionColumn, zeroInt))
+ ));
+ assertNotNull(filter4);
+ actual = kafkaScanTrimmer.computeOptimizedScan(SerializationUtilities.deserializeExpression(
+ SerializationUtilities.serializeExpression(filter4)));
+ Assert.assertTrue(actual.isEmpty());
+ }
+
+ @Test
+ public void computeOptimizedScanOrAndCombinedFilter()
+ {
+ KafkaScanTrimmer kafkaScanTrimmer = new KafkaScanTrimmer(fullHouse, null);
+ // partition = 0 and 30 <= offset < 35 or partition = 3 and 35 <= offset < 75 or (partition = 0 and offset = 40)
+
+
+ ExprNodeGenericFuncDesc part1 = AND(Lists.newArrayList(
+ GREATER_THAN_EQ(Lists.newArrayList(offsetColumn, thirtyLong)),
+ EQ(Lists.newArrayList(partitionColumn, zeroInt)),
+ LESS_THAN(Lists.newArrayList(offsetColumn, thirtyFiveLong))
+ ));
+
+ ExprNodeGenericFuncDesc part2 = AND(Lists.newArrayList(
+ GREATER_THAN_EQ(Lists.newArrayList(offsetColumn, thirtyFiveLong)),
+ EQ(Lists.newArrayList(partitionColumn, threeInt)),
+ LESS_THAN(Lists.newArrayList(offsetColumn, seventyFiveLong))
+ ));
+
+ ExprNodeGenericFuncDesc part3 = AND(Lists.newArrayList(
+ EQ(Lists.newArrayList(offsetColumn, fortyLong)),
+ EQ(Lists.newArrayList(partitionColumn, zeroInt))
+ ));
+
+ ExprNodeGenericFuncDesc orExpression = OR(Lists.newArrayList(part1, part2, part3));
+
+ assertNotNull(orExpression);
+ Map actual = kafkaScanTrimmer.computeOptimizedScan(SerializationUtilities.deserializeExpression(
+ SerializationUtilities.serializeExpression(
+ orExpression)));
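+ // Partition 0 ranges [30, 35) and [40, 41) are unioned into one covering split [30, 41); partition 3 keeps [35, 75).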
+ TopicPartition tpZero = new TopicPartition(topic, 0);
+ TopicPartition tpThree = new TopicPartition(topic, 3);
+ KafkaPullerInputSplit split1 = new KafkaPullerInputSplit(topic, 0, 30, 41, PATH);
+ KafkaPullerInputSplit split2 = new KafkaPullerInputSplit(topic, 3, 35, 75, PATH);
+
+ Map expected = ImmutableMap.of(tpZero, split1, tpThree, split2);
+ Assert.assertEquals(expected, actual);
+
+
+ }
+
+ @Test
+ public void computeOptimizedScanPartitionOrAndCombinedFilter()
+ {
+ KafkaScanTrimmer kafkaScanTrimmer = new KafkaScanTrimmer(fullHouse, null);
+
+ // partition = 1 or (partition > 2 and partition <= 3)
+ ExprNodeGenericFuncDesc eq = EQ(Lists.newArrayList(partitionColumn, ConstantExprBuilder.build(1)));
+ ExprNodeGenericFuncDesc lessEq = LESS_THAN_EQ(Lists.newArrayList(partitionColumn, ConstantExprBuilder.build(3)));
+ ExprNodeGenericFuncDesc greater = GREATER_THAN(Lists.newArrayList(partitionColumn, ConstantExprBuilder.build(2)));
+ ExprNodeGenericFuncDesc orNode = OR(Lists.newArrayList(AND(Lists.newArrayList(lessEq, greater)), eq));
+
+ Map actual = kafkaScanTrimmer.computeOptimizedScan(SerializationUtilities.deserializeExpression(
+ SerializationUtilities.serializeExpression(orNode)));
+ Map expected = Maps.filterValues(
+ fullHouse,
+ tp -> Objects.requireNonNull(tp).getPartition() == 1 || tp.getPartition() == 3
+ );
+ Assert.assertEquals(expected, actual);
+ assertNotNull(orNode);
+ }
+
+
+ @Test
+ public void buildScanFormPartitionPredicateEq()
+ {
+ Map actual = KafkaScanTrimmer.buildScanFormPartitionPredicate(
+ fullHouse,
+ PredicateLeaf.Operator.EQUALS,
+ 3,
+ false,
+ false
+ );
+ TopicPartition topicPartition = new TopicPartition(topic, 3);
+ Assert.assertEquals(fullHouse.get(topicPartition), actual.get(topicPartition));
+ }
+
+ @Test
+ public void buildScanFormPartitionPredicateLess()
+ {
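+ // flip puts the constant on the left-hand side of the operator; negation inverts it.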
+ // partitionConst < partitionColumn (flip true)
+ int partitionConst = 2;
+ Map actual = KafkaScanTrimmer.buildScanFormPartitionPredicate(
+ fullHouse,
+ PredicateLeaf.Operator.LESS_THAN,
+ partitionConst,
+ true,
+ false
+ );
+
+ Map expected = Maps.filterEntries(
+ fullHouse,
+ entry -> Objects.requireNonNull(entry).getKey().partition() > partitionConst
+ );
+ Assert.assertEquals(expected, actual);
+ Assert.assertFalse(actual.isEmpty());
+
+ // partitionConst >= partitionColumn (flip true, negation true)
+ actual = KafkaScanTrimmer.buildScanFormPartitionPredicate(
+ fullHouse,
+ PredicateLeaf.Operator.LESS_THAN,
+ partitionConst,
+ true,
+ true
+ );
+
+ expected = Maps.filterEntries(
+ fullHouse,
+ entry -> partitionConst >= Objects.requireNonNull(entry).getKey().partition()
+ );
+ Assert.assertEquals(expected, actual);
+ Assert.assertFalse(actual.isEmpty());
+
+ // partitionColumn >= partitionConst (negation true)
+ actual = KafkaScanTrimmer.buildScanFormPartitionPredicate(
+ fullHouse,
+ PredicateLeaf.Operator.LESS_THAN,
+ partitionConst,
+ false,
+ true
+ );
+
+ expected = Maps.filterEntries(
+ fullHouse,
+ entry -> Objects.requireNonNull(entry).getKey().partition() >= partitionConst
+ );
+ Assert.assertEquals(expected, actual);
+ Assert.assertFalse(actual.isEmpty());
+
+ // partitionColumn < partitionConst (no flip, no negation)
+ actual = KafkaScanTrimmer.buildScanFormPartitionPredicate(
+ fullHouse,
+ PredicateLeaf.Operator.LESS_THAN,
+ partitionConst,
+ false,
+ false
+ );
+
+ expected = Maps.filterEntries(
+ fullHouse,
+ entry -> Objects.requireNonNull(entry).getKey().partition() < partitionConst
+ );
+ Assert.assertEquals(expected, actual);
+ Assert.assertFalse(actual.isEmpty());
+ }
+
+ @Test
+ public void buildScanFormPartitionPredicateLessEq()
+ {
+ // partitionConst <= partitionColumn (flip true)
+ int partitionConst = 2;
+ Map actual = KafkaScanTrimmer.buildScanFormPartitionPredicate(
+ fullHouse,
+ PredicateLeaf.Operator.LESS_THAN_EQUALS,
+ partitionConst,
+ true,
+ false
+ );
+
+ Map expected = Maps.filterEntries(
+ fullHouse,
+ entry -> Objects.requireNonNull(entry).getKey().partition() >= partitionConst
+ );
+ Assert.assertEquals(expected, actual);
+ Assert.assertFalse(actual.isEmpty());
+
+ // partitionConst > partitionColumn (flip true, negation true)
+ actual = KafkaScanTrimmer.buildScanFormPartitionPredicate(
+ fullHouse,
+ PredicateLeaf.Operator.LESS_THAN_EQUALS,
+ partitionConst,
+ true,
+ true
+ );
+
+ expected = Maps.filterEntries(
+ fullHouse,
+ entry -> partitionConst > Objects.requireNonNull(entry).getKey().partition()
+ );
+ Assert.assertEquals(expected, actual);
+ Assert.assertFalse(actual.isEmpty());
+
+
+ // partitionColumn > partitionConst (negation true)
+ actual = KafkaScanTrimmer.buildScanFormPartitionPredicate(
+ fullHouse,
+ PredicateLeaf.Operator.LESS_THAN_EQUALS,
+ partitionConst,
+ false,
+ true
+ );
+
+ expected = Maps.filterEntries(fullHouse, entry -> Objects.requireNonNull(entry).getKey().partition() > partitionConst);
+ Assert.assertEquals(expected, actual);
+ Assert.assertFalse(actual.isEmpty());
+
+ // partitionColumn <= partitionConst (no flip, no negation)
+ actual = KafkaScanTrimmer.buildScanFormPartitionPredicate(
+ fullHouse,
+ PredicateLeaf.Operator.LESS_THAN_EQUALS,
+ partitionConst,
+ false,
+ false
+ );
+
+ expected = Maps.filterEntries(
+ fullHouse,
+ entry -> Objects.requireNonNull(entry).getKey().partition() <= partitionConst
+ );
+ Assert.assertEquals(expected, actual);
+ Assert.assertFalse(actual.isEmpty());
+ }
+
+
+ @Test
+ public void buildScanFromOffsetPredicateEq()
+ {
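+ // offset = constant narrows each split to the single-record range [constant, constant + 1); out-of-range constants collapse the split (checked below).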
+ long constantOffset = 30;
+ Map actual = KafkaScanTrimmer.buildScanFromOffsetPredicate(
+ fullHouse,
+ PredicateLeaf.Operator.EQUALS,
+ constantOffset,
+ false,
+ false
+ );
+ Map expected = Maps.transformValues(
+ fullHouse,
+ entry -> new KafkaPullerInputSplit(
+ Objects.requireNonNull(entry).getTopic(),
+ entry.getPartition(),
+ constantOffset,
+ constantOffset + 1,
+ entry.getPath()
+ )
+ );
+
+ Assert.assertEquals(expected, actual);
+
+ // seek to end if offset is out of reach
+ actual = KafkaScanTrimmer.buildScanFromOffsetPredicate(
+ fullHouse,
+ PredicateLeaf.Operator.EQUALS,
+ 3000000L,
+ false,
+ false
+ );
+ expected = Maps.transformValues(
+ fullHouse,
+ entry -> new KafkaPullerInputSplit(
+ Objects.requireNonNull(entry).getTopic(),
+ entry.getPartition(),
+ entry.getEndOffset(),
+ entry.getEndOffset(),
+ entry.getPath()
+ )
+ );
+ Assert.assertEquals(expected, actual);
+
+ // offset 0 only exists for splits starting at 0; the others collapse to an empty range at their end offset
+ actual = KafkaScanTrimmer.buildScanFromOffsetPredicate(
+ fullHouse,
+ PredicateLeaf.Operator.EQUALS,
+ 0L,
+ false,
+ false
+ );
+
+ expected = Maps.transformValues(
+ fullHouse,
+ entry -> new KafkaPullerInputSplit(
+ Objects.requireNonNull(entry).getTopic(),
+ entry.getPartition(),
+ entry.getStartOffset() > 0 ? entry.getEndOffset() : 0,
+ entry.getStartOffset() > 0 ? entry.getEndOffset() : 1,
+ entry.getPath()
+ )
+ );
+ Assert.assertEquals(expected, actual);
+
+
+ }
+
+ @Test
+ public void buildScanFromOffsetPredicateLess()
+ {
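+ // For LESS_THAN and its flipped/negated forms the bound is clamped into each split's own [start, end) range.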
+ long constantOffset = 50;
+ // columnOffset < constant
+ Map actual = KafkaScanTrimmer.buildScanFromOffsetPredicate(
+ fullHouse,
+ PredicateLeaf.Operator.LESS_THAN,
+ constantOffset,
+ false,
+ false
+ );
+
+ Map expected = Maps.transformValues(
+ fullHouse,
+ entry -> new KafkaPullerInputSplit(
+ Objects.requireNonNull(entry).getTopic(),
+ entry.getPartition(),
+ entry.getStartOffset(),
+ Math.min(constantOffset, entry.getEndOffset()),
+ entry.getPath()
+ )
+ );
+ Assert.assertEquals(expected, actual);
+
+
+ // columnOffset > constant
+ actual = KafkaScanTrimmer.buildScanFromOffsetPredicate(
+ fullHouse,
+ PredicateLeaf.Operator.LESS_THAN,
+ constantOffset,
+ true,
+ false
+ );
+
+ expected = Maps.transformValues(
+ fullHouse,
+ entry -> new KafkaPullerInputSplit(
+ Objects.requireNonNull(entry).getTopic(),
+ entry.getPartition(),
+ Math.min(entry.getEndOffset(), Math.max(entry.getStartOffset(), constantOffset + 1)),
+ entry.getEndOffset(),
+ entry.getPath()
+ )
+ );
+ Assert.assertEquals(expected, actual);
+
+ // columnOffset >= constant
+ actual = KafkaScanTrimmer.buildScanFromOffsetPredicate(
+ fullHouse,
+ PredicateLeaf.Operator.LESS_THAN,
+ constantOffset,
+ false,
+ true
+ );
+
+ expected = Maps.transformValues(
+ fullHouse,
+ entry -> new KafkaPullerInputSplit(
+ Objects.requireNonNull(entry).getTopic(),
+ entry.getPartition(),
+ Math.min(entry.getEndOffset(), Math.max(entry.getStartOffset(), constantOffset)),
+ entry.getEndOffset(),
+ entry.getPath()
+ )
+ );
+ Assert.assertEquals(expected, actual);
+
+
+ // columnOffset <= constant
+ actual = KafkaScanTrimmer.buildScanFromOffsetPredicate(
+ fullHouse,
+ PredicateLeaf.Operator.LESS_THAN,
+ constantOffset,
+ true,
+ true
+ );
+
+ expected = Maps.transformValues(
+ fullHouse,
+ entry -> new KafkaPullerInputSplit(
+ Objects.requireNonNull(entry).getTopic(),
+ entry.getPartition(),
+ entry.getStartOffset(),
+ Math.min(constantOffset + 1, entry.getEndOffset()),
+ entry.getPath()
+ )
+ );
+ Assert.assertEquals(expected, actual);
+
+ }
+
+ @Test
+ public void buildScanFromOffsetPredicateLessEq()
+ {
+ long constantOffset = 50;
+ // columnOffset <= constant
+ Map actual = KafkaScanTrimmer.buildScanFromOffsetPredicate(
+ fullHouse,
+ PredicateLeaf.Operator.LESS_THAN_EQUALS,
+ constantOffset,
+ false,
+ false
+ );
+
+ Map expected = Maps.transformValues(
+ fullHouse,
+ entry -> new KafkaPullerInputSplit(
+ Objects.requireNonNull(entry).getTopic(),
+ entry.getPartition(),
+ entry.getStartOffset(),
+ Math.min(constantOffset + 1, entry.getEndOffset()),
+ entry.getPath()
+ )
+ );
+ Assert.assertEquals(expected, actual);
+
+ // columnOffset >= constant
+ actual = KafkaScanTrimmer.buildScanFromOffsetPredicate(
+ fullHouse,
+ PredicateLeaf.Operator.LESS_THAN_EQUALS,
+ constantOffset,
+ true,
+ false
+ );
+
+ expected = Maps.transformValues(
+ fullHouse,
+ entry -> new KafkaPullerInputSplit(
+ Objects.requireNonNull(entry).getTopic(),
+ entry.getPartition(),
+ Math.min(entry.getEndOffset(), Math.max(entry.getStartOffset(), constantOffset)),
+ entry.getEndOffset(),
+ entry.getPath()
+ )
+ );
+ Assert.assertEquals(expected, actual);
+
+ // columnOffset > constant
+ actual = KafkaScanTrimmer.buildScanFromOffsetPredicate(
+ fullHouse,
+ PredicateLeaf.Operator.LESS_THAN_EQUALS,
+ constantOffset,
+ false,
+ true
+ );
+
+ expected = Maps.transformValues(
+ fullHouse,
+ entry -> new KafkaPullerInputSplit(
+ Objects.requireNonNull(entry).getTopic(),
+ entry.getPartition(),
+ Math.min(entry.getEndOffset(), Math.max(entry.getStartOffset(), constantOffset + 1)),
+ entry.getEndOffset(),
+ entry.getPath()
+ )
+ );
+ Assert.assertEquals(expected, actual);
+
+ // columnOffset < constant
+ actual = KafkaScanTrimmer.buildScanFromOffsetPredicate(
+ fullHouse,
+ PredicateLeaf.Operator.LESS_THAN_EQUALS,
+ constantOffset,
+ true,
+ true
+ );
+
+ expected = Maps.transformValues(
+ fullHouse,
+ entry -> new KafkaPullerInputSplit(
+ Objects.requireNonNull(entry).getTopic(),
+ entry.getPartition(),
+ entry.getStartOffset(),
+ Math.min(constantOffset, entry.getEndOffset()),
+ entry.getPath()
+ )
+ );
+ Assert.assertEquals(expected, actual);
+ }
+
+ private static class ConstantExprBuilder
+ {
+ static ExprNodeDesc build(long constant)
+ {
+ return new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, constant);
+ }
+
+ static ExprNodeDesc build(int constant)
+ {
+ return new ExprNodeConstantDesc(TypeInfoFactory.intTypeInfo, constant);
+ }
+ }
+
+
+ private static ExprNodeGenericFuncDesc OR(List<ExprNodeDesc> children)
+ {
+ return new ExprNodeGenericFuncDesc(
+ TypeInfoFactory.booleanTypeInfo,
+ new GenericUDFOPOr(),
+ children
+ );
+ }
+
+ private static ExprNodeGenericFuncDesc AND(List<ExprNodeDesc> children)
+ {
+ return new ExprNodeGenericFuncDesc(
+ TypeInfoFactory.booleanTypeInfo,
+ new GenericUDFOPAnd(),
+ children
+ );
+ }
+
+ private static ExprNodeGenericFuncDesc EQ(List<ExprNodeDesc> children)
+ {
+ return new ExprNodeGenericFuncDesc(
+ children.get(0).getTypeInfo(),
+ new GenericUDFOPEqual(),
+ children
+ );
+ }
+
+ private static ExprNodeGenericFuncDesc LESS_THAN(List<ExprNodeDesc> children)
+ {
+ return new ExprNodeGenericFuncDesc(
+ children.get(0).getTypeInfo(),
+ new GenericUDFOPLessThan(),
+ children
+ );
+ }
+
+ private static ExprNodeGenericFuncDesc LESS_THAN_EQ(List<ExprNodeDesc> children)
+ {
+ return new ExprNodeGenericFuncDesc(
+ children.get(0).getTypeInfo(),
+ new GenericUDFOPEqualOrLessThan(),
+ children
+ );
+ }
+
+ private static ExprNodeGenericFuncDesc GREATER_THAN(List<ExprNodeDesc> children)
+ {
+ return new ExprNodeGenericFuncDesc(
+ children.get(0).getTypeInfo(),
+ new GenericUDFOPGreaterThan(),
+ children
+ );
+ }
+
+ private static ExprNodeGenericFuncDesc GREATER_THAN_EQ(List<ExprNodeDesc> children)
+ {
+ return new ExprNodeGenericFuncDesc(
+ children.get(0).getTypeInfo(),
+ new GenericUDFOPEqualOrGreaterThan(),
+ children
+ );
+ }
+}
\ No newline at end of file
diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaStreamingUtilsTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaStreamingUtilsTest.java
new file mode 100644
index 0000000000..9b107f137f
--- /dev/null
+++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaStreamingUtilsTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Properties;
+
+public class KafkaStreamingUtilsTest {
+ public KafkaStreamingUtilsTest() {
+ }
+
+ @Test
+ public void testConsumerProperties() {
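+ // Keys prefixed with "kafka.consumer." are forwarded with the prefix stripped, and kafka.bootstrap.servers maps to bootstrap.servers.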
+ Configuration configuration = new Configuration();
+ configuration.set("kafka.bootstrap.servers", "localhost:9090");
+ configuration.set("kafka.consumer.fetch.max.wait.ms", "40");
+ configuration.set("kafka.consumer.my.new.wait.ms", "400");
+ Properties properties = KafkaStreamingUtils.consumerProperties(configuration);
+ Assert.assertEquals("localhost:9090", properties.getProperty("bootstrap.servers"));
+ Assert.assertEquals("40", properties.getProperty("fetch.max.wait.ms"));
+ Assert.assertEquals("400", properties.getProperty("my.new.wait.ms"));
+ }
+}
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
index 0f1d5eea4c..ae1de26adc 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
@@ -91,7 +91,7 @@
private static final String[] DEFAULT_AUX_CLASSES = new String[] {
"org.apache.hive.hcatalog.data.JsonSerDe","org.apache.hadoop.hive.druid.DruidStorageHandler",
"org.apache.hive.storage.jdbc.JdbcStorageHandler", "org.apache.commons.dbcp.BasicDataSourceFactory",
- "org.apache.commons.pool.impl.GenericObjectPool"
+ "org.apache.commons.pool.impl.GenericObjectPool", "org.apache.hadoop.hive.kafka.KafkaStorageHandler"
};
private static final String HBASE_SERDE_CLASS = "org.apache.hadoop.hive.hbase.HBaseSerDe";
private static final String[] NEEDED_CONFIGS = LlapDaemonConfiguration.DAEMON_CONFIGS;
diff --git packaging/pom.xml packaging/pom.xml
index 5c859acfad..07f2382b03 100644
--- packaging/pom.xml
+++ packaging/pom.xml
@@ -213,6 +213,11 @@
hive-druid-handler
${project.version}