From 9adbb414c018f7c7cf36bee2fadeaa663f21cea1 Mon Sep 17 00:00:00 2001 From: Nishant Bangarwa Date: Mon, 13 Apr 2020 22:29:26 +0530 Subject: [PATCH] [HIVE-23184] Upgrade druid to 0.17.1 --- data/scripts/kafka_init_data.csv | 2 +- druid-handler/pom.xml | 6 + .../hadoop/hive/druid/DruidKafkaUtils.java | 44 +-- .../hive/druid/DruidStorageHandler.java | 61 ++-- .../hive/druid/DruidStorageHandlerUtils.java | 63 ++-- .../hive/druid/io/DruidOutputFormat.java | 22 +- .../druid/io/DruidQueryBasedInputFormat.java | 58 +-- .../hive/druid/io/DruidRecordWriter.java | 10 +- .../json/KafkaIndexTaskTuningConfig.java | 136 +++++++ .../hive/druid/json/KafkaSupervisorSpec.java | 20 +- .../json/KafkaSupervisorTuningConfig.java | 184 +++------- .../hive/druid/json/KafkaTuningConfig.java | 307 ---------------- .../SeekableStreamIndexTaskTuningConfig.java | 332 ++++++++++++++++++ .../SeekableStreamSupervisorTuningConfig.java | 71 ++++ .../serde/DruidSelectQueryRecordReader.java | 92 ----- .../hive/druid/TestDruidStorageHandler.java | 8 +- .../TestHiveDruidQueryBasedInputFormat.java | 44 --- .../hive/druid/serde/TestDruidSerDe.java | 159 +-------- .../hive/ql/io/TestDruidRecordWriter.java | 21 +- itests/qtest-druid/pom.xml | 22 ++ pom.xml | 2 +- .../druid/druidkafkamini_delimited.q.out | 2 +- ...uidmini_semijoin_reduction_all_types.q.out | 24 +- 23 files changed, 724 insertions(+), 966 deletions(-) create mode 100644 druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaIndexTaskTuningConfig.java delete mode 100644 druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java create mode 100644 druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamIndexTaskTuningConfig.java create mode 100644 druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamSupervisorTuningConfig.java delete mode 100644 druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java diff --git a/data/scripts/kafka_init_data.csv b/data/scripts/kafka_init_data.csv index 5dc094ed21..d818144115 100644 --- a/data/scripts/kafka_init_data.csv +++ b/data/scripts/kafka_init_data.csv @@ -1,4 +1,4 @@ -"2013-08-31T01:02:33Z", "Gypsy Danger","en","nuclear","true","true","false","false","article","North America","United States","Bay Area","San Francisco",57,200,-143 +"2013-08-31T01:02:33Z","Gypsy Danger","en","nuclear","true","true","false","false","article","North America","United States","Bay Area","San Francisco",57,200,-143 "2013-08-31T03:32:45Z","Striker Eureka","en","speed","false","true","true","false","wikipedia","Australia","Australia","Cantebury","Syndey",459,129,330 "2013-08-31T07:11:21Z","Cherno Alpha","ru","masterYi","false","true","true","false","article","Asia","Russia","Oblast","Moscow",123,12,111 "2013-08-31T11:58:39Z","Crimson Typhoon","zh","triplets","true","false","true","false","wikipedia","Asia","China","Shanxi","Taiyuan",905,5,900 diff --git a/druid-handler/pom.xml b/druid-handler/pom.xml index 7d5c3f4ea1..84312abac5 100644 --- a/druid-handler/pom.xml +++ b/druid-handler/pom.xml @@ -260,6 +260,12 @@ + + org.apache.logging.log4j + log4j-api + ${log4j2.version} + test + diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidKafkaUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidKafkaUtils.java index b56d48aa4f..85d62416b3 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidKafkaUtils.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidKafkaUtils.java @@ -29,18 +29,13 @@ import 
org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.java.util.http.client.Request; -import org.apache.druid.java.util.http.client.response.FullResponseHandler; -import org.apache.druid.java.util.http.client.response.FullResponseHolder; +import org.apache.druid.java.util.http.client.response.StringFullResponseHandler; +import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.apache.hadoop.hive.druid.conf.DruidConstants; -import org.apache.hadoop.hive.druid.json.AvroParseSpec; -import org.apache.hadoop.hive.druid.json.AvroStreamInputRowParser; -import org.apache.hadoop.hive.druid.json.InlineSchemaAvroBytesDecoder; -import org.apache.hadoop.hive.druid.json.KafkaSupervisorIOConfig; -import org.apache.hadoop.hive.druid.json.KafkaSupervisorSpec; -import org.apache.hadoop.hive.druid.json.KafkaSupervisorTuningConfig; +import org.apache.hadoop.hive.druid.json.*; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.session.SessionState; import org.jboss.netty.handler.codec.http.HttpMethod; @@ -78,17 +73,14 @@ static KafkaSupervisorSpec createKafkaSupervisorSpec(Table table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxBytesInMemory"), DruidStorageHandlerUtils.getIntegerProperty(table, - DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxRowsPerSegment"), - DruidStorageHandlerUtils.getLongProperty(table, - DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxTotalRows"), + DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxRowsPerSegment"), DruidStorageHandlerUtils + .getLongProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxTotalRows"), DruidStorageHandlerUtils.getPeriodProperty(table, - DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "intermediatePersistPeriod"), - null, + DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "intermediatePersistPeriod"), null, // basePersistDirectory - use druid default, no need to be configured by user - DruidStorageHandlerUtils.getIntegerProperty(table, - DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxPendingPersists"), - indexSpec, - null, + DruidStorageHandlerUtils + .getIntegerProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxPendingPersists"), + indexSpec, null, null, // buildV9Directly - use druid default, no need to be configured by user DruidStorageHandlerUtils.getBooleanProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "reportParseExceptions"), @@ -96,9 +88,8 @@ static KafkaSupervisorSpec createKafkaSupervisorSpec(Table table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "handoffConditionTimeout"), DruidStorageHandlerUtils.getBooleanProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "resetOffsetAutomatically"), - TmpFileSegmentWriteOutMediumFactory.instance(), - DruidStorageHandlerUtils.getIntegerProperty(table, - DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "workerThreads"), + TmpFileSegmentWriteOutMediumFactory.instance(), DruidStorageHandlerUtils + .getIntegerProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "workerThreads"), DruidStorageHandlerUtils.getIntegerProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "chatThreads"), 
DruidStorageHandlerUtils.getLongProperty(table, @@ -161,14 +152,11 @@ static void updateKafkaIngestionSpec(String overlordAddress, KafkaSupervisorSpec String task = JSON_MAPPER.writeValueAsString(spec); CONSOLE.printInfo("submitting kafka Spec {}", task); LOG.info("submitting kafka Supervisor Spec {}", task); - FullResponseHolder - response = - DruidStorageHandlerUtils.getResponseFromCurrentLeader(DruidStorageHandler.getHttpClient(), - new Request(HttpMethod.POST, - new URL(String.format("http://%s/druid/indexer/v1/supervisor", overlordAddress))).setContent( - "application/json", - JSON_MAPPER.writeValueAsBytes(spec)), - new FullResponseHandler(Charset.forName("UTF-8"))); + StringFullResponseHolder response = DruidStorageHandlerUtils + .getResponseFromCurrentLeader(DruidStorageHandler.getHttpClient(), new Request(HttpMethod.POST, + new URL(String.format("http://%s/druid/indexer/v1/supervisor", overlordAddress))) + .setContent("application/json", JSON_MAPPER.writeValueAsBytes(spec)), + new StringFullResponseHandler(Charset.forName("UTF-8"))); if (response.getStatus().equals(HttpResponseStatus.OK)) { String msg = diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java index fe55eff222..beaf249d34 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java @@ -39,8 +39,8 @@ import org.apache.druid.java.util.http.client.HttpClientConfig; import org.apache.druid.java.util.http.client.HttpClientInit; import org.apache.druid.java.util.http.client.Request; -import org.apache.druid.java.util.http.client.response.FullResponseHandler; -import org.apache.druid.java.util.http.client.response.FullResponseHolder; +import org.apache.druid.java.util.http.client.response.StringFullResponseHandler; +import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; import org.apache.druid.metadata.MetadataStorageConnectorConfig; import org.apache.druid.metadata.MetadataStorageTablesConfig; import org.apache.druid.metadata.SQLMetadataConnector; @@ -50,6 +50,7 @@ import org.apache.druid.metadata.storage.mysql.MySQLConnectorConfig; import org.apache.druid.metadata.storage.postgresql.PostgreSQLConnector; import org.apache.druid.metadata.storage.postgresql.PostgreSQLConnectorConfig; +import org.apache.druid.metadata.storage.postgresql.PostgreSQLTablesConfig; import org.apache.druid.query.BaseQuery; import org.apache.druid.query.Query; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -365,15 +366,12 @@ private void updateKafkaIngestion(Table table) { private void resetKafkaIngestion(String overlordAddress, String dataSourceName) { try { - FullResponseHolder + StringFullResponseHolder response = RetryUtils.retry(() -> DruidStorageHandlerUtils.getResponseFromCurrentLeader(getHttpClient(), - new Request(HttpMethod.POST, - new URL(String.format("http://%s/druid/indexer/v1/supervisor/%s/reset", - overlordAddress, - dataSourceName))), - new FullResponseHandler(Charset.forName("UTF-8"))), - input -> input instanceof IOException, + new Request(HttpMethod.POST, new URL( + String.format("http://%s/druid/indexer/v1/supervisor/%s/reset", overlordAddress, dataSourceName))), + new StringFullResponseHandler(Charset.forName("UTF-8"))), input -> input instanceof IOException, getMaxRetryCount()); if (response.getStatus().equals(HttpResponseStatus.OK)) { CONSOLE.printInfo("Druid 
Kafka Ingestion Reset successful."); @@ -389,15 +387,12 @@ private void resetKafkaIngestion(String overlordAddress, String dataSourceName) private void stopKafkaIngestion(String overlordAddress, String dataSourceName) { try { - FullResponseHolder + StringFullResponseHolder response = RetryUtils.retry(() -> DruidStorageHandlerUtils.getResponseFromCurrentLeader(getHttpClient(), - new Request(HttpMethod.POST, - new URL(String.format("http://%s/druid/indexer/v1/supervisor/%s/shutdown", - overlordAddress, - dataSourceName))), - new FullResponseHandler(Charset.forName("UTF-8"))), - input -> input instanceof IOException, + new Request(HttpMethod.POST, new URL( + String.format("http://%s/druid/indexer/v1/supervisor/%s/shutdown", overlordAddress, dataSourceName))), + new StringFullResponseHandler(Charset.forName("UTF-8"))), input -> input instanceof IOException, getMaxRetryCount()); if (response.getStatus().equals(HttpResponseStatus.OK)) { CONSOLE.printInfo("Druid Kafka Ingestion shutdown successful."); @@ -423,13 +418,12 @@ private KafkaSupervisorSpec fetchKafkaIngestionSpec(Table table) { Preconditions.checkNotNull(DruidStorageHandlerUtils.getTableProperty(table, Constants.DRUID_DATA_SOURCE), "Druid Datasource name is null"); try { - FullResponseHolder + StringFullResponseHolder response = RetryUtils.retry(() -> DruidStorageHandlerUtils.getResponseFromCurrentLeader(getHttpClient(), new Request(HttpMethod.GET, new URL(String.format("http://%s/druid/indexer/v1/supervisor/%s", overlordAddress, dataSourceName))), - new FullResponseHandler(Charset.forName("UTF-8"))), - input -> input instanceof IOException, + new StringFullResponseHandler(Charset.forName("UTF-8"))), input -> input instanceof IOException, getMaxRetryCount()); if (response.getStatus().equals(HttpResponseStatus.OK)) { return JSON_MAPPER.readValue(response.getContent(), KafkaSupervisorSpec.class); @@ -465,15 +459,12 @@ private KafkaSupervisorSpec fetchKafkaIngestionSpec(Table table) { Preconditions.checkNotNull(DruidStorageHandlerUtils.getTableProperty(table, Constants.DRUID_DATA_SOURCE), "Druid Datasource name is null"); try { - FullResponseHolder + StringFullResponseHolder response = RetryUtils.retry(() -> DruidStorageHandlerUtils.getResponseFromCurrentLeader(getHttpClient(), - new Request(HttpMethod.GET, - new URL(String.format("http://%s/druid/indexer/v1/supervisor/%s/status", - overlordAddress, - dataSourceName))), - new FullResponseHandler(Charset.forName("UTF-8"))), - input -> input instanceof IOException, + new Request(HttpMethod.GET, new URL( + String.format("http://%s/druid/indexer/v1/supervisor/%s/status", overlordAddress, dataSourceName))), + new StringFullResponseHandler(Charset.forName("UTF-8"))), input -> input instanceof IOException, getMaxRetryCount()); if (response.getStatus().equals(HttpResponseStatus.OK)) { return DruidStorageHandlerUtils.JSON_MAPPER.readValue(response.getContent(), KafkaSupervisorReport.class); @@ -546,9 +537,8 @@ private void checkLoadStatus(List segments) { coordinatorResponse = RetryUtils.retry(() -> DruidStorageHandlerUtils.getResponseFromCurrentLeader(getHttpClient(), new Request(HttpMethod.GET, new URL(String.format("http://%s/status", coordinatorAddress))), - new FullResponseHandler(Charset.forName("UTF-8"))).getContent(), - input -> input instanceof IOException, - maxTries); + new StringFullResponseHandler(Charset.forName("UTF-8"))).getContent(), + input -> input instanceof IOException, maxTries); } catch (Exception e) { CONSOLE.printInfo("Will skip waiting for data loading, coordinator 
unavailable"); return; @@ -578,11 +568,9 @@ private void checkLoadStatus(List segments) { while (numRetries++ < maxTries && !urlsOfUnloadedSegments.isEmpty()) { urlsOfUnloadedSegments = ImmutableSet.copyOf(Sets.filter(urlsOfUnloadedSegments, input -> { try { - String - result = - DruidStorageHandlerUtils.getResponseFromCurrentLeader(getHttpClient(), - new Request(HttpMethod.GET, input), - new FullResponseHandler(Charset.forName("UTF-8"))).getContent(); + String result = DruidStorageHandlerUtils + .getResponseFromCurrentLeader(getHttpClient(), new Request(HttpMethod.GET, input), + new StringFullResponseHandler(Charset.forName("UTF-8"))).getContent(); LOG.debug("Checking segment [{}] response is [{}]", input, result); return Strings.isNullOrEmpty(result); @@ -878,9 +866,8 @@ private SQLMetadataConnector buildConnector() { case "postgresql": connector = new PostgreSQLConnector(storageConnectorConfigSupplier, - Suppliers.ofInstance(getDruidMetadataStorageTablesConfig()), - new PostgreSQLConnectorConfig() - ); + Suppliers.ofInstance(getDruidMetadataStorageTablesConfig()), new PostgreSQLConnectorConfig(), + new PostgreSQLTablesConfig()); break; case "derby": diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java index 1d7009b5af..a1b4af8c85 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java @@ -33,6 +33,7 @@ import com.google.common.collect.Ordering; import org.apache.calcite.adapter.druid.DruidQuery; import org.apache.calcite.sql.validate.SqlValidatorUtil; +import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.guice.BloomFilterSerializersModule; @@ -46,9 +47,9 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.Request; -import org.apache.druid.java.util.http.client.response.FullResponseHandler; -import org.apache.druid.java.util.http.client.response.FullResponseHolder; import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler; +import org.apache.druid.java.util.http.client.response.StringFullResponseHandler; +import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.metadata.MetadataStorageTablesConfig; import org.apache.druid.metadata.SQLMetadataConnector; @@ -68,6 +69,7 @@ import org.apache.druid.query.expression.TimestampParseExprMacro; import org.apache.druid.query.expression.TimestampShiftExprMacro; import org.apache.druid.query.expression.TrimExprMacro; +import org.apache.druid.query.extraction.StringFormatExtractionFn; import org.apache.druid.query.filter.AndDimFilter; import org.apache.druid.query.filter.BloomDimFilter; import org.apache.druid.query.filter.BloomKFilter; @@ -79,8 +81,6 @@ import org.apache.druid.query.ordering.StringComparator; import org.apache.druid.query.ordering.StringComparators; import org.apache.druid.query.scan.ScanQuery; -import org.apache.druid.query.select.SelectQuery; -import org.apache.druid.query.select.SelectQueryConfig; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import 
org.apache.druid.query.timeseries.TimeseriesQuery; import org.apache.druid.query.topn.TopNQuery; @@ -225,25 +225,18 @@ private DruidStorageHandlerUtils() { private static final int DEFAULT_MAX_TRIES = 10; static { + // This is needed to initialize NullHandling for druid without guice. + NullHandling.initializeForTests(); // This is needed for serde of PagingSpec as it uses JacksonInject for injecting SelectQueryConfig - InjectableValues.Std - injectableValues = - new InjectableValues.Std().addValue(SelectQueryConfig.class, new SelectQueryConfig(false)) - // Expressions macro table used when we deserialize the query from calcite plan - .addValue(ExprMacroTable.class, - new ExprMacroTable(ImmutableList.of(new LikeExprMacro(), - new RegexpExtractExprMacro(), - new TimestampCeilExprMacro(), - new TimestampExtractExprMacro(), - new TimestampFormatExprMacro(), - new TimestampParseExprMacro(), - new TimestampShiftExprMacro(), - new TimestampFloorExprMacro(), - new TrimExprMacro.BothTrimExprMacro(), - new TrimExprMacro.LeftTrimExprMacro(), - new TrimExprMacro.RightTrimExprMacro()))) - .addValue(ObjectMapper.class, JSON_MAPPER) - .addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT); + InjectableValues.Std injectableValues = new InjectableValues.Std() + // Expressions macro table used when we deserialize the query from calcite plan + .addValue(ExprMacroTable.class, new ExprMacroTable(ImmutableList + .of(new LikeExprMacro(), new RegexpExtractExprMacro(), new TimestampCeilExprMacro(), + new TimestampExtractExprMacro(), new TimestampFormatExprMacro(), new TimestampParseExprMacro(), + new TimestampShiftExprMacro(), new TimestampFloorExprMacro(), new TrimExprMacro.BothTrimExprMacro(), + new TrimExprMacro.LeftTrimExprMacro(), new TrimExprMacro.RightTrimExprMacro()))) + .addValue(ObjectMapper.class, JSON_MAPPER) + .addValue(DataSegment.PruneSpecsHolder.class, DataSegment.PruneSpecsHolder.DEFAULT); JSON_MAPPER.setInjectableValues(injectableValues); SMILE_MAPPER.setInjectableValues(injectableValues); @@ -331,10 +324,9 @@ public static InputStream submitRequest(HttpClient client, Request request) thro } - static FullResponseHolder getResponseFromCurrentLeader(HttpClient client, - Request request, - FullResponseHandler fullResponseHandler) throws ExecutionException, InterruptedException { - FullResponseHolder responseHolder = client.go(request, fullResponseHandler).get(); + static StringFullResponseHolder getResponseFromCurrentLeader(HttpClient client, Request request, + StringFullResponseHandler fullResponseHandler) throws ExecutionException, InterruptedException { + StringFullResponseHolder responseHolder = client.go(request, fullResponseHandler).get(); if (HttpResponseStatus.TEMPORARY_REDIRECT.equals(responseHolder.getStatus())) { String redirectUrlStr = responseHolder.getResponse().headers().get("Location"); LOG.debug("Request[%s] received redirect response to location [%s].", request.getUrl(), redirectUrlStr); @@ -342,9 +334,9 @@ static FullResponseHolder getResponseFromCurrentLeader(HttpClient client, try { redirectUrl = new URL(redirectUrlStr); } catch (MalformedURLException ex) { - throw new ExecutionException(String - .format("Malformed redirect location is found in response from url[%s], new location[%s].", - request.getUrl(), + throw new ExecutionException(String + .format("Malformed redirect location is found in response from url[%s], new location[%s].", + request.getUrl(), redirectUrlStr), ex); } responseHolder = client.go(withUrl(request, redirectUrl), 
fullResponseHandler).get(); @@ -638,12 +630,11 @@ public static Path makeSegmentDescriptorOutputPath(DataSegment pushedSegment, Pa } public static String createScanAllQuery(String dataSourceName, List columns) throws JsonProcessingException { - final ScanQuery.ScanQueryBuilder scanQueryBuilder = ScanQuery.newScanQueryBuilder(); + final Druids.ScanQueryBuilder scanQueryBuilder = Druids.newScanQueryBuilder(); final List intervals = Collections.singletonList(DEFAULT_INTERVAL); ScanQuery scanQuery = - scanQueryBuilder.dataSource(dataSourceName) - .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + scanQueryBuilder.dataSource(dataSourceName).resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) .intervals(new MultipleIntervalSegmentSpec(intervals)) .columns(columns) .build(); @@ -977,11 +968,7 @@ public static IndexSpec getIndexSpec(Configuration jc) { .setVirtualColumns(VirtualColumns.create(virtualColumns)).build(); break; case org.apache.druid.query.Query.SCAN: - rv = ScanQuery.ScanQueryBuilder.copy((ScanQuery) query).filters(filter) - .virtualColumns(VirtualColumns.create(virtualColumns)).build(); - break; - case org.apache.druid.query.Query.SELECT: - rv = Druids.SelectQueryBuilder.copy((SelectQuery) query).filters(filter) + rv = Druids.ScanQueryBuilder.copy((ScanQuery) query).filters(filter) .virtualColumns(VirtualColumns.create(virtualColumns)).build(); break; default: @@ -1140,8 +1127,6 @@ public static VirtualColumns getVirtualColumns(org.apache.druid.query.Query quer return ((GroupByQuery) query).getVirtualColumns(); case org.apache.druid.query.Query.SCAN: return ((ScanQuery) query).getVirtualColumns(); - case org.apache.druid.query.Query.SELECT: - return ((SelectQuery) query).getVirtualColumns(); default: throw new UnsupportedOperationException("Unsupported Query type " + query); } diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java index 6cf3ef2562..d90db9cbda 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java @@ -152,25 +152,9 @@ Integer maxRowInMemory = HiveConf.getIntVar(jc, HiveConf.ConfVars.HIVE_DRUID_MAX_ROW_IN_MEMORY); IndexSpec indexSpec = DruidStorageHandlerUtils.getIndexSpec(jc); - RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig(maxRowInMemory, - null, - null, - null, - new File(basePersistDirectory, dataSource), - new CustomVersioningPolicy(version), - null, - null, - null, - indexSpec, - true, - 0, - 0, - true, - null, - 0L, - null, - null - ); + RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig(maxRowInMemory, null, null, null, + new File(basePersistDirectory, dataSource), new CustomVersioningPolicy(version), null, null, null, indexSpec, + null, true, 0, 0, true, null, 0L, null, null); LOG.debug(String.format("running with Data schema [%s] ", dataSchema)); return new DruidRecordWriter(dataSchema, realtimeTuningConfig, diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java index 82a1f11f3f..b66c7575e5 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java @@ -25,8 +25,6 @@ import org.apache.druid.query.Query; 
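Note on the Select-to-Scan migration applied above and in the hunks that follow: with druid 0.17 the SelectQuery classes are gone, so the catch-all query is now built through Druids.newScanQueryBuilder() and the ScanQuery.ResultFormat enum, as createScanAllQuery does after this patch. Below is a minimal illustrative sketch of that builder usage, not code from the handler itself; the class name, data source, columns, and the interval literal (standing in for the handler's DEFAULT_INTERVAL constant) are placeholders.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.Druids;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;

public class ScanAllQuerySketch {
  // Builds a "select everything" scan query the way the patched createScanAllQuery does:
  // Druids.newScanQueryBuilder() takes over from the ScanQuery.ScanQueryBuilder entry point
  // used before the upgrade, and the result format is the ScanQuery.ResultFormat enum
  // instead of the old string constant.
  static ScanQuery scanAll(String dataSourceName, List<String> columns) {
    return Druids.newScanQueryBuilder()
        .dataSource(dataSourceName)
        .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
        // The handler passes its DEFAULT_INTERVAL constant here; any interval works for the sketch.
        .intervals(new MultipleIntervalSegmentSpec(
            Collections.singletonList(Intervals.of("1900-01-01/3000-01-01"))))
        .columns(columns)
        .build();
  }

  public static void main(String[] args) {
    ScanQuery query = scanAll("wikipedia", Arrays.asList("__time", "page", "added"));
    System.out.println(query);
  }
}
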
import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.scan.ScanQuery; -import org.apache.druid.query.select.PagingSpec; -import org.apache.druid.query.select.SelectQuery; import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; import org.apache.commons.lang3.StringEscapeUtils; import org.apache.commons.lang3.StringUtils; @@ -37,13 +35,7 @@ import org.apache.hadoop.hive.druid.DruidStorageHandler; import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils; import org.apache.hadoop.hive.druid.conf.DruidConstants; -import org.apache.hadoop.hive.druid.serde.DruidGroupByQueryRecordReader; -import org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader; -import org.apache.hadoop.hive.druid.serde.DruidScanQueryRecordReader; -import org.apache.hadoop.hive.druid.serde.DruidSelectQueryRecordReader; -import org.apache.hadoop.hive.druid.serde.DruidTimeseriesQueryRecordReader; -import org.apache.hadoop.hive.druid.serde.DruidTopNQueryRecordReader; -import org.apache.hadoop.hive.druid.serde.DruidWritable; +import org.apache.hadoop.hive.druid.serde.*; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; import org.apache.hadoop.hive.ql.exec.vector.VectorizedSupport; @@ -88,8 +80,6 @@ public static DruidQueryRecordReader getDruidQueryReader(String druidQueryType) return new DruidTopNQueryRecordReader(); case Query.GROUP_BY: return new DruidGroupByQueryRecordReader(); - case Query.SELECT: - return new DruidSelectQueryRecordReader(); case Query.SCAN: return new DruidScanQueryRecordReader(); default: @@ -152,9 +142,6 @@ public static DruidQueryRecordReader getDruidQueryReader(String druidQueryType) case Query.TOPN: case Query.GROUP_BY: return new HiveDruidSplit[] {new HiveDruidSplit(druidQuery, paths[0], new String[] {address})}; - case Query.SELECT: - SelectQuery selectQuery = DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, SelectQuery.class); - return distributeSelectQuery(address, selectQuery, paths[0]); case Query.SCAN: ScanQuery scanQuery = DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, ScanQuery.class); return distributeScanQuery(address, scanQuery, paths[0]); @@ -163,54 +150,13 @@ public static DruidQueryRecordReader getDruidQueryReader(String druidQueryType) } } - /* New method that distributes the Select query by creating splits containing - * information about different Druid nodes that have the data for the given - * query. 
*/ - private static HiveDruidSplit[] distributeSelectQuery(String address, SelectQuery query, Path dummyPath) - throws IOException { - // If it has a limit, we use it and we do not distribute the query - final boolean isFetch = query.getContextBoolean(DruidConstants.DRUID_QUERY_FETCH, false); - if (isFetch) { - return new HiveDruidSplit[] {new HiveDruidSplit(DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), - dummyPath, - new String[] {address})}; - } - - final List segmentDescriptors = fetchLocatedSegmentDescriptors(address, query); - - // Create one input split for each segment - final int numSplits = segmentDescriptors.size(); - final HiveDruidSplit[] splits = new HiveDruidSplit[segmentDescriptors.size()]; - for (int i = 0; i < numSplits; i++) { - final LocatedSegmentDescriptor locatedSD = segmentDescriptors.get(i); - final String[] hosts = new String[locatedSD.getLocations().size()]; - for (int j = 0; j < locatedSD.getLocations().size(); j++) { - hosts[j] = locatedSD.getLocations().get(j).getHost(); - } - // Create partial Select query - final SegmentDescriptor - newSD = - new SegmentDescriptor(locatedSD.getInterval(), locatedSD.getVersion(), locatedSD.getPartitionNumber()); - //@TODO This is fetching all the rows at once from broker or multiple historical nodes - // Move to use scan query to avoid GC back pressure on the nodes - // https://issues.apache.org/jira/browse/HIVE-17627 - final SelectQuery - partialQuery = - query.withQuerySegmentSpec(new MultipleSpecificSegmentSpec(Lists.newArrayList(newSD))) - .withPagingSpec(PagingSpec.newSpec(Integer.MAX_VALUE)); - splits[i] = - new HiveDruidSplit(DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(partialQuery), dummyPath, hosts); - } - return splits; - } - /* New method that distributes the Scan query by creating splits containing * information about different Druid nodes that have the data for the given * query. 
*/ private static HiveDruidSplit[] distributeScanQuery(String address, ScanQuery query, Path dummyPath) throws IOException { // If it has a limit, we use it and we do not distribute the query - final boolean isFetch = query.getLimit() < Long.MAX_VALUE; + final boolean isFetch = query.getScanRowsLimit() < Long.MAX_VALUE; if (isFetch) { return new HiveDruidSplit[] {new HiveDruidSplit(DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath, diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java index 248b59aae6..dc16c4e3f7 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java @@ -103,13 +103,9 @@ public DruidRecordWriter(DataSchema dataSchema, "realtimeTuningConfig is null"); this.dataSchema = Preconditions.checkNotNull(dataSchema, "data schema is null"); - appenderator = - Appenderators.createOffline(this.dataSchema, - tuningConfig, - new FireDepartmentMetrics(), - dataSegmentPusher, - DruidStorageHandlerUtils.JSON_MAPPER, - DruidStorageHandlerUtils.INDEX_IO, + appenderator = Appenderators + .createOffline("hive-offline-appenderator", this.dataSchema, tuningConfig, false, new FireDepartmentMetrics(), + dataSegmentPusher, DruidStorageHandlerUtils.JSON_MAPPER, DruidStorageHandlerUtils.INDEX_IO, DruidStorageHandlerUtils.INDEX_MERGER_V9); this.maxPartitionSize = maxPartitionSize; appenderator.startJob(); diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaIndexTaskTuningConfig.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaIndexTaskTuningConfig.java new file mode 100644 index 0000000000..4220cf1423 --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaIndexTaskTuningConfig.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.druid.json; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.indexing.RealtimeTuningConfig; +import org.apache.druid.segment.indexing.TuningConfig; +import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig; +import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; +import org.joda.time.Period; + +import javax.annotation.Nullable; +import java.io.File; +import java.util.Objects; + +/** + * This class is copied from druid source code + * in order to avoid adding additional dependencies on druid-indexing-service. 
+ */ +public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningConfig +{ + @JsonCreator + public KafkaIndexTaskTuningConfig( + @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory, + @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory, + @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, + @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows, + @JsonProperty("intermediatePersistPeriod") @Nullable Period intermediatePersistPeriod, + @JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory, + @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists, + @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, + // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12. + @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly, + @Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions, + @JsonProperty("handoffConditionTimeout") @Nullable Long handoffConditionTimeout, + @JsonProperty("resetOffsetAutomatically") @Nullable Boolean resetOffsetAutomatically, + @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, + @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod, + @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions, + @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, + @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions + ) + { + super( + maxRowsInMemory, + maxBytesInMemory, + maxRowsPerSegment, + maxTotalRows, + intermediatePersistPeriod, + basePersistDirectory, + maxPendingPersists, + indexSpec, + indexSpecForIntermediatePersists, + true, + reportParseExceptions, + handoffConditionTimeout, + resetOffsetAutomatically, + false, + segmentWriteOutMediumFactory, + intermediateHandoffPeriod, + logParseExceptions, + maxParseExceptions, + maxSavedParseExceptions + ); + } + + @Override + public KafkaIndexTaskTuningConfig withBasePersistDirectory(File dir) + { + return new KafkaIndexTaskTuningConfig( + getMaxRowsInMemory(), + getMaxBytesInMemory(), + getMaxRowsPerSegment(), + getMaxTotalRows(), + getIntermediatePersistPeriod(), + dir, + getMaxPendingPersists(), + getIndexSpec(), + getIndexSpecForIntermediatePersists(), + true, + isReportParseExceptions(), + getHandoffConditionTimeout(), + isResetOffsetAutomatically(), + getSegmentWriteOutMediumFactory(), + getIntermediateHandoffPeriod(), + isLogParseExceptions(), + getMaxParseExceptions(), + getMaxSavedParseExceptions() + ); + } + + + @Override + public String toString() + { + return "KafkaIndexTaskTuningConfig{" + + "maxRowsInMemory=" + getMaxRowsInMemory() + + ", maxRowsPerSegment=" + getMaxRowsPerSegment() + + ", maxTotalRows=" + getMaxTotalRows() + + ", maxBytesInMemory=" + getMaxBytesInMemory() + + ", intermediatePersistPeriod=" + getIntermediatePersistPeriod() + + ", basePersistDirectory=" + getBasePersistDirectory() + + ", maxPendingPersists=" + getMaxPendingPersists() + + ", indexSpec=" + getIndexSpec() + + ", indexSpecForIntermediatePersists=" + getIndexSpecForIntermediatePersists() + + ", reportParseExceptions=" + isReportParseExceptions() + + ", handoffConditionTimeout=" + getHandoffConditionTimeout() + + ", resetOffsetAutomatically=" + isResetOffsetAutomatically() + + ", 
segmentWriteOutMediumFactory=" + getSegmentWriteOutMediumFactory() + + ", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() + + ", logParseExceptions=" + isLogParseExceptions() + + ", maxParseExceptions=" + getMaxParseExceptions() + + ", maxSavedParseExceptions=" + getMaxSavedParseExceptions() + + '}'; + } + +} diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorSpec.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorSpec.java index d230832243..18177b6640 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorSpec.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorSpec.java @@ -52,24 +52,8 @@ null, null, null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null); + null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, + null, null); this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig"); this.context = context; } diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorTuningConfig.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorTuningConfig.java index b4d38b97f4..e0d7efe568 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorTuningConfig.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorTuningConfig.java @@ -35,8 +35,10 @@ * in order to avoid adding additional dependencies on druid-indexing-service. */ @SuppressWarnings("ALL") @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes({ - @JsonSubTypes.Type(name = "kafka", value = KafkaSupervisorTuningConfig.class) }) -public class KafkaSupervisorTuningConfig extends KafkaTuningConfig { + @JsonSubTypes.Type(name = "kafka", value = KafkaSupervisorTuningConfig.class) }) public class KafkaSupervisorTuningConfig + extends KafkaIndexTaskTuningConfig implements SeekableStreamSupervisorTuningConfig { + private static final String DEFAULT_OFFSET_FETCH_PERIOD = "PT30S"; + private final Integer workerThreads; private final Integer chatThreads; private final Long chatRetries; @@ -44,174 +46,98 @@ private final Duration shutdownTimeout; private final Duration offsetFetchPeriod; + public static KafkaSupervisorTuningConfig defaultConfig() { + return new KafkaSupervisorTuningConfig(null, null, null, null, null, null, null, null, null, null, null, null, null, + null, null, null, null, null, null, null, null, null, null, null); + } + public KafkaSupervisorTuningConfig(@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, @JsonProperty("maxBytesInMemory") Long maxBytesInMemory, - @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment, - @JsonProperty("maxTotalRows") Long maxTotalRows, + @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment, @JsonProperty("maxTotalRows") Long maxTotalRows, @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod, @JsonProperty("basePersistDirectory") File basePersistDirectory, - @JsonProperty("maxPendingPersists") Integer maxPendingPersists, - @JsonProperty("indexSpec") IndexSpec indexSpec, + @JsonProperty("maxPendingPersists") Integer maxPendingPersists, @JsonProperty("indexSpec") IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, // This parameter is left for compatibility when reading existing 
configs, to be removed in Druid 0.12. @JsonProperty("buildV9Directly") Boolean buildV9Directly, @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, @JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically, @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, - @JsonProperty("workerThreads") Integer workerThreads, - @JsonProperty("chatThreads") Integer chatThreads, - @JsonProperty("chatRetries") Long chatRetries, - @JsonProperty("httpTimeout") Period httpTimeout, + @JsonProperty("workerThreads") Integer workerThreads, @JsonProperty("chatThreads") Integer chatThreads, + @JsonProperty("chatRetries") Long chatRetries, @JsonProperty("httpTimeout") Period httpTimeout, @JsonProperty("shutdownTimeout") Period shutdownTimeout, @JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod, @JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod, @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions, @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions) { - super(maxRowsInMemory, - maxBytesInMemory, - maxRowsPerSegment, - maxTotalRows, - intermediatePersistPeriod, - basePersistDirectory, - maxPendingPersists, - indexSpec, - true, - reportParseExceptions, - handoffConditionTimeout, - resetOffsetAutomatically, - segmentWriteOutMediumFactory, - intermediateHandoffPeriod, - logParseExceptions, - maxParseExceptions, - maxSavedParseExceptions); - + super(maxRowsInMemory, maxBytesInMemory, maxRowsPerSegment, maxTotalRows, intermediatePersistPeriod, + basePersistDirectory, maxPendingPersists, indexSpec, indexSpecForIntermediatePersists, true, + reportParseExceptions, handoffConditionTimeout, resetOffsetAutomatically, segmentWriteOutMediumFactory, + intermediateHandoffPeriod, logParseExceptions, maxParseExceptions, maxSavedParseExceptions); this.workerThreads = workerThreads; this.chatThreads = chatThreads; - this.chatRetries = (chatRetries != null ? chatRetries : 8); - this.httpTimeout = defaultDuration(httpTimeout, "PT10S"); - this.shutdownTimeout = defaultDuration(shutdownTimeout, "PT80S"); - this.offsetFetchPeriod = defaultDuration(offsetFetchPeriod, "PT30S"); + this.chatRetries = (chatRetries != null ? 
chatRetries : DEFAULT_CHAT_RETRIES); + this.httpTimeout = SeekableStreamSupervisorTuningConfig.defaultDuration(httpTimeout, DEFAULT_HTTP_TIMEOUT); + this.shutdownTimeout = + SeekableStreamSupervisorTuningConfig.defaultDuration(shutdownTimeout, DEFAULT_SHUTDOWN_TIMEOUT); + this.offsetFetchPeriod = + SeekableStreamSupervisorTuningConfig.defaultDuration(offsetFetchPeriod, DEFAULT_OFFSET_FETCH_PERIOD); } - @JsonProperty public Integer getWorkerThreads() { + @Override @JsonProperty public Integer getWorkerThreads() { return workerThreads; } - @JsonProperty public Integer getChatThreads() { + @Override @JsonProperty public Integer getChatThreads() { return chatThreads; } - @JsonProperty public Long getChatRetries() { + @Override @JsonProperty public Long getChatRetries() { return chatRetries; } - @JsonProperty public Duration getHttpTimeout() { + @Override @JsonProperty public Duration getHttpTimeout() { return httpTimeout; } - @JsonProperty public Duration getShutdownTimeout() { + @Override @JsonProperty public Duration getShutdownTimeout() { return shutdownTimeout; } + @Override public Duration getRepartitionTransitionDuration() { + // Stopping tasks early for Kafka ingestion on partition set change is not supported yet, + // just return a default for now. + return SeekableStreamSupervisorTuningConfig + .defaultDuration(null, SeekableStreamSupervisorTuningConfig.DEFAULT_REPARTITION_TRANSITION_DURATION); + } + @JsonProperty public Duration getOffsetFetchPeriod() { return offsetFetchPeriod; } @Override public String toString() { - return "KafkaSupervisorTuningConfig{" - + "maxRowsInMemory=" - + getMaxRowsInMemory() - + ", maxRowsPerSegment=" - + getMaxRowsPerSegment() - + ", maxTotalRows=" - + getMaxTotalRows() - + ", maxBytesInMemory=" - + TuningConfigs.getMaxBytesInMemoryOrDefault(getMaxBytesInMemory()) - + ", intermediatePersistPeriod=" - + getIntermediatePersistPeriod() - + ", basePersistDirectory=" - + getBasePersistDirectory() - + ", maxPendingPersists=" - + getMaxPendingPersists() - + ", indexSpec=" - + getIndexSpec() - + ", reportParseExceptions=" - + isReportParseExceptions() - + ", handoffConditionTimeout=" - + getHandoffConditionTimeout() - + ", resetOffsetAutomatically=" - + isResetOffsetAutomatically() - + ", segmentWriteOutMediumFactory=" - + getSegmentWriteOutMediumFactory() - + ", workerThreads=" - + workerThreads - + ", chatThreads=" - + chatThreads - + ", chatRetries=" - + chatRetries - + ", httpTimeout=" - + httpTimeout - + ", shutdownTimeout=" - + shutdownTimeout - + ", offsetFetchPeriod=" - + offsetFetchPeriod - + ", intermediateHandoffPeriod=" - + getIntermediateHandoffPeriod() - + ", logParseExceptions=" - + isLogParseExceptions() - + ", maxParseExceptions=" - + getMaxParseExceptions() - + ", maxSavedParseExceptions=" - + getMaxSavedParseExceptions() - + '}'; - } - - private static Duration defaultDuration(final Period period, final String theDefault) { - return (period == null ? new Period(theDefault) : period).toStandardDuration(); - } - - @Override public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - if (!super.equals(o)) { - return false; - } - - KafkaSupervisorTuningConfig that = (KafkaSupervisorTuningConfig) o; - - if (workerThreads != null ? !workerThreads.equals(that.workerThreads) : that.workerThreads != null) { - return false; - } - if (chatThreads != null ? !chatThreads.equals(that.chatThreads) : that.chatThreads != null) { - return false; - } - if (chatRetries != null ? 
!chatRetries.equals(that.chatRetries) : that.chatRetries != null) { - return false; - } - if (httpTimeout != null ? !httpTimeout.equals(that.httpTimeout) : that.httpTimeout != null) { - return false; - } - if (shutdownTimeout != null ? !shutdownTimeout.equals(that.shutdownTimeout) : that.shutdownTimeout != null) { - return false; - } - return offsetFetchPeriod != null ? - offsetFetchPeriod.equals(that.offsetFetchPeriod) : - that.offsetFetchPeriod == null; + return "KafkaSupervisorTuningConfig{" + "maxRowsInMemory=" + getMaxRowsInMemory() + ", maxRowsPerSegment=" + + getMaxRowsPerSegment() + ", maxTotalRows=" + getMaxTotalRows() + ", maxBytesInMemory=" + TuningConfigs + .getMaxBytesInMemoryOrDefault(getMaxBytesInMemory()) + ", intermediatePersistPeriod=" + + getIntermediatePersistPeriod() + ", basePersistDirectory=" + getBasePersistDirectory() + + ", maxPendingPersists=" + getMaxPendingPersists() + ", indexSpec=" + getIndexSpec() + + ", reportParseExceptions=" + isReportParseExceptions() + ", handoffConditionTimeout=" + + getHandoffConditionTimeout() + ", resetOffsetAutomatically=" + isResetOffsetAutomatically() + + ", segmentWriteOutMediumFactory=" + getSegmentWriteOutMediumFactory() + ", workerThreads=" + workerThreads + + ", chatThreads=" + chatThreads + ", chatRetries=" + chatRetries + ", httpTimeout=" + httpTimeout + + ", shutdownTimeout=" + shutdownTimeout + ", offsetFetchPeriod=" + offsetFetchPeriod + + ", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() + ", logParseExceptions=" + + isLogParseExceptions() + ", maxParseExceptions=" + getMaxParseExceptions() + ", maxSavedParseExceptions=" + + getMaxSavedParseExceptions() + '}'; } - @Override public int hashCode() { - int result = super.hashCode(); - result = 31 * result + (workerThreads != null ? workerThreads.hashCode() : 0); - result = 31 * result + (chatThreads != null ? chatThreads.hashCode() : 0); - result = 31 * result + (chatRetries != null ? chatRetries.hashCode() : 0); - result = 31 * result + (httpTimeout != null ? httpTimeout.hashCode() : 0); - result = 31 * result + (shutdownTimeout != null ? shutdownTimeout.hashCode() : 0); - result = 31 * result + (offsetFetchPeriod != null ? offsetFetchPeriod.hashCode() : 0); - return result; + @Override public KafkaIndexTaskTuningConfig convertToTaskTuningConfig() { + return new KafkaIndexTaskTuningConfig(getMaxRowsInMemory(), getMaxBytesInMemory(), getMaxRowsPerSegment(), + getMaxTotalRows(), getIntermediatePersistPeriod(), getBasePersistDirectory(), getMaxPendingPersists(), + getIndexSpec(), getIndexSpecForIntermediatePersists(), true, isReportParseExceptions(), + getHandoffConditionTimeout(), isResetOffsetAutomatically(), getSegmentWriteOutMediumFactory(), + getIntermediateHandoffPeriod(), isLogParseExceptions(), getMaxParseExceptions(), getMaxSavedParseExceptions()); } } diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java deleted file mode 100644 index 45ac77ba35..0000000000 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java +++ /dev/null @@ -1,307 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.druid.json; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.druid.segment.IndexSpec; -import org.apache.druid.segment.indexing.RealtimeTuningConfig; -import org.apache.druid.segment.indexing.TuningConfig; -import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig; -import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; -import org.joda.time.Period; - -import javax.annotation.Nullable; -import java.io.File; -import java.util.Objects; - -/** - * This class is copied from druid source code - * in order to avoid adding additional dependencies on druid-indexing-service. - */ -public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig { - private static final int DEFAULT_MAX_ROWS_PER_SEGMENT = 5_000_000; - private static final boolean DEFAULT_RESET_OFFSET_AUTOMATICALLY = false; - - private final int maxRowsInMemory; - private final long maxBytesInMemory; - private final int maxRowsPerSegment; - @Nullable private final Long maxTotalRows; - private final Period intermediatePersistPeriod; - private final File basePersistDirectory; - @Deprecated private final int maxPendingPersists; - private final IndexSpec indexSpec; - private final boolean reportParseExceptions; - @Deprecated private final long handoffConditionTimeout; - private final boolean resetOffsetAutomatically; - @Nullable private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory; - private final Period intermediateHandoffPeriod; - - private final boolean logParseExceptions; - private final int maxParseExceptions; - private final int maxSavedParseExceptions; - - @JsonCreator public KafkaTuningConfig(@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory, - @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory, - @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, - @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows, - @JsonProperty("intermediatePersistPeriod") @Nullable Period intermediatePersistPeriod, - @JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory, - @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists, - @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, - // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12. 
- @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly, - @Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions, - @JsonProperty("handoffConditionTimeout") @Nullable Long handoffConditionTimeout, - @JsonProperty("resetOffsetAutomatically") @Nullable Boolean resetOffsetAutomatically, - @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, - @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod, - @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions, - @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, - @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions) { - // Cannot be a static because default basePersistDirectory is unique per-instance - final RealtimeTuningConfig defaults = RealtimeTuningConfig.makeDefaultTuningConfig(basePersistDirectory); - - this.maxRowsInMemory = maxRowsInMemory == null ? defaults.getMaxRowsInMemory() : maxRowsInMemory; - this.maxRowsPerSegment = maxRowsPerSegment == null ? DEFAULT_MAX_ROWS_PER_SEGMENT : maxRowsPerSegment; - // initializing this to 0, it will be lazily initialized to a value - // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long) - this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory; - this.maxTotalRows = maxTotalRows; - this.intermediatePersistPeriod = - intermediatePersistPeriod == null ? defaults.getIntermediatePersistPeriod() : intermediatePersistPeriod; - this.basePersistDirectory = defaults.getBasePersistDirectory(); - this.maxPendingPersists = 0; - this.indexSpec = indexSpec == null ? defaults.getIndexSpec() : indexSpec; - this.reportParseExceptions = - reportParseExceptions == null ? defaults.isReportParseExceptions() : reportParseExceptions; - this.handoffConditionTimeout = - handoffConditionTimeout == null ? defaults.getHandoffConditionTimeout() : handoffConditionTimeout; - this.resetOffsetAutomatically = - resetOffsetAutomatically == null ? DEFAULT_RESET_OFFSET_AUTOMATICALLY : resetOffsetAutomatically; - this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory; - this.intermediateHandoffPeriod = - intermediateHandoffPeriod == null ? new Period().withDays(Integer.MAX_VALUE) : intermediateHandoffPeriod; - - if (this.reportParseExceptions) { - this.maxParseExceptions = 0; - this.maxSavedParseExceptions = maxSavedParseExceptions == null ? 0 : Math.min(1, maxSavedParseExceptions); - } else { - this.maxParseExceptions = - maxParseExceptions == null ? TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS : maxParseExceptions; - this.maxSavedParseExceptions = - maxSavedParseExceptions == null ? TuningConfig.DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS : maxSavedParseExceptions; - } - this.logParseExceptions = - logParseExceptions == null ? 
TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS : logParseExceptions; - } - - public static KafkaTuningConfig copyOf(KafkaTuningConfig config) { - return new KafkaTuningConfig(config.maxRowsInMemory, - config.maxBytesInMemory, - config.maxRowsPerSegment, - config.maxTotalRows, - config.intermediatePersistPeriod, - config.basePersistDirectory, - config.maxPendingPersists, - config.indexSpec, - true, - config.reportParseExceptions, - config.handoffConditionTimeout, - config.resetOffsetAutomatically, - config.segmentWriteOutMediumFactory, - config.intermediateHandoffPeriod, - config.logParseExceptions, - config.maxParseExceptions, - config.maxSavedParseExceptions); - } - - @Override @JsonProperty public int getMaxRowsInMemory() { - return maxRowsInMemory; - } - - @Override @JsonProperty public long getMaxBytesInMemory() { - return maxBytesInMemory; - } - - @Override @JsonProperty public Integer getMaxRowsPerSegment() { - return maxRowsPerSegment; - } - - @JsonProperty @Override @Nullable public Long getMaxTotalRows() { - return maxTotalRows; - } - - @Override @JsonProperty public Period getIntermediatePersistPeriod() { - return intermediatePersistPeriod; - } - - @Override @JsonProperty public File getBasePersistDirectory() { - return basePersistDirectory; - } - - @Override @JsonProperty @Deprecated public int getMaxPendingPersists() { - return maxPendingPersists; - } - - @Override @JsonProperty public IndexSpec getIndexSpec() { - return indexSpec; - } - - /** - * Always returns true, doesn't affect the version being built. - */ - @SuppressWarnings("SameReturnValue") @Deprecated @JsonProperty public boolean getBuildV9Directly() { - return true; - } - - @Override @JsonProperty public boolean isReportParseExceptions() { - return reportParseExceptions; - } - - @JsonProperty public long getHandoffConditionTimeout() { - return handoffConditionTimeout; - } - - @JsonProperty public boolean isResetOffsetAutomatically() { - return resetOffsetAutomatically; - } - - @Override @JsonProperty @Nullable public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory() { - return segmentWriteOutMediumFactory; - } - - @JsonProperty public Period getIntermediateHandoffPeriod() { - return intermediateHandoffPeriod; - } - - @JsonProperty public boolean isLogParseExceptions() { - return logParseExceptions; - } - - @JsonProperty public int getMaxParseExceptions() { - return maxParseExceptions; - } - - @JsonProperty public int getMaxSavedParseExceptions() { - return maxSavedParseExceptions; - } - - public KafkaTuningConfig withBasePersistDirectory(File dir) { - return new KafkaTuningConfig(maxRowsInMemory, - maxBytesInMemory, - maxRowsPerSegment, - maxTotalRows, - intermediatePersistPeriod, - dir, - maxPendingPersists, - indexSpec, - true, - reportParseExceptions, - handoffConditionTimeout, - resetOffsetAutomatically, - segmentWriteOutMediumFactory, - intermediateHandoffPeriod, - logParseExceptions, - maxParseExceptions, - maxSavedParseExceptions); - } - - @Override public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - KafkaTuningConfig that = (KafkaTuningConfig) o; - return maxRowsInMemory == that.maxRowsInMemory - && maxRowsPerSegment == that.maxRowsPerSegment - && maxBytesInMemory == that.maxBytesInMemory - && Objects.equals(maxTotalRows, that.maxTotalRows) - && maxPendingPersists == that.maxPendingPersists - && reportParseExceptions == that.reportParseExceptions - && handoffConditionTimeout == that.handoffConditionTimeout - && 
resetOffsetAutomatically == that.resetOffsetAutomatically - && Objects.equals(intermediatePersistPeriod, that.intermediatePersistPeriod) - && Objects.equals(basePersistDirectory, that.basePersistDirectory) - && Objects.equals(indexSpec, that.indexSpec) - && Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory) - && Objects.equals(intermediateHandoffPeriod, that.intermediateHandoffPeriod) - && logParseExceptions == that.logParseExceptions - && maxParseExceptions == that.maxParseExceptions - && maxSavedParseExceptions == that.maxSavedParseExceptions; - } - - @Override public int hashCode() { - return Objects.hash(maxRowsInMemory, - maxRowsPerSegment, - maxBytesInMemory, - maxTotalRows, - intermediatePersistPeriod, - basePersistDirectory, - maxPendingPersists, - indexSpec, - reportParseExceptions, - handoffConditionTimeout, - resetOffsetAutomatically, - segmentWriteOutMediumFactory, - intermediateHandoffPeriod, - logParseExceptions, - maxParseExceptions, - maxSavedParseExceptions); - } - - @Override public String toString() { - return "KafkaTuningConfig{" - + "maxRowsInMemory=" - + maxRowsInMemory - + ", maxRowsPerSegment=" - + maxRowsPerSegment - + ", maxTotalRows=" - + maxTotalRows - + ", maxBytesInMemory=" - + maxBytesInMemory - + ", intermediatePersistPeriod=" - + intermediatePersistPeriod - + ", basePersistDirectory=" - + basePersistDirectory - + ", maxPendingPersists=" - + maxPendingPersists - + ", indexSpec=" - + indexSpec - + ", reportParseExceptions=" - + reportParseExceptions - + ", handoffConditionTimeout=" - + handoffConditionTimeout - + ", resetOffsetAutomatically=" - + resetOffsetAutomatically - + ", segmentWriteOutMediumFactory=" - + segmentWriteOutMediumFactory - + ", intermediateHandoffPeriod=" - + intermediateHandoffPeriod - + ", logParseExceptions=" - + logParseExceptions - + ", maxParseExceptions=" - + maxParseExceptions - + ", maxSavedParseExceptions=" - + maxSavedParseExceptions - + '}'; - } -} diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamIndexTaskTuningConfig.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamIndexTaskTuningConfig.java new file mode 100644 index 0000000000..b7cf712221 --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamIndexTaskTuningConfig.java @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.druid.json; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.indexing.RealtimeTuningConfig; +import org.apache.druid.segment.indexing.TuningConfig; +import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig; +import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; +import org.joda.time.Period; + +import javax.annotation.Nullable; +import java.io.File; +import java.util.Objects; + +/** + * This class is copied from druid source code + * in order to avoid adding additional dependencies on druid-indexing-service. + */ +public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfig, AppenderatorConfig +{ + private static final boolean DEFAULT_RESET_OFFSET_AUTOMATICALLY = false; + private static final boolean DEFAULT_SKIP_SEQUENCE_NUMBER_AVAILABILITY_CHECK = false; + + private final int maxRowsInMemory; + private final long maxBytesInMemory; + private final DynamicPartitionsSpec partitionsSpec; + private final Period intermediatePersistPeriod; + private final File basePersistDirectory; + @Deprecated + private final int maxPendingPersists; + private final IndexSpec indexSpec; + private final IndexSpec indexSpecForIntermediatePersists; + private final boolean reportParseExceptions; + private final long handoffConditionTimeout; + private final boolean resetOffsetAutomatically; + @Nullable + private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory; + private final Period intermediateHandoffPeriod; + private final boolean skipSequenceNumberAvailabilityCheck; + + private final boolean logParseExceptions; + private final int maxParseExceptions; + private final int maxSavedParseExceptions; + + public SeekableStreamIndexTaskTuningConfig( + @Nullable Integer maxRowsInMemory, + @Nullable Long maxBytesInMemory, + @Nullable Integer maxRowsPerSegment, + @Nullable Long maxTotalRows, + @Nullable Period intermediatePersistPeriod, + @Nullable File basePersistDirectory, + @Nullable Integer maxPendingPersists, + @Nullable IndexSpec indexSpec, + @Nullable IndexSpec indexSpecForIntermediatePersists, + // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12. + @Deprecated @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly, + @Deprecated @Nullable Boolean reportParseExceptions, + @Nullable Long handoffConditionTimeout, + @Nullable Boolean resetOffsetAutomatically, + Boolean skipSequenceNumberAvailabilityCheck, + @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, + @Nullable Period intermediateHandoffPeriod, + @Nullable Boolean logParseExceptions, + @Nullable Integer maxParseExceptions, + @Nullable Integer maxSavedParseExceptions + ) + { + // Cannot be a static because default basePersistDirectory is unique per-instance + final RealtimeTuningConfig defaults = RealtimeTuningConfig.makeDefaultTuningConfig(basePersistDirectory); + + this.maxRowsInMemory = maxRowsInMemory == null ? defaults.getMaxRowsInMemory() : maxRowsInMemory; + this.partitionsSpec = new DynamicPartitionsSpec(maxRowsPerSegment, maxTotalRows); + // initializing this to 0, it will be lazily initialized to a value + // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long) + this.maxBytesInMemory = maxBytesInMemory == null ? 
0 : maxBytesInMemory; + this.intermediatePersistPeriod = intermediatePersistPeriod == null + ? defaults.getIntermediatePersistPeriod() + : intermediatePersistPeriod; + this.basePersistDirectory = defaults.getBasePersistDirectory(); + this.maxPendingPersists = maxPendingPersists == null ? 0 : maxPendingPersists; + this.indexSpec = indexSpec == null ? defaults.getIndexSpec() : indexSpec; + this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ? + this.indexSpec : indexSpecForIntermediatePersists; + this.reportParseExceptions = reportParseExceptions == null + ? defaults.isReportParseExceptions() + : reportParseExceptions; + this.handoffConditionTimeout = handoffConditionTimeout == null + ? defaults.getHandoffConditionTimeout() + : handoffConditionTimeout; + this.resetOffsetAutomatically = resetOffsetAutomatically == null + ? DEFAULT_RESET_OFFSET_AUTOMATICALLY + : resetOffsetAutomatically; + this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory; + this.intermediateHandoffPeriod = intermediateHandoffPeriod == null + ? new Period().withDays(Integer.MAX_VALUE) + : intermediateHandoffPeriod; + this.skipSequenceNumberAvailabilityCheck = skipSequenceNumberAvailabilityCheck == null + ? DEFAULT_SKIP_SEQUENCE_NUMBER_AVAILABILITY_CHECK + : skipSequenceNumberAvailabilityCheck; + + if (this.reportParseExceptions) { + this.maxParseExceptions = 0; + this.maxSavedParseExceptions = maxSavedParseExceptions == null ? 0 : Math.min(1, maxSavedParseExceptions); + } else { + this.maxParseExceptions = maxParseExceptions == null + ? TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS + : maxParseExceptions; + this.maxSavedParseExceptions = maxSavedParseExceptions == null + ? TuningConfig.DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS + : maxSavedParseExceptions; + } + this.logParseExceptions = logParseExceptions == null + ? TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS + : logParseExceptions; + } + + @Override + @JsonProperty + public int getMaxRowsInMemory() + { + return maxRowsInMemory; + } + + @Override + @JsonProperty + public long getMaxBytesInMemory() + { + return maxBytesInMemory; + } + + @Override + @JsonProperty + public Integer getMaxRowsPerSegment() + { + return partitionsSpec.getMaxRowsPerSegment(); + } + + @JsonProperty + @Override + @Nullable + public Long getMaxTotalRows() + { + return partitionsSpec.getMaxTotalRows(); + } + + @Override + public DynamicPartitionsSpec getPartitionsSpec() + { + return partitionsSpec; + } + + @Override + @JsonProperty + public Period getIntermediatePersistPeriod() + { + return intermediatePersistPeriod; + } + + @Override + @JsonProperty + public File getBasePersistDirectory() + { + return basePersistDirectory; + } + + @Override + @JsonProperty + @Deprecated + public int getMaxPendingPersists() + { + return maxPendingPersists; + } + + @Override + @JsonProperty + public IndexSpec getIndexSpec() + { + return indexSpec; + } + + @JsonProperty + @Override + public IndexSpec getIndexSpecForIntermediatePersists() + { + return indexSpecForIntermediatePersists; + } + + /** + * Always returns true, doesn't affect the version being built. 
+ */ + @Deprecated + @JsonProperty + public boolean getBuildV9Directly() + { + return true; + } + + @Override + @JsonProperty + public boolean isReportParseExceptions() + { + return reportParseExceptions; + } + + @JsonProperty + public long getHandoffConditionTimeout() + { + return handoffConditionTimeout; + } + + @JsonProperty + public boolean isResetOffsetAutomatically() + { + return resetOffsetAutomatically; + } + + @Override + @JsonProperty + @Nullable + public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory() + { + return segmentWriteOutMediumFactory; + } + + @JsonProperty + public Period getIntermediateHandoffPeriod() + { + return intermediateHandoffPeriod; + } + + @JsonProperty + public boolean isLogParseExceptions() + { + return logParseExceptions; + } + + @JsonProperty + public int getMaxParseExceptions() + { + return maxParseExceptions; + } + + @JsonProperty + public int getMaxSavedParseExceptions() + { + return maxSavedParseExceptions; + } + + @JsonProperty + public boolean isSkipSequenceNumberAvailabilityCheck() + { + return skipSequenceNumberAvailabilityCheck; + } + + @Override + public abstract SeekableStreamIndexTaskTuningConfig withBasePersistDirectory(File dir); + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SeekableStreamIndexTaskTuningConfig that = (SeekableStreamIndexTaskTuningConfig) o; + return maxRowsInMemory == that.maxRowsInMemory && + maxBytesInMemory == that.maxBytesInMemory && + maxPendingPersists == that.maxPendingPersists && + reportParseExceptions == that.reportParseExceptions && + handoffConditionTimeout == that.handoffConditionTimeout && + resetOffsetAutomatically == that.resetOffsetAutomatically && + skipSequenceNumberAvailabilityCheck == that.skipSequenceNumberAvailabilityCheck && + logParseExceptions == that.logParseExceptions && + maxParseExceptions == that.maxParseExceptions && + maxSavedParseExceptions == that.maxSavedParseExceptions && + Objects.equals(partitionsSpec, that.partitionsSpec) && + Objects.equals(intermediatePersistPeriod, that.intermediatePersistPeriod) && + Objects.equals(basePersistDirectory, that.basePersistDirectory) && + Objects.equals(indexSpec, that.indexSpec) && + Objects.equals(indexSpecForIntermediatePersists, that.indexSpecForIntermediatePersists) && + Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory) && + Objects.equals(intermediateHandoffPeriod, that.intermediateHandoffPeriod); + } + + @Override + public int hashCode() + { + return Objects.hash( + maxRowsInMemory, + maxBytesInMemory, + partitionsSpec, + intermediatePersistPeriod, + basePersistDirectory, + maxPendingPersists, + indexSpec, + indexSpecForIntermediatePersists, + reportParseExceptions, + handoffConditionTimeout, + resetOffsetAutomatically, + segmentWriteOutMediumFactory, + intermediateHandoffPeriod, + skipSequenceNumberAvailabilityCheck, + logParseExceptions, + maxParseExceptions, + maxSavedParseExceptions + ); + } + + @Override + public abstract String toString(); +} diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamSupervisorTuningConfig.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamSupervisorTuningConfig.java new file mode 100644 index 0000000000..a75baac7b2 --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamSupervisorTuningConfig.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.druid.json; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.indexing.RealtimeTuningConfig; +import org.apache.druid.segment.indexing.TuningConfig; +import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig; +import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; +import org.joda.time.Duration; +import org.joda.time.Period; + +import javax.annotation.Nullable; +import java.io.File; +import java.util.Objects; + +/** + * This class is copied from druid source code + * in order to avoid adding additional dependencies on druid-indexing-service. + */ +public interface SeekableStreamSupervisorTuningConfig +{ + + int DEFAULT_CHAT_RETRIES = 8; + String DEFAULT_HTTP_TIMEOUT = "PT10S"; + String DEFAULT_SHUTDOWN_TIMEOUT = "PT80S"; + String DEFAULT_REPARTITION_TRANSITION_DURATION = "PT2M"; + + static Duration defaultDuration(final Period period, final String theDefault) + { + return (period == null ? new Period(theDefault) : period).toStandardDuration(); + } + + @JsonProperty + Integer getWorkerThreads(); + + @JsonProperty + Integer getChatThreads(); + + @JsonProperty + Long getChatRetries(); + + @JsonProperty + Duration getHttpTimeout(); + + @JsonProperty + Duration getShutdownTimeout(); + + @JsonProperty + Duration getRepartitionTransitionDuration(); + + SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig(); +} \ No newline at end of file diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java deleted file mode 100644 index 907558fe48..0000000000 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
+ */ -package org.apache.hadoop.hive.druid.serde; - -import java.io.IOException; -import java.util.Collections; -import java.util.Iterator; - -import com.fasterxml.jackson.databind.JavaType; -import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils; -import org.apache.hadoop.hive.druid.conf.DruidConstants; -import org.apache.hadoop.io.NullWritable; - -import com.fasterxml.jackson.core.type.TypeReference; - -import org.apache.druid.query.Result; -import org.apache.druid.query.select.EventHolder; -import org.apache.druid.query.select.SelectResultValue; - -/** - * Record reader for results for Druid SelectQuery. - */ -public class DruidSelectQueryRecordReader extends DruidQueryRecordReader<Result<SelectResultValue>> { - - private static final TypeReference<Result<SelectResultValue>> - TYPE_REFERENCE = - new TypeReference<Result<SelectResultValue>>() { - }; - - private Iterator<EventHolder> values = Collections.emptyIterator(); - - @Override protected JavaType getResultTypeDef() { - return DruidStorageHandlerUtils.JSON_MAPPER.getTypeFactory().constructType(TYPE_REFERENCE); - } - - @Override public boolean nextKeyValue() throws IOException { - if (values.hasNext()) { - return true; - } - if (getQueryResultsIterator().hasNext()) { - Result<SelectResultValue> current = getQueryResultsIterator().next(); - values = current.getValue().getEvents().iterator(); - return nextKeyValue(); - } - return false; - } - - @Override public NullWritable getCurrentKey() throws IOException, InterruptedException { - return NullWritable.get(); - } - - @Override public DruidWritable getCurrentValue() throws IOException, InterruptedException { - // Create new value - DruidWritable value = new DruidWritable(false); - EventHolder e = values.next(); - value.getValue().put(DruidConstants.DEFAULT_TIMESTAMP_COLUMN, e.getTimestamp().getMillis()); - value.getValue().putAll(e.getEvent()); - return value; - } - - @Override public boolean next(NullWritable key, DruidWritable value) throws IOException { - if (nextKeyValue()) { - // Update value - value.getValue().clear(); - EventHolder e = values.next(); - value.getValue().put(DruidConstants.DEFAULT_TIMESTAMP_COLUMN, e.getTimestamp().getMillis()); - value.getValue().putAll(e.getEvent()); - return true; - } - return false; - } - - @Override public float getProgress() { - return getQueryResultsIterator().hasNext() || values.hasNext() ?
0 : 1; - } - -} diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java index 0b2072c2e1..7d94f1afc9 100644 --- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java +++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java @@ -93,12 +93,8 @@ private DataSegment createSegment(String location) throws IOException { private DataSegment createSegment(String location, Interval interval, String version, ShardSpec shardSpec) throws IOException { FileUtils.writeStringToFile(new File(location), "dummySegmentData"); - return DataSegment.builder() - .dataSource(DATA_SOURCE_NAME) - .version(version) - .interval(interval) - .shardSpec(shardSpec) - .loadSpec(ImmutableMap.of("path", location)) + return DataSegment.builder().dataSource(DATA_SOURCE_NAME).version(version).interval(interval).shardSpec(shardSpec) + .loadSpec(ImmutableMap.of("path", location)).size(1000L) .build(); } diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java index 58f4a443fc..2bcbb14f9f 100644 --- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java +++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java @@ -141,47 +141,6 @@ + "\"context\":{\"queryId\":\"\"}," + "\"descending\":false}, [localhost:8082]}]"; - private static final String - SELECT_QUERY = - "{ \"queryType\": \"select\", " - + " \"dataSource\": \"wikipedia\", \"descending\": \"false\", " - + " \"dimensions\":[\"robot\",\"namespace\",\"anonymous\",\"unpatrolled\",\"page\",\"language\"," - + "\"newpage\",\"user\"], " - + " \"metrics\":[\"count\",\"added\",\"delta\",\"variation\",\"deleted\"], " - + " \"granularity\": \"all\", " - + " \"intervals\": [ \"2013-01-01T00:00:00.000-08:00/2013-01-02T00:00:00.000-08:00\" ], " - + " \"pagingSpec\":{\"pagingIdentifiers\": {}, \"threshold\":5}, " - + " \"context\":{\"druid.query.fetch\":true}}"; - private static final String - SELECT_QUERY_SPLIT = - "[HiveDruidSplit{{\"queryType\":\"select\"," - + "\"dataSource\":{\"type\":\"table\",\"name\":\"wikipedia\"}," - + "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2013-01-01T08:00:00" - + ".000Z/2013-01-02T08:00:00.000Z\"]}," - + "\"descending\":false," - + "\"filter\":null," - + "\"granularity\":{\"type\":\"all\"}," - + "\"dimensions\":[{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"robot\",\"outputName\":\"robot\"," - + "\"outputType\":\"STRING\"}," - + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"namespace\",\"outputName\":\"namespace\"," - + "\"outputType\":\"STRING\"}," - + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"anonymous\",\"outputName\":\"anonymous\"," - + "\"outputType\":\"STRING\"}," - + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"unpatrolled\",\"outputName\":\"unpatrolled\"," - + "\"outputType\":\"STRING\"}," - + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"page\",\"outputName\":\"page\"," - + "\"outputType\":\"STRING\"}," - + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"language\",\"outputName\":\"language\"," - + "\"outputType\":\"STRING\"}," - + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"newpage\",\"outputName\":\"newpage\"," - + "\"outputType\":\"STRING\"}," - + 
"{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"user\",\"outputName\":\"user\"," - + "\"outputType\":\"STRING\"}]," - + "\"metrics\":[\"count\",\"added\",\"delta\",\"variation\",\"deleted\"]," - + "\"virtualColumns\":[]," - + "\"pagingSpec\":{\"pagingIdentifiers\":{},\"threshold\":5,\"fromNext\":false}," - + "\"context\":{\"druid.query.fetch\":true,\"queryId\":\"\"}}, [localhost:8082]}]"; - @Test public void testTimeZone() throws Exception { DruidQueryBasedInputFormat input = new DruidQueryBasedInputFormat(); @@ -202,9 +161,6 @@ public void testTimeZone() throws Exception { resultSplits = (HiveDruidSplit[]) method1.invoke(input, conf); assertEquals(GROUP_BY_QUERY_SPLIT, Arrays.toString(resultSplits)); - conf = createPropertiesQuery("sample_datasource", Query.SELECT, SELECT_QUERY); - resultSplits = (HiveDruidSplit[]) method1.invoke(input, conf); - assertEquals(SELECT_QUERY_SPLIT, Arrays.toString(resultSplits)); } private static Configuration createPropertiesQuery(String dataSource, String queryType, String jsonQuery) { diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java index 74576975a4..009782955e 100644 --- a/druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java +++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java @@ -83,7 +83,6 @@ import org.apache.druid.data.input.Row; import org.apache.druid.query.Query; import org.apache.druid.query.Result; -import org.apache.druid.query.select.SelectResultValue; import org.apache.druid.query.timeseries.TimeseriesResultValue; import org.apache.druid.query.topn.TopNResultValue; import org.junit.rules.ExpectedException; @@ -455,147 +454,15 @@ private static final String GB_MONTH_EXTRACTIONS_COLUMN_NAMES = "timestamp,extract_month,$f1"; private static final String GB_MONTH_EXTRACTIONS_COLUMN_TYPES = "timestamp with local time zone,int,bigint"; - // Select query - private static final String - SELECT_QUERY = - "{ \"queryType\": \"select\", " - + " \"dataSource\": \"wikipedia\", \"descending\": \"false\", " - + " \"dimensions\":[\"robot\",\"namespace\",\"anonymous\",\"unpatrolled\",\"page\",\"language\"," - + "\"newpage\",\"user\"], " - + " \"metrics\":[\"count\",\"added\",\"delta\",\"variation\",\"deleted\"], " - + " \"granularity\": \"all\", " - + " \"intervals\": [ \"2013-01-01/2013-01-02\" ], " - + " \"pagingSpec\":{\"pagingIdentifiers\": {}, \"threshold\":5} }"; - - // Select query results - private static final String - SELECT_QUERY_RESULTS = - "[{ " - + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", " - + " \"result\" : { " - + " \"pagingIdentifiers\" : { " - + " \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\" : 4 }, " - + " \"events\" : [ { " - + " \"segmentId\" : \"wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00" - + ".000Z_2013-01-10T08:13:47.830Z_v9\", " - + " \"offset\" : 0, " - + " \"event\" : { " - + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", " - + " \"robot\" : 1, " - + " \"namespace\" : \"article\", " - + " \"anonymous\" : \"0\", " - + " \"unpatrolled\" : \"0\", " - + " \"page\" : \"11._korpus_(NOVJ)\", " - + " \"language\" : \"sl\", " - + " \"newpage\" : \"0\", " - + " \"user\" : \"EmausBot\", " - + " \"count\" : 1.0, " - + " \"added\" : 39.0, " - + " \"delta\" : 39.0, " - + " \"variation\" : 39.0, " - + " \"deleted\" : 0.0 " - + " } " - + " }, { " - + " \"segmentId\" : 
\"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47" - + ".830Z_v9\", " - + " \"offset\" : 1, " - + " \"event\" : { " - + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", " - + " \"robot\" : 0, " - + " \"namespace\" : \"article\", " - + " \"anonymous\" : \"0\", " - + " \"unpatrolled\" : \"0\", " - + " \"page\" : \"112_U.S._580\", " - + " \"language\" : \"en\", " - + " \"newpage\" : \"1\", " - + " \"user\" : \"MZMcBride\", " - + " \"count\" : 1.0, " - + " \"added\" : 70.0, " - + " \"delta\" : 70.0, " - + " \"variation\" : 70.0, " - + " \"deleted\" : 0.0 " - + " } " - + " }, { " - + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47" - + ".830Z_v9\", " - + " \"offset\" : 2, " - + " \"event\" : { " - + " \"timestamp\" : \"2013-01-01T00:00:12.000Z\", " - + " \"robot\" : 0, " - + " \"namespace\" : \"article\", " - + " \"anonymous\" : \"0\", " - + " \"unpatrolled\" : \"0\", " - + " \"page\" : \"113_U.S._243\", " - + " \"language\" : \"en\", " - + " \"newpage\" : \"1\", " - + " \"user\" : \"MZMcBride\", " - + " \"count\" : 1.0, " - + " \"added\" : 77.0, " - + " \"delta\" : 77.0, " - + " \"variation\" : 77.0, " - + " \"deleted\" : 0.0 " - + " } " - + " }, { " - + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47" - + ".830Z_v9\", " - + " \"offset\" : 3, " - + " \"event\" : { " - + " \"timestamp\" : \"2013-01-01T00:00:12.000Z\", " - + " \"robot\" : 0, " - + " \"namespace\" : \"article\", " - + " \"anonymous\" : \"0\", " - + " \"unpatrolled\" : \"0\", " - + " \"page\" : \"113_U.S._73\", " - + " \"language\" : \"en\", " - + " \"newpage\" : \"1\", " - + " \"user\" : \"MZMcBride\", " - + " \"count\" : 1.0, " - + " \"added\" : 70.0, " - + " \"delta\" : 70.0, " - + " \"variation\" : 70.0, " - + " \"deleted\" : 0.0 " - + " } " - + " }, { " - + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47" - + ".830Z_v9\", " - + " \"offset\" : 4, " - + " \"event\" : { " - + " \"timestamp\" : \"2013-01-01T00:00:12.000Z\", " - + " \"robot\" : 0, " - + " \"namespace\" : \"article\", " - + " \"anonymous\" : \"0\", " - + " \"unpatrolled\" : \"0\", " - + " \"page\" : \"113_U.S._756\", " - + " \"language\" : \"en\", " - + " \"newpage\" : \"1\", " - + " \"user\" : \"MZMcBride\", " - + " \"count\" : 1.0, " - + " \"added\" : 68.0, " - + " \"delta\" : 68.0, " - + " \"variation\" : 68.0, " - + " \"deleted\" : 0.0 " - + " } " - + " } ] }} ]"; - - // Select query results as records (types defined by metastore) - private static final String - SELECT_COLUMN_NAMES = + private static final String SCAN_COLUMN_NAMES = "__time,robot,namespace,anonymous,unpatrolled,page,language,newpage,user,count,added,delta,variation,deleted"; - private static final String - SELECT_COLUMN_TYPES = + private static final String SCAN_COLUMN_TYPES = "timestamp with local time zone,boolean,string,string,string,string,string,string,string,double,double,float," + "float,float"; - private static final Object[][] SELECT_QUERY_RESULTS_RECORDS = new Object[][]{ - new Object[]{ - (new TimestampTZ(Instant.ofEpochMilli(1356998400000L).atZone(ZoneOffset.UTC))), Boolean.TRUE, - "article", - "0", - "0", - "11._korpus_(NOVJ)", - "sl", - "0", - "EmausBot", 1.0d, 39.0d, 39.0F, 39.0F, 0.0F }, - new Object[]{ + private static final Object[][] SCAN_QUERY_RESULTS_RECORDS = new Object[][] { + new Object[] { (new TimestampTZ(Instant.ofEpochMilli(1356998400000L).atZone(ZoneOffset.UTC))), Boolean.TRUE, + 
"article", "0", "0", "11._korpus_(NOVJ)", "sl", "0", "EmausBot", 1.0d, 39.0d, 39.0F, 39.0F, 0.0F }, + new Object[] { (new TimestampTZ(Instant.ofEpochMilli(1356998400000L).atZone(ZoneOffset.UTC))), Boolean.FALSE, "article", "0", @@ -691,12 +558,6 @@ GB_MONTH_EXTRACTIONS_RESULTS, new TypeReference>() { })); - selectQueryResults = - DruidStorageHandlerUtils.SMILE_MAPPER.writeValueAsBytes(DruidStorageHandlerUtils.JSON_MAPPER.readValue( - SELECT_QUERY_RESULTS, - new TypeReference>>() { - })); - scanQueryResults = DruidStorageHandlerUtils.SMILE_MAPPER.writeValueAsBytes(DruidStorageHandlerUtils.JSON_MAPPER.readValue( SCAN_QUERY_RESULTS, @@ -765,15 +626,11 @@ GB_MONTH_EXTRACTIONS, groupByMonthExtractQueryResults, GB_MONTH_EXTRACTION_RESULTS_RECORDS); - // Select query - tbl = createPropertiesQuery("wikipedia", Query.SELECT, SELECT_QUERY, SELECT_COLUMN_NAMES, SELECT_COLUMN_TYPES); - SerDeUtils.initializeSerDe(serDe, conf, tbl, null); - deserializeQueryResults(serDe, Query.SELECT, SELECT_QUERY, selectQueryResults, SELECT_QUERY_RESULTS_RECORDS); // Scan query -- results should be same as select query - tbl = createPropertiesQuery("wikipedia", Query.SCAN, SCAN_QUERY, SELECT_COLUMN_NAMES, SELECT_COLUMN_TYPES); + tbl = createPropertiesQuery("wikipedia", Query.SCAN, SCAN_QUERY, SCAN_COLUMN_NAMES, SCAN_COLUMN_TYPES); SerDeUtils.initializeSerDe(serDe, conf, tbl, null); - deserializeQueryResults(serDe, Query.SCAN, SCAN_QUERY, scanQueryResults, SELECT_QUERY_RESULTS_RECORDS); + deserializeQueryResults(serDe, Query.SCAN, SCAN_QUERY, scanQueryResults, SCAN_QUERY_RESULTS_RECORDS); } private static Properties createPropertiesQuery(String dataSource, diff --git a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java index 91b5f8bc29..3fb7bdf6ca 100644 --- a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java +++ b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java @@ -150,29 +150,14 @@ RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(null, - null, - null, - null, - temporaryFolder.newFolder(), - null, - null, - null, - null, - indexSpec, - null, - 0, - 0, - null, - null, - 0L, - null, - null); + null, null, null, temporaryFolder.newFolder(), null, null, null, null, indexSpec, null, null, 0, 0, null, + null, 0L, null, null); LocalFileSystem localFileSystem = FileSystem.getLocal(config); DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(new LocalDataSegmentPusherConfig() { @Override public File getStorageDirectory() { return segmentOutputDir; } - }, objectMapper); + }); Path segmentDescriptorPath = diff --git a/itests/qtest-druid/pom.xml b/itests/qtest-druid/pom.xml index 6da72733e7..2d29923b49 100644 --- a/itests/qtest-druid/pom.xml +++ b/itests/qtest-druid/pom.xml @@ -37,6 +37,7 @@ ../.. 
+ 1.15.0 4.0.0 1.19.3 9.4.10.v20180503 @@ -44,6 +45,7 @@ 16.0.1 4.1.0 2.0.0 + 4.1.0 1.7.30 @@ -156,6 +158,26 @@ curator-recipes ${druid.curator.version} + + org.apache.calcite.avatica + avatica + ${avatica.version} + + + org.apache.calcite.avatica + avatica-core + ${avatica.version} + + + org.apache.calcite.avatica + avatica-metrics + ${avatica.version} + + + org.apache.calcite.avatica + avatica-server + ${avatica.version} + com.sun.jersey jersey-bundle diff --git a/pom.xml b/pom.xml index 579e74568f..17ef6b57a4 100644 --- a/pom.xml +++ b/pom.xml @@ -149,7 +149,7 @@ 10.14.1.0 3.1.0 0.1.2 - 0.14.0-incubating + 0.17.1 1.6.0.1 19.0 2.4.11 diff --git a/ql/src/test/results/clientpositive/druid/druidkafkamini_delimited.q.out b/ql/src/test/results/clientpositive/druid/druidkafkamini_delimited.q.out index f6a417b6c3..2331fba13e 100644 --- a/ql/src/test/results/clientpositive/druid/druidkafkamini_delimited.q.out +++ b/ql/src/test/results/clientpositive/druid/druidkafkamini_delimited.q.out @@ -120,7 +120,7 @@ POSTHOOK: query: Select page FROM druid_kafka_test_delimited POSTHOOK: type: QUERY POSTHOOK: Input: default@druid_kafka_test_delimited POSTHOOK: Output: hdfs://### HDFS PATH ### - "Gypsy Danger" +"Gypsy Danger" "Striker Eureka" "Cherno Alpha" "Crimson Typhoon" diff --git a/ql/src/test/results/clientpositive/druid/druidmini_semijoin_reduction_all_types.q.out b/ql/src/test/results/clientpositive/druid/druidmini_semijoin_reduction_all_types.q.out index 25abb74a0e..0dd2295b73 100644 --- a/ql/src/test/results/clientpositive/druid/druidmini_semijoin_reduction_all_types.q.out +++ b/ql/src/test/results/clientpositive/druid/druidmini_semijoin_reduction_all_types.q.out @@ -179,7 +179,7 @@ STAGE PLANS: properties: druid.fieldNames cstring1 druid.fieldTypes string - druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cstring1","lower":"DynamicValue(RS_4_alltypesorc_small_cstring1_min)","upper":"DynamicValue(RS_4_alltypesorc_small_cstring1_max)","lowerStrict":false,"upperStrict":false,"extractionFn":null,"ordering":{"type":"lexicographic"}},{"type":"bloom","dimension":"cstring1","bloomKFilter":"BAAAAAgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA","extractionFn":null}]},{"type":"not","field":{"type":"selector","dimension":"cstring1","value":null,"extractionFn":null}}]},"columns":["cstring1"],"legacy":null,"context":null,"descending":false,"granularity":{"type":"all"}} + druid.query.json 
{"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cstring1","lower":"DynamicValue(RS_4_alltypesorc_small_cstring1_min)","upper":"DynamicValue(RS_4_alltypesorc_small_cstring1_max)","lowerStrict":false,"upperStrict":false,"extractionFn":null,"ordering":{"type":"lexicographic"}},{"type":"bloom","dimension":"cstring1","bloomKFilter":"BAAAAAgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA","extractionFn":null}]},{"type":"not","field":{"type":"selector","dimension":"cstring1","value":null,"extractionFn":null}}]},"columns":["cstring1"],"legacy":null,"context":null,"descending":false,"granularity":{"type":"all"}} druid.query.type scan Statistics: Num rows: 9173 Data size: 1603744 Basic stats: COMPLETE Column stats: NONE Filter Operator @@ -340,7 +340,7 @@ STAGE PLANS: properties: druid.fieldNames ctinyint druid.fieldTypes tinyint - druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"ctinyint","lower":"DynamicValue(RS_4_alltypesorc_small_ctinyint_min)","upper":"DynamicValue(RS_4_alltypesorc_small_ctinyint_max)","lowerStrict":false,"upperStrict":false,"extractionFn":null,"ordering":{"type":"numeric"}},{"type":"bloom","dimension":"ctinyint","bloomKFilter":"BAAAAAgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA","extractionFn":null}]},{"type":"not","field":{"type":"selector","dimension":"ctinyint","value":null,"extractionFn":null}}]},"columns":["ctinyint"],"legacy":null,"context":null,"descending":false,"granularity":{"type":"all"}} + druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"ctinyint","lower":"DynamicValue(RS_4_alltypesorc_small_ctinyint_min)","upper":"DynamicValue(RS_4_alltypesorc_small_ctinyint_max)","lowerStrict":false,"upperStrict":false,"extractionFn":null,"ordering":{"type":"numeric"}},{"type":"bloom","dimension":"ctinyint","bloomKFilter":"BAAAAAgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA","extractionFn":null}]},{"type":"not","field":{"type":"selector","dimension":"ctinyint","value":null,"extractionFn":null}}]},"columns":["ctinyint"],"legacy":null,"context":null,"descending":false,"granularity":{"type":"all"}} druid.query.type scan Statistics: Num rows: 9173 Data size: 34864 Basic stats: COMPLETE Column stats: NONE Filter Operator @@ -501,7 +501,7 @@ STAGE PLANS: properties: druid.fieldNames csmallint druid.fieldTypes smallint - druid.query.json 
{"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"csmallint","lower":"DynamicValue(RS_4_alltypesorc_small_csmallint_min)","upper":"DynamicValue(RS_4_alltypesorc_small_csmallint_max)","lowerStrict":false,"upperStrict":false,"extractionFn":null,"ordering":{"type":"numeric"}},{"type":"bloom","dimension":"csmallint","bloomKFilter":"BAAAAAgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA","extractionFn":null}]},{"type":"not","field":{"type":"selector","dimension":"csmallint","value":null,"extractionFn":null}}]},"columns":["csmallint"],"legacy":null,"context":null,"descending":false,"granularity":{"type":"all"}} + druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"csmallint","lower":"DynamicValue(RS_4_alltypesorc_small_csmallint_min)","upper":"DynamicValue(RS_4_alltypesorc_small_csmallint_max)","lowerStrict":false,"upperStrict":false,"extractionFn":null,"ordering":{"type":"numeric"}},{"type":"bloom","dimension":"csmallint","bloomKFilter":"BAAAAAgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA","extractionFn":null}]},{"type":"not","field":{"type":"selector","dimension":"csmallint","value":null,"extractionFn":null}}]},"columns":["csmallint"],"legacy":null,"context":null,"descending":false,"granularity":{"type":"all"}} druid.query.type scan Statistics: Num rows: 9173 Data size: 34864 Basic stats: COMPLETE Column stats: NONE Filter Operator @@ -662,7 +662,7 @@ STAGE PLANS: properties: druid.fieldNames cint druid.fieldTypes int - druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cint","lower":"DynamicValue(RS_4_alltypesorc_small_cint_min)","upper":"DynamicValue(RS_4_alltypesorc_small_cint_max)","lowerStrict":false,"upperStrict":false,"extractionFn":null,"ordering":{"type":"numeric"}},{"type":"bloom","dimension":"cint","bloomKFilter":"BAAAAAgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA","extractionFn":null}]},{"type":"not","field":{"type":"selector","dimension":"cint","value":null,"extractionFn":null}}]},"columns":["cint"],"legacy":null,"context":null,"descending":false,"granularity":{"type":"all"}} + druid.query.json 
{"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cint","lower":"DynamicValue(RS_4_alltypesorc_small_cint_min)","upper":"DynamicValue(RS_4_alltypesorc_small_cint_max)","lowerStrict":false,"upperStrict":false,"extractionFn":null,"ordering":{"type":"numeric"}},{"type":"bloom","dimension":"cint","bloomKFilter":"BAAAAAgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA","extractionFn":null}]},{"type":"not","field":{"type":"selector","dimension":"cint","value":null,"extractionFn":null}}]},"columns":["cint"],"legacy":null,"context":null,"descending":false,"granularity":{"type":"all"}} druid.query.type scan Statistics: Num rows: 9173 Data size: 34864 Basic stats: COMPLETE Column stats: NONE Filter Operator @@ -823,7 +823,7 @@ STAGE PLANS: properties: druid.fieldNames cbigint druid.fieldTypes bigint - druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cbigint","lower":"DynamicValue(RS_4_alltypesorc_small_cbigint_min)","upper":"DynamicValue(RS_4_alltypesorc_small_cbigint_max)","lowerStrict":false,"upperStrict":false,"extractionFn":null,"ordering":{"type":"numeric"}},{"type":"bloom","dimension":"cbigint","bloomKFilter":"BAAAAAgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA","extractionFn":null}]},{"type":"not","field":{"type":"selector","dimension":"cbigint","value":null,"extractionFn":null}}]},"columns":["cbigint"],"legacy":null,"context":null,"descending":false,"granularity":{"type":"all"}} + druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cbigint","lower":"DynamicValue(RS_4_alltypesorc_small_cbigint_min)","upper":"DynamicValue(RS_4_alltypesorc_small_cbigint_max)","lowerStrict":false,"upperStrict":false,"extractionFn":null,"ordering":{"type":"numeric"}},{"type":"bloom","dimension":"cbigint","bloomKFilter":"BAAAAAgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA","extractionFn":null}]},{"type":"not","field":{"type":"selector","dimension":"cbigint","value":null,"extractionFn":null}}]},"columns":["cbigint"],"legacy":null,"context":null,"descending":false,"granularity":{"type":"all"}} druid.query.type scan Statistics: Num rows: 9173 Data size: 69728 Basic stats: COMPLETE Column stats: NONE Filter Operator @@ -984,7 +984,7 @@ STAGE PLANS: properties: druid.fieldNames cfloat druid.fieldTypes float - druid.query.json 
{"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":{"type":"not","field":{"type":"selector","dimension":"cfloat","value":null,"extractionFn":null}},"columns":["cfloat"],"legacy":null,"context":null,"descending":false,"granularity":{"type":"all"}} + druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":{"type":"not","field":{"type":"selector","dimension":"cfloat","value":null,"extractionFn":null}},"columns":["cfloat"],"legacy":null,"context":null,"descending":false,"granularity":{"type":"all"}} druid.query.type scan Statistics: Num rows: 9173 Data size: 34864 Basic stats: COMPLETE Column stats: NONE Filter Operator @@ -1145,7 +1145,7 @@ STAGE PLANS: properties: druid.fieldNames cdouble druid.fieldTypes double - druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cdouble","lower":"DynamicValue(RS_4_alltypesorc_small_cdouble_min)","upper":"DynamicValue(RS_4_alltypesorc_small_cdouble_max)","lowerStrict":false,"upperStrict":false,"extractionFn":null,"ordering":{"type":"numeric"}},{"type":"bloom","dimension":"cdouble","bloomKFilter":"BAAAAAgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA","extractionFn":null}]},{"type":"not","field":{"type":"selector","dimension":"cdouble","value":null,"extractionFn":null}}]},"columns":["cdouble"],"legacy":null,"context":null,"descending":false,"granularity":{"type":"all"}} + druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cdouble","lower":"DynamicValue(RS_4_alltypesorc_small_cdouble_min)","upper":"DynamicValue(RS_4_alltypesorc_small_cdouble_max)","lowerStrict":false,"upperStrict":false,"extractionFn":null,"ordering":{"type":"numeric"}},{"type":"bloom","dimension":"cdouble","bloomKFilter":"BAAAAAgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA","extractionFn":null}]},{"type":"not","field":{"type":"selector","dimension":"cdouble","value":null,"extractionFn":null}}]},"columns":["cdouble"],"legacy":null,"context":null,"descending":false,"granularity":{"type":"all"}} druid.query.type scan Statistics: Num rows: 9173 Data size: 69728 Basic stats: COMPLETE Column stats: NONE Filter Operator @@ -1295,7 +1295,7 @@ STAGE PLANS: properties: druid.fieldNames vc druid.fieldTypes timestamp with local time zone - druid.query.json 
{"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[{"type":"expression","name":"vc","expression":"\"__time\"","outputType":"LONG"}],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":null,"columns":["vc"],"legacy":null,"context":null,"descending":false,"granularity":{"type":"all"}} + druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[{"type":"expression","name":"vc","expression":"\"__time\"","outputType":"LONG"}],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":null,"columns":["vc"],"legacy":null,"context":null,"descending":false,"granularity":{"type":"all"}} druid.query.type scan Statistics: Num rows: 9173 Data size: 348640 Basic stats: COMPLETE Column stats: NONE Filter Operator @@ -1456,7 +1456,7 @@ STAGE PLANS: properties: druid.fieldNames cboolean1 druid.fieldTypes boolean - druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":{"type":"not","field":{"type":"selector","dimension":"cboolean1","value":null,"extractionFn":null}},"columns":["cboolean1"],"legacy":null,"context":null,"descending":false,"granularity":{"type":"all"}} + druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":{"type":"not","field":{"type":"selector","dimension":"cboolean1","value":null,"extractionFn":null}},"columns":["cboolean1"],"legacy":null,"context":null,"descending":false,"granularity":{"type":"all"}} druid.query.type scan Statistics: Num rows: 9173 Data size: 34864 Basic stats: COMPLETE Column stats: NONE Filter Operator @@ -1617,7 +1617,7 @@ STAGE PLANS: properties: druid.fieldNames cintstring druid.fieldTypes string - druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cintstring","lower":"DynamicValue(RS_4_alltypesorc_small_cint_min)","upper":"DynamicValue(RS_4_alltypesorc_small_cint_max)","lowerStrict":false,"upperStrict":false,"extractionFn":null,"ordering":{"type":"lexicographic"}},{"type":"bloom","dimension":"cintstring","bloomKFilter":"BAAAAAgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA","extractionFn":null}]},{"type":"not","field":{"type":"selector","dimension":"cintstring","value":null,"extractionFn":null}}]},"columns":["cintstring"],"legacy":null,"context":null,"descending":false,"granularity":{"type":"all"}} + druid.query.json 
{"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cintstring","lower":"DynamicValue(RS_4_alltypesorc_small_cint_min)","upper":"DynamicValue(RS_4_alltypesorc_small_cint_max)","lowerStrict":false,"upperStrict":false,"extractionFn":null,"ordering":{"type":"lexicographic"}},{"type":"bloom","dimension":"cintstring","bloomKFilter":"BAAAAAgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA","extractionFn":null}]},{"type":"not","field":{"type":"selector","dimension":"cintstring","value":null,"extractionFn":null}}]},"columns":["cintstring"],"legacy":null,"context":null,"descending":false,"granularity":{"type":"all"}} druid.query.type scan Statistics: Num rows: 9173 Data size: 1603744 Basic stats: COMPLETE Column stats: NONE Filter Operator @@ -1778,7 +1778,7 @@ STAGE PLANS: properties: druid.fieldNames cdoublestring druid.fieldTypes string - druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cdoublestring","lower":"DynamicValue(RS_4_alltypesorc_small_cdouble_min)","upper":"DynamicValue(RS_4_alltypesorc_small_cdouble_max)","lowerStrict":false,"upperStrict":false,"extractionFn":null,"ordering":{"type":"lexicographic"}},{"type":"bloom","dimension":"cdoublestring","bloomKFilter":"BAAAAAgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA","extractionFn":null}]},{"type":"not","field":{"type":"selector","dimension":"cdoublestring","value":null,"extractionFn":null}}]},"columns":["cdoublestring"],"legacy":null,"context":null,"descending":false,"granularity":{"type":"all"}} + druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cdoublestring","lower":"DynamicValue(RS_4_alltypesorc_small_cdouble_min)","upper":"DynamicValue(RS_4_alltypesorc_small_cdouble_max)","lowerStrict":false,"upperStrict":false,"extractionFn":null,"ordering":{"type":"lexicographic"}},{"type":"bloom","dimension":"cdoublestring","bloomKFilter":"BAAAAAgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA","extractionFn":null}]},{"type":"not","field":{"type":"selector","dimension":"cdoublestring","value":null,"extractionFn":null}}]},"columns":["cdoublestring"],"legacy":null,"context":null,"descending":false,"granularity":{"type":"all"}} druid.query.type scan Statistics: Num rows: 9173 Data size: 1603744 Basic stats: COMPLETE Column stats: NONE Filter Operator @@ -1939,7 +1939,7 @@ STAGE PLANS: properties: druid.fieldNames cfloatstring druid.fieldTypes string - druid.query.json 
{"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cfloatstring","lower":"DynamicValue(RS_4_alltypesorc_small_cfloat_min)","upper":"DynamicValue(RS_4_alltypesorc_small_cfloat_max)","lowerStrict":false,"upperStrict":false,"extractionFn":null,"ordering":{"type":"lexicographic"}},{"type":"bloom","dimension":"cfloatstring","bloomKFilter":"BAAAAAgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA","extractionFn":null}]},{"type":"not","field":{"type":"selector","dimension":"cfloatstring","value":null,"extractionFn":null}}]},"columns":["cfloatstring"],"legacy":null,"context":null,"descending":false,"granularity":{"type":"all"}} + druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cfloatstring","lower":"DynamicValue(RS_4_alltypesorc_small_cfloat_min)","upper":"DynamicValue(RS_4_alltypesorc_small_cfloat_max)","lowerStrict":false,"upperStrict":false,"extractionFn":null,"ordering":{"type":"lexicographic"}},{"type":"bloom","dimension":"cfloatstring","bloomKFilter":"BAAAAAgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA","extractionFn":null}]},{"type":"not","field":{"type":"selector","dimension":"cfloatstring","value":null,"extractionFn":null}}]},"columns":["cfloatstring"],"legacy":null,"context":null,"descending":false,"granularity":{"type":"all"}} druid.query.type scan Statistics: Num rows: 9173 Data size: 1603744 Basic stats: COMPLETE Column stats: NONE Filter Operator -- 2.20.1 (Apple Git-117)