From 3f68e79c7dc494c9d79c66d92f4c76afc717934c Mon Sep 17 00:00:00 2001
From: Nishant
Date: Thu, 4 Oct 2018 00:44:39 +0530
Subject: [PATCH] [HIVE-20686] Sync query IDs between druid and Hive

---
 .../hive/druid/io/DruidQueryBasedInputFormat.java | 21 +++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
index f5009a2776..587ee09a34 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
@@ -17,9 +17,7 @@
  */
 package org.apache.hadoop.hive.druid.io;
 
-import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.JsonMappingException;
 import com.google.common.collect.Lists;
 import io.druid.java.util.http.client.Request;
 import io.druid.query.BaseQuery;
@@ -45,6 +43,7 @@
 import org.apache.hadoop.hive.druid.serde.DruidTimeseriesQueryRecordReader;
 import org.apache.hadoop.hive.druid.serde.DruidTopNQueryRecordReader;
 import org.apache.hadoop.hive.druid.serde.DruidWritable;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.JobConf;
@@ -110,6 +109,7 @@ public static DruidQueryRecordReader getDruidQueryReader(String druidQueryType)
     String address = HiveConf.getVar(conf,
             HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS
     );
+    String queryId = HiveConf.getVar(conf,HiveConf.ConfVars.HIVEQUERYID);
     if (StringUtils.isEmpty(address)) {
       throw new IOException("Druid broker address not specified in configuration");
     }
@@ -135,6 +135,11 @@ public static DruidQueryRecordReader getDruidQueryReader(String druidQueryType)
       }
     }
 
+    // Add Hive Query ID to Druid Query
+    if(queryId != null){
+      druidQuery = withQueryId(druidQuery, queryId);
+    }
+
     // hive depends on FileSplits
     Job job = new Job(conf);
     JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job);
@@ -147,7 +152,7 @@ public static DruidQueryRecordReader getDruidQueryReader(String druidQueryType)
       case Query.TIMESERIES:
       case Query.TOPN:
       case Query.GROUP_BY:
-        return new HiveDruidSplit[] { new HiveDruidSplit(deserializeSerialize(druidQuery),
+        return new HiveDruidSplit[] { new HiveDruidSplit(druidQuery,
             paths[0], new String[] {address}) };
       case Query.SELECT:
         SelectQuery selectQuery = DruidStorageHandlerUtils.JSON_MAPPER.readValue(
@@ -269,11 +274,11 @@ public static DruidQueryRecordReader getDruidQueryReader(String druidQueryType)
     return segmentDescriptors;
   }
 
-  private static String deserializeSerialize(String druidQuery)
-          throws JsonParseException, JsonMappingException, IOException {
-    BaseQuery deserializedQuery = DruidStorageHandlerUtils.JSON_MAPPER.readValue(
-            druidQuery, BaseQuery.class);
-    return DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(deserializedQuery);
+  private static String withQueryId(String druidQuery, String queryId)
+          throws IOException {
+    Query queryWithId = DruidStorageHandlerUtils.JSON_MAPPER.readValue(
+            druidQuery, BaseQuery.class).withId(queryId);
+    return DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(queryWithId);
   }
 
   @Override
-- 
2.15.2 (Apple Git-101.1)
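
Note: a minimal standalone sketch of the round-trip the new withQueryId helper performs: the serialized Druid query is parsed with the storage handler's JSON mapper, the Hive query ID is attached via Druid's Query#withId, and the query is written back out before it is wrapped into a HiveDruidSplit. The main() harness, the sample timeseries JSON, and the literal query ID below are illustrative assumptions and are not part of the patch; JSON_MAPPER, BaseQuery, and withId come from the code above.

import java.io.IOException;

import io.druid.query.BaseQuery;
import io.druid.query.Query;

import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;

public class QueryIdTaggingSketch {

  // Same round-trip as the patched helper: parse the query JSON, attach the
  // Hive query ID via Query#withId, then re-serialize it.
  static String tagWithQueryId(String druidQueryJson, String hiveQueryId) throws IOException {
    Query queryWithId = DruidStorageHandlerUtils.JSON_MAPPER
        .readValue(druidQueryJson, BaseQuery.class)
        .withId(hiveQueryId);
    return DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(queryWithId);
  }

  public static void main(String[] args) throws IOException {
    // Hypothetical serialized timeseries query; any valid Druid query JSON would do here.
    String druidQueryJson = "{\"queryType\":\"timeseries\",\"dataSource\":\"wikipedia\","
        + "\"granularity\":\"all\",\"intervals\":[\"2018-01-01/2018-02-01\"],\"aggregations\":[]}";
    // In the patch this value comes from HiveConf.ConfVars.HIVEQUERYID; hard-coded here for illustration.
    String hiveQueryId = "hive_20181004004439_example";

    System.out.println(tagWithQueryId(druidQueryJson, hiveQueryId));
  }
}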