diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java index 3c858a8..f2700c8 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.llap; +import java.io.Closeable; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -52,14 +53,16 @@ protected Thread readerThread = null; protected final LinkedBlockingQueue readerEvents = new LinkedBlockingQueue(); protected final long timeout; + protected final Closeable client; - public LlapBaseRecordReader(InputStream in, Schema schema, Class clazz, JobConf job) { + public LlapBaseRecordReader(InputStream in, Schema schema, Class clazz, JobConf job, Closeable client) { din = new DataInputStream(in); this.schema = schema; this.clazz = clazz; this.readerThread = Thread.currentThread(); this.timeout = 3 * HiveConf.getTimeVar(job, HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS); + this.client = client; } public Schema getSchema() { @@ -68,7 +71,26 @@ public Schema getSchema() { @Override public void close() throws IOException { - din.close(); + Exception caughtException = null; + try { + din.close(); + } catch (Exception err) { + LOG.error("Error closing input stream:" + err.getMessage(), err); + caughtException = err; + } + + if (client != null) { + try { + client.close(); + } catch (Exception err) { + LOG.error("Error closing client:" + err.getMessage(), err); + caughtException = (caughtException == null ? err : caughtException); + } + } + + if (caughtException != null) { + throw new IOException("Exception during close: " + caughtException.getMessage(), caughtException); + } } @Override diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java index 0edb1cd..a7b4f06 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java @@ -16,6 +16,7 @@ */ package org.apache.hadoop.hive.llap.ext; +import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -57,7 +58,7 @@ import org.slf4j.LoggerFactory; -public class LlapTaskUmbilicalExternalClient extends AbstractService { +public class LlapTaskUmbilicalExternalClient extends AbstractService implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(LlapTaskUmbilicalExternalClient.class); diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java index 4306c22..0549d15 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java @@ -27,6 +27,7 @@ import java.sql.Statement; import java.sql.DriverManager; +import java.io.Closeable; import java.io.IOException; import java.io.DataInput; import java.io.DataOutput; @@ -100,7 +101,8 @@ /** * Base LLAP input format to handle requesting of splits and communication with LLAP daemon. */ -public class LlapBaseInputFormat implements InputFormat { +public class LlapBaseInputFormat + implements InputFormat, Closeable { private static final Logger LOG = LoggerFactory.getLogger(LlapBaseInputFormat.class); @@ -181,7 +183,8 @@ public LlapBaseInputFormat() {} LOG.info("Registered id: " + id); - LlapBaseRecordReader recordReader = new LlapBaseRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class, job); + LlapBaseRecordReader recordReader = new LlapBaseRecordReader( + socket.getInputStream(), llapSplit.getSchema(), Text.class, job, llapClient); umbilicalResponder.setRecordReader(recordReader); return recordReader; } @@ -226,11 +229,12 @@ public LlapBaseInputFormat() {} return ins.toArray(new InputSplit[ins.size()]); } - public void close() { + @Override + public void close() throws IOException { try { con.close(); } catch (Exception e) { - // ignore + throw new IOException(e); } } diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java index 7efc711..8148ebf 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.llap; +import java.io.Closeable; import java.io.IOException; import org.apache.hadoop.hive.llap.LlapBaseRecordReader; @@ -35,9 +36,10 @@ import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; +public class LlapRowInputFormat + implements InputFormat, Closeable { -public class LlapRowInputFormat implements InputFormat { - LlapBaseInputFormat baseInputFormat = new LlapBaseInputFormat(); + private LlapBaseInputFormat baseInputFormat = new LlapBaseInputFormat(); @Override public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { @@ -51,4 +53,9 @@ LlapBaseRecordReader reader = (LlapBaseRecordReader) baseInputFormat.getRecordReader(llapSplit, job, reporter); return new LlapRowRecordReader(job, reader.getSchema(), reader); } + + @Override + public void close() throws IOException { + baseInputFormat.close(); + } } diff --git a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java index 1d592fb..50c09f3 100644 --- a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java +++ b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java @@ -116,7 +116,7 @@ public void testValues() throws Exception { writer.close(null); InputStream in = socket.getInputStream(); - LlapBaseRecordReader reader = new LlapBaseRecordReader(in, null, Text.class, job); + LlapBaseRecordReader reader = new LlapBaseRecordReader(in, null, Text.class, job, null); LOG.debug("Have record reader");