diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index d4f6865..2bd0728 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -56,7 +56,6 @@
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -67,6 +66,7 @@
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hive.common.util.ShutdownHookManager;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.Period;
@@ -93,6 +93,23 @@
   protected static final SessionState.LogHelper console = new SessionState.LogHelper(LOG);

   public static final String SEGMENTS_DESCRIPTOR_DIR_NAME = "segmentsDescriptorDir";

+  private static final HttpClient HTTP_CLIENT;
+
+  static {
+    final Lifecycle lifecycle = new Lifecycle();
+    try {
+      lifecycle.start();
+    } catch (Exception e) {
+      LOG.error("Issues with lifecycle start", e);
+    }
+    HTTP_CLIENT = makeHttpClient(lifecycle);
+    ShutdownHookManager.addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        lifecycle.stop();
+      }
+    });
+  }
+
   private final SQLMetadataConnector connector;
@@ -100,8 +117,6 @@
   private final MetadataStorageTablesConfig druidMetadataStorageTablesConfig;

-  private HttpClient httpClient;
-
   private String uniqueId = null;

   private String rootWorkingDir = null;
@@ -157,13 +172,11 @@ public String getPassword() {
   @VisibleForTesting
   public DruidStorageHandler(SQLMetadataConnector connector,
          SQLMetadataStorageUpdaterJobHandler druidSqlMetadataStorageUpdaterJobHandler,
-         MetadataStorageTablesConfig druidMetadataStorageTablesConfig,
-         HttpClient httpClient
+         MetadataStorageTablesConfig druidMetadataStorageTablesConfig
   ) {
     this.connector = connector;
     this.druidSqlMetadataStorageUpdaterJobHandler = druidSqlMetadataStorageUpdaterJobHandler;
     this.druidMetadataStorageTablesConfig = druidMetadataStorageTablesConfig;
-    this.httpClient = httpClient;
   }
@@ -277,19 +290,12 @@ public void commitCreateTable(Table table) throws MetaException {
     final String dataSourceName = table.getParameters().get(Constants.DRUID_DATA_SOURCE);
     LOG.info(String.format("checking load status from coordinator [%s]", coordinatorAddress));
-    // check if the coordinator is up
-    httpClient = makeHttpClient(lifecycle);
-    try {
-      lifecycle.start();
-    } catch (Exception e) {
-      Throwables.propagate(e);
-    }
     String coordinatorResponse = null;
     try {
       coordinatorResponse = RetryUtils.retry(new Callable<String>() {
         @Override
         public String call() throws Exception {
-          return DruidStorageHandlerUtils.getURL(httpClient,
+          return DruidStorageHandlerUtils.getURL(getHttpClient(),
                  new URL(String.format("http://%s/status", coordinatorAddress))
          );
         }
@@ -344,7 +350,7 @@ public URL apply(DataSegment dataSegment) {
       @Override
       public boolean apply(URL input) {
         try {
-          String result = DruidStorageHandlerUtils.getURL(httpClient, input);
+          String result = DruidStorageHandlerUtils.getURL(getHttpClient(), input);
           LOG.debug(String.format("Checking segment [%s] response is [%s]", input, result));
           return Strings.isNullOrEmpty(result);
         } catch (IOException e) {
@@ -583,15 +589,18 @@ private String getRootWorkingDir() {
     return rootWorkingDir;
   }

-  private HttpClient makeHttpClient(Lifecycle lifecycle) {
+  private static HttpClient makeHttpClient(Lifecycle lifecycle) {
     final int numConnection = HiveConf
-            .getIntVar(getConf(),
+            .getIntVar(SessionState.getSessionConf(),
                    HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION
            );
     final Period readTimeout = new Period(
-            HiveConf.getVar(getConf(),
+            HiveConf.getVar(SessionState.getSessionConf(),
                    HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT
            ));
+    LOG.info("Creating Druid HTTP client with {} max parallel connections and {}ms read timeout",
+            numConnection, readTimeout.toStandardDuration().getMillis()
+    );
     return HttpClientInit.createClient(
            HttpClientConfig.builder().withNumConnections(numConnection)
@@ -599,4 +608,8 @@ private HttpClient makeHttpClient(Lifecycle lifecycle) {
            lifecycle
     );
   }
+
+  public static HttpClient getHttpClient() {
+    return HTTP_CLIENT;
+  }
 }
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
index fe6c901..003cf49 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.DruidStorageHandler;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
 import org.apache.hadoop.hive.druid.serde.DruidGroupByQueryRecordReader;
 import org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader;
@@ -192,23 +193,6 @@ private static String createSelectStarQuery(String dataSource) throws IOExceptio
              new String[]{address} ) };
     }
-    // Properties from configuration
-    final int numConnection = HiveConf.getIntVar(conf,
-            HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
-    final Period readTimeout = new Period(
-            HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
-
-    // Create request to obtain nodes that are holding data for the given datasource and intervals
-    final Lifecycle lifecycle = new Lifecycle();
-    final HttpClient client = HttpClientInit.createClient(
-            HttpClientConfig.builder().withNumConnections(numConnection)
-                    .withReadTimeout(readTimeout.toStandardDuration()).build(), lifecycle);
-    try {
-      lifecycle.start();
-    } catch (Exception e) {
-      LOG.error("Lifecycle start issue");
-      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
-    }
     final String intervals = StringUtils.join(query.getIntervals(), ","); // Comma-separated intervals without brackets
     final String request = String.format(
@@ -216,9 +200,8 @@ private static String createSelectStarQuery(String dataSource) throws IOExceptio
            address, query.getDataSource().getNames().get(0), intervals);
     final InputStream response;
     try {
-      response = DruidStorageHandlerUtils.submitRequest(client, new Request(HttpMethod.GET, new URL(request)));
+      response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(), new Request(HttpMethod.GET, new URL(request)));
     } catch (Exception e) {
-      lifecycle.stop();
       throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
     }
@@ -230,8 +213,6 @@ private static String createSelectStarQuery(String dataSource) throws IOExceptio
     } catch (Exception e) {
       response.close();
       throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
-    } finally {
-      lifecycle.stop();
     }
     // Create one input split for each segment
@@ -259,12 +240,8 @@ private static String createSelectStarQuery(String dataSource) throws IOExceptio
   private static HiveDruidSplit[] splitSelectQuery(Configuration conf, String address,
          SelectQuery query, Path dummyPath
   ) throws IOException {
-    final int selectThreshold = (int) HiveConf.getIntVar(
+    final int selectThreshold = HiveConf.getIntVar(
            conf, HiveConf.ConfVars.HIVE_DRUID_SELECT_THRESHOLD);
-    final int numConnection = HiveConf
-            .getIntVar(conf, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
-    final Period readTimeout = new Period(
-            HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
     final boolean isFetch = query.getContextBoolean(Constants.DRUID_QUERY_FETCH, false);
     if (isFetch) {
@@ -282,23 +259,12 @@ private static String createSelectStarQuery(String dataSource) throws IOExceptio
     metadataBuilder.merge(true);
     metadataBuilder.analysisTypes();
     SegmentMetadataQuery metadataQuery = metadataBuilder.build();
-    Lifecycle lifecycle = new Lifecycle();
-    HttpClient client = HttpClientInit.createClient(
-            HttpClientConfig.builder().withNumConnections(numConnection)
-                    .withReadTimeout(readTimeout.toStandardDuration()).build(), lifecycle);
-    try {
-      lifecycle.start();
-    } catch (Exception e) {
-      LOG.error("Lifecycle start issue");
-      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
-    }
     InputStream response;
     try {
-      response = DruidStorageHandlerUtils.submitRequest(client,
+      response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(),
              DruidStorageHandlerUtils.createRequest(address, metadataQuery)
      );
     } catch (Exception e) {
-      lifecycle.stop();
       throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
     }
@@ -312,8 +278,6 @@ private static String createSelectStarQuery(String dataSource) throws IOExceptio
     } catch (Exception e) {
       response.close();
       throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
-    } finally {
-      lifecycle.stop();
     }
     if (metadataList == null) {
       throw new IOException("Connected to Druid but could not retrieve datasource information");
@@ -349,23 +313,11 @@ private static String createSelectStarQuery(String dataSource) throws IOExceptio
     TimeBoundaryQueryBuilder timeBuilder = new Druids.TimeBoundaryQueryBuilder();
     timeBuilder.dataSource(query.getDataSource());
     TimeBoundaryQuery timeQuery = timeBuilder.build();
-
-    lifecycle = new Lifecycle();
-    client = HttpClientInit.createClient(
-            HttpClientConfig.builder().withNumConnections(numConnection)
-                    .withReadTimeout(readTimeout.toStandardDuration()).build(), lifecycle);
-    try {
-      lifecycle.start();
-    } catch (Exception e) {
-      LOG.error("Lifecycle start issue");
-      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
-    }
     try {
-      response = DruidStorageHandlerUtils.submitRequest(client,
+      response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(),
              DruidStorageHandlerUtils.createRequest(address, timeQuery)
      );
     } catch (Exception e) {
-      lifecycle.stop();
       throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
     }
@@ -379,8 +331,6 @@ private static String createSelectStarQuery(String dataSource) throws IOExceptio
     } catch (Exception e) {
       response.close();
       throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
-    } finally {
-      lifecycle.stop();
     }
     if (timeList == null || timeList.isEmpty()) {
       throw new IOException(
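
With the shared client in place, all three call sites in DruidQueryBasedInputFormat (segment candidates, segment metadata, time boundary) collapse to the same shape: nothing to start before the request, nothing to stop on the error path, and only the response stream left to manage. A sketch of that shape, where address and query stand for the locals at each site:

    InputStream response;
    try {
      // getHttpClient() returns the process-wide client created in the
      // DruidStorageHandler static initializer above.
      response = DruidStorageHandlerUtils.submitRequest(
              DruidStorageHandler.getHttpClient(),
              DruidStorageHandlerUtils.createRequest(address, query));
    } catch (Exception e) {
      // No lifecycle.stop() here: the shared client outlives the request.
      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
    }

The splitSelectQuery hunk also drops a redundant (int) cast: HiveConf.getIntVar already returns a primitive int.
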
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
index 8d099c7..103591d 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
@@ -25,6 +25,7 @@
 import io.druid.query.BaseQuery;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.DruidStorageHandler;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
 import org.apache.hadoop.hive.druid.io.HiveDruidSplit;
 import org.apache.hadoop.io.NullWritable;
@@ -81,26 +82,11 @@ public void initialize(InputSplit split, Configuration conf) throws IOException
       LOG.info("Retrieving from druid using query:\n " + query);
     }
-    final Lifecycle lifecycle = new Lifecycle();
-    final int numConnection = HiveConf
-            .getIntVar(conf, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
-    final Period readTimeout = new Period(
-            HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
-
-    HttpClient client = HttpClientInit.createClient(
-            HttpClientConfig.builder().withReadTimeout(readTimeout.toStandardDuration())
-                    .withNumConnections(numConnection).build(), lifecycle);
-    try {
-      lifecycle.start();
-    } catch (Exception e) {
-      LOG.error("Issues with lifecycle start", e);
-    }
     InputStream response;
     try {
-      response = DruidStorageHandlerUtils.submitRequest(client,
+      response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(),
              DruidStorageHandlerUtils.createRequest(hiveDruidSplit.getLocations()[0], query));
     } catch (Exception e) {
-      lifecycle.stop();
       throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
     }
@@ -111,8 +97,6 @@ public void initialize(InputSplit split, Configuration conf) throws IOException
     } catch (IOException e) {
       response.close();
       throw e;
-    } finally {
-      lifecycle.stop();
     }
     if (resultsList == null || resultsList.isEmpty()) {
       return;
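
The record-reader hunks are the same substitution on the task side. DruidQueryRecordReader.initialize previously built, started, and stopped a dedicated client for every input split, so a query fanning out over many segments paid the client setup cost once per split; it now borrows the process-wide client and keeps only the response-stream error handling.
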
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
index bbe29b6..656c0f1 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.DruidStorageHandler;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.serde.serdeConstants;
@@ -100,20 +101,12 @@
   protected static final Logger LOG = LoggerFactory.getLogger(DruidSerDe.class);

-  private int numConnection;
-
-  private Period readTimeout;
-
   private String[] columns;
   private PrimitiveTypeInfo[] types;
   private ObjectInspector inspector;

   @Override
   public void initialize(Configuration configuration, Properties properties) throws SerDeException {
-    // Init connection properties
-    numConnection = HiveConf
-            .getIntVar(configuration, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
-    readTimeout = new Period(
-            HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
     final List<String> columnNames = new ArrayList<>();
     final List<PrimitiveTypeInfo> columnTypes = new ArrayList<>();
@@ -256,20 +249,13 @@ public ObjectInspector apply(PrimitiveTypeInfo type) {
   /* Submits the request and returns */
   protected SegmentAnalysis submitMetadataRequest(String address, SegmentMetadataQuery query)
          throws SerDeException, IOException {
-    final Lifecycle lifecycle = new Lifecycle();
-    HttpClient client = HttpClientInit.createClient(
-            HttpClientConfig.builder().withNumConnections(numConnection)
-                    .withReadTimeout(readTimeout.toStandardDuration()).build(), lifecycle);
     InputStream response;
     try {
-      lifecycle.start();
-      response = DruidStorageHandlerUtils.submitRequest(client,
+      response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(),
              DruidStorageHandlerUtils.createRequest(address, query)
      );
     } catch (Exception e) {
       throw new SerDeException(StringUtils.stringifyException(e));
-    } finally {
-      lifecycle.stop();
     }
     // Retrieve results
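
In DruidSerDe the substitution also removes state: the numConnection and readTimeout fields, and the HiveConf reads that populated them in initialize, disappear because connection settings now travel with the shared client, leaving submitMetadataRequest with just the submit-and-parse steps.
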
diff --git druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
index da6610a..97a1f75 100644
--- druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
+++ druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
@@ -86,8 +86,7 @@ public void testPreCreateTableWillCreateSegmentsTable() throws MetaException {
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
            derbyConnectorRule.getConnector(),
            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
-            derbyConnectorRule.metadataTablesConfigSupplier().get(),
-            null
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
     );

     try (Handle handle = derbyConnectorRule.getConnector().getDBI().open()) {
@@ -115,8 +114,7 @@ public void testPreCreateTableWhenDataSourceExists() throws MetaException {
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
            derbyConnectorRule.getConnector(),
            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
-            derbyConnectorRule.metadataTablesConfigSupplier().get(),
-            null
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
     );
     druidStorageHandler.preCreateTable(tableMock);
   }
@@ -127,8 +125,7 @@ public void testCommitCreateTablePlusCommitDropTableWithoutPurge()
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
            derbyConnectorRule.getConnector(),
            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
-            derbyConnectorRule.metadataTablesConfigSupplier().get(),
-            null
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
     );
     druidStorageHandler.preCreateTable(tableMock);
     Configuration config = new Configuration();
@@ -159,8 +156,7 @@ public void testCommitInsertTable() throws MetaException, IOException {
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
            derbyConnectorRule.getConnector(),
            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
-            derbyConnectorRule.metadataTablesConfigSupplier().get(),
-            null
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
     );
     druidStorageHandler.preCreateTable(tableMock);
     Configuration config = new Configuration();
@@ -185,8 +181,7 @@ public void testDeleteSegment() throws IOException, SegmentLoadingException {
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
            derbyConnectorRule.getConnector(),
            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
-            derbyConnectorRule.metadataTablesConfigSupplier().get(),
-            null
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
     );
     String segmentRootPath = temporaryFolder.newFolder().getAbsolutePath();
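
The test changes are mechanical: with the injected HttpClient gone from the @VisibleForTesting constructor, each test drops the trailing null argument and builds the handler from the connector, the updater-job handler, and the metadata tables config alone.
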