diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 0bff243..31b074f 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1933,12 +1933,20 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     HIVE_DRUID_COORDINATOR_DEFAULT_ADDRESS("hive.druid.coordinator.address.default", "localhost:8081",
         "Address of the Druid coordinator. It is used to check the load status of newly created segments"
     ),
+    HIVE_DRUID_SELECT_DISTRIBUTE("hive.druid.select.distribute", true,
+        "If it is set to true, we distribute the execution of Druid Select queries. Concretely, we retrieve\n" +
+        "the result for Select queries directly from the Druid nodes containing the segment data.\n" +
+        "In particular, first we contact the Druid broker node to obtain the nodes containing the segments\n" +
+        "for the given query, and then we contact those nodes to retrieve the results for the query.\n" +
+        "If it is set to false, we do not execute the Select queries in a distributed fashion. Instead, results\n" +
+        "for those queries are returned by the Druid broker node."),
     HIVE_DRUID_SELECT_THRESHOLD("hive.druid.select.threshold", 10000,
+        "Takes effect only when hive.druid.select.distribute is set to false.\n" +
        "When we can split a Select query, this is the maximum number of rows that we try to retrieve\n" +
        "per query. In order to do that, we obtain the estimated size for the complete result. If the\n" +
        "number of records of the query results is larger than this threshold, we split the query in\n" +
        "total number of rows/threshold parts across the time dimension. Note that we assume the\n" +
-        "records to be split uniformly across the time dimension"),
+        "records to be split uniformly across the time dimension."),
     HIVE_DRUID_NUM_HTTP_CONNECTION("hive.druid.http.numConnection", 20, "Number of connections used by\n" +
         "the HTTP client."),
     HIVE_DRUID_HTTP_READ_TIMEOUT("hive.druid.http.read.timeout", "PT1M", "Read timeout period for the HTTP\n" +
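Note: the new property is a plain HiveConf boolean, so the druid-handler can read it with HiveConf.getBoolVar, as the getSplits() change below does, and users can override it per session (e.g. SET hive.druid.select.distribute=false;). A minimal sketch of reading the flag; the variable names here are illustrative, not part of the patch:

    // Sketch only: reads the flag added above. ConfVars.HIVE_DRUID_SELECT_DISTRIBUTE
    // is the enum constant introduced by this patch.
    HiveConf conf = new HiveConf();
    boolean distributeSelect =
        HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_DRUID_SELECT_DISTRIBUTE);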
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
index 8b37840..0b35428 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -51,6 +52,7 @@
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.joda.time.Interval;
 import org.joda.time.Period;
 import org.joda.time.chrono.ISOChronology;
@@ -60,23 +62,28 @@
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonMappingException;
+import com.google.common.collect.Lists;
 
 import com.metamx.common.lifecycle.Lifecycle;
 import com.metamx.http.client.HttpClient;
 import com.metamx.http.client.HttpClientConfig;
 import com.metamx.http.client.HttpClientInit;
+import com.metamx.http.client.Request;
 
 import io.druid.query.BaseQuery;
 import io.druid.query.Druids;
 import io.druid.query.Druids.SegmentMetadataQueryBuilder;
 import io.druid.query.Druids.SelectQueryBuilder;
 import io.druid.query.Druids.TimeBoundaryQueryBuilder;
+import io.druid.query.LocatedSegmentDescriptor;
 import io.druid.query.Query;
 import io.druid.query.Result;
+import io.druid.query.SegmentDescriptor;
 import io.druid.query.metadata.metadata.SegmentAnalysis;
 import io.druid.query.metadata.metadata.SegmentMetadataQuery;
 import io.druid.query.select.PagingSpec;
 import io.druid.query.select.SelectQuery;
 import io.druid.query.spec.MultipleIntervalSegmentSpec;
+import io.druid.query.spec.MultipleSpecificSegmentSpec;
 import io.druid.query.timeboundary.TimeBoundaryQuery;
 import io.druid.query.timeboundary.TimeBoundaryResultValue;
@@ -143,12 +150,17 @@
       case Query.TIMESERIES:
       case Query.TOPN:
       case Query.GROUP_BY:
-        return new HiveDruidSplit[] { new HiveDruidSplit(address,
-                deserializeSerialize(druidQuery), paths[0]) };
+        return new HiveDruidSplit[] { new HiveDruidSplit(deserializeSerialize(druidQuery),
+                paths[0], new String[] {address}) };
       case Query.SELECT:
         SelectQuery selectQuery = DruidStorageHandlerUtils.JSON_MAPPER.readValue(
                 druidQuery, SelectQuery.class);
-        return splitSelectQuery(conf, address, selectQuery, paths[0]);
+        boolean distributed = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_DRUID_SELECT_DISTRIBUTE);
+        if (distributed) {
+          return distributeSelectQuery(conf, address, selectQuery, paths[0]);
+        } else {
+          return splitSelectQuery(conf, address, selectQuery, paths[0]);
+        }
       default:
         throw new IOException("Druid query type not recognized");
     }
@@ -166,8 +178,83 @@ private static String createSelectStarQuery(String dataSource) throws IOExceptio
     return DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(builder.build());
   }
 
+  /* New method that distributes the Select query by creating splits containing
+   * information about different Druid nodes that have the data for the given
+   * query. */
+  private static HiveDruidSplit[] distributeSelectQuery(Configuration conf, String address,
+          SelectQuery query, Path dummyPath) throws IOException {
+    // If it has a limit, we use it and we do not distribute the query
+    final boolean isFetch = query.getContextBoolean(Constants.DRUID_QUERY_FETCH, false);
+    if (isFetch) {
+      return new HiveDruidSplit[] { new HiveDruidSplit(
+              DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath,
+              new String[]{address} ) };
+    }
+
+    // Properties from configuration
+    final int numConnection = HiveConf.getIntVar(conf,
+            HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
+    final Period readTimeout = new Period(
+            HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
+
+    // Create request to obtain nodes that are holding data for the given datasource and intervals
+    final Lifecycle lifecycle = new Lifecycle();
+    final HttpClient client = HttpClientInit.createClient(
+            HttpClientConfig.builder().withNumConnections(numConnection)
+                    .withReadTimeout(readTimeout.toStandardDuration()).build(), lifecycle);
+    try {
+      lifecycle.start();
+    } catch (Exception e) {
+      LOG.error("Lifecycle start issue");
+      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+    }
+    final String intervals =
+            StringUtils.join(query.getIntervals(), ","); // Comma-separated intervals without brackets
+    final String request = String.format(
+            "http://%s/druid/v2/datasources/%s/candidates?intervals=%s",
+            address, query.getDataSource().getNames().get(0), intervals);
+    final InputStream response;
+    try {
+      response = DruidStorageHandlerUtils.submitRequest(client, new Request(HttpMethod.GET, new URL(request)));
+    } catch (Exception e) {
+      lifecycle.stop();
+      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+    }
+
+    // Retrieve results
+    final List<LocatedSegmentDescriptor> segmentDescriptors;
+    try {
+      segmentDescriptors = DruidStorageHandlerUtils.JSON_MAPPER.readValue(response,
+              new TypeReference<List<LocatedSegmentDescriptor>>() {});
+    } catch (Exception e) {
+      response.close();
+      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+    } finally {
+      lifecycle.stop();
+    }
+
+    // Create one input split for each segment
+    final int numSplits = segmentDescriptors.size();
+    final HiveDruidSplit[] splits = new HiveDruidSplit[segmentDescriptors.size()];
+    for (int i = 0; i < numSplits; i++) {
+      final LocatedSegmentDescriptor locatedSD = segmentDescriptors.get(i);
+      final String[] hosts = new String[locatedSD.getLocations().size()];
+      for (int j = 0; j < locatedSD.getLocations().size(); j++) {
+        hosts[j] = locatedSD.getLocations().get(j).getHost();
+      }
+      // Create partial Select query
+      final SegmentDescriptor newSD = new SegmentDescriptor(
+              locatedSD.getInterval(), locatedSD.getVersion(), locatedSD.getPartitionNumber());
+      final SelectQuery partialQuery = query.withQuerySegmentSpec(
+              new MultipleSpecificSegmentSpec(Lists.newArrayList(newSD)));
+      splits[i] = new HiveDruidSplit(DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(partialQuery),
+              dummyPath, hosts);
+    }
+    return splits;
+  }
+
   /* Method that splits Select query depending on the threshold so read can be
-   * parallelized */
+   * parallelized. We will only contact the Druid broker to obtain all results. */
   private static HiveDruidSplit[] splitSelectQuery(Configuration conf, String address,
           SelectQuery query, Path dummyPath
   ) throws IOException {
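Side note on the two segment specs used in this file: the distributed path above pins every split to one concrete segment (interval, version, and partition number taken from the broker's candidates response) through MultipleSpecificSegmentSpec, whereas the broker-only path kept below still addresses data by time range through MultipleIntervalSegmentSpec. A rough sketch of that contrast; the helper names are illustrative, not part of the patch:

    // Pin a Select query to a single segment, as distributeSelectQuery() does per split.
    private static SelectQuery pinToSegment(SelectQuery query, Interval interval,
            String version, int partitionNumber) {
      final SegmentDescriptor segment = new SegmentDescriptor(interval, version, partitionNumber);
      return query.withQuerySegmentSpec(new MultipleSpecificSegmentSpec(Lists.newArrayList(segment)));
    }

    // Restrict a Select query to a time range only, as splitSelectQuery() keeps doing.
    private static SelectQuery restrictToInterval(SelectQuery query, Interval interval) {
      return query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Lists.newArrayList(interval)));
    }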
@@ -182,7 +269,8 @@ private static String createSelectStarQuery(String dataSource) throws IOExceptio
     if (isFetch) {
       // If it has a limit, we use it and we do not split the query
       return new HiveDruidSplit[] { new HiveDruidSplit(
-          address, DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath) };
+          DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath,
+          new String[] {address} ) };
     }
 
     // We do not have the number of rows, thus we need to execute a
@@ -200,7 +288,8 @@ private static String createSelectStarQuery(String dataSource) throws IOExceptio
     try {
       lifecycle.start();
     } catch (Exception e) {
-      LOG.error("Lifecycle start issue", e);
+      LOG.error("Lifecycle start issue");
+      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
     }
     InputStream response;
     try {
@@ -231,7 +320,8 @@ private static String createSelectStarQuery(String dataSource) throws IOExceptio
     if (metadataList.isEmpty()) {
       // There are no rows for that time range, we can submit query as it is
       return new HiveDruidSplit[] { new HiveDruidSplit(
-          address, DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath) };
+          DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath,
+          new String[] {address} ) };
     }
     if (metadataList.size() != 1) {
       throw new IOException("Information about segments should have been merged");
@@ -242,9 +332,9 @@ private static String createSelectStarQuery(String dataSource) throws IOExceptio
     query = query.withPagingSpec(PagingSpec.newSpec(Integer.MAX_VALUE));
     if (numRows <= selectThreshold) {
       // We are not going to split it
-      return new HiveDruidSplit[] { new HiveDruidSplit(address,
-          DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath
-      ) };
+      return new HiveDruidSplit[] { new HiveDruidSplit(
+          DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath,
+          new String[] {address} ) };
     }
 
     // If the query does not specify a timestamp, we obtain the total time using
@@ -266,12 +356,8 @@ private static String createSelectStarQuery(String dataSource) throws IOExceptio
     try {
       lifecycle.start();
     } catch (Exception e) {
-      LOG.error("Lifecycle start issue", e);
-    }
-    try {
-      lifecycle.start();
-    } catch (Exception e) {
-      LOG.error("Lifecycle start issue", e);
+      LOG.error("Lifecycle start issue");
+      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
     }
     try {
       response = DruidStorageHandlerUtils.submitRequest(client,
@@ -318,9 +404,9 @@ private static String createSelectStarQuery(String dataSource) throws IOExceptio
       // Create partial Select query
       final SelectQuery partialQuery = query.withQuerySegmentSpec(
               new MultipleIntervalSegmentSpec(newIntervals.get(i)));
-      splits[i] = new HiveDruidSplit(address,
-          DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(partialQuery), dummyPath
-      );
+      splits[i] = new HiveDruidSplit(
+          DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(partialQuery), dummyPath,
+          new String[] {address});
     }
     return splits;
   }
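For context on the broker-only path kept above: with the default hive.druid.select.threshold of 10000, a Select query whose segment metadata estimate reports, say, 45000 rows would be broken into roughly 45000 / 10000 time slices, assuming rows are spread uniformly over the scanned interval. The exact rounding is not visible in these hunks, so the ceiling below is an assumption:

    // Illustrative arithmetic only; variable names are not from the patch.
    final long estimatedRows = 45000L;   // hypothetical estimate from the segment metadata query
    final int selectThreshold = 10000;   // hive.druid.select.threshold default
    final int numSplits = (int) Math.ceil((double) estimatedRows / selectThreshold); // 5 time slices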
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/io/HiveDruidSplit.java druid-handler/src/java/org/apache/hadoop/hive/druid/io/HiveDruidSplit.java
index 861075d..58cb47a 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/io/HiveDruidSplit.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/io/HiveDruidSplit.java
@@ -20,6 +20,7 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Arrays;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileSplit;
@@ -29,56 +30,41 @@
  */
 public class HiveDruidSplit extends FileSplit implements org.apache.hadoop.mapred.InputSplit {
 
-  private String address;
-
   private String druidQuery;
 
+  private String[] hosts;
+
   // required for deserialization
   public HiveDruidSplit() {
     super((Path) null, 0, 0, (String[]) null);
   }
 
-  public HiveDruidSplit(String address, String druidQuery, Path dummyPath) {
-    super(dummyPath, 0, 0, (String[]) null);
-    this.address = address;
+  public HiveDruidSplit(String druidQuery, Path dummyPath, String hosts[]) {
+    super(dummyPath, 0, 0, hosts);
     this.druidQuery = druidQuery;
+    this.hosts = hosts;
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
     super.write(out);
-    out.writeUTF(address);
     out.writeUTF(druidQuery);
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
-    address = in.readUTF();
     druidQuery = in.readUTF();
   }
 
-  @Override
-  public long getLength() {
-    return 0L;
-  }
-
-  @Override
-  public String[] getLocations() {
-    return new String[] { "" };
-  }
-
-  public String getAddress() {
-    return address;
-  }
-
   public String getDruidQuery() {
     return druidQuery;
   }
 
   @Override
   public String toString() {
-    return "HiveDruidSplit{" + address + ", " + druidQuery + "}";
+    return "HiveDruidSplit{" + druidQuery + ", "
+            + (hosts == null ? "empty hosts" : Arrays.toString(hosts)) + "}";
   }
 
 }
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
index 0d5f0b1..8d099c7 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
@@ -98,8 +98,7 @@ public void initialize(InputSplit split, Configuration conf) throws IOException
     InputStream response;
     try {
       response = DruidStorageHandlerUtils.submitRequest(client,
-          DruidStorageHandlerUtils.createRequest(hiveDruidSplit.getAddress(), query)
-      );
+          DruidStorageHandlerUtils.createRequest(hiveDruidSplit.getLocations()[0], query));
     } catch (Exception e) {
       lifecycle.stop();
       throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
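End-to-end, the host list handed to the new HiveDruidSplit constructor doubles as the Hadoop split locations, and DruidQueryRecordReader above now contacts getLocations()[0], i.e. one of the data nodes (or the broker, for non-distributed splits). A small usage sketch; the query string, path, and host names below are placeholders, not values from the patch:

    // Hypothetical values, for illustration only; getLocations() declares IOException,
    // so this would live in a method that throws it.
    HiveDruidSplit split = new HiveDruidSplit(
        "{\"queryType\":\"select\"}",                               // serialized Druid query
        new Path("/tmp/dummy"),                                     // dummy path, as in getSplits()
        new String[] { "historical-1:8083", "historical-2:8083" }); // hosts reported by the broker
    String nodeToContact = split.getLocations()[0];                 // what the record reader will query
    String queryToSend = split.getDruidQuery();                     // JSON payload sent to that node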