From 395948d98a809f876064fcdc2ef47bf7b7c09594 Mon Sep 17 00:00:00 2001
From: Slim Bouguerra
Date: Wed, 23 Nov 2016 14:49:07 -0800
Subject: [PATCH] adding config to http client

---
 common/src/java/org/apache/hadoop/hive/conf/HiveConf.java   |  2 ++
 .../hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java   | 10 ++++++++--
 .../hadoop/hive/druid/serde/DruidQueryRecordReader.java     | 11 ++++++++++-
 .../java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java | 12 +++++++++++-
 4 files changed, 31 insertions(+), 4 deletions(-)

diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 9e4c590..7be55f9 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1934,6 +1934,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "number of records of the query results is larger than this threshold, we split the query in\n" +
         "total number of rows/threshold parts across the time dimension. Note that we assume the\n" +
         "records to be split uniformly across the time dimension"),
+    HIVE_DRUID_NUM_HTTP_CONNECTION("hive.druid.http.numConnection", 20, "Number of connections used by the HTTP client."),
+    HIVE_DRUID_HTTP_READ_TIMEOUT("hive.druid.http.read.timeout", "PT1M", "Read timeout period for the HTTP client."),
 
     // For HBase storage handler
     HIVE_HBASE_WAL_ENABLED("hive.hbase.wal.enabled", true,
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java
index 787cd52..612f853 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 
 import org.joda.time.Interval;
+import org.joda.time.Period;
 import org.joda.time.chrono.ISOChronology;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -160,7 +161,10 @@ private static String createSelectStarQuery(String address, String dataSource) t
           String druidQuery, Path dummyPath) throws IOException {
     final int selectThreshold = (int) HiveConf.getIntVar(
             conf, HiveConf.ConfVars.HIVE_DRUID_SELECT_THRESHOLD);
-
+    final int numConnection = HiveConf
+            .getIntVar(conf, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
+    final Period readTimeout = new Period(
+            HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
     SelectQuery query;
     try {
       query = DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, SelectQuery.class);
@@ -184,7 +188,9 @@ private static String createSelectStarQuery(String address, String dataSource) t
     metadataBuilder.analysisTypes();
     SegmentMetadataQuery metadataQuery = metadataBuilder.build();
 
-    HttpClient client = HttpClientInit.createClient(HttpClientConfig.builder().build(), new Lifecycle());
+    HttpClient client = HttpClientInit.createClient(
+            HttpClientConfig.builder().withNumConnections(numConnection)
+                    .withReadTimeout(readTimeout.toStandardDuration()).build(), new Lifecycle());
     InputStream response;
     try {
       response = DruidStorageHandlerUtils.submitRequest(client,
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
index 96bcee87..fe6213b 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
@@ -23,12 +23,14 @@ import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
 import org.apache.hadoop.hive.druid.HiveDruidSplit;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
+import org.joda.time.Period;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,7 +83,14 @@ public void initialize(InputSplit split, Configuration conf) throws IOException
       LOG.info("Retrieving from druid using query:\n " + query);
     }
 
-    HttpClient client = HttpClientInit.createClient(HttpClientConfig.builder().build(), new Lifecycle());
+    final int numConnection = HiveConf
+            .getIntVar(conf, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
+    final Period readTimeout = new Period(
+            HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
+
+    HttpClient client = HttpClientInit.createClient(
+            HttpClientConfig.builder().withReadTimeout(readTimeout.toStandardDuration())
+                    .withNumConnections(numConnection).build(), new Lifecycle());
 
     InputStream response = DruidStorageHandlerUtils.submitRequest(client,
             DruidStorageHandlerUtils.createRequest(hiveDruidSplit.getAddress(), query));
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
index 238f7a3..9857b94 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.StringUtils;
 
+import org.joda.time.Period;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,6 +83,9 @@
   private PrimitiveTypeInfo[] types;
 
   private ObjectInspector inspector;
 
+  private int numConnection = 20;
+
+  private Period readTimeout = new Period("PT1M");
   @Override
   public void initialize(Configuration configuration, Properties properties) throws SerDeException {
@@ -113,6 +117,10 @@ public void initialize(Configuration configuration, Properties properties) throw
       throw new SerDeException("Druid broker address not specified in configuration");
     }
 
+    numConnection = HiveConf
+            .getIntVar(configuration, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
+    readTimeout = new Period(
+            HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
     // Infer schema
     SegmentAnalysis schemaInfo;
     try {
@@ -184,7 +192,9 @@ public void initialize(Configuration configuration, Properties properties) throw
   /* Submits the request and returns */
   protected SegmentAnalysis submitMetadataRequest(String address, SegmentMetadataQuery query)
           throws SerDeException, IOException {
-    HttpClient client = HttpClientInit.createClient(HttpClientConfig.builder().build(), new Lifecycle());
+    HttpClient client = HttpClientInit.createClient(
+            HttpClientConfig.builder().withNumConnections(numConnection)
+                    .withReadTimeout(readTimeout.toStandardDuration()).build(), new Lifecycle());
     InputStream response;
     try {
       response = DruidStorageHandlerUtils.submitRequest(client,
-- 
2.8.4 (Apple Git-73)
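
Usage note (not part of the patch): hive.druid.http.read.timeout takes an
ISO-8601 period string, which the patch parses with Joda-Time and converts
to a standard Duration for the Druid HttpClientConfig builder. Below is a
minimal, self-contained sketch of that conversion, assuming only joda-time
on the classpath; the class name is illustrative, not from the patch.

    import org.joda.time.Period;

    public class ReadTimeoutSketch {
      public static void main(String[] args) {
        // Defaults introduced by the patch: 20 connections, "PT1M" timeout.
        int numConnection = 20;
        Period readTimeout = new Period("PT1M"); // ISO-8601 period: one minute

        // toStandardDuration() is what the patch hands to
        // HttpClientConfig.builder().withReadTimeout(...).
        System.out.println(readTimeout.toStandardDuration().getMillis()); // 60000
        System.out.println(numConnection); // 20
      }
    }

Since both settings are ordinary HiveConf variables, they can also be
overridden per session (e.g. SET hive.druid.http.numConnection=50;) or in
hive-site.xml.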