From f676e2dd4e5acce2ee31ddfa4b7a685dcfb9909f Mon Sep 17 00:00:00 2001
From: Nishant
Date: Fri, 28 Apr 2017 00:33:41 +0530
Subject: [PATCH] [HIVE-16474] Upgrade druid to 0.10

---
 .../hive/druid/DruidStorageHandlerUtils.java        | 66 ++++++++++++++++++----
 .../hadoop/hive/druid/io/DruidOutputFormat.java     |  7 +--
 .../hadoop/hive/druid/io/DruidRecordWriter.java     |  2 +-
 .../hive/druid/DerbyConnectorTestUtility.java       |  4 +-
 .../apache/hadoop/hive/druid/TestDruidSerDe.java    |  2 +-
 .../hadoop/hive/druid/TestDruidStorageHandler.java  |  2 +-
 .../druid/TestHiveDruidQueryBasedInputFormat.java   | 46 +++++++++------
 .../hadoop/hive/ql/io/TestDruidRecordWriter.java    | 10 ++--
 pom.xml                                             |  2 +-
 9 files changed, 100 insertions(+), 41 deletions(-)

diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index 8d48e14..6a498ab 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -17,8 +17,16 @@
  */
 package org.apache.hadoop.hive.druid;
 
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.util.VersionUtil;
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.JsonSerializer;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.jsontype.TypeSerializer;
+import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.fasterxml.jackson.dataformat.smile.SmileFactory;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
@@ -34,10 +42,12 @@
 import com.metamx.http.client.Request;
 import com.metamx.http.client.response.InputStreamResponseHandler;
 import io.druid.jackson.DefaultObjectMapper;
+import io.druid.java.util.common.granularity.PeriodGranularity;
 import io.druid.metadata.MetadataStorageTablesConfig;
 import io.druid.metadata.SQLMetadataConnector;
 import io.druid.metadata.storage.mysql.MySQLConnector;
 import io.druid.query.BaseQuery;
+import io.druid.query.select.SelectQueryConfig;
 import io.druid.segment.IndexIO;
 import io.druid.segment.IndexMergerV9;
 import io.druid.segment.column.ColumnConfig;
@@ -47,13 +57,14 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.util.StringUtils;
 import org.jboss.netty.handler.codec.http.HttpHeaders;
 import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 import org.skife.jdbi.v2.FoldController;
 import org.skife.jdbi.v2.Folder3;
 import org.skife.jdbi.v2.Handle;
@@ -70,17 +81,12 @@
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.Reader;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.net.URL;
-import java.net.URLDecoder;
 import java.net.UnknownHostException;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Enumeration;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -88,10 +94,6 @@
 import java.util.TimeZone;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipFile;
-
-import static org.apache.hadoop.hive.ql.exec.Utilities.jarFinderGetJar;
 
 /**
  * Utils class for Druid storage handler.
@@ -112,6 +114,50 @@
  */
   public static final ObjectMapper SMILE_MAPPER = new DefaultObjectMapper(new SmileFactory());
 
+  public static class PeriodGranularitySerializer extends JsonSerializer<PeriodGranularity> {
+
+    @Override
+    public void serialize(PeriodGranularity granularity, JsonGenerator jsonGenerator,
+            SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
+      // Set timezone based on user timezone if origin is not already set
+      // as it is default Hive time semantics to consider user timezone.
+      PeriodGranularity granularityWithUserTimezone = new PeriodGranularity(
+              granularity.getPeriod(),
+              granularity.getOrigin(),
+              DateTimeZone.getDefault()
+      );
+      granularityWithUserTimezone.serialize(jsonGenerator, serializerProvider);
+    }
+
+    @Override
+    public void serializeWithType(PeriodGranularity value, JsonGenerator gen,
+            SerializerProvider serializers, TypeSerializer typeSer) throws IOException {
+      serialize(value, gen, serializers);
+    }
+  }
+
+  public static class HiveDruidSerializationModule extends SimpleModule {
+    private static final String NAME = "HiveDruidSerializationModule";
+    private static final VersionUtil VERSION_UTIL = new VersionUtil() {};
+
+    public HiveDruidSerializationModule() {
+      super(NAME, VERSION_UTIL.version());
+      addSerializer(PeriodGranularity.class, new PeriodGranularitySerializer());
+    }
+  }
+
+  static
+  {
+    InjectableValues.Std injectableValues = new InjectableValues.Std().addValue(
+            SelectQueryConfig.class,
+            new SelectQueryConfig(false)
+    );
+    JSON_MAPPER.setInjectableValues(injectableValues);
+    SMILE_MAPPER.setInjectableValues(injectableValues);
+    JSON_MAPPER.registerModule(new HiveDruidSerializationModule());
+    SMILE_MAPPER.registerModule(new HiveDruidSerializationModule());
+  }
+
   private static final int NUM_RETRIES = 8;
 
   private static final int SECONDS_BETWEEN_RETRIES = 2;
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
index fbdd4c9..31db86a 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
@@ -21,7 +21,6 @@
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
-import com.metamx.common.Granularity;
 import io.druid.data.input.impl.DimensionSchema;
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.data.input.impl.InputRowParser;
@@ -29,7 +28,7 @@
 import io.druid.data.input.impl.StringDimensionSchema;
 import io.druid.data.input.impl.TimeAndDimsParseSpec;
 import io.druid.data.input.impl.TimestampSpec;
-import io.druid.granularity.QueryGranularity;
+import io.druid.java.util.common.granularity.Granularity;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.DoubleSumAggregatorFactory;
 import io.druid.query.aggregation.LongSumAggregatorFactory;
@@ -106,8 +105,8 @@
             hdfsDataSegmentPusherConfig, jc, DruidStorageHandlerUtils.JSON_MAPPER);
 
     final GranularitySpec granularitySpec = new UniformGranularitySpec(
-            Granularity.valueOf(segmentGranularity),
-            QueryGranularity.fromString(
+            Granularity.fromString(segmentGranularity),
+            Granularity.fromString(
                     tableProperties.getProperty(Constants.DRUID_QUERY_GRANULARITY) == null
                             ? "NONE"
                             : tableProperties.getProperty(Constants.DRUID_QUERY_GRANULARITY)),
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
index 8d22df6..e97f588 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
@@ -25,10 +25,10 @@
 import com.google.common.base.Throwables;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Lists;
-import com.metamx.common.Granularity;
 import io.druid.data.input.Committer;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.MapBasedInputRow;
+import io.druid.java.util.common.granularity.Granularity;
 import io.druid.segment.indexing.DataSchema;
 import io.druid.segment.indexing.RealtimeTuningConfig;
 import io.druid.segment.loading.DataSegmentPusher;
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/DerbyConnectorTestUtility.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/DerbyConnectorTestUtility.java
index f9304a5..627f078 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/DerbyConnectorTestUtility.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/DerbyConnectorTestUtility.java
@@ -23,6 +23,8 @@
 import io.druid.metadata.MetadataStorageConnectorConfig;
 import io.druid.metadata.MetadataStorageTablesConfig;
 import io.druid.metadata.storage.derby.DerbyConnector;
+import io.druid.metadata.storage.derby.DerbyMetadataStorage;
+
 import org.junit.Assert;
 import org.junit.rules.ExternalResource;
 import org.skife.jdbi.v2.DBI;
@@ -46,7 +48,7 @@ protected DerbyConnectorTestUtility(
           Supplier dbTables,
           String jdbcUri
   ) {
-    super(config, dbTables, new DBI(jdbcUri + ";create=true"));
+    super(new DerbyMetadataStorage(config.get()), config, dbTables, new DBI(jdbcUri + ";create=true"));
     this.jdbcUri = jdbcUri;
   }
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java
index a67afdb..1bd5d84 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidSerDe.java
@@ -554,7 +554,7 @@ private static void deserializeQueryResults(DruidSerDe serDe, String queryType,
     Query query = null;
     DruidQueryRecordReader reader = null;
     List resultsList = null;
-    ObjectMapper mapper = new DefaultObjectMapper();
+    ObjectMapper mapper = DruidStorageHandlerUtils.JSON_MAPPER;
     switch (queryType) {
       case Query.TIMESERIES:
         query = mapper.readValue(jsonQuery, TimeseriesQuery.class);
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
index da6610a..50eed8e 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
@@ -195,7 +195,7 @@
   public void testDeleteSegment() throws IOException, SegmentLoadingException {
     LocalFileSystem localFileSystem = FileSystem.getLocal(config);
     Path segmentOutputPath = JobHelper
-            .makeSegmentOutputPath(new Path(segmentRootPath), localFileSystem, dataSegment);
+            .makeFileNamePath(new Path(segmentRootPath), localFileSystem, dataSegment, JobHelper.INDEX_ZIP);
     Path indexPath = new Path(segmentOutputPath, "index.zip");
     DataSegment dataSegmentWithLoadspect = DataSegment.builder(dataSegment).loadSpec(
             ImmutableMap.of("path", indexPath)).build();
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
index bb4011b..ac6782f 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
@@ -22,6 +22,7 @@
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.calcite.avatica.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -29,12 +30,19 @@
 import org.apache.hadoop.hive.druid.io.HiveDruidSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.joda.time.Interval;
+import org.joda.time.Period;
 import org.joda.time.chrono.ISOChronology;
 import org.junit.Test;
 
+import io.druid.java.util.common.granularity.DurationGranularity;
+import io.druid.java.util.common.granularity.Granularities;
+import io.druid.java.util.common.granularity.Granularity;
+import io.druid.java.util.common.granularity.PeriodGranularity;
 import io.druid.query.Query;
 import junit.framework.TestCase;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+
 public class TestHiveDruidQueryBasedInputFormat extends TestCase {
 
   @SuppressWarnings("unchecked")
@@ -147,8 +155,9 @@ public void testCreateSplitsIntervals() throws Exception {
           + "\"dataSource\":{\"type\":\"table\",\"name\":\"sample_datasource\"},"
           + "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2012-01-01T00:00:00.000-08:00/2012-01-03T00:00:00.000-08:00\"]},"
           + "\"descending\":true,"
+          + "\"virtualColumns\":[],"
           + "\"filter\":null,"
-          + "\"granularity\":{\"type\":\"duration\",\"duration\":86400000,\"origin\":\"1969-12-31T16:00:00.000-08:00\"},"
+          + "\"granularity\":{\"type\":\"period\",\"period\":\"P1D\",\"timeZone\":\"America/Los_Angeles\",\"origin\":null},"
          + "\"aggregations\":[],"
           + "\"postAggregations\":[],"
           + "\"context\":null}, [localhost:8082]}]";
@@ -178,14 +187,15 @@ public void testCreateSplitsIntervals() throws Exception {
   private static final String TOPN_QUERY_SPLIT =
           "[HiveDruidSplit{{\"queryType\":\"topN\","
           + "\"dataSource\":{\"type\":\"table\",\"name\":\"sample_data\"},"
-          + "\"dimension\":{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"sample_dim\",\"outputName\":\"sample_dim\"},"
+          + "\"virtualColumns\":[],"
+          + "\"dimension\":{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"sample_dim\",\"outputName\":\"sample_dim\",\"outputType\":\"STRING\"},"
           + "\"metric\":{\"type\":\"LegacyTopNMetricSpec\",\"metric\":\"count\"},"
           + "\"threshold\":5,"
           + "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2013-08-31T00:00:00.000-07:00/2013-09-03T00:00:00.000-07:00\"]},"
           + "\"filter\":null,"
           + "\"granularity\":{\"type\":\"all\"},"
-          + "\"aggregations\":[{\"type\":\"longSum\",\"name\":\"count\",\"fieldName\":\"count\"},"
-          + "{\"type\":\"doubleSum\",\"name\":\"some_metric\",\"fieldName\":\"some_metric\"}],"
+          + "\"aggregations\":[{\"type\":\"longSum\",\"name\":\"count\",\"fieldName\":\"count\",\"expression\":null},"
+          + "{\"type\":\"doubleSum\",\"name\":\"some_metric\",\"fieldName\":\"some_metric\",\"expression\":null}],"
           + "\"postAggregations\":[],"
           + "\"context\":null,"
           + "\"descending\":false}, [localhost:8082]}]";
@@ -209,12 +219,13 @@ public void testCreateSplitsIntervals() throws Exception {
           "[HiveDruidSplit{{\"queryType\":\"groupBy\","
           + "\"dataSource\":{\"type\":\"table\",\"name\":\"sample_datasource\"},"
           + "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2012-01-01T00:00:00.000-08:00/2012-01-03T00:00:00.000-08:00\"]},"
+          + "\"virtualColumns\":[],"
           + "\"filter\":null,"
-          + "\"granularity\":{\"type\":\"duration\",\"duration\":86400000,\"origin\":\"1969-12-31T16:00:00.000-08:00\"},"
-          + "\"dimensions\":[{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"country\",\"outputName\":\"country\"},"
-          + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"device\",\"outputName\":\"device\"}],"
-          + "\"aggregations\":[{\"type\":\"longSum\",\"name\":\"total_usage\",\"fieldName\":\"user_count\"},"
-          + "{\"type\":\"doubleSum\",\"name\":\"data_transfer\",\"fieldName\":\"data_transfer\"}],"
+          + "\"granularity\":{\"type\":\"period\",\"period\":\"P1D\",\"timeZone\":\"America/Los_Angeles\",\"origin\":null},"
+          + "\"dimensions\":[{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"country\",\"outputName\":\"country\",\"outputType\":\"STRING\"},"
+          + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"device\",\"outputName\":\"device\",\"outputType\":\"STRING\"}],"
+          + "\"aggregations\":[{\"type\":\"longSum\",\"name\":\"total_usage\",\"fieldName\":\"user_count\",\"expression\":null},"
+          + "{\"type\":\"doubleSum\",\"name\":\"data_transfer\",\"fieldName\":\"data_transfer\",\"expression\":null}],"
           + "\"postAggregations\":[],"
           + "\"having\":null,"
           + "\"limitSpec\":{\"type\":\"default\",\"columns\":[{\"dimension\":\"country\",\"direction\":\"ascending\",\"dimensionOrder\":{\"type\":\"lexicographic\"}},"
@@ -238,15 +249,16 @@ public void testCreateSplitsIntervals() throws Exception {
           + "\"descending\":false,"
           + "\"filter\":null,"
           + "\"granularity\":{\"type\":\"all\"},"
-          + "\"dimensions\":[{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"robot\",\"outputName\":\"robot\"},"
-          + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"namespace\",\"outputName\":\"namespace\"},"
-          + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"anonymous\",\"outputName\":\"anonymous\"},"
-          + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"unpatrolled\",\"outputName\":\"unpatrolled\"},"
-          + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"page\",\"outputName\":\"page\"},"
-          + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"language\",\"outputName\":\"language\"},"
-          + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"newpage\",\"outputName\":\"newpage\"},"
-          + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"user\",\"outputName\":\"user\"}],"
+          + "\"dimensions\":[{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"robot\",\"outputName\":\"robot\",\"outputType\":\"STRING\"},"
+          + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"namespace\",\"outputName\":\"namespace\",\"outputType\":\"STRING\"},"
+          + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"anonymous\",\"outputName\":\"anonymous\",\"outputType\":\"STRING\"},"
+          + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"unpatrolled\",\"outputName\":\"unpatrolled\",\"outputType\":\"STRING\"},"
+          + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"page\",\"outputName\":\"page\",\"outputType\":\"STRING\"},"
+          + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"language\",\"outputName\":\"language\",\"outputType\":\"STRING\"},"
+          + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"newpage\",\"outputName\":\"newpage\",\"outputType\":\"STRING\"},"
+          + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"user\",\"outputName\":\"user\",\"outputType\":\"STRING\"}],"
           + "\"metrics\":[\"count\",\"added\",\"delta\",\"variation\",\"deleted\"],"
+          + "\"virtualColumns\":[],"
           + "\"pagingSpec\":{\"pagingIdentifiers\":{},\"threshold\":5,\"fromNext\":false},"
           + "\"context\":{\"druid.query.fetch\":true}}, [localhost:8082]}]";
 
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
index d9e01fe..c7c0050 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
@@ -23,7 +23,6 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
-import com.metamx.common.Granularity;
 import io.druid.data.input.Firehose;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.impl.DimensionSchema;
@@ -33,7 +32,8 @@
 import io.druid.data.input.impl.StringDimensionSchema;
 import io.druid.data.input.impl.TimeAndDimsParseSpec;
 import io.druid.data.input.impl.TimestampSpec;
-import io.druid.granularity.QueryGranularities;
+import io.druid.java.util.common.granularity.Granularities;
+import io.druid.java.util.common.granularity.Granularity;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.LongSumAggregatorFactory;
 import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
@@ -136,7 +136,7 @@ public void testWrite() throws IOException, SegmentLoadingException {
                     new HyperUniquesAggregatorFactory("unique_hosts", "unique_hosts")
             },
             new UniformGranularitySpec(
-                    Granularity.DAY, QueryGranularities.NONE, ImmutableList.of(INTERVAL_FULL)
+                    Granularities.DAY, Granularities.NONE, ImmutableList.of(INTERVAL_FULL)
             ),
             objectMapper
     );
@@ -167,7 +167,7 @@ public DruidWritable apply(@Nullable ImmutableMap input
           ) {
             return new DruidWritable(ImmutableMap.builder().putAll(input)
                     .put(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME,
-                            Granularity.DAY.truncate(
+                            Granularities.DAY.bucketStart(
                                     new DateTime((long) input
                                             .get(DruidTable.DEFAULT_TIMESTAMP_COLUMN)))
                                     .getMillis()
@@ -194,7 +194,7 @@ public DruidWritable apply(@Nullable ImmutableMap input
             ImmutableList.of("host"),
             ImmutableList.of("visited_sum", "unique_hosts"),
             null,
-            QueryGranularities.NONE
+            Granularities.NONE
     );
 
     List rows = Lists.newArrayList();
diff --git a/pom.xml b/pom.xml
index e0aae27..9bc6fef 100644
--- a/pom.xml
+++ b/pom.xml
@@ -137,7 +137,7 @@
     <derby.version>10.10.2.0</derby.version>
     <dropwizard.version>3.1.0</dropwizard.version>
     <dropwizard-metrics-hadoop-metrics2-reporter.version>0.1.2</dropwizard-metrics-hadoop-metrics2-reporter.version>
-    <druid.version>0.9.2</druid.version>
+    <druid.version>0.10.0</druid.version>
     <guava.version>14.0.1</guava.version>
     <groovy.version>2.4.4</groovy.version>
     <h2database.version>1.3.166</h2database.version>
-- 
2.8.4 (Apple Git-73)