From b9a1b8e51808aee6f9e0b35b6369870955e8f632 Mon Sep 17 00:00:00 2001
From: Nishant
Date: Tue, 23 Jan 2018 21:55:25 +0530
Subject: [PATCH] [HIVE-18518] Upgrade to druid version 0.11

---
 druid-handler/pom.xml                              |  2 +-
 .../hive/druid/DruidStorageHandlerUtils.java       |  2 +-
 .../druid/serde/HiveDruidSerializationModule.java  |  5 ++
 .../hive/druid/serde/IntervalSerializer.java       | 30 +++++++++
 .../druid/serde/LegacySegmentSpecDeSerializer.java | 71 ++++++++++++++++++++++
 .../druid/TestHiveDruidQueryBasedInputFormat.java  |  8 +--
 .../hadoop/hive/ql/io/TestDruidRecordWriter.java   |  2 +-
 itests/qtest-druid/pom.xml                         |  4 +-
 pom.xml                                            |  4 +-
 9 files changed, 117 insertions(+), 11 deletions(-)
 create mode 100644 druid-handler/src/java/org/apache/hadoop/hive/druid/serde/IntervalSerializer.java
 create mode 100644 druid-handler/src/java/org/apache/hadoop/hive/druid/serde/LegacySegmentSpecDeSerializer.java

diff --git a/druid-handler/pom.xml b/druid-handler/pom.xml
index 2a62b90eea..670d82b6f9 100644
--- a/druid-handler/pom.xml
+++ b/druid-handler/pom.xml
@@ -29,7 +29,7 @@
     ..
-    0.27.10
+    1.3.2
     16.0.1
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index 9de0097fa5..2f956b179b 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -24,12 +24,12 @@
 import com.google.common.base.Throwables;
 import com.google.common.collect.Interner;
 import com.google.common.collect.Interners;
+import com.metamx.common.JodaUtils;
 import com.metamx.emitter.EmittingLogger;
 import com.metamx.emitter.core.NoopEmitter;
 import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.http.client.HttpClient;
 import com.metamx.http.client.response.InputStreamResponseHandler;
-import io.druid.common.utils.JodaUtils;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.math.expr.ExprMacroTable;
 import io.druid.metadata.MetadataStorageTablesConfig;
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/HiveDruidSerializationModule.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/HiveDruidSerializationModule.java
index f72fd0d107..ff369b3643 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/HiveDruidSerializationModule.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/HiveDruidSerializationModule.java
@@ -18,10 +18,13 @@
 package org.apache.hadoop.hive.druid.serde;
 
 import io.druid.java.util.common.granularity.PeriodGranularity;
+import io.druid.query.spec.LegacySegmentSpec;
 
 import com.fasterxml.jackson.core.util.VersionUtil;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 
+import org.joda.time.Interval;
+
 /**
  * This class is used to define/override any serde behavior for classes from druid.
  * Currently it is used to override the default behavior when serializing PeriodGranularity to include user timezone.
@@ -33,5 +36,7 @@
   public HiveDruidSerializationModule() {
     super(NAME, VERSION_UTIL.version());
     addSerializer(PeriodGranularity.class, new PeriodGranularitySerializer());
+    addDeserializer(LegacySegmentSpec.class, new LegacySegmentSpecDeSerializer());
+    addDeserializer(Interval.class, new IntervalDeserializer());
   }
 }
\ No newline at end of file
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/IntervalSerializer.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/IntervalSerializer.java
new file mode 100644
index 0000000000..80489aee29
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/IntervalSerializer.java
@@ -0,0 +1,30 @@
+package org.apache.hadoop.hive.druid.serde;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+import org.joda.time.Interval;
+
+import java.io.IOException;
+
+/**
+ * From Druid 0.11.0 onwards, intervals are deserialized in the UTC timezone, so a serde
+ * round trip of a Druid query no longer converts intervals to the user's local timezone
+ * as Hive expects. This deserializer is a workaround that makes the query serde pick up
+ * the Hive local timezone for intervals.
+ */
+class IntervalDeserializer extends StdDeserializer<Interval> {
+
+  public IntervalDeserializer() {
+    super(Interval.class);
+  }
+
+  @Override
+  public Interval deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
+      throws IOException, JsonProcessingException {
+    // new Interval(String) parses with the default (Hive session) chronology instead of UTC.
+    return new Interval(jsonParser.getText());
+  }
+}
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/LegacySegmentSpecDeSerializer.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/LegacySegmentSpecDeSerializer.java
new file mode 100644
index 0000000000..55f9af6d16
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/LegacySegmentSpecDeSerializer.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.serde;
+
+import io.druid.query.spec.LegacySegmentSpec;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.google.common.collect.Lists;
+
+import org.joda.time.Interval;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * From Druid 0.11.0 onwards, intervals are deserialized in the UTC timezone, so a serde
+ * round trip of a Druid query no longer converts intervals to the user's local timezone
+ * as Hive expects. This deserializer is a workaround that makes the query serde pick up
+ * the Hive local timezone for intervals.
+ */
+public class LegacySegmentSpecDeSerializer extends JsonDeserializer<LegacySegmentSpec> {
+
+  @Override
+  public LegacySegmentSpec deserialize(JsonParser jsonParser,
+      DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
+    List<String> intervals = Lists.newArrayList();
+    boolean jsonObject = false;
+    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME) {
+      // Case when the JSON is in object form instead of the legacy string form.
+      jsonObject = true;
+      jsonParser.nextToken();
+    }
+    // Skip ahead to the "intervals" field.
+    while (!jsonParser.getCurrentName().equals("intervals")) {
+      jsonParser.nextToken();
+    }
+    if (jsonParser.getCurrentToken() != JsonToken.START_ARRAY) {
+      throw new IllegalStateException("Expected an array of intervals");
+    }
+    while (jsonParser.nextToken() != JsonToken.END_ARRAY) {
+      intervals.add(jsonParser.getValueAsString());
+    }
+    if (jsonObject) {
+      while (jsonParser.nextToken() != JsonToken.END_OBJECT) {
+        // We are only interested in intervals; ignore all other fields.
+        continue;
+      }
+    }
+    return new LegacySegmentSpec(
+        Lists.transform(intervals, interval -> new Interval(interval)));
+  }
+}
+
+
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
index 514dba3aff..1f167dbc41 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
@@ -42,7 +42,7 @@
   private static final String TIMESERIES_QUERY_SPLIT =
       "[HiveDruidSplit{{\"queryType\":\"timeseries\","
       + "\"dataSource\":{\"type\":\"table\",\"name\":\"sample_datasource\"},"
-      + "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2012-01-01T00:00:00.000-08:00/2012-01-03T00:00:00.000-08:00\"]},"
+      + "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2012-01-01T08:00:00.000Z/2012-01-03T08:00:00.000Z\"]},"
       + "\"descending\":true,"
       + "\"virtualColumns\":[],"
       + "\"filter\":null,"
@@ -80,7 +80,7 @@
       + "\"dimension\":{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"sample_dim\",\"outputName\":\"sample_dim\",\"outputType\":\"STRING\"},"
       + "\"metric\":{\"type\":\"LegacyTopNMetricSpec\",\"metric\":\"count\"},"
       + "\"threshold\":5,"
-      + "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2013-08-31T00:00:00.000-07:00/2013-09-03T00:00:00.000-07:00\"]},"
+      + "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2013-08-31T07:00:00.000Z/2013-09-03T07:00:00.000Z\"]},"
       + "\"filter\":null,"
       + "\"granularity\":{\"type\":\"all\"},"
       + "\"aggregations\":[{\"type\":\"longSum\",\"name\":\"count\",\"fieldName\":\"count\",\"expression\":null},"
@@ -107,7 +107,7 @@
   private static final String GROUP_BY_QUERY_SPLIT =
       "[HiveDruidSplit{{\"queryType\":\"groupBy\","
       + "\"dataSource\":{\"type\":\"table\",\"name\":\"sample_datasource\"},"
-      + "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2012-01-01T00:00:00.000-08:00/2012-01-03T00:00:00.000-08:00\"]},"
+      + "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2012-01-01T08:00:00.000Z/2012-01-03T08:00:00.000Z\"]},"
       + "\"virtualColumns\":[],"
       + "\"filter\":null,"
       + "\"granularity\":\"DAY\","
@@ -134,7 +134,7 @@
   private static final String SELECT_QUERY_SPLIT =
       "[HiveDruidSplit{{\"queryType\":\"select\","
       + "\"dataSource\":{\"type\":\"table\",\"name\":\"wikipedia\"},"
-      + "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2013-01-01T00:00:00.000-08:00/2013-01-02T00:00:00.000-08:00\"]},"
+      + "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2013-01-01T08:00:00.000Z/2013-01-02T08:00:00.000Z\"]},"
       + "\"descending\":false,"
       + "\"filter\":null,"
       + "\"granularity\":{\"type\":\"all\"},"
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
index e0a11e98d3..8fca03b0be 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
@@ -230,7 +230,7 @@ private void verifyRows(List<Map<String, Object>> expectedRows,
       Assert.assertEquals(
           (Double) expected.get("unique_hosts"),
           (Double) HyperUniquesAggregatorFactory
-              .estimateCardinality(actual.getRaw("unique_hosts")),
+              .estimateCardinality(actual.getRaw("unique_hosts"), false),
           0.001
       );
     }
diff --git a/itests/qtest-druid/pom.xml b/itests/qtest-druid/pom.xml
index a807d03e36..870e3654e6 100644
--- a/itests/qtest-druid/pom.xml
+++ b/itests/qtest-druid/pom.xml
@@ -37,7 +37,7 @@
     ../..
-    2.11.0
+    4.0.0
     1.19.3
     9.3.19.v20170502
     10.11.1.1
@@ -252,4 +252,4 @@
-</project>
\ No newline at end of file
+</project>
diff --git a/pom.xml b/pom.xml
index 65ed2220ba..6ee9bcfb5f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -140,7 +140,7 @@
     10.11.1.1
     3.1.0
     0.1.2
-    0.10.1
+    0.11.0
     19.0
     2.4.11
     1.3.166
@@ -173,7 +173,7 @@
     2.22.2
     2.12
     1.1
-    2.8.1
+    2.9.9
     3.5.2
     1.8
     4.11
--
2.11.0 (Apple Git-81)