diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index daee2fe..4510db3 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -65,6 +65,7 @@
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hive.common.util.ShutdownHookManager;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.Period;
@@ -91,13 +92,23 @@
   protected static final SessionState.LogHelper console = new SessionState.LogHelper(LOG);

   public static final String SEGMENTS_DESCRIPTOR_DIR_NAME = "segmentsDescriptorDir";

+  private static final HttpClient HTTP_CLIENT;
+
+  static {
+    final Lifecycle lifecycle = new Lifecycle();
+    try {
+      lifecycle.start();
+    } catch (Exception e) {
+      LOG.error("Issues with lifecycle start", e);
+    }
+    HTTP_CLIENT = makeHttpClient(lifecycle);
+    ShutdownHookManager.addShutdownHook(() -> lifecycle.stop());
+  }
+
   private final SQLMetadataConnector connector;

   private final MetadataStorageTablesConfig druidMetadataStorageTablesConfig;

-  private HttpClient httpClient;
-
   private String uniqueId = null;

   private String rootWorkingDir = null;
@@ -151,12 +162,10 @@ public String getPassword() {

   @VisibleForTesting
   public DruidStorageHandler(SQLMetadataConnector connector,
-          MetadataStorageTablesConfig druidMetadataStorageTablesConfig,
-          HttpClient httpClient
+          MetadataStorageTablesConfig druidMetadataStorageTablesConfig
   ) {
     this.connector = connector;
     this.druidMetadataStorageTablesConfig = druidMetadataStorageTablesConfig;
-    this.httpClient = httpClient;
   }

   @Override
@@ -280,19 +289,12 @@ public void publishSegments(Table table, boolean overwrite) throws MetaException
     int maxTries = HiveConf.getIntVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_MAX_TRIES);
     LOG.info(String.format("checking load status from coordinator [%s]", coordinatorAddress));

-    // check if the coordinator is up
-    httpClient = makeHttpClient(lifecycle);
-    try {
-      lifecycle.start();
-    } catch (Exception e) {
-      Throwables.propagate(e);
-    }
     String coordinatorResponse = null;
     try {
       coordinatorResponse = RetryUtils.retry(new Callable<String>() {
         @Override
         public String call() throws Exception {
-          return DruidStorageHandlerUtils.getURL(httpClient,
+          return DruidStorageHandlerUtils.getURL(getHttpClient(),
                   new URL(String.format("http://%s/status", coordinatorAddress))
           );
         }
@@ -347,7 +349,7 @@ public URL apply(DataSegment dataSegment) {
             @Override
             public boolean apply(URL input) {
               try {
-                String result = DruidStorageHandlerUtils.getURL(httpClient, input);
+                String result = DruidStorageHandlerUtils.getURL(getHttpClient(), input);
                 LOG.debug(String.format("Checking segment [%s] response is [%s]", input, result));
                 return Strings.isNullOrEmpty(result);
               } catch (IOException e) {
@@ -586,15 +588,18 @@ private String getRootWorkingDir() {
     return rootWorkingDir;
   }

-  private HttpClient makeHttpClient(Lifecycle lifecycle) {
+  private static HttpClient makeHttpClient(Lifecycle lifecycle) {
     final int numConnection = HiveConf
-            .getIntVar(getConf(),
+            .getIntVar(SessionState.getSessionConf(),
                     HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION
             );
     final Period readTimeout = new Period(
-            HiveConf.getVar(getConf(),
+            HiveConf.getVar(SessionState.getSessionConf(),
                     HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT
             ));
+    LOG.info("Creating Druid HTTP client with {} max parallel connections and {}ms read timeout",
+            numConnection, readTimeout.toStandardDuration().getMillis()
+    );

     return HttpClientInit.createClient(
             HttpClientConfig.builder().withNumConnections(numConnection)
@@ -602,4 +607,8 @@ private HttpClient makeHttpClient(Lifecycle lifecycle) {
             lifecycle
     );
   }
+
+  public static HttpClient getHttpClient() {
+    return HTTP_CLIENT;
+  }
 }
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
index 53624e1..2f53616 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.DruidStorageHandler;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
 import org.apache.hadoop.hive.druid.serde.DruidGroupByQueryRecordReader;
 import org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader;
@@ -193,23 +194,6 @@ private static String createSelectStarQuery(String dataSource) throws IOException
               new String[]{address} ) };
     }

-    // Properties from configuration
-    final int numConnection = HiveConf.getIntVar(conf,
-            HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
-    final Period readTimeout = new Period(
-            HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
-
-    // Create request to obtain nodes that are holding data for the given datasource and intervals
-    final Lifecycle lifecycle = new Lifecycle();
-    final HttpClient client = HttpClientInit.createClient(
-            HttpClientConfig.builder().withNumConnections(numConnection)
-                    .withReadTimeout(readTimeout.toStandardDuration()).build(), lifecycle);
-    try {
-      lifecycle.start();
-    } catch (Exception e) {
-      LOG.error("Lifecycle start issue");
-      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
-    }
     final String intervals = StringUtils.join(query.getIntervals(), ","); // Comma-separated intervals without brackets
     final String request = String.format(
@@ -217,9 +201,8 @@ private static String createSelectStarQuery(String dataSource) throws IOException
             address, query.getDataSource().getNames().get(0), URLEncoder.encode(intervals, "UTF-8"));
     final InputStream response;
     try {
-      response = DruidStorageHandlerUtils.submitRequest(client, new Request(HttpMethod.GET, new URL(request)));
+      response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(), new Request(HttpMethod.GET, new URL(request)));
     } catch (Exception e) {
-      lifecycle.stop();
       throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
     }
@@ -231,8 +214,6 @@ private static String createSelectStarQuery(String dataSource) throws IOException
     } catch (Exception e) {
       response.close();
       throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
-    } finally {
-      lifecycle.stop();
     }

     // Create one input split for each segment
@@ -260,12 +241,8 @@ private static String createSelectStarQuery(String dataSource) throws IOException
   private static HiveDruidSplit[] splitSelectQuery(Configuration conf, String address,
           SelectQuery query, Path dummyPath
   ) throws IOException {
-    final int selectThreshold = (int) HiveConf.getIntVar(
+    final int selectThreshold = HiveConf.getIntVar(
             conf, HiveConf.ConfVars.HIVE_DRUID_SELECT_THRESHOLD);
-    final int numConnection = HiveConf
-            .getIntVar(conf, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
-    final Period readTimeout = new Period(
-            HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));

     final boolean isFetch = query.getContextBoolean(Constants.DRUID_QUERY_FETCH, false);
     if (isFetch) {
@@ -283,23 +260,12 @@ private static String createSelectStarQuery(String dataSource) throws IOException
     metadataBuilder.merge(true);
     metadataBuilder.analysisTypes();
     SegmentMetadataQuery metadataQuery = metadataBuilder.build();
-    Lifecycle lifecycle = new Lifecycle();
-    HttpClient client = HttpClientInit.createClient(
-            HttpClientConfig.builder().withNumConnections(numConnection)
-                    .withReadTimeout(readTimeout.toStandardDuration()).build(), lifecycle);
-    try {
-      lifecycle.start();
-    } catch (Exception e) {
-      LOG.error("Lifecycle start issue");
-      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
-    }
     InputStream response;
     try {
-      response = DruidStorageHandlerUtils.submitRequest(client,
+      response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(),
               DruidStorageHandlerUtils.createRequest(address, metadataQuery)
       );
     } catch (Exception e) {
-      lifecycle.stop();
       throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
     }
@@ -313,8 +279,6 @@ private static String createSelectStarQuery(String dataSource) throws IOException
     } catch (Exception e) {
       response.close();
       throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
-    } finally {
-      lifecycle.stop();
     }
     if (metadataList == null) {
       throw new IOException("Connected to Druid but could not retrieve datasource information");
@@ -350,23 +314,11 @@ private static String createSelectStarQuery(String dataSource) throws IOException
     Druids.TimeBoundaryQueryBuilder timeBuilder = new Druids.TimeBoundaryQueryBuilder();
     timeBuilder.dataSource(query.getDataSource());
     TimeBoundaryQuery timeQuery = timeBuilder.build();
-
-    lifecycle = new Lifecycle();
-    client = HttpClientInit.createClient(
-            HttpClientConfig.builder().withNumConnections(numConnection)
-                    .withReadTimeout(readTimeout.toStandardDuration()).build(), lifecycle);
-    try {
-      lifecycle.start();
-    } catch (Exception e) {
-      LOG.error("Lifecycle start issue");
-      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
-    }
     try {
-      response = DruidStorageHandlerUtils.submitRequest(client,
+      response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(),
               DruidStorageHandlerUtils.createRequest(address, timeQuery)
       );
     } catch (Exception e) {
-      lifecycle.stop();
       throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
     }
@@ -380,8 +332,6 @@ private static String createSelectStarQuery(String dataSource) throws IOException
     } catch (Exception e) {
       response.close();
       throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
-    } finally {
-      lifecycle.stop();
     }
     if (timeList == null || timeList.isEmpty()) {
       throw new IOException(
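Note: with the shared client in place, every call site in this file (and in the record reader and SerDe below) collapses to the same three steps: build the request, submit it through the process-wide client, and manage only the returned stream. A condensed sketch of that shape, under the same assumptions as above; the helper class and method are hypothetical, while submitRequest and createRequest are the real utility methods the hunks call.

import java.io.IOException;
import java.io.InputStream;

import io.druid.query.BaseQuery;

import org.apache.hadoop.hive.druid.DruidStorageHandler;
import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;

final class BrokerCall {

  // Hypothetical helper mirroring the post-patch call sites: no Lifecycle is
  // created, started, or stopped here; only the response stream is per-call.
  static InputStream submit(String address, BaseQuery<?> query) throws IOException {
    try {
      return DruidStorageHandlerUtils.submitRequest(
              DruidStorageHandler.getHttpClient(),
              DruidStorageHandlerUtils.createRequest(address, query));
    } catch (Exception e) {
      // Same wrapping as the patch: surface failures as IOException.
      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
    }
  }
}

The finally { lifecycle.stop(); } blocks disappear because stopping the shared lifecycle here would tear down the connection pool for every other caller in the JVM.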
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
index 8d099c7..103591d 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
@@ -25,6 +25,7 @@
 import io.druid.query.BaseQuery;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.DruidStorageHandler;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
 import org.apache.hadoop.hive.druid.io.HiveDruidSplit;
 import org.apache.hadoop.io.NullWritable;
@@ -81,26 +82,11 @@ public void initialize(InputSplit split, Configuration conf) throws IOException
       LOG.info("Retrieving from druid using query:\n " + query);
     }

-    final Lifecycle lifecycle = new Lifecycle();
-    final int numConnection = HiveConf
-            .getIntVar(conf, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
-    final Period readTimeout = new Period(
-            HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
-
-    HttpClient client = HttpClientInit.createClient(
-            HttpClientConfig.builder().withReadTimeout(readTimeout.toStandardDuration())
-                    .withNumConnections(numConnection).build(), lifecycle);
-    try {
-      lifecycle.start();
-    } catch (Exception e) {
-      LOG.error("Issues with lifecycle start", e);
-    }
     InputStream response;
     try {
-      response = DruidStorageHandlerUtils.submitRequest(client,
+      response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(),
              DruidStorageHandlerUtils.createRequest(hiveDruidSplit.getLocations()[0], query));
     } catch (Exception e) {
-      lifecycle.stop();
       throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
     }
@@ -111,8 +97,6 @@ public void initialize(InputSplit split, Configuration conf) throws IOException
     } catch (IOException e) {
       response.close();
       throw e;
-    } finally {
-      lifecycle.stop();
     }
     if (resultsList == null || resultsList.isEmpty()) {
       return;
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
index bbe29b6..656c0f1 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.DruidStorageHandler;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.serde.serdeConstants;
@@ -100,20 +101,12 @@
   protected static final Logger LOG = LoggerFactory.getLogger(DruidSerDe.class);

-  private int numConnection;
-  private Period readTimeout;
-
   private String[] columns;
   private PrimitiveTypeInfo[] types;
   private ObjectInspector inspector;

   @Override
   public void initialize(Configuration configuration, Properties properties) throws SerDeException {
-    // Init connection properties
-    numConnection = HiveConf
-            .getIntVar(configuration, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
-    readTimeout = new Period(
-            HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));

     final List<String> columnNames = new ArrayList<>();
     final List<PrimitiveTypeInfo> columnTypes = new ArrayList<>();
@@ -256,20 +249,13 @@ public ObjectInspector apply(PrimitiveTypeInfo type) {
   /* Submits the request and returns */
   protected SegmentAnalysis submitMetadataRequest(String address, SegmentMetadataQuery query)
           throws SerDeException, IOException {
-    final Lifecycle lifecycle = new Lifecycle();
-    HttpClient client = HttpClientInit.createClient(
-            HttpClientConfig.builder().withNumConnections(numConnection)
-                    .withReadTimeout(readTimeout.toStandardDuration()).build(), lifecycle);
     InputStream response;
     try {
-      lifecycle.start();
-      response = DruidStorageHandlerUtils.submitRequest(client,
+      response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(),
              DruidStorageHandlerUtils.createRequest(address, query)
       );
     } catch (Exception e) {
       throw new SerDeException(StringUtils.stringifyException(e));
-    } finally {
-      lifecycle.stop();
     }

     // Retrieve results
diff --git druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
index 05e3ec5..1fe155a 100644
--- druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
+++ druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
@@ -24,7 +24,6 @@
 import io.druid.indexer.JobHelper;
 import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
 import io.druid.metadata.MetadataStorageTablesConfig;
-import io.druid.metadata.SQLMetadataSegmentManager;
 import io.druid.segment.loading.SegmentLoadingException;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.NoneShardSpec;
@@ -94,8 +93,7 @@ public void before() throws Throwable {
   public void testPreCreateTableWillCreateSegmentsTable() throws MetaException {
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
             derbyConnectorRule.getConnector(),
-            derbyConnectorRule.metadataTablesConfigSupplier().get(),
-            null
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
     );

     try (Handle handle = derbyConnectorRule.getConnector().getDBI().open()) {
@@ -122,8 +120,7 @@ public void testPreCreateTableWhenDataSourceExists() throws MetaException {
     );
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
             derbyConnectorRule.getConnector(),
-            derbyConnectorRule.metadataTablesConfigSupplier().get(),
-            null
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
     );
     druidStorageHandler.preCreateTable(tableMock);
   }
@@ -133,8 +130,7 @@ public void testCommitCreateTablePlusCommitDropTableWithoutPurge()
           throws MetaException, IOException {
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
             derbyConnectorRule.getConnector(),
-            derbyConnectorRule.metadataTablesConfigSupplier().get(),
-            null
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
     );
     druidStorageHandler.preCreateTable(tableMock);
     Configuration config = new Configuration();
@@ -164,8 +160,7 @@ public void testCommitCreateTablePlusCommitDropTableWithoutPurge()
   public void testCommitInsertTable() throws MetaException, IOException {
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
             derbyConnectorRule.getConnector(),
-            derbyConnectorRule.metadataTablesConfigSupplier().get(),
-            null
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
     );
     druidStorageHandler.preCreateTable(tableMock);
     Configuration config = new Configuration();
@@ -189,8 +184,7 @@ public void testCommitInsertTable() throws MetaException, IOException {
   public void testDeleteSegment() throws IOException, SegmentLoadingException {
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
             derbyConnectorRule.getConnector(),
-            derbyConnectorRule.metadataTablesConfigSupplier().get(),
-            null
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
     );

     String segmentRootPath = temporaryFolder.newFolder().getAbsolutePath();
@@ -234,8 +228,7 @@ public void testCommitInsertOverwriteTable() throws MetaException, IOException {
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
             connector,
-            metadataStorageTablesConfig,
-            null
+            metadataStorageTablesConfig
     );
     druidStorageHandler.preCreateTable(tableMock);
     Configuration config = new Configuration();
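Note: since the handler no longer receives an HttpClient, the @VisibleForTesting constructor is down to two arguments and the tests stop passing null (the now-unused SQLMetadataSegmentManager import goes in the same pass). Every construction site in the test class reduces to the same two-argument shape, sketched here with the rule and mock names used above:

DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
        derbyConnectorRule.getConnector(),
        derbyConnectorRule.metadataTablesConfigSupplier().get());
druidStorageHandler.preCreateTable(tableMock);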