diff --git common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java index 42286be9d8..56ec2fd6a1 100644 --- common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java +++ common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java @@ -28,6 +28,8 @@ import java.util.List; import javax.management.NotificationEmitter; +import javax.management.NotificationListener; +import javax.management.ListenerNotFoundException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,6 +46,7 @@ private final double threshold; private List listeners = new ArrayList<>(); + private NotificationListener notificationListener; public interface Listener { void memoryUsageAboveThreshold(long usedMemory, long maxMemory); @@ -140,7 +143,7 @@ public void start() { } MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean(); NotificationEmitter emitter = (NotificationEmitter) mxBean; - emitter.addNotificationListener((n, hb) -> { + notificationListener = (n, hb) -> { if (n.getType().equals( MemoryNotificationInfo.MEMORY_COLLECTION_THRESHOLD_EXCEEDED)) { long maxMemory = tenuredGenPool.getUsage().getMax(); @@ -149,6 +152,19 @@ public void start() { listener.memoryUsageAboveThreshold(usedMemory, maxMemory); } } - }, null, null); + }; + emitter.addNotificationListener(notificationListener, null, null); } -} \ No newline at end of file + + public void close() { + if(notificationListener != null) { + MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean(); + NotificationEmitter emitter = (NotificationEmitter) mxBean; + try { + emitter.removeNotificationListener(notificationListener); + } catch(ListenerNotFoundException e) { + LOG.warn("Failed to remove HeapMemoryMonitor notification listener from MemoryMXBean", e); + } + } + } +} diff --git streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java index 88a7d82a04..e7588e8860 100644 --- streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java +++ streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java @@ -366,6 +366,7 @@ public void flush() throws StreamingIOFailure { @Override public void close() throws StreamingIOFailure { + heapMemoryMonitor.close(); boolean haveError = false; String partition = null; if (LOG.isDebugEnabled()) {