From 1199928f98f2dea14d06e33ecc76c0220a5afcdf Mon Sep 17 00:00:00 2001
From: Nishant
Date: Thu, 4 Oct 2018 00:44:39 +0530
Subject: [PATCH] [HIVE-20686] Sync query IDs between Druid and Hive

Propagate the Hive query ID (HiveConf.ConfVars.HIVEQUERYID) into the
Druid query that backs each input split, so the same query can be
correlated across the Hive and Druid sides. The deserializeSerialize()
round trip is replaced by withQueryId(), which sets the ID via
Query.withId() while re-serializing, and the now-unused Jackson
exception imports are dropped. The expected query strings in
TestHiveDruidQueryBasedInputFormat gain the queryId context key.
---
 .../druid/io/DruidQueryBasedInputFormat.java  | 21 +++++++++++--------
 .../TestHiveDruidQueryBasedInputFormat.java   |  8 +++----
 2 files changed, 16 insertions(+), 13 deletions(-)

diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
index f5009a2776..1e33c2072b 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
@@ -17,9 +17,7 @@
  */
 package org.apache.hadoop.hive.druid.io;
 
-import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.JsonMappingException;
 import com.google.common.collect.Lists;
 import io.druid.java.util.http.client.Request;
 import io.druid.query.BaseQuery;
@@ -110,6 +108,7 @@ public static DruidQueryRecordReader getDruidQueryReader(String druidQueryType)
     String address = HiveConf.getVar(conf,
         HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS
     );
+    String queryId = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID);
     if (StringUtils.isEmpty(address)) {
       throw new IOException("Druid broker address not specified in configuration");
     }
@@ -135,6 +134,11 @@ public static DruidQueryRecordReader getDruidQueryReader(String druidQueryType)
       }
     }
 
+    // Tag the Druid query with the Hive query ID so it can be traced on the Druid side.
+    if (queryId != null) {
+      druidQuery = withQueryId(druidQuery, queryId);
+    }
+
     // hive depends on FileSplits
     Job job = new Job(conf);
     JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job);
@@ -147,8 +151,8 @@ public static DruidQueryRecordReader getDruidQueryReader(String druidQueryType)
     case Query.TIMESERIES:
     case Query.TOPN:
     case Query.GROUP_BY:
-      return new HiveDruidSplit[] { new HiveDruidSplit(deserializeSerialize(druidQuery),
-          paths[0], new String[] {address}) };
+      return new HiveDruidSplit[] {
+          new HiveDruidSplit(druidQuery, paths[0], new String[] { address }) };
     case Query.SELECT:
       SelectQuery selectQuery = DruidStorageHandlerUtils.JSON_MAPPER.readValue(
           druidQuery, SelectQuery.class);
@@ -269,11 +273,10 @@ public static DruidQueryRecordReader getDruidQueryReader(String druidQueryType)
     return segmentDescriptors;
   }
 
-  private static String deserializeSerialize(String druidQuery)
-      throws JsonParseException, JsonMappingException, IOException {
-    BaseQuery deserializedQuery = DruidStorageHandlerUtils.JSON_MAPPER.readValue(
-        druidQuery, BaseQuery.class);
-    return DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(deserializedQuery);
+  private static String withQueryId(String druidQuery, String queryId) throws IOException {
+    Query queryWithId =
+        DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, BaseQuery.class).withId(queryId);
+    return DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(queryWithId);
   }
 
   @Override
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
index bb43d512c4..7da3a307db 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
@@ -49,7 +49,7 @@
       + "\"granularity\":\"DAY\","
       + "\"aggregations\":[],"
       + "\"postAggregations\":[],"
-      + "\"context\":null}, [localhost:8082]}]";
+      + "\"context\":{\"queryId\":\"\"}}, [localhost:8082]}]";
 
   private static final String TOPN_QUERY =
       "{ \"queryType\": \"topN\", "
@@ -86,7 +86,7 @@
       + "\"aggregations\":[{\"type\":\"longSum\",\"name\":\"count\",\"fieldName\":\"count\",\"expression\":null},"
       + "{\"type\":\"doubleSum\",\"name\":\"some_metric\",\"fieldName\":\"some_metric\",\"expression\":null}],"
       + "\"postAggregations\":[],"
-      + "\"context\":null,"
+      + "\"context\":{\"queryId\":\"\"},"
       + "\"descending\":false}, [localhost:8082]}]";
 
   private static final String GROUP_BY_QUERY =
@@ -119,7 +119,7 @@
       + "\"having\":null,"
       + "\"limitSpec\":{\"type\":\"default\",\"columns\":[{\"dimension\":\"country\",\"direction\":\"ascending\",\"dimensionOrder\":{\"type\":\"lexicographic\"}},"
       + "{\"dimension\":\"data_transfer\",\"direction\":\"ascending\",\"dimensionOrder\":{\"type\":\"lexicographic\"}}],\"limit\":5000},"
-      + "\"context\":null,"
+      + "\"context\":{\"queryId\":\"\"},"
      + "\"descending\":false}, [localhost:8082]}]";
 
   private static final String SELECT_QUERY =
@@ -149,7 +149,7 @@
       + "\"metrics\":[\"count\",\"added\",\"delta\",\"variation\",\"deleted\"],"
       + "\"virtualColumns\":[],"
       + "\"pagingSpec\":{\"pagingIdentifiers\":{},\"threshold\":5,\"fromNext\":false},"
-      + "\"context\":{\"druid.query.fetch\":true}}, [localhost:8082]}]";
+      + "\"context\":{\"druid.query.fetch\":true,\"queryId\":\"\"}}, [localhost:8082]}]";
 
   @Test
   public void testTimeZone() throws Exception {
-- 
2.17.1 (Apple Git-112)
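
Note: as the test expectations above show, Druid keeps a query's ID under
the "queryId" key of the query's "context" map, which is why the expected
strings change from "context":null to "context":{"queryId":""}. The ID is
empty in the tests because they never set hive.query.id, so HiveConf
returns its default (the empty string), which still passes the
queryId != null check. Below is a minimal, self-contained sketch of the
same tagging, assuming only Jackson on the classpath; the class and helper
names are illustrative, not the patched Hive code, which goes through
Druid's Query.withId() instead.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class QueryIdSketch {
  private static final ObjectMapper MAPPER = new ObjectMapper();

  // Stand-in for the patched withQueryId(): put the query ID into the
  // Druid query's context map and re-serialize the query JSON.
  static String withQueryId(String druidQueryJson, String queryId) throws Exception {
    ObjectNode query = (ObjectNode) MAPPER.readTree(druidQueryJson);
    // Reuse an existing context object; a missing or JSON-null context
    // is replaced with a fresh object node.
    ObjectNode context = query.get("context") instanceof ObjectNode
        ? (ObjectNode) query.get("context")
        : query.putObject("context");
    context.put("queryId", queryId);
    return MAPPER.writeValueAsString(query);
  }

  public static void main(String[] args) throws Exception {
    String q = "{\"queryType\":\"timeseries\",\"context\":null}";
    // Prints: {"queryType":"timeseries","context":{"queryId":"hive_001"}}
    System.out.println(withQueryId(q, "hive_001"));
  }
}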