diff --git itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java index c9339b565e..3f2c9a7b34 100644 --- itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java +++ itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java @@ -10,6 +10,7 @@ import org.apache.hadoop.service.AbstractService; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.StringSerializer; @@ -26,6 +27,10 @@ import java.nio.charset.Charset; import java.util.List; import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.IntStream; /** @@ -100,7 +105,7 @@ public void createTopicWithData(String topicName, File datafile){ )){ List events = Files.readLines(datafile, Charset.forName("UTF-8")); for(String event : events){ - producer.send(new ProducerRecord<>(topicName, event)); + producer.send(new ProducerRecord<>(topicName, "key", event)); } } catch (IOException e) { Throwables.propagate(e); diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/GenericKafkaSerDe.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/GenericKafkaSerDe.java index e7ea53f4bc..a0c79b3a9f 100644 --- kafka-handler/src/java/org/apache/hadoop/hive/kafka/GenericKafkaSerDe.java +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/GenericKafkaSerDe.java @@ -21,7 +21,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; @@ -40,11 +39,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.slf4j.Logger; @@ -56,6 +51,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Properties; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -64,21 +60,6 @@ */ public class GenericKafkaSerDe extends AbstractSerDe { private static final Logger LOG = LoggerFactory.getLogger(GenericKafkaSerDe.class); - // ORDER of fields and types matters here - private static final ImmutableList - METADATA_COLUMN_NAMES = - ImmutableList.of(KafkaStreamingUtils.PARTITION_COLUMN, - KafkaStreamingUtils.OFFSET_COLUMN, - KafkaStreamingUtils.TIMESTAMP_COLUMN, - KafkaStreamingUtils.START_OFFSET_COLUMN, - KafkaStreamingUtils.END_OFFSET_COLUMN); - private static final ImmutableList - METADATA_PRIMITIVE_TYPE_INFO = - ImmutableList.of(TypeInfoFactory.intTypeInfo, - TypeInfoFactory.longTypeInfo, - TypeInfoFactory.longTypeInfo, - 
TypeInfoFactory.longTypeInfo, - TypeInfoFactory.longTypeInfo); private AbstractSerDe delegateSerDe; private ObjectInspector objectInspector; @@ -106,16 +87,14 @@ .stream() .map(StructField::getFieldName) .collect(Collectors.toList())); - columnNames.addAll(METADATA_COLUMN_NAMES); + columnNames.addAll(KafkaStreamingUtils.KAFKA_METADATA_COLUMN_NAMES); final List inspectors = new ArrayList<>(columnNames.size()); inspectors.addAll(delegateObjectInspector.getAllStructFieldRefs() .stream() .map(StructField::getFieldObjectInspector) .collect(Collectors.toList())); - inspectors.addAll(METADATA_PRIMITIVE_TYPE_INFO.stream() - .map(KafkaJsonSerDe.typeInfoToObjectInspector) - .collect(Collectors.toList())); + inspectors.addAll(KafkaStreamingUtils.KAFKA_METADATA_INSPECTORS); objectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors); // lazy supplier to read Avro Records if needed @@ -159,20 +138,11 @@ } return columnNames.stream().map(name -> { - switch (name) { - case KafkaStreamingUtils.PARTITION_COLUMN: - return new IntWritable(record.getPartition()); - case KafkaStreamingUtils.OFFSET_COLUMN: - return new LongWritable(record.getOffset()); - case KafkaStreamingUtils.TIMESTAMP_COLUMN: - return new LongWritable(record.getTimestamp()); - case KafkaStreamingUtils.START_OFFSET_COLUMN: - return new LongWritable(record.getStartOffset()); - case KafkaStreamingUtils.END_OFFSET_COLUMN: - return new LongWritable(record.getEndOffset()); - default: - return delegateObjectInspector.getStructFieldData(row, delegateObjectInspector.getStructFieldRef(name)); + Function metaColumnMapper = KafkaStreamingUtils.recordWritableFnMap.get(name); + if (metaColumnMapper != null) { + return metaColumnMapper.apply(record); } + return delegateObjectInspector.getStructFieldData(row, delegateObjectInspector.getStructFieldRef(name)); }).collect(Collectors.toList()); } diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaJsonSerDe.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaJsonSerDe.java index f383190083..5f0143de09 100644 --- kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaJsonSerDe.java +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaJsonSerDe.java @@ -80,11 +80,13 @@ */ public class KafkaJsonSerDe extends AbstractSerDe { private static final Logger LOG = LoggerFactory.getLogger(KafkaJsonSerDe.class); - private static final DateTimeFormatter TS_PARSER = createAutoParser(); - static Function - typeInfoToObjectInspector = typeInfo -> - PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector( - TypeInfoFactory.getPrimitiveTypeInfo(typeInfo.getTypeName())); + private static final ThreadLocal + TS_PARSER = + ThreadLocal.withInitial(KafkaJsonSerDe::createAutoParser); + private static final Function + typeInfoToObjectInspector = + typeInfo -> PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(TypeInfoFactory.getPrimitiveTypeInfo( + typeInfo.getTypeName())); private List columnNames; private List columnTypes; private ObjectInspector inspector; @@ -176,11 +178,11 @@ private Object parseAsPrimitive(JsonNode value, TypeInfo typeInfo) throws SerDeE switch (TypeInfoFactory.getPrimitiveTypeInfo(typeInfo.getTypeName()).getPrimitiveCategory()) { case TIMESTAMP: TimestampWritable timestampWritable = new TimestampWritable(); - timestampWritable.setTime(TS_PARSER.parseMillis(value.textValue())); + timestampWritable.setTime(TS_PARSER.get().parseMillis(value.textValue())); return timestampWritable; case TIMESTAMPLOCALTZ: - final 
long numberOfMillis = TS_PARSER.parseMillis(value.textValue()); + final long numberOfMillis = TS_PARSER.get().parseMillis(value.textValue()); return new TimestampLocalTZWritable(new TimestampTZ(ZonedDateTime.ofInstant(Instant.ofEpochMilli(numberOfMillis), ((TimestampLocalTZTypeInfo) typeInfo).timeZone()))); @@ -234,8 +236,7 @@ private static DateTimeFormatter createAutoParser() { DateTimeParser timeOrOffset = new DateTimeFormatterBuilder().append(null, - new DateTimeParser[] { - new DateTimeFormatterBuilder().appendLiteral('T').toParser(), + new DateTimeParser[] { new DateTimeFormatterBuilder().appendLiteral('T').toParser(), new DateTimeFormatterBuilder().appendLiteral(' ').toParser() }) .appendOptional(ISODateTimeFormat.timeElementParser().getParser()) .appendOptional(offsetElement.getParser()) diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java index 908ee5e29f..4f0ee94655 100644 --- kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hive.kafka; import com.google.common.base.Preconditions; -import com.google.common.io.Closer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.RecordReader; @@ -31,7 +30,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.Iterator; import java.util.Properties; @@ -43,7 +41,6 @@ private static final Logger LOG = LoggerFactory.getLogger(KafkaPullerRecordReader.class); - private final Closer closer = Closer.create(); private KafkaConsumer consumer = null; private Configuration config = null; private KafkaRecordWritable currentWritableValue; @@ -67,7 +64,6 @@ private void initConsumer() { Preconditions.checkNotNull(brokerString, "broker end point can not be null"); LOG.info("Starting Consumer with Kafka broker string [{}]", brokerString); consumer = new KafkaConsumer<>(properties); - closer.register(consumer); } } @@ -154,11 +150,11 @@ private synchronized void initialize(KafkaPullerInputSplit inputSplit, Configura return consumedRecords * 1.0f / totalNumberRecords; } - @Override public void close() throws IOException { + @Override public void close() { LOG.trace("total read bytes [{}]", readBytes); if (consumer != null) { consumer.wakeup(); + consumer.close(); } - closer.close(); } } diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordWritable.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordWritable.java index c6924ea480..1b00f8549f 100644 --- kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordWritable.java +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordWritable.java @@ -21,6 +21,7 @@ import org.apache.hadoop.io.Writable; import org.apache.kafka.clients.consumer.ConsumerRecord; +import javax.annotation.Nullable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -31,7 +32,8 @@ * Writable implementation of Kafka ConsumerRecord. 
* Serialized in the form * {@code timestamp} long| {@code partition} (int) | {@code offset} (long) | - {@code startOffset} (long) | {@code endOffset} (long) | {@code value.size()} (int) | {@code value} (byte []) + {@code startOffset} (long) | {@code endOffset} (long) | {@code value.size()} (int) | + {@code value} (byte []) | {@code recordKey.size()} (int) | {@code recordKey} (byte []) */ public class KafkaRecordWritable implements Writable { @@ -60,25 +62,34 @@ */ private byte[] value; + /** + * Record key content, or null when the Kafka record has no key. + */ + private byte[] recordKey; + + void set(ConsumerRecord consumerRecord, long startOffset, long endOffset) { this.partition = consumerRecord.partition(); this.timestamp = consumerRecord.timestamp(); this.offset = consumerRecord.offset(); this.value = consumerRecord.value(); + this.recordKey = consumerRecord.key(); this.startOffset = startOffset; this.endOffset = endOffset; } KafkaRecordWritable(int partition, - long offset, - long timestamp, - byte[] value, - long startOffset, - long endOffset) { + long offset, + long timestamp, + byte[] value, + long startOffset, + long endOffset, + @Nullable byte[] recordKey) { this.partition = partition; this.offset = offset; this.timestamp = timestamp; this.value = value; + this.recordKey = recordKey; this.startOffset = startOffset; this.endOffset = endOffset; } @@ -94,6 +105,12 @@ void set(ConsumerRecord consumerRecord, long startOffset, long e dataOutput.writeLong(endOffset); dataOutput.writeInt(value.length); dataOutput.write(value); + if (recordKey != null) { + dataOutput.writeInt(recordKey.length); + dataOutput.write(recordKey); + } else { + dataOutput.writeInt(-1); + } } @Override public void readFields(DataInput dataInput) throws IOException { @@ -102,13 +119,20 @@ void set(ConsumerRecord consumerRecord, long startOffset, long e offset = dataInput.readLong(); startOffset = dataInput.readLong(); endOffset = dataInput.readLong(); - int size = dataInput.readInt(); - if (size > 0) { - value = new byte[size]; + int dataSize = dataInput.readInt(); + if (dataSize > 0) { + value = new byte[dataSize]; dataInput.readFully(value); } else { value = new byte[0]; } + int keyArraySize = dataInput.readInt(); + if (keyArraySize > -1) { + recordKey = new byte[keyArraySize]; + dataInput.readFully(recordKey); + } else { + recordKey = null; + } } int getPartition() { @@ -135,6 +159,11 @@ long getEndOffset() { return endOffset; } + @Nullable + byte[] getRecordKey() { + return recordKey; + } + @Override public boolean equals(Object o) { if (this == o) { return true; } @@ -148,12 +177,14 @@ long getEndOffset() { && startOffset == writable.startOffset && endOffset == writable.endOffset && timestamp == writable.timestamp - && Arrays.equals(value, writable.value); + && Arrays.equals(value, writable.value) + && Arrays.equals(recordKey, writable.recordKey); } @Override public int hashCode() { int result = Objects.hash(partition, offset, startOffset, endOffset, timestamp); result = 31 * result + Arrays.hashCode(value); + result = 31 * result + Arrays.hashCode(recordKey); return result; } @@ -171,6 +202,8 @@ long getEndOffset() { + timestamp + ", value=" + Arrays.toString(value) + + ", recordKey=" + + Arrays.toString(recordKey) + '}'; } } diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaScanTrimmer.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaScanTrimmer.java index 76415151ec..8fbdfdab11 100644 --- kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaScanTrimmer.java +++ 
kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaScanTrimmer.java @@ -89,7 +89,7 @@ if (LOG.isDebugEnabled()) { if (optimizedScan != null) { LOG.debug("Optimized scan:"); - optimizedScan.forEach((tp, input) -> LOG.info( + optimizedScan.forEach((tp, input) -> LOG.debug( "Topic-[{}] Partition-[{}] - Split startOffset [{}] :-> endOffset [{}]", tp.topic(), tp.partition(), @@ -97,7 +97,7 @@ input.getEndOffset())); } else { LOG.debug("No optimization thus using full scan "); - fullHouse.forEach((tp, input) -> LOG.info( + fullHouse.forEach((tp, input) -> LOG.debug( "Topic-[{}] Partition-[{}] - Split startOffset [{}] :-> endOffset [{}]", tp.topic(), tp.partition(), @@ -193,7 +193,7 @@ } - if (columnDesc.getColumn().equals(KafkaStreamingUtils.PARTITION_COLUMN)) { + if (columnDesc.getColumn().equals(KafkaStreamingUtils.MetadataColumn.PARTITION.getName())) { return buildScanFromPartitionPredicate(fullHouse, operator, ((Number) constantDesc.getValue()).intValue(), @@ -201,7 +201,7 @@ negation); } - if (columnDesc.getColumn().equals(KafkaStreamingUtils.OFFSET_COLUMN)) { + if (columnDesc.getColumn().equals(KafkaStreamingUtils.MetadataColumn.OFFSET.getName())) { return buildScanFromOffsetPredicate(fullHouse, operator, ((Number) constantDesc.getValue()).longValue(), @@ -209,7 +209,7 @@ negation); } - if (columnDesc.getColumn().equals(KafkaStreamingUtils.TIMESTAMP_COLUMN)) { + if (columnDesc.getColumn().equals(KafkaStreamingUtils.MetadataColumn.TIMESTAMP.getName())) { long timestamp = ((Number) constantDesc.getValue()).longValue(); //noinspection unchecked return buildScanForTimesPredicate(fullHouse, operator, timestamp, flip, negation, kafkaConsumer); @@ -280,7 +280,8 @@ * * @return optimized kafka scan */ - @VisibleForTesting static Map buildScanFromOffsetPredicate(Map fullScan, + @VisibleForTesting static Map buildScanFromOffsetPredicate(Map fullScan, PredicateLeaf.Operator operator, long offsetConst, boolean flip, diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java index 5847df5e7e..96222c934b 100644 --- kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java @@ -18,11 +18,12 @@ package org.apache.hadoop.hive.kafka; -import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.HiveMetaHook; -import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; +import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider; import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider; @@ -31,12 +32,16 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; import org.apache.hadoop.mapred.lib.NullOutputFormat; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.Properties; /** * Hive Kafka 
storage handler to allow user querying Stream of tuples from a Kafka queue. @@ -44,8 +49,9 @@ public class KafkaStorageHandler implements HiveStorageHandler { private static final Logger LOG = LoggerFactory.getLogger(KafkaStorageHandler.class); + private static final String KAFKA_STORAGE_HANDLER = "org.apache.hadoop.hive.kafka.KafkaStorageHandler"; - Configuration configuration; + private Configuration configuration; @Override public Class getInputFormatClass() { return KafkaPullerInputFormat.class; @@ -63,22 +69,26 @@ return null; } - @Override public HiveAuthorizationProvider getAuthorizationProvider() throws HiveException { + @Override public HiveAuthorizationProvider getAuthorizationProvider() { return new DefaultHiveAuthorizationProvider(); } @Override public void configureInputJobProperties(TableDesc tableDesc, Map jobProperties) { - jobProperties.put(KafkaStreamingUtils.HIVE_KAFKA_TOPIC, - Preconditions.checkNotNull(tableDesc.getProperties().getProperty(KafkaStreamingUtils.HIVE_KAFKA_TOPIC), - "kafka topic missing set table property->" + KafkaStreamingUtils.HIVE_KAFKA_TOPIC)); - LOG.debug("Table properties: Kafka Topic {}", tableDesc.getProperties().getProperty(KafkaStreamingUtils.HIVE_KAFKA_TOPIC)); - jobProperties.put(KafkaStreamingUtils.HIVE_KAFKA_BOOTSTRAP_SERVERS, - Preconditions.checkNotNull(tableDesc.getProperties().getProperty(KafkaStreamingUtils.HIVE_KAFKA_BOOTSTRAP_SERVERS), - "Broker address missing set table property->" + KafkaStreamingUtils.HIVE_KAFKA_BOOTSTRAP_SERVERS)); - LOG.debug("Table properties: Kafka broker {}", tableDesc.getProperties().getProperty(KafkaStreamingUtils.HIVE_KAFKA_BOOTSTRAP_SERVERS)); + String topic = tableDesc.getProperties().getProperty(KafkaStreamingUtils.HIVE_KAFKA_TOPIC, ""); + if (topic.isEmpty()) { + throw new IllegalArgumentException("Kafka topic missing set table property->" + + KafkaStreamingUtils.HIVE_KAFKA_TOPIC); + } + jobProperties.put(KafkaStreamingUtils.HIVE_KAFKA_TOPIC, topic); + String brokerString = tableDesc.getProperties().getProperty(KafkaStreamingUtils.HIVE_KAFKA_BOOTSTRAP_SERVERS, ""); + if (brokerString.isEmpty()) { + throw new IllegalArgumentException("Broker address missing set table property->" + + KafkaStreamingUtils.HIVE_KAFKA_BOOTSTRAP_SERVERS); + } + jobProperties.put(KafkaStreamingUtils.HIVE_KAFKA_BOOTSTRAP_SERVERS, brokerString); + jobProperties.put(KafkaStreamingUtils.SERDE_CLASS_NAME, tableDesc.getProperties().getProperty(KafkaStreamingUtils.SERDE_CLASS_NAME, KafkaJsonSerDe.class.getName())); - LOG.debug("Table properties: SerDe class name {}", jobProperties.get(KafkaStreamingUtils.SERDE_CLASS_NAME)); //set extra properties @@ -90,7 +100,9 @@ .toLowerCase() .startsWith(KafkaStreamingUtils.CONSUMER_CONFIGURATION_PREFIX)) .forEach(entry -> { - String key = entry.getKey().toString().substring(KafkaStreamingUtils.CONSUMER_CONFIGURATION_PREFIX.length() + 1); + String + key = + entry.getKey().toString().substring(KafkaStreamingUtils.CONSUMER_CONFIGURATION_PREFIX.length() + 1); if (KafkaStreamingUtils.FORBIDDEN_PROPERTIES.contains(key)) { throw new IllegalArgumentException("Not suppose to set Kafka Property " + key); } @@ -116,7 +128,7 @@ @Override public void configureJobConf(TableDesc tableDesc, JobConf jobConf) { Map properties = new HashMap<>(); configureInputJobProperties(tableDesc, properties); - properties.forEach((key, value) -> jobConf.set(key, value)); + properties.forEach(jobConf::set); try { KafkaStreamingUtils.copyDependencyJars(jobConf, KafkaStorageHandler.class); } catch (IOException e) { @@ -133,6 
+145,35 @@ } @Override public String toString() { - return "org.apache.hadoop.hive.kafka.KafkaStorageHandler"; + return KAFKA_STORAGE_HANDLER; + } + + @Override public StorageHandlerInfo getStorageHandlerInfo(Table table) throws MetaException { + String topic = table.getParameters().get(KafkaStreamingUtils.HIVE_KAFKA_TOPIC); + if (topic == null || topic.isEmpty()) { + throw new MetaException("topic is null or empty"); + } + String brokers = table.getParameters().get(KafkaStreamingUtils.HIVE_KAFKA_BOOTSTRAP_SERVERS); + if (brokers == null || brokers.isEmpty()) { + throw new MetaException("kafka brokers string is null or empty"); + } + final Properties properties = new Properties(); + properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokers); + table.getParameters() + .entrySet() + .stream() + .filter(objectObjectEntry -> objectObjectEntry.getKey() + .toLowerCase() + .startsWith(KafkaStreamingUtils.CONSUMER_CONFIGURATION_PREFIX)) + .forEach(entry -> { + String key = entry.getKey().substring(KafkaStreamingUtils.CONSUMER_CONFIGURATION_PREFIX.length() + 1); + if (KafkaStreamingUtils.FORBIDDEN_PROPERTIES.contains(key)) { + throw new IllegalArgumentException("Not suppose to set Kafka Property " + key); + } + properties.put(key, entry.getValue()); + }); + return new KafkaStorageHandlerInfo(topic, properties); } } diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandlerInfo.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandlerInfo.java new file mode 100644 index 0000000000..2c7a0861d5 --- /dev/null +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandlerInfo.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.kafka; + +import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; + +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; + +/** + * Kafka Storage Handler info. 
+ */ +class KafkaStorageHandlerInfo implements StorageHandlerInfo { + private final String topic; + private final Properties consumerProperties; + + KafkaStorageHandlerInfo(String topic, Properties consumerProperties) { + this.topic = topic; + this.consumerProperties = consumerProperties; + } + + @Override public String formatAsText() { + + try (KafkaConsumer consumer = new KafkaConsumer(consumerProperties) { + }) { + //noinspection unchecked + List partitionsInfo = consumer.partitionsFor(topic); + List + topicPartitions = + partitionsInfo.stream() + .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition())) + .collect(Collectors.toList()); + Map endOffsets = consumer.endOffsets(topicPartitions); + Map startOffsets = consumer.beginningOffsets(topicPartitions); + + return partitionsInfo.stream() + .map(partitionInfo -> String.format("%s [start offset = [%s], end offset = [%s]]", + partitionInfo.toString(), + startOffsets.get(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())), + endOffsets.get(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())))) + .collect(Collectors.joining("\n")); + } catch (Exception e) { + return String.format("ERROR fetching metadata for Topic [%s], Connection String [%s], Error [%s]", + topic, + consumerProperties.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG), + e.getMessage()); + } + } +} diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStreamingUtils.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStreamingUtils.java index d2d0ebc192..4802c4ec2a 100644 --- kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStreamingUtils.java +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStreamingUtils.java @@ -19,11 +19,21 @@ package org.apache.hadoop.hive.kafka; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveWritableObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.StringUtils; import org.apache.hive.common.util.ReflectionUtil; import org.apache.kafka.clients.CommonClientConfigs; @@ -33,10 +43,12 @@ import java.io.IOException; import java.util.Arrays; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Properties; import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -45,7 +57,7 @@ final class KafkaStreamingUtils { /** - * MANDATORY Table property indicating kafka topic backing the table + * MANDATORY Table property indicating kafka topic backing the table. 
*/ static final String HIVE_KAFKA_TOPIC = "kafka.topic"; /** @@ -58,47 +70,26 @@ static final String SERDE_CLASS_NAME = "kafka.serde.class"; /** * Table property indicating poll/fetch timeout period in millis. - * FYI this is independent from internal Kafka consumer timeouts, defaults to {@DEFAULT_CONSUMER_POLL_TIMEOUT_MS} + * FYI this is independent from internal Kafka consumer timeouts, defaults to {@DEFAULT_CONSUMER_POLL_TIMEOUT_MS}. */ static final String HIVE_KAFKA_POLL_TIMEOUT = "hive.kafka.poll.timeout.ms"; /** - * default poll timeout for fetching metadata and record batch + * Default poll timeout for fetching metadata and record batch. */ static final long DEFAULT_CONSUMER_POLL_TIMEOUT_MS = 5000L; // 5 seconds - /** - * Record Timestamp column name, added as extra meta column of type long - */ - static final String TIMESTAMP_COLUMN = "__timestamp"; - /** - * Record Kafka Partition column name added as extra meta column of type int - */ - static final String PARTITION_COLUMN = "__partition"; - /** - * Record offset column name added as extra metadata column to row as long - */ - static final String OFFSET_COLUMN = "__offset"; - - /** - * Start offset given by the input split, this will reflect the actual start of TP or start given by split pruner - */ - static final String START_OFFSET_COLUMN = "__start_offset"; - - /** - * End offset given by input split at run time - */ - static final String END_OFFSET_COLUMN = "__end_offset"; /** * Table property prefix used to inject kafka consumer properties, e.g "kafka.consumer.max.poll.records" = "5000" * this will lead to inject max.poll.records=5000 to the Kafka Consumer. NOT MANDATORY defaults to nothing */ static final String CONSUMER_CONFIGURATION_PREFIX = "kafka.consumer"; - /** - * Set of Kafka properties that the user can not set via DDLs + * Set of Kafka properties that the user can not set via DDLs. */ static final HashSet FORBIDDEN_PROPERTIES = new HashSet<>(ImmutableList.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, - ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)); private KafkaStreamingUtils() { } @@ -172,4 +163,93 @@ static AbstractSerDe createDelegate(String className) { // we are not setting conf thus null is okay return ReflectionUtil.newInstance(clazz, null); } + + /** + * Basic Enum class for all the metadata columns appended to the Kafka row by the deserializer. + */ + enum MetadataColumn { + /** + * Record offset column name added as extra metadata column to row as long. + */ + OFFSET("__offset", TypeInfoFactory.longTypeInfo), + /** + * Record Kafka Partition column name added as extra meta column of type int. + */ + PARTITION("__partition", TypeInfoFactory.intTypeInfo), + /** + * Record Kafka key column name added as extra meta column of type binary blob. + */ + KEY("__key", TypeInfoFactory.binaryTypeInfo), + /** + * Record Timestamp column name, added as extra meta column of type long. + */ + TIMESTAMP("__timestamp", TypeInfoFactory.longTypeInfo), + /** + * Start offset given by the input split, this will reflect the actual start of TP or start given by split pruner. + */ + START_OFFSET("__start_offset", TypeInfoFactory.longTypeInfo), + /** + * End offset given by input split at run time. 
+ */ + END_OFFSET("__end_offset", TypeInfoFactory.longTypeInfo); + + private final String name; + private final TypeInfo typeInfo; + + MetadataColumn(String name, TypeInfo typeInfo) { + this.name = name; + this.typeInfo = typeInfo; + } + + public String getName() { + return name; + } + + public AbstractPrimitiveWritableObjectInspector getObjectInspector() { + return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(TypeInfoFactory.getPrimitiveTypeInfo( + typeInfo.getTypeName())); + } + } + + //Order in which columns and types will be appended to the original row. + /** + * Ordered list of Kafka metadata columns. + */ + private static final List KAFKA_METADATA_COLUMNS = + Arrays.asList(MetadataColumn.KEY, + MetadataColumn.PARTITION, + MetadataColumn.OFFSET, + MetadataColumn.TIMESTAMP, + MetadataColumn.START_OFFSET, + MetadataColumn.END_OFFSET); + + /** + * Kafka metadata column names. + */ + static final List KAFKA_METADATA_COLUMN_NAMES = KAFKA_METADATA_COLUMNS + .stream() + .map(MetadataColumn::getName) + .collect(Collectors.toList()); + + /** + * Kafka metadata column inspectors. + */ + static final List KAFKA_METADATA_INSPECTORS = KAFKA_METADATA_COLUMNS + .stream() + .map(MetadataColumn::getObjectInspector) + .collect(Collectors.toList()); + + /** + * Reverse lookup map used to convert records from Kafka Writable to Hive Writable, based on Kafka semantics. + */ + static final Map> + recordWritableFnMap = ImmutableMap.>builder() + .put(MetadataColumn.END_OFFSET.getName(), (record) -> new LongWritable(record.getEndOffset())) + .put(MetadataColumn.KEY.getName(), + record -> record.getRecordKey() == null ? null : new BytesWritable(record.getRecordKey())) + .put(MetadataColumn.OFFSET.getName(), record -> new LongWritable(record.getOffset())) + .put(MetadataColumn.PARTITION.getName(), record -> new IntWritable(record.getPartition())) + .put(MetadataColumn.START_OFFSET.getName(), record -> new LongWritable(record.getStartOffset())) + .put(MetadataColumn.TIMESTAMP.getName(), record -> new LongWritable(record.getTimestamp())) + .build(); } diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaPullerInputSplitTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaPullerInputSplitTest.java index be26986818..00f95ca329 100644 --- kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaPullerInputSplitTest.java +++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaPullerInputSplitTest.java @@ -33,11 +33,11 @@ * Kafka Hadoop InputSplit Test. 
*/ public class KafkaPullerInputSplitTest { - private String topic = "my_topic"; private KafkaPullerInputSplit expectedInputSplit; public KafkaPullerInputSplitTest() { - this.expectedInputSplit = new KafkaPullerInputSplit(this.topic, 1, 50L, 56L, new Path("/tmp")); + String topic = "my_topic"; + this.expectedInputSplit = new KafkaPullerInputSplit(topic, 1, 50L, 56L, new Path("/tmp")); } @Test public void testWriteRead() throws IOException { diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java index 5de51cd00a..98a5568f91 100644 --- kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java +++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java @@ -30,8 +30,6 @@ import kafka.zk.EmbeddedZookeeper; import org.I0Itec.zkclient.ZkClient; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; @@ -69,22 +67,21 @@ private static final int RECORD_NUMBER = 100; private static final String TOPIC = "my_test_topic"; private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, 0); - public static final byte[] KEY_BYTES = "KEY".getBytes(Charset.forName("UTF-8")); + private static final byte[] KEY_BYTES = "KEY".getBytes(Charset.forName("UTF-8")); private static final List> RECORDS = IntStream.range(0, RECORD_NUMBER).mapToObj(number -> { final byte[] value = ("VALUE-" + Integer.toString(number)).getBytes(Charset.forName("UTF-8")); return new ConsumerRecord<>(TOPIC, 0, (long) number, 0L, null, 0L, 0, 0, KEY_BYTES, value); }).collect(Collectors.toList()); - public static final long POLL_TIMEOUT_MS = 900L; + private static final long POLL_TIMEOUT_MS = 900L; private static ZkUtils zkUtils; private static ZkClient zkClient; private static KafkaProducer producer; private static KafkaServer kafkaServer; - private static String zkConnect; private KafkaConsumer consumer = null; private KafkaRecordIterator kafkaRecordIterator = null; - private Configuration conf = new Configuration(); + private final Configuration conf = new Configuration(); private static EmbeddedZookeeper zkServer; public KafkaRecordIteratorTest() { @@ -93,7 +90,7 @@ public KafkaRecordIteratorTest() { @BeforeClass public static void setupCluster() throws IOException { LOG.info("init embedded Zookeeper"); zkServer = new EmbeddedZookeeper(); - zkConnect = "127.0.0.1:" + zkServer.port(); + String zkConnect = "127.0.0.1:" + zkServer.port(); zkClient = new ZkClient(zkConnect, 3000, 3000, ZKStringSerializer$.MODULE$); zkUtils = ZkUtils.apply(zkClient, false); LOG.info("init kafka broker"); @@ -174,12 +171,13 @@ public KafkaRecordIteratorTest() { List serRecords = RECORDS.stream() - .map((aRecord) -> new KafkaRecordWritable(aRecord.partition(), - aRecord.offset(), - aRecord.timestamp(), - aRecord.value(), + .map((consumerRecord) -> new KafkaRecordWritable(consumerRecord.partition(), + consumerRecord.offset(), + consumerRecord.timestamp(), + consumerRecord.value(), 50L, - 100L)) + 100L, + consumerRecord.key())) .collect(Collectors.toList()); KafkaPullerRecordReader recordReader = new KafkaPullerRecordReader(); TaskAttemptContext context = new TaskAttemptContextImpl(this.conf, new TaskAttemptID()); @@ -225,8 +223,8 @@ public 
KafkaRecordIteratorTest() { this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, - new Long(RECORD_NUMBER), - new Long(RECORD_NUMBER), + (long) RECORD_NUMBER, + (long) RECORD_NUMBER, POLL_TIMEOUT_MS); this.compareIterator(ImmutableList.of(), this.kafkaRecordIterator); } @@ -238,11 +236,11 @@ public KafkaRecordIteratorTest() { private void compareIterator(List> expected, Iterator> kafkaRecordIterator) { - expected.stream().forEachOrdered((expectedRecord) -> { + expected.forEach((expectedRecord) -> { Assert.assertTrue("record with offset " + expectedRecord.offset(), kafkaRecordIterator.hasNext()); ConsumerRecord record = kafkaRecordIterator.next(); - Assert.assertTrue(record.topic().equals(TOPIC)); - Assert.assertTrue(record.partition() == 0); + Assert.assertEquals(record.topic(), TOPIC); + Assert.assertEquals(0, record.partition()); Assert.assertEquals("Offsets not matching", expectedRecord.offset(), record.offset()); byte[] binaryExceptedValue = expectedRecord.value(); byte[] binaryExceptedKey = expectedRecord.key(); @@ -261,7 +259,7 @@ private static void setupProducer() { producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); producerProps.setProperty("max.block.ms", "10000"); - producer = new KafkaProducer(producerProps); + producer = new KafkaProducer<>(producerProps); LOG.info("kafka producer started"); } @@ -277,13 +275,13 @@ private void setupConsumer() { consumerProps.setProperty("fetch.max.wait.ms", "3001"); consumerProps.setProperty("session.timeout.ms", "3001"); consumerProps.setProperty("metadata.max.age.ms", "100"); - this.consumer = new KafkaConsumer(consumerProps); + this.consumer = new KafkaConsumer<>(consumerProps); } private static void sendData() { LOG.info("Sending {} records", RECORD_NUMBER); RECORDS.stream() - .map(consumerRecord -> new ProducerRecord(consumerRecord.topic(), + .map(consumerRecord -> new ProducerRecord<>(consumerRecord.topic(), consumerRecord.partition(), consumerRecord.timestamp(), consumerRecord.key(), diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordWritableTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordWritableTest.java index 8f9df548f7..4fb9664780 100644 --- kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordWritableTest.java +++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordWritableTest.java @@ -37,7 +37,39 @@ public KafkaRecordWritableTest() { @Test public void testWriteReadFields() throws IOException { ConsumerRecord record = new ConsumerRecord("topic", 0, 3L, "key".getBytes(), "value".getBytes()); - KafkaRecordWritable kafkaRecordWritable = new KafkaRecordWritable(record.partition(), record.offset(), record.timestamp(), record.value(), 0L, 100L); + KafkaRecordWritable + kafkaRecordWritable = + new KafkaRecordWritable(record.partition(), + record.offset(), + record.timestamp(), + record.value(), + 0L, + 100L, + null); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream w = new DataOutputStream(baos); + kafkaRecordWritable.write(w); + w.flush(); + + ByteArrayInputStream input = new ByteArrayInputStream(baos.toByteArray()); + DataInputStream inputStream = new DataInputStream(input); + KafkaRecordWritable actualKafkaRecordWritable = new KafkaRecordWritable(); + actualKafkaRecordWritable.readFields(inputStream); + Assert.assertEquals(kafkaRecordWritable, 
actualKafkaRecordWritable); + } + + + @Test public void testWriteReadFields2() throws IOException { + ConsumerRecord record = new ConsumerRecord("topic", 0, 3L, "key".getBytes(), "value".getBytes()); + KafkaRecordWritable + kafkaRecordWritable = + new KafkaRecordWritable(record.partition(), + record.offset(), + record.timestamp(), + record.value(), + 0L, + 100L, + "thiskey".getBytes()); ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream w = new DataOutputStream(baos); kafkaRecordWritable.write(w); diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaScanTrimmerTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaScanTrimmerTest.java index 289dafde36..2a40bff4b5 100644 --- kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaScanTrimmerTest.java +++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaScanTrimmerTest.java @@ -52,19 +52,25 @@ public class KafkaScanTrimmerTest { private static final Path PATH = new Path("/tmp"); - private ExprNodeDesc zeroInt = ConstantExprBuilder.build(0); - private ExprNodeDesc threeInt = ConstantExprBuilder.build(3); - private ExprNodeDesc thirtyLong = ConstantExprBuilder.build(30L); - private ExprNodeDesc thirtyFiveLong = ConstantExprBuilder.build(35L); - private ExprNodeDesc seventyFiveLong = ConstantExprBuilder.build(75L); - private ExprNodeDesc fortyLong = ConstantExprBuilder.build(40L); + private final ExprNodeDesc zeroInt = ConstantExprBuilder.build(0); + private final ExprNodeDesc threeInt = ConstantExprBuilder.build(3); + private final ExprNodeDesc thirtyLong = ConstantExprBuilder.build(30L); + private final ExprNodeDesc thirtyFiveLong = ConstantExprBuilder.build(35L); + private final ExprNodeDesc seventyFiveLong = ConstantExprBuilder.build(75L); + private final ExprNodeDesc fortyLong = ConstantExprBuilder.build(40L); private ExprNodeDesc partitionColumn = - new ExprNodeColumnDesc(TypeInfoFactory.intTypeInfo, KafkaStreamingUtils.PARTITION_COLUMN, null, false); + new ExprNodeColumnDesc(TypeInfoFactory.intTypeInfo, + KafkaStreamingUtils.MetadataColumn.PARTITION.getName(), + null, + false); private ExprNodeDesc offsetColumn = - new ExprNodeColumnDesc(TypeInfoFactory.longTypeInfo, KafkaStreamingUtils.OFFSET_COLUMN, null, false); + new ExprNodeColumnDesc(TypeInfoFactory.longTypeInfo, + KafkaStreamingUtils.MetadataColumn.OFFSET.getName(), + null, + false); private String topic = "my_topic"; private Map diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaStreamingUtilsTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaStreamingUtilsTest.java index 8d68ec27c8..071df3f858 100644 --- kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaStreamingUtilsTest.java +++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaStreamingUtilsTest.java @@ -19,11 +19,21 @@ package org.apache.hadoop.hive.kafka; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.junit.Assert; import org.junit.Test; +import java.util.Arrays; +import java.util.List; import java.util.Properties; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hadoop.hive.kafka.KafkaStreamingUtils.*; /** * Test for Utility class. 
@@ -37,7 +47,7 @@ public KafkaStreamingUtilsTest() { configuration.set("kafka.bootstrap.servers", "localhost:9090"); configuration.set("kafka.consumer.fetch.max.wait.ms", "40"); configuration.set("kafka.consumer.my.new.wait.ms", "400"); - Properties properties = KafkaStreamingUtils.consumerProperties(configuration); + Properties properties = consumerProperties(configuration); Assert.assertEquals("localhost:9090", properties.getProperty("bootstrap.servers")); Assert.assertEquals("40", properties.getProperty("fetch.max.wait.ms")); Assert.assertEquals("400", properties.getProperty("my.new.wait.ms")); @@ -47,13 +57,52 @@ public KafkaStreamingUtilsTest() { Configuration configuration = new Configuration(); configuration.set("kafka.bootstrap.servers", "localhost:9090"); configuration.set("kafka.consumer." + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - KafkaStreamingUtils.consumerProperties(configuration); + consumerProperties(configuration); } @Test(expected = IllegalArgumentException.class) public void canNotSetForbiddenProp2() { Configuration configuration = new Configuration(); configuration.set("kafka.bootstrap.servers", "localhost:9090"); configuration.set("kafka.consumer." + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "value"); - KafkaStreamingUtils.consumerProperties(configuration); + consumerProperties(configuration); + } + + @Test public void testMetadataEnumLookupMapper() { + int partition = 1; + long offset = 5L; + long ts = System.currentTimeMillis(); + long startOffset = 0L; + long endOffset = 200L; + byte[] value = "value".getBytes(); + byte[] key = "key".getBytes(); + // ORDER MATTERS here. + List + expectedWritables = + Arrays.asList(new BytesWritable(key), + new IntWritable(partition), + new LongWritable(offset), + new LongWritable(ts), + new LongWritable(startOffset), + new LongWritable(endOffset)); + KafkaRecordWritable KRWritable = new KafkaRecordWritable(partition, offset, ts, value, startOffset, endOffset, key); + + List + actual = + KAFKA_METADATA_COLUMN_NAMES.stream() + .map(recordWritableFnMap::get) + .map(fn -> fn.apply(KRWritable)) + .collect(Collectors.toList()); + + Assert.assertEquals(expectedWritables, actual); + } + + @Test + public void testEnsureThatAllTheColumnAreListed() { + Assert.assertEquals(MetadataColumn.values().length, KafkaStreamingUtils.KAFKA_METADATA_COLUMN_NAMES.size()); + Assert.assertEquals(MetadataColumn.values().length, KafkaStreamingUtils.KAFKA_METADATA_INSPECTORS.size()); + Assert.assertFalse(Arrays.stream(MetadataColumn.values()) + .map(MetadataColumn::getName) + .anyMatch(name -> !KAFKA_METADATA_COLUMN_NAMES.contains(name))); + Arrays.stream(MetadataColumn.values()).forEach(element -> Assert.assertNotNull(KafkaStreamingUtils.recordWritableFnMap.get(element.getName()))); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/StorageHandlerInfo.java ql/src/java/org/apache/hadoop/hive/ql/metadata/StorageHandlerInfo.java index dbc44a6621..e68b8eaaa7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/StorageHandlerInfo.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/StorageHandlerInfo.java @@ -18,13 +18,8 @@ package org.apache.hadoop.hive.ql.metadata; -import org.apache.hadoop.hive.metastore.api.SQLForeignKey; import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; /** * StorageHandlerInfo is a marker interface used to provide runtime information associated with a storage handler. 
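The tests above exercise the new MetadataColumn enum and the recordWritableFnMap lookup that replace the per-column switch in GenericKafkaSerDe.deserialize(). The sketch below is a minimal, self-contained illustration of that lookup-map pattern only: SimpleRecord, the plain java.util map, and the returned plain objects are simplified stand-ins for the package-private Hive classes and their Hadoop Writable values, not the patch code itself.

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Illustrative stand-in types; not the Hive kafka-handler classes. */
final class MetadataLookupSketch {

  /** Minimal stand-in for KafkaRecordWritable carrying a few metadata fields. */
  static final class SimpleRecord {
    final int partition;
    final long offset;
    final byte[] key; // may be null when the Kafka record has no key

    SimpleRecord(int partition, long offset, byte[] key) {
      this.partition = partition;
      this.offset = offset;
      this.key = key;
    }
  }

  /** Column-name -> extractor map, playing the role of recordWritableFnMap. */
  private static final Map<String, Function<SimpleRecord, Object>> EXTRACTORS = new HashMap<>();

  static {
    EXTRACTORS.put("__partition", record -> record.partition);
    EXTRACTORS.put("__offset", record -> record.offset);
    EXTRACTORS.put("__key", record -> record.key); // a null key simply stays null
  }

  /** Returns the metadata value for a column, or null when the column is not a metadata column. */
  static Object resolveMetadata(String columnName, SimpleRecord record) {
    Function<SimpleRecord, Object> extractor = EXTRACTORS.get(columnName);
    return extractor == null ? null : extractor.apply(record);
  }

  public static void main(String[] args) {
    SimpleRecord record = new SimpleRecord(0, 42L, "k".getBytes());
    System.out.println(resolveMetadata("__offset", record));    // 42
    System.out.println(resolveMetadata("__partition", record)); // 0
  }
}

In the patch, anything that misses the map falls through to the delegate SerDe's struct field, so the metadata columns are defined in exactly one place and the test above only has to check that every MetadataColumn value has a mapping.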
diff --git ql/src/test/queries/clientpositive/kafka_storage_handler.q ql/src/test/queries/clientpositive/kafka_storage_handler.q index 8daa3e3bc0..f10bb90e34 100644 --- ql/src/test/queries/clientpositive/kafka_storage_handler.q +++ ql/src/test/queries/clientpositive/kafka_storage_handler.q @@ -14,7 +14,7 @@ TBLPROPERTIES DESCRIBE EXTENDED kafka_table; -Select `__partition` ,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +Select `__partition` ,`__start_offset`,`__end_offset`, `__offset`,`__key`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , `unpatrolled` , `anonymous` , `robot` , added , deleted , delta FROM kafka_table; Select count(*) FROM kafka_table; @@ -31,11 +31,11 @@ Select `__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page from kafka_table where (`__offset` > 7 and `__partition` = 0 and `__offset` <9 ) OR `__offset` = 4 and `__partition` = 0 OR (`__offset` <= 1 and `__partition` = 0 and `__offset` > 0); -Select `__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` = 5; +Select `__key`,`__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` = 5; -Select `__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` < 5; +Select `__key`,`__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` < 5; -Select `__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` > 5; +Select `__key`,`__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` > 5; -- Timestamp filter @@ -150,6 +150,15 @@ FROM kafka_table_2; Select count(*) FROM kafka_table_2; +CREATE EXTERNAL TABLE wiki_kafka_avro_table_1 +STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' +TBLPROPERTIES +("kafka.topic" = "wiki_kafka_avro_table", +"kafka.bootstrap.servers"="localhost:9092", +"kafka.serde.class"="org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe"); + +SELECT * FROM wiki_kafka_avro_table_1; +SELECT COUNT (*) from wiki_kafka_avro_table_1; CREATE EXTERNAL TABLE wiki_kafka_avro_table STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' @@ -222,6 +231,7 @@ TBLPROPERTIES describe extended wiki_kafka_avro_table; + select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table; select count(*) from wiki_kafka_avro_table; @@ -231,6 +241,6 @@ select count(distinct `user`) from wiki_kafka_avro_table; select sum(deltabucket), min(commentlength) from wiki_kafka_avro_table; select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__timestamp` as kafka_record_ts_long, -`__partition`, `__start_offset`,`__end_offset`,`__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`, +`__partition`, `__start_offset`,`__end_offset`, `__key`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table where `__timestamp` > 1534750625090; diff --git ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out index 
3dec33d790..593cd07b93 100644 --- ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out +++ ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out @@ -44,6 +44,7 @@ robot boolean from deserializer added int from deserializer deleted int from deserializer delta bigint from deserializer +__key binary from deserializer __partition int from deserializer __offset bigint from deserializer __timestamp bigint from deserializer @@ -51,26 +52,28 @@ __start_offset bigint from deserializer __end_offset bigint from deserializer #### A masked pattern was here #### -PREHOOK: query: Select `__partition` ,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +StorageHandlerInfo +Partition(topic = test-topic, partition = 0, leader = 1, replicas = [1], isr = [1], offlineReplicas = []) [start offset = [0], end offset = [10]] +PREHOOK: query: Select `__partition` ,`__start_offset`,`__end_offset`, `__offset`,`__key`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , `unpatrolled` , `anonymous` , `robot` , added , deleted , delta FROM kafka_table PREHOOK: type: QUERY PREHOOK: Input: default@kafka_table PREHOOK: Output: hdfs://### HDFS PATH ### -POSTHOOK: query: Select `__partition` ,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +POSTHOOK: query: Select `__partition` ,`__start_offset`,`__end_offset`, `__offset`,`__key`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , `unpatrolled` , `anonymous` , `robot` , added , deleted , delta FROM kafka_table POSTHOOK: type: QUERY POSTHOOK: Input: default@kafka_table POSTHOOK: Output: hdfs://### HDFS PATH ### -0 0 10 0 NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 -0 0 10 1 NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 -0 0 10 2 NULL Cherno Alpha masterYi ru Russia Asia article true false false true 123 12 111 -0 0 10 3 NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 -0 0 10 4 NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 -0 0 10 5 NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 -0 0 10 6 NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 -0 0 10 7 NULL Cherno Alpha masterYi ru Russia Asia article true false false true 123 12 111 -0 0 10 8 NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 -0 0 10 9 NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 +0 0 10 0 key NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 +0 0 10 1 key NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 +0 0 10 2 key NULL Cherno Alpha masterYi ru Russia Asia article true false false true 123 12 111 +0 0 10 3 key NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 +0 0 10 4 key NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 +0 0 10 5 key NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 +0 0 10 6 key NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 +0 0 10 7 key NULL Cherno 
Alpha masterYi ru Russia Asia article true false false true 123 12 111 +0 0 10 8 key NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 +0 0 10 9 key NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 PREHOOK: query: Select count(*) FROM kafka_table PREHOOK: type: QUERY PREHOOK: Input: default@kafka_table @@ -141,40 +144,40 @@ POSTHOOK: Output: hdfs://### HDFS PATH ### 0 1 9 1 NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 0 1 9 4 NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 0 1 9 8 NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 -PREHOOK: query: Select `__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` = 5 +PREHOOK: query: Select `__key`,`__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` = 5 PREHOOK: type: QUERY PREHOOK: Input: default@kafka_table PREHOOK: Output: hdfs://### HDFS PATH ### -POSTHOOK: query: Select `__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` = 5 +POSTHOOK: query: Select `__key`,`__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` = 5 POSTHOOK: type: QUERY POSTHOOK: Input: default@kafka_table POSTHOOK: Output: hdfs://### HDFS PATH ### -0 5 6 5 NULL Gypsy Danger nuclear -PREHOOK: query: Select `__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` < 5 +key 0 5 6 5 NULL Gypsy Danger nuclear +PREHOOK: query: Select `__key`,`__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` < 5 PREHOOK: type: QUERY PREHOOK: Input: default@kafka_table PREHOOK: Output: hdfs://### HDFS PATH ### -POSTHOOK: query: Select `__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` < 5 +POSTHOOK: query: Select `__key`,`__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` < 5 POSTHOOK: type: QUERY POSTHOOK: Input: default@kafka_table POSTHOOK: Output: hdfs://### HDFS PATH ### -0 0 5 0 NULL Gypsy Danger nuclear -0 0 5 1 NULL Striker Eureka speed -0 0 5 2 NULL Cherno Alpha masterYi -0 0 5 3 NULL Crimson Typhoon triplets -0 0 5 4 NULL Coyote Tango stringer -PREHOOK: query: Select `__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` > 5 +key 0 0 5 0 NULL Gypsy Danger nuclear +key 0 0 5 1 NULL Striker Eureka speed +key 0 0 5 2 NULL Cherno Alpha masterYi +key 0 0 5 3 NULL Crimson Typhoon triplets +key 0 0 5 4 NULL Coyote Tango stringer +PREHOOK: query: Select `__key`,`__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` > 5 PREHOOK: type: QUERY PREHOOK: Input: default@kafka_table PREHOOK: Output: hdfs://### HDFS PATH ### -POSTHOOK: query: Select `__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` > 5 +POSTHOOK: query: Select `__key`,`__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` > 5 POSTHOOK: type: QUERY POSTHOOK: Input: default@kafka_table POSTHOOK: Output: 
-0 6 10 6 NULL Striker Eureka speed
-0 6 10 7 NULL Cherno Alpha masterYi
-0 6 10 8 NULL Crimson Typhoon triplets
-0 6 10 9 NULL Coyote Tango stringer
+key 0 6 10 6 NULL Striker Eureka speed
+key 0 6 10 7 NULL Cherno Alpha masterYi
+key 0 6 10 8 NULL Crimson Typhoon triplets
+key 0 6 10 9 NULL Coyote Tango stringer
 PREHOOK: query: Select `__partition`,`__start_offset`,`__end_offset`, `__offset`, `user` from kafka_table where `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '1' HOURS)
 PREHOOK: type: QUERY
@@ -606,6 +609,52 @@ POSTHOOK: type: QUERY
 POSTHOOK: Input: default@kafka_table_2
 POSTHOOK: Output: hdfs://### HDFS PATH ###
 10
+PREHOOK: query: CREATE EXTERNAL TABLE wiki_kafka_avro_table_1
+STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
+TBLPROPERTIES
+("kafka.topic" = "wiki_kafka_avro_table",
+"kafka.bootstrap.servers"="localhost:9092",
+"kafka.serde.class"="org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe")
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@wiki_kafka_avro_table_1
+POSTHOOK: query: CREATE EXTERNAL TABLE wiki_kafka_avro_table_1
+STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
+TBLPROPERTIES
+("kafka.topic" = "wiki_kafka_avro_table",
+"kafka.bootstrap.servers"="localhost:9092",
+"kafka.serde.class"="org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@wiki_kafka_avro_table_1
+PREHOOK: query: SELECT * FROM wiki_kafka_avro_table_1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@wiki_kafka_avro_table_1
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT * FROM wiki_kafka_avro_table_1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@wiki_kafka_avro_table_1
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+key-0 0 0 1534736225090 0 11
+key-1 0 1 1534739825090 0 11
+key-2 0 2 1534743425090 0 11
+key-3 0 3 1534747025090 0 11
+key-4 0 4 1534750625090 0 11
+key-5 0 5 1534754225090 0 11
+key-6 0 6 1534757825090 0 11
+key-7 0 7 1534761425090 0 11
+key-8 0 8 1534765025090 0 11
+key-9 0 9 1534768625090 0 11
+key-10 0 10 1534772225090 0 11
+PREHOOK: query: SELECT COUNT (*) from wiki_kafka_avro_table_1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@wiki_kafka_avro_table_1
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT COUNT (*) from wiki_kafka_avro_table_1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@wiki_kafka_avro_table_1
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+11
 PREHOOK: query: CREATE EXTERNAL TABLE wiki_kafka_avro_table
 STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
 TBLPROPERTIES
@@ -772,6 +821,7 @@ user string from deserializer
 deltabucket double from deserializer
 deleted bigint from deserializer
 namespace string from deserializer
+__key binary from deserializer
 __partition int from deserializer
 __offset bigint from deserializer
 __timestamp bigint from deserializer
@@ -779,6 +829,8 @@ __start_offset bigint from deserializer
 __end_offset bigint from deserializer
 #### A masked pattern was here ####
+StorageHandlerInfo
+Partition(topic = wiki_kafka_avro_table, partition = 0, leader = 1, replicas = [1], isr = [1], offlineReplicas = []) [start offset = [0], end offset = [11]]
 PREHOOK: query: select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table
 PREHOOK: type: QUERY
 PREHOOK: Input: default@wiki_kafka_avro_table
@@ -826,20 +878,20 @@ POSTHOOK: Input: default@wiki_kafka_avro_table
 POSTHOOK: Output: hdfs://### HDFS PATH ###
 5522.000000000001 0
 PREHOOK: query: select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__timestamp` as kafka_record_ts_long,
-`__partition`, `__start_offset`,`__end_offset`,`__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`,
+`__partition`, `__start_offset`,`__end_offset`, `__key`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`,
 `isanonymous`, `commentlength` from wiki_kafka_avro_table where `__timestamp` > 1534750625090
 PREHOOK: type: QUERY
 PREHOOK: Input: default@wiki_kafka_avro_table
 PREHOOK: Output: hdfs://### HDFS PATH ###
 POSTHOOK: query: select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__timestamp` as kafka_record_ts_long,
-`__partition`, `__start_offset`,`__end_offset`,`__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`,
+`__partition`, `__start_offset`,`__end_offset`, `__key`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`,
 `isanonymous`, `commentlength` from wiki_kafka_avro_table where `__timestamp` > 1534750625090
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@wiki_kafka_avro_table
 POSTHOOK: Output: hdfs://### HDFS PATH ###
-2018-08-20 08:37:05.09 1534754225090 0 5 11 5 08/20/2018 01:37:05 test-user-5 page is 500 -5 502.0 true 5
-2018-08-20 09:37:05.09 1534757825090 0 5 11 6 08/20/2018 02:37:05 test-user-6 page is 600 -6 602.4000000000001 false 6
-2018-08-20 10:37:05.09 1534761425090 0 5 11 7 08/20/2018 03:37:05 test-user-7 page is 700 -7 702.8000000000001 true 7
-2018-08-20 11:37:05.09 1534765025090 0 5 11 8 08/20/2018 04:37:05 test-user-8 page is 800 -8 803.2 true 8
-2018-08-20 12:37:05.09 1534768625090 0 5 11 9 08/20/2018 05:37:05 test-user-9 page is 900 -9 903.6 false 9
-2018-08-20 13:37:05.09 1534772225090 0 5 11 10 08/20/2018 06:37:05 test-user-10 page is 1000 -10 1004.0 true 10
+2018-08-20 08:37:05.09 1534754225090 0 5 11 key-5 5 08/20/2018 01:37:05 test-user-5 page is 500 -5 502.0 true 5
+2018-08-20 09:37:05.09 1534757825090 0 5 11 key-6 6 08/20/2018 02:37:05 test-user-6 page is 600 -6 602.4000000000001 false 6
+2018-08-20 10:37:05.09 1534761425090 0 5 11 key-7 7 08/20/2018 03:37:05 test-user-7 page is 700 -7 702.8000000000001 true 7
+2018-08-20 11:37:05.09 1534765025090 0 5 11 key-8 8 08/20/2018 04:37:05 test-user-8 page is 800 -8 803.2 true 8
+2018-08-20 12:37:05.09 1534768625090 0 5 11 key-9 9 08/20/2018 05:37:05 test-user-9 page is 900 -9 903.6 false 9
+2018-08-20 13:37:05.09 1534772225090 0 5 11 key-10 10 08/20/2018 06:37:05 test-user-10 page is 1000 -10 1004.0 true 10
diff --git testutils/ptest2/conf/deployed/master-mr2.properties testutils/ptest2/conf/deployed/master-mr2.properties
index 90a654cf8a..333c3def03 100644
--- testutils/ptest2/conf/deployed/master-mr2.properties
+++ testutils/ptest2/conf/deployed/master-mr2.properties
@@ -182,7 +182,7 @@ qFileTest.erasurecodingCli.groups.normal = mainProperties.${erasurecoding.only.q
 qFileTest.miniDruid.driver = TestMiniDruidCliDriver
 qFileTest.miniDruid.directory = ql/src/test/queries/clientpositive
-qFileTest.miniDruid.batchSize = 5
+qFileTest.miniDruid.batchSize = 50
 qFileTest.miniDruid.queryFilesProperty = qfile
 qFileTest.miniDruid.include = normal
 qFileTest.miniDruid.groups.normal = mainProperties.${druid.query.files}
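Note (reviewer sketch, not part of the patch): the golden-file changes above exercise two user-visible pieces of this change, the new `__key` binary metadata column and the `kafka.serde.class` table property. A minimal HiveQL sketch of that surface is below; the table name `kafka_key_demo`, its column list, the topic `test-topic`, and the broker `localhost:9092` are illustrative assumptions, not values taken from the patch.

    -- Hypothetical table; columns, topic, and broker address are placeholders.
    CREATE EXTERNAL TABLE kafka_key_demo (`page` string, `user` string)
    STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
    TBLPROPERTIES (
      "kafka.topic" = "test-topic",
      "kafka.bootstrap.servers" = "localhost:9092");

    -- The storage handler appends its metadata columns after the declared data
    -- columns (see the describe output above), so the record key can be read
    -- alongside partition and offset information.
    SELECT `__key`, `__partition`, `__offset`, `__timestamp`, `__start_offset`, `__end_offset`, `page`
    FROM kafka_key_demo
    WHERE `__offset` < 5;

The queries in kafka_storage_handler.q.out follow the same pattern, with a delegate SerDe selected via `kafka.serde.class` where needed.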