diff --git itests/qtest-druid/pom.xml itests/qtest-druid/pom.xml index 79a0fb3f2f..e566fcf4d7 100644 --- itests/qtest-druid/pom.xml +++ itests/qtest-druid/pom.xml @@ -43,7 +43,7 @@ 10.11.1.1 16.0.1 4.1.0 - 0.10.2.0 + 1.0.1 diff --git itests/qtest/pom.xml itests/qtest/pom.xml index 5767806017..801a43d02f 100644 --- itests/qtest/pom.xml +++ itests/qtest/pom.xml @@ -138,7 +138,12 @@ ${project.version} test - + + org.apache.hive + kafka-handler + ${project.version} + test + diff --git itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java index c54b2bf63a..b526c29cee 100644 --- itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java +++ itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java @@ -56,7 +56,6 @@ public TestMiniDruidKafkaCliDriver(String name, File qfile) { this.qfile = qfile; } - @Ignore("HIVE-19509: Disable tests that are failing continuously") @Test public void testCliDriver() throws Exception { adapter.runTest(name, qfile); diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties index 27d2974322..77fbfbd01b 100644 --- itests/src/test/resources/testconfiguration.properties +++ itests/src/test/resources/testconfiguration.properties @@ -1724,7 +1724,8 @@ druid.query.files=druidmini_test1.q,\ druid.llap.local.query.files=druidmini_noop.q -druid.kafka.query.files=druidkafkamini_basic.q +druid.kafka.query.files=druidkafkamini_basic.q \ + kafka_storage_handler.q # tests to be run by TestErasureCodingHDFSCliDriver and TestCliDriver erasurecoding.shared.query.files=erasure_commands.q diff --git itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java index 92919e9daf..491b6db581 100644 --- itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java +++ itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java @@ -202,6 +202,7 @@ public MiniDruidKafkaCliConfig() { setQueryDir("ql/src/test/queries/clientpositive"); includesFrom(testConfigProps, "druid.kafka.query.files"); + excludeQuery("druidkafkamini_basic.q"); // HIVE-19509 setResultsDir("ql/src/test/results/clientpositive/druid"); setLogDir("itests/qtest/target/tmp/log"); diff --git kafka-handler/pom.xml kafka-handler/pom.xml new file mode 100644 index 0000000000..cbcb2fb170 --- /dev/null +++ kafka-handler/pom.xml @@ -0,0 +1,156 @@ + + + + + + org.apache.hive + hive + 4.0.0-SNAPSHOT + ../pom.xml + + 4.0.0 + + + .. 
+ 1.0.1 + + + kafka-handler + jar + Hive Kafka Storage Handler + + + + + org.apache.hive + hive-exec + provided + ${project.version} + + + org.apache.hadoop + hadoop-common + + + org.apache.hadoop + hadoop-client + + + org.apache.kafka + kafka-clients + ${kafka.version} + + + + junit + junit + ${junit.version} + test + + + org.apache.kafka + kafka-clients + ${kafka.version} + test + test + + + + org.apache.kafka + kafka_2.11 + ${kafka.version} + test + test + + + org.apache.kafka + kafka_2.11 + ${kafka.version} + test + + + + + + dev-fast-build + + + skipShade + !true + + + + + + org.apache.maven.plugins + maven-shade-plugin + ${maven.shade.plugin.version} + + + package + + shade + + + true + false + + + org.apache.kafka:* + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + static/ + + + + + + + + + + + + + ${basedir}/src/java + ${basedir}/src/test + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + + + \ No newline at end of file diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/GenericKafkaSerDe.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/GenericKafkaSerDe.java new file mode 100644 index 0000000000..827b4475e2 --- /dev/null +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/GenericKafkaSerDe.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.kafka; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.JsonSerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable; +import org.apache.hadoop.hive.serde2.avro.AvroSerDe; +import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hive.common.util.ReflectionUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.stream.Collectors; +; + +public class GenericKafkaSerDe extends AbstractSerDe +{ + private static final Logger log = LoggerFactory.getLogger(GenericKafkaSerDe.class); + // ORDER of fields and types matters here + public static final ImmutableList METADATA_COLUMN_NAMES = ImmutableList.of( + KafkaStorageHandler.__PARTITION, + KafkaStorageHandler.__OFFSET, + KafkaStorageHandler.__TIMESTAMP + ); + public static final ImmutableList METADATA_PRIMITIVE_TYPE_INFO = ImmutableList.of( + TypeInfoFactory.intTypeInfo, + TypeInfoFactory.longTypeInfo, + TypeInfoFactory.longTypeInfo + ); + + private AbstractSerDe delegateSerDe; + private ObjectInspector objectInspector; + private final List columnNames = Lists.newArrayList(); + StructObjectInspector delegateObjectInspector; + + @Override + public void initialize(@Nullable Configuration conf, Properties tbl) throws SerDeException + { + final String className = tbl.getProperty(KafkaStorageHandler.SERDE_CLASS_NAME, KafkaJsonSerDe.class.getName()); + delegateSerDe = createDelegate(className); + delegateSerDe.initialize(conf, tbl); + log.info("Using SerDe instance {}", delegateSerDe.getClass().getCanonicalName()); + if (!(delegateSerDe.getObjectInspector() instanceof StructObjectInspector)) { + throw new SerDeException("Was expecting StructObject Inspector but have " + delegateSerDe.getObjectInspector() + .getClass() + .getName()); + } + + delegateObjectInspector = (StructObjectInspector) delegateSerDe.getObjectInspector(); + + final List inspectors; + // Get column names and types + String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS); + final String columnNameDelimiter = tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? 
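/*
 * Illustrative note, not part of the patch: the schema exposed by this SerDe is the delegate's own
 * columns followed by the three Kafka metadata columns. For a hypothetical table declared as
 * (name string, price double), columnNames below ends up as
 * [name, price, KafkaStorageHandler.__PARTITION, KafkaStorageHandler.__OFFSET, KafkaStorageHandler.__TIMESTAMP]
 * with inspectors for (string, double, int, long, long); the literal metadata column names are defined by
 * KafkaStorageHandler constants outside this excerpt.
 */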
tbl + .getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA); + // all table column names + if (!columnNameProperty.isEmpty()) { + columnNames.addAll(Arrays.asList(columnNameProperty.split(columnNameDelimiter))); + } + + columnNames.addAll(METADATA_COLUMN_NAMES); + + if (log.isDebugEnabled()) { + log.debug("columns: {}, {}", columnNameProperty, columnNames); + } + + inspectors = new ArrayList<>(columnNames.size()); + inspectors.addAll(delegateObjectInspector.getAllStructFieldRefs() + .stream() + .map(structField -> structField.getFieldObjectInspector()) + .collect(Collectors.toList())); + inspectors.addAll(METADATA_PRIMITIVE_TYPE_INFO.stream() + .map(KafkaJsonSerDe.typeInfoToObjectInspector) + .collect(Collectors.toList())); + + objectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors); + } + + private AbstractSerDe createDelegate(String className) + { + final Class clazz; + try { + clazz = (Class) Class.forName(className); + } + catch (ClassNotFoundException e) { + log.error("Failed a loading delegate SerDe {}", className); + throw new RuntimeException(e); + } + // we are not setting conf thus null is okay + return ReflectionUtil.newInstance(clazz, null); + } + + @Override + public Class getSerializedClass() + { + return delegateSerDe.getSerializedClass(); + } + + @Override + public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException + { + return delegateSerDe.serialize(obj, objInspector); + } + + @Override + public SerDeStats getSerDeStats() + { + return delegateSerDe.getSerDeStats(); + } + + @Override + public Object deserialize(Writable blob) throws SerDeException + { + KafkaRecordWritable record = (KafkaRecordWritable) blob; + // switch case the serde nature + final Object row; + if (delegateSerDe instanceof JsonSerDe) { + // @TODO Text constructor copies the data, this op is not needed + row = delegateSerDe.deserialize(new Text(record.getValue())); + } else if (delegateSerDe instanceof AvroSerDe) { + AvroGenericRecordWritable avroGenericRecordWritable = new AvroGenericRecordWritable(); + try { + ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(record.getValue()); + avroGenericRecordWritable.readFields(new DataInputStream(byteArrayInputStream)); + } + catch (IOException e) { + throw new SerDeException(e); + } + row = delegateSerDe.deserialize(avroGenericRecordWritable); + } else { + // default assuming delegate Serde know how to deal with + row = delegateSerDe.deserialize(new BytesRefWritable(record.getValue())); + } + + return columnNames.stream().map(name -> { + switch (name) { + case KafkaStorageHandler.__PARTITION: + return new IntWritable(record.getPartition()); + case KafkaStorageHandler.__OFFSET: + return new LongWritable(record.getOffset()); + case KafkaStorageHandler.__TIMESTAMP: + return new LongWritable(record.getTimestamp()); + default: + return delegateObjectInspector.getStructFieldData(row, delegateObjectInspector.getStructFieldRef(name)); + } + }).collect(Collectors.toList()); + } + + @Override + public ObjectInspector getObjectInspector() + { + return objectInspector; + } +} diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaJsonSerDe.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaJsonSerDe.java new file mode 100644 index 0000000000..3a2d7d77d1 --- /dev/null +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaJsonSerDe.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.kafka; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.type.HiveChar; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.common.type.HiveVarchar; +import org.apache.hadoop.hive.common.type.TimestampTZ; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable; +import org.apache.hadoop.hive.serde2.io.ByteWritable; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.hive.serde2.io.HiveCharWritable; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable; +import org.apache.hadoop.hive.serde2.io.ShortWritable; +import org.apache.hadoop.hive.serde2.io.TimestampLocalTZWritable; +import org.apache.hadoop.hive.serde2.io.TimestampWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TimestampLocalTZTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.joda.time.format.DateTimeFormatter; +import org.joda.time.format.DateTimeFormatterBuilder; +import org.joda.time.format.DateTimeParser; +import org.joda.time.format.ISODateTimeFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.time.Instant; +import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class KafkaJsonSerDe extends AbstractSerDe +{ + private static final Logger LOG = LoggerFactory.getLogger(KafkaJsonSerDe.class); + private 
static final DateTimeFormatter TS_PARSER = createAutoParser(); + public static Function typeInfoToObjectInspector = typeInfo -> PrimitiveObjectInspectorFactory + .getPrimitiveWritableObjectInspector( + TypeInfoFactory.getPrimitiveTypeInfo(typeInfo.getTypeName())); + private List columnNames; + private List columnTypes; + private ObjectInspector inspector; + private final ObjectMapper mapper = new ObjectMapper(); + private long rowCount = 0L; + private long rawDataSize = 0L; + + + @Override + public void initialize( + @Nullable Configuration conf, Properties tbl + ) + { + final List inspectors; + // Get column names and types + String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS); + String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES); + final String columnNameDelimiter = tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? tbl + .getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA); + // all table column names + if (!columnNameProperty.isEmpty()) { + columnNames = Arrays.asList(columnNameProperty.split(columnNameDelimiter)); + } + // all column types + if (!columnTypeProperty.isEmpty()) { + columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("columns: {}, {}", columnNameProperty, columnNames); + LOG.debug("types: {}, {} ", columnTypeProperty, columnTypes); + } + + inspectors = columnTypes.stream().map(typeInfoToObjectInspector).collect(Collectors.toList()); + inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors); + } + + @Override + public Class getSerializedClass() + { + return BytesRefWritable.class; + } + + @Override + public Writable serialize( + Object obj, ObjectInspector objInspector + ) throws SerDeException + { + throw new SerDeException("unimplemented"); + } + + @Override + public SerDeStats getSerDeStats() + { + SerDeStats serDeStats = new SerDeStats(); + serDeStats.setRawDataSize(rawDataSize); + serDeStats.setRowCount(rowCount); + return serDeStats; + } + + @Override + public Object deserialize(Writable blob) throws SerDeException + { + BytesRefWritable record = (BytesRefWritable) blob; + Map payload; + try { + payload = parseAsJson(record.getData()); + rowCount += 1; + rawDataSize += record.getData().length; + } + catch (IOException e) { + throw new SerDeException(e); + } + + final List output = new ArrayList<>(columnNames.size()); + + for (int i = 0; i < columnNames.size(); i++) { + final String name = columnNames.get(i); + final TypeInfo typeInfo = columnTypes.get(i); + final JsonNode value = payload.get(name); + if (value == null) { + output.add(null); + } else { + switch (columnTypes.get(i).getCategory()) { + case PRIMITIVE: + output.add(parseAsPrimitive(value, typeInfo)); + break; + case MAP: + case LIST: + case UNION: + case STRUCT: + throw new SerDeException("not supported yet"); + } + } + + } + return output; + } + + private Object parseAsPrimitive(JsonNode value, TypeInfo typeInfo) throws SerDeException + { + switch (TypeInfoFactory.getPrimitiveTypeInfo(typeInfo.getTypeName()) + .getPrimitiveCategory()) { + case TIMESTAMP: + TimestampWritable timestampWritable = new TimestampWritable(); + timestampWritable.setTime(TS_PARSER.parseMillis(value.textValue())); + return timestampWritable; + + case TIMESTAMPLOCALTZ: + final long numberOfMillis = TS_PARSER.parseMillis(value.textValue()); + return new TimestampLocalTZWritable(new TimestampTZ(ZonedDateTime + .ofInstant( + 
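/*
 * Illustrative note, not part of the patch: TS_PARSER, built by createAutoParser() below, accepts an
 * ISO-style date with an optional time part separated by 'T' or a space and an optional zone offset,
 * e.g. "2018-03-04", "2018-03-04 10:15:30" or "2018-03-04T10:15:30+01:00". Values carrying no explicit
 * offset are interpreted in the JVM default time zone (Joda's default behaviour).
 */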
Instant.ofEpochMilli(numberOfMillis), + ((TimestampLocalTZTypeInfo) typeInfo).timeZone() + ))); + + case BYTE: + return new ByteWritable((byte) value.intValue()); + case SHORT: + return (new ShortWritable(value.shortValue())); + case INT: + return new IntWritable(value.intValue()); + case LONG: + return (new LongWritable((value.longValue()))); + case FLOAT: + return (new FloatWritable(value.floatValue())); + case DOUBLE: + return (new DoubleWritable(value.doubleValue())); + case DECIMAL: + return (new HiveDecimalWritable(HiveDecimal.create(value.decimalValue()))); + case CHAR: + return (new HiveCharWritable( + new HiveChar(value.textValue(), ((CharTypeInfo) typeInfo).getLength()))); + case VARCHAR: + return (new HiveVarcharWritable( + new HiveVarchar(value.textValue(), ((CharTypeInfo) typeInfo).getLength()))); + case STRING: + return (new Text(value.textValue())); + case BOOLEAN: + return (new BooleanWritable(value.isBoolean() ? value.booleanValue() : Boolean.valueOf(value.textValue()))); + default: + throw new SerDeException("Unknown type: " + typeInfo.getTypeName()); + } + } + + private Map parseAsJson(byte[] value) throws IOException + { + JsonNode document = mapper.readValue(value, JsonNode.class); + //Hive Column names are case insensitive. + Map documentMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); + document.fields() + .forEachRemaining(field -> documentMap.put(field.getKey().toLowerCase(), field.getValue())); + return documentMap; + } + + @Override + public ObjectInspector getObjectInspector() throws SerDeException + { + if (inspector == null) { + throw new SerDeException("null inspector ??"); + } + return inspector; + } + + private static DateTimeFormatter createAutoParser() + { + final DateTimeFormatter offsetElement = new DateTimeFormatterBuilder() + .appendTimeZoneOffset("Z", true, 2, 4) + .toFormatter(); + + DateTimeParser timeOrOffset = new DateTimeFormatterBuilder() + .append( + null, + new DateTimeParser[]{ + new DateTimeFormatterBuilder().appendLiteral('T').toParser(), + new DateTimeFormatterBuilder().appendLiteral(' ').toParser() + } + ) + .appendOptional(ISODateTimeFormat.timeElementParser().getParser()) + .appendOptional(offsetElement.getParser()) + .toParser(); + + return new DateTimeFormatterBuilder() + .append(ISODateTimeFormat.dateElementParser()) + .appendOptional(timeOrOffset) + .toFormatter(); + } +} diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerInputFormat.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerInputFormat.java new file mode 100644 index 0000000000..2931394ef8 --- /dev/null +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerInputFormat.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.kafka; + +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + + +/** + * Kafka puller input format is in charge of reading a exact set of records from a Kafka Queue + * The input split will contain the set of topic partition and start/end offsets + * Records will be returned as bytes + */ +public class KafkaPullerInputFormat extends InputFormat + implements org.apache.hadoop.mapred.InputFormat +{ + + private static final Logger log = LoggerFactory.getLogger(KafkaPullerInputFormat.class); + + + @Override + public InputSplit[] getSplits( + JobConf jobConf, int i + ) throws IOException + { + List inputSplits = null; + try { + inputSplits = computeSplits(jobConf); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(e); + } + InputSplit[] inputSplitsArray = new InputSplit[inputSplits.size()]; + return inputSplits.toArray(inputSplitsArray); + } + + /** + * Build a full scan using Kafka list partition then beginning/end offsets + * This function might block duo to calls like + * org.apache.kafka.clients.consumer.KafkaConsumer#beginningOffsets(java.util.Collection) + * + * @param topic kafka topic + * @param consumer initialized kafka consumer + * @param tablePaths hive table path + * + * @return full scan input split collection based on Kafka metadata APIs + */ + private static List buildFullScanFromKafka( + String topic, + KafkaConsumer consumer, + Path[] tablePaths + ) + { + final Map starOffsetsMap; + final Map endOffsetsMap; + + final List topicPartitions; + topicPartitions = fetchTopicPartitions(topic, consumer); + starOffsetsMap = consumer.beginningOffsets(topicPartitions); + endOffsetsMap = consumer.endOffsets(topicPartitions); + + if (log.isDebugEnabled()) { + log.info( + "Found the following partitions [{}]", + topicPartitions.stream().map(topicPartition -> topicPartition.toString()) + .collect(Collectors.joining(",")) + ); + starOffsetsMap.forEach((tp, start) -> log.info("TPartition [{}],Start offsets [{}]", tp, start)); + endOffsetsMap.forEach((tp, end) -> log.info("TPartition [{}],End offsets [{}]", tp, end)); + } + return topicPartitions.stream().map( + topicPartition -> new KafkaPullerInputSplit( + topicPartition.topic(), + topicPartition.partition(), + 
starOffsetsMap.get(topicPartition), + endOffsetsMap.get(topicPartition), + tablePaths[0] + )).collect(Collectors.toList()); + } + + private List computeSplits(Configuration configuration) + throws IOException, InterruptedException + { + // this will be used to harness some KAFKA blocking calls + final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor(); + try (KafkaConsumer consumer = new KafkaConsumer(KafkaStreamingUtils.consumerProperties(configuration))) { + final String topic = configuration.get(KafkaStorageHandler.HIVE_KAFKA_TOPIC); + final long timeoutMs = configuration.getLong( + KafkaStorageHandler.HIVE_KAFKA_POLL_TIMEOUT, + KafkaStorageHandler.DEFAULT_CONSUMER_POLL_TIMEOUT_MS + ); + // hive depends on FileSplits + JobConf jobConf = new JobConf(configuration); + Path[] tablePaths = org.apache.hadoop.mapred.FileInputFormat.getInputPaths(jobConf); + + Future> futureFullHouse = EXECUTOR.submit(() -> buildFullScanFromKafka( + topic, + consumer, + tablePaths + )); + List fullHouse; + try { + fullHouse = futureFullHouse.get(timeoutMs, TimeUnit.MILLISECONDS); + } + catch (TimeoutException | ExecutionException e) { + futureFullHouse.cancel(true); + log.error("can not generate full scan split", e); + // at this point we can not go further fail split generation + throw new IOException(e); + } + + + final ImmutableMap.Builder builder = new ImmutableMap.Builder(); + fullHouse.stream().forEach(input -> builder.put(new TopicPartition( + input.getTopic(), + input.getPartition() + ), input)); + + final KafkaScanTrimmer kafkaScanTrimmer = new KafkaScanTrimmer(builder.build(), consumer); + final String filterExprSerialized = configuration.get(TableScanDesc.FILTER_EXPR_CONF_STR); + + if (filterExprSerialized != null && !filterExprSerialized.isEmpty()) { + ExprNodeGenericFuncDesc filterExpr = SerializationUtilities.deserializeExpression(filterExprSerialized); + log.info("Kafka trimmer working on Filter tree {}", filterExpr.getExprString()); + Callable> trimmerWorker = () -> kafkaScanTrimmer.computeOptimizedScan(filterExpr) + .entrySet() + .stream() + .map(entry -> entry.getValue()) + .collect(Collectors.toList()); + + Future> futureTinyHouse = EXECUTOR.submit(trimmerWorker); + try { + return futureTinyHouse.get(timeoutMs, TimeUnit.MILLISECONDS); + } + catch (ExecutionException | TimeoutException e) { + futureTinyHouse.cancel(true); + log.error("Had issue with trimmer will return full scan ", e); + return fullHouse; + } + } + //Case null: it can be filter evaluated to false or no filter at all thus return full scan + return fullHouse; + } + finally { + EXECUTOR.shutdown(); + } + } + + + private static List fetchTopicPartitions(String topic, KafkaConsumer consumer) + { + // this will block till REQUEST_TIMEOUT_MS_CONFIG = "request.timeout.ms" + // then throws org.apache.kafka.common.errors.TimeoutException if can not fetch metadata + // @TODO add retry logic maybe + List partitions = consumer.partitionsFor(topic); + return partitions.stream().map(p -> new TopicPartition(topic, p.partition())) + .collect(Collectors.toList()); + } + + @Override + public RecordReader getRecordReader( + InputSplit inputSplit, + JobConf jobConf, Reporter reporter + ) throws IOException + { + return new KafkaPullerRecordReader((KafkaPullerInputSplit) inputSplit, jobConf); + } + + @Override + public List getSplits( + JobContext jobContext + ) throws IOException, InterruptedException + { + return computeSplits(jobContext.getConfiguration()).stream() + .map(kafkaPullerInputSplit -> 
(org.apache.hadoop.mapreduce.InputSplit) kafkaPullerInputSplit) + .collect(Collectors.toList()); + } + + @Override + public org.apache.hadoop.mapreduce.RecordReader createRecordReader( + org.apache.hadoop.mapreduce.InputSplit inputSplit, TaskAttemptContext taskAttemptContext + ) throws IOException, InterruptedException + { + return new KafkaPullerRecordReader(); + } +} diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerInputSplit.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerInputSplit.java new file mode 100644 index 0000000000..9c6e866b91 --- /dev/null +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerInputSplit.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.kafka; + +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileSplit; + +import javax.annotation.Nullable; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +public class KafkaPullerInputSplit extends FileSplit + implements org.apache.hadoop.mapred.InputSplit +{ + private String topic; + private long startOffset; + private int partition; + private long endOffset; + + public KafkaPullerInputSplit() + { + super((Path) null, 0, 0, (String[]) null); + } + + public KafkaPullerInputSplit( + String topic, int partition, long startOffset, + long endOffset, + Path dummyPath + ) + { + super(dummyPath, 0, 0, (String[]) null); + this.topic = topic; + this.startOffset = startOffset; + this.partition = partition; + this.endOffset = endOffset; + Preconditions.checkArgument( + startOffset >= 0 && startOffset <= endOffset, + "start [%s] has to be positive and >= end [%]", + startOffset, + endOffset + ); + } + + @Override + public long getLength() + { + return 0; + } + + @Override + public String[] getLocations() throws IOException + { + return new String[0]; + } + + @Override + public void write(DataOutput dataOutput) throws IOException + { + super.write(dataOutput); + dataOutput.writeUTF(topic); + dataOutput.writeInt(partition); + dataOutput.writeLong(startOffset); + dataOutput.writeLong(endOffset); + } + + @Override + public void readFields(DataInput dataInput) throws IOException + { + super.readFields(dataInput); + topic = dataInput.readUTF(); + partition = dataInput.readInt(); + startOffset = dataInput.readLong(); + endOffset = dataInput.readLong(); + Preconditions.checkArgument( + startOffset >= 0 && startOffset <= endOffset, + "start [%s] has to be positive and >= end [%]", + startOffset, + endOffset + ); + } + + public String 
getTopic() + { + return topic; + } + + public int getPartition() + { + return partition; + } + + public long getStartOffset() + { + return startOffset; + } + + public long getEndOffset() + { + return endOffset; + } + + + + + /** + * Compute the intersection of 2 splits. Splits must share the same topic and partition number. + * + * @param split1 + * @param split2 + * + * @return new split that represents range intersection or null if it is not overlapping + */ + @Nullable + public static KafkaPullerInputSplit intersectRange(KafkaPullerInputSplit split1, KafkaPullerInputSplit split2) { + assert (split1.topic == split2.topic); + assert (split1.partition == split2.partition); + final long startOffset = Math.max(split1.getStartOffset(), split2.getStartOffset()); + final long endOffset = Math.min(split1.getEndOffset(), split2.getEndOffset()); + if (startOffset > endOffset) { + // there is no overlapping + return null; + } + return new KafkaPullerInputSplit(split1.topic, split1.partition, startOffset, endOffset, split1.getPath()); + } + + /** + * Compute union of ranges between splits. Splits must share the same topic and partition + * + * @param split1 + * @param split2 + * + * @return new split with a range including both splits. + */ + public static KafkaPullerInputSplit unionRange(KafkaPullerInputSplit split1, KafkaPullerInputSplit split2) { + assert (split1.topic == split2.topic); + assert (split1.partition == split2.partition); + final long startOffset = Math.min(split1.getStartOffset(), split2.getStartOffset()); + final long endOffset = Math.max(split1.getEndOffset(), split2.getEndOffset()); + return new KafkaPullerInputSplit(split1.topic, split1.partition, startOffset, endOffset, split1.getPath()); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (!(o instanceof KafkaPullerInputSplit)) { + return false; + } + KafkaPullerInputSplit that = (KafkaPullerInputSplit) o; + return Objects.equal(getTopic(), that.getTopic()) && Objects + .equal(getStartOffset(), that.getStartOffset()) && Objects + .equal(getPartition(), that.getPartition()) && Objects + .equal(getEndOffset(), that.getEndOffset()); + } + + @Override + public int hashCode() + { + return Objects.hashCode(getTopic(), getStartOffset(), getPartition(), getEndOffset()); + } + + @Override + public String toString() + { + return "KafkaPullerInputSplit{" + + "topic='" + topic + '\'' + + ", startOffset=" + startOffset + + ", partition=" + partition + + ", endOffset=" + endOffset + + ", path=" + super.getPath().toString() + + '}'; + } + + public static KafkaPullerInputSplit copyOf(KafkaPullerInputSplit other) + { + return new KafkaPullerInputSplit( + other.getTopic(), + other.getPartition(), + other.getStartOffset(), + other.getEndOffset(), + other.getPath() + ); + } + + public KafkaPullerInputSplit clone() + { + return copyOf(this); + } + + public static List slice(long sliceSize, final KafkaPullerInputSplit split) { + if (split.getEndOffset() - split.getStartOffset() > sliceSize) { + ImmutableList.Builder builder = ImmutableList.builder(); + long start = split.getStartOffset(); + while (start < split.getEndOffset() - sliceSize) { + builder.add(new KafkaPullerInputSplit(split.topic, split.partition, start, start + sliceSize + 1, split.getPath())); + start += sliceSize + 1; + } + // last split + if (start < split.getEndOffset()) { + builder.add(new KafkaPullerInputSplit(split.topic, split.partition, start, split.getEndOffset(), split.getPath())); + } + return builder.build(); + } + + return 
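/*
 * Worked example, for illustration only: with sliceSize = 100 and a split covering offsets [0, 350),
 * the loop above produces [0, 101), [101, 202), [202, 303) and the trailing [303, 350) -- note that,
 * as written, each full slice spans sliceSize + 1 offsets. A split no larger than sliceSize falls
 * through to the copy below.
 */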
Collections.singletonList(copyOf(split)); + } +} diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java new file mode 100644 index 0000000000..0e353dc525 --- /dev/null +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.kafka; + +import com.google.common.base.Preconditions; +import com.google.common.io.Closer; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Properties; + +public class KafkaPullerRecordReader extends RecordReader + implements org.apache.hadoop.mapred.RecordReader +{ + + private static final Logger log = LoggerFactory.getLogger(KafkaPullerRecordReader.class); + + private final Closer closer = Closer.create(); + private KafkaConsumer consumer = null; + private Configuration config = null; + private KafkaRecordWritable currentWritableValue; + private Iterator> recordsCursor = null; + + private TopicPartition topicPartition; + private long startOffset; + private long endOffset; + + private long totalNumberRecords = 0l; + private long consumedRecords = 0l; + private long readBytes = 0l; + private long pollTimeout; + private volatile boolean started = false; + + public KafkaPullerRecordReader() + { + } + + private void initConsumer() + { + if (consumer == null) { + log.info("Initializing Kafka Consumer"); + final Properties properties = KafkaStreamingUtils.consumerProperties(config); + String brokerString = properties.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); + Preconditions.checkNotNull(brokerString, "broker end point can not be null"); + log.info("Starting Consumer with Kafka broker string [{}]", brokerString); + consumer = new KafkaConsumer(properties); + closer.register(consumer); + } + } + + public KafkaPullerRecordReader(KafkaPullerInputSplit inputSplit, Configuration jobConf) + { + initialize(inputSplit, jobConf); + } + + synchronized private void initialize(KafkaPullerInputSplit inputSplit, Configuration jobConf) + { + if (!started) { + this.config = jobConf; + computeTopicPartitionOffsets(inputSplit); + initConsumer(); + pollTimeout = config.getLong( + 
KafkaStorageHandler.HIVE_KAFKA_POLL_TIMEOUT, + KafkaStorageHandler.DEFAULT_CONSUMER_POLL_TIMEOUT_MS + ); + log.debug("Consumer poll timeout [{}] ms", pollTimeout); + recordsCursor = new KafkaRecordIterator(consumer, topicPartition, startOffset, endOffset, pollTimeout); + started = true; + } + } + + private void computeTopicPartitionOffsets(KafkaPullerInputSplit split) + { + String topic = split.getTopic(); + int partition = split.getPartition(); + startOffset = split.getStartOffset(); + endOffset = split.getEndOffset(); + topicPartition = new TopicPartition(topic, partition); + Preconditions.checkState( + startOffset >= 0 && startOffset <= endOffset, + "Start [%s] has to be positive and less or equal than End [%s]", + startOffset, + endOffset + ); + totalNumberRecords += endOffset - startOffset; + } + + @Override + synchronized public void initialize(org.apache.hadoop.mapreduce.InputSplit inputSplit, TaskAttemptContext context) + { + initialize((KafkaPullerInputSplit) inputSplit, context.getConfiguration()); + } + + @Override + public boolean next(NullWritable nullWritable, KafkaRecordWritable bytesWritable) + { + if (started && recordsCursor.hasNext()) { + ConsumerRecord record = recordsCursor.next(); + bytesWritable.set(record); + consumedRecords += 1; + readBytes += record.serializedValueSize(); + return true; + } + return false; + } + + @Override + public NullWritable createKey() + { + return NullWritable.get(); + } + + @Override + public KafkaRecordWritable createValue() + { + return new KafkaRecordWritable(); + } + + @Override + public long getPos() throws IOException + { + + return consumedRecords; + } + + @Override + public boolean nextKeyValue() throws IOException + { + currentWritableValue = new KafkaRecordWritable(); + if (next(NullWritable.get(), currentWritableValue)) { + return true; + } + currentWritableValue = null; + return false; + } + + @Override + public NullWritable getCurrentKey() throws IOException, InterruptedException + { + return NullWritable.get(); + } + + @Override + public KafkaRecordWritable getCurrentValue() throws IOException, InterruptedException + { + return Preconditions.checkNotNull(currentWritableValue); + } + + @Override + public float getProgress() throws IOException + { + if (consumedRecords == 0) { + return 0f; + } + if (consumedRecords >= totalNumberRecords) { + return 1f; + } + return consumedRecords * 1.0f / totalNumberRecords; + } + + @Override + public void close() throws IOException + { + if (!started) { + return; + } + log.trace("total read bytes [{}]", readBytes); + if (consumer != null) { + consumer.wakeup(); + } + closer.close(); + } +} diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java new file mode 100644 index 0000000000..a5e1c0b247 --- /dev/null +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.kafka; + +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableList; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Iterator over Kafka Records to read records from a single topic partition inclusive start exclusive end. + *
<p>
+ * If {@code startOffset} is not null, the iterator seeks to that offset; otherwise it seeks to the beginning of the
+ * partition, see {@link org.apache.kafka.clients.consumer.Consumer#seekToBeginning(Collection)}.
+ * <p>
+ * When provided with an end offset it will return records up to the record with offset == endOffset - 1, + * Else If end offsets is null it will read up to the current end see {@link org.apache.kafka.clients.consumer.Consumer#endOffsets(Collection)} + */ +public class KafkaRecordIterator implements Iterator> +{ + private static final Logger log = LoggerFactory.getLogger(KafkaRecordIterator.class); + + private final Consumer consumer; + private final TopicPartition topicPartition; + private long endOffset; + private long startOffset; + private final long pollTimeout; + private final Stopwatch stopwatch = Stopwatch.createUnstarted(); + private ConsumerRecords records; + private long currentOffset; + private ConsumerRecord nextRecord; + private boolean hasMore = true; + private Iterator> cursor = null; + + /** + * @param consumer functional kafka consumer + * @param topicPartition kafka topic partition + * @param startOffset start position of stream. + * @param endOffset requested end position. If null will read up to current last + * @param pollTimeout poll time out in ms + */ + public KafkaRecordIterator( + Consumer consumer, TopicPartition topicPartition, + @Nullable Long startOffset, @Nullable Long endOffset, long pollTimeout + ) + { + this.consumer = Preconditions.checkNotNull(consumer, "Consumer can not be null"); + this.topicPartition = Preconditions.checkNotNull(topicPartition, "Topic partition can not be null"); + this.pollTimeout = pollTimeout; + Preconditions.checkState(pollTimeout > 0, "poll timeout has to be positive number"); + this.startOffset = startOffset == null ? -1l : startOffset; + this.endOffset = endOffset == null ? -1l : endOffset; + } + + public KafkaRecordIterator( + Consumer consumer, TopicPartition tp, long pollTimeout + ) + { + this(consumer, tp, null, null, pollTimeout); + } + + private void assignAndSeek() + { + // assign topic partition to consumer + final List topicPartitionList = ImmutableList.of(topicPartition); + if (log.isTraceEnabled()) { + stopwatch.reset().start(); + } + + consumer.assign(topicPartitionList); + // compute offsets and seek to start + if (startOffset > -1) { + log.info("Seeking to offset [{}] of topic partition [{}]", startOffset, topicPartition); + consumer.seek(topicPartition, startOffset); + } else { + log.info("Seeking to beginning of topic partition [{}]", topicPartition); + // seekToBeginning is lazy thus need to call position() or poll(0) + this.consumer.seekToBeginning(Collections.singleton(topicPartition)); + startOffset = consumer.position(topicPartition); + } + if (endOffset == -1) { + this.endOffset = consumer.endOffsets(topicPartitionList).get(topicPartition); + log.debug("EndOffset is {}", endOffset); + } + currentOffset = consumer.position(topicPartition); + Preconditions.checkState(this.endOffset >= currentOffset, + "End offset [%s] need to be greater than start offset [%s]", this.endOffset, currentOffset + ); + log.info("Kafka Iterator ready, assigned TopicPartition [{}]; startOffset [{}]; endOffset [{}]", topicPartition, + currentOffset, this.endOffset + ); + if (log.isTraceEnabled()) { + stopwatch.stop(); + log.trace("Time to assign and seek [{}] ms", stopwatch.elapsed(TimeUnit.MILLISECONDS)); + } + } + + @Override + public boolean hasNext() + { + if (records == null) { + assignAndSeek(); + } + //Init poll OR Need to poll at least one more record since currentOffset + 1 < endOffset + if (records == null || (hasMore == false && currentOffset + 1 < endOffset)) { + pollRecords(); + findNext(); + } + return hasMore; + } + 
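/*
 * Illustrative usage sketch, not part of the patch. It assumes a reachable broker at localhost:9092 and a
 * topic named "test_topic" (both hypothetical); the byte[] deserializers mirror what
 * KafkaStreamingUtils.consumerProperties(...) configures for the record reader.
 *
 *   Properties props = new Properties();
 *   props.setProperty("bootstrap.servers", "localhost:9092");
 *   props.setProperty("key.deserializer", org.apache.kafka.common.serialization.ByteArrayDeserializer.class.getName());
 *   props.setProperty("value.deserializer", org.apache.kafka.common.serialization.ByteArrayDeserializer.class.getName());
 *   try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
 *     // read offsets [100, 200) of partition 0, with a 5 second poll timeout per batch
 *     KafkaRecordIterator iterator =
 *         new KafkaRecordIterator(consumer, new TopicPartition("test_topic", 0), 100L, 200L, 5000L);
 *     while (iterator.hasNext()) {
 *       ConsumerRecord<byte[], byte[]> record = iterator.next();
 *       // record.offset() lies in [100, 200); record.value() holds the raw payload bytes
 *     }
 *   }
 */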
+ private void pollRecords() + { + if (log.isTraceEnabled()) { + stopwatch.reset().start(); + } + records = consumer.poll(pollTimeout); + if (log.isTraceEnabled()) { + stopwatch.stop(); + log.trace("Pulled [{}] records in [{}] ms", records.count(), + stopwatch.elapsed(TimeUnit.MILLISECONDS) + ); + } + Preconditions.checkState(!records.isEmpty() || currentOffset == endOffset, + "Current read offset [%s]-TopicPartition:[%s], End offset[%s]." + + "Consumer returned 0 record due to exhausted poll timeout [%s] ms", + currentOffset, topicPartition.toString(), endOffset, pollTimeout + ); + cursor = records.iterator(); + } + + @Override + public ConsumerRecord next() + { + ConsumerRecord value = nextRecord; + Preconditions.checkState(value.offset() < endOffset); + findNext(); + return Preconditions.checkNotNull(value); + } + + private void findNext() + { + if (cursor.hasNext()) { + nextRecord = cursor.next(); + hasMore = true; + if (nextRecord.offset() < endOffset) { + currentOffset = nextRecord.offset(); + return; + } + } + hasMore = false; + nextRecord = null; + } + +} diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordWritable.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordWritable.java new file mode 100644 index 0000000000..08cf53be64 --- /dev/null +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordWritable.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.kafka; + +import org.apache.hadoop.io.Writable; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Arrays; +import java.util.Objects; + +/** + * Writable implementation of Kafka ConsumerRecord. 
+ * Serialized in the form + * kafkaRecordTimestamp(long) | kafkaPartition (int) | recordOffset (long) | value.size (int) | value (byte []) + */ +public class KafkaRecordWritable implements Writable { + + private int partition; + private long offset; + private long timestamp; + private byte [] value; + + public static KafkaRecordWritable fromKafkaRecord(ConsumerRecord consumerRecord) { + return new KafkaRecordWritable(consumerRecord.partition(), consumerRecord.offset(), + consumerRecord.timestamp(), consumerRecord.value() + ); + } + + public void set(ConsumerRecord consumerRecord) { + this.partition = consumerRecord.partition(); + this.timestamp = consumerRecord.timestamp(); + this.offset = consumerRecord.offset(); + this.value = consumerRecord.value(); + } + + private KafkaRecordWritable(int partition, long offset, long timestamp, byte[] value) { + this.partition = partition; + this.offset = offset; + this.timestamp = timestamp; + this.value = value; + } + + public KafkaRecordWritable() { + } + + @Override public void write(DataOutput dataOutput) throws IOException { + dataOutput.writeLong(timestamp); + dataOutput.writeInt(partition); + dataOutput.writeLong(offset); + dataOutput.writeInt(value.length); + dataOutput.write(value); + } + + @Override public void readFields(DataInput dataInput) throws IOException { + timestamp = dataInput.readLong(); + partition = dataInput.readInt(); + offset = dataInput.readLong(); + int size = dataInput.readInt(); + if (size > 0) { + value = new byte[size]; + dataInput.readFully(value); + } else { + value = new byte[0]; + } + } + + public int getPartition() { + return partition; + } + + public long getOffset() { + return offset; + } + + public long getTimestamp() { + return timestamp; + } + + public byte[] getValue() { + return value; + } + + @Override public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof KafkaRecordWritable)) { + return false; + } + KafkaRecordWritable that = (KafkaRecordWritable) o; + return getPartition() == that.getPartition() && getOffset() == that.getOffset() + && getTimestamp() == that.getTimestamp() && Arrays.equals(getValue(), that.getValue()); + } + + @Override public int hashCode() { + + int result = Objects.hash(getPartition(), getOffset(), getTimestamp()); + result = 31 * result + Arrays.hashCode(getValue()); + return result; + } + +} diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaScanTrimmer.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaScanTrimmer.java new file mode 100644 index 0000000000..4ce82b5939 --- /dev/null +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaScanTrimmer.java @@ -0,0 +1,523 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.kafka; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; +import org.apache.hadoop.hive.ql.exec.FunctionRegistry; +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBaseCompare; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToBinary; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToChar; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToDate; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToDecimal; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUnixTimeStamp; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUtcTimestamp; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToVarchar; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndTimestamp; +import org.apache.kafka.common.TopicPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.function.Predicate; + +/** + * Kafka Range trimmer, takes a full kafka scan and prune the scan based on a filter expression + * it is a Best effort trimmer and it can not replace the filter it self, filtration still takes place in Hive executor + */ +public class KafkaScanTrimmer +{ + private static final Logger log = LoggerFactory.getLogger(KafkaScanTrimmer.class); + private final Map fullHouse; + private final KafkaConsumer kafkaConsumer; + + + /** + * @param fullHouse initial full scan to be pruned, this is a map of Topic partition to input split. 
+ * @param kafkaConsumer kafka consumer used to pull offsets for time filter if needed + */ + public KafkaScanTrimmer( + Map fullHouse, + KafkaConsumer kafkaConsumer + ) + { + this.fullHouse = fullHouse; + this.kafkaConsumer = kafkaConsumer; + } + + /** + * This might block due to calls like: + * org.apache.kafka.clients.consumer.KafkaConsumer#offsetsForTimes(java.util.Map) + * + * @param filterExpression filter expression to be used for pruning scan + * + * @return tiny house of of the full house based on filter expression + */ + public Map computeOptimizedScan(ExprNodeGenericFuncDesc filterExpression) + { + Map optimizedScan = parse(filterExpression); + + if (log.isDebugEnabled()) { + if (optimizedScan != null) { + log.debug("Optimized scan:"); + optimizedScan.forEach((tp, input) -> log.info( + "Topic-[{}] Partition-[{}] - Split startOffset [{}] :-> endOffset [{}]", + tp.topic(), + tp.partition(), + input.getStartOffset(), input.getEndOffset() + )); + } else { + log.debug("No optimization thus using full scan "); + fullHouse.forEach((tp, input) -> log.info( + "Topic-[{}] Partition-[{}] - Split startOffset [{}] :-> endOffset [{}]", + tp.topic(), + tp.partition(), + input.getStartOffset(), input.getEndOffset() + )); + } + } + return optimizedScan == null ? fullHouse : optimizedScan; + } + + /** + * @param expression filter to parse and trim the full scan + * + * @return Map of optimized kafka range scans or null if it is impossible to optimize. + */ + @Nullable + private Map parse(ExprNodeDesc expression) + { + if (expression.getClass() != ExprNodeGenericFuncDesc.class) { + return null; + } + // get the kind of expression + ExprNodeGenericFuncDesc expr = (ExprNodeGenericFuncDesc) expression; + Class op = expr.getGenericUDF().getClass(); + + // handle the logical operators + if (FunctionRegistry.isOpOr(expr)) { + return pushOrOp(expr); + } + if (FunctionRegistry.isOpAnd(expr)) { + return pushAndOp(expr); + } + + if (op == GenericUDFOPGreaterThan.class) { + return pushLeaf(expr, PredicateLeaf.Operator.LESS_THAN_EQUALS, true); + } else if (op == GenericUDFOPEqualOrGreaterThan.class) { + return pushLeaf(expr, PredicateLeaf.Operator.LESS_THAN, true); + } else if (op == GenericUDFOPLessThan.class) { + return pushLeaf(expr, PredicateLeaf.Operator.LESS_THAN, false); + } else if (op == GenericUDFOPEqualOrLessThan.class) { + return pushLeaf(expr, PredicateLeaf.Operator.LESS_THAN_EQUALS, false); + } else if (op == GenericUDFOPEqual.class) { + return pushLeaf(expr, PredicateLeaf.Operator.EQUALS, false); + // otherwise, we didn't understand it, so bailout + } else { + return null; + } + } + + + /** + * @param expr leaf node to push + * @param operator operator + * @param negation true if it is a negation, this is used to represent: + * GenericUDFOPGreaterThan and GenericUDFOPEqualOrGreaterThan + * using PredicateLeaf.Operator.LESS_THAN and PredicateLeaf.Operator.LESS_THAN_EQUALS + * + * @return leaf scan or null if can not figure out push down + */ + @Nullable + private Map pushLeaf( + ExprNodeGenericFuncDesc expr, PredicateLeaf.Operator operator, boolean negation + ) + { + if (expr.getChildren().size() != 2) { + return null; + } + GenericUDF genericUDF = expr.getGenericUDF(); + if (!(genericUDF instanceof GenericUDFBaseCompare)) { + return null; + } + ExprNodeDesc expr1 = expr.getChildren().get(0); + ExprNodeDesc expr2 = expr.getChildren().get(1); + // We may need to peel off the GenericUDFBridge that is added by CBO or user + if (expr1.getTypeInfo().equals(expr2.getTypeInfo())) { + expr1 = 
getColumnExpr(expr1); + expr2 = getColumnExpr(expr2); + } + + ExprNodeDesc[] extracted = ExprNodeDescUtils.extractComparePair(expr1, expr2); + if (extracted == null || (extracted.length > 2)) { + return null; + } + + ExprNodeColumnDesc columnDesc; + ExprNodeConstantDesc constantDesc; + final boolean flip; + + if (extracted[0] instanceof ExprNodeColumnDesc) { + columnDesc = (ExprNodeColumnDesc) extracted[0]; + constantDesc = (ExprNodeConstantDesc) extracted[1]; + flip = false; + + } else { + flip = true; + columnDesc = (ExprNodeColumnDesc) extracted[1]; + constantDesc = (ExprNodeConstantDesc) extracted[0]; + } + + if (columnDesc.getColumn().equals(KafkaStorageHandler.__PARTITION)) { + return buildScanFormPartitionPredicate( + fullHouse, + operator, + ((Number) constantDesc.getValue()).intValue(), + flip, + negation + ); + + } + if (columnDesc.getColumn().equals(KafkaStorageHandler.__OFFSET)) { + return buildScanFromOffsetPredicate( + fullHouse, + operator, + ((Number) constantDesc.getValue()).longValue(), + flip, + negation + ); + } + + if (columnDesc.getColumn().equals(KafkaStorageHandler.__TIMESTAMP)) { + long timestamp = ((Number) constantDesc.getValue()).longValue(); + return buildScanForTimesPredicate(fullHouse, operator, timestamp, flip, negation, kafkaConsumer); + } + return null; + } + + + /** + * Trim kafka scan using a leaf binary predicate on partition column + * + * @param fullScan kafka full scan to be optimized + * @param operator predicate operator, equal, lessThan or lessThanEqual + * @param partitionConst partition constant value + * @param flip true if the position of column and constant is flipped by default assuming column OP constant + * @param negation true if the expression is a negation of the original expression + * + * @return filtered kafka scan + */ + @VisibleForTesting + protected static Map buildScanFormPartitionPredicate( + Map fullScan, + PredicateLeaf.Operator operator, int partitionConst, boolean flip, boolean negation + ) + { + final Predicate predicate; + final Predicate intermediatePredicate; + switch (operator) { + case EQUALS: + predicate = topicPartition -> topicPartition != null && topicPartition.partition() == partitionConst; + break; + case LESS_THAN: + intermediatePredicate = flip + ? topicPartition -> topicPartition != null + && partitionConst < topicPartition.partition() + : topicPartition -> topicPartition != null + && topicPartition.partition() < partitionConst; + + predicate = negation ? intermediatePredicate.negate() : intermediatePredicate; + break; + case LESS_THAN_EQUALS: + intermediatePredicate = flip ? topicPartition -> topicPartition != null + && partitionConst + <= topicPartition.partition() + : topicPartition -> topicPartition != null + && topicPartition.partition() <= partitionConst; + + predicate = negation ? 
intermediatePredicate.negate() : intermediatePredicate; + break; + default: + //Default to select * for unknown cases + predicate = topicPartition -> true; + } + + ImmutableMap.Builder builder = ImmutableMap.builder(); + // Filter full scan based on predicate + fullScan.entrySet() + .stream() + .filter(entry -> predicate.test(entry.getKey())) + .forEach(entry -> builder.put(entry.getKey(), entry.getValue().clone())); + return builder.build(); + } + + /** + * @param fullScan full kafka scan to be pruned + * @param operator operator kind + * @param offsetConst offset constant value + * @param flip true if position of constant and column were flipped by default assuming COLUMN OP CONSTANT + * @param negation true if the expression is a negation of the original expression + * + * @return optimized kafka scan + */ + @VisibleForTesting + protected static Map buildScanFromOffsetPredicate( + final Map fullScan, + PredicateLeaf.Operator operator, long offsetConst, boolean flip, boolean negation + ) + { + final boolean isEndBound; + final long startOffset; + final long endOffset; + + if (flip == false && negation == false || flip == true && negation == true) { + isEndBound = true; + } else { + isEndBound = false; + } + switch (operator) { + case LESS_THAN_EQUALS: + if (isEndBound) { + startOffset = -1; + endOffset = negation ? offsetConst : offsetConst + 1; + } else { + endOffset = -1; + startOffset = negation ? offsetConst + 1 : offsetConst; + } + break; + case EQUALS: + startOffset = offsetConst; + endOffset = offsetConst + 1; + break; + case LESS_THAN: + if (isEndBound) { + endOffset = negation ? offsetConst + 1 : offsetConst; + startOffset = -1; + } else { + endOffset = -1; + startOffset = negation ? offsetConst : offsetConst + 1; + } + break; + default: + // default to select * + startOffset = -1; + endOffset = -1; + } + + final Map newScan = new HashMap<>(); + + fullScan.forEach((tp, existingInputSplit) -> { + final KafkaPullerInputSplit newInputSplit; + if (startOffset != -1 && endOffset == -1) { + newInputSplit = new KafkaPullerInputSplit( + tp.topic(), + tp.partition(), + // @TODO make sure that this is okay + //if the user as for start offset > max offset will replace with last offset + Math.min(startOffset, existingInputSplit.getEndOffset()), + existingInputSplit.getEndOffset(), + existingInputSplit.getPath() + ); + } else if (endOffset != -1 && startOffset == -1) { + newInputSplit = new KafkaPullerInputSplit( + tp.topic(), + tp.partition(), + existingInputSplit.getStartOffset(), + //@TODO check this, if user ask for non existing end offset ignore it and position head on start + Math.max(endOffset, existingInputSplit.getStartOffset()), + existingInputSplit.getPath() + ); + } else if (endOffset == startOffset + 1) { + if (startOffset < existingInputSplit.getStartOffset() || startOffset >= existingInputSplit.getEndOffset()) { + newInputSplit = new KafkaPullerInputSplit( + tp.topic(), + tp.partition(), + //@TODO check this with team if we have ask for offset out of range what to do ? 
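+              // with start == end == existingInputSplit.getEndOffset() the resulting split is empty,
+              // so an equality predicate on an out-of-range offset reads nothing from this partition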
+ // here am seeking to last offset + existingInputSplit.getEndOffset(), + existingInputSplit.getEndOffset(), + existingInputSplit.getPath() + ); + } else { + newInputSplit = new KafkaPullerInputSplit( + tp.topic(), + tp.partition(), + startOffset, + endOffset, + existingInputSplit.getPath() + ); + } + + } else { + newInputSplit = new KafkaPullerInputSplit( + tp.topic(), + tp.partition(), + existingInputSplit.getStartOffset(), + existingInputSplit.getEndOffset(), + existingInputSplit.getPath() + ); + } + + newScan.put(tp, KafkaPullerInputSplit.intersectRange(newInputSplit, existingInputSplit)); + }); + + return newScan; + } + + @Nullable + protected static Map buildScanForTimesPredicate( + final Map fullHouse, + PredicateLeaf.Operator operator, long timestamp, boolean flip, boolean negation, KafkaConsumer consumer + ) + { + long increment = (flip && operator == PredicateLeaf.Operator.LESS_THAN + || negation && operator == PredicateLeaf.Operator.LESS_THAN_EQUALS) ? 1L : 0L; + // only accepted cases are timestamp_column [ > ; >= ; = ]constant + if (operator == PredicateLeaf.Operator.EQUALS || flip ^ negation) { + final Map timePartitionsMap = Maps.toMap(fullHouse.keySet(), tp -> timestamp + increment); + try { + // Based on Kafka docs + // NULL will be returned for that partition If the message format version in a partition is before 0.10.0 + Map offsetAndTimestamp = consumer.offsetsForTimes(timePartitionsMap); + final Map newScan = Maps.toMap(fullHouse.keySet(), tp -> { + KafkaPullerInputSplit existing = fullHouse.get(tp); + OffsetAndTimestamp foundOffsetAndTime = offsetAndTimestamp.get(tp); + //Null in case filter doesn't match or field not existing ie old broker thus return empty scan. + final long startOffset = foundOffsetAndTime == null ? existing.getEndOffset() : foundOffsetAndTime.offset(); + return new KafkaPullerInputSplit( + tp.topic(), + tp.partition(), + startOffset, + existing.getEndOffset(), + existing.getPath() + ); + }); + return newScan; + } + catch (Exception e) { + log.error("Error while looking up offsets for time", e); + //Bailout when can not figure out offsets for times. + return null; + } + + } + return null; + } + + /** + * @param expr And expression to be parsed + * + * @return either full scan or an optimized sub scan. + */ + private Map pushAndOp(ExprNodeGenericFuncDesc expr) + { + Map currentScan = new HashMap<>(); + + fullHouse.forEach((tp, input) -> currentScan.put( + tp, + KafkaPullerInputSplit.copyOf(input) + )); + + for (ExprNodeDesc child : + expr.getChildren()) { + Map scan = parse(child); + if (scan != null) { + Set currentKeys = ImmutableSet.copyOf(currentScan.keySet()); + currentKeys.stream().forEach(key -> { + KafkaPullerInputSplit newSplit = scan.get(key); + KafkaPullerInputSplit oldSplit = currentScan.get(key); + currentScan.remove(key); + if (newSplit != null) { + KafkaPullerInputSplit intersectionSplit = KafkaPullerInputSplit.intersectRange(newSplit, oldSplit); + if (intersectionSplit != null) { + currentScan.put(key, intersectionSplit); + } + } + }); + + } + } + return currentScan; + } + + @Nullable + private Map pushOrOp(ExprNodeGenericFuncDesc expr) + { + final Map currentScan = new HashMap<>(); + for (ExprNodeDesc child : + expr.getChildren()) { + Map scan = parse(child); + if (scan == null) { + // if any of the children is unknown bailout + return null; + } + + scan.forEach((tp, input) -> { + KafkaPullerInputSplit existingSplit = currentScan.get(tp); + currentScan.put(tp, KafkaPullerInputSplit.unionRange(input, existingSplit == null ? 
input : existingSplit)); + }); + } + return currentScan; + } + + private static ExprNodeDesc getColumnExpr(ExprNodeDesc expr) + { + if (expr instanceof ExprNodeColumnDesc) { + return expr; + } + ExprNodeGenericFuncDesc funcDesc = null; + if (expr instanceof ExprNodeGenericFuncDesc) { + funcDesc = (ExprNodeGenericFuncDesc) expr; + } + if (null == funcDesc) { + return expr; + } + GenericUDF udf = funcDesc.getGenericUDF(); + // check if its a simple cast expression. + if ((udf instanceof GenericUDFBridge || udf instanceof GenericUDFToBinary + || udf instanceof GenericUDFToChar || udf instanceof GenericUDFToVarchar + || udf instanceof GenericUDFToDecimal || udf instanceof GenericUDFToDate + || udf instanceof GenericUDFToUnixTimeStamp || udf instanceof GenericUDFToUtcTimestamp) + && funcDesc.getChildren().size() == 1 + && funcDesc.getChildren().get(0) instanceof ExprNodeColumnDesc) { + return expr.getChildren().get(0); + } + return expr; + } + +} diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java new file mode 100644 index 0000000000..0639707e73 --- /dev/null +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.kafka; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.HiveMetaHook; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider; +import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.mapred.lib.NullOutputFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class KafkaStorageHandler implements HiveStorageHandler +{ + + public static final String __TIMESTAMP = "__timestamp"; + public static final String __PARTITION = "__partition"; + public static final String __OFFSET = "__offset"; + public static final String SERDE_CLASS_NAME = "kafka.serde.class"; + public static final String HIVE_KAFKA_TOPIC = "kafka.topic"; + public static final String CONSUMER_CONFIGURATION_PREFIX = "kafka.consumer"; + public static final String HIVE_KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; + public static final String HIVE_KAFKA_POLL_TIMEOUT = "hive.kafka.poll.timeout.ms"; + public static final long DEFAULT_CONSUMER_POLL_TIMEOUT_MS = 5000l; // 5 seconds + + private static final Logger log = LoggerFactory.getLogger(KafkaStorageHandler.class); + + Configuration configuration; + + @Override + public Class getInputFormatClass() + { + return KafkaPullerInputFormat.class; + } + + @Override + public Class getOutputFormatClass() + { + return NullOutputFormat.class; + } + + @Override + public Class getSerDeClass() + { + return GenericKafkaSerDe.class; + } + + @Override + public HiveMetaHook getMetaHook() + { + return null; + } + + @Override + public HiveAuthorizationProvider getAuthorizationProvider() throws HiveException + { + return new DefaultHiveAuthorizationProvider(); + } + + @Override + public void configureInputJobProperties( + TableDesc tableDesc, + Map jobProperties + ) + { + jobProperties.put(HIVE_KAFKA_TOPIC, Preconditions + .checkNotNull( + tableDesc.getProperties().getProperty(HIVE_KAFKA_TOPIC), + "kafka topic missing set table property->" + HIVE_KAFKA_TOPIC + )); + log.debug("Table properties: Kafka Topic {}", tableDesc.getProperties().getProperty(HIVE_KAFKA_TOPIC)); + jobProperties.put(HIVE_KAFKA_BOOTSTRAP_SERVERS, Preconditions + .checkNotNull( + tableDesc.getProperties().getProperty(HIVE_KAFKA_BOOTSTRAP_SERVERS), + "Broker address missing set table property->" + HIVE_KAFKA_BOOTSTRAP_SERVERS + )); + log.debug("Table properties: Kafka broker {}", tableDesc.getProperties().getProperty(HIVE_KAFKA_BOOTSTRAP_SERVERS)); + jobProperties.put( + SERDE_CLASS_NAME, + tableDesc.getProperties().getProperty(SERDE_CLASS_NAME, KafkaJsonSerDe.class.getName()) + ); + + log.info("Table properties: SerDe class name {}", jobProperties.get(SERDE_CLASS_NAME)); + + //set extra properties + tableDesc.getProperties() + .entrySet() + .stream() + .filter( + objectObjectEntry -> objectObjectEntry.getKey() + .toString() + .toLowerCase() + .startsWith(CONSUMER_CONFIGURATION_PREFIX)) + .forEach(entry -> { + String key = 
entry.getKey().toString().substring(CONSUMER_CONFIGURATION_PREFIX.length() + 1); + String value = entry.getValue().toString(); + jobProperties.put(key, value); + log.info("Setting extra job properties: key [{}] -> value [{}]", key, value); + + }); + } + + @Override + public void configureInputJobCredentials( + TableDesc tableDesc, + Map secrets + ) + { + + } + + @Override + public void configureOutputJobProperties( + TableDesc tableDesc, + Map jobProperties + ) + { + + } + + @Override + public void configureTableJobProperties( + TableDesc tableDesc, + Map jobProperties + ) + { + configureInputJobProperties(tableDesc, jobProperties); + } + + @Override + public void configureJobConf(TableDesc tableDesc, JobConf jobConf) + { + Map properties = new HashMap<>(); + configureInputJobProperties(tableDesc, properties); + properties.forEach((key, value) -> jobConf.set(key, value)); + try { + KafkaStreamingUtils.copyDependencyJars(jobConf, KafkaStorageHandler.class); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void setConf(Configuration configuration) + { + this.configuration = configuration; + } + + @Override + public Configuration getConf() + { + return configuration; + } + + @Override + public String toString() + { + return "org.apache.hadoop.hive.kafka.KafkaStorageHandler"; + } +} diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStreamingUtils.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStreamingUtils.java new file mode 100644 index 0000000000..e5c83f935e --- /dev/null +++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStreamingUtils.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.kafka; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.util.StringUtils; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; + + +/** + * Utilities class + */ +public class KafkaStreamingUtils { + + private KafkaStreamingUtils() {} + + /** + * @param configuration Job configs + * + * @return default consumer properties + */ + public static Properties consumerProperties(Configuration configuration) { + final Properties props = new Properties(); + // those are very important to set to avoid long blocking + props.setProperty("request.timeout.ms", "10001" ); + props.setProperty("fetch.max.wait.ms", "10000" ); + props.setProperty("session.timeout.ms", "10000" ); + // we are managing the commit offset + props.setProperty("enable.auto.commit", "false"); + // we are seeking in the stream so no reset + props.setProperty("auto.offset.reset", "none"); + String brokerEndPoint = configuration.get(KafkaStorageHandler.HIVE_KAFKA_BOOTSTRAP_SERVERS); + props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerEndPoint); + props.setProperty(KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + props.setProperty(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + // user can always override stuff + final Map kafkaProperties = + configuration.getValByRegex("^" + KafkaStorageHandler.CONSUMER_CONFIGURATION_PREFIX + "\\..*"); + for (Map.Entry entry : kafkaProperties.entrySet()) { + props.setProperty(entry.getKey().substring( + KafkaStorageHandler.CONSUMER_CONFIGURATION_PREFIX.length() + 1), + entry.getValue() + ); + } + return props; + } + + public static void copyDependencyJars(Configuration conf, Class... 
classes) throws IOException + { + Set jars = new HashSet<>(); + FileSystem localFs = FileSystem.getLocal(conf); + jars.addAll(conf.getStringCollection("tmpjars")); + jars.addAll(Arrays.asList(classes).stream().filter(aClass -> aClass != null).map(clazz -> { + String path = Utilities.jarFinderGetJar(clazz); + if (path == null) { + throw new RuntimeException( + "Could not find jar for class " + clazz + " in order to ship it to the cluster."); + } + try { + if (!localFs.exists(new Path(path))) { + throw new RuntimeException("Could not validate jar file " + path + " for class " + clazz); + } + } + catch (IOException e) { + throw new RuntimeException(e); + } + return path; + }).collect(Collectors.toList())); + + if (jars.isEmpty()) { + return; + } + conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()]))); + } +} diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaPullerInputSplitTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaPullerInputSplitTest.java new file mode 100644 index 0000000000..aa3aba7e3e --- /dev/null +++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaPullerInputSplitTest.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.kafka; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.junit.Assert; +import org.junit.Test; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; + +public class KafkaPullerInputSplitTest +{ + private String topic = "my_topic"; + private KafkaPullerInputSplit expectedInputSplit; + + public KafkaPullerInputSplitTest() + { + this.expectedInputSplit = new KafkaPullerInputSplit(this.topic, 1, 50L, 56L, + new Path("/tmp") + ); + } + + @Test + public void testWriteRead() throws IOException + { + DataOutput output = new DataOutputBuffer(); + this.expectedInputSplit.write(output); + KafkaPullerInputSplit kafkaPullerInputSplit = new KafkaPullerInputSplit(); + DataInput input = new DataInputBuffer(); + ((DataInputBuffer) input).reset(((DataOutputBuffer) output).getData(), 0, ((DataOutputBuffer) output).getLength()); + kafkaPullerInputSplit.readFields(input); + Assert.assertEquals(this.expectedInputSplit, kafkaPullerInputSplit); + } + + + @Test + public void andRangeOverLapping() + { + KafkaPullerInputSplit kafkaPullerInputSplit = new KafkaPullerInputSplit( + "test-topic", + 2, + 10, + 400, + new Path("/tmp") + ); + + KafkaPullerInputSplit kafkaPullerInputSplit2 = new KafkaPullerInputSplit( + "test-topic", + 2, + 3, + 200, + new Path("/tmp") + ); + + + Assert.assertEquals( + new KafkaPullerInputSplit("test-topic", 2, 10, 200, new Path("/tmp")), + KafkaPullerInputSplit.intersectRange(kafkaPullerInputSplit, kafkaPullerInputSplit2) + ); + + + } + + + @Test + public void andRangeNonOverLapping() + { + KafkaPullerInputSplit kafkaPullerInputSplit = new KafkaPullerInputSplit( + "test-topic", + 2, + 10, + 400, + new Path("/tmp") + ); + + KafkaPullerInputSplit kafkaPullerInputSplit2 = new KafkaPullerInputSplit( + "test-topic", + 2, + 550, + 700, + new Path("/tmp") + ); + + + Assert.assertEquals(null, KafkaPullerInputSplit.intersectRange(kafkaPullerInputSplit, kafkaPullerInputSplit2)); + + + } + + @Test + public void orRange() + { + KafkaPullerInputSplit kafkaPullerInputSplit = new KafkaPullerInputSplit( + "test-topic", + 2, + 300, + 400, + new Path("/tmp") + ); + + + KafkaPullerInputSplit kafkaPullerInputSplit2 = new KafkaPullerInputSplit( + "test-topic", + 2, + 3, + 600, + new Path("/tmp") + ); + + + Assert.assertEquals( + kafkaPullerInputSplit2, + KafkaPullerInputSplit.unionRange(kafkaPullerInputSplit, kafkaPullerInputSplit2) + ); + + KafkaPullerInputSplit kafkaPullerInputSplit3 = new KafkaPullerInputSplit( + "test-topic", + 2, + 700, + 6000, + new Path("/tmp") + ); + + + Assert.assertEquals(new KafkaPullerInputSplit( + "test-topic", + 2, + 300, + 6000, + new Path("/tmp") + ), KafkaPullerInputSplit.unionRange(kafkaPullerInputSplit, kafkaPullerInputSplit3)); + } + + + @Test + public void copyOf() + { + KafkaPullerInputSplit kafkaPullerInputSplit = new KafkaPullerInputSplit( + "test-topic", + 2, + 300, + 400, + new Path("/tmp") + ); + + KafkaPullerInputSplit copyOf = KafkaPullerInputSplit.copyOf(kafkaPullerInputSplit); + Assert.assertEquals(kafkaPullerInputSplit, copyOf); + Assert.assertTrue(kafkaPullerInputSplit != copyOf); + } + + @Test + public void TestClone() + { + KafkaPullerInputSplit kafkaPullerInputSplit = new KafkaPullerInputSplit( + "test-topic", + 2, + 300, + 400, + new Path("/tmp") + ); + + KafkaPullerInputSplit clone = kafkaPullerInputSplit.clone(); + Assert.assertEquals(kafkaPullerInputSplit, clone); + 
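+    // clone() must produce a value-equal split that is nevertheless a distinct object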
Assert.assertTrue(clone != kafkaPullerInputSplit); + + } + + @Test + public void testSlice() + { + KafkaPullerInputSplit kafkaPullerInputSplit = new KafkaPullerInputSplit( + "test-topic", + 2, + 300, + 400, + new Path("/tmp") + ); + List kafkaPullerInputSplitList = KafkaPullerInputSplit.slice(14, kafkaPullerInputSplit); + Assert.assertEquals( + kafkaPullerInputSplitList.stream() + .mapToLong(kafkaPullerInputSplit1 -> kafkaPullerInputSplit1.getEndOffset() + - kafkaPullerInputSplit1.getStartOffset()) + .sum(), + kafkaPullerInputSplit.getEndOffset() - kafkaPullerInputSplit.getStartOffset() + ); + Assert.assertTrue(kafkaPullerInputSplitList.stream() + .filter(kafkaPullerInputSplit1 -> kafkaPullerInputSplit.getStartOffset() + == kafkaPullerInputSplit1.getStartOffset()) + .count() == 1); + Assert.assertTrue(kafkaPullerInputSplitList.stream() + .filter(kafkaPullerInputSplit1 -> kafkaPullerInputSplit.getEndOffset() + == kafkaPullerInputSplit1.getEndOffset()) + .count() == 1); + + } +} diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java new file mode 100644 index 0000000000..73629aa557 --- /dev/null +++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java @@ -0,0 +1,376 @@ +// +// Source code recreated from a .class file by IntelliJ IDEA +// (powered by Fernflower decompiler) +// + +package org.apache.hadoop.hive.kafka; + +import com.google.common.collect.ImmutableList; +import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import kafka.utils.MockTime; +import kafka.utils.TestUtils; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; +import kafka.zk.EmbeddedZookeeper; +import org.I0Itec.zkclient.ZkClient; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.utils.Time; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.stream.Collectors; + +public class KafkaRecordIteratorTest +{ + private static final Logger log = LoggerFactory.getLogger(KafkaRecordIteratorTest.class); + private static final String topic = "my_topic2"; + private static final List> RECORDS = new ArrayList(); + private static final int RECORD_NUMBER = 100; + private static final TopicPartition TOPIC_PARTITION = new TopicPartition("my_topic2", 0); + public static final byte[] KEY_BYTES = "KEY".getBytes(Charset.forName("UTF-8")); + 
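+  // handles to the embedded ZooKeeper instance, the single-broker Kafka cluster and the producer shared by all tests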
private static ZkUtils zkUtils; + private static ZkClient zkClient; + private static KafkaProducer producer; + private static KafkaServer kafkaServer; + private static String zkConnect; + private KafkaConsumer consumer = null; + private KafkaRecordIterator kafkaRecordIterator = null; + private Configuration conf = new Configuration(); + + + public KafkaRecordIteratorTest() + { + } + + @BeforeClass + public static void setupCluster() throws IOException, InterruptedException + { + log.info("init embedded Zookeeper"); + EmbeddedZookeeper zkServer = new EmbeddedZookeeper(); + zkConnect = "127.0.0.1:" + zkServer.port(); + zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); + zkUtils = ZkUtils.apply(zkClient, false); + log.info("init kafka broker"); + Properties brokerProps = new Properties(); + brokerProps.setProperty("zookeeper.connect", zkConnect); + brokerProps.setProperty("broker.id", "0"); + brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString()); + brokerProps.setProperty("listeners", "PLAINTEXT://127.0.0.1:9092"); + brokerProps.setProperty("offsets.topic.replication.factor", "1"); + brokerProps.setProperty("log.retention.ms", "1000000"); + KafkaConfig config = new KafkaConfig(brokerProps); + Time mock = new MockTime(); + kafkaServer = TestUtils.createServer(config, mock); + log.info("Creating kafka topic [{}]", "my_topic2"); + AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); + setupProducer(); + sendData(); + } + + @Before + public void setUp() + { + log.info("setting up consumer"); + this.setupConsumer(); + this.kafkaRecordIterator = null; + } + + @Test + public void testHasNextAbsoluteStartEnd() + { + this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, 0L, (long) RECORDS.size(), 100L); + this.compareIterator(RECORDS, this.kafkaRecordIterator); + } + + @Test + public void testHasNextGivenStartEnd() + { + long startOffset = 2L; + long lastOffset = 4L; + this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, 2L, 4L, 100L); + this.compareIterator((List) RECORDS.stream().filter((consumerRecord) -> { + return consumerRecord.offset() >= 2L && consumerRecord.offset() < 4L; + }).collect(Collectors.toList()), this.kafkaRecordIterator); + } + + @Test + public void testHasNextNoOffsets() + { + this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, 100L); + this.compareIterator(RECORDS, this.kafkaRecordIterator); + } + + @Test + public void testHasNextLastRecord() + { + long startOffset = (long) (RECORDS.size() - 1); + long lastOffset = (long) RECORDS.size(); + this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, startOffset, lastOffset, 100L); + this.compareIterator((List) RECORDS.stream().filter((consumerRecord) -> { + return consumerRecord.offset() >= startOffset && consumerRecord.offset() < lastOffset; + }).collect(Collectors.toList()), this.kafkaRecordIterator); + } + + @Test + public void testHasNextFirstRecord() + { + long startOffset = 0L; + long lastOffset = 1L; + this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, 0L, 1L, 100L); + this.compareIterator((List) RECORDS.stream().filter((consumerRecord) -> { + return consumerRecord.offset() >= 0L && consumerRecord.offset() < 1L; + }).collect(Collectors.toList()), this.kafkaRecordIterator); + } + + @Test + public void testHasNextNoStart() + { + long startOffset = 0L; + long 
lastOffset = 10L; + this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, (Long) null, 10L, 100L); + this.compareIterator((List) RECORDS.stream().filter((consumerRecord) -> { + return consumerRecord.offset() >= 0L && consumerRecord.offset() < 10L; + }).collect(Collectors.toList()), this.kafkaRecordIterator); + } + + @Test + public void testHasNextNoEnd() + { + long startOffset = 5L; + long lastOffset = (long) RECORDS.size(); + this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, 5L, (Long) null, 100L); + this.compareIterator((List) RECORDS.stream().filter((consumerRecord) -> { + return consumerRecord.offset() >= 5L && consumerRecord.offset() < lastOffset; + }).collect(Collectors.toList()), this.kafkaRecordIterator); + } + + @Test + public void testRecordReader() throws IOException, InterruptedException + { + InputSplit inputSplits = new KafkaPullerInputSplit("my_topic2", 0, 0L, 50L, null); + KafkaPullerRecordReader recordReader = new KafkaPullerRecordReader((KafkaPullerInputSplit) inputSplits, this.conf); + List serRecords = (List) RECORDS.stream().map((recordx) -> { + return KafkaRecordWritable.fromKafkaRecord(recordx); + }).collect(Collectors.toList()); + + for (int i = 0; i < 50; ++i) { + KafkaRecordWritable record = new KafkaRecordWritable(); + Assert.assertTrue(recordReader.next((NullWritable) null, record)); + Assert.assertEquals(serRecords.get(i), record); + } + + recordReader.close(); + recordReader = new KafkaPullerRecordReader(); + TaskAttemptContext context = new TaskAttemptContextImpl(this.conf, new TaskAttemptID()); + recordReader.initialize(new KafkaPullerInputSplit("my_topic2", 0, 50L, 100L, null), context); + + for (int i = 50; i < 100; ++i) { + KafkaRecordWritable record = new KafkaRecordWritable(); + Assert.assertTrue(recordReader.next((NullWritable) null, record)); + Assert.assertEquals(serRecords.get(i), record); + } + + recordReader.close(); + } + + @Test( + expected = IllegalStateException.class + ) + public void testPullingBeyondLimit() + { + this.kafkaRecordIterator = new KafkaRecordIterator( + this.consumer, + TOPIC_PARTITION, + 0L, + (long) RECORDS.size() + 1L, + 100L + ); + this.compareIterator(RECORDS, this.kafkaRecordIterator); + } + + @Test( + expected = IllegalStateException.class + ) + public void testPullingStartGreaterThanEnd() + { + this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, 10L, 1L, 100L); + this.compareIterator(RECORDS, this.kafkaRecordIterator); + } + + @Test( + expected = IllegalStateException.class + ) + public void testPullingFromEmptyTopic() + { + this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, new TopicPartition("noHere", 0), 0L, 100L, 100L); + this.compareIterator(RECORDS, this.kafkaRecordIterator); + } + + @Test( + expected = IllegalStateException.class + ) + public void testPullingFromEmptyPartition() + { + this.kafkaRecordIterator = new KafkaRecordIterator( + this.consumer, + new TopicPartition("my_topic2", 1), + 0L, + 100L, + 100L + ); + this.compareIterator(RECORDS, this.kafkaRecordIterator); + } + + @Test + public void testStartIsEqualEnd() + { + this.kafkaRecordIterator = new KafkaRecordIterator(this.consumer, TOPIC_PARTITION, 10L, 10L, 100L); + this.compareIterator(ImmutableList.of(), this.kafkaRecordIterator); + } + + + @Test + public void testStartIsTheLastOffset() + { + this.kafkaRecordIterator = new KafkaRecordIterator( + this.consumer, + TOPIC_PARTITION, + new Long(RECORD_NUMBER), + new Long(RECORD_NUMBER), + 100L + ); 
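+    // both bounds equal the log-end offset (RECORD_NUMBER), so the iterator is expected to yield no records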
+ this.compareIterator(ImmutableList.of(), this.kafkaRecordIterator); + } + + private void compareIterator( + List> expected, + Iterator> kafkaRecordIterator + ) + { + expected.stream().forEachOrdered((expectedRecord) -> { + Assert.assertTrue("record with offset " + expectedRecord.offset(), kafkaRecordIterator.hasNext()); + ConsumerRecord record = kafkaRecordIterator.next(); + Assert.assertTrue(record.topic().equals(topic)); + Assert.assertTrue(record.partition() == 0); + Assert.assertEquals("Offsets not matching", expectedRecord.offset(), record.offset()); + byte[] binaryExceptedValue = expectedRecord.value(); + byte[] binaryExceptedKey = expectedRecord.key(); + byte[] binaryValue = (byte[]) record.value(); + byte[] binaryKey = (byte[]) record.key(); + Assert.assertArrayEquals(binaryExceptedValue, binaryValue); + Assert.assertArrayEquals(binaryExceptedKey, binaryKey); + }); + Assert.assertFalse(kafkaRecordIterator.hasNext()); + } + + private static void setupProducer() + { + log.info("Setting up kafka producer"); + Properties producerProps = new Properties(); + producerProps.setProperty("bootstrap.servers", "127.0.0.1:9092"); + producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + producerProps.setProperty("max.block.ms", "10000"); + producer = new KafkaProducer(producerProps); + log.info("kafka producer started"); + } + + private void setupConsumer() + { + Properties consumerProps = new Properties(); + consumerProps.setProperty("enable.auto.commit", "false"); + consumerProps.setProperty("auto.offset.reset", "none"); + consumerProps.setProperty("bootstrap.servers", "127.0.0.1:9092"); + this.conf.set("kafka.bootstrap.servers", "127.0.0.1:9092"); + consumerProps.setProperty("key.deserializer", ByteArrayDeserializer.class.getName()); + consumerProps.setProperty("value.deserializer", ByteArrayDeserializer.class.getName()); + consumerProps.setProperty("request.timeout.ms", "3002"); + consumerProps.setProperty("fetch.max.wait.ms", "3001"); + consumerProps.setProperty("session.timeout.ms", "3001"); + consumerProps.setProperty("metadata.max.age.ms", "100"); + this.consumer = new KafkaConsumer(consumerProps); + } + + private static void sendData() throws InterruptedException + { + log.info("Sending {} records", RECORD_NUMBER); + RECORDS.clear(); + for (int i = 0; i < RECORD_NUMBER; ++i) { + + final byte[] value = ("VALUE-" + Integer.toString(i)).getBytes(Charset.forName("UTF-8")); + //noinspection unchecked + producer.send(new ProducerRecord( + topic, + 0, + 0L, + KEY_BYTES, + value + )); + + //noinspection unchecked + RECORDS.add(new ConsumerRecord( + topic, + 0, + (long) i, + 0L, + (TimestampType) null, + 0L, + 0, + 0, + KEY_BYTES, + value + )); + } + + producer.close(); + } + + @After + public void tearDown() + { + this.kafkaRecordIterator = null; + if (this.consumer != null) { + this.consumer.close(); + } + } + + @AfterClass + public static void tearDownCluster() + { + if (kafkaServer != null) { + kafkaServer.shutdown(); + kafkaServer.awaitShutdown(); + } + + zkClient.close(); + zkUtils.close(); + } +} diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordWritableTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordWritableTest.java new file mode 100644 index 0000000000..6811351439 --- /dev/null +++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordWritableTest.java @@ -0,0 +1,37 
@@
+package org.apache.hadoop.hive.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public class KafkaRecordWritableTest {
+
+  @Test
+  public void testWriteReadFields() throws IOException {
+    ConsumerRecord<byte[], byte[]> record =
+        new ConsumerRecord<>("topic", 0, 3L, "key".getBytes(), "value".getBytes());
+    KafkaRecordWritable kafkaRecordWritable = KafkaRecordWritable.fromKafkaRecord(record);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream w = new DataOutputStream(baos);
+    kafkaRecordWritable.write(w);
+    w.flush();
+
+    ByteArrayInputStream input = new ByteArrayInputStream(baos.toByteArray());
+    DataInputStream inputStream = new DataInputStream(input);
+    KafkaRecordWritable actualKafkaRecordWritable = new KafkaRecordWritable();
+    actualKafkaRecordWritable.readFields(inputStream);
+    Assert.assertEquals(kafkaRecordWritable, actualKafkaRecordWritable);
+  }
+}
diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaScanTrimmerTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaScanTrimmerTest.java
new file mode 100644
index 0000000000..232bfcbb96
--- /dev/null
+++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaScanTrimmerTest.java
@@ -0,0 +1,730 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.hive.kafka; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.kafka.common.TopicPartition; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static org.junit.Assert.assertNotNull; + +public class KafkaScanTrimmerTest +{ + private static final Path PATH = new Path("/tmp"); + + private ExprNodeDesc zeroInt = ConstantExprBuilder.build(0); + private ExprNodeDesc threeInt = ConstantExprBuilder.build(3); + private ExprNodeDesc thirtyLong = ConstantExprBuilder.build(30L); + private ExprNodeDesc thirtyFiveLong = ConstantExprBuilder.build(35L); + private ExprNodeDesc seventyFiveLong = ConstantExprBuilder.build(75L); + private ExprNodeDesc fortyLong = ConstantExprBuilder.build(40L); + + private ExprNodeDesc partitionColumn = new ExprNodeColumnDesc( + TypeInfoFactory.intTypeInfo, + KafkaStorageHandler.__PARTITION, + null, + false + ); + private ExprNodeDesc offsetColumn = new ExprNodeColumnDesc( + TypeInfoFactory.longTypeInfo, + KafkaStorageHandler.__OFFSET, + null, + false + ); + /*private ExprNodeDesc timestampColumn = new ExprNodeColumnDesc( + TypeInfoFactory.longTypeInfo, + KafkaJsonSerDe.__TIMESTAMP, + null, + false + );*/ + + private String topic = "my_topic"; + private Map fullHouse = ImmutableMap.of( + new TopicPartition(topic, 0), + new KafkaPullerInputSplit( + topic, + 0, + 0, + 45, + PATH + ), + new TopicPartition(topic, 1), + new KafkaPullerInputSplit( + topic, + 1, + 5, + 1005, + PATH + ), + new TopicPartition(topic, 2), + new KafkaPullerInputSplit( + topic, + 2, + 9, + 100, + PATH + ), + new TopicPartition(topic, 3), + new KafkaPullerInputSplit( + topic, + 3, + 0, + 100, + PATH + ) + ); + + @Test + public void computeOptimizedScanPartitionBinaryOpFilter() + { + KafkaScanTrimmer kafkaScanTrimmer = new KafkaScanTrimmer(fullHouse, null); + int partitionId = 2; + ExprNodeDesc constant = ConstantExprBuilder.build(partitionId); + final List children = Lists.newArrayList(partitionColumn, constant); + + ExprNodeGenericFuncDesc node = EQ(children); + assertNotNull(node); + + Map actual = kafkaScanTrimmer.computeOptimizedScan(SerializationUtilities.deserializeExpression( + SerializationUtilities.serializeExpression(node))); + Map expected = Maps.filterValues(fullHouse, tp -> Objects.requireNonNull(tp).getPartition() == partitionId); + Assert.assertEquals(expected, actual); + + ExprNodeGenericFuncDesc lessNode = LESS_THAN(children); + assertNotNull(lessNode); + 
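+    // __partition < 2 after a serialize/deserialize round trip: only partitions 0 and 1 should remain in the scan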
actual = kafkaScanTrimmer.computeOptimizedScan(SerializationUtilities.deserializeExpression(SerializationUtilities.serializeExpression( + lessNode))); + expected = Maps.filterValues(fullHouse, tp -> Objects.requireNonNull(tp).getPartition() < partitionId); + Assert.assertEquals(expected, actual); + + + ExprNodeGenericFuncDesc lessEqNode = LESS_THAN_EQ(children); + + assertNotNull(lessEqNode); + actual = kafkaScanTrimmer.computeOptimizedScan(SerializationUtilities.deserializeExpression(SerializationUtilities.serializeExpression( + lessEqNode))); + expected = Maps.filterValues(fullHouse, tp -> Objects.requireNonNull(tp).getPartition() <= partitionId); + Assert.assertEquals(expected, actual); + + } + + + @Test + public void computeOptimizedScanFalseFilter() + { + KafkaScanTrimmer kafkaScanTrimmer = new KafkaScanTrimmer(fullHouse, null); + ExprNodeGenericFuncDesc falseFilter = AND(Lists.newArrayList( + EQ(Lists.newArrayList(partitionColumn, zeroInt)), + EQ(Lists.newArrayList(partitionColumn, threeInt)) + )); + + assertNotNull(falseFilter); + Map actual = kafkaScanTrimmer.computeOptimizedScan(SerializationUtilities.deserializeExpression( + SerializationUtilities.serializeExpression(falseFilter))); + Assert.assertTrue(actual.isEmpty()); + + ExprNodeGenericFuncDesc falseFilter2 = AND(Lists.newArrayList( + EQ(Lists.newArrayList(offsetColumn, thirtyFiveLong)), + EQ(Lists.newArrayList(offsetColumn, fortyLong)) + )); + + assertNotNull(falseFilter2); + actual = kafkaScanTrimmer.computeOptimizedScan(SerializationUtilities.deserializeExpression( + SerializationUtilities.serializeExpression(falseFilter2))); + Assert.assertTrue(actual.isEmpty()); + + ExprNodeGenericFuncDesc filter3 = OR(Lists.newArrayList(falseFilter, falseFilter2)); + + assertNotNull(filter3); + actual = kafkaScanTrimmer.computeOptimizedScan(SerializationUtilities.deserializeExpression( + SerializationUtilities.serializeExpression(filter3))); + Assert.assertTrue(actual.isEmpty()); + + ExprNodeGenericFuncDesc filter4 = AND(Lists.newArrayList( + filter3, + EQ(Lists.newArrayList(partitionColumn, zeroInt)) + )); + assertNotNull(filter4); + actual = kafkaScanTrimmer.computeOptimizedScan(SerializationUtilities.deserializeExpression( + SerializationUtilities.serializeExpression(filter4))); + Assert.assertTrue(actual.isEmpty()); + } + + @Test + public void computeOptimizedScanOrAndCombinedFilter() + { + KafkaScanTrimmer kafkaScanTrimmer = new KafkaScanTrimmer(fullHouse, null); + // partition = 0 and 30 <= offset < 35 or partition = 3 and 35 <= offset < 75 or (partition = 0 and offset = 40) + + + ExprNodeGenericFuncDesc part1 = AND(Lists.newArrayList( + GREATER_THAN_EQ(Lists.newArrayList(offsetColumn, thirtyLong)), + EQ(Lists.newArrayList(partitionColumn, zeroInt)), + LESS_THAN(Lists.newArrayList(offsetColumn, thirtyFiveLong)) + )); + + ExprNodeGenericFuncDesc part2 = AND(Lists.newArrayList( + GREATER_THAN_EQ(Lists.newArrayList(offsetColumn, thirtyFiveLong)), + EQ(Lists.newArrayList(partitionColumn, threeInt)), + LESS_THAN(Lists.newArrayList(offsetColumn, seventyFiveLong)) + )); + + ExprNodeGenericFuncDesc part3 = AND(Lists.newArrayList( + EQ(Lists.newArrayList(offsetColumn, fortyLong)), + EQ(Lists.newArrayList(partitionColumn, zeroInt)) + )); + + ExprNodeGenericFuncDesc orExpression = OR(Lists.newArrayList(part1, part2, part3)); + + assertNotNull(orExpression); + Map actual = kafkaScanTrimmer.computeOptimizedScan(SerializationUtilities.deserializeExpression( + SerializationUtilities.serializeExpression( + orExpression))); + TopicPartition tpZero 
= new TopicPartition(topic, 0); + TopicPartition toThree = new TopicPartition(topic, 3); + KafkaPullerInputSplit split1 = new KafkaPullerInputSplit(topic, 0, 30, 41, PATH); + KafkaPullerInputSplit split2 = new KafkaPullerInputSplit(topic, 3, 35, 75, PATH); + + Map expected = ImmutableMap.of(tpZero, split1, toThree, split2); + Assert.assertEquals(expected, actual); + + + } + + @Test + public void computeOptimizedScanPartitionOrAndCombinedFilter() + { + KafkaScanTrimmer kafkaScanTrimmer = new KafkaScanTrimmer(fullHouse, null); + + // partition = 1 or (partition >2 and <= 3) + ExprNodeGenericFuncDesc eq = EQ(Lists.newArrayList(partitionColumn, ConstantExprBuilder.build(1))); + ExprNodeGenericFuncDesc lessEq = LESS_THAN_EQ(Lists.newArrayList(partitionColumn, ConstantExprBuilder.build(3))); + ExprNodeGenericFuncDesc greater = GREATER_THAN(Lists.newArrayList(partitionColumn, ConstantExprBuilder.build(2))); + ExprNodeGenericFuncDesc orNode = OR(Lists.newArrayList(AND(Lists.newArrayList(lessEq, greater)), eq)); + + Map actual = kafkaScanTrimmer.computeOptimizedScan(SerializationUtilities.deserializeExpression( + SerializationUtilities.serializeExpression(orNode))); + Map expected = Maps.filterValues( + fullHouse, + tp -> Objects.requireNonNull(tp).getPartition() == 1 || tp.getPartition() == 3 + ); + Assert.assertEquals(expected, actual); + assertNotNull(orNode); + } + + + @Test + public void buildScanFormPartitionPredicateEq() + { + Map actual = KafkaScanTrimmer.buildScanFormPartitionPredicate( + fullHouse, + PredicateLeaf.Operator.EQUALS, + 3, + false, + false + ); + TopicPartition topicPartition = new TopicPartition(topic, 3); + Assert.assertEquals(fullHouse.get(topicPartition), actual.get(topicPartition)); + } + + @Test + public void buildScanFormPartitionPredicateLess() + { + // partitionConst < partitionColumn (flip true) + int partitionConst = 2; + Map actual = KafkaScanTrimmer.buildScanFormPartitionPredicate( + fullHouse, + PredicateLeaf.Operator.LESS_THAN, + partitionConst, + true, + false + ); + + Map expected = Maps.filterEntries( + fullHouse, + entry -> Objects.requireNonNull(entry).getKey().partition() > partitionConst + ); + Assert.assertEquals(expected, actual); + Assert.assertFalse(actual.isEmpty()); + + // partitionConst >= partitionColumn (flip true, negation true) + actual = KafkaScanTrimmer.buildScanFormPartitionPredicate( + fullHouse, + PredicateLeaf.Operator.LESS_THAN, + partitionConst, + true, + true + ); + + expected = Maps.filterEntries( + fullHouse, + entry -> partitionConst >= Objects.requireNonNull(entry).getKey().partition() + ); + Assert.assertEquals(expected, actual); + Assert.assertFalse(actual.isEmpty()); + + // partitionColumn >= partitionConst (negation true) + actual = KafkaScanTrimmer.buildScanFormPartitionPredicate( + fullHouse, + PredicateLeaf.Operator.LESS_THAN, + partitionConst, + false, + true + ); + + expected = Maps.filterEntries( + fullHouse, + entry -> Objects.requireNonNull(entry).getKey().partition() >= partitionConst + ); + Assert.assertEquals(expected, actual); + Assert.assertFalse(actual.isEmpty()); + + // partitionColumn < partitionConst (negation true) + actual = KafkaScanTrimmer.buildScanFormPartitionPredicate( + fullHouse, + PredicateLeaf.Operator.LESS_THAN, + partitionConst, + false, + false + ); + + expected = Maps.filterEntries( + fullHouse, + entry -> Objects.requireNonNull(entry).getKey().partition() < partitionConst + ); + Assert.assertEquals(expected, actual); + Assert.assertFalse(actual.isEmpty()); + } + + @Test + public void 
buildScanFormPartitionPredicateLessEq() + { + // partitionConst <= partitionColumn (flip true) + int partitionConst = 2; + Map actual = KafkaScanTrimmer.buildScanFormPartitionPredicate( + fullHouse, + PredicateLeaf.Operator.LESS_THAN_EQUALS, + partitionConst, + true, + false + ); + + Map expected = Maps.filterEntries( + fullHouse, + entry -> Objects.requireNonNull(entry).getKey().partition() >= partitionConst + ); + Assert.assertEquals(expected, actual); + Assert.assertFalse(actual.isEmpty()); + + // partitionConst > partitionColumn (flip true, negation true) + actual = KafkaScanTrimmer.buildScanFormPartitionPredicate( + fullHouse, + PredicateLeaf.Operator.LESS_THAN_EQUALS, + partitionConst, + true, + true + ); + + expected = Maps.filterEntries( + fullHouse, + entry -> partitionConst > Objects.requireNonNull(entry).getKey().partition() + ); + Assert.assertEquals(expected, actual); + Assert.assertFalse(actual.isEmpty()); + + + // partitionColumn > partitionConst (negation true) + actual = KafkaScanTrimmer.buildScanFormPartitionPredicate( + fullHouse, + PredicateLeaf.Operator.LESS_THAN_EQUALS, + partitionConst, + false, + true + ); + + expected = Maps.filterEntries(fullHouse, entry -> Objects.requireNonNull(entry).getKey().partition() > partitionConst); + Assert.assertEquals(expected, actual); + Assert.assertFalse(actual.isEmpty()); + + // partitionColumn <= partitionConst (negation true) + actual = KafkaScanTrimmer.buildScanFormPartitionPredicate( + fullHouse, + PredicateLeaf.Operator.LESS_THAN_EQUALS, + partitionConst, + false, + false + ); + + expected = Maps.filterEntries( + fullHouse, + entry -> Objects.requireNonNull(entry).getKey().partition() <= partitionConst + ); + Assert.assertEquals(expected, actual); + Assert.assertFalse(actual.isEmpty()); + } + + + @Test + public void buildScanFromOffsetPredicateEq() + { + long constantOffset = 30; + Map actual = KafkaScanTrimmer.buildScanFromOffsetPredicate( + fullHouse, + PredicateLeaf.Operator.EQUALS, + constantOffset, + false, + false + ); + Map expected = Maps.transformValues( + fullHouse, + entry -> new KafkaPullerInputSplit( + Objects.requireNonNull(entry).getTopic(), + entry.getPartition(), + constantOffset, + constantOffset + 1, + entry.getPath() + ) + ); + + Assert.assertEquals(expected, actual); + + // seek to end if offset is out of reach + actual = KafkaScanTrimmer.buildScanFromOffsetPredicate( + fullHouse, + PredicateLeaf.Operator.EQUALS, + 3000000L, + false, + false + ); + expected = Maps.transformValues( + fullHouse, + entry -> new KafkaPullerInputSplit( + Objects.requireNonNull(entry).getTopic(), + entry.getPartition(), + entry.getEndOffset(), + entry.getEndOffset(), + entry.getPath() + ) + ); + Assert.assertEquals(expected, actual); + + // seek to end if offset is out of reach + actual = KafkaScanTrimmer.buildScanFromOffsetPredicate( + fullHouse, + PredicateLeaf.Operator.EQUALS, + 0L, + false, + false + ); + + expected = Maps.transformValues( + fullHouse, + entry -> new KafkaPullerInputSplit( + Objects.requireNonNull(entry).getTopic(), + entry.getPartition(), + entry.getStartOffset() > 0 ? entry.getEndOffset() : 0, + entry.getStartOffset() > 0 ? 
entry.getEndOffset() : 1, + entry.getPath() + ) + ); + Assert.assertEquals(expected, actual); + + + } + + @Test + public void buildScanFromOffsetPredicateLess() + { + long constantOffset = 50; + // columnOffset < constant + Map<TopicPartition, KafkaPullerInputSplit> actual = KafkaScanTrimmer.buildScanFromOffsetPredicate( + fullHouse, + PredicateLeaf.Operator.LESS_THAN, + constantOffset, + false, + false + ); + + Map<TopicPartition, KafkaPullerInputSplit> expected = Maps.transformValues( + fullHouse, + entry -> new KafkaPullerInputSplit( + Objects.requireNonNull(entry).getTopic(), + entry.getPartition(), + entry.getStartOffset(), + Math.min(constantOffset, entry.getEndOffset()), + entry.getPath() + ) + ); + Assert.assertEquals(expected, actual); + + + // columnOffset > constant + actual = KafkaScanTrimmer.buildScanFromOffsetPredicate( + fullHouse, + PredicateLeaf.Operator.LESS_THAN, + constantOffset, + true, + false + ); + + expected = Maps.transformValues( + fullHouse, + entry -> new KafkaPullerInputSplit( + Objects.requireNonNull(entry).getTopic(), + entry.getPartition(), + Math.min(entry.getEndOffset(), Math.max(entry.getStartOffset(), constantOffset + 1)), + entry.getEndOffset(), + entry.getPath() + ) + ); + Assert.assertEquals(expected, actual); + + // columnOffset >= constant + actual = KafkaScanTrimmer.buildScanFromOffsetPredicate( + fullHouse, + PredicateLeaf.Operator.LESS_THAN, + constantOffset, + false, + true + ); + + expected = Maps.transformValues( + fullHouse, + entry -> new KafkaPullerInputSplit( + Objects.requireNonNull(entry).getTopic(), + entry.getPartition(), + Math.min(entry.getEndOffset(), Math.max(entry.getStartOffset(), constantOffset)), + entry.getEndOffset(), + entry.getPath() + ) + ); + Assert.assertEquals(expected, actual); + + + // columnOffset <= constant + actual = KafkaScanTrimmer.buildScanFromOffsetPredicate( + fullHouse, + PredicateLeaf.Operator.LESS_THAN, + constantOffset, + true, + true + ); + + expected = Maps.transformValues( + fullHouse, + entry -> new KafkaPullerInputSplit( + Objects.requireNonNull(entry).getTopic(), + entry.getPartition(), + entry.getStartOffset(), + Math.min(constantOffset + 1, entry.getEndOffset()), + entry.getPath() + ) + ); + Assert.assertEquals(expected, actual); + + } + + @Test + public void buildScanFromOffsetPredicateLessEq() + { + long constantOffset = 50; + // columnOffset <= constant + Map<TopicPartition, KafkaPullerInputSplit> actual = KafkaScanTrimmer.buildScanFromOffsetPredicate( + fullHouse, + PredicateLeaf.Operator.LESS_THAN_EQUALS, + constantOffset, + false, + false + ); + + Map<TopicPartition, KafkaPullerInputSplit> expected = Maps.transformValues( + fullHouse, + entry -> new KafkaPullerInputSplit( + Objects.requireNonNull(entry).getTopic(), + entry.getPartition(), + entry.getStartOffset(), + Math.min(constantOffset + 1, entry.getEndOffset()), + entry.getPath() + ) + ); + Assert.assertEquals(expected, actual); + + // columnOffset >= constant + actual = KafkaScanTrimmer.buildScanFromOffsetPredicate( + fullHouse, + PredicateLeaf.Operator.LESS_THAN_EQUALS, + constantOffset, + true, + false + ); + + expected = Maps.transformValues( + fullHouse, + entry -> new KafkaPullerInputSplit( + Objects.requireNonNull(entry).getTopic(), + entry.getPartition(), + Math.min(entry.getEndOffset(), Math.max(entry.getStartOffset(), constantOffset)), + entry.getEndOffset(), + entry.getPath() + ) + ); + Assert.assertEquals(expected, actual); + + // columnOffset > constant + actual = KafkaScanTrimmer.buildScanFromOffsetPredicate( + fullHouse, + PredicateLeaf.Operator.LESS_THAN_EQUALS, + constantOffset, + false, + true + ); + + expected = Maps.transformValues( + fullHouse, + entry -> new 
KafkaPullerInputSplit( + Objects.requireNonNull(entry).getTopic(), + entry.getPartition(), + Math.min(entry.getEndOffset(), Math.max(entry.getStartOffset(), constantOffset + 1)), + entry.getEndOffset(), + entry.getPath() + ) + ); + Assert.assertEquals(expected, actual); + + // columnOffset < constant + actual = KafkaScanTrimmer.buildScanFromOffsetPredicate( + fullHouse, + PredicateLeaf.Operator.LESS_THAN_EQUALS, + constantOffset, + true, + true + ); + + expected = Maps.transformValues( + fullHouse, + entry -> new KafkaPullerInputSplit( + Objects.requireNonNull(entry).getTopic(), + entry.getPartition(), + entry.getStartOffset(), + Math.min(constantOffset, entry.getEndOffset()), + entry.getPath() + ) + ); + Assert.assertEquals(expected, actual); + } + + private static class ConstantExprBuilder + { + static ExprNodeDesc build(long constant) + { + return new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, constant); + } + + static ExprNodeDesc build(int constant) + { + return new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, constant); + } + } + + + private static ExprNodeGenericFuncDesc OR(List<ExprNodeDesc> children) + { + return new ExprNodeGenericFuncDesc( + TypeInfoFactory.booleanTypeInfo, + new GenericUDFOPOr(), + children + ); + } + + private static ExprNodeGenericFuncDesc AND(List<ExprNodeDesc> children) + { + return new ExprNodeGenericFuncDesc( + TypeInfoFactory.booleanTypeInfo, + new GenericUDFOPAnd(), + children + ); + } + + private static ExprNodeGenericFuncDesc EQ(List<ExprNodeDesc> children) + { + return new ExprNodeGenericFuncDesc( + children.get(0).getTypeInfo(), + new GenericUDFOPEqual(), + children + ); + } + + private static ExprNodeGenericFuncDesc LESS_THAN(List<ExprNodeDesc> children) + { + return new ExprNodeGenericFuncDesc( + children.get(0).getTypeInfo(), + new GenericUDFOPLessThan(), + children + ); + } + + private static ExprNodeGenericFuncDesc LESS_THAN_EQ(List<ExprNodeDesc> children) + { + return new ExprNodeGenericFuncDesc( + children.get(0).getTypeInfo(), + new GenericUDFOPEqualOrLessThan(), + children + ); + } + + private static ExprNodeGenericFuncDesc GREATER_THAN(List<ExprNodeDesc> children) + { + return new ExprNodeGenericFuncDesc( + children.get(0).getTypeInfo(), + new GenericUDFOPGreaterThan(), + children + ); + } + + private static ExprNodeGenericFuncDesc GREATER_THAN_EQ(List<ExprNodeDesc> children) + { + return new ExprNodeGenericFuncDesc( + children.get(0).getTypeInfo(), + new GenericUDFOPEqualOrGreaterThan(), + children + ); + } +} \ No newline at end of file diff --git kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaStreamingUtilsTest.java kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaStreamingUtilsTest.java new file mode 100644 index 0000000000..add3f7137e --- /dev/null +++ kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaStreamingUtilsTest.java @@ -0,0 +1,22 @@ +package org.apache.hadoop.hive.kafka; + +import org.apache.hadoop.conf.Configuration; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Properties; + +public class KafkaStreamingUtilsTest { + + @Test + public void testConsumerProperties() { + Configuration configuration = new Configuration(); + configuration.set("kafka.bootstrap.servers", "localhost:9090"); + configuration.set("kafka.consumer.fetch.max.wait.ms", "40"); + configuration.set("kafka.consumer.my.new.wait.ms", "400"); + Properties properties = KafkaStreamingUtils.consumerProperties(configuration); + 
Assert.assertEquals("localhost:9090", properties.getProperty("bootstrap.servers")); + Assert.assertEquals("40", properties.getProperty("fetch.max.wait.ms")); + Assert.assertEquals("400", properties.getProperty("my.new.wait.ms")); + } +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java index 0f1d5eea4c..ae1de26adc 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java @@ -91,7 +91,7 @@ private static final String[] DEFAULT_AUX_CLASSES = new String[] { "org.apache.hive.hcatalog.data.JsonSerDe","org.apache.hadoop.hive.druid.DruidStorageHandler", "org.apache.hive.storage.jdbc.JdbcStorageHandler", "org.apache.commons.dbcp.BasicDataSourceFactory", - "org.apache.commons.pool.impl.GenericObjectPool" + "org.apache.commons.pool.impl.GenericObjectPool", "org.apache.hadoop.hive.kafka.KafkaStorageHandler" }; private static final String HBASE_SERDE_CLASS = "org.apache.hadoop.hive.hbase.HBaseSerDe"; private static final String[] NEEDED_CONFIGS = LlapDaemonConfiguration.DAEMON_CONFIGS; diff --git packaging/pom.xml packaging/pom.xml index 5c859acfad..07f2382b03 100644 --- packaging/pom.xml +++ packaging/pom.xml @@ -213,6 +213,11 @@ hive-druid-handler ${project.version} + + org.apache.hive + hive-kafka-reader + ${project.version} + org.apache.hive hive-jdbc-handler diff --git pom.xml pom.xml index 7503cff532..4e1ab94186 100644 --- pom.xml +++ pom.xml @@ -63,6 +63,7 @@ packaging standalone-metastore upgrade-acid + kafka-handler diff --git ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java index da31f4d9a2..f39ba87a80 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java @@ -17,16 +17,6 @@ */ package org.apache.hadoop.hive.ql.index; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.Stack; - import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; @@ -44,6 +34,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBaseCompare; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToBinary; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToChar; @@ -54,7 +45,16 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUtcTimestamp; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToVarchar; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBaseCompare; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Stack; /** * IndexPredicateAnalyzer decomposes predicates, separating the parts @@ -181,7 +181,7 @@ 
public Object process(Node nd, Stack stack, //Check if ExprNodeColumnDesc is wrapped in expr. //If so, peel off. Otherwise return itself. - private ExprNodeDesc getColumnExpr(ExprNodeDesc expr) { + private static ExprNodeDesc getColumnExpr(ExprNodeDesc expr) { if (expr instanceof ExprNodeColumnDesc) { return expr; } diff --git ql/src/test/queries/clientpositive/kafka_storage_handler.q ql/src/test/queries/clientpositive/kafka_storage_handler.q new file mode 100644 index 0000000000..3efdb859ad --- /dev/null +++ ql/src/test/queries/clientpositive/kafka_storage_handler.q @@ -0,0 +1,47 @@ +SET hive.vectorized.execution.enabled=false; + +CREATE EXTERNAL TABLE kafka_table +(`__time` timestamp , `page` string, `user` string, `language` string, +`country` string,`continent` string, `namespace` string, `newPage` boolean, `unpatrolled` boolean, +`anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) +STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' +WITH SERDEPROPERTIES ("timestamp.formats"="yyyy-MM-dd\'T\'HH:mm:ss\'Z\'") +TBLPROPERTIES +("kafka.topic" = "test-topic", +"kafka.bootstrap.servers"="localhost:9092", +"kafka.serde.class"="org.apache.hadoop.hive.serde2.JsonSerDe") +; + +DESCRIBE EXTENDED kafka_table; + +Select `__partition` , `__offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta FROM kafka_table; + +Select count(*) FROM kafka_table; + +Select `__partition`, `__offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta +from kafka_table where `__timestamp` > 1533960760123; +Select `__partition`, `__offset`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta +from kafka_table where `__timestamp` > 533960760123; + +Select `__partition`, `__offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta +from kafka_table where `__offset` > 7 and `__partition` = 0 OR +`__offset` = 4 and `__partition` = 0 OR `__offset` <= 1 and `__partition` = 0; + +CREATE EXTERNAL TABLE kafka_table_2 +(`__time` timestamp with local time zone , `page` string, `user` string, `language` string, +`country` string,`continent` string, `namespace` string, `newPage` boolean, `unpatrolled` boolean, +`anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) +STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' +TBLPROPERTIES +("kafka.topic" = "test-topic", +"kafka.bootstrap.servers"="localhost:9092"); + +Select `__partition`, `__offset`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta +FROM kafka_table_2; + +Select count(*) FROM kafka_table_2; \ No newline at end of file diff --git ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out new file mode 100644 index 0000000000..a936a26aa7 --- /dev/null +++ ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out @@ -0,0 +1,196 @@ +PREHOOK: query: CREATE EXTERNAL TABLE kafka_table +(`__time` timestamp , `page` string, `user` string, `language` string, +`country` string,`continent` string, `namespace` string, `newPage` boolean, 
`unpatrolled` boolean, +`anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) +STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' +WITH SERDEPROPERTIES ("timestamp.formats"="yyyy-MM-dd\'T\'HH:mm:ss\'Z\'") +TBLPROPERTIES +("kafka.topic" = "test-topic", +"kafka.bootstrap.servers"="localhost:9092", +"kafka.serde.class"="org.apache.hadoop.hive.serde2.JsonSerDe") +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@kafka_table +POSTHOOK: query: CREATE EXTERNAL TABLE kafka_table +(`__time` timestamp , `page` string, `user` string, `language` string, +`country` string,`continent` string, `namespace` string, `newPage` boolean, `unpatrolled` boolean, +`anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) +STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' +WITH SERDEPROPERTIES ("timestamp.formats"="yyyy-MM-dd\'T\'HH:mm:ss\'Z\'") +TBLPROPERTIES +("kafka.topic" = "test-topic", +"kafka.bootstrap.servers"="localhost:9092", +"kafka.serde.class"="org.apache.hadoop.hive.serde2.JsonSerDe") +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@kafka_table +PREHOOK: query: DESCRIBE EXTENDED kafka_table +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@kafka_table +POSTHOOK: query: DESCRIBE EXTENDED kafka_table +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@kafka_table +__time timestamp from deserializer +page string from deserializer +user string from deserializer +language string from deserializer +country string from deserializer +continent string from deserializer +namespace string from deserializer +newpage boolean from deserializer +unpatrolled boolean from deserializer +anonymous boolean from deserializer +robot boolean from deserializer +added int from deserializer +deleted int from deserializer +delta bigint from deserializer +__partition int from deserializer +__offset bigint from deserializer +__timestamp bigint from deserializer + +#### A masked pattern was here #### +PREHOOK: query: Select `__partition` , `__offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta FROM kafka_table +PREHOOK: type: QUERY +PREHOOK: Input: default@kafka_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select `__partition` , `__offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta FROM kafka_table +POSTHOOK: type: QUERY +POSTHOOK: Input: default@kafka_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +0 0 NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 +0 1 NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 +0 2 NULL Cherno Alpha masterYi ru Russia Asia article true false false true 123 12 111 +0 3 NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 +0 4 NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 +0 5 NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 +0 6 NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 +0 7 NULL Cherno Alpha masterYi ru Russia Asia article true false false true 123 12 111 +0 8 NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 +0 9 NULL Coyote Tango 
stringer ja Japan Asia wikipedia false true false true 1 10 -9 +PREHOOK: query: Select count(*) FROM kafka_table +PREHOOK: type: QUERY +PREHOOK: Input: default@kafka_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select count(*) FROM kafka_table +POSTHOOK: type: QUERY +POSTHOOK: Input: default@kafka_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +10 +PREHOOK: query: Select `__partition`, `__offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta +from kafka_table where `__timestamp` > 1533960760123 +PREHOOK: type: QUERY +PREHOOK: Input: default@kafka_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select `__partition`, `__offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta +from kafka_table where `__timestamp` > 1533960760123 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@kafka_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +0 0 NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 +0 1 NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 +0 2 NULL Cherno Alpha masterYi ru Russia Asia article true false false true 123 12 111 +0 3 NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 +0 4 NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 +0 5 NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 +0 6 NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 +0 7 NULL Cherno Alpha masterYi ru Russia Asia article true false false true 123 12 111 +0 8 NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 +0 9 NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 +PREHOOK: query: Select `__partition`, `__offset`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta +from kafka_table where `__timestamp` > 533960760123 +PREHOOK: type: QUERY +PREHOOK: Input: default@kafka_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select `__partition`, `__offset`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta +from kafka_table where `__timestamp` > 533960760123 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@kafka_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +0 0 NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 +0 1 NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 +0 2 NULL Cherno Alpha masterYi ru Russia Asia article true false false true 123 12 111 +0 3 NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 +0 4 NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 +0 5 NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 +0 6 NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 +0 7 NULL Cherno Alpha masterYi ru Russia Asia article true false false true 123 12 111 +0 8 NULL Crimson Typhoon triplets zh China Asia 
wikipedia false true false true 905 5 900 +0 9 NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 +PREHOOK: query: Select `__partition`, `__offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta +from kafka_table where `__offset` > 7 and `__partition` = 0 OR +`__offset` = 4 and `__partition` = 0 OR `__offset` <= 1 and `__partition` = 0 +PREHOOK: type: QUERY +PREHOOK: Input: default@kafka_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select `__partition`, `__offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta +from kafka_table where `__offset` > 7 and `__partition` = 0 OR +`__offset` = 4 and `__partition` = 0 OR `__offset` <= 1 and `__partition` = 0 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@kafka_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +0 0 NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 +0 1 NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 +0 4 NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 +0 8 NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 +0 9 NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 +PREHOOK: query: CREATE EXTERNAL TABLE kafka_table_2 +(`__time` timestamp with local time zone , `page` string, `user` string, `language` string, +`country` string,`continent` string, `namespace` string, `newPage` boolean, `unpatrolled` boolean, +`anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) +STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' +TBLPROPERTIES +("kafka.topic" = "test-topic", +"kafka.bootstrap.servers"="localhost:9092") +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@kafka_table_2 +POSTHOOK: query: CREATE EXTERNAL TABLE kafka_table_2 +(`__time` timestamp with local time zone , `page` string, `user` string, `language` string, +`country` string,`continent` string, `namespace` string, `newPage` boolean, `unpatrolled` boolean, +`anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) +STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' +TBLPROPERTIES +("kafka.topic" = "test-topic", +"kafka.bootstrap.servers"="localhost:9092") +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@kafka_table_2 +PREHOOK: query: Select `__partition`, `__offset`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta +FROM kafka_table_2 +PREHOOK: type: QUERY +PREHOOK: Input: default@kafka_table_2 +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select `__partition`, `__offset`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta +FROM kafka_table_2 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@kafka_table_2 +POSTHOOK: Output: hdfs://### HDFS PATH ### +0 0 2013-08-30 18:02:33.0 US/Pacific Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 +0 1 2013-08-30 20:32:45.0 US/Pacific Striker Eureka speed en Australia Australia wikipedia true false false true 
459 129 330 +0 2 2013-08-31 00:11:21.0 US/Pacific Cherno Alpha masterYi ru Russia Asia article true false false true 123 12 111 +0 3 2013-08-31 04:58:39.0 US/Pacific Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 +0 4 2013-08-31 05:41:27.0 US/Pacific Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 +0 5 2013-08-31 18:02:33.0 US/Pacific Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 +0 6 2013-08-31 20:32:45.0 US/Pacific Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 +0 7 2013-09-01 00:11:21.0 US/Pacific Cherno Alpha masterYi ru Russia Asia article true false false true 123 12 111 +0 8 2013-09-01 04:58:39.0 US/Pacific Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 +0 9 2013-09-01 05:41:27.0 US/Pacific Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 +PREHOOK: query: Select count(*) FROM kafka_table_2 +PREHOOK: type: QUERY +PREHOOK: Input: default@kafka_table_2 +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select count(*) FROM kafka_table_2 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@kafka_table_2 +POSTHOOK: Output: hdfs://### HDFS PATH ### +10
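
The property forwarding asserted in KafkaStreamingUtilsTest above boils down to two rules: "kafka.bootstrap.servers" is handed to the Kafka consumer as "bootstrap.servers", and any table property prefixed with "kafka.consumer." is passed through with the prefix stripped. The standalone sketch below illustrates that mapping only; it is not the patch's KafkaStreamingUtils.consumerProperties implementation, and the class and method names (ConsumerPropertiesSketch, buildConsumerProperties) are invented for illustration, working from a plain Map rather than a Hadoop Configuration.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    public final class ConsumerPropertiesSketch {
      private static final String BOOTSTRAP_SERVERS_KEY = "kafka.bootstrap.servers";
      private static final String CONSUMER_PREFIX = "kafka.consumer.";

      // Hypothetical helper mirroring the behavior the test asserts:
      // map the bootstrap servers key, strip the "kafka.consumer." prefix from the rest.
      static Properties buildConsumerProperties(Map<String, String> tableProperties) {
        Properties consumerProps = new Properties();
        for (Map.Entry<String, String> entry : tableProperties.entrySet()) {
          String key = entry.getKey();
          if (BOOTSTRAP_SERVERS_KEY.equals(key)) {
            // "kafka.bootstrap.servers" becomes the consumer's "bootstrap.servers"
            consumerProps.setProperty("bootstrap.servers", entry.getValue());
          } else if (key.startsWith(CONSUMER_PREFIX)) {
            // "kafka.consumer.xxx" is forwarded as "xxx"
            consumerProps.setProperty(key.substring(CONSUMER_PREFIX.length()), entry.getValue());
          }
        }
        return consumerProps;
      }

      public static void main(String[] args) {
        Map<String, String> tableProps = new HashMap<>();
        tableProps.put("kafka.bootstrap.servers", "localhost:9090");
        tableProps.put("kafka.consumer.fetch.max.wait.ms", "40");
        tableProps.put("kafka.consumer.my.new.wait.ms", "400");
        Properties props = buildConsumerProperties(tableProps);
        // prints localhost:9090, 40, 400 -- the same values asserted in KafkaStreamingUtilsTest
        System.out.println(props.getProperty("bootstrap.servers"));
        System.out.println(props.getProperty("fetch.max.wait.ms"));
        System.out.println(props.getProperty("my.new.wait.ms"));
      }
    }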