Index: core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (revision 1494703) +++ core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (working copy) @@ -377,13 +377,41 @@ final InetSocketAddress addr = entry.getKey(); final Iterable messages = entry.getValue(); - final BSPMessageBundle bundle = combineMessages(messages); + BSPMessageBundle bundle = combineMessages(messages); // remove this message during runtime to save a bit of memory it.remove(); - try { - messenger.transfer(addr, bundle); - } catch (Exception e) { - LOG.error("Error while sending messages", e); + if (combiner != null) { + try { + messenger.transfer(addr, bundle); + } catch (Exception e) { + LOG.error("Error while sending messages", e); + } + } else { + long i = 0; + long bundleThreshold = conf.getLong("hama.messenger.bundle.threshold", + 1048576); + for (M message : messages) { + i++; + if (i <= bundleThreshold) { + bundle.addMessage(message); + } else { + try { + messenger.transfer(addr, bundle); + } catch (Exception e) { + LOG.error("Error while sending messages", e); + } + bundle = new BSPMessageBundle(); + bundle.addMessage(message); + i = 1; + } + } + if (i > 0) { + try { + messenger.transfer(addr, bundle); + } catch (Exception e) { + LOG.error("Error while sending messages", e); + } + } } } @@ -433,11 +461,9 @@ BSPMessageBundle bundle = new BSPMessageBundle(); if (combiner != null) { bundle.addMessage(combiner.combine(messages)); - } else { - for (M message : messages) { - bundle.addMessage(message); - } - } + } /* + * else { for (M message : messages) { bundle.addMessage(message); } } + */ return bundle; }