diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index 76540b76b3..c0feb8d138 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -49,6 +49,7 @@
 import io.druid.metadata.storage.derby.DerbyConnector;
 import io.druid.metadata.storage.derby.DerbyMetadataStorage;
 import io.druid.metadata.storage.mysql.MySQLConnector;
+import io.druid.metadata.storage.mysql.MySQLConnectorConfig;
 import io.druid.metadata.storage.postgresql.PostgreSQLConnector;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.segment.IndexSpec;
@@ -335,6 +336,7 @@ private void updateKafkaIngestion(Table table){
         inputParser,
         dimensionsAndAggregates.rhs,
         granularitySpec,
+        null,
         DruidStorageHandlerUtils.JSON_MAPPER
     );
 
@@ -880,7 +882,7 @@ public String getPassword() {
     if (dbType.equals("mysql")) {
       connector = new MySQLConnector(storageConnectorConfigSupplier,
               Suppliers.ofInstance(getDruidMetadataStorageTablesConfig())
-      );
+              , new MySQLConnectorConfig());
     } else if (dbType.equals("postgresql")) {
       connector = new PostgreSQLConnector(storageConnectorConfigSupplier,
               Suppliers.ofInstance(getDruidMetadataStorageTablesConfig())
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index 14242378ff..1aef565cf3 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -26,6 +26,7 @@
 import io.druid.metadata.MetadataStorageTablesConfig;
 import io.druid.metadata.SQLMetadataConnector;
 import io.druid.metadata.storage.mysql.MySQLConnector;
+import io.druid.query.Druids;
 import io.druid.query.expression.LikeExprMacro;
 import io.druid.query.expression.RegexpExtractExprMacro;
 import io.druid.query.expression.TimestampCeilExprMacro;
@@ -35,7 +36,9 @@
 import io.druid.query.expression.TimestampParseExprMacro;
 import io.druid.query.expression.TimestampShiftExprMacro;
 import io.druid.query.expression.TrimExprMacro;
+import io.druid.query.select.PagingSpec;
 import io.druid.query.select.SelectQueryConfig;
+import io.druid.query.spec.MultipleIntervalSegmentSpec;
 import io.druid.segment.IndexIO;
 import io.druid.segment.IndexMergerV9;
 import io.druid.query.aggregation.AggregatorFactory;
@@ -48,6 +51,7 @@
 import io.druid.segment.indexing.granularity.UniformGranularitySpec;
 import io.druid.segment.loading.DataSegmentPusher;
 import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+import io.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
 import io.druid.storage.hdfs.HdfsDataSegmentPusher;
 import io.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
 import io.druid.timeline.DataSegment;
@@ -124,8 +128,10 @@
 import java.net.UnknownHostException;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -168,6 +174,7 @@
    * Mapper to use to serialize/deserialize Druid objects (SMILE)
    */
   public static final ObjectMapper SMILE_MAPPER = new DefaultObjectMapper(new SmileFactory());
+  private static final int DEFAULT_MAX_TRIES = 10;
 
   static {
     // This is needed for serde of PagingSpec as it uses JacksonInject for injecting SelectQueryConfig
@@ -187,7 +194,8 @@
             new TrimExprMacro.LeftTrimExprMacro(),
             new TrimExprMacro.RightTrimExprMacro()
         )))
-        .addValue(ObjectMapper.class, JSON_MAPPER);
+        .addValue(ObjectMapper.class, JSON_MAPPER)
+        .addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT);
 
     JSON_MAPPER.setInjectableValues(injectableValues);
     SMILE_MAPPER.setInjectableValues(injectableValues);
@@ -214,13 +222,14 @@
   /**
    * Used by druid to perform IO on indexes
    */
-  public static final IndexIO INDEX_IO = new IndexIO(JSON_MAPPER, () -> 0);
+  public static final IndexIO INDEX_IO =
+      new IndexIO(JSON_MAPPER, TmpFileSegmentWriteOutMediumFactory.instance(), () -> 0);
 
   /**
    * Used by druid to merge indexes
    */
   public static final IndexMergerV9 INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER,
-          DruidStorageHandlerUtils.INDEX_IO
+          DruidStorageHandlerUtils.INDEX_IO,TmpFileSegmentWriteOutMediumFactory.instance()
   );
 
   /**
@@ -606,7 +615,7 @@ public static void disableDataSourceWithHandle(Handle handle,
               }
             }
         )
-        , 3, SQLMetadataConnector.DEFAULT_MAX_TRIES);
+        , 3, DEFAULT_MAX_TRIES);
 
     return segmentList;
   }
@@ -637,6 +646,19 @@ public static Path makeSegmentDescriptorOutputPath(DataSegment pushedSegment,
     );
   }
 
+  public static String createSelectStarQuery(String dataSource) throws IOException {
+    // Create Select query
+    Druids.SelectQueryBuilder builder = new Druids.SelectQueryBuilder();
+    builder.dataSource(dataSource);
+    final List intervals = Arrays.asList(DEFAULT_INTERVAL);
+    builder.intervals(new MultipleIntervalSegmentSpec(intervals));
+    builder.pagingSpec(PagingSpec.newSpec(1));
+    Map context = new HashMap<>();
+    context.put(Constants.DRUID_QUERY_FETCH, false);
+    builder.context(context);
+    return JSON_MAPPER.writeValueAsString(builder.build());
+  }
+
   /**
    * Simple interface for retry operations
    */
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
index 15a08eb4d3..ecb4360623 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
@@ -129,6 +129,7 @@
         inputParser,
         dimensionsAndAggregates.rhs,
         granularitySpec,
+        null,
         DruidStorageHandlerUtils.JSON_MAPPER
     );
 
@@ -156,7 +157,8 @@
         0,
         true,
         null,
-        0L
+        0L,
+        null
     );
 
     LOG.debug(String.format("running with Data schema [%s] ", dataSchema));
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
index c097a13194..c2d3fe5a49 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
@@ -22,9 +22,7 @@
 import java.net.URL;
 import java.net.URLEncoder;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -53,7 +51,6 @@
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.jboss.netty.handler.codec.http.HttpMethod;
-import org.joda.time.Interval;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,8 +61,6 @@
 import com.metamx.http.client.Request;
 
 import io.druid.query.BaseQuery;
-import io.druid.query.Druids;
-import io.druid.query.Druids.SelectQueryBuilder;
 import io.druid.query.LocatedSegmentDescriptor;
 import io.druid.query.Query;
 import io.druid.query.SegmentDescriptor;
@@ -133,7 +128,7 @@ public static DruidQueryRecordReader getDruidQueryReader(String druidQueryType)
         throw new IOException("Druid data source cannot be empty or null");
       }
       //@FIXME https://issues.apache.org/jira/browse/HIVE-19023 use scan instead of Select
-      druidQuery = createSelectStarQuery(dataSource);
+      druidQuery = DruidStorageHandlerUtils.createSelectStarQuery(dataSource);
       druidQueryType = Query.SELECT;
     } else {
       druidQueryType = conf.get(Constants.DRUID_QUERY_TYPE);
@@ -169,19 +164,6 @@ public static DruidQueryRecordReader getDruidQueryReader(String druidQueryType)
     }
   }
 
-  private static String createSelectStarQuery(String dataSource) throws IOException {
-    // Create Select query
-    SelectQueryBuilder builder = new Druids.SelectQueryBuilder();
-    builder.dataSource(dataSource);
-    final List intervals = Arrays.asList(DruidStorageHandlerUtils.DEFAULT_INTERVAL);
-    builder.intervals(intervals);
-    builder.pagingSpec(PagingSpec.newSpec(1));
-    Map context = new HashMap<>();
-    context.put(Constants.DRUID_QUERY_FETCH, false);
-    builder.context(context);
-    return DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(builder.build());
-  }
-
   /* New method that distributes the Select query by creating splits containing
    * information about different Druid nodes that have the data for the given
    * query. */
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java
index ea23dddf59..1ec8b5c238 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java
@@ -24,11 +24,11 @@
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonSubTypes;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import io.druid.segment.writeout.SegmentWriteOutMediumFactory;
 
 import org.joda.time.Period;
 
+import javax.annotation.Nullable;
 import java.io.File;
 
 /**
@@ -131,6 +131,10 @@ public File getBasePersistDirectory()
   {
     return basePersistDirectory;
   }
+  @Nullable @Override public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory() {
+    return null;
+  }
+
   @Override
   @JsonProperty
   public int getMaxPendingPersists()
diff --git druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
index c1bd3322e1..cb8fa3919b 100644
--- druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
+++ druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
@@ -144,13 +144,14 @@ public void testWrite() throws IOException, SegmentLoadingException {
         new UniformGranularitySpec(
             Granularities.DAY, Granularities.NONE, ImmutableList.of(INTERVAL_FULL)
         ),
+        null,
         objectMapper
     );
 
     IndexSpec indexSpec = new IndexSpec(new RoaringBitmapSerdeFactory(true), null, null, null);
     RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(null, null, null,
         temporaryFolder.newFolder(), null, null, null, null, indexSpec, null, 0, 0, null, null,
-        0L
+        0L, null
     );
     LocalFileSystem localFileSystem = FileSystem.getLocal(config);
     DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(
@@ -198,6 +199,7 @@ public DruidWritable apply(@Nullable ImmutableMap input
 
     Firehose firehose = new IngestSegmentFirehose(
         ImmutableList.of(new WindowedStorageAdapter(adapter, adapter.getInterval())),
+        null,
         ImmutableList.of("host"),
         ImmutableList.of("visited_sum", "unique_hosts"),
         null
@@ -228,7 +230,7 @@ private void verifyRows(List> expectedRows,
          actual.getTimestamp().getMillis()
      );
      Assert.assertEquals(expected.get("host"), actual.getDimension("host"));
-      Assert.assertEquals(expected.get("visited_sum"), actual.getLongMetric("visited_sum"));
+      Assert.assertEquals(expected.get("visited_sum"), actual.getMetric("visited_sum"));
      Assert.assertEquals(
          (Double) expected.get("unique_hosts"),
          (Double) HyperUniquesAggregatorFactory
diff --git pom.xml pom.xml
index 5802bd3a35..26721ed957 100644
--- pom.xml
+++ pom.xml
@@ -140,7 +140,7 @@
     10.14.1.0
     3.1.0
     0.1.2
-    0.11.0
+    0.12.0
     19.0
     2.4.11
     1.3.166