diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 0bff243..31b074f 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1933,12 +1933,20 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     HIVE_DRUID_COORDINATOR_DEFAULT_ADDRESS("hive.druid.coordinator.address.default", "localhost:8081",
         "Address of the Druid coordinator. It is used to check the load status of newly created segments"
     ),
+    HIVE_DRUID_SELECT_DISTRIBUTE("hive.druid.select.distribute", true,
+        "If it is set to true, we distribute the execution of Druid Select queries. Concretely, we retrieve\n" +
+        "the results for Select queries directly from the Druid nodes that contain the segment data.\n" +
+        "In particular, we first contact the Druid broker node to obtain the nodes containing the segments\n" +
+        "for the given query, and then we contact those nodes to retrieve the results for the query.\n" +
+        "If it is set to false, we do not execute the Select queries in a distributed fashion. Instead, results\n" +
+        "for those queries are returned by the Druid broker node."),
     HIVE_DRUID_SELECT_THRESHOLD("hive.druid.select.threshold", 10000,
+        "Takes effect only when hive.druid.select.distribute is set to false.\n" +
         "When we can split a Select query, this is the maximum number of rows that we try to retrieve\n" +
         "per query. In order to do that, we obtain the estimated size for the complete result. If the\n" +
         "number of records of the query results is larger than this threshold, we split the query in\n" +
         "total number of rows/threshold parts across the time dimension. Note that we assume the\n" +
-        "records to be split uniformly across the time dimension"),
+        "records to be split uniformly across the time dimension."),
     HIVE_DRUID_NUM_HTTP_CONNECTION("hive.druid.http.numConnection", 20, "Number of connections used by\n" +
         "the HTTP client."),
     HIVE_DRUID_HTTP_READ_TIMEOUT("hive.druid.http.read.timeout", "PT1M", "Read timeout period for the HTTP\n" +
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
index 8b37840..4719774 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -51,6 +52,7 @@
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.joda.time.Interval;
 import org.joda.time.Period;
 import org.joda.time.chrono.ISOChronology;
@@ -60,23 +62,28 @@
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonMappingException;
+import com.google.common.collect.Lists;
 import com.metamx.common.lifecycle.Lifecycle;
 import com.metamx.http.client.HttpClient;
 import com.metamx.http.client.HttpClientConfig;
 import com.metamx.http.client.HttpClientInit;
+import com.metamx.http.client.Request;
 
 import io.druid.query.BaseQuery;
 import io.druid.query.Druids;
 import io.druid.query.Druids.SegmentMetadataQueryBuilder;
 import io.druid.query.Druids.SelectQueryBuilder;
 import io.druid.query.Druids.TimeBoundaryQueryBuilder;
+import io.druid.query.LocatedSegmentDescriptor;
 import io.druid.query.Query;
 import io.druid.query.Result;
+import io.druid.query.SegmentDescriptor;
 import io.druid.query.metadata.metadata.SegmentAnalysis;
 import io.druid.query.metadata.metadata.SegmentMetadataQuery;
 import io.druid.query.select.PagingSpec;
 import io.druid.query.select.SelectQuery;
 import io.druid.query.spec.MultipleIntervalSegmentSpec;
+import io.druid.query.spec.MultipleSpecificSegmentSpec;
 import io.druid.query.timeboundary.TimeBoundaryQuery;
 import io.druid.query.timeboundary.TimeBoundaryResultValue;
 
@@ -148,7 +155,12 @@
       case Query.SELECT:
         SelectQuery selectQuery = DruidStorageHandlerUtils.JSON_MAPPER.readValue(
                 druidQuery, SelectQuery.class);
-        return splitSelectQuery(conf, address, selectQuery, paths[0]);
+        boolean distributed = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_DRUID_SELECT_DISTRIBUTE);
+        if (distributed) {
+          return distributeSelectQuery(conf, address, selectQuery, paths[0]);
+        } else {
+          return splitSelectQuery(conf, address, selectQuery, paths[0]);
+        }
       default:
         throw new IOException("Druid query type not recognized");
     }
@@ -166,8 +178,78 @@ private static String createSelectStarQuery(String dataSource) throws IOExceptio
     return DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(builder.build());
   }
 
+  /* New method that distributes the Select query by creating splits containing
+   * information about different Druid nodes that have the data for the given
+   * query. */
+  private static HiveDruidSplit[] distributeSelectQuery(Configuration conf, String address,
+      SelectQuery query, Path dummyPath) throws IOException {
+    // If it has a limit, we use it and we do not distribute the query
+    final boolean isFetch = query.getContextBoolean(Constants.DRUID_QUERY_FETCH, false);
+    if (isFetch) {
+      return new HiveDruidSplit[] { new HiveDruidSplit(
+          address, DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath) };
+    }
+
+    // Properties from configuration
+    final int numConnection = HiveConf.getIntVar(conf,
+        HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
+    final Period readTimeout = new Period(
+        HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
+
+    // Create request to obtain nodes that are holding data for the given datasource and intervals
+    final Lifecycle lifecycle = new Lifecycle();
+    final HttpClient client = HttpClientInit.createClient(
+        HttpClientConfig.builder().withNumConnections(numConnection)
+            .withReadTimeout(readTimeout.toStandardDuration()).build(), lifecycle);
+    try {
+      lifecycle.start();
+    } catch (Exception e) {
+      LOG.error("Lifecycle start issue", e);
+    }
+    final String intervals =
+        StringUtils.join(query.getIntervals(), ","); // Comma-separated intervals without brackets
+    final String request = String.format(
+        "http://%s/druid/v2/datasources/%s/candidates?intervals=%s",
+        address, query.getDataSource().getNames().get(0), intervals);
+    final InputStream response;
+    try {
+      response = DruidStorageHandlerUtils.submitRequest(client, new Request(HttpMethod.GET, new URL(request)));
+    } catch (Exception e) {
+      lifecycle.stop();
+      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+    }
+
+    // Retrieve results
+    final List<LocatedSegmentDescriptor> segmentDescriptors;
+    try {
+      segmentDescriptors = DruidStorageHandlerUtils.JSON_MAPPER.readValue(response,
+          new TypeReference<List<LocatedSegmentDescriptor>>() {});
+    } catch (Exception e) {
+      response.close();
+      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+    } finally {
+      lifecycle.stop();
+    }
+
+    // Create one input split for each segment
+    final int numSplits = segmentDescriptors.size();
+    final HiveDruidSplit[] splits = new HiveDruidSplit[segmentDescriptors.size()];
+    for (int i = 0; i < numSplits; i++) {
+      final LocatedSegmentDescriptor locatedSD = segmentDescriptors.get(i);
+      final String node = locatedSD.getLocations().get(0).getHost();
+      // Create partial Select query
+      final SegmentDescriptor newSD = new SegmentDescriptor(
+          locatedSD.getInterval(), locatedSD.getVersion(), locatedSD.getPartitionNumber());
+      final SelectQuery partialQuery = query.withQuerySegmentSpec(
+          new MultipleSpecificSegmentSpec(Lists.newArrayList(newSD)));
+      splits[i] = new HiveDruidSplit(node,
+          DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(partialQuery), dummyPath);
+    }
+    return splits;
+  }
+
   /* Method that splits Select query depending on the threshold so read can be
-   * parallelized */
+   * parallelized. We will only contact the Druid broker to obtain all results. */
   private static HiveDruidSplit[] splitSelectQuery(Configuration conf, String address,
       SelectQuery query, Path dummyPath
   ) throws IOException {
@@ -269,11 +351,6 @@ private static String createSelectStarQuery(String dataSource) throws IOExceptio
       LOG.error("Lifecycle start issue", e);
     }
     try {
-      lifecycle.start();
-    } catch (Exception e) {
-      LOG.error("Lifecycle start issue", e);
-    }
-    try {
       response = DruidStorageHandlerUtils.submitRequest(client,
           DruidStorageHandlerUtils.createRequest(address, timeQuery)
      );
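
Note: for reviewers unfamiliar with the broker endpoint the patch relies on, the following is a minimal standalone sketch (not part of the patch) of the "candidates" lookup that distributeSelectQuery() performs. It uses only JDK classes instead of the Metamx HttpClient; the broker address, datasource name, and interval below are hypothetical placeholders, while the REST path is the one built in the patch.

// Standalone sketch: probe the Druid broker "candidates" endpoint used by distributeSelectQuery().
// Broker host, datasource, and interval are hypothetical; the path mirrors the one in the patch.
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class DruidCandidatesProbe {
  public static void main(String[] args) throws Exception {
    String broker = "localhost:8082";            // hypothetical broker host:port
    String dataSource = "wikipedia";             // hypothetical datasource
    String intervals = "1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z";
    // Same REST path the patch builds: /druid/v2/datasources/<dataSource>/candidates?intervals=<csv>
    URL url = new URL("http://" + broker + "/druid/v2/datasources/" + dataSource
        + "/candidates?intervals=" + intervals);
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("GET");
    // The broker answers with a JSON array of located segment descriptors; each element carries
    // the segment interval, version, and partition number plus the nodes currently serving it.
    try (BufferedReader in = new BufferedReader(
        new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
      String line;
      while ((line = in.readLine()) != null) {
        System.out.println(line);
      }
    } finally {
      conn.disconnect();
    }
  }
}

In the patch, each descriptor returned by this endpoint becomes one HiveDruidSplit: the partial Select query is restricted to that single segment via MultipleSpecificSegmentSpec and sent to the first host listed in the descriptor's locations.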