Index: core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (revision 1497388) +++ core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (working copy) @@ -157,6 +157,9 @@ */ @Override public final void clearOutgoingQueues() { + if (localQueue != null) { + localQueue.close(); + } localQueue = localQueueForNextIteration.getMessageQueue(); localQueue.prepareRead(); localQueueForNextIteration = getSynchronizedReceiverQueue(); Index: core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (revision 1497388) +++ core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (working copy) @@ -385,6 +385,10 @@ } catch (Exception e) { LOG.error("Error while sending messages", e); } + MessageQueue msgQueue = (MessageQueue) messages; + if (msgQueue != null) { + msgQueue.close(); + } } if (this.faultToleranceService != null) {