diff --git itests/qtest-druid/pom.xml itests/qtest-druid/pom.xml
index e566fcf4d7..19cdf918b2 100644
--- itests/qtest-druid/pom.xml
+++ itests/qtest-druid/pom.xml
@@ -43,7 +43,7 @@
10.11.1.116.0.14.1.0
- 1.0.1
+ 2.0.0
@@ -207,6 +207,11 @@
kafka_2.11${kafka.version}
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>1.7.25</version>
+    </dependency>
diff --git kafka-handler/README.md kafka-handler/README.md
new file mode 100644
index 0000000000..706c77ae25
--- /dev/null
+++ kafka-handler/README.md
@@ -0,0 +1,217 @@
+# Kafka Storage Handler Module
+
+Storage handler that allows users to connect to, analyze, and transform Kafka topics.
+The workflow is as follows: first, the user creates an external table that is a view over one Kafka topic;
+then the user can run any SQL query, including writing back to the same table or to a different Kafka-backed table.
+
+## Usage
+
+### Create Table
+Use the following statement to create a table:
+```sql
+CREATE EXTERNAL TABLE kafka_table
+(`timestamp` timestamp , `page` string, `newPage` boolean,
+ added int, deleted bigint, delta double)
+STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
+TBLPROPERTIES
+("kafka.topic" = "test-topic", "kafka.bootstrap.servers"="localhost:9092");
+```
+The table property `kafka.topic` is the Kafka topic to connect to and `kafka.bootstrap.servers` is the broker connection string.
+Both properties are mandatory.
+On the write path, if the topic does not exist it will be created, provided the Kafka broker's admin policy allows such an operation.
+
+By default the serializer and deserializer is JSON, namely `org.apache.hadoop.hive.serde2.JsonSerDe`.
+To switch the serializer/deserializer class, use ALTER TABLE:
+```sql
+ALTER TABLE kafka_table SET TBLPROPERTIES ("kafka.serde.class"="org.apache.hadoop.hive.serde2.avro.AvroSerDe");
+```
+List of supported serializers/deserializers:
+
+|Supported Serializer/Deserializer|
+|-----|
+|org.apache.hadoop.hive.serde2.JsonSerDe|
+|org.apache.hadoop.hive.serde2.OpenCSVSerde|
+|org.apache.hadoop.hive.serde2.avro.AvroSerDe|
+|org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe|
+|org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe|
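+
+If the Avro SerDe is used, the Avro schema typically needs to be provided as well. A minimal sketch, assuming the standard Hive Avro table property `avro.schema.literal` and a hypothetical record schema shown only for illustration:
+```sql
+ALTER TABLE kafka_table SET TBLPROPERTIES
+("avro.schema.literal"='{"type":"record","name":"wiki","fields":[{"name":"page","type":"string"}]}');
+```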
+
+#### Table definition
+In addition to the user-defined payload schema, the Kafka storage handler appends additional columns that allow the user to query the Kafka metadata fields (see the example query after this list):
+- `__key` Kafka record key (byte array)
+- `__partition` Kafka record partition identifier (int 32)
+- `__offset` Kafka record offset (int 64)
+- `__timestamp` Kafka record timestamp (int 64)
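+
+For example, a quick sketch that inspects the metadata columns alongside a payload column from the `kafka_table` defined above:
+```sql
+SELECT `__partition`, `__offset`, `__timestamp`, `page` FROM kafka_table LIMIT 10;
+```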
+
+
+### Query Table
+
+List the table properties and all the partition/offset information for the topic.
+```sql
+Describe extended kafka_table;
+```
+
+Count the number of records whose Kafka record timestamp falls within the last 10 minutes:
+
+```sql
+SELECT count(*) from kafka_table
+where `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '10' MINUTES);
+```
+The storage handler allows filter push-down read optimization;
+for instance, the query above will only read the records with a timestamp satisfying the filter predicate.
+Please note that such a time-based seek is only viable if the Kafka broker allows time-based lookups (Kafka 0.11 or later versions).
+
+In addition to the **time-based seek**, the storage handler reader is able to seek to a particular partition offset using the SQL WHERE clause.
+Currently only OR/AND with (<, <=, >=, >) are supported:
+
+```sql
+SELECT count(*) from kafka_table
+where (`__offset` < 10 and `__offset`>3 and `__partition` = 0)
+or (`__partition` = 0 and `__offset` < 105 and `__offset` > 99)
+or (`__offset` = 109);
+```
+
+The user can define a view over the last 15 minutes and mask whichever columns they like, as follows:
+
+```sql
+CREATE VIEW last_15_minutes_of_kafka_table as select `timestamp`, `user`, delta, added from kafka_table
+where `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '15' MINUTES);
+```
+
+Join the Kafka stream to a Hive table. For instance, assume you want to join the last 15 minutes of the stream to a dimension table like the following:
+```sql
+CREATE TABLE user_table (`user` string, `first_name` string , age int, gender string, comments string) STORED as ORC ;
+```
+
+Join the view of the last 15 minutes to `user_table`, group by the user gender column, and compute aggregates
+over metrics from both the fact table and the dimension table:
+
+```sql
+SELECT sum(added) as added, sum(deleted) as deleted, avg(delta) as delta, avg(age) as avg_age , gender
+FROM last_15_minutes_of_kafka_table join user_table on `last_15_minutes_of_kafka_table`.`user` = `user_table`.`user`
+GROUP BY gender limit 10;
+```
+
+
+Join the stream to the stream itself, for cases where you want to perform an ad-hoc query over the last 15 minutes view.
+The following example shows how to perform a classical user retention analysis over the Kafka stream.
+```sql
+-- Stream join over the view itself
+-- The example is adapted from https://www.periscopedata.com/blog/how-to-calculate-cohort-retention-in-sql
+-- Assuming l15min_wiki is a view of the last 15 minutes
+select count( distinct activity.`user`) as active_users, count(distinct future_activity.`user`) as retained_users
+from l15min_wiki as activity
+left join l15min_wiki as future_activity on
+ activity.`user` = future_activity.`user`
+ and activity.`timestamp` = future_activity.`timestamp` - interval '5' minutes ;
+
+-- Stream to stream join
+-- Assuming wiki_kafka_hive is the entire stream.
+select floor_hour(activity.`timestamp`), count( distinct activity.`user`) as active_users, count(distinct future_activity.`user`) as retained_users
+from wiki_kafka_hive as activity
+left join wiki_kafka_hive as future_activity on
+ activity.`user` = future_activity.`user`
+ and activity.`timestamp` = future_activity.`timestamp` - interval '1' hour group by floor_hour(activity.`timestamp`);
+
+```
+
+# Configuration
+
+## Table Properties
+
+| Property | Description | Mandatory | Default |
+|-------------------------------------|------------------------------------------------------------------------------------------------------------------------------------|-----------|-----------------------------------------|
+| kafka.topic | Kafka topic name to map the table to. | Yes | null |
+| kafka.bootstrap.servers | Table property indicating Kafka broker(s) connection string. | Yes | null |
+| kafka.serde.class | Serializer and Deserializer class implementation. | No | org.apache.hadoop.hive.serde2.JsonSerDe |
+| hive.kafka.poll.timeout.ms          | Kafka consumer poll timeout period in milliseconds. Note that this is independent of the internal Kafka consumer timeouts.          | No        | 5000 (5 Seconds)                         |
+| hive.kafka.max.retries | Number of retries for Kafka metadata fetch operations. | No | 6 |
+| hive.kafka.metadata.poll.timeout.ms | Number of milliseconds before consumer timeout on fetching Kafka metadata. | No | 30000 (30 Seconds) |
+| kafka.write.semantic                | Writer semantic; allowed values: BEST_EFFORT, AT_LEAST_ONCE, EXACTLY_ONCE.                                                           | No        | AT_LEAST_ONCE                            |
+| hive.kafka.optimistic.commit        | Boolean value indicating whether the producer should commit during the task or delegate the commit to HiveServer2.                  | No        | true                                     |
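+
+For example, the delivery semantic and commit behavior can be tuned per table; a minimal sketch using the property names listed above:
+```sql
+ALTER TABLE kafka_table SET TBLPROPERTIES
+("kafka.write.semantic" = "EXACTLY_ONCE", "hive.kafka.optimistic.commit" = "false");
+```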
+
+### Setting Extra Consumer/Producer properties.
+The user can inject custom Kafka consumer/producer properties via the table properties.
+To do so, add any key/value pair of Kafka config to the Hive table properties,
+prefixing the key with `kafka.consumer.` for consumer configs and `kafka.producer.` for producer configs.
+For instance, the following ALTER TABLE query adds the table property `"kafka.consumer.max.poll.records" = "5000"`
+and will inject `max.poll.records=5000` into the Kafka consumer.
+```sql
+ALTER TABLE kafka_table SET TBLPROPERTIES ("kafka.consumer.max.poll.records"="5000");
+```
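+
+Producer configs can be injected the same way; for instance, the following sketch (using the standard Kafka producer setting `acks`) injects `acks=all` into the Kafka producer:
+```sql
+ALTER TABLE kafka_table SET TBLPROPERTIES ("kafka.producer.acks"="all");
+```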
+
+# Kafka to Hive ETL Pipeline
+
+This pipeline loads every record from Kafka exactly once.
+The goal is to read the data and commit both the data and its offsets in a single transaction.
+
+First create the offset table.
+```sql
+Drop table kafka_table_offsets;
+create table kafka_table_offsets(partition_id int, max_offset bigint, insert_time timestamp);
+```
+
+Initialize the table
+```sql
+insert overwrite table kafka_table_offsets select `__partition`, min(`__offset`) - 1, CURRENT_TIMESTAMP
+from wiki_kafka_hive group by `__partition`, CURRENT_TIMESTAMP ;
+```
+Create the final target table in the Hive warehouse.
+```sql
+Drop table orc_kafka_table;
+Create table orc_kafka_table (partition_id int, koffset bigint, ktimestamp bigint,
+ `timestamp` timestamp , `page` string, `user` string, `diffurl` string,
+ `isrobot` boolean, added int, deleted int, delta bigint
+) stored as ORC;
+```
+This is an example that inserts records up to offset = 2 only.
+
+```sql
+From wiki_kafka_hive ktable JOIN kafka_table_offsets offset_table
+on (ktable.`__partition` = offset_table.partition_id
+and ktable.`__offset` > offset_table.max_offset and ktable.`__offset` < 3 )
+insert into table orc_kafka_table select `__partition`, `__offset`, `__timestamp`,
+`timestamp`, `page`, `user`, `diffurl`, `isrobot`, added , deleted , delta
+Insert overwrite table kafka_table_offsets select
+`__partition`, max(`__offset`), CURRENT_TIMESTAMP group by `__partition`, CURRENT_TIMESTAMP;
+```
+
+Double-check the insert:
+```sql
+select max(`koffset`) from orc_kafka_table limit 10;
+select count(*) as c from orc_kafka_table group by partition_id, koffset having c > 1;
+```
+
+Repeat this periodically to insert all data.
+
+```sql
+From wiki_kafka_hive ktable JOIN kafka_table_offsets offset_table
+on (ktable.`__partition` = offset_table.partition_id
+and ktable.`__offset` > offset_table.max_offset )
+insert into table orc_kafka_table select `__partition`, `__offset`, `__timestamp`,
+`timestamp`, `page`, `user`, `diffurl`, `isrobot`, added , deleted , delta
+Insert overwrite table kafka_table_offsets select
+`__partition`, max(`__offset`), CURRENT_TIMESTAMP group by `__partition`, CURRENT_TIMESTAMP;
+```
+
+# ETL from Hive to Kafka
+
+## INSERT INTO
+First create the table in Hive that will be the target table. All inserts into this table will go to the Kafka topic mapped by it.
+
+```sql
+CREATE EXTERNAL TABLE moving_avg_wiki_kafka_hive
+(`channel` string, `namespace` string,`page` string, `timestamp` timestamp , avg_delta double )
+STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
+TBLPROPERTIES
+("kafka.topic" = "moving_avg_wiki_kafka_hive_2",
+"kafka.bootstrap.servers"="cn105-10.l42scl.hortonworks.com:9092",
+-- STORE AS AVRO IN KAFKA
+"kafka.serde.class"="org.apache.hadoop.hive.serde2.avro.AvroSerDe");
+```
+
+Then insert data into the table. Keep in mind that Kafka is append-only, thus you cannot use INSERT OVERWRITE.
+```sql
+insert into table moving_avg_wiki_kafka_hive select `channel`, `namespace`, `page`, `timestamp`,
+avg(delta) over (order by `timestamp` asc rows between 60 preceding and current row) as avg_delta,
+null as `__key`, null as `__partition`, -1, -1,-1, -1 from l15min_wiki;
+```
diff --git kafka-handler/pom.xml kafka-handler/pom.xml
index 6c58bf1df1..f907e9ddf0 100644
--- kafka-handler/pom.xml
+++ kafka-handler/pom.xml
@@ -30,7 +30,7 @@
..
- 1.0.1
+ 2.0.0kafka-handler
@@ -38,12 +38,18 @@
Hive Kafka Storage Handler
-
+
org.apache.hivehive-execprovided${project.version}
+
+
+ org.slf4j
+ slf4j-api
+
+ com.google.guava
@@ -52,10 +58,22 @@
org.apache.hadoophadoop-common
+
+
+ org.slf4j
+ slf4j-api
+
+ org.apache.hadoophadoop-client
+
+
+ org.slf4j
+ slf4j-api
+
+ org.apache.kafka
@@ -90,6 +108,12 @@
${kafka.version}test
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>1.7.25</version>
+      <scope>test</scope>
+    </dependency>
diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/GenericKafkaSerDe.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/GenericKafkaSerDe.java
deleted file mode 100644
index a0c79b3a9f..0000000000
--- kafka-handler/src/java/org/apache/hadoop/hive/kafka/GenericKafkaSerDe.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.kafka;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-import com.google.common.collect.Lists;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.serde2.AbstractSerDe;
-import org.apache.hadoop.hive.serde2.JsonSerDe;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.SerDeStats;
-import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
-import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
-import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.rmi.server.UID;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-/**
- * Generic Kafka Serde that allow user to delegate Serde to other class like Avro,
- * Json or any class that supports {@link BytesWritable}.
- */
-public class GenericKafkaSerDe extends AbstractSerDe {
- private static final Logger LOG = LoggerFactory.getLogger(GenericKafkaSerDe.class);
-
- private AbstractSerDe delegateSerDe;
- private ObjectInspector objectInspector;
- private final List columnNames = Lists.newArrayList();
- private StructObjectInspector delegateObjectInspector;
- private final UID uid = new UID();
- @SuppressWarnings("Guava") private Supplier> gdrSupplier;
-
- @Override public void initialize(@Nullable Configuration conf, Properties tbl) throws SerDeException {
- final String className = tbl.getProperty(KafkaStreamingUtils.SERDE_CLASS_NAME, KafkaJsonSerDe.class.getName());
- delegateSerDe = KafkaStreamingUtils.createDelegate(className);
- //noinspection deprecation
- delegateSerDe.initialize(conf, tbl);
- LOG.debug("Using SerDe instance {}", delegateSerDe.getClass().getCanonicalName());
-
- if (!(delegateSerDe.getObjectInspector() instanceof StructObjectInspector)) {
- throw new SerDeException("Was expecting StructObject Inspector but have " + delegateSerDe.getObjectInspector()
- .getClass()
- .getName());
- }
- delegateObjectInspector = (StructObjectInspector) delegateSerDe.getObjectInspector();
-
- // Build column names Order matters here
- columnNames.addAll(delegateObjectInspector.getAllStructFieldRefs()
- .stream()
- .map(StructField::getFieldName)
- .collect(Collectors.toList()));
- columnNames.addAll(KafkaStreamingUtils.KAFKA_METADATA_COLUMN_NAMES);
-
- final List inspectors = new ArrayList<>(columnNames.size());
- inspectors.addAll(delegateObjectInspector.getAllStructFieldRefs()
- .stream()
- .map(StructField::getFieldObjectInspector)
- .collect(Collectors.toList()));
- inspectors.addAll(KafkaStreamingUtils.KAFKA_METADATA_INSPECTORS);
- objectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);
-
- // lazy supplier to read Avro Records if needed
- gdrSupplier = getReaderSupplier(tbl);
- }
-
- @Override public Class extends Writable> getSerializedClass() {
- return delegateSerDe.getSerializedClass();
- }
-
- @Override public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
- return delegateSerDe.serialize(obj, objInspector);
- }
-
- @Override public SerDeStats getSerDeStats() {
- return delegateSerDe.getSerDeStats();
- }
-
- @Override public Object deserialize(Writable blob) throws SerDeException {
- KafkaRecordWritable record = (KafkaRecordWritable) blob;
- // switch case the serde nature
- final Object row;
- if (delegateSerDe instanceof JsonSerDe) {
- //@TODO Text constructor copies the data, this op is not needed
- row = delegateSerDe.deserialize(new Text(record.getValue()));
- } else if (delegateSerDe instanceof AvroSerDe) {
- AvroGenericRecordWritable avroGenericRecordWritable = new AvroGenericRecordWritable();
- GenericRecord avroRecord;
- try {
- avroRecord = gdrSupplier.get().read(null, DecoderFactory.get().binaryDecoder(record.getValue(), null));
- avroGenericRecordWritable.setRecord(avroRecord);
- avroGenericRecordWritable.setRecordReaderID(uid);
- avroGenericRecordWritable.setFileSchema(avroRecord.getSchema());
- } catch (IOException e) {
- throw new SerDeException(e);
- }
- row = delegateSerDe.deserialize(avroGenericRecordWritable);
- } else {
- // default assuming delegate Serde know how to deal with
- row = delegateSerDe.deserialize(new BytesWritable(record.getValue()));
- }
-
- return columnNames.stream().map(name -> {
- Function metaColumnMapper = KafkaStreamingUtils.recordWritableFnMap.get(name);
- if (metaColumnMapper != null) {
- return metaColumnMapper.apply(record);
- }
- return delegateObjectInspector.getStructFieldData(row, delegateObjectInspector.getStructFieldRef(name));
- }).collect(Collectors.toList());
- }
-
- @Override public ObjectInspector getObjectInspector() {
- return objectInspector;
- }
-
- @SuppressWarnings("Guava") private Supplier> getReaderSupplier(Properties tbl) {
- return Suppliers.memoize(() -> {
- String schemaFromProperty = tbl.getProperty(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), "");
- Preconditions.checkArgument(!schemaFromProperty.isEmpty(), "Avro Schema is empty Can not go further");
- Schema schema = AvroSerdeUtils.getSchemaFor(schemaFromProperty);
- LOG.debug("Building Avro Reader with schema {}", schemaFromProperty);
- return new SpecificDatumReader<>(schema);
- });
- }
-}
diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
new file mode 100644
index 0000000000..2270e08e2c
--- /dev/null
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.base.Preconditions;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Kafka Producer with public methods to extract the producer state and then resume the transaction in another process.
+ * This Producer is to be used only if you need to extract the transaction state and resume it from a different process.
+ * Class is mostly taken from Apache Flink Project:
+ * org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer
+ *
+ * @param <K> key serializer class.
+ * @param <V> value serializer class.
+ */
+class HiveKafkaProducer<K, V> implements Producer<K, V> {
+ private static final Logger LOG = LoggerFactory.getLogger(HiveKafkaProducer.class);
+
+ private final KafkaProducer<K, V> kafkaProducer;
+
+ @Nullable private final String transactionalId;
+
+ HiveKafkaProducer(Properties properties) {
+ transactionalId = properties.getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
+ kafkaProducer = new KafkaProducer<>(properties);
+ }
+
+ @Override public void initTransactions() {
+ kafkaProducer.initTransactions();
+ }
+
+ @Override public void beginTransaction() throws ProducerFencedException {
+ kafkaProducer.beginTransaction();
+ }
+
+ @Override public void commitTransaction() throws ProducerFencedException {
+ kafkaProducer.commitTransaction();
+ }
+
+ @Override public void abortTransaction() throws ProducerFencedException {
+ kafkaProducer.abortTransaction();
+ }
+
+ @Override public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId)
+ throws ProducerFencedException {
+ kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
+ }
+
+ @Override public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
+ return kafkaProducer.send(record);
+ }
+
+ @Override public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
+ return kafkaProducer.send(record, callback);
+ }
+
+ @Override public List<PartitionInfo> partitionsFor(String topic) {
+ return kafkaProducer.partitionsFor(topic);
+ }
+
+ @Override public Map<MetricName, ? extends Metric> metrics() {
+ return kafkaProducer.metrics();
+ }
+
+ @Override public void close() {
+ kafkaProducer.close();
+ }
+
+ @Override public void close(long timeout, TimeUnit unit) {
+ kafkaProducer.close(timeout, unit);
+ }
+
+ @Override public void flush() {
+ kafkaProducer.flush();
+ if (transactionalId != null) {
+ flushNewPartitions();
+ }
+ }
+
+ /**
+ * Instead of obtaining producerId and epoch from the transaction coordinator, re-use previously obtained ones,
+ * so that we can resume transaction after a restart. Implementation of this method is based on
+ * {@link org.apache.kafka.clients.producer.KafkaProducer#initTransactions}.
+ */
+ synchronized void resumeTransaction(long producerId, short epoch) {
+ Preconditions.checkState(producerId >= 0 && epoch >= 0,
+ "Incorrect values for producerId {} and epoch {}",
+ producerId,
+ epoch);
+ LOG.info("Attempting to resume transaction {} with producerId {} and epoch {}", transactionalId, producerId, epoch);
+
+ Object transactionManager = getValue(kafkaProducer, "transactionManager");
+
+ Object nextSequence = getValue(transactionManager, "nextSequence");
+ Object lastAckedSequence = getValue(transactionManager, "lastAckedSequence");
+
+ invoke(transactionManager,
+ "transitionTo",
+ getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
+ invoke(nextSequence, "clear");
+ invoke(lastAckedSequence, "clear");
+
+ Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
+ setValue(producerIdAndEpoch, "producerId", producerId);
+ setValue(producerIdAndEpoch, "epoch", epoch);
+
+ invoke(transactionManager,
+ "transitionTo",
+ getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
+
+ invoke(transactionManager,
+ "transitionTo",
+ getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
+ setValue(transactionManager, "transactionStarted", true);
+ }
+
+ @Nullable String getTransactionalId() {
+ return transactionalId;
+ }
+
+ long getProducerId() {
+ Object transactionManager = getValue(kafkaProducer, "transactionManager");
+ Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
+ return (long) getValue(producerIdAndEpoch, "producerId");
+ }
+
+ short getEpoch() {
+ Object transactionManager = getValue(kafkaProducer, "transactionManager");
+ Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
+ return (short) getValue(producerIdAndEpoch, "epoch");
+ }
+
+ /**
+ * Besides committing {@link org.apache.kafka.clients.producer.KafkaProducer#commitTransaction} is also adding new
+ * partitions to the transaction. flushNewPartitions method is moving this logic to pre-commit/flush, to make
+ * resumeTransaction simpler.
+ * Otherwise resumeTransaction would require to restore state of the not yet added/"in-flight" partitions.
+ */
+ private void flushNewPartitions() {
+ LOG.info("Flushing new partitions");
+ TransactionalRequestResult result = enqueueNewPartitions();
+ Object sender = getValue(kafkaProducer, "sender");
+ invoke(sender, "wakeup");
+ result.await();
+ }
+
+ private synchronized TransactionalRequestResult enqueueNewPartitions() {
+ Object transactionManager = getValue(kafkaProducer, "transactionManager");
+ Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
+ invoke(transactionManager,
+ "enqueueRequest",
+ new Class[] {txnRequestHandler.getClass().getSuperclass()},
+ new Object[] {txnRequestHandler});
+ return (TransactionalRequestResult) getValue(txnRequestHandler,
+ txnRequestHandler.getClass().getSuperclass(),
+ "result");
+ }
+
+ @SuppressWarnings("unchecked") private static Enum> getEnum(String enumFullName) {
+ @SuppressWarnings("RegExpRedundantEscape") String[] x = enumFullName.split("\\.(?=[^\\.]+$)");
+ if (x.length == 2) {
+ String enumClassName = x[0];
+ String enumName = x[1];
+ try {
+ Class<Enum> cl = (Class<Enum>) Class.forName(enumClassName);
+ return Enum.valueOf(cl, enumName);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("Incompatible KafkaProducer version", e);
+ }
+ }
+ return null;
+ }
+
+ private static Object invoke(Object object, String methodName, Object... args) {
+ Class<?>[] argTypes = new Class<?>[args.length];
+ for (int i = 0; i < args.length; i++) {
+ argTypes[i] = args[i].getClass();
+ }
+ return invoke(object, methodName, argTypes, args);
+ }
+
+ private static Object invoke(Object object, String methodName, Class<?>[] argTypes, Object[] args) {
+ try {
+ Method method = object.getClass().getDeclaredMethod(methodName, argTypes);
+ method.setAccessible(true);
+ return method.invoke(object, args);
+ } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
+ throw new RuntimeException("Incompatible KafkaProducer version", e);
+ }
+ }
+
+ private static Object getValue(Object object, String fieldName) {
+ return getValue(object, object.getClass(), fieldName);
+ }
+
+ private static Object getValue(Object object, Class<?> clazz, String fieldName) {
+ try {
+ Field field = clazz.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ return field.get(object);
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ throw new RuntimeException("Incompatible KafkaProducer version", e);
+ }
+ }
+
+ private static void setValue(Object object, String fieldName, Object value) {
+ try {
+ Field field = object.getClass().getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(object, value);
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ throw new RuntimeException("Incompatible KafkaProducer version", e);
+ }
+ }
+}
diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerInputFormat.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaInputFormat.java
similarity index 80%
rename from kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerInputFormat.java
rename to kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaInputFormat.java
index 2d5637d430..c401df9850 100644
--- kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerInputFormat.java
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaInputFormat.java
@@ -55,13 +55,13 @@
* The input split will contain the set of topic partition and start/end offsets.
* Records will be returned as bytes array.
*/
-public class KafkaPullerInputFormat extends InputFormat
- implements org.apache.hadoop.mapred.InputFormat {
+@SuppressWarnings("WeakerAccess") public class KafkaInputFormat extends InputFormat
+ implements org.apache.hadoop.mapred.InputFormat {
- private static final Logger LOG = LoggerFactory.getLogger(KafkaPullerInputFormat.class);
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaInputFormat.class);
@Override public InputSplit[] getSplits(JobConf jobConf, int i) throws IOException {
- List inputSplits;
+ List inputSplits;
try {
inputSplits = computeSplits(jobConf);
} catch (InterruptedException e) {
@@ -83,14 +83,19 @@
*
* @return full scan input split collection based on Kafka metadata APIs
*/
- private static List buildFullScanFromKafka(String topic,
+ private static List buildFullScanFromKafka(String topic,
KafkaConsumer consumer,
- Path[] tablePaths) {
+ Path[] tablePaths, int maxTries) {
final Map starOffsetsMap;
final Map endOffsetsMap;
final List topicPartitions;
- topicPartitions = fetchTopicPartitions(topic, consumer);
+ RetryUtils.Task> fetchTPTask = () -> fetchTopicPartitions(topic, consumer);
+ try {
+ topicPartitions = RetryUtils.retry(fetchTPTask, (error) -> !KafkaUtils.exceptionIsFatal(error), maxTries);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
starOffsetsMap = consumer.beginningOffsets(topicPartitions);
endOffsetsMap = consumer.endOffsets(topicPartitions);
@@ -101,7 +106,7 @@
endOffsetsMap.forEach((tp, end) -> LOG.info("TPartition [{}],End offsets [{}]", tp, end));
}
return topicPartitions.stream()
- .map(topicPartition -> new KafkaPullerInputSplit(topicPartition.topic(),
+ .map(topicPartition -> new KafkaInputSplit(topicPartition.topic(),
topicPartition.partition(),
starOffsetsMap.get(topicPartition),
endOffsetsMap.get(topicPartition),
@@ -109,25 +114,23 @@
.collect(Collectors.toList());
}
- private List computeSplits(Configuration configuration)
+ private List computeSplits(Configuration configuration)
throws IOException, InterruptedException {
- // this will be used to harness some KAFKA blocking calls
+ // ExecutorService is used to harness some KAFKA blocking calls and interrupt after some duration
final ExecutorService execService = Executors.newSingleThreadExecutor();
- try (KafkaConsumer consumer = new KafkaConsumer(KafkaStreamingUtils.consumerProperties(configuration))) {
- final String topic = configuration.get(KafkaStreamingUtils.HIVE_KAFKA_TOPIC);
- final long
- timeoutMs =
- configuration.getLong(KafkaStreamingUtils.HIVE_KAFKA_POLL_TIMEOUT,
- KafkaStreamingUtils.DEFAULT_CONSUMER_POLL_TIMEOUT_MS);
+
+ try (KafkaConsumer consumer = new KafkaConsumer(KafkaUtils.consumerProperties(configuration))) {
+ final String topic = configuration.get(KafkaTableProperties.HIVE_KAFKA_TOPIC.getName());
+ final long timeoutMs = configuration.getLong(KafkaTableProperties.KAFKA_FETCH_METADATA_TIMEOUT.getName(), -1);
+ final int maxTries = configuration.getInt(KafkaTableProperties.MAX_RETRIES.getName(), -1);
// hive depends on FileSplits
JobConf jobConf = new JobConf(configuration);
Path[] tablePaths = org.apache.hadoop.mapred.FileInputFormat.getInputPaths(jobConf);
+ final Future> futureFullHouse;
//noinspection unchecked
- Future>
- futureFullHouse =
- execService.submit(() -> buildFullScanFromKafka(topic, consumer, tablePaths));
- List fullHouse;
+ futureFullHouse = execService.submit(() -> buildFullScanFromKafka(topic, consumer, tablePaths, maxTries));
+ final List fullHouse;
try {
fullHouse = futureFullHouse.get(timeoutMs, TimeUnit.MILLISECONDS);
} catch (TimeoutException | ExecutionException e) {
@@ -137,7 +140,7 @@
throw new IOException(e);
}
- @SuppressWarnings("unchecked") final ImmutableMap.Builder
+ @SuppressWarnings("unchecked") final ImmutableMap.Builder
fullHouseMapBuilder =
new ImmutableMap.Builder();
fullHouse.forEach(input -> fullHouseMapBuilder.put(new TopicPartition(input.getTopic(), input.getPartition()),
@@ -149,14 +152,14 @@
if (filterExprSerialized != null && !filterExprSerialized.isEmpty()) {
ExprNodeGenericFuncDesc filterExpr = SerializationUtilities.deserializeExpression(filterExprSerialized);
LOG.info("Kafka trimmer working on Filter tree {}", filterExpr.getExprString());
- Callable>
+ Callable>
trimmerWorker = () -> kafkaScanTrimmer.computeOptimizedScan(filterExpr)
.entrySet()
.stream()
.map(Map.Entry::getValue)
.collect(Collectors.toList());
- Future> futureTinyHouse = execService.submit(trimmerWorker);
+ Future> futureTinyHouse = execService.submit(trimmerWorker);
try {
return futureTinyHouse.get(timeoutMs, TimeUnit.MILLISECONDS)
.stream()
@@ -184,10 +187,10 @@
return partitions.stream().map(p -> new TopicPartition(topic, p.partition())).collect(Collectors.toList());
}
- @Override public RecordReader getRecordReader(InputSplit inputSplit,
+ @Override public RecordReader getRecordReader(InputSplit inputSplit,
JobConf jobConf,
Reporter reporter) {
- return new KafkaPullerRecordReader((KafkaPullerInputSplit) inputSplit, jobConf);
+ return new KafkaRecordReader((KafkaInputSplit) inputSplit, jobConf);
}
@Override public List getSplits(JobContext jobContext)
@@ -197,9 +200,9 @@
.collect(Collectors.toList());
}
- @Override public org.apache.hadoop.mapreduce.RecordReader createRecordReader(
+ @Override public org.apache.hadoop.mapreduce.RecordReader createRecordReader(
org.apache.hadoop.mapreduce.InputSplit inputSplit,
TaskAttemptContext taskAttemptContext) {
- return new KafkaPullerRecordReader();
+ return new KafkaRecordReader();
}
}
diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerInputSplit.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaInputSplit.java
similarity index 79%
rename from kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerInputSplit.java
rename to kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaInputSplit.java
index 697469c9e0..cb1f4df1f5 100644
--- kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerInputSplit.java
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaInputSplit.java
@@ -34,18 +34,18 @@
/**
* Kafka Hadoop Input Split Class.
*/
-@SuppressWarnings("WeakerAccess") public class KafkaPullerInputSplit extends FileSplit
+@SuppressWarnings("WeakerAccess") public class KafkaInputSplit extends FileSplit
implements org.apache.hadoop.mapred.InputSplit {
private String topic;
private long startOffset;
private int partition;
private long endOffset;
- public KafkaPullerInputSplit() {
+ public KafkaInputSplit() {
super(null, 0, 0, (String[]) null);
}
- public KafkaPullerInputSplit(String topic, int partition, long startOffset, long endOffset, Path dummyPath) {
+ public KafkaInputSplit(String topic, int partition, long startOffset, long endOffset, Path dummyPath) {
super(dummyPath, 0, 0, (String[]) null);
this.topic = topic;
this.startOffset = startOffset;
@@ -109,8 +109,8 @@ public long getEndOffset() {
*
* @return new split that represents range intersection or null if it is not overlapping
*/
- @Nullable public static KafkaPullerInputSplit intersectRange(KafkaPullerInputSplit split1,
- KafkaPullerInputSplit split2) {
+ @Nullable public static KafkaInputSplit intersectRange(KafkaInputSplit split1,
+ KafkaInputSplit split2) {
assert (split1.topic.equals(split2.topic));
assert (split1.partition == split2.partition);
final long startOffset = Math.max(split1.getStartOffset(), split2.getStartOffset());
@@ -119,7 +119,7 @@ public long getEndOffset() {
// there is no overlapping
return null;
}
- return new KafkaPullerInputSplit(split1.topic, split1.partition, startOffset, endOffset, split1.getPath());
+ return new KafkaInputSplit(split1.topic, split1.partition, startOffset, endOffset, split1.getPath());
}
/**
@@ -130,22 +130,22 @@ public long getEndOffset() {
*
* @return new split with a range including both splits.
*/
- public static KafkaPullerInputSplit unionRange(KafkaPullerInputSplit split1, KafkaPullerInputSplit split2) {
+ public static KafkaInputSplit unionRange(KafkaInputSplit split1, KafkaInputSplit split2) {
assert (split1.topic.equals(split2.topic));
assert (split1.partition == split2.partition);
final long startOffset = Math.min(split1.getStartOffset(), split2.getStartOffset());
final long endOffset = Math.max(split1.getEndOffset(), split2.getEndOffset());
- return new KafkaPullerInputSplit(split1.topic, split1.partition, startOffset, endOffset, split1.getPath());
+ return new KafkaInputSplit(split1.topic, split1.partition, startOffset, endOffset, split1.getPath());
}
@Override public boolean equals(Object o) {
if (this == o) {
return true;
}
- if (!(o instanceof KafkaPullerInputSplit)) {
+ if (!(o instanceof KafkaInputSplit)) {
return false;
}
- KafkaPullerInputSplit that = (KafkaPullerInputSplit) o;
+ KafkaInputSplit that = (KafkaInputSplit) o;
return Objects.equal(getTopic(), that.getTopic())
&& Objects.equal(getStartOffset(), that.getStartOffset())
&& Objects.equal(getPartition(), that.getPartition())
@@ -157,7 +157,7 @@ public static KafkaPullerInputSplit unionRange(KafkaPullerInputSplit split1, Kaf
}
@Override public String toString() {
- return "KafkaPullerInputSplit{"
+ return "KafkaInputSplit{"
+ "topic='"
+ topic
+ '\''
@@ -172,24 +172,20 @@ public static KafkaPullerInputSplit unionRange(KafkaPullerInputSplit split1, Kaf
+ '}';
}
- public static KafkaPullerInputSplit copyOf(KafkaPullerInputSplit other) {
- return new KafkaPullerInputSplit(other.getTopic(),
+ public static KafkaInputSplit copyOf(KafkaInputSplit other) {
+ return new KafkaInputSplit(other.getTopic(),
other.getPartition(),
other.getStartOffset(),
other.getEndOffset(),
other.getPath());
}
- @SuppressWarnings("MethodDoesntCallSuperMethod") public KafkaPullerInputSplit clone() {
- return copyOf(this);
- }
-
- public static List slice(long sliceSize, final KafkaPullerInputSplit split) {
+ public static List slice(long sliceSize, final KafkaInputSplit split) {
if (split.getEndOffset() - split.getStartOffset() > sliceSize) {
- ImmutableList.Builder builder = ImmutableList.builder();
+ ImmutableList.Builder builder = ImmutableList.builder();
long start = split.getStartOffset();
while (start < split.getEndOffset() - sliceSize) {
- builder.add(new KafkaPullerInputSplit(split.topic,
+ builder.add(new KafkaInputSplit(split.topic,
split.partition,
start,
start + sliceSize + 1,
@@ -198,7 +194,7 @@ public static KafkaPullerInputSplit copyOf(KafkaPullerInputSplit other) {
}
// last split
if (start < split.getEndOffset()) {
- builder.add(new KafkaPullerInputSplit(split.topic,
+ builder.add(new KafkaInputSplit(split.topic,
split.partition,
start,
split.getEndOffset(),
diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaJsonSerDe.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaJsonSerDe.java
index 5f0143de09..228225cc2b 100644
--- kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaJsonSerDe.java
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaJsonSerDe.java
@@ -78,15 +78,14 @@
* Basic JsonSerDe to make use of such storage handler smooth and easy and testing basic primitive Json.
* For production please use Hive native JsonSerde.
*/
-public class KafkaJsonSerDe extends AbstractSerDe {
+@SuppressWarnings("unused") class KafkaJsonSerDe extends AbstractSerDe {
private static final Logger LOG = LoggerFactory.getLogger(KafkaJsonSerDe.class);
private static final ThreadLocal
TS_PARSER =
ThreadLocal.withInitial(KafkaJsonSerDe::createAutoParser);
- private static final Function
- typeInfoToObjectInspector =
- typeInfo -> PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(TypeInfoFactory.getPrimitiveTypeInfo(
- typeInfo.getTypeName()));
+ private static final Function TYPEINFO_TO_OI =
+ typeInfo -> PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(
+ TypeInfoFactory.getPrimitiveTypeInfo(typeInfo.getTypeName()));
private List columnNames;
private List columnTypes;
private ObjectInspector inspector;
@@ -118,7 +117,7 @@
LOG.debug("types: {}, {} ", columnTypeProperty, columnTypes);
}
- inspectors = columnTypes.stream().map(typeInfoToObjectInspector).collect(Collectors.toList());
+ inspectors = columnTypes.stream().map(TYPEINFO_TO_OI).collect(Collectors.toList());
inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);
}
@@ -236,8 +235,8 @@ private static DateTimeFormatter createAutoParser() {
DateTimeParser
timeOrOffset =
new DateTimeFormatterBuilder().append(null,
- new DateTimeParser[] { new DateTimeFormatterBuilder().appendLiteral('T').toParser(),
- new DateTimeFormatterBuilder().appendLiteral(' ').toParser() })
+ new DateTimeParser[] {new DateTimeFormatterBuilder().appendLiteral('T').toParser(),
+ new DateTimeFormatterBuilder().appendLiteral(' ').toParser()})
.appendOptional(ISODateTimeFormat.timeElementParser().getParser())
.appendOptional(offsetElement.getParser())
.toParser();
diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaOutputFormat.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaOutputFormat.java
new file mode 100644
index 0000000000..950f7315c2
--- /dev/null
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaOutputFormat.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Kafka Hive Output Format class used to write Hive Rows to a Kafka Queue.
+ */
+public class KafkaOutputFormat implements HiveOutputFormat {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaOutputFormat.class);
+
+ @Override public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc,
+ Path finalOutPath,
+ Class extends Writable> valueClass,
+ boolean isCompressed,
+ Properties tableProperties,
+ Progressable progress) {
+ final String topic = jc.get(KafkaTableProperties.HIVE_KAFKA_TOPIC.getName());
+ final Boolean optimisticCommit = jc.getBoolean(KafkaTableProperties.HIVE_KAFKA_OPTIMISTIC_COMMIT.getName(), true);
+ final WriteSemantic
+ writeSemantic =
+ WriteSemantic.valueOf(jc.get(KafkaTableProperties.WRITE_SEMANTIC_PROPERTY.getName()));
+ final Properties producerProperties = KafkaUtils.producerProperties(jc);
+ final FileSinkOperator.RecordWriter recordWriter;
+ switch (writeSemantic) {
+ case BEST_EFFORT:
+ recordWriter = new SimpleKafkaWriter(topic, Utilities.getTaskId(jc), false, producerProperties);
+ break;
+ case AT_LEAST_ONCE:
+ recordWriter = new SimpleKafkaWriter(topic, Utilities.getTaskId(jc), true, producerProperties);
+ break;
+ case EXACTLY_ONCE:
+ FileSystem fs;
+ try {
+ fs = finalOutPath.getFileSystem(jc);
+ } catch (IOException e) {
+ LOG.error("Can not construct file system instance", e);
+ throw new RuntimeException(e);
+ }
+ final String queryId = Preconditions.checkNotNull(jc.get(HiveConf.ConfVars.HIVEQUERYID.varname, null));
+ recordWriter =
+ new TransactionalKafkaWriter(topic, producerProperties,
+ new Path(Preconditions.checkNotNull(finalOutPath), queryId),
+ fs,
+ optimisticCommit);
+ break;
+ default:
+ throw new IllegalArgumentException(String.format("Unknown delivery semantic [%s]", writeSemantic.toString()));
+ }
+ return recordWriter;
+ }
+
+ @Override public RecordWriter getRecordWriter(FileSystem fileSystem,
+ JobConf jobConf,
+ String s,
+ Progressable progressable) {
+ throw new RuntimeException("this is not suppose to be here");
+ }
+
+ @Override public void checkOutputSpecs(FileSystem fileSystem, JobConf jobConf) {
+
+ }
+
+ /**
+ * Possible write semantic supported by the Record Writer.
+ */
+ enum WriteSemantic {
+ /**
+ * Best effort delivery with no guarantees at all; the user can set producer properties as they wish,
+ * and the writer will carry on when possible unless a fatal exception occurs.
+ */
+ BEST_EFFORT,
+ /**
+ * Deliver every record at least once, unless the job fails.
+ * Therefore duplicates can be introduced due to lost ACKs or task retries.
+ * Currently this is the default.
+ */
+ AT_LEAST_ONCE,
+ /**
+ * Deliver every record exactly once.
+ */
+ EXACTLY_ONCE,
+ }
+}
diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java
index c252455348..2225f19a4d 100644
--- kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java
@@ -25,24 +25,31 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
- * Iterator over Kafka Records to read records from a single topic partition inclusive start, exclusive end.
+ * This class implements an Iterator over a single Kafka topic partition.
+ *
+ * Notes:
+ * The user of this class has to provide a functional Kafka consumer and is responsible for closing it afterwards.
+ * The user of this class is responsible for thread safety if the provided consumer is shared across threads.
+ *
*/
-public class KafkaRecordIterator implements Iterator> {
+class KafkaRecordIterator implements Iterator> {
private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordIterator.class);
private static final String
POLL_TIMEOUT_HINT =
String.format("Try increasing poll timeout using Hive Table property [%s]",
- KafkaStreamingUtils.HIVE_KAFKA_POLL_TIMEOUT);
+ KafkaTableProperties.KAFKA_POLL_TIMEOUT.getName());
private static final String
ERROR_POLL_TIMEOUT_FORMAT =
"Consumer returned [0] record due to exhausted poll timeout [%s]ms from TopicPartition:[%s] "
@@ -54,30 +61,38 @@
private final long endOffset;
private final long startOffset;
private final long pollTimeoutMs;
+ private final Duration pollTimeoutDurationMs;
private final Stopwatch stopwatch = Stopwatch.createUnstarted();
private ConsumerRecords records;
+ /**
+ * Holds the kafka consumer position after the last poll() call.
+ */
private long consumerPosition;
private ConsumerRecord nextRecord;
private boolean hasMore = true;
+ /**
+ * On each Kafka Consumer poll() call we get a batch of records, this Iterator will be used to loop over it.
+ */
private Iterator> consumerRecordIterator = null;
/**
- * Iterator over Kafka Records over a single {@code topicPartition} inclusive {@code startOffset},
- * up to exclusive {@code endOffset}.
- *
- * If {@code requestedStartOffset} is not null will seek up to that offset
- * Else If {@code requestedStartOffset} is null will seek to beginning see
- * {@link org.apache.kafka.clients.consumer.Consumer#seekToBeginning(java.util.Collection)}
- *
- * When provided with {@code requestedEndOffset}, will return records up to consumer position == endOffset
- * Else If {@code requestedEndOffset} is null it will read up to the current end of the stream
- * {@link org.apache.kafka.clients.consumer.Consumer#seekToEnd(java.util.Collection)}
- *
- * @param consumer functional kafka consumer.
- * @param topicPartition kafka topic partition.
- * @param requestedStartOffset requested start position.
- * @param requestedEndOffset requested end position. If null will read up to current last
- * @param pollTimeoutMs poll time out in ms.
+ * Kafka record Iterator pulling from a single {@code topicPartition} an inclusive {@code requestedStartOffset},
+ * up to exclusive {@code requestedEndOffset}.
+ *
+ * This iterator can block on polling up to a designated timeout.
+ *
+ * If no record is returned by the brokers within the poll timeout duration, this is considered an exception.
+ * The timeout exception is a retriable exception, therefore users of this class can retry if needed.
+ *
+ * @param consumer Functional kafka consumer, user must initialize this and close it.
+ * @param topicPartition Target Kafka topic partition.
+ * @param requestedStartOffset Requested start offset position, if NULL iterator will seek to beginning using:
+ * {@link Consumer#seekToBeginning(java.util.Collection)}.
+ *
+ * @param requestedEndOffset Requested end position. If null will read up to last available offset,
+ * such position is given by:
+ * {@link Consumer#seekToEnd(java.util.Collection)}.
+ * @param pollTimeoutMs positive number indicating poll time out in ms.
*/
KafkaRecordIterator(Consumer consumer,
TopicPartition topicPartition,
@@ -87,6 +102,7 @@
this.consumer = Preconditions.checkNotNull(consumer, "Consumer can not be null");
this.topicPartition = Preconditions.checkNotNull(topicPartition, "Topic partition can not be null");
this.pollTimeoutMs = pollTimeoutMs;
+ this.pollTimeoutDurationMs = Duration.ofMillis(pollTimeoutMs);
Preconditions.checkState(this.pollTimeoutMs > 0, "Poll timeout has to be positive number");
final List topicPartitionList = Collections.singletonList(topicPartition);
// assign topic partition to consumer
@@ -138,7 +154,12 @@
}
/**
- * @throws IllegalStateException if the kafka consumer poll call can not reach the target offset.
+ * Check if there are more records to be consumed, and poll more from the broker if the current batch of records is empty.
+ * This method might block up to {@link this#pollTimeoutMs} to pull records from Kafka Broker.
+ *
+ * @throws PollTimeoutException if poll returns 0 record and consumer position did not reach requested endOffset.
+ * Such an exception is a retryable exception, and it can be a transient exception that if retried may succeed.
+ *
* @return true if has more records to be consumed.
*/
@Override public boolean hasNext() {
@@ -158,20 +179,20 @@ Need to poll at least one more record (consumerPosition < endOffset) AND consume
/**
* Poll more records from the Kafka Broker.
*
- * @throws IllegalStateException if no records returned before consumer position reaches target end offset.
+ * @throws PollTimeoutException if poll returns 0 record and consumer's position < requested endOffset.
*/
private void pollRecords() {
if (LOG.isTraceEnabled()) {
stopwatch.reset().start();
}
- records = consumer.poll(pollTimeoutMs);
+ records = consumer.poll(pollTimeoutDurationMs);
if (LOG.isTraceEnabled()) {
stopwatch.stop();
LOG.trace("Pulled [{}] records in [{}] ms", records.count(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
}
// Fail if we can not poll within one lap of pollTimeoutMs.
if (records.isEmpty() && consumer.position(topicPartition) < endOffset) {
- throw new IllegalStateException(String.format(ERROR_POLL_TIMEOUT_FORMAT,
+ throw new PollTimeoutException(String.format(ERROR_POLL_TIMEOUT_FORMAT,
pollTimeoutMs,
topicPartition.toString(),
startOffset,
@@ -201,4 +222,12 @@ private void findNext() {
nextRecord = null;
}
}
+
+ static final class PollTimeoutException extends RetriableException {
+ private static final long serialVersionUID = 1L;
+
+ PollTimeoutException(String message) {
+ super(message);
+ }
+ }
}
diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordReader.java
similarity index 81%
rename from kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java
rename to kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordReader.java
index 06a10b4b8b..746de61273 100644
--- kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordReader.java
@@ -36,14 +36,14 @@
/**
* Kafka Records Reader implementation.
*/
-@SuppressWarnings("UnstableApiUsage") public class KafkaPullerRecordReader extends RecordReader
- implements org.apache.hadoop.mapred.RecordReader {
+@SuppressWarnings("WeakerAccess") public class KafkaRecordReader extends RecordReader
+ implements org.apache.hadoop.mapred.RecordReader {
- private static final Logger LOG = LoggerFactory.getLogger(KafkaPullerRecordReader.class);
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordReader.class);
private KafkaConsumer consumer = null;
private Configuration config = null;
- private KafkaRecordWritable currentWritableValue;
+ private KafkaWritable currentWritableValue;
private Iterator> recordsCursor = null;
private long totalNumberRecords = 0L;
@@ -53,13 +53,13 @@
private long startOffset = -1L;
private long endOffset = Long.MAX_VALUE;
- @SuppressWarnings("WeakerAccess") public KafkaPullerRecordReader() {
+ @SuppressWarnings("WeakerAccess") public KafkaRecordReader() {
}
private void initConsumer() {
if (consumer == null) {
LOG.info("Initializing Kafka Consumer");
- final Properties properties = KafkaStreamingUtils.consumerProperties(config);
+ final Properties properties = KafkaUtils.consumerProperties(config);
String brokerString = properties.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
Preconditions.checkNotNull(brokerString, "broker end point can not be null");
LOG.info("Starting Consumer with Kafka broker string [{}]", brokerString);
@@ -67,25 +67,26 @@ private void initConsumer() {
}
}
- @SuppressWarnings("WeakerAccess") public KafkaPullerRecordReader(KafkaPullerInputSplit inputSplit,
+ @SuppressWarnings("WeakerAccess") public KafkaRecordReader(KafkaInputSplit inputSplit,
Configuration jobConf) {
initialize(inputSplit, jobConf);
}
- private synchronized void initialize(KafkaPullerInputSplit inputSplit, Configuration jobConf) {
+ private synchronized void initialize(KafkaInputSplit inputSplit, Configuration jobConf) {
if (!started) {
this.config = jobConf;
startOffset = inputSplit.getStartOffset();
endOffset = inputSplit.getEndOffset();
TopicPartition topicPartition = new TopicPartition(inputSplit.getTopic(), inputSplit.getPartition());
Preconditions.checkState(startOffset >= 0 && startOffset <= endOffset,
- "Start [%s] has to be positive and less or equal than End [%s]", startOffset, endOffset);
+ "Start [%s] has to be positive and less or equal than End [%s]",
+ startOffset,
+ endOffset);
totalNumberRecords += endOffset - startOffset;
initConsumer();
long
pollTimeout =
- config.getLong(KafkaStreamingUtils.HIVE_KAFKA_POLL_TIMEOUT,
- KafkaStreamingUtils.DEFAULT_CONSUMER_POLL_TIMEOUT_MS);
+ config.getLong(KafkaTableProperties.KAFKA_POLL_TIMEOUT.getName(), -1);
LOG.debug("Consumer poll timeout [{}] ms", pollTimeout);
this.recordsCursor =
startOffset == endOffset ?
@@ -95,12 +96,11 @@ private synchronized void initialize(KafkaPullerInputSplit inputSplit, Configura
}
}
- @Override public void initialize(org.apache.hadoop.mapreduce.InputSplit inputSplit,
- TaskAttemptContext context) {
- initialize((KafkaPullerInputSplit) inputSplit, context.getConfiguration());
+ @Override public void initialize(org.apache.hadoop.mapreduce.InputSplit inputSplit, TaskAttemptContext context) {
+ initialize((KafkaInputSplit) inputSplit, context.getConfiguration());
}
- @Override public boolean next(NullWritable nullWritable, KafkaRecordWritable bytesWritable) {
+ @Override public boolean next(NullWritable nullWritable, KafkaWritable bytesWritable) {
if (started && recordsCursor.hasNext()) {
ConsumerRecord<byte[], byte[]> record = recordsCursor.next();
bytesWritable.set(record, startOffset, endOffset);
@@ -115,8 +115,8 @@ private synchronized void initialize(KafkaPullerInputSplit inputSplit, Configura
return NullWritable.get();
}
- @Override public KafkaRecordWritable createValue() {
- return new KafkaRecordWritable();
+ @Override public KafkaWritable createValue() {
+ return new KafkaWritable();
}
@Override public long getPos() {
@@ -124,7 +124,7 @@ private synchronized void initialize(KafkaPullerInputSplit inputSplit, Configura
}
@Override public boolean nextKeyValue() {
- currentWritableValue = new KafkaRecordWritable();
+ currentWritableValue = new KafkaWritable();
if (next(NullWritable.get(), currentWritableValue)) {
return true;
}
@@ -136,7 +136,7 @@ private synchronized void initialize(KafkaPullerInputSplit inputSplit, Configura
return NullWritable.get();
}
- @Override public KafkaRecordWritable getCurrentValue() {
+ @Override public KafkaWritable getCurrentValue() {
return Preconditions.checkNotNull(currentWritableValue);
}
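
For context on how the renamed reader is driven, here is a hypothetical, same-package usage sketch following the constructor and method signatures shown above; the topic name, offsets, the `null` path argument, and the expectation that `conf` already carries the broker and poll-timeout settings are all assumptions for illustration:

```java
// Count the records of one split through the old org.apache.hadoop.mapred API,
// roughly the way a Hive task consumes a KafkaInputSplit.
static long countRecords(org.apache.hadoop.conf.Configuration conf) throws java.io.IOException {
  KafkaInputSplit split = new KafkaInputSplit("test-topic", 0, 0L, 100L, null); // path not needed here (assumed)
  KafkaRecordReader reader = new KafkaRecordReader(split, conf);
  KafkaWritable value = reader.createValue();
  long count = 0;
  while (reader.next(org.apache.hadoop.io.NullWritable.get(), value)) {
    count++; // value carries the payload plus __partition/__offset/__timestamp metadata
  }
  reader.close();
  return count;
}
```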
diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaScanTrimmer.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaScanTrimmer.java
index 8fbdfdab11..256796a9a4 100644
--- kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaScanTrimmer.java
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaScanTrimmer.java
@@ -63,14 +63,14 @@
*/
class KafkaScanTrimmer {
private static final Logger LOG = LoggerFactory.getLogger(KafkaScanTrimmer.class);
- private final Map<TopicPartition, KafkaPullerInputSplit> fullHouse;
+ private final Map<TopicPartition, KafkaInputSplit> fullHouse;
private final KafkaConsumer kafkaConsumer;
/**
* @param fullHouse initial full scan to be pruned, this is a map of Topic partition to input split.
* @param kafkaConsumer kafka consumer used to pull offsets for time filter if needed
*/
- KafkaScanTrimmer(Map<TopicPartition, KafkaPullerInputSplit> fullHouse, KafkaConsumer kafkaConsumer) {
+ KafkaScanTrimmer(Map<TopicPartition, KafkaInputSplit> fullHouse, KafkaConsumer kafkaConsumer) {
this.fullHouse = fullHouse;
this.kafkaConsumer = kafkaConsumer;
}
@@ -83,8 +83,8 @@
*
* @return tiny house of the full house based on filter expression
*/
- Map<TopicPartition, KafkaPullerInputSplit> computeOptimizedScan(ExprNodeGenericFuncDesc filterExpression) {
- Map<TopicPartition, KafkaPullerInputSplit> optimizedScan = parseAndOptimize(filterExpression);
+ Map<TopicPartition, KafkaInputSplit> computeOptimizedScan(ExprNodeGenericFuncDesc filterExpression) {
+ Map<TopicPartition, KafkaInputSplit> optimizedScan = parseAndOptimize(filterExpression);
if (LOG.isDebugEnabled()) {
if (optimizedScan != null) {
@@ -113,7 +113,7 @@
*
* @return Map of optimized kafka range scans or null if it is impossible to optimize.
*/
- @Nullable private Map<TopicPartition, KafkaPullerInputSplit> parseAndOptimize(ExprNodeDesc expression) {
+ @Nullable private Map<TopicPartition, KafkaInputSplit> parseAndOptimize(ExprNodeDesc expression) {
if (expression.getClass() != ExprNodeGenericFuncDesc.class) {
return null;
}
@@ -154,7 +154,7 @@
*
* @return leaf scan or null if can not figure out push down
*/
- @Nullable private Map<TopicPartition, KafkaPullerInputSplit> pushLeaf(ExprNodeGenericFuncDesc expr,
+ @Nullable private Map<TopicPartition, KafkaInputSplit> pushLeaf(ExprNodeGenericFuncDesc expr,
PredicateLeaf.Operator operator,
boolean negation) {
if (expr.getChildren().size() != 2) {
@@ -192,8 +192,7 @@
constantDesc = (ExprNodeConstantDesc) extracted[0];
}
-
- if (columnDesc.getColumn().equals(KafkaStreamingUtils.MetadataColumn.PARTITION.getName())) {
+ if (columnDesc.getColumn().equals(MetadataColumn.PARTITION.getName())) {
return buildScanFromPartitionPredicate(fullHouse,
operator,
((Number) constantDesc.getValue()).intValue(),
@@ -201,7 +200,7 @@
negation);
}
- if (columnDesc.getColumn().equals(KafkaStreamingUtils.MetadataColumn.OFFSET.getName())) {
+ if (columnDesc.getColumn().equals(MetadataColumn.OFFSET.getName())) {
return buildScanFromOffsetPredicate(fullHouse,
operator,
((Number) constantDesc.getValue()).longValue(),
@@ -209,7 +208,7 @@
negation);
}
- if (columnDesc.getColumn().equals(KafkaStreamingUtils.MetadataColumn.TIMESTAMP.getName())) {
+ if (columnDesc.getColumn().equals(MetadataColumn.TIMESTAMP.getName())) {
long timestamp = ((Number) constantDesc.getValue()).longValue();
//noinspection unchecked
return buildScanForTimesPredicate(fullHouse, operator, timestamp, flip, negation, kafkaConsumer);
@@ -229,8 +228,8 @@
* @return filtered kafka scan
*/
- @VisibleForTesting static Map<TopicPartition, KafkaPullerInputSplit> buildScanFromPartitionPredicate(
- Map<TopicPartition, KafkaPullerInputSplit> fullScan,
+ @VisibleForTesting static Map<TopicPartition, KafkaInputSplit> buildScanFromPartitionPredicate(Map<TopicPartition, KafkaInputSplit> fullScan,
PredicateLeaf.Operator operator,
int partitionConst,
boolean flip,
@@ -262,12 +261,12 @@
predicate = topicPartition -> true;
}
- ImmutableMap.Builder<TopicPartition, KafkaPullerInputSplit> builder = ImmutableMap.builder();
+ ImmutableMap.Builder<TopicPartition, KafkaInputSplit> builder = ImmutableMap.builder();
// Filter full scan based on predicate
fullScan.entrySet()
.stream()
.filter(entry -> predicate.test(entry.getKey()))
- .forEach(entry -> builder.put(entry.getKey(), entry.getValue().clone()));
+ .forEach(entry -> builder.put(entry.getKey(), KafkaInputSplit.copyOf(entry.getValue())));
return builder.build();
}
@@ -280,8 +279,8 @@
*
* @return optimized kafka scan
*/
- @VisibleForTesting static Map<TopicPartition, KafkaPullerInputSplit> buildScanFromOffsetPredicate(Map<TopicPartition, KafkaPullerInputSplit> fullScan,
+ @VisibleForTesting static Map<TopicPartition, KafkaInputSplit> buildScanFromOffsetPredicate(Map<TopicPartition, KafkaInputSplit> fullScan,
PredicateLeaf.Operator operator,
long offsetConst,
boolean flip,
@@ -320,54 +319,50 @@
endOffset = -1;
}
- final Map<TopicPartition, KafkaPullerInputSplit> newScan = new HashMap<>();
+ final Map<TopicPartition, KafkaInputSplit> newScan = new HashMap<>();
fullScan.forEach((tp, existingInputSplit) -> {
- final KafkaPullerInputSplit newInputSplit;
+ final KafkaInputSplit newInputSplit;
if (startOffset != -1 && endOffset == -1) {
- newInputSplit = new KafkaPullerInputSplit(tp.topic(),
+ newInputSplit = new KafkaInputSplit(tp.topic(),
tp.partition(),
//if the user ask for start offset > max offset will replace with last offset
Math.min(startOffset, existingInputSplit.getEndOffset()),
existingInputSplit.getEndOffset(),
existingInputSplit.getPath());
} else if (endOffset != -1 && startOffset == -1) {
- newInputSplit = new KafkaPullerInputSplit(tp.topic(), tp.partition(), existingInputSplit.getStartOffset(),
+ newInputSplit = new KafkaInputSplit(tp.topic(), tp.partition(), existingInputSplit.getStartOffset(),
//@TODO check this, if user ask for non existing end offset ignore it and position head on start
// This can be an issue when doing ingestion from kafka into Hive, what happen if there is some gaps
// Shall we fail the ingest or carry-on and ignore non existing offsets
Math.max(endOffset, existingInputSplit.getStartOffset()), existingInputSplit.getPath());
} else if (endOffset == startOffset + 1) {
if (startOffset < existingInputSplit.getStartOffset() || startOffset >= existingInputSplit.getEndOffset()) {
- newInputSplit = new KafkaPullerInputSplit(tp.topic(), tp.partition(),
+ newInputSplit = new KafkaInputSplit(tp.topic(), tp.partition(),
// non existing offset will be seeking last offset
existingInputSplit.getEndOffset(), existingInputSplit.getEndOffset(), existingInputSplit.getPath());
} else {
newInputSplit =
- new KafkaPullerInputSplit(tp.topic(),
- tp.partition(),
- startOffset,
- endOffset,
- existingInputSplit.getPath());
+ new KafkaInputSplit(tp.topic(), tp.partition(), startOffset, endOffset, existingInputSplit.getPath());
}
} else {
newInputSplit =
- new KafkaPullerInputSplit(tp.topic(),
+ new KafkaInputSplit(tp.topic(),
tp.partition(),
existingInputSplit.getStartOffset(),
existingInputSplit.getEndOffset(),
existingInputSplit.getPath());
}
- newScan.put(tp, KafkaPullerInputSplit.intersectRange(newInputSplit, existingInputSplit));
+ newScan.put(tp, KafkaInputSplit.intersectRange(newInputSplit, existingInputSplit));
});
return newScan;
}
- @Nullable private static Map<TopicPartition, KafkaPullerInputSplit> buildScanForTimesPredicate(
- Map<TopicPartition, KafkaPullerInputSplit> fullHouse,
+ @Nullable private static Map<TopicPartition, KafkaInputSplit> buildScanForTimesPredicate(
+ Map<TopicPartition, KafkaInputSplit> fullHouse,
PredicateLeaf.Operator operator,
long timestamp,
boolean flip,
@@ -385,11 +380,11 @@
// NULL will be returned for that partition If the message format version in a partition is before 0.10.0
Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp = consumer.offsetsForTimes(timePartitionsMap);
return Maps.toMap(fullHouse.keySet(), tp -> {
- KafkaPullerInputSplit existing = fullHouse.get(tp);
+ KafkaInputSplit existing = fullHouse.get(tp);
OffsetAndTimestamp foundOffsetAndTime = offsetAndTimestamp.get(tp);
//Null in case filter doesn't match or field not existing ie old broker thus return empty scan.
final long startOffset = foundOffsetAndTime == null ? existing.getEndOffset() : foundOffsetAndTime.offset();
- return new KafkaPullerInputSplit(Objects.requireNonNull(tp).topic(),
+ return new KafkaInputSplit(Objects.requireNonNull(tp).topic(),
tp.partition(),
startOffset,
existing.getEndOffset(),
@@ -410,21 +405,21 @@
*
* @return either full scan or an optimized sub scan.
*/
- private Map<TopicPartition, KafkaPullerInputSplit> pushAndOp(ExprNodeGenericFuncDesc expr) {
- Map<TopicPartition, KafkaPullerInputSplit> currentScan = new HashMap<>();
+ private Map<TopicPartition, KafkaInputSplit> pushAndOp(ExprNodeGenericFuncDesc expr) {
+ Map<TopicPartition, KafkaInputSplit> currentScan = new HashMap<>();
- fullHouse.forEach((tp, input) -> currentScan.put(tp, KafkaPullerInputSplit.copyOf(input)));
+ fullHouse.forEach((tp, input) -> currentScan.put(tp, KafkaInputSplit.copyOf(input)));
for (ExprNodeDesc child : expr.getChildren()) {
- Map<TopicPartition, KafkaPullerInputSplit> scan = parseAndOptimize(child);
+ Map<TopicPartition, KafkaInputSplit> scan = parseAndOptimize(child);
if (scan != null) {
Set<TopicPartition> currentKeys = ImmutableSet.copyOf(currentScan.keySet());
currentKeys.forEach(key -> {
- KafkaPullerInputSplit newSplit = scan.get(key);
- KafkaPullerInputSplit oldSplit = currentScan.get(key);
+ KafkaInputSplit newSplit = scan.get(key);
+ KafkaInputSplit oldSplit = currentScan.get(key);
currentScan.remove(key);
if (newSplit != null) {
- KafkaPullerInputSplit intersectionSplit = KafkaPullerInputSplit.intersectRange(newSplit, oldSplit);
+ KafkaInputSplit intersectionSplit = KafkaInputSplit.intersectRange(newSplit, oldSplit);
if (intersectionSplit != null) {
currentScan.put(key, intersectionSplit);
}
@@ -436,18 +431,18 @@
return currentScan;
}
- @Nullable private Map<TopicPartition, KafkaPullerInputSplit> pushOrOp(ExprNodeGenericFuncDesc expr) {
- final Map<TopicPartition, KafkaPullerInputSplit> currentScan = new HashMap<>();
+ @Nullable private Map<TopicPartition, KafkaInputSplit> pushOrOp(ExprNodeGenericFuncDesc expr) {
+ final Map<TopicPartition, KafkaInputSplit> currentScan = new HashMap<>();
for (ExprNodeDesc child : expr.getChildren()) {
- Map<TopicPartition, KafkaPullerInputSplit> scan = parseAndOptimize(child);
+ Map<TopicPartition, KafkaInputSplit> scan = parseAndOptimize(child);
if (scan == null) {
// if any of the children is unknown bailout
return null;
}
scan.forEach((tp, input) -> {
- KafkaPullerInputSplit existingSplit = currentScan.get(tp);
- currentScan.put(tp, KafkaPullerInputSplit.unionRange(input, existingSplit == null ? input : existingSplit));
+ KafkaInputSplit existingSplit = currentScan.get(tp);
+ currentScan.put(tp, KafkaInputSplit.unionRange(input, existingSplit == null ? input : existingSplit));
});
}
return currentScan;
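
The AND/OR handling above reduces to interval algebra on per-partition offset ranges: `pushAndOp` intersects the ranges produced by each child predicate, while `pushOrOp` unions them. Below is a standalone sketch of that algebra, assuming `KafkaInputSplit.intersectRange`/`unionRange` follow the usual half-open `[start, end)` semantics (the `OffsetRange` class is illustrative only and not part of the patch):

```java
// Illustrative [start, end) interval algebra mirroring what the trimmer delegates
// to KafkaInputSplit.intersectRange (AND) and unionRange (OR).
final class OffsetRange {
  final long start; // inclusive
  final long end;   // exclusive

  OffsetRange(long start, long end) { this.start = start; this.end = end; }

  // AND: keep only offsets allowed by both predicates; null means the conjunction is unsatisfiable.
  static OffsetRange intersect(OffsetRange a, OffsetRange b) {
    long s = Math.max(a.start, b.start);
    long e = Math.min(a.end, b.end);
    return s <= e ? new OffsetRange(s, e) : null;
  }

  // OR: the smallest single range covering both operands (a bounding range, not an exact union).
  static OffsetRange union(OffsetRange a, OffsetRange b) {
    return new OffsetRange(Math.min(a.start, b.start), Math.max(a.end, b.end));
  }
}
```

For example, `__offset > 99 AND __offset < 105` over a full scan `[0, 200)` intersects to `[100, 105)`; OR-ing that result with `[4, 10)` widens it to the bounding range `[4, 105)`.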
diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
new file mode 100644
index 0000000000..51cfa24929
--- /dev/null
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeSpec;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.rmi.server.UID;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+/**
+ * Generic Kafka SerDe that allows the user to delegate the actual SerDe work to another class such as Avro,
+ * Json or any class that supports {@link BytesWritable}.
+ * If users wish to implement their own SerDe, all they need to do is implement a SerDe that extends
+ * {@link org.apache.hadoop.hive.serde2.AbstractSerDe} and accepts {@link BytesWritable} as the value class.
+ */
+@SerDeSpec(schemaProps = { serdeConstants.LIST_COLUMNS, serdeConstants.LIST_COLUMN_TYPES }) public class KafkaSerDe
+ extends AbstractSerDe {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaSerDe.class);
+
+ /**
+ * Delegate SerDe used to serialize and deserialize data to/from Kafka.
+ */
+ private AbstractSerDe delegateSerDe;
+
+ /**
+ * Delegate Object Inspector used to deserialize the row; this OI is constructed by the {@code delegateSerDe}.
+ */
+ private StructObjectInspector delegateDeserializerOI;
+
+ /**
+ * Delegate Object Inspector used to Serialize the row as byte array.
+ */
+ private StructObjectInspector delegateSerializerOI;
+
+ /**
+ * Object Inspector of original row plus metadata.
+ */
+ private ObjectInspector objectInspector;
+ private final List<String> columnNames = Lists.newArrayList();
+ private BytesConverter bytesConverter;
+
+ @Override public void initialize(@Nullable Configuration conf, Properties tbl) throws SerDeException {
+ //This method is called before {@link org.apache.hadoop.hive.kafka.KafkaStorageHandler.preCreateTable}
+ //Thus we need to default to org.apache.hadoop.hive.kafka.KafkaUtils.DEFAULT_PROPERTIES if any property is needed
+ final String
+ className =
+ tbl.getProperty(KafkaTableProperties.SERDE_CLASS_NAME.getName(),
+ KafkaTableProperties.SERDE_CLASS_NAME.getDefaultValue());
+ delegateSerDe = KafkaUtils.createDelegate(className);
+ //noinspection deprecation
+ delegateSerDe.initialize(conf, tbl);
+
+ if (!(delegateSerDe.getObjectInspector() instanceof StructObjectInspector)) {
+ throw new SerDeException("Was expecting Struct Object Inspector but have " + delegateSerDe.getObjectInspector()
+ .getClass()
+ .getName());
+ }
+ delegateDeserializerOI = (StructObjectInspector) delegateSerDe.getObjectInspector();
+
+ // Build column names; order matters here
+ columnNames.addAll(delegateDeserializerOI.getAllStructFieldRefs()
+ .stream()
+ .map(StructField::getFieldName)
+ .collect(Collectors.toList()));
+ columnNames.addAll(MetadataColumn.KAFKA_METADATA_COLUMN_NAMES);
+
+ final List<ObjectInspector> inspectors = new ArrayList<>(columnNames.size());
+ inspectors.addAll(delegateDeserializerOI.getAllStructFieldRefs()
+ .stream()
+ .map(StructField::getFieldObjectInspector)
+ .collect(Collectors.toList()));
+ inspectors.addAll(MetadataColumn.KAFKA_METADATA_INSPECTORS);
+ objectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);
+
+ // Setup Read and Write Path From/To Kafka
+ if (delegateSerDe.getSerializedClass() == Text.class) {
+ bytesConverter = new TextBytesConverter();
+ } else if (delegateSerDe.getSerializedClass() == AvroGenericRecordWritable.class) {
+ String schemaFromProperty = tbl.getProperty(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), "");
+ Preconditions.checkArgument(!schemaFromProperty.isEmpty(), "Avro Schema is empty Can not go further");
+ Schema schema = AvroSerdeUtils.getSchemaFor(schemaFromProperty);
+ LOG.debug("Building Avro Reader with schema {}", schemaFromProperty);
+ bytesConverter = new AvroBytesConverter(schema);
+ } else {
+ bytesConverter = new BytesWritableConverter();
+ }
+ }
+
+ @Override public Class<? extends Writable> getSerializedClass() {
+ return delegateSerDe.getSerializedClass();
+ }
+
+ @Override public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
+
+ if (!(objInspector instanceof StructObjectInspector)) {
+ throw new SerDeException("Object inspector has to be "
+ + StructObjectInspector.class.getName()
+ + " but got "
+ + objInspector.getClass().getName());
+ }
+ StructObjectInspector structObjectInspector = (StructObjectInspector) objInspector;
+ List