From f9090216686a067b1ff70fc63b1db190dbce1c80 Mon Sep 17 00:00:00 2001
From: Nishant
Date: Fri, 17 Aug 2018 17:24:22 +0530
Subject: [PATCH] [HIVE-20349] Query next split location if one location is not available

---
 .../hive/druid/io/DruidQueryBasedInputFormat.java  |   7 +-
 .../hive/druid/serde/DruidQueryRecordReader.java   | 105 ++++++++++++---------
 ...tDruidQueryBasedInputFormatToAddFaultyHost.java |  53 +++++++++++
 .../QTestDruidStorageHandlerToAddFaultyHost.java   |  34 +++++++
 .../test/queries/clientpositive/druidmini_test1.q  |   2 +-
 .../clientpositive/druid/druidmini_test1.q.out     |   4 +-
 6 files changed, 157 insertions(+), 48 deletions(-)
 create mode 100644 druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidQueryBasedInputFormatToAddFaultyHost.java
 create mode 100644 druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidStorageHandlerToAddFaultyHost.java

diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
index 4abe4b6ff3..61b6cea5ce 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
@@ -106,7 +106,7 @@ public static DruidQueryRecordReader getDruidQueryReader(String druidQueryType)
   }
 
   @SuppressWarnings("deprecation")
-  private HiveDruidSplit[] getInputSplits(Configuration conf) throws IOException {
+  protected HiveDruidSplit[] getInputSplits(Configuration conf) throws IOException {
     String address = HiveConf.getVar(conf,
         HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS
     );
@@ -223,10 +223,13 @@ public static DruidQueryRecordReader getDruidQueryReader(String druidQueryType)
     final HiveDruidSplit[] splits = new HiveDruidSplit[segmentDescriptors.size()];
     for (int i = 0; i < numSplits; i++) {
       final LocatedSegmentDescriptor locatedSD = segmentDescriptors.get(i);
-      final String[] hosts = new String[locatedSD.getLocations().size()];
+      final String[] hosts = new String[locatedSD.getLocations().size() + 1];
       for (int j = 0; j < locatedSD.getLocations().size(); j++) {
         hosts[j] = locatedSD.getLocations().get(j).getHost();
       }
+      // Default to broker if all other hosts fail.
+      hosts[locatedSD.getLocations().size()] = address;
+
       // Create partial Select query
       final SegmentDescriptor newSD = new SegmentDescriptor(
           locatedSD.getInterval(), locatedSD.getVersion(), locatedSD.getPartitionNumber());
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
index 78406bd506..4ce7dd163c 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
@@ -105,18 +105,53 @@ public void initialize(InputSplit split, Configuration conf, ObjectMapper mapper
     Preconditions.checkNotNull(query);
     this.resultsType = getResultTypeDef();
     this.httpClient = Preconditions.checkNotNull(httpClient, "need Http Client");
-    // Execute query
-    LOG.debug("Retrieving data from druid using query:\n " + query);
-    final String address = hiveDruidSplit.getLocations()[0];
-    if (Strings.isNullOrEmpty(address)) {
-      throw new IOException("can not fetch results form empty or null host value");
+    final String[] locations = hiveDruidSplit.getLocations();
+    boolean initialized = false;
+    int currentLocationIndex = 0;
+    Exception ex = null;
+    while (!initialized && currentLocationIndex < locations.length) {
+      String address = locations[currentLocationIndex++];
+      if (Strings.isNullOrEmpty(address)) {
+        throw new IOException("can not fetch results from empty or null host value");
+      }
+      // Execute query
+      LOG.debug("Retrieving data from druid location[{}] using query:[{}] ", address, query);
+      try {
+        Request request = DruidStorageHandlerUtils.createSmileRequest(address, query);
+        Future<InputStream> inputStreamFuture = this.httpClient
+            .go(request, new InputStreamResponseHandler());
+        queryResultsIterator = new JsonParserIterator(this.smileMapper, resultsType,
+            inputStreamFuture, request.getUrl().toString(), query
+        );
+        queryResultsIterator.init();
+        initialized = true;
+      } catch (IOException | ExecutionException | InterruptedException e) {
+        if (queryResultsIterator != null) {
+          // We got an exception while querying results from this host.
+          queryResultsIterator.close();
+        }
+        LOG.error("Failure getting results for query[{}] from host[{}] because of [{}]",
+            query,
+            address,
+            e.getMessage()
+        );
+        if (ex == null) {
+          ex = e;
+        } else {
+          ex.addSuppressed(e);
+        }
+      }
+    }
+
+    if (!initialized) {
+      throw new RE(
+          ex,
+          "Failure getting results for query[%s] from locations[%s] because of [%s]",
+          query,
+          locations,
+          ex.getMessage()
+      );
     }
-    Request request = DruidStorageHandlerUtils.createSmileRequest(address, query);
-    Future<InputStream> inputStreamFuture = this.httpClient
-        .go(request, new InputStreamResponseHandler());
-    queryResultsIterator = new JsonParserIterator(this.smileMapper, resultsType, inputStreamFuture,
-        request.getUrl().toString(), query
-    );
   }
 
   public void initialize(InputSplit split, Configuration conf) throws IOException {
@@ -207,8 +242,6 @@ public JsonParserIterator(ObjectMapper mapper,
     @Override
     public boolean hasNext()
     {
-      init();
-
       if (jp.isClosed()) {
         return false;
       }
@@ -223,8 +256,6 @@ public boolean hasNext()
     @Override
     public R next()
     {
-      init();
-
       try {
         final R retVal = objectCodec.readValue(jp, typeRef);
         jp.nextToken();
@@ -241,35 +272,23 @@ public void remove()
       throw new UnsupportedOperationException();
     }
 
-    private void init()
-    {
-      if (jp == null) {
-        try {
-          InputStream is = future.get();
-          if (is == null) {
-            throw new IOException(String.format("query[%s] url[%s] timed out", query, url));
-          } else {
-            jp = mapper.getFactory().createParser(is).configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, true);
-          }
-          final JsonToken nextToken = jp.nextToken();
-          if (nextToken == JsonToken.START_OBJECT) {
-            QueryInterruptedException cause = jp.getCodec().readValue(jp, QueryInterruptedException.class);
-            throw new QueryInterruptedException(cause);
-          } else if (nextToken != JsonToken.START_ARRAY) {
-            throw new IAE("Next token wasn't a START_ARRAY, was[%s] from url [%s]", jp.getCurrentToken(), url);
-          } else {
-            jp.nextToken();
-            objectCodec = jp.getCodec();
-          }
-        }
-        catch (IOException | InterruptedException | ExecutionException e) {
-          throw new RE(
-              e,
-              "Failure getting results for query[%s] url[%s] because of [%s]",
-              query,
-              url,
-              e.getMessage()
-          );
-        }
+    private void init() throws IOException, ExecutionException, InterruptedException {
+      if (jp == null) {
+        InputStream is = future.get();
+        if (is == null) {
+          throw new IOException(String.format("query[%s] url[%s] timed out", query, url));
+        } else {
+          jp = mapper.getFactory().createParser(is).configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, true);
+        }
+        final JsonToken nextToken = jp.nextToken();
+        if (nextToken == JsonToken.START_OBJECT) {
+          QueryInterruptedException cause = jp.getCodec().readValue(jp, QueryInterruptedException.class);
+          throw new QueryInterruptedException(cause);
+        } else if (nextToken != JsonToken.START_ARRAY) {
+          throw new IAE("Next token wasn't a START_ARRAY, was[%s] from url [%s]", jp.getCurrentToken(), url);
+        } else {
+          jp.nextToken();
+          objectCodec = jp.getCodec();
+        }
       }
     }
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidQueryBasedInputFormatToAddFaultyHost.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidQueryBasedInputFormatToAddFaultyHost.java
new file mode 100644
index 0000000000..b47982f2a1
--- /dev/null
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidQueryBasedInputFormatToAddFaultyHost.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat;
+import org.apache.hadoop.hive.druid.io.HiveDruidSplit;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This input format adds a faulty host as the first input split location.
+ * Tests should be able to query results successfully by trying the next split locations.
+ */
+public class QTestDruidQueryBasedInputFormatToAddFaultyHost extends DruidQueryBasedInputFormat {
+
+  @Override
+  protected HiveDruidSplit[] getInputSplits(Configuration conf) throws IOException {
+    HiveDruidSplit[] inputSplits = super.getInputSplits(conf);
+    List<HiveDruidSplit> list = new ArrayList<>();
+    for (HiveDruidSplit split : inputSplits) {
+      String[] locations = split.getLocations();
+      List<String> locationsWithFaultyHost = Lists.newArrayListWithCapacity(locations.length + 1);
+      // A non-queryable host location.
+      locationsWithFaultyHost.add("localhost:8081");
+      locationsWithFaultyHost.addAll(Arrays.asList(locations));
+      HiveDruidSplit hiveDruidSplit = new HiveDruidSplit(split.getDruidQuery(), split.getPath(),
+          locationsWithFaultyHost.toArray(new String[0])
+      );
+      list.add(hiveDruidSplit);
+    }
+    return list.toArray(new HiveDruidSplit[0]);
+  }
+}
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidStorageHandlerToAddFaultyHost.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidStorageHandlerToAddFaultyHost.java
new file mode 100644
index 0000000000..e0dec01e06
--- /dev/null
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidStorageHandlerToAddFaultyHost.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid;
+
+import org.apache.hadoop.mapred.InputFormat;
+
+/**
+ * Storage handler for Druid to be used in tests.
+ * It uses an input format that adds a faulty host as the first input split location.
+ * Tests should be able to query results successfully by trying the next split locations.
+ */
+@SuppressWarnings("deprecation")
+public class QTestDruidStorageHandlerToAddFaultyHost extends DruidStorageHandler {
+
+  @Override
+  public Class<? extends InputFormat> getInputFormatClass() {
+    return QTestDruidQueryBasedInputFormatToAddFaultyHost.class;
+  }
+}
diff --git a/ql/src/test/queries/clientpositive/druidmini_test1.q b/ql/src/test/queries/clientpositive/druidmini_test1.q
index f53cc05389..63cbdfed90 100644
--- a/ql/src/test/queries/clientpositive/druidmini_test1.q
+++ b/ql/src/test/queries/clientpositive/druidmini_test1.q
@@ -3,7 +3,7 @@ SET hive.ctas.external.tables=true;
 SET hive.external.table.purge.default = true;
 
 CREATE EXTERNAL TABLE druid_table_n3
-STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+STORED BY 'org.apache.hadoop.hive.druid.QTestDruidStorageHandlerToAddFaultyHost'
 TBLPROPERTIES ("druid.segment.granularity" = "HOUR",
 "druid.query.granularity" = "MINUTE")
 AS
 SELECT cast (`ctimestamp1` as timestamp with local time zone) as `__time`,
diff --git a/ql/src/test/results/clientpositive/druid/druidmini_test1.q.out b/ql/src/test/results/clientpositive/druid/druidmini_test1.q.out
index 6f8551525f..01b8b6a9db 100644
--- a/ql/src/test/results/clientpositive/druid/druidmini_test1.q.out
+++ b/ql/src/test/results/clientpositive/druid/druidmini_test1.q.out
@@ -1,5 +1,5 @@
 PREHOOK: query: CREATE EXTERNAL TABLE druid_table_n3
-STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+STORED BY 'org.apache.hadoop.hive.druid.QTestDruidStorageHandlerToAddFaultyHost'
 TBLPROPERTIES ("druid.segment.granularity" = "HOUR",
 "druid.query.granularity" = "MINUTE")
 AS
 SELECT cast (`ctimestamp1` as timestamp with local time zone) as `__time`,
@@ -19,7 +19,7 @@ PREHOOK: Input: default@alltypesorc
 PREHOOK: Output: database:default
 PREHOOK: Output: default@druid_table_n3
 POSTHOOK: query: CREATE EXTERNAL TABLE druid_table_n3
-STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+STORED BY 'org.apache.hadoop.hive.druid.QTestDruidStorageHandlerToAddFaultyHost'
 TBLPROPERTIES ("druid.segment.granularity" = "HOUR",
 "druid.query.granularity" = "MINUTE")
 AS
 SELECT cast (`ctimestamp1` as timestamp with local time zone) as `__time`,
-- 
2.15.2 (Apple Git-101.1)
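
Note on the retry flow: the logic added to DruidQueryRecordReader.initialize() reduces to the pattern below: try each split location in order, keep the first failure as the primary cause, attach later failures via addSuppressed(), and give up only once every location has failed. This is also why JsonParserIterator.init() now runs eagerly during initialize() and propagates checked exceptions instead of being called lazily from hasNext()/next(), where no alternative host could still be tried. The sketch is standalone and hypothetical; Fetcher and fetchFrom are stand-ins, not Hive or Druid API.

    import java.io.IOException;

    public final class LocationFailover {

      /** Hypothetical stand-in for opening a query-result stream against one host. */
      interface Fetcher {
        String fetchFrom(String host) throws IOException;
      }

      static String fetchWithFailover(String[] locations, Fetcher fetcher) throws IOException {
        IOException first = null;
        for (String host : locations) {
          try {
            // Success on any host ends the loop, like "initialized = true" in the patch.
            return fetcher.fetchFrom(host);
          } catch (IOException e) {
            if (first == null) {
              first = e;              // remember the first failure as the primary cause
            } else {
              first.addSuppressed(e); // later failures ride along as suppressed exceptions
            }
          }
        }
        // Every location failed: surface the first failure with the rest attached.
        throw new IOException("all locations failed", first);
      }

      public static void main(String[] args) throws IOException {
        String[] hosts = { "faulty:8081", "healthy:8082" };
        System.out.println(fetchWithFailover(hosts, host -> {
          if (host.startsWith("faulty")) {
            throw new IOException("connection refused: " + host);
          }
          return "rows from " + host;
        }));
      }
    }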
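
The getInputSplits() change pairs with this: each split's location array gains one extra slot holding the broker address, so the broker acts as a last-resort location once every data node has failed. A minimal sketch of that construction in isolation; withBrokerFallback and the host names are illustrative, not Hive API.

    import java.util.Arrays;

    public final class SplitLocations {

      // Append the broker address as the final fallback location, mirroring the
      // extra hosts[] slot that getInputSplits() now reserves.
      static String[] withBrokerFallback(String[] segmentHosts, String brokerAddress) {
        String[] hosts = Arrays.copyOf(segmentHosts, segmentHosts.length + 1);
        hosts[segmentHosts.length] = brokerAddress;
        return hosts;
      }

      public static void main(String[] args) {
        String[] locations = withBrokerFallback(
            new String[] { "historical-1:8083", "historical-2:8083" }, "broker:8082");
        // Prints: [historical-1:8083, historical-2:8083, broker:8082]
        System.out.println(Arrays.toString(locations));
      }
    }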