diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java index f2700c8..59dec1b 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java @@ -54,8 +54,10 @@ protected final LinkedBlockingQueue readerEvents = new LinkedBlockingQueue(); protected final long timeout; protected final Closeable client; + private final Closeable socket; - public LlapBaseRecordReader(InputStream in, Schema schema, Class clazz, JobConf job, Closeable client) { + public LlapBaseRecordReader(InputStream in, Schema schema, + Class clazz, JobConf job, Closeable client, Closeable socket) { din = new DataInputStream(in); this.schema = schema; this.clazz = clazz; @@ -63,6 +65,7 @@ public LlapBaseRecordReader(InputStream in, Schema schema, Class clazz, JobCo this.timeout = 3 * HiveConf.getTimeVar(job, HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS); this.client = client; + this.socket = socket; } public Schema getSchema() { @@ -78,6 +81,7 @@ public void close() throws IOException { LOG.error("Error closing input stream:" + err.getMessage(), err); caughtException = err; } + // Don't close the socket - the stream already does that if needed. 
if (client != null) { try { @@ -152,9 +156,10 @@ public boolean next(NullWritable key, V value) throws IOException { ReaderEvent event = getReaderEvent(); switch (event.getEventType()) { case ERROR: - throw new IOException("Received reader event error: " + event.getMessage()); + throw new IOException("Received reader event error: " + event.getMessage(), io); default: - throw new IOException("Got reader event type " + event.getEventType() + ", expected error event"); + throw new IOException("Got reader event type " + event.getEventType() + + ", expected error event", io); } } } else { @@ -214,7 +219,13 @@ public void handleEvent(ReaderEvent event) { if (LOG.isDebugEnabled()) { LOG.debug("Interrupting reader thread due to reader event with error " + event.getMessage()); } - getReaderThread().interrupt(); + readerThread.interrupt(); + try { + if (socket != null) { socket.close(); } + } catch (IOException e) { + // Leave the client to time out. + LOG.error("Cannot close the socket on error", e); + } break; default: throw new RuntimeException("Unhandled ReaderEvent type " + event.getEventType() + " with message " + event.getMessage()); diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java index 6d63797..aef5762 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java @@ -170,8 +170,8 @@ public LlapBaseInputFormat() {} LOG.info("Registered id: " + fragmentId); @SuppressWarnings("rawtypes") - LlapBaseRecordReader recordReader = new LlapBaseRecordReader( - socket.getInputStream(), llapSplit.getSchema(), Text.class, job, llapClient); + LlapBaseRecordReader recordReader = new LlapBaseRecordReader(socket.getInputStream(), + llapSplit.getSchema(), Text.class, job, llapClient, (java.io.Closeable)socket); umbilicalResponder.setRecordReader(recordReader); return 
recordReader; } diff --git a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java index 2288cd4..4159be5 100644 --- a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java +++ b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java @@ -103,7 +103,8 @@ public void testValues() throws Exception { writer.close(null); InputStream in = socket.getInputStream(); - LlapBaseRecordReader reader = new LlapBaseRecordReader(in, null, Text.class, job, null); + LlapBaseRecordReader reader = new LlapBaseRecordReader( + in, null, Text.class, job, null, null); LOG.debug("Have record reader");