diff --git streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java index 88a7d82a04..fb60b2711d 100644 --- streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java +++ streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java @@ -393,6 +393,11 @@ public void close() throws StreamingIOFailure { if (LOG.isDebugEnabled()) { logStats("Stats after close:"); } + try { + this.fs.close(); + } catch (IOException e) { + throw new StreamingIOFailure("Error while closing FileSystem", e); + } if (haveError) { throw new StreamingIOFailure("Encountered errors while closing (see logs) " + getWatermark(partition)); } diff --git streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java index 74fc5314d9..a32aa62bbc 100644 --- streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java +++ streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java @@ -147,6 +147,7 @@ public String toString() { private int countTransactions = 0; private Set partitions; private Long tableId; + private Runnable onShutdownRunner; private HiveStreamingConnection(Builder builder) throws StreamingException { this.database = builder.database.toLowerCase(); @@ -389,9 +390,10 @@ public HiveStreamingConnection connect() throws StreamingException { } HiveStreamingConnection streamingConnection = new HiveStreamingConnection(this); + streamingConnection.onShutdownRunner = streamingConnection::close; // assigning higher priority than FileSystem shutdown hook so that streaming connection gets closed first before // filesystem close (to avoid ClosedChannelException) - ShutdownHookManager.addShutdownHook(streamingConnection::close, FileSystem.SHUTDOWN_HOOK_PRIORITY + 1); + ShutdownHookManager.addShutdownHook(streamingConnection.onShutdownRunner, FileSystem.SHUTDOWN_HOOK_PRIORITY + 1); Thread.setDefaultUncaughtExceptionHandler((t, e) -> streamingConnection.close()); return streamingConnection; } @@ -651,6 +653,10 @@ public void close() { getMSC().close(); getHeatbeatMSC().close(); } + //remove shutdown hook entry added while creating this connection via HiveStreamingConnection.Builder#connect() + if (!ShutdownHookManager.isShutdownInProgress()) { + ShutdownHookManager.removeShutdownHook(this.onShutdownRunner); + } } if (LOG.isInfoEnabled()) { LOG.info("Closed streaming connection. Agent: {} Stats: {}", getAgentInfo(), getConnectionStats());