From b0640cc60b2dd8c7b54e0c831fa0d3106127ed29 Mon Sep 17 00:00:00 2001
From: Nishant
Date: Tue, 23 Jan 2018 23:06:11 +0530
Subject: [PATCH] [HIVE-18518] Upgrade to druid version 0.11

---
 druid-handler/pom.xml                              |  2 +-
 .../hive/druid/DruidStorageHandlerUtils.java       |  2 +-
 .../druid/serde/HiveDruidSerializationModule.java  |  3 ++
 .../hadoop/hive/druid/TestDruidStorageHandler.java | 45 +++++++++++-----------
 .../druid/TestHiveDruidQueryBasedInputFormat.java  | 16 ++++----
 .../hadoop/hive/ql/io/TestDruidRecordWriter.java   |  2 +-
 itests/qtest-druid/pom.xml                         |  4 +-
 pom.xml                                            |  4 +-
 8 files changed, 41 insertions(+), 37 deletions(-)

diff --git a/druid-handler/pom.xml b/druid-handler/pom.xml
index 2a62b90eea..670d82b6f9 100644
--- a/druid-handler/pom.xml
+++ b/druid-handler/pom.xml
@@ -29,7 +29,7 @@
     ..
 
-    0.27.10
+    1.3.2
     16.0.1
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index 9de0097fa5..2f956b179b 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -24,12 +24,12 @@
 import com.google.common.base.Throwables;
 import com.google.common.collect.Interner;
 import com.google.common.collect.Interners;
+import com.metamx.common.JodaUtils;
 import com.metamx.emitter.EmittingLogger;
 import com.metamx.emitter.core.NoopEmitter;
 import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.http.client.HttpClient;
 import com.metamx.http.client.response.InputStreamResponseHandler;
-import io.druid.common.utils.JodaUtils;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.math.expr.ExprMacroTable;
 import io.druid.metadata.MetadataStorageTablesConfig;
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/HiveDruidSerializationModule.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/HiveDruidSerializationModule.java
index f72fd0d107..8a110ae6e9 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/HiveDruidSerializationModule.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/HiveDruidSerializationModule.java
@@ -18,10 +18,13 @@
 package org.apache.hadoop.hive.druid.serde;
 
 import io.druid.java.util.common.granularity.PeriodGranularity;
+import io.druid.query.spec.LegacySegmentSpec;
 
 import com.fasterxml.jackson.core.util.VersionUtil;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 
+import org.joda.time.Interval;
+
 /**
  * This class is used to define/override any serde behavior for classes from druid.
  * Currently it is used to override the default behavior when serializing PeriodGranularity to include user timezone.
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
index 6f7fc78b69..6a496c20df 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
@@ -45,6 +45,7 @@
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import org.joda.time.DateTimeZone;
 import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Before;
@@ -88,7 +89,7 @@
   private DruidStorageHandler druidStorageHandler;
 
   private DataSegment createSegment(String location) throws IOException {
-    return createSegment(location, new Interval(100, 170), "v1", new LinearShardSpec(0));
+    return createSegment(location, new Interval(100, 170, DateTimeZone.UTC), "v1", new LinearShardSpec(0));
   }
 
   private DataSegment createSegment(String location, Interval interval, String version,
@@ -321,7 +322,7 @@ public void testCommitInsertOverwriteTable() throws MetaException, IOException {
     // This create and publish the segment to be overwritten
     List existingSegments = Arrays
         .asList(createSegment(new Path(taskDirPath, DruidStorageHandlerUtils.INDEX_ZIP).toString(),
-            new Interval(100, 150), "v0", new LinearShardSpec(0)));
+            new Interval(100, 150, DateTimeZone.UTC), "v0", new LinearShardSpec(0)));
     DruidStorageHandlerUtils
         .publishSegmentsAndCommit(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME,
             existingSegments,
@@ -332,7 +333,7 @@ public void testCommitInsertOverwriteTable() throws MetaException, IOException {
     // This creates and publish new segment
     DataSegment dataSegment = createSegment(new Path(taskDirPath, DruidStorageHandlerUtils.INDEX_ZIP).toString(),
-        new Interval(180, 250), "v1", new LinearShardSpec(0));
+        new Interval(180, 250, DateTimeZone.UTC), "v1", new LinearShardSpec(0));
     Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment,
         new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
@@ -374,7 +375,7 @@ public void testCommitMultiInsertOverwriteTable() throws MetaException, IOExcept
     // This create and publish the segment to be overwritten
     List existingSegments = Arrays
         .asList(createSegment(new Path(taskDirPath, DruidStorageHandlerUtils.INDEX_ZIP).toString(),
-            new Interval(100, 150), "v0", new LinearShardSpec(0)));
+            new Interval(100, 150, DateTimeZone.UTC), "v0", new LinearShardSpec(0)));
     DruidStorageHandlerUtils
         .publishSegmentsAndCommit(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME,
             existingSegments,
@@ -425,7 +426,7 @@ public void testCommitMultiInsertOverwriteTable() throws MetaException, IOExcept
     // #5
     DataSegment dataSegment1 = createSegment(new Path(taskDirPath, DruidStorageHandlerUtils.INDEX_ZIP).toString(),
-        new Interval(180, 250), "v1", new LinearShardSpec(0));
+        new Interval(180, 250, DateTimeZone.UTC), "v1", new LinearShardSpec(0));
     Path descriptorPath1 = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment1,
         new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
     );
@@ -440,7 +441,7 @@ public void testCommitMultiInsertOverwriteTable() throws MetaException, IOExcept
     // #6
     DataSegment dataSegment2 = createSegment(new Path(taskDirPath, DruidStorageHandlerUtils.INDEX_ZIP).toString(),
-        new Interval(200, 250), "v1", new LinearShardSpec(0));
+        new Interval(200, 250, DateTimeZone.UTC), "v1", new LinearShardSpec(0));
     Path descriptorPath2 =
         DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment2,
             new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
     );
@@ -455,7 +456,7 @@ public void testCommitMultiInsertOverwriteTable() throws MetaException, IOExcept
     // #7
     DataSegment dataSegment3 = createSegment(new Path(taskDirPath, DruidStorageHandlerUtils.INDEX_ZIP).toString(),
-        new Interval(100, 200), "v1", new LinearShardSpec(0));
+        new Interval(100, 200, DateTimeZone.UTC), "v1", new LinearShardSpec(0));
     Path descriptorPath3 = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment3,
         new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
     );
@@ -514,7 +515,7 @@ public void testCommitInsertIntoTable() throws MetaException, IOException {
     Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName());
     List existingSegments = Arrays
         .asList(createSegment(new Path(taskDirPath, DruidStorageHandlerUtils.INDEX_ZIP).toString(),
-            new Interval(100, 150), "v0", new LinearShardSpec(1)));
+            new Interval(100, 150, DateTimeZone.UTC), "v0", new LinearShardSpec(1)));
     HdfsDataSegmentPusherConfig pusherConfig = new HdfsDataSegmentPusherConfig();
     pusherConfig.setStorageDirectory(config.get(String.valueOf(HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY)));
     DataSegmentPusher dataSegmentPusher = new HdfsDataSegmentPusher(pusherConfig, config, DruidStorageHandlerUtils.JSON_MAPPER);
@@ -527,7 +528,7 @@ public void testCommitInsertIntoTable() throws MetaException, IOException {
         dataSegmentPusher
     );
     DataSegment dataSegment = createSegment(new Path(taskDirPath, DruidStorageHandlerUtils.INDEX_ZIP).toString(),
-        new Interval(100, 150), "v1", new LinearShardSpec(0));
+        new Interval(100, 150, DateTimeZone.UTC), "v1", new LinearShardSpec(0));
     Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment,
         new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
     );
@@ -571,7 +572,7 @@ public void testInsertIntoAppendOneMorePartition() throws MetaException, IOExcep
     List existingSegments = Arrays
         .asList(createSegment(new Path(taskDirPath, DruidStorageHandlerUtils.INDEX_ZIP).toString(),
-            new Interval(100, 150), "v0", new LinearShardSpec(0)));
+            new Interval(100, 150, DateTimeZone.UTC), "v0", new LinearShardSpec(0)));
     DruidStorageHandlerUtils
         .publishSegmentsAndCommit(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME,
             existingSegments,
@@ -581,7 +582,7 @@ public void testInsertIntoAppendOneMorePartition() throws MetaException, IOExcep
     );
     DataSegment dataSegment = createSegment(new Path(taskDirPath, DruidStorageHandlerUtils.INDEX_ZIP).toString(),
-        new Interval(100, 150), "v0", new LinearShardSpec(0));
+        new Interval(100, 150, DateTimeZone.UTC), "v0", new LinearShardSpec(0));
     Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment,
         new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
     );
@@ -621,7 +622,7 @@ public void testCommitInsertIntoWhenDestinationSegmentFileExist()
     Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName());
     List existingSegments = Arrays
         .asList(createSegment(new Path(taskDirPath, "index_old.zip").toString(),
-            new Interval(100, 150), "v0", new LinearShardSpec(1)));
+            new Interval(100, 150, DateTimeZone.UTC), "v0", new LinearShardSpec(1)));
     HdfsDataSegmentPusherConfig pusherConfig = new HdfsDataSegmentPusherConfig();
     pusherConfig.setStorageDirectory(config.get(String.valueOf(HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY)));
     DataSegmentPusher
         dataSegmentPusher = new HdfsDataSegmentPusher(pusherConfig, config, DruidStorageHandlerUtils.JSON_MAPPER);
@@ -633,7 +634,7 @@ public void testCommitInsertIntoWhenDestinationSegmentFileExist()
         dataSegmentPusher
     );
     DataSegment dataSegment = createSegment(new Path(taskDirPath, "index.zip").toString(),
-        new Interval(100, 150), "v1", new LinearShardSpec(0));
+        new Interval(100, 150, DateTimeZone.UTC), "v1", new LinearShardSpec(0));
     Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment,
         new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
     );
@@ -641,7 +642,7 @@ public void testCommitInsertIntoWhenDestinationSegmentFileExist()
     // Create segment file at the destination location with LinearShardSpec(2)
     DataSegment segment = createSegment(new Path(taskDirPath, "index_conflict.zip").toString(),
-        new Interval(100, 150), "v1", new LinearShardSpec(1));
+        new Interval(100, 150, DateTimeZone.UTC), "v1", new LinearShardSpec(1));
     Path segmentPath = new Path(dataSegmentPusher.getPathForHadoop(),
         dataSegmentPusher.makeIndexPathName(segment, DruidStorageHandlerUtils.INDEX_ZIP));
     FileUtils.writeStringToFile(new File(segmentPath.toUri()), "dummy");
@@ -682,13 +683,13 @@ public void testCommitInsertIntoWithConflictingIntervalSegment()
     Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName());
     List existingSegments = Arrays.asList(
         createSegment(new Path(taskDirPath, "index_old_1.zip").toString(),
-            new Interval(100, 150),
+            new Interval(100, 150, DateTimeZone.UTC),
             "v0", new LinearShardSpec(0)),
         createSegment(new Path(taskDirPath, "index_old_2.zip").toString(),
-            new Interval(150, 200),
+            new Interval(150, 200, DateTimeZone.UTC),
             "v0", new LinearShardSpec(0)),
         createSegment(new Path(taskDirPath, "index_old_3.zip").toString(),
-            new Interval(200, 300),
+            new Interval(200, 300, DateTimeZone.UTC),
             "v0", new LinearShardSpec(0)));
     HdfsDataSegmentPusherConfig pusherConfig = new HdfsDataSegmentPusherConfig();
     pusherConfig.setStorageDirectory(taskDirPath.toString());
@@ -703,7 +704,7 @@ public void testCommitInsertIntoWithConflictingIntervalSegment()
     // Try appending segment with conflicting interval
     DataSegment conflictingSegment = createSegment(new Path(taskDirPath, DruidStorageHandlerUtils.INDEX_ZIP).toString(),
-        new Interval(100, 300), "v1", new LinearShardSpec(0));
+        new Interval(100, 300, DateTimeZone.UTC), "v1", new LinearShardSpec(0));
     Path descriptorPath = DruidStorageHandlerUtils
         .makeSegmentDescriptorOutputPath(conflictingSegment,
             new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
@@ -723,11 +724,11 @@ public void testCommitInsertIntoWithNonExtendableSegment() throws MetaException,
     Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName());
     List existingSegments = Arrays
         .asList(createSegment(new Path(taskDirPath, "index_old_1.zip").toString(),
-            new Interval(100, 150), "v0", new NoneShardSpec()),
+            new Interval(100, 150, DateTimeZone.UTC), "v0", new NoneShardSpec()),
         createSegment(new Path(taskDirPath, "index_old_2.zip").toString(),
-            new Interval(200, 250), "v0", new LinearShardSpec(0)),
+            new Interval(200, 250, DateTimeZone.UTC), "v0", new LinearShardSpec(0)),
         createSegment(new Path(taskDirPath, "index_old_3.zip").toString(),
-            new Interval(250, 300), "v0", new LinearShardSpec(0)));
+            new Interval(250, 300, DateTimeZone.UTC), "v0", new LinearShardSpec(0)));
     HdfsDataSegmentPusherConfig pusherConfig = new HdfsDataSegmentPusherConfig();
     pusherConfig.setStorageDirectory(taskDirPath.toString());
     DataSegmentPusher dataSegmentPusher = new HdfsDataSegmentPusher(pusherConfig, config, DruidStorageHandlerUtils.JSON_MAPPER);
@@ -741,7 +742,7 @@ public void testCommitInsertIntoWithNonExtendableSegment() throws MetaException,
     // Try appending to non extendable shard spec
     DataSegment conflictingSegment = createSegment(new Path(taskDirPath, DruidStorageHandlerUtils.INDEX_ZIP).toString(),
-        new Interval(100, 150), "v1", new LinearShardSpec(0));
+        new Interval(100, 150, DateTimeZone.UTC), "v1", new LinearShardSpec(0));
     Path descriptorPath = DruidStorageHandlerUtils
         .makeSegmentDescriptorOutputPath(conflictingSegment,
             new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
index 514dba3aff..bb43d512c4 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
@@ -38,11 +38,11 @@
       + " \"dataSource\": \"sample_datasource\", "
       + " \"granularity\": \"DAY\", "
       + " \"descending\": \"true\", "
-      + " \"intervals\": [ \"2012-01-01T00:00:00.000/2012-01-03T00:00:00.000\" ]}";
+      + " \"intervals\": [ \"2012-01-01T00:00:00.000-08:00/2012-01-03T00:00:00.000-08:00\" ]}";
 
   private static final String TIMESERIES_QUERY_SPLIT = "[HiveDruidSplit{{\"queryType\":\"timeseries\","
       + "\"dataSource\":{\"type\":\"table\",\"name\":\"sample_datasource\"},"
-      + "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2012-01-01T00:00:00.000-08:00/2012-01-03T00:00:00.000-08:00\"]},"
+      + "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2012-01-01T08:00:00.000Z/2012-01-03T08:00:00.000Z\"]},"
       + "\"descending\":true,"
       + "\"virtualColumns\":[],"
       + "\"filter\":null,"
@@ -71,7 +71,7 @@
       + " ], "
       + " \"granularity\": \"all\", "
       + " \"intervals\": [ "
-      + " \"2013-08-31T00:00:00.000/2013-09-03T00:00:00.000\" "
+      + " \"2013-08-31T00:00:00.000-07:00/2013-09-03T00:00:00.000-07:00\" "
       + " ]}";
 
   private static final String TOPN_QUERY_SPLIT = "[HiveDruidSplit{{\"queryType\":\"topN\","
@@ -80,7 +80,7 @@
       + "\"dimension\":{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"sample_dim\",\"outputName\":\"sample_dim\",\"outputType\":\"STRING\"},"
       + "\"metric\":{\"type\":\"LegacyTopNMetricSpec\",\"metric\":\"count\"},"
       + "\"threshold\":5,"
-      + "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2013-08-31T00:00:00.000-07:00/2013-09-03T00:00:00.000-07:00\"]},"
+      + "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2013-08-31T07:00:00.000Z/2013-09-03T07:00:00.000Z\"]},"
       + "\"filter\":null,"
       + "\"granularity\":{\"type\":\"all\"},"
       + "\"aggregations\":[{\"type\":\"longSum\",\"name\":\"count\",\"fieldName\":\"count\",\"expression\":null},"
@@ -102,12 +102,12 @@
       + " { \"type\": \"longSum\", \"name\": \"total_usage\", \"fieldName\": \"user_count\" }, "
       + " { \"type\": \"doubleSum\", \"name\": \"data_transfer\", \"fieldName\": \"data_transfer\" } "
       + " ], "
-      + " \"intervals\": [ \"2012-01-01T00:00:00.000/2012-01-03T00:00:00.000\" ]"
+      + " \"intervals\": [ \"2012-01-01T00:00:00.000-08:00/2012-01-03T00:00:00.000-08:00\" ]"
       + " }";
 
   private static final String GROUP_BY_QUERY_SPLIT = "[HiveDruidSplit{{\"queryType\":\"groupBy\","
       + "\"dataSource\":{\"type\":\"table\",\"name\":\"sample_datasource\"},"
"\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2012-01-01T00:00:00.000-08:00/2012-01-03T00:00:00.000-08:00\"]}," + + "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2012-01-01T08:00:00.000Z/2012-01-03T08:00:00.000Z\"]}," + "\"virtualColumns\":[]," + "\"filter\":null," + "\"granularity\":\"DAY\"," @@ -128,13 +128,13 @@ + " \"dimensions\":[\"robot\",\"namespace\",\"anonymous\",\"unpatrolled\",\"page\",\"language\",\"newpage\",\"user\"], " + " \"metrics\":[\"count\",\"added\",\"delta\",\"variation\",\"deleted\"], " + " \"granularity\": \"all\", " - + " \"intervals\": [ \"2013-01-01/2013-01-02\" ], " + + " \"intervals\": [ \"2013-01-01T00:00:00.000-08:00/2013-01-02T00:00:00.000-08:00\" ], " + " \"pagingSpec\":{\"pagingIdentifiers\": {}, \"threshold\":5}, " + " \"context\":{\"druid.query.fetch\":true}}"; private static final String SELECT_QUERY_SPLIT = "[HiveDruidSplit{{\"queryType\":\"select\"," + "\"dataSource\":{\"type\":\"table\",\"name\":\"wikipedia\"}," - + "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2013-01-01T00:00:00.000-08:00/2013-01-02T00:00:00.000-08:00\"]}," + + "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2013-01-01T08:00:00.000Z/2013-01-02T08:00:00.000Z\"]}," + "\"descending\":false," + "\"filter\":null," + "\"granularity\":{\"type\":\"all\"}," diff --git a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java index e0a11e98d3..8fca03b0be 100644 --- a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java +++ b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java @@ -230,7 +230,7 @@ private void verifyRows(List> expectedRows, Assert.assertEquals( (Double) expected.get("unique_hosts"), (Double) HyperUniquesAggregatorFactory - .estimateCardinality(actual.getRaw("unique_hosts")), + .estimateCardinality(actual.getRaw("unique_hosts"), false), 0.001 ); } diff --git a/itests/qtest-druid/pom.xml b/itests/qtest-druid/pom.xml index a807d03e36..870e3654e6 100644 --- a/itests/qtest-druid/pom.xml +++ b/itests/qtest-druid/pom.xml @@ -37,7 +37,7 @@ ../.. - 2.11.0 + 4.0.0 1.19.3 9.3.19.v20170502 10.11.1.1 @@ -252,4 +252,4 @@ - \ No newline at end of file + diff --git a/pom.xml b/pom.xml index 65ed2220ba..6ee9bcfb5f 100644 --- a/pom.xml +++ b/pom.xml @@ -140,7 +140,7 @@ 10.11.1.1 3.1.0 0.1.2 - 0.10.1 + 0.11.0 19.0 2.4.11 1.3.166 @@ -173,7 +173,7 @@ 2.22.2 2.12 1.1 - 2.8.1 + 2.9.9 3.5.2 1.8 4.11 -- 2.11.0 (Apple Git-81)