From 18c5f6a516bca521fca476468878cca7eac81944 Mon Sep 17 00:00:00 2001 From: Nishant Date: Thu, 14 Jun 2018 02:41:38 +0530 Subject: [PATCH] [HIVE-19885] Allow user to set kafka consumer properties --- .../java/org/apache/hadoop/hive/conf/Constants.java | 1 + .../apache/hadoop/hive/druid/DruidStorageHandler.java | 16 ++++++++++++++-- .../hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java | 2 -- .../org/apache/hadoop/hive/cli/control/CliConfigs.java | 2 -- .../main/java/org/apache/hadoop/hive/ql/QTestUtil.java | 2 +- .../test/queries/clientpositive/druidkafkamini_basic.q | 5 +++-- .../clientpositive/druid/druidkafkamini_basic.q.out | 18 +++++++++++------- 7 files changed, 30 insertions(+), 16 deletions(-) diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java b/common/src/java/org/apache/hadoop/hive/conf/Constants.java index 3d79eec2fd..807d6bc5cc 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java +++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java @@ -53,6 +53,7 @@ public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; public static final String DRUID_KAFKA_INGESTION_PROPERTY_PREFIX = "druid.kafka.ingestion."; + public static final String DRUID_KAFKA_CONSUMER_PROPERTY_PREFIX = DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "consumer."; /* Kafka Ingestion state - valid values - START/STOP/RESET */ public static final String DRUID_KAFKA_INGESTION = "druid.kafka.ingestion"; diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java index fc5a5fa062..57e4800bad 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java @@ -407,8 +407,7 @@ private static KafkaSupervisorSpec createKafkaSupervisorSpec(Table table, String getIntegerProperty(table, 
Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "replicas"), getIntegerProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "taskCount"), getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "taskDuration"), - ImmutableMap.of(KafkaSupervisorIOConfig.BOOTSTRAP_SERVERS_KEY, - kafka_servers), // Mandatory Property + getKafkaConsumerProperties(table, kafka_servers), // Mandatory Property getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "startDelay"), getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "period"), getBooleanProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "useEarliestOffset"), @@ -420,6 +419,26 @@ private static KafkaSupervisorSpec createKafkaSupervisorSpec(Table table, String ); } + /** + * Builds the consumer properties passed to the Kafka supervisor spec: the bootstrap servers plus + * every table parameter whose key starts with DRUID_KAFKA_CONSUMER_PROPERTY_PREFIX, prefix removed. + */ + private static Map<String, String> getKafkaConsumerProperties(Table table, String kafka_servers) { + ImmutableMap.Builder<String, String> builder = ImmutableMap.builder(); + builder.put(KafkaSupervisorIOConfig.BOOTSTRAP_SERVERS_KEY, kafka_servers); + for (Map.Entry<String, String> entry : table.getParameters().entrySet()) { + if (entry.getKey().startsWith(Constants.DRUID_KAFKA_CONSUMER_PROPERTY_PREFIX)) { + String propertyName = entry.getKey() + .substring(Constants.DRUID_KAFKA_CONSUMER_PROPERTY_PREFIX.length()); + /* bootstrap servers is already set above; ImmutableMap.Builder fails on duplicate keys */ + if (!KafkaSupervisorIOConfig.BOOTSTRAP_SERVERS_KEY.equals(propertyName)) { + builder.put(propertyName, entry.getValue()); + } + } + } + return builder.build(); + } + private static void updateKafkaIngestionSpec(String overlordAddress, KafkaSupervisorSpec spec) { try { String task = JSON_MAPPER.writeValueAsString(spec); diff --git a/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java b/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java index e2d26ab726..4768975225 100644 --- a/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java +++ b/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java @@ -22,7 +22,6 @@ import org.junit.ClassRule; import org.junit.Rule; 
-import org.junit.Ignore; import org.junit.Test; import org.junit.rules.TestRule; import org.junit.runner.RunWith; @@ -32,7 +31,6 @@ import java.io.File; import java.util.List; -@Ignore("HIVE-19509: Disable tests that are failing continuously") @RunWith(Parameterized.class) public class TestMiniDruidKafkaCliDriver { diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java index fddd40fa43..59a78d93dc 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java @@ -199,8 +199,6 @@ public MiniDruidKafkaCliConfig() { includesFrom(testConfigProps, "druid.kafka.query.files"); - excludeQuery("druidkafkamini_basic.q"); // Disabled in HIVE-19509 - setResultsDir("ql/src/test/results/clientpositive/druid"); setLogDir("itests/qtest/target/tmp/log"); diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index 2365fb76bd..f19a3ad208 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -772,7 +772,7 @@ public void shutdown() throws Exception { cleanUp(); } - if (clusterType.getCoreClusterType() == CoreClusterType.TEZ) { + if (clusterType.getCoreClusterType() == CoreClusterType.TEZ && SessionState.get().getTezSession() != null) { SessionState.get().getTezSession().destroy(); } if (druidCluster != null) { diff --git a/ql/src/test/queries/clientpositive/druidkafkamini_basic.q b/ql/src/test/queries/clientpositive/druidkafkamini_basic.q index 229a20cf81..814890a64f 100644 --- a/ql/src/test/queries/clientpositive/druidkafkamini_basic.q +++ b/ql/src/test/queries/clientpositive/druidkafkamini_basic.q @@ -9,8 +9,9 @@ CREATE TABLE druid_kafka_test(`__time` timestamp, page 
string, `user` string, la "druid.kafka.ingestion.useEarliestOffset" = "true", "druid.kafka.ingestion.maxRowsInMemory" = "5", "druid.kafka.ingestion.startDelay" = "PT1S", - "druid.kafka.ingestion.taskDuration" = "PT20S", - "druid.kafka.ingestion.period" = "PT1S" + "druid.kafka.ingestion.taskDuration" = "PT60S", + "druid.kafka.ingestion.period" = "PT1S", + "druid.kafka.ingestion.consumer.retries" = "2" ); ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'START'); diff --git a/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out b/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out index 2e6d768d6b..07439743f0 100644 --- a/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out +++ b/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out @@ -8,8 +8,9 @@ PREHOOK: query: CREATE TABLE druid_kafka_test(`__time` timestamp, page string, ` "druid.kafka.ingestion.useEarliestOffset" = "true", "druid.kafka.ingestion.maxRowsInMemory" = "5", "druid.kafka.ingestion.startDelay" = "PT1S", - "druid.kafka.ingestion.taskDuration" = "PT20S", - "druid.kafka.ingestion.period" = "PT1S" + "druid.kafka.ingestion.taskDuration" = "PT60S", + "druid.kafka.ingestion.period" = "PT1S", + "druid.kafka.ingestion.consumer.retries" = "2" ) PREHOOK: type: CREATETABLE PREHOOK: Output: database:default @@ -24,8 +25,9 @@ POSTHOOK: query: CREATE TABLE druid_kafka_test(`__time` timestamp, page string, "druid.kafka.ingestion.useEarliestOffset" = "true", "druid.kafka.ingestion.maxRowsInMemory" = "5", "druid.kafka.ingestion.startDelay" = "PT1S", - "druid.kafka.ingestion.taskDuration" = "PT20S", - "druid.kafka.ingestion.period" = "PT1S" + "druid.kafka.ingestion.taskDuration" = "PT60S", + "druid.kafka.ingestion.period" = "PT1S", + "druid.kafka.ingestion.consumer.retries" = "2" ) POSTHOOK: type: CREATETABLE POSTHOOK: Output: database:default @@ -141,7 +143,7 @@ kafkaPartitions=1 activeTasks=[] publishingTasks=[] latestOffsets={0=10} 
-minimumLag={} +minimumLag={0=0} aggregateLag=0 #### A masked pattern was here #### PREHOOK: query: Select count(*) FROM druid_kafka_test @@ -346,10 +348,11 @@ STAGE PLANS: druid.datasource default.druid_kafka_test druid.fieldNames language,user druid.fieldTypes string,string + druid.kafka.ingestion.consumer.retries 2 druid.kafka.ingestion.maxRowsInMemory 5 druid.kafka.ingestion.period PT1S druid.kafka.ingestion.startDelay PT1S - druid.kafka.ingestion.taskDuration PT20S + druid.kafka.ingestion.taskDuration PT60S druid.kafka.ingestion.useEarliestOffset true druid.query.granularity MINUTE druid.query.json {"queryType":"scan","dataSource":"default.druid_kafka_test","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],"filter":{"type":"not","field":{"type":"selector","dimension":"language","value":null}},"columns":["language","user"],"resultFormat":"compactedList"} @@ -385,10 +388,11 @@ STAGE PLANS: druid.datasource default.druid_kafka_test druid.fieldNames language,user druid.fieldTypes string,string + druid.kafka.ingestion.consumer.retries 2 druid.kafka.ingestion.maxRowsInMemory 5 druid.kafka.ingestion.period PT1S druid.kafka.ingestion.startDelay PT1S - druid.kafka.ingestion.taskDuration PT20S + druid.kafka.ingestion.taskDuration PT60S druid.kafka.ingestion.useEarliestOffset true druid.query.granularity MINUTE druid.query.json {"queryType":"scan","dataSource":"default.druid_kafka_test","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],"filter":{"type":"not","field":{"type":"selector","dimension":"language","value":null}},"columns":["language","user"],"resultFormat":"compactedList"} -- 2.15.1 (Apple Git-101)