Index: core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java (revision 1492639) +++ core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java (working copy) @@ -228,7 +228,8 @@ bufferList_ = new ArrayList(numberBuffers_); bufferState_ = new BitSet(numBuffers); - for (int i = 0; i < numBuffers / 2; ++i) { + int initBufferNum = numBuffers / 2 <= 0 ? 1 : numBuffers / 2; + for (int i = 0; i < initBufferNum; ++i) { bufferList_.add(new SpilledByteBuffer(direct_, bufferSize_)); } currentBuffer_ = bufferList_.get(0); @@ -421,29 +422,31 @@ currentBuffer_.flip(); spillStatus_.spillCompleted(); - if (this.startedSpilling_) { - this.spillThread_.completeSpill(); - boolean completionState = false; - try { - completionState = spillThreadState_.get(); - if (!completionState) { - throw new IOException( - "Spilling Thread failed to complete sucessfully."); + try { + if (this.startedSpilling_) { + this.spillThread_.completeSpill(); + boolean completionState = false; + try { + completionState = spillThreadState_.get(); + if (!completionState) { + throw new IOException( + "Spilling Thread failed to complete sucessfully."); + } + } catch (ExecutionException e) { + throw new IOException(e); + } catch (InterruptedException e) { + throw new IOException(e); + } finally { + this.spillThreadService_.shutdownNow(); } - } catch (ExecutionException e) { - throw new IOException(e); - } catch (InterruptedException e) { - throw new IOException(e); - } finally { - closed_ = true; - this.processor.close(); - this.spillThreadService_.shutdownNow(); + } else { + this.processor.handleSpilledBuffer(currentBuffer_); } - + } finally { + closed_ = true; + this.processor.close(); } - closed_ = true; } - } /**