From 023620bcde7b94e44e9c12f1df5bcabb42e6afb2 Mon Sep 17 00:00:00 2001
From: Nishant
Date: Thu, 9 Aug 2018 22:24:56 +0530
Subject: [PATCH] [HIVE-20353] Follow redirects for Druid overlord/coordinator

---
 .../hadoop/hive/druid/DruidStorageHandler.java     | 143 +++++++++++++--------
 .../hive/druid/DruidStorageHandlerUtils.java       |  41 ++++++
 2 files changed, 129 insertions(+), 55 deletions(-)

diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index 53d93e1b74..9f34b7b6fe 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -33,9 +33,8 @@
 import com.metamx.http.client.HttpClientConfig;
 import com.metamx.http.client.HttpClientInit;
 import com.metamx.http.client.Request;
-import com.metamx.http.client.response.StatusResponseHandler;
-import com.metamx.http.client.response.StatusResponseHolder;
-
+import com.metamx.http.client.response.FullResponseHandler;
+import com.metamx.http.client.response.FullResponseHolder;
 import io.druid.data.input.impl.DimensionSchema;
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.data.input.impl.InputRowParser;
@@ -60,7 +59,6 @@
 import io.druid.storage.hdfs.HdfsDataSegmentPusher;
 import io.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
 import io.druid.timeline.DataSegment;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -108,6 +106,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -120,10 +119,9 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
-import javax.annotation.Nullable;
-
 import static org.apache.hadoop.hive.druid.DruidStorageHandlerUtils.JSON_MAPPER;
 
 /**
@@ -407,14 +405,16 @@ private static void updateKafkaIngestionSpec(String overlordAddress, KafkaSuperv
     String task = JSON_MAPPER.writeValueAsString(spec);
     console.printInfo("submitting kafka Spec {}", task);
     LOG.info("submitting kafka Supervisor Spec {}", task);
-
-    StatusResponseHolder response = getHttpClient().go(new Request(HttpMethod.POST,
-        new URL(String.format("http://%s/druid/indexer/v1/supervisor", overlordAddress)))
-        .setContent(
-            "application/json",
-            JSON_MAPPER.writeValueAsBytes(spec)),
-        new StatusResponseHandler(
-            Charset.forName("UTF-8"))).get();
+    FullResponseHolder response = DruidStorageHandlerUtils
+        .getResponseFromCurrentLeader(getHttpClient(), new Request(HttpMethod.POST,
+                new URL(String
+                    .format("http://%s/druid/indexer/v1/supervisor", overlordAddress))
+            )
+                .setContent(
+                    "application/json",
+                    JSON_MAPPER.writeValueAsBytes(spec)
+                ), new FullResponseHandler(
+            Charset.forName("UTF-8")));
     if (response.getStatus().equals(HttpResponseStatus.OK)) {
       String msg = String.format("Kafka Supervisor for [%s] Submitted Successfully to druid.", spec.getDataSchema().getDataSource());
       LOG.info(msg);
@@ -431,16 +431,22 @@ private static void updateKafkaIngestionSpec(String overlordAddress, KafkaSuperv
 
   private void resetKafkaIngestion(String overlordAddress, String dataSourceName) {
     try {
-      StatusResponseHolder response = RetryUtils
-          .retry(() -> getHttpClient().go(new Request(HttpMethod.POST,
-                  new URL(String
-                      .format("http://%s/druid/indexer/v1/supervisor/%s/reset", overlordAddress,
-                          dataSourceName))),
-              new StatusResponseHandler(
-                  Charset.forName("UTF-8"))).get(),
-              input -> input instanceof IOException,
-              getMaxRetryCount());
-      if (response.getStatus().equals(HttpResponseStatus.OK)) {
+      FullResponseHolder response = RetryUtils
+          .retry(() -> DruidStorageHandlerUtils.getResponseFromCurrentLeader(
+                  getHttpClient(),
+                  new Request(HttpMethod.POST,
+                      new URL(String
+                          .format("http://%s/druid/indexer/v1/supervisor/%s/reset",
+                              overlordAddress,
+                              dataSourceName
+                          ))
+                  ), new FullResponseHandler(
+                      Charset.forName("UTF-8"))
+              ),
+              input -> input instanceof IOException,
+              getMaxRetryCount()
+          );
+      if (response.getStatus().equals(HttpResponseStatus.OK)) {
         console.printInfo("Druid Kafka Ingestion Reset successful.");
       } else {
         throw new IOException(String
@@ -454,15 +460,21 @@ private void resetKafkaIngestion(String overlordAddress, String dataSourceName)
 
   private void stopKafkaIngestion(String overlordAddress, String dataSourceName) {
     try {
-      StatusResponseHolder response = RetryUtils.retry(() -> getHttpClient()
-          .go(new Request(HttpMethod.POST,
-                  new URL(String
-                      .format("http://%s/druid/indexer/v1/supervisor/%s/shutdown", overlordAddress,
-                          dataSourceName))),
-              new StatusResponseHandler(
-                  Charset.forName("UTF-8"))).get(),
-          input -> input instanceof IOException,
-          getMaxRetryCount());
+      FullResponseHolder response = RetryUtils
+          .retry(() -> DruidStorageHandlerUtils.getResponseFromCurrentLeader(
+                  getHttpClient(),
+                  new Request(HttpMethod.POST,
+                      new URL(String
+                          .format("http://%s/druid/indexer/v1/supervisor/%s/shutdown",
+                              overlordAddress,
+                              dataSourceName
+                          ))
+                  ), new FullResponseHandler(
+                      Charset.forName("UTF-8"))
+              ),
+              input -> input instanceof IOException,
+              getMaxRetryCount()
+          );
       if (response.getStatus().equals(HttpResponseStatus.OK)) {
         console.printInfo("Druid Kafka Ingestion shutdown successful.");
       } else {
@@ -485,15 +497,22 @@ private KafkaSupervisorSpec fetchKafkaIngestionSpec(Table table) {
         .checkNotNull(getTableProperty(table, Constants.DRUID_DATA_SOURCE),
             "Druid Datasource name is null");
     try {
-      StatusResponseHolder response = RetryUtils.retry(() -> getHttpClient().go(new Request(HttpMethod.GET,
-          new URL(String
-              .format("http://%s/druid/indexer/v1/supervisor/%s", overlordAddress,
-                  dataSourceName))),
-          new StatusResponseHandler(
-              Charset.forName("UTF-8"))).get(),
-          input -> input instanceof IOException,
-          getMaxRetryCount());
-      if (response.getStatus().equals(HttpResponseStatus.OK)) {
+      FullResponseHolder response = RetryUtils
+          .retry(() -> DruidStorageHandlerUtils.getResponseFromCurrentLeader(
+                  getHttpClient(),
+                  new Request(HttpMethod.GET,
+                      new URL(String
+                          .format("http://%s/druid/indexer/v1/supervisor/%s",
+                              overlordAddress,
+                              dataSourceName
+                          ))
+                  ), new FullResponseHandler(
+                      Charset.forName("UTF-8"))
+              ),
+              input -> input instanceof IOException,
+              getMaxRetryCount()
+          );
+      if (response.getStatus().equals(HttpResponseStatus.OK)) {
         return JSON_MAPPER
             .readValue(response.getContent(), KafkaSupervisorSpec.class);
         // Druid Returns 400 Bad Request when not found.
@@ -524,14 +543,18 @@ private KafkaSupervisorReport fetchKafkaSupervisorReport(Table table) {
         .checkNotNull(getTableProperty(table, Constants.DRUID_DATA_SOURCE),
             "Druid Datasource name is null");
     try {
-      StatusResponseHolder response = RetryUtils.retry(() -> getHttpClient().go(new Request(HttpMethod.GET,
-          new URL(String
-              .format("http://%s/druid/indexer/v1/supervisor/%s/status", overlordAddress,
-                  dataSourceName))),
-          new StatusResponseHandler(
-              Charset.forName("UTF-8"))).get(),
+      FullResponseHolder response = RetryUtils.retry(() -> DruidStorageHandlerUtils
+          .getResponseFromCurrentLeader(getHttpClient(), new Request(HttpMethod.GET,
+              new URL(String
+                  .format("http://%s/druid/indexer/v1/supervisor/%s/status",
+                      overlordAddress,
+                      dataSourceName
+                  ))
+          ), new FullResponseHandler(
+              Charset.forName("UTF-8"))),
           input -> input instanceof IOException,
-          getMaxRetryCount());
+          getMaxRetryCount()
+      );
       if (response.getStatus().equals(HttpResponseStatus.OK)) {
         return DruidStorageHandlerUtils.JSON_MAPPER
             .readValue(response.getContent(), KafkaSupervisorReport.class);
@@ -549,7 +572,7 @@ private KafkaSupervisorReport fetchKafkaSupervisorReport(Table table) {
       return null;
     }
   }
- 
+
   /**
    * Creates metadata moves then commit the Segment's metadata to Druid metadata store in one TxN
    *
@@ -609,9 +632,15 @@ private int checkLoadStatus(List<DataSegment> segments){
 
     String coordinatorResponse;
     try {
-      coordinatorResponse = RetryUtils.retry(() -> DruidStorageHandlerUtils.getURL(getHttpClient(),
-          new URL(String.format("http://%s/status", coordinatorAddress))
-      ), input -> input instanceof IOException, maxTries);
+      coordinatorResponse = RetryUtils
+          .retry(() -> DruidStorageHandlerUtils
+                  .getResponseFromCurrentLeader(getHttpClient(), new Request(HttpMethod.GET,
+                      new URL(String.format("http://%s/status", coordinatorAddress))
+                  ),
+                  new FullResponseHandler(Charset.forName("UTF-8"))
+              ).getContent(),
+              input -> input instanceof IOException, maxTries
+          );
     } catch (Exception e) {
       console.printInfo(
          "Will skip waiting for data loading, coordinator unavailable");
@@ -642,10 +671,14 @@ private int checkLoadStatus(List<DataSegment> segments){
     while (numRetries++ < maxTries && !UrlsOfUnloadedSegments.isEmpty()) {
       UrlsOfUnloadedSegments = ImmutableSet.copyOf(Sets.filter(UrlsOfUnloadedSegments, input -> {
         try {
-          String result = DruidStorageHandlerUtils.getURL(getHttpClient(), input);
+          String result = DruidStorageHandlerUtils
+              .getResponseFromCurrentLeader(getHttpClient(), new Request(HttpMethod.GET, input),
+                  new FullResponseHandler(Charset.forName("UTF-8"))
+              ).getContent();
+
           LOG.debug("Checking segment [{}] response is [{}]", input, result);
           return Strings.isNullOrEmpty(result);
-        } catch (IOException e) {
+        } catch (InterruptedException | ExecutionException e) {
           LOG.error(String.format("Error while checking URL [%s]", input), e);
           return true;
         }
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index 3e2a1711f6..9da46df960 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -38,6 +38,8 @@
 import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.http.client.HttpClient;
 import com.metamx.http.client.Request;
+import com.metamx.http.client.response.FullResponseHandler;
+import com.metamx.http.client.response.FullResponseHolder;
 import com.metamx.http.client.response.InputStreamResponseHandler;
 import io.druid.data.input.impl.DimensionSchema;
 import io.druid.data.input.impl.StringDimensionSchema;
@@ -100,6 +102,7 @@
 import org.apache.hadoop.util.StringUtils;
 import org.jboss.netty.handler.codec.http.HttpHeaders;
 import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.joda.time.chrono.ISOChronology;
@@ -120,6 +123,7 @@
 import java.io.OutputStream;
 import java.io.Reader;
 import java.net.InetAddress;
+import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.UnknownHostException;
 import java.sql.SQLException;
@@ -277,6 +281,43 @@ public static String getURL(HttpClient client, URL url) throws IOException {
     }
   }
 
+  public static FullResponseHolder getResponseFromCurrentLeader(HttpClient client, Request request,
+      FullResponseHandler fullResponseHandler)
+      throws ExecutionException, InterruptedException {
+    FullResponseHolder responseHolder = client.go(request,
+        fullResponseHandler).get();
+    if (HttpResponseStatus.TEMPORARY_REDIRECT.equals(responseHolder.getStatus())) {
+      String redirectUrlStr = responseHolder.getResponse().headers().get("Location");
+      LOG.debug("Request [{}] received redirect response to location [{}].", request.getUrl(),
+          redirectUrlStr);
+      final URL redirectUrl;
+      try {
+        redirectUrl = new URL(redirectUrlStr);
+      } catch (MalformedURLException ex) {
+        throw new ExecutionException(
+            String.format(
+                "Malformed redirect location found in response from url[%s], new location[%s].",
+                request.getUrl(),
+                redirectUrlStr),
+            ex
+        );
+      }
+      responseHolder = client.go(withUrl(request, redirectUrl),
+          fullResponseHandler).get();
+    }
+    return responseHolder;
+  }
+
+  private static Request withUrl(Request old, URL url)
+  {
+    Request req = new Request(old.getMethod(), url);
+    req.addHeaderValues(old.getHeaders());
+    if (old.hasContent()) {
+      req.setContent(old.getContent());
+    }
+    return req;
+  }
+
   /**
    * @param taskDir path to the directory containing the segments descriptor info
    *                the descriptor path will be .../workingPath/task_id/{@link DruidStorageHandler#SEGMENTS_DESCRIPTOR_DIR_NAME}/*.json
-- 
2.15.2 (Apple Git-101.1)
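
Reviewer note: the redirect handling that getResponseFromCurrentLeader() adds is easiest to see in isolation. Below is a minimal, self-contained sketch of the same follow-one-redirect pattern using only JDK classes instead of the metamx HttpClient the patch uses; the class name, method name, and overlord URL are illustrative assumptions, not part of the patch.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    public class LeaderRedirectSketch {

      /**
       * Mirrors the patch's getResponseFromCurrentLeader(): send the request, and
       * if the node answers 307 TEMPORARY_REDIRECT (what a non-leader Druid
       * overlord/coordinator returns), re-issue the request once against the
       * Location header.
       */
      static String getFromCurrentLeader(URL url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setInstanceFollowRedirects(false); // follow the redirect ourselves
        if (conn.getResponseCode() == 307) {    // HttpURLConnection has no named constant for 307
          String location = conn.getHeaderField("Location");
          if (location == null) {
            throw new IOException("307 from " + url + " carried no Location header");
          }
          conn.disconnect();
          conn = (HttpURLConnection) new URL(location).openConnection();
          conn.setInstanceFollowRedirects(false);
        }
        try (InputStream in = conn.getInputStream()) {
          ByteArrayOutputStream out = new ByteArrayOutputStream();
          byte[] buf = new byte[8192];
          for (int n; (n = in.read(buf)) != -1; ) {
            out.write(buf, 0, n);
          }
          return new String(out.toByteArray(), StandardCharsets.UTF_8);
        }
      }

      public static void main(String[] args) throws IOException {
        // Hypothetical address: any non-leader overlord redirects this to the leader.
        System.out.println(getFromCurrentLeader(
            new URL("http://overlord-2.example.com:8090/druid/indexer/v1/supervisor/wikipedia/status")));
      }
    }

The single manual hop matters most for the POST paths (supervisor submit/reset/shutdown): HttpURLConnection will not re-send a request body on a 307 automatically, and the Netty-based metamx client does not follow redirects on its own, which is why the patch rebuilds the request via withUrl() with the original method, headers, and content before retrying against the leader.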