diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java index 3c858a8..f2700c8 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.llap; +import java.io.Closeable; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -52,14 +53,16 @@ protected Thread readerThread = null; protected final LinkedBlockingQueue readerEvents = new LinkedBlockingQueue(); protected final long timeout; + protected final Closeable client; - public LlapBaseRecordReader(InputStream in, Schema schema, Class clazz, JobConf job) { + public LlapBaseRecordReader(InputStream in, Schema schema, Class clazz, JobConf job, Closeable client) { din = new DataInputStream(in); this.schema = schema; this.clazz = clazz; this.readerThread = Thread.currentThread(); this.timeout = 3 * HiveConf.getTimeVar(job, HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS); + this.client = client; } public Schema getSchema() { @@ -68,7 +71,26 @@ public Schema getSchema() { @Override public void close() throws IOException { - din.close(); + Exception caughtException = null; + try { + din.close(); + } catch (Exception err) { + LOG.error("Error closing input stream:" + err.getMessage(), err); + caughtException = err; + } + + if (client != null) { + try { + client.close(); + } catch (Exception err) { + LOG.error("Error closing client:" + err.getMessage(), err); + caughtException = (caughtException == null ? err : caughtException); + } + } + + if (caughtException != null) { + throw new IOException("Exception during close: " + caughtException.getMessage(), caughtException); + } } @Override diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java index 0edb1cd..a7b4f06 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java @@ -16,6 +16,7 @@ */ package org.apache.hadoop.hive.llap.ext; +import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -57,7 +58,7 @@ import org.slf4j.LoggerFactory; -public class LlapTaskUmbilicalExternalClient extends AbstractService { +public class LlapTaskUmbilicalExternalClient extends AbstractService implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(LlapTaskUmbilicalExternalClient.class); diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java index 4306c22..9cd40fc 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java @@ -117,9 +117,6 @@ public final String SPLIT_QUERY = "select get_splits(\"%s\",%d)"; - private Connection con; - private Statement stmt; - public LlapBaseInputFormat(String url, String user, String pwd, String query) { this.url = url; this.user = user; @@ -181,7 +178,8 @@ public LlapBaseInputFormat() {} LOG.info("Registered id: " + id); - LlapBaseRecordReader recordReader = new LlapBaseRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class, job); + LlapBaseRecordReader recordReader = new LlapBaseRecordReader( + socket.getInputStream(), llapSplit.getSchema(), Text.class, job, llapClient); umbilicalResponder.setRecordReader(recordReader); return recordReader; } @@ -205,11 +203,12 @@ public LlapBaseInputFormat() {} throw new IOException(e); } - try { - con = DriverManager.getConnection(url,user,pwd); - stmt = con.createStatement(); - String sql = String.format(SPLIT_QUERY, query, numSplits); + String sql = String.format(SPLIT_QUERY, query, numSplits); + try ( + Connection con = DriverManager.getConnection(url,user,pwd); + Statement stmt = con.createStatement(); ResultSet res = stmt.executeQuery(sql); + ) { while (res.next()) { // deserialize split DataInput in = new DataInputStream(res.getBinaryStream(1)); @@ -217,23 +216,12 @@ public LlapBaseInputFormat() {} is.readFields(in); ins.add(is); } - - res.close(); - stmt.close(); } catch (Exception e) { throw new IOException(e); } return ins.toArray(new InputSplit[ins.size()]); } - public void close() { - try { - con.close(); - } catch (Exception e) { - // ignore - } - } - private ServiceInstance getServiceInstance(JobConf job, LlapInputSplit llapSplit) throws IOException { LlapRegistryService registryService = LlapRegistryService.getClient(job); String host = llapSplit.getLocations()[0]; diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java index 7efc711..c3001e9 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java @@ -35,9 +35,9 @@ import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; - public class LlapRowInputFormat implements InputFormat { - LlapBaseInputFormat baseInputFormat = new LlapBaseInputFormat(); + + private LlapBaseInputFormat baseInputFormat = new LlapBaseInputFormat(); @Override public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { diff --git a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java index 1d592fb..50c09f3 100644 --- a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java +++ b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java @@ -116,7 +116,7 @@ public void testValues() throws Exception { writer.close(null); InputStream in = socket.getInputStream(); - LlapBaseRecordReader reader = new LlapBaseRecordReader(in, null, Text.class, job); + LlapBaseRecordReader reader = new LlapBaseRecordReader(in, null, Text.class, job, null); LOG.debug("Have record reader");