From 8ae38dcad7152a2b1e2ba3dc77c4441b5c7e8a64 Mon Sep 17 00:00:00 2001 From: Nishant Bangarwa Date: Mon, 13 Apr 2020 12:04:38 +0530 Subject: [PATCH] [HIVE-23184] Upgrade druid to 0.17.1 --- data/scripts/kafka_init_data.csv | 2 +- .../hadoop/hive/druid/DruidKafkaUtils.java | 16 +- .../hive/druid/DruidStorageHandler.java | 28 +- .../hive/druid/DruidStorageHandlerUtils.java | 33 +- .../hive/druid/io/DruidOutputFormat.java | 1 + .../druid/io/DruidQueryBasedInputFormat.java | 58 +-- .../hive/druid/io/DruidRecordWriter.java | 5 +- .../json/KafkaIndexTaskTuningConfig.java | 136 +++++++ .../hive/druid/json/KafkaSupervisorSpec.java | 1 + .../json/KafkaSupervisorTuningConfig.java | 247 +++++++------ .../hive/druid/json/KafkaTuningConfig.java | 307 ---------------- .../SeekableStreamIndexTaskTuningConfig.java | 332 ++++++++++++++++++ .../SeekableStreamSupervisorTuningConfig.java | 71 ++++ .../serde/DruidSelectQueryRecordReader.java | 92 ----- .../hive/druid/serde/TestDruidSerDe.java | 144 +------- .../hive/ql/io/TestDruidRecordWriter.java | 3 +- itests/qtest-druid/pom.xml | 22 ++ pom.xml | 2 +- .../druid/druidkafkamini_delimited.q.out | 2 +- 19 files changed, 753 insertions(+), 749 deletions(-) create mode 100644 druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaIndexTaskTuningConfig.java delete mode 100644 druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java create mode 100644 druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamIndexTaskTuningConfig.java create mode 100644 druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamSupervisorTuningConfig.java delete mode 100644 druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java diff --git a/data/scripts/kafka_init_data.csv b/data/scripts/kafka_init_data.csv index 5dc094ed21..d818144115 100644 --- a/data/scripts/kafka_init_data.csv +++ b/data/scripts/kafka_init_data.csv @@ -1,4 +1,4 @@ -"2013-08-31T01:02:33Z", "Gypsy Danger","en","nuclear","true","true","false","false","article","North America","United States","Bay Area","San Francisco",57,200,-143 +"2013-08-31T01:02:33Z","Gypsy Danger","en","nuclear","true","true","false","false","article","North America","United States","Bay Area","San Francisco",57,200,-143 "2013-08-31T03:32:45Z","Striker Eureka","en","speed","false","true","true","false","wikipedia","Australia","Australia","Cantebury","Syndey",459,129,330 "2013-08-31T07:11:21Z","Cherno Alpha","ru","masterYi","false","true","true","false","article","Asia","Russia","Oblast","Moscow",123,12,111 "2013-08-31T11:58:39Z","Crimson Typhoon","zh","triplets","true","false","true","false","wikipedia","Asia","China","Shanxi","Taiyuan",905,5,900 diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidKafkaUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidKafkaUtils.java index b56d48aa4f..0f77cb2c8a 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidKafkaUtils.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidKafkaUtils.java @@ -29,18 +29,13 @@ import org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.java.util.http.client.Request; -import org.apache.druid.java.util.http.client.response.FullResponseHandler; -import org.apache.druid.java.util.http.client.response.FullResponseHolder; +import org.apache.druid.java.util.http.client.response.StringFullResponseHandler; +import 
org.apache.druid.java.util.http.client.response.StringFullResponseHolder; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.apache.hadoop.hive.druid.conf.DruidConstants; -import org.apache.hadoop.hive.druid.json.AvroParseSpec; -import org.apache.hadoop.hive.druid.json.AvroStreamInputRowParser; -import org.apache.hadoop.hive.druid.json.InlineSchemaAvroBytesDecoder; -import org.apache.hadoop.hive.druid.json.KafkaSupervisorIOConfig; -import org.apache.hadoop.hive.druid.json.KafkaSupervisorSpec; -import org.apache.hadoop.hive.druid.json.KafkaSupervisorTuningConfig; +import org.apache.hadoop.hive.druid.json.*; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.session.SessionState; import org.jboss.netty.handler.codec.http.HttpMethod; @@ -89,6 +84,7 @@ static KafkaSupervisorSpec createKafkaSupervisorSpec(Table table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxPendingPersists"), indexSpec, null, + null, // buildV9Directly - use druid default, no need to be configured by user DruidStorageHandlerUtils.getBooleanProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "reportParseExceptions"), @@ -161,14 +157,14 @@ static void updateKafkaIngestionSpec(String overlordAddress, KafkaSupervisorSpec String task = JSON_MAPPER.writeValueAsString(spec); CONSOLE.printInfo("submitting kafka Spec {}", task); LOG.info("submitting kafka Supervisor Spec {}", task); - FullResponseHolder + StringFullResponseHolder response = DruidStorageHandlerUtils.getResponseFromCurrentLeader(DruidStorageHandler.getHttpClient(), new Request(HttpMethod.POST, new URL(String.format("http://%s/druid/indexer/v1/supervisor", overlordAddress))).setContent( "application/json", JSON_MAPPER.writeValueAsBytes(spec)), - new FullResponseHandler(Charset.forName("UTF-8"))); + new StringFullResponseHandler(Charset.forName("UTF-8"))); if (response.getStatus().equals(HttpResponseStatus.OK)) { String msg = diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java index fe55eff222..715b1051d9 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java @@ -39,8 +39,8 @@ import org.apache.druid.java.util.http.client.HttpClientConfig; import org.apache.druid.java.util.http.client.HttpClientInit; import org.apache.druid.java.util.http.client.Request; -import org.apache.druid.java.util.http.client.response.FullResponseHandler; -import org.apache.druid.java.util.http.client.response.FullResponseHolder; +import org.apache.druid.java.util.http.client.response.StringFullResponseHandler; +import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; import org.apache.druid.metadata.MetadataStorageConnectorConfig; import org.apache.druid.metadata.MetadataStorageTablesConfig; import org.apache.druid.metadata.SQLMetadataConnector; @@ -50,6 +50,7 @@ import org.apache.druid.metadata.storage.mysql.MySQLConnectorConfig; import org.apache.druid.metadata.storage.postgresql.PostgreSQLConnector; import org.apache.druid.metadata.storage.postgresql.PostgreSQLConnectorConfig; +import org.apache.druid.metadata.storage.postgresql.PostgreSQLTablesConfig; import org.apache.druid.query.BaseQuery; import org.apache.druid.query.Query; import 
org.apache.druid.query.aggregation.AggregatorFactory; @@ -365,14 +366,14 @@ private void updateKafkaIngestion(Table table) { private void resetKafkaIngestion(String overlordAddress, String dataSourceName) { try { - FullResponseHolder + StringFullResponseHolder response = RetryUtils.retry(() -> DruidStorageHandlerUtils.getResponseFromCurrentLeader(getHttpClient(), new Request(HttpMethod.POST, new URL(String.format("http://%s/druid/indexer/v1/supervisor/%s/reset", overlordAddress, dataSourceName))), - new FullResponseHandler(Charset.forName("UTF-8"))), + new StringFullResponseHandler(Charset.forName("UTF-8"))), input -> input instanceof IOException, getMaxRetryCount()); if (response.getStatus().equals(HttpResponseStatus.OK)) { @@ -389,14 +390,14 @@ private void resetKafkaIngestion(String overlordAddress, String dataSourceName) private void stopKafkaIngestion(String overlordAddress, String dataSourceName) { try { - FullResponseHolder + StringFullResponseHolder response = RetryUtils.retry(() -> DruidStorageHandlerUtils.getResponseFromCurrentLeader(getHttpClient(), new Request(HttpMethod.POST, new URL(String.format("http://%s/druid/indexer/v1/supervisor/%s/shutdown", overlordAddress, dataSourceName))), - new FullResponseHandler(Charset.forName("UTF-8"))), + new StringFullResponseHandler(Charset.forName("UTF-8"))), input -> input instanceof IOException, getMaxRetryCount()); if (response.getStatus().equals(HttpResponseStatus.OK)) { @@ -423,12 +424,12 @@ private KafkaSupervisorSpec fetchKafkaIngestionSpec(Table table) { Preconditions.checkNotNull(DruidStorageHandlerUtils.getTableProperty(table, Constants.DRUID_DATA_SOURCE), "Druid Datasource name is null"); try { - FullResponseHolder + StringFullResponseHolder response = RetryUtils.retry(() -> DruidStorageHandlerUtils.getResponseFromCurrentLeader(getHttpClient(), new Request(HttpMethod.GET, new URL(String.format("http://%s/druid/indexer/v1/supervisor/%s", overlordAddress, dataSourceName))), - new FullResponseHandler(Charset.forName("UTF-8"))), + new StringFullResponseHandler(Charset.forName("UTF-8"))), input -> input instanceof IOException, getMaxRetryCount()); if (response.getStatus().equals(HttpResponseStatus.OK)) { @@ -465,14 +466,14 @@ private KafkaSupervisorSpec fetchKafkaIngestionSpec(Table table) { Preconditions.checkNotNull(DruidStorageHandlerUtils.getTableProperty(table, Constants.DRUID_DATA_SOURCE), "Druid Datasource name is null"); try { - FullResponseHolder + StringFullResponseHolder response = RetryUtils.retry(() -> DruidStorageHandlerUtils.getResponseFromCurrentLeader(getHttpClient(), new Request(HttpMethod.GET, new URL(String.format("http://%s/druid/indexer/v1/supervisor/%s/status", overlordAddress, dataSourceName))), - new FullResponseHandler(Charset.forName("UTF-8"))), + new StringFullResponseHandler(Charset.forName("UTF-8"))), input -> input instanceof IOException, getMaxRetryCount()); if (response.getStatus().equals(HttpResponseStatus.OK)) { @@ -546,7 +547,7 @@ private void checkLoadStatus(List segments) { coordinatorResponse = RetryUtils.retry(() -> DruidStorageHandlerUtils.getResponseFromCurrentLeader(getHttpClient(), new Request(HttpMethod.GET, new URL(String.format("http://%s/status", coordinatorAddress))), - new FullResponseHandler(Charset.forName("UTF-8"))).getContent(), + new StringFullResponseHandler(Charset.forName("UTF-8"))).getContent(), input -> input instanceof IOException, maxTries); } catch (Exception e) { @@ -582,7 +583,7 @@ private void checkLoadStatus(List segments) { result = 
DruidStorageHandlerUtils.getResponseFromCurrentLeader(getHttpClient(), new Request(HttpMethod.GET, input), - new FullResponseHandler(Charset.forName("UTF-8"))).getContent(); + new StringFullResponseHandler(Charset.forName("UTF-8"))).getContent(); LOG.debug("Checking segment [{}] response is [{}]", input, result); return Strings.isNullOrEmpty(result); @@ -879,7 +880,8 @@ private SQLMetadataConnector buildConnector() { connector = new PostgreSQLConnector(storageConnectorConfigSupplier, Suppliers.ofInstance(getDruidMetadataStorageTablesConfig()), - new PostgreSQLConnectorConfig() + new PostgreSQLConnectorConfig(), + new PostgreSQLTablesConfig() ); break; diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java index 1d7009b5af..09e769cfa0 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java @@ -33,6 +33,7 @@ import com.google.common.collect.Ordering; import org.apache.calcite.adapter.druid.DruidQuery; import org.apache.calcite.sql.validate.SqlValidatorUtil; +import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.guice.BloomFilterSerializersModule; @@ -46,9 +47,9 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.Request; -import org.apache.druid.java.util.http.client.response.FullResponseHandler; -import org.apache.druid.java.util.http.client.response.FullResponseHolder; import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler; +import org.apache.druid.java.util.http.client.response.StringFullResponseHandler; +import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.metadata.MetadataStorageTablesConfig; import org.apache.druid.metadata.SQLMetadataConnector; @@ -68,6 +69,7 @@ import org.apache.druid.query.expression.TimestampParseExprMacro; import org.apache.druid.query.expression.TimestampShiftExprMacro; import org.apache.druid.query.expression.TrimExprMacro; +import org.apache.druid.query.extraction.StringFormatExtractionFn; import org.apache.druid.query.filter.AndDimFilter; import org.apache.druid.query.filter.BloomDimFilter; import org.apache.druid.query.filter.BloomKFilter; @@ -79,8 +81,6 @@ import org.apache.druid.query.ordering.StringComparator; import org.apache.druid.query.ordering.StringComparators; import org.apache.druid.query.scan.ScanQuery; -import org.apache.druid.query.select.SelectQuery; -import org.apache.druid.query.select.SelectQueryConfig; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.query.timeseries.TimeseriesQuery; import org.apache.druid.query.topn.TopNQuery; @@ -225,10 +225,12 @@ private DruidStorageHandlerUtils() { private static final int DEFAULT_MAX_TRIES = 10; static { + // This is needed to initialize NullHandling for druid without guice.
+ NullHandling.initializeForTests(); // This is needed for serde of PagingSpec as it uses JacksonInject for injecting SelectQueryConfig InjectableValues.Std injectableValues = - new InjectableValues.Std().addValue(SelectQueryConfig.class, new SelectQueryConfig(false)) + new InjectableValues.Std() // Expressions macro table used when we deserialize the query from calcite plan .addValue(ExprMacroTable.class, new ExprMacroTable(ImmutableList.of(new LikeExprMacro(), @@ -242,8 +244,7 @@ private DruidStorageHandlerUtils() { new TrimExprMacro.BothTrimExprMacro(), new TrimExprMacro.LeftTrimExprMacro(), new TrimExprMacro.RightTrimExprMacro()))) - .addValue(ObjectMapper.class, JSON_MAPPER) - .addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT); + .addValue(ObjectMapper.class, JSON_MAPPER); JSON_MAPPER.setInjectableValues(injectableValues); SMILE_MAPPER.setInjectableValues(injectableValues); @@ -331,10 +332,10 @@ public static InputStream submitRequest(HttpClient client, Request request) thro } - static FullResponseHolder getResponseFromCurrentLeader(HttpClient client, + static StringFullResponseHolder getResponseFromCurrentLeader(HttpClient client, Request request, - FullResponseHandler fullResponseHandler) throws ExecutionException, InterruptedException { - FullResponseHolder responseHolder = client.go(request, fullResponseHandler).get(); + StringFullResponseHandler fullResponseHandler) throws ExecutionException, InterruptedException { + StringFullResponseHolder responseHolder = client.go(request, fullResponseHandler).get(); if (HttpResponseStatus.TEMPORARY_REDIRECT.equals(responseHolder.getStatus())) { String redirectUrlStr = responseHolder.getResponse().headers().get("Location"); LOG.debug("Request[%s] received redirect response to location [%s].", request.getUrl(), redirectUrlStr); @@ -638,12 +639,12 @@ public static Path makeSegmentDescriptorOutputPath(DataSegment pushedSegment, Pa } public static String createScanAllQuery(String dataSourceName, List columns) throws JsonProcessingException { - final ScanQuery.ScanQueryBuilder scanQueryBuilder = ScanQuery.newScanQueryBuilder(); + final Druids.ScanQueryBuilder scanQueryBuilder = Druids.newScanQueryBuilder(); final List intervals = Collections.singletonList(DEFAULT_INTERVAL); ScanQuery scanQuery = scanQueryBuilder.dataSource(dataSourceName) - .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) .intervals(new MultipleIntervalSegmentSpec(intervals)) .columns(columns) .build(); @@ -977,11 +978,7 @@ public static IndexSpec getIndexSpec(Configuration jc) { .setVirtualColumns(VirtualColumns.create(virtualColumns)).build(); break; case org.apache.druid.query.Query.SCAN: - rv = ScanQuery.ScanQueryBuilder.copy((ScanQuery) query).filters(filter) - .virtualColumns(VirtualColumns.create(virtualColumns)).build(); - break; - case org.apache.druid.query.Query.SELECT: - rv = Druids.SelectQueryBuilder.copy((SelectQuery) query).filters(filter) + rv = Druids.ScanQueryBuilder.copy((ScanQuery) query).filters(filter) .virtualColumns(VirtualColumns.create(virtualColumns)).build(); break; default: @@ -1140,8 +1137,6 @@ public static VirtualColumns getVirtualColumns(org.apache.druid.query.Query quer return ((GroupByQuery) query).getVirtualColumns(); case org.apache.druid.query.Query.SCAN: return ((ScanQuery) query).getVirtualColumns(); - case org.apache.druid.query.Query.SELECT: - return ((SelectQuery) query).getVirtualColumns(); default: throw new 
UnsupportedOperationException("Unsupported Query type " + query); } diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java index 6cf3ef2562..86e5d03e9d 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java @@ -162,6 +162,7 @@ null, null, indexSpec, + null, true, 0, 0, diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java index 82a1f11f3f..b66c7575e5 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java @@ -25,8 +25,6 @@ import org.apache.druid.query.Query; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.scan.ScanQuery; -import org.apache.druid.query.select.PagingSpec; -import org.apache.druid.query.select.SelectQuery; import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; import org.apache.commons.lang3.StringEscapeUtils; import org.apache.commons.lang3.StringUtils; @@ -37,13 +35,7 @@ import org.apache.hadoop.hive.druid.DruidStorageHandler; import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils; import org.apache.hadoop.hive.druid.conf.DruidConstants; -import org.apache.hadoop.hive.druid.serde.DruidGroupByQueryRecordReader; -import org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader; -import org.apache.hadoop.hive.druid.serde.DruidScanQueryRecordReader; -import org.apache.hadoop.hive.druid.serde.DruidSelectQueryRecordReader; -import org.apache.hadoop.hive.druid.serde.DruidTimeseriesQueryRecordReader; -import org.apache.hadoop.hive.druid.serde.DruidTopNQueryRecordReader; -import org.apache.hadoop.hive.druid.serde.DruidWritable; +import org.apache.hadoop.hive.druid.serde.*; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; import org.apache.hadoop.hive.ql.exec.vector.VectorizedSupport; @@ -88,8 +80,6 @@ public static DruidQueryRecordReader getDruidQueryReader(String druidQueryType) return new DruidTopNQueryRecordReader(); case Query.GROUP_BY: return new DruidGroupByQueryRecordReader(); - case Query.SELECT: - return new DruidSelectQueryRecordReader(); case Query.SCAN: return new DruidScanQueryRecordReader(); default: @@ -152,9 +142,6 @@ public static DruidQueryRecordReader getDruidQueryReader(String druidQueryType) case Query.TOPN: case Query.GROUP_BY: return new HiveDruidSplit[] {new HiveDruidSplit(druidQuery, paths[0], new String[] {address})}; - case Query.SELECT: - SelectQuery selectQuery = DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, SelectQuery.class); - return distributeSelectQuery(address, selectQuery, paths[0]); case Query.SCAN: ScanQuery scanQuery = DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, ScanQuery.class); return distributeScanQuery(address, scanQuery, paths[0]); @@ -163,54 +150,13 @@ public static DruidQueryRecordReader getDruidQueryReader(String druidQueryType) } } - /* New method that distributes the Select query by creating splits containing - * information about different Druid nodes that have the data for the given - * query. 
*/ - private static HiveDruidSplit[] distributeSelectQuery(String address, SelectQuery query, Path dummyPath) - throws IOException { - // If it has a limit, we use it and we do not distribute the query - final boolean isFetch = query.getContextBoolean(DruidConstants.DRUID_QUERY_FETCH, false); - if (isFetch) { - return new HiveDruidSplit[] {new HiveDruidSplit(DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), - dummyPath, - new String[] {address})}; - } - - final List segmentDescriptors = fetchLocatedSegmentDescriptors(address, query); - - // Create one input split for each segment - final int numSplits = segmentDescriptors.size(); - final HiveDruidSplit[] splits = new HiveDruidSplit[segmentDescriptors.size()]; - for (int i = 0; i < numSplits; i++) { - final LocatedSegmentDescriptor locatedSD = segmentDescriptors.get(i); - final String[] hosts = new String[locatedSD.getLocations().size()]; - for (int j = 0; j < locatedSD.getLocations().size(); j++) { - hosts[j] = locatedSD.getLocations().get(j).getHost(); - } - // Create partial Select query - final SegmentDescriptor - newSD = - new SegmentDescriptor(locatedSD.getInterval(), locatedSD.getVersion(), locatedSD.getPartitionNumber()); - //@TODO This is fetching all the rows at once from broker or multiple historical nodes - // Move to use scan query to avoid GC back pressure on the nodes - // https://issues.apache.org/jira/browse/HIVE-17627 - final SelectQuery - partialQuery = - query.withQuerySegmentSpec(new MultipleSpecificSegmentSpec(Lists.newArrayList(newSD))) - .withPagingSpec(PagingSpec.newSpec(Integer.MAX_VALUE)); - splits[i] = - new HiveDruidSplit(DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(partialQuery), dummyPath, hosts); - } - return splits; - } - /* New method that distributes the Scan query by creating splits containing * information about different Druid nodes that have the data for the given * query. 
*/ private static HiveDruidSplit[] distributeScanQuery(String address, ScanQuery query, Path dummyPath) throws IOException { // If it has a limit, we use it and we do not distribute the query - final boolean isFetch = query.getLimit() < Long.MAX_VALUE; + final boolean isFetch = query.getScanRowsLimit() < Long.MAX_VALUE; if (isFetch) { return new HiveDruidSplit[] {new HiveDruidSplit(DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath, diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java index 248b59aae6..cab30775bf 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java @@ -104,8 +104,11 @@ public DruidRecordWriter(DataSchema dataSchema, this.dataSchema = Preconditions.checkNotNull(dataSchema, "data schema is null"); appenderator = - Appenderators.createOffline(this.dataSchema, + Appenderators.createOffline( + "hive-offline-appenderator", + this.dataSchema, tuningConfig, + false, new FireDepartmentMetrics(), dataSegmentPusher, DruidStorageHandlerUtils.JSON_MAPPER, diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaIndexTaskTuningConfig.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaIndexTaskTuningConfig.java new file mode 100644 index 0000000000..4220cf1423 --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaIndexTaskTuningConfig.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.druid.json; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.indexing.RealtimeTuningConfig; +import org.apache.druid.segment.indexing.TuningConfig; +import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig; +import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; +import org.joda.time.Period; + +import javax.annotation.Nullable; +import java.io.File; +import java.util.Objects; + +/** + * This class is copied from druid source code + * in order to avoid adding additional dependencies on druid-indexing-service. 
+ */ +public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningConfig +{ + @JsonCreator + public KafkaIndexTaskTuningConfig( + @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory, + @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory, + @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, + @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows, + @JsonProperty("intermediatePersistPeriod") @Nullable Period intermediatePersistPeriod, + @JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory, + @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists, + @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, + // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12. + @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly, + @Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions, + @JsonProperty("handoffConditionTimeout") @Nullable Long handoffConditionTimeout, + @JsonProperty("resetOffsetAutomatically") @Nullable Boolean resetOffsetAutomatically, + @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, + @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod, + @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions, + @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, + @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions + ) + { + super( + maxRowsInMemory, + maxBytesInMemory, + maxRowsPerSegment, + maxTotalRows, + intermediatePersistPeriod, + basePersistDirectory, + maxPendingPersists, + indexSpec, + indexSpecForIntermediatePersists, + true, + reportParseExceptions, + handoffConditionTimeout, + resetOffsetAutomatically, + false, + segmentWriteOutMediumFactory, + intermediateHandoffPeriod, + logParseExceptions, + maxParseExceptions, + maxSavedParseExceptions + ); + } + + @Override + public KafkaIndexTaskTuningConfig withBasePersistDirectory(File dir) + { + return new KafkaIndexTaskTuningConfig( + getMaxRowsInMemory(), + getMaxBytesInMemory(), + getMaxRowsPerSegment(), + getMaxTotalRows(), + getIntermediatePersistPeriod(), + dir, + getMaxPendingPersists(), + getIndexSpec(), + getIndexSpecForIntermediatePersists(), + true, + isReportParseExceptions(), + getHandoffConditionTimeout(), + isResetOffsetAutomatically(), + getSegmentWriteOutMediumFactory(), + getIntermediateHandoffPeriod(), + isLogParseExceptions(), + getMaxParseExceptions(), + getMaxSavedParseExceptions() + ); + } + + + @Override + public String toString() + { + return "KafkaIndexTaskTuningConfig{" + + "maxRowsInMemory=" + getMaxRowsInMemory() + + ", maxRowsPerSegment=" + getMaxRowsPerSegment() + + ", maxTotalRows=" + getMaxTotalRows() + + ", maxBytesInMemory=" + getMaxBytesInMemory() + + ", intermediatePersistPeriod=" + getIntermediatePersistPeriod() + + ", basePersistDirectory=" + getBasePersistDirectory() + + ", maxPendingPersists=" + getMaxPendingPersists() + + ", indexSpec=" + getIndexSpec() + + ", indexSpecForIntermediatePersists=" + getIndexSpecForIntermediatePersists() + + ", reportParseExceptions=" + isReportParseExceptions() + + ", handoffConditionTimeout=" + getHandoffConditionTimeout() + + ", resetOffsetAutomatically=" + isResetOffsetAutomatically() + + ", 
segmentWriteOutMediumFactory=" + getSegmentWriteOutMediumFactory() + + ", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() + + ", logParseExceptions=" + isLogParseExceptions() + + ", maxParseExceptions=" + getMaxParseExceptions() + + ", maxSavedParseExceptions=" + getMaxSavedParseExceptions() + + '}'; + } + +} diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorSpec.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorSpec.java index d230832243..7dd54aa73a 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorSpec.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorSpec.java @@ -62,6 +62,7 @@ null, null, null, + null, null, null, null, diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorTuningConfig.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorTuningConfig.java index b4d38b97f4..cb200f3bef 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorTuningConfig.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorTuningConfig.java @@ -36,7 +36,11 @@ */ @SuppressWarnings("ALL") @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes({ @JsonSubTypes.Type(name = "kafka", value = KafkaSupervisorTuningConfig.class) }) -public class KafkaSupervisorTuningConfig extends KafkaTuningConfig { +public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig + implements SeekableStreamSupervisorTuningConfig +{ + private static final String DEFAULT_OFFSET_FETCH_PERIOD = "PT30S"; + private final Integer workerThreads; private final Integer chatThreads; private final Long chatRetries; @@ -44,7 +48,38 @@ private final Duration shutdownTimeout; private final Duration offsetFetchPeriod; - public KafkaSupervisorTuningConfig(@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, + public static KafkaSupervisorTuningConfig defaultConfig() + { + return new KafkaSupervisorTuningConfig( + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + } + + public KafkaSupervisorTuningConfig( + @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, @JsonProperty("maxBytesInMemory") Long maxBytesInMemory, @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment, @JsonProperty("maxTotalRows") Long maxTotalRows, @@ -52,6 +87,7 @@ public KafkaSupervisorTuningConfig(@JsonProperty("maxRowsInMemory") Integer maxR @JsonProperty("basePersistDirectory") File basePersistDirectory, @JsonProperty("maxPendingPersists") Integer maxPendingPersists, @JsonProperty("indexSpec") IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12. 
@JsonProperty("buildV9Directly") Boolean buildV9Directly, @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, @@ -67,8 +103,11 @@ public KafkaSupervisorTuningConfig(@JsonProperty("maxRowsInMemory") Integer maxR @JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod, @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions, @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, - @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions) { - super(maxRowsInMemory, + @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions + ) + { + super( + maxRowsInMemory, maxBytesInMemory, maxRowsPerSegment, maxTotalRows, @@ -76,6 +115,7 @@ public KafkaSupervisorTuningConfig(@JsonProperty("maxRowsInMemory") Integer maxR basePersistDirectory, maxPendingPersists, indexSpec, + indexSpecForIntermediatePersists, true, reportParseExceptions, handoffConditionTimeout, @@ -84,134 +124,125 @@ public KafkaSupervisorTuningConfig(@JsonProperty("maxRowsInMemory") Integer maxR intermediateHandoffPeriod, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions); - + maxSavedParseExceptions + ); this.workerThreads = workerThreads; this.chatThreads = chatThreads; - this.chatRetries = (chatRetries != null ? chatRetries : 8); - this.httpTimeout = defaultDuration(httpTimeout, "PT10S"); - this.shutdownTimeout = defaultDuration(shutdownTimeout, "PT80S"); - this.offsetFetchPeriod = defaultDuration(offsetFetchPeriod, "PT30S"); + this.chatRetries = (chatRetries != null ? chatRetries : DEFAULT_CHAT_RETRIES); + this.httpTimeout = SeekableStreamSupervisorTuningConfig.defaultDuration(httpTimeout, DEFAULT_HTTP_TIMEOUT); + this.shutdownTimeout = SeekableStreamSupervisorTuningConfig.defaultDuration( + shutdownTimeout, + DEFAULT_SHUTDOWN_TIMEOUT + ); + this.offsetFetchPeriod = SeekableStreamSupervisorTuningConfig.defaultDuration( + offsetFetchPeriod, + DEFAULT_OFFSET_FETCH_PERIOD + ); } - @JsonProperty public Integer getWorkerThreads() { + @Override + @JsonProperty + public Integer getWorkerThreads() + { return workerThreads; } - @JsonProperty public Integer getChatThreads() { + @Override + @JsonProperty + public Integer getChatThreads() + { return chatThreads; } - @JsonProperty public Long getChatRetries() { + @Override + @JsonProperty + public Long getChatRetries() + { return chatRetries; } - @JsonProperty public Duration getHttpTimeout() { + @Override + @JsonProperty + public Duration getHttpTimeout() + { return httpTimeout; } - @JsonProperty public Duration getShutdownTimeout() { + @Override + @JsonProperty + public Duration getShutdownTimeout() + { return shutdownTimeout; } - @JsonProperty public Duration getOffsetFetchPeriod() { - return offsetFetchPeriod; + @Override + public Duration getRepartitionTransitionDuration() + { + // Stopping tasks early for Kafka ingestion on partition set change is not supported yet, + // just return a default for now. 
+ return SeekableStreamSupervisorTuningConfig.defaultDuration( + null, + SeekableStreamSupervisorTuningConfig.DEFAULT_REPARTITION_TRANSITION_DURATION + ); } - @Override public String toString() { - return "KafkaSupervisorTuningConfig{" - + "maxRowsInMemory=" - + getMaxRowsInMemory() - + ", maxRowsPerSegment=" - + getMaxRowsPerSegment() - + ", maxTotalRows=" - + getMaxTotalRows() - + ", maxBytesInMemory=" - + TuningConfigs.getMaxBytesInMemoryOrDefault(getMaxBytesInMemory()) - + ", intermediatePersistPeriod=" - + getIntermediatePersistPeriod() - + ", basePersistDirectory=" - + getBasePersistDirectory() - + ", maxPendingPersists=" - + getMaxPendingPersists() - + ", indexSpec=" - + getIndexSpec() - + ", reportParseExceptions=" - + isReportParseExceptions() - + ", handoffConditionTimeout=" - + getHandoffConditionTimeout() - + ", resetOffsetAutomatically=" - + isResetOffsetAutomatically() - + ", segmentWriteOutMediumFactory=" - + getSegmentWriteOutMediumFactory() - + ", workerThreads=" - + workerThreads - + ", chatThreads=" - + chatThreads - + ", chatRetries=" - + chatRetries - + ", httpTimeout=" - + httpTimeout - + ", shutdownTimeout=" - + shutdownTimeout - + ", offsetFetchPeriod=" - + offsetFetchPeriod - + ", intermediateHandoffPeriod=" - + getIntermediateHandoffPeriod() - + ", logParseExceptions=" - + isLogParseExceptions() - + ", maxParseExceptions=" - + getMaxParseExceptions() - + ", maxSavedParseExceptions=" - + getMaxSavedParseExceptions() - + '}'; - } - - private static Duration defaultDuration(final Period period, final String theDefault) { - return (period == null ? new Period(theDefault) : period).toStandardDuration(); + @JsonProperty + public Duration getOffsetFetchPeriod() + { + return offsetFetchPeriod; } - @Override public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - if (!super.equals(o)) { - return false; - } - - KafkaSupervisorTuningConfig that = (KafkaSupervisorTuningConfig) o; - - if (workerThreads != null ? !workerThreads.equals(that.workerThreads) : that.workerThreads != null) { - return false; - } - if (chatThreads != null ? !chatThreads.equals(that.chatThreads) : that.chatThreads != null) { - return false; - } - if (chatRetries != null ? !chatRetries.equals(that.chatRetries) : that.chatRetries != null) { - return false; - } - if (httpTimeout != null ? !httpTimeout.equals(that.httpTimeout) : that.httpTimeout != null) { - return false; - } - if (shutdownTimeout != null ? !shutdownTimeout.equals(that.shutdownTimeout) : that.shutdownTimeout != null) { - return false; - } - return offsetFetchPeriod != null ? 
- offsetFetchPeriod.equals(that.offsetFetchPeriod) : - that.offsetFetchPeriod == null; + @Override + public String toString() + { + return "KafkaSupervisorTuningConfig{" + + "maxRowsInMemory=" + getMaxRowsInMemory() + + ", maxRowsPerSegment=" + getMaxRowsPerSegment() + + ", maxTotalRows=" + getMaxTotalRows() + + ", maxBytesInMemory=" + TuningConfigs.getMaxBytesInMemoryOrDefault(getMaxBytesInMemory()) + + ", intermediatePersistPeriod=" + getIntermediatePersistPeriod() + + ", basePersistDirectory=" + getBasePersistDirectory() + + ", maxPendingPersists=" + getMaxPendingPersists() + + ", indexSpec=" + getIndexSpec() + + ", reportParseExceptions=" + isReportParseExceptions() + + ", handoffConditionTimeout=" + getHandoffConditionTimeout() + + ", resetOffsetAutomatically=" + isResetOffsetAutomatically() + + ", segmentWriteOutMediumFactory=" + getSegmentWriteOutMediumFactory() + + ", workerThreads=" + workerThreads + + ", chatThreads=" + chatThreads + + ", chatRetries=" + chatRetries + + ", httpTimeout=" + httpTimeout + + ", shutdownTimeout=" + shutdownTimeout + + ", offsetFetchPeriod=" + offsetFetchPeriod + + ", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() + + ", logParseExceptions=" + isLogParseExceptions() + + ", maxParseExceptions=" + getMaxParseExceptions() + + ", maxSavedParseExceptions=" + getMaxSavedParseExceptions() + + '}'; } - @Override public int hashCode() { - int result = super.hashCode(); - result = 31 * result + (workerThreads != null ? workerThreads.hashCode() : 0); - result = 31 * result + (chatThreads != null ? chatThreads.hashCode() : 0); - result = 31 * result + (chatRetries != null ? chatRetries.hashCode() : 0); - result = 31 * result + (httpTimeout != null ? httpTimeout.hashCode() : 0); - result = 31 * result + (shutdownTimeout != null ? shutdownTimeout.hashCode() : 0); - result = 31 * result + (offsetFetchPeriod != null ? offsetFetchPeriod.hashCode() : 0); - return result; + @Override + public KafkaIndexTaskTuningConfig convertToTaskTuningConfig() + { + return new KafkaIndexTaskTuningConfig( + getMaxRowsInMemory(), + getMaxBytesInMemory(), + getMaxRowsPerSegment(), + getMaxTotalRows(), + getIntermediatePersistPeriod(), + getBasePersistDirectory(), + getMaxPendingPersists(), + getIndexSpec(), + getIndexSpecForIntermediatePersists(), + true, + isReportParseExceptions(), + getHandoffConditionTimeout(), + isResetOffsetAutomatically(), + getSegmentWriteOutMediumFactory(), + getIntermediateHandoffPeriod(), + isLogParseExceptions(), + getMaxParseExceptions(), + getMaxSavedParseExceptions() + ); } } diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java deleted file mode 100644 index 45ac77ba35..0000000000 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java +++ /dev/null @@ -1,307 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.druid.json; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.druid.segment.IndexSpec; -import org.apache.druid.segment.indexing.RealtimeTuningConfig; -import org.apache.druid.segment.indexing.TuningConfig; -import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig; -import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; -import org.joda.time.Period; - -import javax.annotation.Nullable; -import java.io.File; -import java.util.Objects; - -/** - * This class is copied from druid source code - * in order to avoid adding additional dependencies on druid-indexing-service. - */ -public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig { - private static final int DEFAULT_MAX_ROWS_PER_SEGMENT = 5_000_000; - private static final boolean DEFAULT_RESET_OFFSET_AUTOMATICALLY = false; - - private final int maxRowsInMemory; - private final long maxBytesInMemory; - private final int maxRowsPerSegment; - @Nullable private final Long maxTotalRows; - private final Period intermediatePersistPeriod; - private final File basePersistDirectory; - @Deprecated private final int maxPendingPersists; - private final IndexSpec indexSpec; - private final boolean reportParseExceptions; - @Deprecated private final long handoffConditionTimeout; - private final boolean resetOffsetAutomatically; - @Nullable private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory; - private final Period intermediateHandoffPeriod; - - private final boolean logParseExceptions; - private final int maxParseExceptions; - private final int maxSavedParseExceptions; - - @JsonCreator public KafkaTuningConfig(@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory, - @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory, - @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, - @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows, - @JsonProperty("intermediatePersistPeriod") @Nullable Period intermediatePersistPeriod, - @JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory, - @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists, - @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, - // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12. 
- @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly, - @Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions, - @JsonProperty("handoffConditionTimeout") @Nullable Long handoffConditionTimeout, - @JsonProperty("resetOffsetAutomatically") @Nullable Boolean resetOffsetAutomatically, - @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, - @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod, - @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions, - @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, - @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions) { - // Cannot be a static because default basePersistDirectory is unique per-instance - final RealtimeTuningConfig defaults = RealtimeTuningConfig.makeDefaultTuningConfig(basePersistDirectory); - - this.maxRowsInMemory = maxRowsInMemory == null ? defaults.getMaxRowsInMemory() : maxRowsInMemory; - this.maxRowsPerSegment = maxRowsPerSegment == null ? DEFAULT_MAX_ROWS_PER_SEGMENT : maxRowsPerSegment; - // initializing this to 0, it will be lazily initialized to a value - // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long) - this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory; - this.maxTotalRows = maxTotalRows; - this.intermediatePersistPeriod = - intermediatePersistPeriod == null ? defaults.getIntermediatePersistPeriod() : intermediatePersistPeriod; - this.basePersistDirectory = defaults.getBasePersistDirectory(); - this.maxPendingPersists = 0; - this.indexSpec = indexSpec == null ? defaults.getIndexSpec() : indexSpec; - this.reportParseExceptions = - reportParseExceptions == null ? defaults.isReportParseExceptions() : reportParseExceptions; - this.handoffConditionTimeout = - handoffConditionTimeout == null ? defaults.getHandoffConditionTimeout() : handoffConditionTimeout; - this.resetOffsetAutomatically = - resetOffsetAutomatically == null ? DEFAULT_RESET_OFFSET_AUTOMATICALLY : resetOffsetAutomatically; - this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory; - this.intermediateHandoffPeriod = - intermediateHandoffPeriod == null ? new Period().withDays(Integer.MAX_VALUE) : intermediateHandoffPeriod; - - if (this.reportParseExceptions) { - this.maxParseExceptions = 0; - this.maxSavedParseExceptions = maxSavedParseExceptions == null ? 0 : Math.min(1, maxSavedParseExceptions); - } else { - this.maxParseExceptions = - maxParseExceptions == null ? TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS : maxParseExceptions; - this.maxSavedParseExceptions = - maxSavedParseExceptions == null ? TuningConfig.DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS : maxSavedParseExceptions; - } - this.logParseExceptions = - logParseExceptions == null ? 
TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS : logParseExceptions; - } - - public static KafkaTuningConfig copyOf(KafkaTuningConfig config) { - return new KafkaTuningConfig(config.maxRowsInMemory, - config.maxBytesInMemory, - config.maxRowsPerSegment, - config.maxTotalRows, - config.intermediatePersistPeriod, - config.basePersistDirectory, - config.maxPendingPersists, - config.indexSpec, - true, - config.reportParseExceptions, - config.handoffConditionTimeout, - config.resetOffsetAutomatically, - config.segmentWriteOutMediumFactory, - config.intermediateHandoffPeriod, - config.logParseExceptions, - config.maxParseExceptions, - config.maxSavedParseExceptions); - } - - @Override @JsonProperty public int getMaxRowsInMemory() { - return maxRowsInMemory; - } - - @Override @JsonProperty public long getMaxBytesInMemory() { - return maxBytesInMemory; - } - - @Override @JsonProperty public Integer getMaxRowsPerSegment() { - return maxRowsPerSegment; - } - - @JsonProperty @Override @Nullable public Long getMaxTotalRows() { - return maxTotalRows; - } - - @Override @JsonProperty public Period getIntermediatePersistPeriod() { - return intermediatePersistPeriod; - } - - @Override @JsonProperty public File getBasePersistDirectory() { - return basePersistDirectory; - } - - @Override @JsonProperty @Deprecated public int getMaxPendingPersists() { - return maxPendingPersists; - } - - @Override @JsonProperty public IndexSpec getIndexSpec() { - return indexSpec; - } - - /** - * Always returns true, doesn't affect the version being built. - */ - @SuppressWarnings("SameReturnValue") @Deprecated @JsonProperty public boolean getBuildV9Directly() { - return true; - } - - @Override @JsonProperty public boolean isReportParseExceptions() { - return reportParseExceptions; - } - - @JsonProperty public long getHandoffConditionTimeout() { - return handoffConditionTimeout; - } - - @JsonProperty public boolean isResetOffsetAutomatically() { - return resetOffsetAutomatically; - } - - @Override @JsonProperty @Nullable public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory() { - return segmentWriteOutMediumFactory; - } - - @JsonProperty public Period getIntermediateHandoffPeriod() { - return intermediateHandoffPeriod; - } - - @JsonProperty public boolean isLogParseExceptions() { - return logParseExceptions; - } - - @JsonProperty public int getMaxParseExceptions() { - return maxParseExceptions; - } - - @JsonProperty public int getMaxSavedParseExceptions() { - return maxSavedParseExceptions; - } - - public KafkaTuningConfig withBasePersistDirectory(File dir) { - return new KafkaTuningConfig(maxRowsInMemory, - maxBytesInMemory, - maxRowsPerSegment, - maxTotalRows, - intermediatePersistPeriod, - dir, - maxPendingPersists, - indexSpec, - true, - reportParseExceptions, - handoffConditionTimeout, - resetOffsetAutomatically, - segmentWriteOutMediumFactory, - intermediateHandoffPeriod, - logParseExceptions, - maxParseExceptions, - maxSavedParseExceptions); - } - - @Override public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - KafkaTuningConfig that = (KafkaTuningConfig) o; - return maxRowsInMemory == that.maxRowsInMemory - && maxRowsPerSegment == that.maxRowsPerSegment - && maxBytesInMemory == that.maxBytesInMemory - && Objects.equals(maxTotalRows, that.maxTotalRows) - && maxPendingPersists == that.maxPendingPersists - && reportParseExceptions == that.reportParseExceptions - && handoffConditionTimeout == that.handoffConditionTimeout - && 
resetOffsetAutomatically == that.resetOffsetAutomatically - && Objects.equals(intermediatePersistPeriod, that.intermediatePersistPeriod) - && Objects.equals(basePersistDirectory, that.basePersistDirectory) - && Objects.equals(indexSpec, that.indexSpec) - && Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory) - && Objects.equals(intermediateHandoffPeriod, that.intermediateHandoffPeriod) - && logParseExceptions == that.logParseExceptions - && maxParseExceptions == that.maxParseExceptions - && maxSavedParseExceptions == that.maxSavedParseExceptions; - } - - @Override public int hashCode() { - return Objects.hash(maxRowsInMemory, - maxRowsPerSegment, - maxBytesInMemory, - maxTotalRows, - intermediatePersistPeriod, - basePersistDirectory, - maxPendingPersists, - indexSpec, - reportParseExceptions, - handoffConditionTimeout, - resetOffsetAutomatically, - segmentWriteOutMediumFactory, - intermediateHandoffPeriod, - logParseExceptions, - maxParseExceptions, - maxSavedParseExceptions); - } - - @Override public String toString() { - return "KafkaTuningConfig{" - + "maxRowsInMemory=" - + maxRowsInMemory - + ", maxRowsPerSegment=" - + maxRowsPerSegment - + ", maxTotalRows=" - + maxTotalRows - + ", maxBytesInMemory=" - + maxBytesInMemory - + ", intermediatePersistPeriod=" - + intermediatePersistPeriod - + ", basePersistDirectory=" - + basePersistDirectory - + ", maxPendingPersists=" - + maxPendingPersists - + ", indexSpec=" - + indexSpec - + ", reportParseExceptions=" - + reportParseExceptions - + ", handoffConditionTimeout=" - + handoffConditionTimeout - + ", resetOffsetAutomatically=" - + resetOffsetAutomatically - + ", segmentWriteOutMediumFactory=" - + segmentWriteOutMediumFactory - + ", intermediateHandoffPeriod=" - + intermediateHandoffPeriod - + ", logParseExceptions=" - + logParseExceptions - + ", maxParseExceptions=" - + maxParseExceptions - + ", maxSavedParseExceptions=" - + maxSavedParseExceptions - + '}'; - } -} diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamIndexTaskTuningConfig.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamIndexTaskTuningConfig.java new file mode 100644 index 0000000000..b7cf712221 --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamIndexTaskTuningConfig.java @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.druid.json; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.indexing.RealtimeTuningConfig; +import org.apache.druid.segment.indexing.TuningConfig; +import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig; +import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; +import org.joda.time.Period; + +import javax.annotation.Nullable; +import java.io.File; +import java.util.Objects; + +/** + * This class is copied from druid source code + * in order to avoid adding additional dependencies on druid-indexing-service. + */ +public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfig, AppenderatorConfig +{ + private static final boolean DEFAULT_RESET_OFFSET_AUTOMATICALLY = false; + private static final boolean DEFAULT_SKIP_SEQUENCE_NUMBER_AVAILABILITY_CHECK = false; + + private final int maxRowsInMemory; + private final long maxBytesInMemory; + private final DynamicPartitionsSpec partitionsSpec; + private final Period intermediatePersistPeriod; + private final File basePersistDirectory; + @Deprecated + private final int maxPendingPersists; + private final IndexSpec indexSpec; + private final IndexSpec indexSpecForIntermediatePersists; + private final boolean reportParseExceptions; + private final long handoffConditionTimeout; + private final boolean resetOffsetAutomatically; + @Nullable + private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory; + private final Period intermediateHandoffPeriod; + private final boolean skipSequenceNumberAvailabilityCheck; + + private final boolean logParseExceptions; + private final int maxParseExceptions; + private final int maxSavedParseExceptions; + + public SeekableStreamIndexTaskTuningConfig( + @Nullable Integer maxRowsInMemory, + @Nullable Long maxBytesInMemory, + @Nullable Integer maxRowsPerSegment, + @Nullable Long maxTotalRows, + @Nullable Period intermediatePersistPeriod, + @Nullable File basePersistDirectory, + @Nullable Integer maxPendingPersists, + @Nullable IndexSpec indexSpec, + @Nullable IndexSpec indexSpecForIntermediatePersists, + // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12. + @Deprecated @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly, + @Deprecated @Nullable Boolean reportParseExceptions, + @Nullable Long handoffConditionTimeout, + @Nullable Boolean resetOffsetAutomatically, + Boolean skipSequenceNumberAvailabilityCheck, + @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, + @Nullable Period intermediateHandoffPeriod, + @Nullable Boolean logParseExceptions, + @Nullable Integer maxParseExceptions, + @Nullable Integer maxSavedParseExceptions + ) + { + // Cannot be a static because default basePersistDirectory is unique per-instance + final RealtimeTuningConfig defaults = RealtimeTuningConfig.makeDefaultTuningConfig(basePersistDirectory); + + this.maxRowsInMemory = maxRowsInMemory == null ? defaults.getMaxRowsInMemory() : maxRowsInMemory; + this.partitionsSpec = new DynamicPartitionsSpec(maxRowsPerSegment, maxTotalRows); + // initializing this to 0, it will be lazily initialized to a value + // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long) + this.maxBytesInMemory = maxBytesInMemory == null ? 
0 : maxBytesInMemory; + this.intermediatePersistPeriod = intermediatePersistPeriod == null + ? defaults.getIntermediatePersistPeriod() + : intermediatePersistPeriod; + this.basePersistDirectory = defaults.getBasePersistDirectory(); + this.maxPendingPersists = maxPendingPersists == null ? 0 : maxPendingPersists; + this.indexSpec = indexSpec == null ? defaults.getIndexSpec() : indexSpec; + this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ? + this.indexSpec : indexSpecForIntermediatePersists; + this.reportParseExceptions = reportParseExceptions == null + ? defaults.isReportParseExceptions() + : reportParseExceptions; + this.handoffConditionTimeout = handoffConditionTimeout == null + ? defaults.getHandoffConditionTimeout() + : handoffConditionTimeout; + this.resetOffsetAutomatically = resetOffsetAutomatically == null + ? DEFAULT_RESET_OFFSET_AUTOMATICALLY + : resetOffsetAutomatically; + this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory; + this.intermediateHandoffPeriod = intermediateHandoffPeriod == null + ? new Period().withDays(Integer.MAX_VALUE) + : intermediateHandoffPeriod; + this.skipSequenceNumberAvailabilityCheck = skipSequenceNumberAvailabilityCheck == null + ? DEFAULT_SKIP_SEQUENCE_NUMBER_AVAILABILITY_CHECK + : skipSequenceNumberAvailabilityCheck; + + if (this.reportParseExceptions) { + this.maxParseExceptions = 0; + this.maxSavedParseExceptions = maxSavedParseExceptions == null ? 0 : Math.min(1, maxSavedParseExceptions); + } else { + this.maxParseExceptions = maxParseExceptions == null + ? TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS + : maxParseExceptions; + this.maxSavedParseExceptions = maxSavedParseExceptions == null + ? TuningConfig.DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS + : maxSavedParseExceptions; + } + this.logParseExceptions = logParseExceptions == null + ? TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS + : logParseExceptions; + } + + @Override + @JsonProperty + public int getMaxRowsInMemory() + { + return maxRowsInMemory; + } + + @Override + @JsonProperty + public long getMaxBytesInMemory() + { + return maxBytesInMemory; + } + + @Override + @JsonProperty + public Integer getMaxRowsPerSegment() + { + return partitionsSpec.getMaxRowsPerSegment(); + } + + @JsonProperty + @Override + @Nullable + public Long getMaxTotalRows() + { + return partitionsSpec.getMaxTotalRows(); + } + + @Override + public DynamicPartitionsSpec getPartitionsSpec() + { + return partitionsSpec; + } + + @Override + @JsonProperty + public Period getIntermediatePersistPeriod() + { + return intermediatePersistPeriod; + } + + @Override + @JsonProperty + public File getBasePersistDirectory() + { + return basePersistDirectory; + } + + @Override + @JsonProperty + @Deprecated + public int getMaxPendingPersists() + { + return maxPendingPersists; + } + + @Override + @JsonProperty + public IndexSpec getIndexSpec() + { + return indexSpec; + } + + @JsonProperty + @Override + public IndexSpec getIndexSpecForIntermediatePersists() + { + return indexSpecForIntermediatePersists; + } + + /** + * Always returns true, doesn't affect the version being built. 
+ */ + @Deprecated + @JsonProperty + public boolean getBuildV9Directly() + { + return true; + } + + @Override + @JsonProperty + public boolean isReportParseExceptions() + { + return reportParseExceptions; + } + + @JsonProperty + public long getHandoffConditionTimeout() + { + return handoffConditionTimeout; + } + + @JsonProperty + public boolean isResetOffsetAutomatically() + { + return resetOffsetAutomatically; + } + + @Override + @JsonProperty + @Nullable + public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory() + { + return segmentWriteOutMediumFactory; + } + + @JsonProperty + public Period getIntermediateHandoffPeriod() + { + return intermediateHandoffPeriod; + } + + @JsonProperty + public boolean isLogParseExceptions() + { + return logParseExceptions; + } + + @JsonProperty + public int getMaxParseExceptions() + { + return maxParseExceptions; + } + + @JsonProperty + public int getMaxSavedParseExceptions() + { + return maxSavedParseExceptions; + } + + @JsonProperty + public boolean isSkipSequenceNumberAvailabilityCheck() + { + return skipSequenceNumberAvailabilityCheck; + } + + @Override + public abstract SeekableStreamIndexTaskTuningConfig withBasePersistDirectory(File dir); + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SeekableStreamIndexTaskTuningConfig that = (SeekableStreamIndexTaskTuningConfig) o; + return maxRowsInMemory == that.maxRowsInMemory && + maxBytesInMemory == that.maxBytesInMemory && + maxPendingPersists == that.maxPendingPersists && + reportParseExceptions == that.reportParseExceptions && + handoffConditionTimeout == that.handoffConditionTimeout && + resetOffsetAutomatically == that.resetOffsetAutomatically && + skipSequenceNumberAvailabilityCheck == that.skipSequenceNumberAvailabilityCheck && + logParseExceptions == that.logParseExceptions && + maxParseExceptions == that.maxParseExceptions && + maxSavedParseExceptions == that.maxSavedParseExceptions && + Objects.equals(partitionsSpec, that.partitionsSpec) && + Objects.equals(intermediatePersistPeriod, that.intermediatePersistPeriod) && + Objects.equals(basePersistDirectory, that.basePersistDirectory) && + Objects.equals(indexSpec, that.indexSpec) && + Objects.equals(indexSpecForIntermediatePersists, that.indexSpecForIntermediatePersists) && + Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory) && + Objects.equals(intermediateHandoffPeriod, that.intermediateHandoffPeriod); + } + + @Override + public int hashCode() + { + return Objects.hash( + maxRowsInMemory, + maxBytesInMemory, + partitionsSpec, + intermediatePersistPeriod, + basePersistDirectory, + maxPendingPersists, + indexSpec, + indexSpecForIntermediatePersists, + reportParseExceptions, + handoffConditionTimeout, + resetOffsetAutomatically, + segmentWriteOutMediumFactory, + intermediateHandoffPeriod, + skipSequenceNumberAvailabilityCheck, + logParseExceptions, + maxParseExceptions, + maxSavedParseExceptions + ); + } + + @Override + public abstract String toString(); +} diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamSupervisorTuningConfig.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamSupervisorTuningConfig.java new file mode 100644 index 0000000000..a75baac7b2 --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamSupervisorTuningConfig.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.druid.json; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.indexing.RealtimeTuningConfig; +import org.apache.druid.segment.indexing.TuningConfig; +import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig; +import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; +import org.joda.time.Duration; +import org.joda.time.Period; + +import javax.annotation.Nullable; +import java.io.File; +import java.util.Objects; + +/** + * This interface is copied from the Druid source code + * to avoid adding a dependency on druid-indexing-service. + */ +public interface SeekableStreamSupervisorTuningConfig +{ + + int DEFAULT_CHAT_RETRIES = 8; + String DEFAULT_HTTP_TIMEOUT = "PT10S"; + String DEFAULT_SHUTDOWN_TIMEOUT = "PT80S"; + String DEFAULT_REPARTITION_TRANSITION_DURATION = "PT2M"; + + static Duration defaultDuration(final Period period, final String theDefault) + { + return (period == null ? new Period(theDefault) : period).toStandardDuration(); + } + + @JsonProperty + Integer getWorkerThreads(); + + @JsonProperty + Integer getChatThreads(); + + @JsonProperty + Long getChatRetries(); + + @JsonProperty + Duration getHttpTimeout(); + + @JsonProperty + Duration getShutdownTimeout(); + + @JsonProperty + Duration getRepartitionTransitionDuration(); + + SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig(); +} \ No newline at end of file diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java deleted file mode 100644 index 907558fe48..0000000000 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hive.druid.serde; - -import java.io.IOException; -import java.util.Collections; -import java.util.Iterator; - -import com.fasterxml.jackson.databind.JavaType; -import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils; -import org.apache.hadoop.hive.druid.conf.DruidConstants; -import org.apache.hadoop.io.NullWritable; - -import com.fasterxml.jackson.core.type.TypeReference; - -import org.apache.druid.query.Result; -import org.apache.druid.query.select.EventHolder; -import org.apache.druid.query.select.SelectResultValue; - -/** - * Record reader for results for Druid SelectQuery. - */ -public class DruidSelectQueryRecordReader extends DruidQueryRecordReader> { - - private static final TypeReference> - TYPE_REFERENCE = - new TypeReference>() { - }; - - private Iterator values = Collections.emptyIterator(); - - @Override protected JavaType getResultTypeDef() { - return DruidStorageHandlerUtils.JSON_MAPPER.getTypeFactory().constructType(TYPE_REFERENCE); - } - - @Override public boolean nextKeyValue() throws IOException { - if (values.hasNext()) { - return true; - } - if (getQueryResultsIterator().hasNext()) { - Result current = getQueryResultsIterator().next(); - values = current.getValue().getEvents().iterator(); - return nextKeyValue(); - } - return false; - } - - @Override public NullWritable getCurrentKey() throws IOException, InterruptedException { - return NullWritable.get(); - } - - @Override public DruidWritable getCurrentValue() throws IOException, InterruptedException { - // Create new value - DruidWritable value = new DruidWritable(false); - EventHolder e = values.next(); - value.getValue().put(DruidConstants.DEFAULT_TIMESTAMP_COLUMN, e.getTimestamp().getMillis()); - value.getValue().putAll(e.getEvent()); - return value; - } - - @Override public boolean next(NullWritable key, DruidWritable value) throws IOException { - if (nextKeyValue()) { - // Update value - value.getValue().clear(); - EventHolder e = values.next(); - value.getValue().put(DruidConstants.DEFAULT_TIMESTAMP_COLUMN, e.getTimestamp().getMillis()); - value.getValue().putAll(e.getEvent()); - return true; - } - return false; - } - - @Override public float getProgress() { - return getQueryResultsIterator().hasNext() || values.hasNext() ? 
0 : 1; - } - -} diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java index 74576975a4..aaa35f8977 100644 --- a/druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java +++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java @@ -83,7 +83,6 @@ import org.apache.druid.data.input.Row; import org.apache.druid.query.Query; import org.apache.druid.query.Result; -import org.apache.druid.query.select.SelectResultValue; import org.apache.druid.query.timeseries.TimeseriesResultValue; import org.apache.druid.query.topn.TopNResultValue; import org.junit.rules.ExpectedException; @@ -455,137 +454,14 @@ private static final String GB_MONTH_EXTRACTIONS_COLUMN_NAMES = "timestamp,extract_month,$f1"; private static final String GB_MONTH_EXTRACTIONS_COLUMN_TYPES = "timestamp with local time zone,int,bigint"; - // Select query private static final String - SELECT_QUERY = - "{ \"queryType\": \"select\", " - + " \"dataSource\": \"wikipedia\", \"descending\": \"false\", " - + " \"dimensions\":[\"robot\",\"namespace\",\"anonymous\",\"unpatrolled\",\"page\",\"language\"," - + "\"newpage\",\"user\"], " - + " \"metrics\":[\"count\",\"added\",\"delta\",\"variation\",\"deleted\"], " - + " \"granularity\": \"all\", " - + " \"intervals\": [ \"2013-01-01/2013-01-02\" ], " - + " \"pagingSpec\":{\"pagingIdentifiers\": {}, \"threshold\":5} }"; - - // Select query results - private static final String - SELECT_QUERY_RESULTS = - "[{ " - + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", " - + " \"result\" : { " - + " \"pagingIdentifiers\" : { " - + " \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\" : 4 }, " - + " \"events\" : [ { " - + " \"segmentId\" : \"wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00" - + ".000Z_2013-01-10T08:13:47.830Z_v9\", " - + " \"offset\" : 0, " - + " \"event\" : { " - + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", " - + " \"robot\" : 1, " - + " \"namespace\" : \"article\", " - + " \"anonymous\" : \"0\", " - + " \"unpatrolled\" : \"0\", " - + " \"page\" : \"11._korpus_(NOVJ)\", " - + " \"language\" : \"sl\", " - + " \"newpage\" : \"0\", " - + " \"user\" : \"EmausBot\", " - + " \"count\" : 1.0, " - + " \"added\" : 39.0, " - + " \"delta\" : 39.0, " - + " \"variation\" : 39.0, " - + " \"deleted\" : 0.0 " - + " } " - + " }, { " - + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47" - + ".830Z_v9\", " - + " \"offset\" : 1, " - + " \"event\" : { " - + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", " - + " \"robot\" : 0, " - + " \"namespace\" : \"article\", " - + " \"anonymous\" : \"0\", " - + " \"unpatrolled\" : \"0\", " - + " \"page\" : \"112_U.S._580\", " - + " \"language\" : \"en\", " - + " \"newpage\" : \"1\", " - + " \"user\" : \"MZMcBride\", " - + " \"count\" : 1.0, " - + " \"added\" : 70.0, " - + " \"delta\" : 70.0, " - + " \"variation\" : 70.0, " - + " \"deleted\" : 0.0 " - + " } " - + " }, { " - + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47" - + ".830Z_v9\", " - + " \"offset\" : 2, " - + " \"event\" : { " - + " \"timestamp\" : \"2013-01-01T00:00:12.000Z\", " - + " \"robot\" : 0, " - + " \"namespace\" : \"article\", " - + " \"anonymous\" : \"0\", " - + " \"unpatrolled\" : \"0\", " - + " \"page\" : \"113_U.S._243\", " - + " \"language\" : \"en\", " - + " \"newpage\" : 
\"1\", " - + " \"user\" : \"MZMcBride\", " - + " \"count\" : 1.0, " - + " \"added\" : 77.0, " - + " \"delta\" : 77.0, " - + " \"variation\" : 77.0, " - + " \"deleted\" : 0.0 " - + " } " - + " }, { " - + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47" - + ".830Z_v9\", " - + " \"offset\" : 3, " - + " \"event\" : { " - + " \"timestamp\" : \"2013-01-01T00:00:12.000Z\", " - + " \"robot\" : 0, " - + " \"namespace\" : \"article\", " - + " \"anonymous\" : \"0\", " - + " \"unpatrolled\" : \"0\", " - + " \"page\" : \"113_U.S._73\", " - + " \"language\" : \"en\", " - + " \"newpage\" : \"1\", " - + " \"user\" : \"MZMcBride\", " - + " \"count\" : 1.0, " - + " \"added\" : 70.0, " - + " \"delta\" : 70.0, " - + " \"variation\" : 70.0, " - + " \"deleted\" : 0.0 " - + " } " - + " }, { " - + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47" - + ".830Z_v9\", " - + " \"offset\" : 4, " - + " \"event\" : { " - + " \"timestamp\" : \"2013-01-01T00:00:12.000Z\", " - + " \"robot\" : 0, " - + " \"namespace\" : \"article\", " - + " \"anonymous\" : \"0\", " - + " \"unpatrolled\" : \"0\", " - + " \"page\" : \"113_U.S._756\", " - + " \"language\" : \"en\", " - + " \"newpage\" : \"1\", " - + " \"user\" : \"MZMcBride\", " - + " \"count\" : 1.0, " - + " \"added\" : 68.0, " - + " \"delta\" : 68.0, " - + " \"variation\" : 68.0, " - + " \"deleted\" : 0.0 " - + " } " - + " } ] }} ]"; - - // Select query results as records (types defined by metastore) - private static final String - SELECT_COLUMN_NAMES = + SCAN_COLUMN_NAMES = "__time,robot,namespace,anonymous,unpatrolled,page,language,newpage,user,count,added,delta,variation,deleted"; private static final String - SELECT_COLUMN_TYPES = + SCAN_COLUMN_TYPES = "timestamp with local time zone,boolean,string,string,string,string,string,string,string,double,double,float," + "float,float"; - private static final Object[][] SELECT_QUERY_RESULTS_RECORDS = new Object[][]{ + private static final Object[][] SCAN_QUERY_RESULTS_RECORDS = new Object[][]{ new Object[]{ (new TimestampTZ(Instant.ofEpochMilli(1356998400000L).atZone(ZoneOffset.UTC))), Boolean.TRUE, "article", @@ -691,12 +567,6 @@ GB_MONTH_EXTRACTIONS_RESULTS, new TypeReference>() { })); - selectQueryResults = - DruidStorageHandlerUtils.SMILE_MAPPER.writeValueAsBytes(DruidStorageHandlerUtils.JSON_MAPPER.readValue( - SELECT_QUERY_RESULTS, - new TypeReference>>() { - })); - scanQueryResults = DruidStorageHandlerUtils.SMILE_MAPPER.writeValueAsBytes(DruidStorageHandlerUtils.JSON_MAPPER.readValue( SCAN_QUERY_RESULTS, @@ -765,15 +635,11 @@ GB_MONTH_EXTRACTIONS, groupByMonthExtractQueryResults, GB_MONTH_EXTRACTION_RESULTS_RECORDS); - // Select query - tbl = createPropertiesQuery("wikipedia", Query.SELECT, SELECT_QUERY, SELECT_COLUMN_NAMES, SELECT_COLUMN_TYPES); - SerDeUtils.initializeSerDe(serDe, conf, tbl, null); - deserializeQueryResults(serDe, Query.SELECT, SELECT_QUERY, selectQueryResults, SELECT_QUERY_RESULTS_RECORDS); // Scan query -- results should be same as select query - tbl = createPropertiesQuery("wikipedia", Query.SCAN, SCAN_QUERY, SELECT_COLUMN_NAMES, SELECT_COLUMN_TYPES); + tbl = createPropertiesQuery("wikipedia", Query.SCAN, SCAN_QUERY, SCAN_COLUMN_NAMES, SCAN_COLUMN_TYPES); SerDeUtils.initializeSerDe(serDe, conf, tbl, null); - deserializeQueryResults(serDe, Query.SCAN, SCAN_QUERY, scanQueryResults, SELECT_QUERY_RESULTS_RECORDS); + deserializeQueryResults(serDe, Query.SCAN, SCAN_QUERY, scanQueryResults, 
SCAN_QUERY_RESULTS_RECORDS); } private static Properties createPropertiesQuery(String dataSource, diff --git a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java index 91b5f8bc29..abcdb75342 100644 --- a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java +++ b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java @@ -160,6 +160,7 @@ null, indexSpec, null, + null, 0, 0, null, @@ -172,7 +173,7 @@ @Override public File getStorageDirectory() { return segmentOutputDir; } - }, objectMapper); + }); Path segmentDescriptorPath = diff --git a/itests/qtest-druid/pom.xml b/itests/qtest-druid/pom.xml index 6da72733e7..2d29923b49 100644 --- a/itests/qtest-druid/pom.xml +++ b/itests/qtest-druid/pom.xml @@ -37,6 +37,7 @@ ../.. + 1.15.0 4.0.0 1.19.3 9.4.10.v20180503 @@ -44,6 +45,7 @@ 16.0.1 4.1.0 2.0.0 + 4.1.0 1.7.30 @@ -156,6 +158,26 @@ curator-recipes ${druid.curator.version} + + org.apache.calcite.avatica + avatica + ${avatica.version} + + + org.apache.calcite.avatica + avatica-core + ${avatica.version} + + + org.apache.calcite.avatica + avatica-metrics + ${avatica.version} + + + org.apache.calcite.avatica + avatica-server + ${avatica.version} + com.sun.jersey jersey-bundle diff --git a/pom.xml b/pom.xml index 579e74568f..17ef6b57a4 100644 --- a/pom.xml +++ b/pom.xml @@ -149,7 +149,7 @@ 10.14.1.0 3.1.0 0.1.2 - 0.14.0-incubating + 0.17.1 1.6.0.1 19.0 2.4.11 diff --git a/ql/src/test/results/clientpositive/druid/druidkafkamini_delimited.q.out b/ql/src/test/results/clientpositive/druid/druidkafkamini_delimited.q.out index f6a417b6c3..2331fba13e 100644 --- a/ql/src/test/results/clientpositive/druid/druidkafkamini_delimited.q.out +++ b/ql/src/test/results/clientpositive/druid/druidkafkamini_delimited.q.out @@ -120,7 +120,7 @@ POSTHOOK: query: Select page FROM druid_kafka_test_delimited POSTHOOK: type: QUERY POSTHOOK: Input: default@druid_kafka_test_delimited POSTHOOK: Output: hdfs://### HDFS PATH ### - "Gypsy Danger" +"Gypsy Danger" "Striker Eureka" "Cherno Alpha" "Crimson Typhoon" -- 2.20.1 (Apple Git-117)
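
Illustrative sketch (not part of the patch above): the copied SeekableStreamIndexTaskTuningConfig binds a Druid streaming-ingestion tuningConfig through its @JsonProperty getters. The plain-Jackson snippet below only shows the JSON shape implied by those property names; the numeric values and the "type": "kafka" field are assumptions for illustration, not defaults taken from this code.

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.LinkedHashMap;
import java.util.Map;

public class TuningConfigJsonSketch {
  public static void main(String[] args) throws Exception {
    // Property names mirror the @JsonProperty getters of the copied
    // SeekableStreamIndexTaskTuningConfig; the values are arbitrary examples.
    Map<String, Object> tuningConfig = new LinkedHashMap<>();
    tuningConfig.put("type", "kafka");                    // assumption: Kafka indexing task
    tuningConfig.put("maxRowsInMemory", 150000);
    tuningConfig.put("maxBytesInMemory", 0);              // 0 defers to Druid's lazily computed default, per the comment in the copied class
    tuningConfig.put("maxRowsPerSegment", 5000000);
    tuningConfig.put("intermediatePersistPeriod", "PT10M");
    tuningConfig.put("handoffConditionTimeout", 0);
    tuningConfig.put("resetOffsetAutomatically", false);
    tuningConfig.put("logParseExceptions", false);
    tuningConfig.put("maxParseExceptions", 0);
    tuningConfig.put("maxSavedParseExceptions", 0);
    // Print the JSON object; this is only meant to visualize the field names
    // that the copied tuning-config classes deserialize.
    System.out.println(new ObjectMapper().writerWithDefaultPrettyPrinter()
        .writeValueAsString(tuningConfig));
  }
}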