diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java index 7ac52c6..56893d7 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java @@ -188,7 +188,7 @@ private static String createSelectStarQuery(String dataSource) throws IOExceptio metadataBuilder.merge(true); metadataBuilder.analysisTypes(); SegmentMetadataQuery metadataQuery = metadataBuilder.build(); - final Lifecycle lifecycle = new Lifecycle(); + Lifecycle lifecycle = new Lifecycle(); HttpClient client = HttpClientInit.createClient( HttpClientConfig.builder().withNumConnections(numConnection) .withReadTimeout(readTimeout.toStandardDuration()).build(), lifecycle); @@ -203,9 +203,8 @@ private static String createSelectStarQuery(String dataSource) throws IOExceptio DruidStorageHandlerUtils.createRequest(address, metadataQuery) ); } catch (Exception e) { - throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e)); - } finally { lifecycle.stop(); + throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e)); } // Retrieve results @@ -218,6 +217,8 @@ private static String createSelectStarQuery(String dataSource) throws IOExceptio } catch (Exception e) { response.close(); throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e)); + } finally { + lifecycle.stop(); } if (metadataList == null || metadataList.isEmpty()) { throw new IOException("Connected to Druid but could not retrieve datasource information"); @@ -248,11 +249,21 @@ private static String createSelectStarQuery(String dataSource) throws IOExceptio timeBuilder.dataSource(query.getDataSource()); TimeBoundaryQuery timeQuery = timeBuilder.build(); + lifecycle = new Lifecycle(); + client = HttpClientInit.createClient( + 
HttpClientConfig.builder().withNumConnections(numConnection) + .withReadTimeout(readTimeout.toStandardDuration()).build(), lifecycle); + try { + lifecycle.start(); + } catch (Exception e) { + LOG.error("Lifecycle start issue", e); + } try { response = DruidStorageHandlerUtils.submitRequest(client, DruidStorageHandlerUtils.createRequest(address, timeQuery) ); } catch (Exception e) { + lifecycle.stop(); throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e)); } @@ -266,6 +277,8 @@ private static String createSelectStarQuery(String dataSource) throws IOExceptio } catch (Exception e) { response.close(); throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e)); + } finally { + lifecycle.stop(); } if (timeList == null || timeList.isEmpty()) { throw new IOException( diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java index dc9d6a0..0d5f0b1 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java @@ -90,16 +90,21 @@ public void initialize(InputSplit split, Configuration conf) throws IOException HttpClient client = HttpClientInit.createClient( HttpClientConfig.builder().withReadTimeout(readTimeout.toStandardDuration()) .withNumConnections(numConnection).build(), lifecycle); - try { lifecycle.start(); } catch (Exception e) { LOG.error("Issues with lifecycle start", e); } - InputStream response = DruidStorageHandlerUtils.submitRequest(client, - DruidStorageHandlerUtils.createRequest(hiveDruidSplit.getAddress(), query) - ); - lifecycle.stop(); + InputStream response; + try { + response = DruidStorageHandlerUtils.submitRequest(client, + 
DruidStorageHandlerUtils.createRequest(hiveDruidSplit.getAddress(), query) + ); + } catch (Exception e) { + lifecycle.stop(); + throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e)); + } + // Retrieve results List resultsList; try { @@ -107,6 +112,8 @@ public void initialize(InputSplit split, Configuration conf) throws IOException } catch (IOException e) { response.close(); throw e; + } finally { + lifecycle.stop(); } if (resultsList == null || resultsList.isEmpty()) { return;