Index: core/src/test/java/org/apache/hama/bsp/message/TestMessageIO.java =================================================================== --- core/src/test/java/org/apache/hama/bsp/message/TestMessageIO.java (revision 1441692) +++ core/src/test/java/org/apache/hama/bsp/message/TestMessageIO.java (working copy) @@ -19,16 +19,34 @@ import java.io.EOFException; import java.io.File; +import java.io.RandomAccessFile; import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.security.SecureRandom; +import java.util.Iterator; +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hama.Constants; +import org.apache.hama.HamaConfiguration; +import org.apache.hama.bsp.Combiner; +import org.apache.hama.bsp.message.io.CombineSpilledDataProcessor; +import org.apache.hama.bsp.message.io.DirectByteBufferInputStream; +import org.apache.hama.bsp.message.io.DirectByteBufferOutputStream; +import org.apache.hama.bsp.message.io.ReusableByteBuffer; +import org.apache.hama.bsp.message.io.SpilledByteBuffer; import org.apache.hama.bsp.message.io.SpilledDataInputBuffer; +import org.apache.hama.bsp.message.io.SpilledDataProcessor; import org.apache.hama.bsp.message.io.SpillingDataOutputBuffer; +import org.apache.hama.bsp.message.io.SyncFlushByteBufferOutputStream; +import org.apache.hama.bsp.message.io.SyncReadByteBufferInputStream; import org.apache.hama.bsp.message.io.WriteSpilledDataProcessor; -import junit.framework.TestCase; - public class TestMessageIO extends TestCase { public void testNonSpillBuffer() throws Exception { @@ -39,23 +57,22 @@ for (int i = 0; i < 100; ++i) { text.write(outputBuffer); } - assertTrue(outputBuffer != null); assertTrue(outputBuffer.size() == 4000); assertFalse(outputBuffer.hasSpilled()); - outputBuffer.close(); - } public void testSpillBuffer() throws Exception { + Configuration conf = new HamaConfiguration(); String fileName = System.getProperty("java.io.tmpdir") + File.separatorChar + new BigInteger(128, new SecureRandom()).toString(32); + SpilledDataProcessor processor = new WriteSpilledDataProcessor(fileName); + processor.init(conf); SpillingDataOutputBuffer outputBuffer = new SpillingDataOutputBuffer(2, - 1024, 1024, true, new WriteSpilledDataProcessor(fileName)); + 1024, 1024, true, processor); Text text = new Text("Testing the spillage of spilling buffer"); - for (int i = 0; i < 100; ++i) { text.write(outputBuffer); } @@ -70,48 +87,398 @@ } - public void testSpillInputStream() throws Exception { - String fileName = System.getProperty("java.io.tmpdir") + File.separatorChar - + new BigInteger(128, new SecureRandom()).toString(32); - SpillingDataOutputBuffer outputBuffer = new SpillingDataOutputBuffer(2, - 1024, 1024, true, new WriteSpilledDataProcessor(fileName)); - Text text = new Text("Testing the spillage of spilling buffer"); + public static class SumCombiner extends Combiner { + @Override + public IntWritable combine(Iterable messages) { + int sum = 0; + for (IntWritable intObj : messages) { + sum += intObj.get(); + } + return new IntWritable(sum); + } + + } + + public void testSpillingByteBuffer() throws Exception { + ByteBuffer buffer = ByteBuffer.allocateDirect(512); + SpilledByteBuffer spillBuffer = new SpilledByteBuffer(buffer); for (int i = 0; i < 100; ++i) { - text.write(outputBuffer); + spillBuffer.putInt(i); + spillBuffer.markEndOfRecord(); } + spillBuffer.putInt(100); + assertEquals(spillBuffer.getMarkofLastRecord(), 400); + assertEquals(spillBuffer.remaining(), (512 - 404)); + spillBuffer.flip(); + assertEquals(spillBuffer.remaining(), 404); + assertEquals(spillBuffer.getMarkofLastRecord(), 400); - assertTrue(outputBuffer != null); - assertTrue(outputBuffer.size() == 4000); - assertTrue(outputBuffer.hasSpilled()); - File f = new File(fileName); - assertTrue(f.exists()); - outputBuffer.close(); - assertTrue(f.length() == 4000L); + } - SpilledDataInputBuffer inputBuffer = outputBuffer - .getInputStreamToRead(fileName); + public void testDirectByteBufferOutput() throws Exception { + ByteBuffer buffer = ByteBuffer.allocateDirect(512); + DirectByteBufferOutputStream stream = new DirectByteBufferOutputStream(); + stream.setBuffer(buffer); + IntWritable intWritable = new IntWritable(1); + for (int i = 0; i < 100; ++i) { - text.readFields(inputBuffer); - assertTrue("Testing the spillage of spilling buffer".equals(text - .toString())); - text.clear(); + intWritable.set(i); + intWritable.write(stream); } + stream.close(); + + buffer.flip(); + for (int i = 0; i < 100; ++i) { + assertEquals(i, buffer.getInt()); + } + try { - text.readFields(inputBuffer); + buffer.getInt(); assertTrue(false); - } catch (EOFException eof) { + } catch (Exception e) { assertTrue(true); } - inputBuffer.close(); - inputBuffer.completeReading(false); - assertTrue(f.exists()); - inputBuffer.completeReading(true); - assertFalse(f.exists()); + } + public void testDirectByteBufferInput() throws Exception { + ByteBuffer buffer = ByteBuffer.allocateDirect(512); + DirectByteBufferOutputStream stream = new DirectByteBufferOutputStream(); + stream.setBuffer(buffer); + IntWritable intWritable = new IntWritable(1); + + for (int i = 0; i < 100; ++i) { + intWritable.set(i); + intWritable.write(stream); + } + intWritable.write(stream); + + stream.close(); + + buffer.flip(); + + DirectByteBufferInputStream inStream = new DirectByteBufferInputStream(); + + inStream.setBuffer(new SpilledByteBuffer(buffer, 400)); + for (int i = 0; i < 100; ++i) { + intWritable.readFields(inStream); + assertEquals(i, intWritable.get()); + } + + assertFalse(inStream.hasDataToRead()); + assertTrue(inStream.hasUnmarkData()); + inStream.prepareForNext(); + + // push in another buffer and check if the unmarked data could be read. + + buffer.clear(); + stream = new DirectByteBufferOutputStream(); + buffer = ByteBuffer.allocateDirect(2048); + stream.setBuffer(buffer); + + for (int i = 0; i < 400; ++i) { + intWritable.set(i); + intWritable.write(stream); + } + stream.close(); + buffer.flip(); + + inStream.setBuffer(new SpilledByteBuffer(buffer, 400)); + + // Read previous data + intWritable.readFields(inStream); + assertEquals(99, intWritable.get()); + + for (int i = 0; i < 100; ++i) { + intWritable.readFields(inStream); + assertEquals(i, intWritable.get()); + } + + assertFalse(inStream.hasDataToRead()); + assertTrue(inStream.hasUnmarkData()); + inStream.prepareForNext(); + + buffer.clear(); + stream = new DirectByteBufferOutputStream(); + stream.setBuffer(buffer); + + for (int i = 0; i < 100; ++i) { + intWritable.set(i); + intWritable.write(stream); + } + stream.close(); + buffer.flip(); + + inStream.setBuffer(new SpilledByteBuffer(buffer, 400)); + + // Read previous data with resized intermediate buffer + for (int i = 100; i < 400; ++i) { + intWritable.readFields(inStream); + assertEquals(i, intWritable.get()); + } + + for (int i = 0; i < 100; ++i) { + intWritable.readFields(inStream); + assertEquals(i, intWritable.get()); + } + + assertFalse(inStream.hasDataToRead()); + assertFalse(inStream.hasUnmarkData()); + } + /** + * + * @throws Exception + */ + public void testReusableByteBufferIter() throws Exception { + + ReusableByteBuffer reuseByteBuffer = new ReusableByteBuffer( + new IntWritable()); + + ByteBuffer buffer = ByteBuffer.allocateDirect(512); + DirectByteBufferOutputStream stream = new DirectByteBufferOutputStream(); + stream.setBuffer(buffer); + IntWritable intWritable = new IntWritable(1); + + for (int i = 0; i < 100; ++i) { + intWritable.set(i); + intWritable.write(stream); + } + intWritable.write(stream); + stream.close(); + buffer.flip(); + reuseByteBuffer.set(new SpilledByteBuffer(buffer, 400)); + + Iterator iter = reuseByteBuffer.iterator(); + int j = 0; + while (iter.hasNext()) { + assertEquals(iter.next().get(), j++); + } + assertEquals(j, 100); + reuseByteBuffer.prepareForNext(); + + buffer.clear(); + + stream = new DirectByteBufferOutputStream(); + stream.setBuffer(buffer); + + for (int i = 0; i < 101; ++i) { + intWritable.set(i); + intWritable.write(stream); + } + stream.close(); + buffer.flip(); + + reuseByteBuffer.set(new SpilledByteBuffer(buffer, 404)); + iter = reuseByteBuffer.iterator(); + assertEquals(iter.next().get(), 99); + + j = 0; + while (iter.hasNext()) { + assertEquals(iter.next().get(), j++); + } + buffer.clear(); + } + + public void testCombineProcessor() throws Exception { + String fileName = System.getProperty("java.io.tmpdir") + File.separatorChar + + new BigInteger(128, new SecureRandom()).toString(32); + + ByteBuffer buffer = ByteBuffer.allocateDirect(512); + DirectByteBufferOutputStream stream = new DirectByteBufferOutputStream(); + stream.setBuffer(buffer); + IntWritable intWritable = new IntWritable(1); + int sum = 0; + for (int i = 0; i < 100; ++i) { + intWritable.set(i); + intWritable.write(stream); + sum += i; + } + intWritable.write(stream); + stream.close(); + buffer.flip(); + + Configuration conf = new HamaConfiguration(); + + conf.setClass(Constants.MESSAGE_CLASS, IntWritable.class, Writable.class); + conf.setClass(Constants.COMBINER_CLASS, SumCombiner.class, Combiner.class); + + CombineSpilledDataProcessor processor = new CombineSpilledDataProcessor( + fileName); + assertTrue(processor.init(conf)); + File f = new File(fileName); + try { + assertTrue(processor.handleSpilledBuffer(new SpilledByteBuffer(buffer, + 400))); + buffer.flip(); + assertTrue(processor.handleSpilledBuffer(new SpilledByteBuffer(buffer, + 400))); + assertTrue(processor.close()); + + assertTrue(f.exists()); + assertEquals(f.length(), 8); + + RandomAccessFile raf = new RandomAccessFile(fileName, "r"); + FileChannel fileChannel = raf.getChannel(); + ByteBuffer readBuff = ByteBuffer.allocateDirect(16); + fileChannel.read(readBuff); + readBuff.flip(); + assertEquals(readBuff.getInt(), sum); + assertEquals(readBuff.getInt(), sum + 99); + } finally { + assertTrue(f.delete()); + } + + } + + public void testSpillInputStream() throws Exception { + + File f = null; + try { + String fileName = System.getProperty("java.io.tmpdir") + + File.separatorChar + "testSpillInputStream.txt"; + Configuration conf = new HamaConfiguration(); + SpilledDataProcessor processor = new WriteSpilledDataProcessor(fileName); + processor.init(conf); + SpillingDataOutputBuffer outputBuffer = new SpillingDataOutputBuffer(2, + 1024, 1024, true, processor); + Text text = new Text("Testing the spillage of spilling buffer"); + for (int i = 0; i < 100; ++i) { + text.write(outputBuffer); + outputBuffer.markRecordEnd(); + } + + assertTrue(outputBuffer != null); + assertTrue(outputBuffer.size() == 4000); + assertTrue(outputBuffer.hasSpilled()); + f = new File(fileName); + assertTrue(f.exists()); + outputBuffer.close(); + assertTrue(f.length() == 4000);// + (4000 / 1024 + 1) * 4)); + + SpilledDataInputBuffer inputBuffer = outputBuffer + .getInputStreamToRead(fileName); + + for (int i = 0; i < 100; ++i) { + text.readFields(inputBuffer); + assertTrue("Testing the spillage of spilling buffer".equals(text + .toString())); + text.clear(); + } + + try { + text.readFields(inputBuffer); + assertTrue(false); + } catch (EOFException eof) { + assertTrue(true); + } + + inputBuffer.close(); + inputBuffer.completeReading(false); + assertTrue(f.exists()); + inputBuffer.completeReading(true); + assertFalse(f.exists()); + } finally { + if (f != null) { + if (f.exists()) { + f.delete(); + } + } + } + + } + + public void testSyncFlushByteBufferOutputStream() throws Exception { + + File f = null; + try { + String fileName = System.getProperty("java.io.tmpdir") + + File.separatorChar + "testSyncFlushByteBufferOutputStream.txt"; + SyncFlushByteBufferOutputStream stream = new SyncFlushByteBufferOutputStream( + fileName); + DirectByteBufferOutputStream syncFlushStream = new DirectByteBufferOutputStream( + stream); + ByteBuffer buffer = ByteBuffer.allocateDirect(512); + syncFlushStream.setBuffer(buffer); + IntWritable intWritable = new IntWritable(1); + + for (int i = 0; i < 200; ++i) { + intWritable.set(i); + intWritable.write(syncFlushStream); + } + intWritable.write(syncFlushStream); + syncFlushStream.close(); + + f = new File(fileName); + assertTrue(f.exists()); + assertTrue(f.length() == 804); + assertTrue(f.delete()); + } finally { + if (f != null) { + f.delete(); + } + } + + } + + public void testSyncFlushBufferInputStream() throws Exception { + File f = null; + try { + String fileName = System.getProperty("java.io.tmpdir") + + File.separatorChar + "testSyncFlushBufferInputStream.txt"; + SyncFlushByteBufferOutputStream stream = new SyncFlushByteBufferOutputStream( + fileName); + DirectByteBufferOutputStream syncFlushStream = new DirectByteBufferOutputStream( + stream); + ByteBuffer buffer = ByteBuffer.allocateDirect(512); + syncFlushStream.setBuffer(buffer); + IntWritable intWritable = new IntWritable(1); + + for (int i = 0; i < 200; ++i) { + intWritable.set(i); + intWritable.write(syncFlushStream); + } + intWritable.write(syncFlushStream); + syncFlushStream.close(); + + f = new File(fileName); + assertTrue(f.exists()); + assertEquals(f.length(), 804); + + SyncReadByteBufferInputStream syncReadStream = new SyncReadByteBufferInputStream( + stream.isSpilled(), fileName); + DirectByteBufferInputStream inStream = new DirectByteBufferInputStream( + syncReadStream); + buffer.clear(); + inStream.setBuffer(buffer); + + for (int i = 0; i < 200; ++i) { + intWritable.readFields(inStream); + assertEquals(intWritable.get(), i); + } + + intWritable.readFields(inStream); + assertEquals(intWritable.get(), 199); + + try { + intWritable.readFields(inStream); + assertFalse(true); + } catch (Exception e) { + assertTrue(true); + } + + inStream.close(); + syncFlushStream.close(); + + } finally { + if (f != null) { + f.delete(); + } + } + } + } Index: core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java (revision 1441692) +++ core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java (working copy) @@ -305,4 +305,9 @@ .getJobID().toString()), id.getTaskID().toString()); } + @Override + public boolean isMessageSerialized() { + return false; + } + } Index: core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java (revision 1441692) +++ core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java (working copy) @@ -31,16 +31,17 @@ import org.apache.hadoop.io.ObjectWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hama.Constants; import org.apache.hama.bsp.TaskAttemptID; +import org.apache.hama.bsp.message.io.CombineSpilledDataProcessor; import org.apache.hama.bsp.message.io.PreFetchCache; import org.apache.hama.bsp.message.io.SpilledDataInputBuffer; import org.apache.hama.bsp.message.io.SpilledDataProcessor; import org.apache.hama.bsp.message.io.SpillingDataOutputBuffer; -import org.apache.hama.bsp.message.io.WriteSpilledDataProcessor; /** * - * + * * @param */ public class SpillingQueue implements MessageQueue { @@ -67,26 +68,25 @@ private SpilledDataInputBuffer spilledInput; private boolean objectWritableMode; private ObjectWritable objectWritable; - + private Class messageClass; private PreFetchCache prefetchCache; private boolean enablePrefetch; - private class SpillIterator implements Iterator { private boolean objectMode; private Class classObject; private M messageHolder; - - public SpillIterator(boolean mode, Class classObj, Configuration conf){ + + public SpillIterator(boolean mode, Class classObj, Configuration conf) { this.objectMode = mode; this.classObject = classObj; - if(classObject != null){ + if (classObject != null) { messageHolder = ReflectionUtils.newInstance(classObj, conf); } } - + @Override public boolean hasNext() { return numMessagesRead != numMessagesWritten && numMessagesWritten > 0; @@ -94,10 +94,9 @@ @Override public M next() { - if(objectMode){ + if (objectMode) { return poll(); - } - else { + } else { return poll(messageHolder); } } @@ -134,6 +133,7 @@ } else { msg.write(spillOutputBuffer); } + spillOutputBuffer.markRecordEnd(); ++numMessagesWritten; } catch (IOException e) { LOG.error("Error adding message.", e); @@ -190,19 +190,21 @@ public void init(Configuration conf, TaskAttemptID arg1) { bufferCount = conf.getInt(SPILLBUFFER_COUNT, 3); - bufferSize = conf.getInt(SPILLBUFFER_SIZE, 16 * 1024); + bufferSize = conf.getInt(SPILLBUFFER_SIZE, Constants.BUFFER_DEFAULT_SIZE); direct = conf.getBoolean(SPILLBUFFER_DIRECT, true); - threshold = conf.getInt(SPILLBUFFER_THRESHOLD, 16 * 1024); + threshold = conf.getInt(SPILLBUFFER_THRESHOLD, + Constants.BUFFER_DEFAULT_SIZE); fileName = conf.get(SPILLBUFFER_FILENAME, System.getProperty("java.io.tmpdir") + File.separatorChar + new BigInteger(128, new SecureRandom()).toString(32)); - - messageClass = (Class) conf.getClass(SPILLBUFFER_MSGCLASS, null); + + messageClass = (Class) conf.getClass(Constants.MESSAGE_CLASS, null); objectWritableMode = messageClass == null; - + SpilledDataProcessor processor; try { - processor = new WriteSpilledDataProcessor(fileName); + processor = new CombineSpilledDataProcessor(fileName); + processor.init(conf); } catch (FileNotFoundException e) { LOG.error("Error initializing spilled data stream.", e); throw new RuntimeException(e); @@ -213,14 +215,14 @@ objectWritable.setConf(conf); this.conf = conf; } - - private void incReadMsgCount(){ + + private void incReadMsgCount() { ++numMessagesRead; } - + @SuppressWarnings("unchecked") - private M readDirect(M msg){ - if(numMessagesRead >= numMessagesWritten){ + private M readDirect(M msg) { + if (numMessagesRead >= numMessagesWritten) { return null; } try { @@ -240,19 +242,18 @@ } public M poll(M msg) { - if(numMessagesRead >= numMessagesWritten){ + if (numMessagesRead >= numMessagesWritten) { return null; } - if(enablePrefetch){ + if (enablePrefetch) { return readFromPrefetch(msg); - } - else { + } else { return readDirect(msg); } } - + @SuppressWarnings("unchecked") - private M readDirectObjectWritable(){ + private M readDirectObjectWritable() { if (!objectWritableMode) { throw new IllegalStateException( "API call not supported. Set the configuration property " @@ -267,34 +268,32 @@ } return (M) objectWritable.get(); } - + @SuppressWarnings({ "unchecked" }) - private M readFromPrefetch(M msg){ - if(objectWritableMode){ + private M readFromPrefetch(M msg) { + if (objectWritableMode) { this.objectWritable = (ObjectWritable) prefetchCache.get(); incReadMsgCount(); - return (M)this.objectWritable.get(); - } - else { + return (M) this.objectWritable.get(); + } else { incReadMsgCount(); - return (M)this.prefetchCache.get(); + return (M) this.prefetchCache.get(); } - + } @Override public M poll() { - if(numMessagesRead >= numMessagesWritten){ + if (numMessagesRead >= numMessagesWritten) { return null; } - - if(enablePrefetch){ + + if (enablePrefetch) { M msg = readFromPrefetch(null); - if(msg != null) + if (msg != null) incReadMsgCount(); return msg; - } - else { + } else { return readDirectObjectWritable(); } } @@ -313,7 +312,7 @@ LOG.error("Error initializing the input spilled stream", e); throw new RuntimeException(e); } - if(conf.getBoolean(ENABLE_PREFETCH, false)){ + if (conf.getBoolean(ENABLE_PREFETCH, false)) { this.prefetchCache = new PreFetchCache(numMessagesWritten); this.enablePrefetch = true; try { @@ -321,6 +320,8 @@ } catch (InterruptedException e) { LOG.error("Error starting prefetch on message queue.", e); throw new RuntimeException(e); + } catch (IOException e) { + throw new RuntimeException(e); } } } @@ -334,5 +335,12 @@ public int size() { return numMessagesWritten; } + + @Override + public boolean isMessageSerialized() { + return true; + } + + } Index: core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java (revision 1441692) +++ core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java (working copy) @@ -79,5 +79,11 @@ * @return how many items are in the queue. */ public int size(); + + /** + * + * @return true if the messages in the queue are serialized to byte buffers. + */ + public boolean isMessageSerialized(); } Index: core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java (revision 1441692) +++ core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java (working copy) @@ -105,4 +105,9 @@ } + @Override + public boolean isMessageSerialized() { + return false; + } + } Index: core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java (revision 1441692) +++ core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java (working copy) @@ -103,4 +103,9 @@ } + @Override + public boolean isMessageSerialized() { + return false; + } + } Index: core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataInputBuffer.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataInputBuffer.java (revision 1441692) +++ core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataInputBuffer.java (working copy) @@ -23,10 +23,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.RandomAccessFile; -import java.nio.ByteBuffer; -import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; -import java.nio.channels.FileChannel.MapMode; import java.util.BitSet; import java.util.List; import java.util.concurrent.Callable; @@ -35,10 +32,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; -import org.apache.hama.bsp.message.io.BufferReadStatus; -import org.apache.hama.bsp.message.io.ReadIndexStatus; -import org.apache.hama.bsp.message.io.SpilledDataInputBuffer; -import org.apache.hama.bsp.message.io.SpilledDataReadStatus; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * SpilledDataInputBuffer class is designed to read from the @@ -49,6 +44,8 @@ */ public class SpilledDataInputBuffer extends DataInputStream implements DataInput { + private static final Log LOG = LogFactory + .getLog(SpilledDataInputBuffer.class); /** * The thread is used to asynchronously read from the spilled file and load @@ -57,7 +54,7 @@ static class SpillReadThread implements Callable { private String fileName; - private List bufferList_; + private List bufferList_; private long bytesToRead_; private long bytesWrittenInFile_; private SpilledDataReadStatus status_; @@ -73,7 +70,7 @@ * @param status The shared object that synchronizes the indexes for buffer * to fill the data with. */ - public SpillReadThread(String fileName, List bufferList, + public SpillReadThread(String fileName, List bufferList, SpilledDataReadStatus status) { this.fileName = fileName; bufferList_ = bufferList; @@ -92,7 +89,6 @@ FileChannel fc = raf.getChannel(); bytesToRead_ = fc.size(); bytesWrittenInFile_ = bytesToRead_; - MappedByteBuffer mBuffer = null; long fileReadPos = 0; int fileReadIndex = -1; do { @@ -105,13 +101,14 @@ if (fileReadIndex < 0) break; - ByteBuffer buffer = bufferList_.get(fileReadIndex); + SpilledByteBuffer buffer = bufferList_.get(fileReadIndex); buffer.clear(); long readSize = Math.min(buffer.remaining(), (bytesWrittenInFile_ - fileReadPos)); - - mBuffer = fc.map(MapMode.READ_ONLY, fileReadPos, readSize); - buffer.put(mBuffer); + readSize = fc.read(buffer.getByteBuffer()); + if (readSize < 0) { + break; + } buffer.flip(); bytesToRead_ -= readSize; fileReadPos += readSize; @@ -136,7 +133,13 @@ @Override public Boolean call() throws Exception { - keepReadingFromFile(); + try { + keepReadingFromFile(); + } catch (Exception e) { + LOG.error("Error reading from file: " + fileName, e); + status_.notifyError(); + return Boolean.FALSE; + } return Boolean.TRUE; } @@ -152,7 +155,7 @@ static class SpilledInputStream extends InputStream { private String fileName_; - private List bufferList_; + private List bufferList_; private boolean spilledAlready_; ReadIndexStatus status_; private final byte[] readByte = new byte[1]; @@ -162,13 +165,14 @@ private Callable spillReadThread_; private Future spillReadState_; private ExecutorService spillThreadService_; - private ByteBuffer currentReadBuffer_; + private SpilledByteBuffer currentReadBuffer_; private BitSet bufferBitState_; private boolean closed_; public SpilledInputStream(String fileName, boolean direct, - List bufferList, boolean hasSpilled) throws IOException { + List bufferList, boolean hasSpilled) + throws IOException { fileName_ = fileName; bufferList_ = bufferList; spilledAlready_ = hasSpilled; @@ -193,7 +197,9 @@ (SpilledDataReadStatus) status_); spillThreadService_ = Executors.newFixedThreadPool(1); spillReadState_ = spillThreadService_.submit(spillReadThread_); - status_.startReading(); + if (!status_.startReading()) { + throw new IOException("Failed to read the spilled file: " + fileName_); + } } try { currentReadBuffer_ = getNextBuffer(); @@ -217,7 +223,7 @@ } } - public ByteBuffer getNextBuffer() throws InterruptedException { + public SpilledByteBuffer getNextBuffer() throws InterruptedException { int index = status_.getReadBufferIndex(); if (index >= 0 && index < bufferList_.size()) { return bufferList_.get(index); @@ -362,7 +368,7 @@ } public static SpilledDataInputBuffer getSpilledDataInputBuffer( - String fileName, boolean direct, List bufferList) + String fileName, boolean direct, List bufferList) throws IOException { SpilledInputStream inStream = new SpilledInputStream(fileName, direct, bufferList, true); Index: core/src/main/java/org/apache/hama/bsp/message/io/WriteSpilledDataProcessor.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/io/WriteSpilledDataProcessor.java (revision 1441692) +++ core/src/main/java/org/apache/hama/bsp/message/io/WriteSpilledDataProcessor.java (working copy) @@ -17,10 +17,10 @@ */ package org.apache.hama.bsp.message.io; +import java.io.File; import java.io.FileNotFoundException; +import java.io.FileOutputStream; import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import org.apache.commons.logging.Log; @@ -28,8 +28,7 @@ import org.apache.hadoop.conf.Configuration; /** - * - * + * A {@link SpilledDataProcessor} that writes the spilled data to the file. */ public class WriteSpilledDataProcessor implements SpilledDataProcessor { @@ -37,28 +36,43 @@ .getLog(WriteSpilledDataProcessor.class); private FileChannel fileChannel; - private RandomAccessFile raf; private String fileName; - + public WriteSpilledDataProcessor(String fileName) throws FileNotFoundException { this.fileName = fileName; - raf = new RandomAccessFile(fileName, "rw"); - fileChannel = raf.getChannel(); } + private void initializeFileChannel() { + FileOutputStream stream; + try { + stream = new FileOutputStream(new File(fileName), true); + } catch (FileNotFoundException e) { + LOG.error("Error opening file to write spilled data.", e); + throw new RuntimeException(e); + } + fileChannel = stream.getChannel(); + } + @Override public boolean init(Configuration conf) { + return true; } @Override - public boolean handleSpilledBuffer(ByteBuffer buffer) { + public boolean handleSpilledBuffer(SpilledByteBuffer buffer) { try { - fileChannel.write(buffer); + + if(fileChannel == null){ + initializeFileChannel(); + } + + fileChannel.write(buffer.getByteBuffer()); + fileChannel.force(true); return true; } catch (IOException e) { - LOG.error("Error writing to file:"+fileName, e); + LOG.error("Error writing to file:" + fileName, e); } return false; } Index: core/src/main/java/org/apache/hama/bsp/message/io/ReadIndexStatus.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/io/ReadIndexStatus.java (revision 1441692) +++ core/src/main/java/org/apache/hama/bsp/message/io/ReadIndexStatus.java (working copy) @@ -52,7 +52,7 @@ /** * Indicate to start reading. */ - public abstract void startReading(); + public abstract boolean startReading(); } Index: core/src/main/java/org/apache/hama/bsp/message/io/DualChannelByteBufferStream.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/io/DualChannelByteBufferStream.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/message/io/DualChannelByteBufferStream.java (revision 0) @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.message.io; + +import java.io.File; +import java.io.IOException; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.security.SecureRandom; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hama.Constants; + +/** + * A synchronous i/o stream that is used to write data into and to read back the + * written data. + */ +public class DualChannelByteBufferStream { + + private DirectByteBufferOutputStream outputBuffer; + private SyncFlushByteBufferOutputStream outputStream; + private DirectByteBufferInputStream inputBuffer; + private SyncReadByteBufferInputStream inputStream; + + private String fileName; + + private ByteBuffer buffer; + private boolean outputMode; + private boolean inputMode; + + public void init(Configuration conf) { + + boolean directAlloc = conf.getBoolean(Constants.BYTEBUFFER_DIRECT, + Constants.BYTEBUFFER_DIRECT_DEFAULT); + int size = conf.getInt(Constants.BYTEBUFFER_SIZE, + Constants.BUFFER_DEFAULT_SIZE); + if (directAlloc) { + buffer = ByteBuffer.allocateDirect(size); + } else { + buffer = ByteBuffer.allocateDirect(size); + } + fileName = conf.get(Constants.DATA_SPILL_PATH) + File.separatorChar + + new BigInteger(128, new SecureRandom()).toString(32); + outputMode = true; + outputStream = new SyncFlushByteBufferOutputStream(fileName); + outputBuffer = new DirectByteBufferOutputStream(outputStream); + outputStream.setBuffer(buffer); + + } + + public DirectByteBufferOutputStream getOutputStream() { + return outputBuffer; + } + + public void closeOutput() throws IOException { + if (outputMode) { + outputBuffer.close(); + } + outputMode = false; + } + + public void close() throws IOException{ + closeInput(); + closeOutput(); + } + + public boolean prepareRead() throws IOException { + outputStream.close(); + outputMode = false; + buffer.clear(); + inputStream = new SyncReadByteBufferInputStream(outputStream.isSpilled(), + fileName); + inputBuffer = new DirectByteBufferInputStream(inputStream); + inputBuffer.setBuffer(buffer); + inputMode = true; + return true; + } + + public DirectByteBufferInputStream getInputStream() { + return inputBuffer; + } + + public void closeInput() throws IOException { + if (inputMode) { + inputBuffer.close(); + } + inputMode = false; + } + +} Index: core/src/main/java/org/apache/hama/bsp/message/io/SpilledByteBuffer.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/io/SpilledByteBuffer.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/message/io/SpilledByteBuffer.java (revision 0) @@ -0,0 +1,302 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.message.io; + +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.DoubleBuffer; +import java.nio.FloatBuffer; +import java.nio.IntBuffer; +import java.nio.LongBuffer; +import java.nio.ShortBuffer; + +import org.apache.hadoop.io.Writable; + +/** + * SpilledByteBuffer encapsulates a ByteBuffer. It lets the user to + * mark the end of last record written into the buffer. + * + */ +public class SpilledByteBuffer { + + ByteBuffer buffer; + int endOfRecord; + Class writableClass; + + public SpilledByteBuffer(boolean direct, int size) { + if (direct) { + buffer = ByteBuffer.allocate(size); + } else { + buffer = ByteBuffer.allocateDirect(size); + } + } + + public void setRecordClass(Class classObj) { + this.writableClass = classObj; + } + + public Class getRecordClass(){ + return this.writableClass; + } + + public SpilledByteBuffer(ByteBuffer byteBuffer) { + this.buffer = byteBuffer; + } + + public SpilledByteBuffer(ByteBuffer byteBuffer, int markEnd) { + this.buffer = byteBuffer; + this.endOfRecord = markEnd; + } + + public void markEndOfRecord() { + this.endOfRecord = this.buffer.position(); + } + + public void markEndOfRecord(int pos){ + if(pos < this.buffer.capacity()){ + this.endOfRecord = pos; + } + } + + public int getMarkofLastRecord(){ + return this.endOfRecord; + } + + public ByteBuffer getByteBuffer() { + return buffer; + } + + public CharBuffer asCharBuffer() { + return buffer.asCharBuffer(); + } + + public DoubleBuffer asDoubleBuffer() { + return buffer.asDoubleBuffer(); + } + + public FloatBuffer asFloatBuffer() { + return buffer.asFloatBuffer(); + } + + public IntBuffer asIntBuffer() { + return buffer.asIntBuffer(); + } + + public LongBuffer asLongBuffer() { + return buffer.asLongBuffer(); + } + + public SpilledByteBuffer asReadOnlyBuffer() { + return new SpilledByteBuffer(buffer.asReadOnlyBuffer()); + } + + public ShortBuffer asShortBuffer() { + return buffer.asShortBuffer(); + } + + public SpilledByteBuffer compact() { + buffer.compact(); + return this; + } + + public SpilledByteBuffer duplicate() { + buffer.duplicate(); + return new SpilledByteBuffer(this.buffer, this.endOfRecord); + } + + public byte get() { + return buffer.get(); + } + + public byte get(int index) { + return buffer.get(index); + } + + public char getChar() { + return buffer.getChar(); + } + + public char getChar(int index) { + return buffer.getChar(index); + } + + public double getDouble() { + return buffer.getDouble(); + } + + public double getDouble(int index) { + return buffer.getDouble(index); + } + + public float getFloat() { + return buffer.getFloat(); + } + + public float getFloat(int index) { + return buffer.getFloat(index); + } + + public int getInt() { + return buffer.getInt(); + } + + public int getInt(int index) { + return buffer.getInt(index); + } + + public long getLong() { + return buffer.getLong(); + } + + public long getLong(int index) { + return buffer.getLong(); + } + + public short getShort() { + return buffer.getShort(); + } + + public short getShort(int index) { + return buffer.getShort(index); + } + + public SpilledByteBuffer put(byte b) { + buffer.put(b); + return this; + } + + public SpilledByteBuffer put(int index, byte b) { + buffer.put(index, b); + return this; + } + + public SpilledByteBuffer putChar(char value) { + buffer.putChar(value); + return this; + } + + public SpilledByteBuffer putChar(int index, char value) { + buffer.putChar(index, value); + return this; + } + + public SpilledByteBuffer putDouble(double value) { + buffer.putDouble(value); + return this; + } + + public SpilledByteBuffer putDouble(int index, double value) { + buffer.putDouble(index, value); + return this; + } + + public SpilledByteBuffer putFloat(float value) { + buffer.putFloat(value); + return this; + } + + public SpilledByteBuffer putFloat(int index, float value) { + buffer.putFloat(index, value); + return this; + } + + public SpilledByteBuffer putInt(int index, int value) { + buffer.putInt(index, value); + return this; + } + + public SpilledByteBuffer putInt(int value) { + buffer.putInt(value); + return this; + } + + public SpilledByteBuffer putLong(int index, long value) { + buffer.putLong(index, value); + return this; + } + + public SpilledByteBuffer putLong(long value) { + buffer.putLong(value); + return this; + } + + public SpilledByteBuffer putShort(int index, short value) { + buffer.putShort(index, value); + return this; + } + + public SpilledByteBuffer putShort(short value) { + buffer.putShort(value); + return this; + } + + public SpilledByteBuffer slice() { + return new SpilledByteBuffer(buffer.slice()); + } + + public byte[] array() { + return buffer.array(); + } + + public int arrayOffset() { + return buffer.arrayOffset(); + } + + public boolean hasArray() { + return buffer.hasArray(); + } + + public boolean isDirect() { + return buffer.isDirect(); + } + + public boolean isReadOnly() { + return buffer.isReadOnly(); + } + + public void clear() { + buffer.clear(); + } + + public SpilledByteBuffer flip() { + buffer.flip(); + return this; + } + + public int remaining() { + return buffer.remaining(); + } + + public void put(byte[] b, int off, int rem) { + buffer.put(b, off, rem); + } + + public void put(ByteBuffer byteBuffer) { + buffer.put(byteBuffer); + + } + + public int capacity(){ + return this.buffer.capacity(); + } + + public void get(byte[] b, int off, int readSize) { + buffer.get(b, off, readSize); + + } +} Index: core/src/main/java/org/apache/hama/bsp/message/io/DuplexByteArrayChannel.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/io/DuplexByteArrayChannel.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/message/io/DuplexByteArrayChannel.java (revision 0) @@ -0,0 +1,78 @@ + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.message.io; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; + +/** + * A utility class to just hold the byte buffer to read back the written data + * or write data to read back. Buffer overflow, underflow conditions are not + * enforced. + * + */ +public class DuplexByteArrayChannel implements WritableByteChannel, + ReadableByteChannel { + + private boolean open; + private ByteBuffer buffer; + + DuplexByteArrayChannel(){ + + } + + @Override + public boolean isOpen() { + return open; + } + + @Override + public void close() throws IOException { + open = false; + } + + @Override + public int read(ByteBuffer dst) throws IOException { + int num = dst.remaining(); + dst.put(buffer); + return num - dst.remaining(); + } + + @Override + public int write(ByteBuffer src) throws IOException { + int num = buffer.remaining(); + buffer.put(src); + return num - buffer.remaining(); + } + + public void setBuffer(ByteBuffer buffer) { + this.buffer = buffer; + open = true; + } + + public void flip() { + buffer.flip(); + } + + public ByteBuffer getBuffer(){ + return buffer; + } +} Index: core/src/main/java/org/apache/hama/bsp/message/io/ReusableByteBuffer.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/io/ReusableByteBuffer.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/message/io/ReusableByteBuffer.java (revision 0) @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.message.io; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.hadoop.io.Writable; + +/** + * + * + * @param + */ +public class ReusableByteBuffer implements Iterable { + + private DirectByteBufferInputStream stream; + private SpilledByteBuffer buffer; + private boolean isIterStarted; + + private M message; + + private static class ReusableByteBufferIterator + implements Iterator { + + private ReusableByteBuffer buffer; + private M message; + + public ReusableByteBufferIterator(ReusableByteBuffer bbuffer, M msg) { + this.buffer = bbuffer; + this.message = msg; + } + + @Override + public boolean hasNext() { + if (!buffer.isIterStarted) { + throw new IllegalStateException( + "Iterator should be reinitialized to work with new buffer."); + } + return buffer.stream.hasDataToRead(); + } + + @Override + public M next() { + if (!buffer.isIterStarted) { + throw new IllegalStateException( + "Iterator should be reinitialized to work with new buffer."); + } + try { + message.readFields(this.buffer.stream); + } catch (IOException e) { + throw new RuntimeException(e); + } + return message; + } + + @Override + public void remove() { + } + } + + public ReusableByteBuffer(M reusableObject) { + stream = new DirectByteBufferInputStream(); + message = reusableObject; + } + + public void set(SpilledByteBuffer buffer) throws IOException { + this.buffer = buffer; + stream.setBuffer(this.buffer); + isIterStarted = false; + } + + public void setReusableObject(M object) { + this.message = object; + } + + @Override + public Iterator iterator() { + if (isIterStarted) { + throw new UnsupportedOperationException( + "Only one iterator creation is allowed."); + } + isIterStarted = true; + return new ReusableByteBufferIterator(this, message); + } + + public void prepareForNext() throws IOException { + this.stream.prepareForNext(); + } + +} Index: core/src/main/java/org/apache/hama/bsp/message/io/SyncReadByteBufferInputStream.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/io/SyncReadByteBufferInputStream.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/message/io/SyncReadByteBufferInputStream.java (revision 0) @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.message.io; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +/** + * A {@link ByteBuffer} input stream that synchronously reads from spilled data. + * Uses {@link DuplexByteArrayChannel} within. + */ +public class SyncReadByteBufferInputStream extends ByteBufferInputStream { + + private static final Log LOG = LogFactory + .getLog(SyncReadByteBufferInputStream.class); + + private boolean spilled; + private FileChannel fileChannel; + private long fileBytesToRead; + private long fileBytesRead; + private DuplexByteArrayChannel duplexChannel = new DuplexByteArrayChannel(); + + public SyncReadByteBufferInputStream(boolean isSpilled, String fileName) { + spilled = isSpilled; + if (isSpilled) { + RandomAccessFile f; + try { + f = new RandomAccessFile(fileName, "r"); + fileChannel = f.getChannel(); + fileBytesToRead = fileChannel.size(); + } catch (FileNotFoundException e) { + LOG.error("File not found initializing Synchronous Input Byte Stream", + e); + throw new RuntimeException(e); + } catch (IOException e) { + LOG.error("Error initializing Synchronous Input Byte Stream", e); + throw new RuntimeException(e); + } + + } + } + + private void feedDataFromFile() throws IOException { + int toReadNow = (int) Math.min(buffer.capacity(), fileBytesToRead); + fileChannel.transferTo(fileBytesRead, toReadNow, duplexChannel); + fileBytesRead += toReadNow; + fileBytesToRead -= toReadNow; + duplexChannel.flip(); + } + + @Override + public void setBuffer(ByteBuffer buffer, long toRead, long total) + throws IOException { + this.buffer = buffer; + duplexChannel.setBuffer(buffer); + if (spilled) { + feedDataFromFile(); + } + super.setBuffer(buffer, fileBytesToRead, fileBytesToRead); + + } + + @Override + protected int onBufferRead(byte[] b, int off, int len, int cur) + throws IOException { + + if (fileBytesToRead == 0) { + return cur == 0 ? -1 : cur; + } + + if (spilled) { + buffer.clear(); + feedDataFromFile(); + } + return cur += read(b, off, len); + } + +} Index: core/src/main/java/org/apache/hama/bsp/message/io/BufferedReadStatus.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/io/BufferedReadStatus.java (revision 1441692) +++ core/src/main/java/org/apache/hama/bsp/message/io/BufferedReadStatus.java (working copy) @@ -50,7 +50,8 @@ } @Override - public void startReading() { + public boolean startReading() { + return true; } } Index: core/src/main/java/org/apache/hama/bsp/message/io/CombineSpilledDataProcessor.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/io/CombineSpilledDataProcessor.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/message/io/CombineSpilledDataProcessor.java (revision 0) @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.message.io; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Writable; +import org.apache.hama.Constants; +import org.apache.hama.bsp.Combiner; +import org.apache.hama.util.ReflectionUtils; + +/** + * This data processor adds a stage in between the spillage of data. Based on + * the combiner provided it combines the bunch of messages in the byte buffer + * and then writes them to the file writer using the base class handle method. + * TODO: Experiment the feature with pipelining design. + */ +public class CombineSpilledDataProcessor extends + WriteSpilledDataProcessor { + + public static Log LOG = LogFactory.getLog(CombineSpilledDataProcessor.class); + + Combiner combiner; + M writableObject; + ReusableByteBuffer iterator; + DirectByteBufferOutputStream combineOutputBuffer; + ByteBuffer combineBuffer; + + public CombineSpilledDataProcessor(String fileName) + throws FileNotFoundException { + super(fileName); + } + + @Override + public boolean init(Configuration conf) { + if (!super.init(conf)) { + return false; + } + String className = conf.get(Constants.COMBINER_CLASS); + + if (className == null) + return true; + try { + combiner = ReflectionUtils.newInstance(className); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + className = conf.get(Constants.MESSAGE_CLASS); + + if (className != null) { + try { + writableObject = ReflectionUtils.newInstance(className); + iterator = new ReusableByteBuffer(writableObject); + } catch (ClassNotFoundException e) { + LOG.error("Error combining the records.", e); + return false; + } + } + + combineOutputBuffer = new DirectByteBufferOutputStream(); + combineBuffer = ByteBuffer.allocateDirect(Constants.BUFFER_DEFAULT_SIZE); + combineOutputBuffer.setBuffer(combineBuffer); + return true; + } + + @Override + public boolean handleSpilledBuffer(SpilledByteBuffer buffer) { + + if (combiner == null || writableObject == null) { + return super.handleSpilledBuffer(buffer); + } + + try { + iterator.set(buffer); + } catch (IOException e1) { + LOG.error("Error setting buffer for combining data", e1); + return false; + } + Writable combinedMessage = combiner.combine(iterator); + try { + iterator.prepareForNext(); + } catch (IOException e1) { + LOG.error("Error preparing for next buffer.", e1); + return false; + } + try { + combinedMessage.write(this.combineOutputBuffer); + } catch (IOException e) { + LOG.error("Error writing the combiner output.", e); + return false; + } + try { + this.combineOutputBuffer.flush(); + } catch (IOException e) { + LOG.error("Error flushing the combiner output.", e); + return false; + } + this.combineOutputBuffer.getBuffer().flip(); + try { + return super.handleSpilledBuffer(new SpilledByteBuffer( + this.combineOutputBuffer.getBuffer(), this.combineOutputBuffer + .getBuffer().remaining())); + } finally { + this.combineOutputBuffer.clear(); + } + } + + @Override + public boolean close() { + return true; + } + +} Index: core/src/main/java/org/apache/hama/bsp/message/io/DirectByteBufferInputStream.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/io/DirectByteBufferInputStream.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/message/io/DirectByteBufferInputStream.java (revision 0) @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.message.io; + +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * Encapsulates {@link ByteBufferInputStream} + */ +public class DirectByteBufferInputStream extends DataInputStream implements + DataInput { + + public DirectByteBufferInputStream(ByteBufferInputStream in) { + super(in); + } + + public DirectByteBufferInputStream() { + super(new ByteBufferInputStream()); + } + + public void prepareForNext() throws IOException { + ByteBufferInputStream stream = (ByteBufferInputStream) this.in; + stream.fillForNext(); + } + + public boolean hasDataToRead() { + ByteBufferInputStream stream = (ByteBufferInputStream) this.in; + return stream.hasDataToRead(); + } + + public boolean hasUnmarkData() { + ByteBufferInputStream stream = (ByteBufferInputStream) this.in; + return stream.hasUnmarkedData(); + } + + public void setBuffer(SpilledByteBuffer buff) throws IOException { + ByteBufferInputStream stream = (ByteBufferInputStream) this.in; + stream.setBuffer(buff.getByteBuffer(), buff.getMarkofLastRecord(), + buff.remaining()); + } + + public void setBuffer(ByteBuffer buffer) throws IOException { + ByteBufferInputStream stream = (ByteBufferInputStream) this.in; + stream.setBuffer(buffer, buffer.remaining(), buffer.remaining()); + } + +} Index: core/src/main/java/org/apache/hama/bsp/message/io/PreFetchCache.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/io/PreFetchCache.java (revision 1441692) +++ core/src/main/java/org/apache/hama/bsp/message/io/PreFetchCache.java (working copy) @@ -142,12 +142,14 @@ public void startFetching(Class classObject, SpilledDataInputBuffer buffer, Configuration conf) - throws InterruptedException { + throws InterruptedException, IOException { preFetchThread = new PreFetchThread(classObject, objectListArr, capacity, buffer, totalMessages, status, conf); preFetchThread.start(); - status.startReading(); + if(!status.startReading()){ + throw new IOException("Failed to start reading the spilled file: "); + } arrIndex = status.getReadBufferIndex(); } Index: core/src/main/java/org/apache/hama/bsp/message/io/DirectByteBufferOutputStream.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/io/DirectByteBufferOutputStream.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/message/io/DirectByteBufferOutputStream.java (revision 0) @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.message.io; + +import java.io.DataOutputStream; +import java.nio.ByteBuffer; + +/** + * Encapsulates a {@link DirectByteBufferOutputStream}. + * + */ +public class DirectByteBufferOutputStream extends DataOutputStream { + + public DirectByteBufferOutputStream() { + super(new ByteBufferOutputStream()); + } + + public DirectByteBufferOutputStream(ByteBufferOutputStream stream){ + super(stream); + } + + public ByteBuffer getBuffer() { + ByteBufferOutputStream stream = (ByteBufferOutputStream)this.out; + return stream.getBuffer(); + } + + public void setBuffer(ByteBuffer buffer) { + ByteBufferOutputStream stream = (ByteBufferOutputStream)this.out; + stream.setBuffer(buffer); + } + + public void clear(){ + ByteBufferOutputStream stream = (ByteBufferOutputStream)this.out; + stream.clear(); + } +} Index: core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataReadStatus.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataReadStatus.java (revision 1441692) +++ core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataReadStatus.java (working copy) @@ -38,6 +38,7 @@ private volatile boolean fileReadComplete_; private volatile boolean bufferReadComplete_; private volatile BitSet bufferBitState_; + private volatile boolean errorState_; public SpilledDataReadStatus(int totalSize, BitSet bufferBitState) { readBufferIndex_ = -1; @@ -47,8 +48,22 @@ bufferReadComplete_ = false; totalSize_ = totalSize; bufferBitState_ = bufferBitState; + errorState_ = false; } + private int checkError(int index) { + if (errorState_) { + return -1; + } else { + return index; + } + } + + public void notifyError() { + errorState_ = true; + notify(); + } + @Override public synchronized int getReadBufferIndex() throws InterruptedException { @@ -59,14 +74,14 @@ notify(); } readBufferIndex_ = (readBufferIndex_ + 1) % totalSize_; - while (!bufferBitState_.get(readBufferIndex_) && !fileReadComplete_) { + while (!bufferBitState_.get(readBufferIndex_) && !fileReadComplete_ && !errorState_) { wait(); } // The file is completely read and transferred to buffers already. if (bufferBitState_.isEmpty() && fileReadComplete_) { return -1; } - return readBufferIndex_; + return checkError(readBufferIndex_); } @Override @@ -91,7 +106,7 @@ fetchFileBufferIndex_ = (fetchFileBufferIndex_ + 1) % totalSize_; while (bufferBitState_.get(fetchFileBufferIndex_) - && !bufferReadComplete_) { + && !bufferReadComplete_ && !errorState_) { wait(); } @@ -99,7 +114,7 @@ return -1; } - return fetchFileBufferIndex_; + return checkError(fetchFileBufferIndex_); } @Override @@ -121,14 +136,16 @@ } @Override - public synchronized void startReading() { - while (!spilledReadStart_) + public synchronized boolean startReading() { + while (!spilledReadStart_ && !errorState_) { try { wait(); } catch (InterruptedException e) { LOG.error("Interrupted waiting to read the spilled file.", e); throw new RuntimeException(e); } + } + return !errorState_; } } Index: core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataProcessor.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataProcessor.java (revision 1441692) +++ core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataProcessor.java (working copy) @@ -17,33 +17,34 @@ */ package org.apache.hama.bsp.message.io; -import java.nio.ByteBuffer; - import org.apache.hadoop.conf.Configuration; /** - * Base interface defining the behaviour to process the spilled data provided - * in a byte buffer. + * Base interface defining the behaviour to process the spilled data provided in + * a byte buffer. */ public interface SpilledDataProcessor { /** * Initialize the data processor. + * * @param conf * @return true if no errors. */ boolean init(Configuration conf); - + /** * Override the method to define the action to be taken on the spilled data * provided in the byte buffer. + * * @param buffer * @return true if no errors. */ - boolean handleSpilledBuffer(ByteBuffer buffer); - + boolean handleSpilledBuffer(SpilledByteBuffer buffer); + /** * Close the data processor. + * * @return true if no errors. */ boolean close(); Index: core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java (revision 1441692) +++ core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java (working copy) @@ -23,7 +23,6 @@ import java.io.IOException; import java.io.OutputStream; import java.math.BigInteger; -import java.nio.ByteBuffer; import java.security.SecureRandom; import java.util.ArrayList; import java.util.BitSet; @@ -36,11 +35,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hama.bsp.message.io.SpillWriteIndexStatus; -import org.apache.hama.bsp.message.io.SpilledDataInputBuffer; -import org.apache.hama.bsp.message.io.SpilledDataProcessor; -import org.apache.hama.bsp.message.io.SpillingDataOutputBuffer; -import org.apache.hama.bsp.message.io.WriteSpilledDataProcessor; /** * SpillingBuffer is an output stream comprised of byte arrays that @@ -56,7 +50,8 @@ */ public class SpillingDataOutputBuffer extends DataOutputStream { - private static final Log LOG = LogFactory.getLog(SpillingDataOutputBuffer.class); + private static final Log LOG = LogFactory + .getLog(SpillingDataOutputBuffer.class); /** * This thread is responsible for writing from the ByteBuffers in the list to @@ -64,13 +59,13 @@ */ static class ProcessSpilledDataThread implements Callable { private SpillWriteIndexStatus status_; - private List bufferList_; + private List bufferList_; private long fileWrittenSize_; private boolean closed; SpilledDataProcessor processor; ProcessSpilledDataThread(SpillWriteIndexStatus status, - List bufferList, SpilledDataProcessor processor) { + List bufferList, SpilledDataProcessor processor) { status_ = status; bufferList_ = bufferList; closed = false; @@ -86,6 +81,7 @@ private void keepProcessingData() throws IOException { int fileWriteIndex = -1; + do { try { @@ -94,7 +90,7 @@ throw new IOException(e1); } while (fileWriteIndex >= 0) { - ByteBuffer buffer = bufferList_.get(fileWriteIndex); + SpilledByteBuffer buffer = bufferList_.get(fileWriteIndex); processor.handleSpilledBuffer(buffer); buffer.clear(); try { @@ -107,6 +103,9 @@ } } while (!closed); + if (LOG.isDebugEnabled()) { + LOG.debug("Done handling spilling data."); + } } @@ -128,7 +127,13 @@ @Override public Boolean call() throws Exception { - keepProcessingData(); + try { + keepProcessingData(); + } catch (Exception e) { + LOG.error("Error handling spilled data.", e); + status_.notifyError(); + return Boolean.FALSE; + } return Boolean.TRUE; } @@ -146,12 +151,13 @@ final byte[] b; final boolean direct_; - private List bufferList_; + private List bufferList_; private int bufferSize_; private BitSet bufferState_; private int numberBuffers_; - private ByteBuffer currentBuffer_; - private long bytesWritten_; + private SpilledByteBuffer currentBuffer_; + protected long bytesWritten_; + protected long bytesWrittenToBuffer; private long bytesRemaining_; private SpillWriteIndexStatus spillStatus_; private int thresholdSize_; @@ -159,8 +165,9 @@ private ProcessSpilledDataThread spillThread_; private ExecutorService spillThreadService_; private Future spillThreadState_; - private boolean closed_; + private boolean closed_;; + private int interBufferEndOfRecord; private SpilledDataProcessor processor; /** * The internal buffer where data is stored. @@ -208,8 +215,8 @@ assert (threshold >= bufferSize); assert (threshold < numBuffers * bufferSize); - if(interBufferSize > bufferSize){ - interBufferSize = bufferSize/2; + if (interBufferSize > bufferSize) { + interBufferSize = bufferSize / 2; } defaultBufferSize_ = interBufferSize; this.b = new byte[1]; @@ -218,15 +225,11 @@ direct_ = direct; numberBuffers_ = numBuffers; bufferSize_ = bufferSize; - bufferList_ = new ArrayList(numberBuffers_); + bufferList_ = new ArrayList(numberBuffers_); bufferState_ = new BitSet(numBuffers); for (int i = 0; i < numBuffers / 2; ++i) { - if (direct_) { - bufferList_.add(ByteBuffer.allocateDirect(bufferSize_)); - } else { - bufferList_.add(ByteBuffer.allocate(bufferSize_)); - } + bufferList_.add(new SpilledByteBuffer(direct_, bufferSize_)); } currentBuffer_ = bufferList_.get(0); bytesWritten_ = 0L; @@ -241,8 +244,14 @@ closed_ = false; } - - public void clear() throws IOException{ + + public void markEndOfRecord() { + interBufferEndOfRecord = (int) (this.bytesWrittenToBuffer + count); + if (currentBuffer_.capacity() > interBufferEndOfRecord) + this.currentBuffer_.markEndOfRecord(interBufferEndOfRecord); + } + + public void clear() throws IOException { this.close(); startedSpilling_ = false; bufferState_.clear(); @@ -261,7 +270,7 @@ buf[count++] = (byte) (b & 0xFF); return; } - + this.b[0] = (byte) (b & 0xFF); write(this.b); } @@ -277,51 +286,21 @@ * * @param len * @throws InterruptedException + * @throws IOException */ - private void startSpilling() throws InterruptedException { + private void startSpilling() throws InterruptedException, IOException { synchronized (this) { spillThread_ = new ProcessSpilledDataThread(spillStatus_, bufferList_, processor); startedSpilling_ = true; spillThreadService_ = Executors.newFixedThreadPool(1); spillThreadState_ = spillThreadService_.submit(spillThread_); - spillStatus_.startSpilling(); + if (!spillStatus_.startSpilling()) { + throw new IOException("Could not start spilling on disk."); + } } - // } } - public void perfectFillWrite(byte[] b, int off, int len) throws IOException { - int rem = currentBuffer_.remaining(); - while (len > rem) { - currentBuffer_.put(b, off, rem); - // bytesWritten_ += len; - // if (bytesWritten_ > thresholdSize_ && !startedSpilling_) { - // try { - // startSpilling(rem); - // } catch (InterruptedException e) { - // throw new IOException("Internal error occured writing to buffer.", - // e); - // } - // } - - currentBuffer_.flip(); - int index = spillStatus_.getNextBufferIndex(); - currentBuffer_ = getBuffer(index); - off += rem; - len -= rem; - rem = currentBuffer_.remaining(); - } - currentBuffer_.put(b, off, len); - // bytesWritten_ += len; - // if (bytesWritten_ > thresholdSize_ && !startedSpilling_) { - // try { - // startSpilling(len); - // } catch (InterruptedException e) { - // throw new IOException("Internal error occured writing to buffer.", e); - // } - // } - } - @Override public void write(byte[] b, int off, int len) throws IOException { if (len >= buf.length) { @@ -341,10 +320,11 @@ count += len; } - private void writeInternal(byte[] b, int off, int len) throws IOException { + @SuppressWarnings("unused") + private void writeInternalImperfect(byte[] b, int off, int len) + throws IOException { - bytesWritten_ += len; - if (bytesWritten_ >= thresholdSize_ && !startedSpilling_) { + if (!startedSpilling_ && bytesWritten_ >= thresholdSize_) { try { startSpilling(); } catch (InterruptedException e) { @@ -356,12 +336,52 @@ currentBuffer_.flip(); currentBuffer_ = getBuffer(spillStatus_.getNextBufferIndex()); bytesRemaining_ = bufferSize_; + this.bytesWrittenToBuffer = bytesWritten_; } currentBuffer_.put(b, off, len); bytesRemaining_ -= len; + bytesWritten_ += len; } + public void writeInternal(byte[] b, int off, int len) throws IOException { + int rem = currentBuffer_.remaining(); + while (len > rem) { + currentBuffer_.put(b, off, rem); + bytesWritten_ += rem; + if (!startedSpilling_) { + checkSpillStart(); + } + currentBuffer_.flip(); + currentBuffer_ = getBuffer(spillStatus_.getNextBufferIndex()); + if (currentBuffer_ == null) + throw new IOException( + "Error writing to spilling buffer. Could not get free buffer."); + bytesRemaining_ = bufferSize_; + this.bytesWrittenToBuffer = 0; + off += rem; + len -= rem; + rem = currentBuffer_.remaining(); + } + currentBuffer_.put(b, off, len); + bytesWritten_ += len; + bytesRemaining_ -= len; + if (!startedSpilling_) { + checkSpillStart(); + } + this.bytesWrittenToBuffer += len; + } + + private void checkSpillStart() throws IOException { + if (bytesWritten_ >= thresholdSize_) { + try { + startSpilling(); + } catch (InterruptedException e) { + throw new IOException("Internal error occured writing to buffer.", e); + } + } + } + /** Flush the internal buffer */ private void flushBuffer() throws IOException { if (count > 0) { @@ -377,16 +397,13 @@ * @return * @throws IOException */ - ByteBuffer getBuffer(int index) throws IOException { - + SpilledByteBuffer getBuffer(int index) throws IOException { + if(index < 0){ + return null; + } if (index >= bufferList_.size()) { - if (direct_) { - bufferList_.add(index, ByteBuffer.allocateDirect(bufferSize_)); - } else { - bufferList_.add(index, ByteBuffer.allocate(bufferSize_)); - } + bufferList_.add(new SpilledByteBuffer(direct_, bufferSize_)); } - return bufferList_.get(index); } @@ -422,7 +439,7 @@ this.processor.close(); this.spillThreadService_.shutdownNow(); } - + } } @@ -432,28 +449,27 @@ * Initialize the spilling buffer with spilling file name * * @param fileName name of the file. - * @throws FileNotFoundException + * @throws FileNotFoundException */ public SpillingDataOutputBuffer(String fileName) throws FileNotFoundException { - super(new SpillingStream(3, 16 * 1024, 16 * 1024, true, + super(new SpillingStream(3, 16 * 1024, 16 * 1024, true, new WriteSpilledDataProcessor(fileName))); } - - public SpillingDataOutputBuffer(SpilledDataProcessor processor) throws FileNotFoundException { - super(new SpillingStream(3, 16 * 1024, 16 * 1024, true, - processor)); + + public SpillingDataOutputBuffer(SpilledDataProcessor processor) + throws FileNotFoundException { + super(new SpillingStream(3, 16 * 1024, 16 * 1024, true, processor)); } - - /** * Initializes the spilling buffer. - * @throws FileNotFoundException + * + * @throws FileNotFoundException */ public SpillingDataOutputBuffer() throws FileNotFoundException { super(new SpillingStream(3, 16 * 1024, 16 * 1024, true, - new WriteSpilledDataProcessor( - System.getProperty("java.io.tmpdir") + File.separatorChar + new WriteSpilledDataProcessor(System.getProperty("java.io.tmpdir") + + File.separatorChar + new BigInteger(128, new SecureRandom()).toString(32)))); } @@ -465,32 +481,36 @@ * @param direct * @param fileName */ - public SpillingDataOutputBuffer(int bufferCount, int bufferSize, int threshold, - boolean direct, SpilledDataProcessor processor) { + public SpillingDataOutputBuffer(int bufferCount, int bufferSize, + int threshold, boolean direct, SpilledDataProcessor processor) { super(new SpillingStream(bufferCount, bufferSize, threshold, direct, processor)); } - - public void clear() throws IOException{ + + public void clear() throws IOException { SpillingStream stream = (SpillingStream) this.out; stream.clear(); } - - public boolean hasSpilled(){ + + public boolean hasSpilled() { return ((SpillingStream) this.out).startedSpilling_; } + public void markRecordEnd() { + ((SpillingStream) this.out).markEndOfRecord(); + } + /** * Provides an input stream to read from the spilling buffer. * * @throws IOException */ - public SpilledDataInputBuffer getInputStreamToRead(String fileName) throws IOException { + public SpilledDataInputBuffer getInputStreamToRead(String fileName) + throws IOException { SpillingStream stream = (SpillingStream) this.out; SpilledDataInputBuffer.SpilledInputStream inStream = new SpilledDataInputBuffer.SpilledInputStream( - fileName, stream.direct_, stream.bufferList_, - stream.startedSpilling_); + fileName, stream.direct_, stream.bufferList_, stream.startedSpilling_); inStream.prepareRead(); return new SpilledDataInputBuffer(inStream); } Index: core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferInputStream.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferInputStream.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferInputStream.java (revision 0) @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.message.io; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +/** + * ByteBufferInputStream is used to read back from SpilledByteBuffer. The + * goal of this class to read data till a boundary record marked. The remaining + * data which is the partial record in the byte buffer is read into the intermediate + * buffer using {@link ByteBufferInputStream#fillForNext()}. If this stream is + * to be used as an intermediate buffer to read from a bigger source of data, + * say a file, the function {@link ByteBufferInputStream#onBufferRead(byte[], int, int, int)} + * could be used to fill up the ByteBuffer encapsulated by the class. + * + */ +public class ByteBufferInputStream extends InputStream { + + private final byte[] readByte = new byte[1]; + private int interBuffSize; + private byte[] interBuffer; + private int dataSizeInInterBuffer; + private int curPos; + + protected ByteBuffer buffer; + private long toBeRead; + private long bytesRead; + private long totalBytes; + + public ByteBufferInputStream() { + } + + @Override + public void close() throws IOException { + super.close(); + } + + public boolean hasDataToRead() { + return ((toBeRead - bytesRead) > 0); + } + + public boolean hasUnmarkedData() { + return ((totalBytes - bytesRead) > 0); + } + + @Override + public int read() throws IOException { + if (dataSizeInInterBuffer > 0) { + --dataSizeInInterBuffer; + ++bytesRead; + return interBuffer[curPos++] & 0xFF; + } + + if (-1 == read(readByte, 0, 1)) { + return -1; + } + return readByte[0] & 0xFF; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (dataSizeInInterBuffer >= len) { + // copy the count of bytes to b + System.arraycopy(interBuffer, curPos, b, off, len); + dataSizeInInterBuffer -= len; + curPos += len; + bytesRead += len; + return len; + } + int size = 0; + + while (len > 0) { + if (dataSizeInInterBuffer == 0) { + dataSizeInInterBuffer = readInternal(interBuffer, 0, interBuffer.length); + curPos = 0; + if (dataSizeInInterBuffer <= 0) { + break; + } + + } + int readSize = Math.min(dataSizeInInterBuffer, len); + System.arraycopy(interBuffer, curPos, b, off, readSize); + len -= readSize; + off += readSize; + size += readSize; + dataSizeInInterBuffer -= readSize; + curPos += readSize; + } + bytesRead += size; + return size; + } + + public int readInternal(byte[] b, int off, int len) throws IOException { + if (buffer == null) { + return -1; + } + int cur = 0; + while (len > 0) { + int rem = buffer.remaining(); + if (rem == 0) { + return onBufferRead(b, off, len, cur); + } + int readSize = Math.min(rem, len); + buffer.get(b, off, readSize); + len -= readSize; + off += readSize; + cur += readSize; + } + return cur; + } + + /** + * When the byte buffer encapsulated is out of data then this function is + * invoked. + * + * @param b the byte buffer to read into + * @param off offset index to start writing + * @param len length of data to be written + * @param cur The current size already read by the class. + * @return if the end of the stream has reached, and cur is 0 return -1; else + * return the data size currently read. + * @throws IOException + */ + protected int onBufferRead(byte[] b, int off, int len, int cur) + throws IOException { + if (cur != 0) + return cur; + else + return -1; + } + + @Override + public int read(byte[] b) throws IOException { + return read(b, 0, b.length); + } + + /** + * Sets the byte buffer to read the data from. + * @param buffer The byte buffer to read data from + * @param toRead Number of bytes till end of last record. + * @param total Total bytes of data to read in the buffer. + * @throws IOException + */ + public void setBuffer(ByteBuffer buffer, long toRead, long total) + throws IOException { + this.buffer = buffer; + toBeRead = toRead += dataSizeInInterBuffer; + totalBytes = total; + bytesRead = 0; + if (interBuffer == null) { + interBuffSize = Math.min(buffer.remaining(), 8192); + interBuffer = new byte[interBuffSize]; + dataSizeInInterBuffer = 0; + } + fetchIntermediate(); + } + + private void fetchIntermediate() throws IOException { + int readSize = readInternal(interBuffer, dataSizeInInterBuffer, + interBuffer.length - dataSizeInInterBuffer); + if (readSize > 0) { + dataSizeInInterBuffer += readSize; + } + curPos = 0; + } + + /** + * This function should be called to provision reading the partial records + * into the buffer after the last record in the buffer is read. This data + * would be appended with the next ByteBuffer that is set using + * {@link ByteBufferInputStream#setBuffer(ByteBuffer, long, long)} to start + * reading records. + * @throws IOException + */ + public void fillForNext() throws IOException { + + int remainingBytes = buffer.remaining(); + if (curPos != 0) { + System.arraycopy(interBuffer, curPos, interBuffer, 0, + dataSizeInInterBuffer); + } + curPos = 0; + if (remainingBytes == 0) + return; + + if (dataSizeInInterBuffer + remainingBytes > interBuffSize) { + interBuffSize = dataSizeInInterBuffer + remainingBytes; + byte[] arr = this.interBuffer; + this.interBuffer = new byte[interBuffSize]; + System.arraycopy(arr, 0, interBuffer, 0, dataSizeInInterBuffer); + } + int readSize = readInternal(this.interBuffer, dataSizeInInterBuffer, + remainingBytes); + if (readSize > 0) + dataSizeInInterBuffer += readSize; + } +} Index: core/src/main/java/org/apache/hama/bsp/message/io/SyncFlushByteBufferOutputStream.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/io/SyncFlushByteBufferOutputStream.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/message/io/SyncFlushByteBufferOutputStream.java (revision 0) @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.message.io; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +/** + * A {@link ByteBuffer} stream that synchronously writes the spilled data to + * local storage. + * + */ +public class SyncFlushByteBufferOutputStream extends ByteBufferOutputStream { + + String fileName; + FileChannel channel; + FileOutputStream stream; + private boolean spilled; + + public SyncFlushByteBufferOutputStream(String fileName) { + super(); + this.fileName = fileName; + } + + @Override + protected boolean onBufferFull(byte[] b, int off, int len) throws IOException { + buffer.flip(); + if (channel == null) { + File f = new File(fileName); + stream = new FileOutputStream(f, true); + channel = stream.getChannel(); + } + channel.write(buffer); + channel.write(ByteBuffer.wrap(b, off, len)); + + channel.force(true); + buffer.clear(); + spilled = true; + return false; + } + + @Override + protected void onFlush() throws IOException { + if (spilled) { + buffer.flip(); + channel.write(buffer); + channel.force(true); + channel.close(); + } + } + + public boolean isSpilled() { + return spilled; + } + +} Index: core/src/main/java/org/apache/hama/bsp/message/io/SpillWriteIndexStatus.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/io/SpillWriteIndexStatus.java (revision 1441692) +++ core/src/main/java/org/apache/hama/bsp/message/io/SpillWriteIndexStatus.java (working copy) @@ -22,8 +22,8 @@ /** * This class provides a thread-safe interface for both the spilling thread and - * the thread that is writing into SpillingBuffer. It stores the - * state and manipulates the next available buffer for both the threads. + * the thread that is writing into {@link SpillingDataOutputBuffer}. It stores + * the state and manipulates the next available buffer for both the threads. */ class SpillWriteIndexStatus { @@ -33,6 +33,7 @@ private volatile boolean spillStart; private int numBuffers; private volatile BitSet bufferBitState; + private volatile boolean errorState; SpillWriteIndexStatus(int size, int bufferCount, int bufferIndex, int fileWriteIndex, BitSet bufferBitState) { @@ -42,6 +43,7 @@ processorBufferIndex = fileWriteIndex; numBuffers = bufferCount; this.bufferBitState = bufferBitState; + errorState = false; } /** @@ -57,16 +59,25 @@ bufferBitState.set(bufferListWriteIndex, true); notify(); bufferListWriteIndex = (bufferListWriteIndex + 1) % numBuffers; - while (bufferBitState.get(bufferListWriteIndex)) { + while (bufferBitState.get(bufferListWriteIndex) && !errorState) { try { wait(); } catch (InterruptedException e) { throw new IOException(e); } } - return bufferListWriteIndex; + return checkError(bufferListWriteIndex); } + private int checkError(int index) { + return errorState ? -1 : index; + } + + public void notifyError() { + errorState = true; + notify(); + } + /** * Returns the index of the next buffer from the list that has information * written to be spilled to disk. The call blocks until a buffer is available @@ -90,15 +101,16 @@ notify(); } processorBufferIndex = (processorBufferIndex + 1) % numBuffers; - while (!bufferBitState.get(processorBufferIndex) && !spillComplete) { + while (!bufferBitState.get(processorBufferIndex) && !spillComplete + && !errorState) { wait(); } // Is the last buffer written to file after the spilling is complete? // then complete the operation. - if (spillComplete && bufferBitState.isEmpty()) { + if ((spillComplete && bufferBitState.isEmpty()) || errorState) { return -1; } - return processorBufferIndex; + return checkError(processorBufferIndex); } /** @@ -107,12 +119,14 @@ * spilled. * * @throws InterruptedException + * @return whether the spill started correctly */ - public synchronized void startSpilling() throws InterruptedException { + public synchronized boolean startSpilling() throws InterruptedException { - while (!spillStart) { + while (!spillStart && !errorState) { wait(); } + return !errorState; } /** Index: core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferOutputStream.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferOutputStream.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferOutputStream.java (revision 0) @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.message.io; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +/** + * ByteBufferOutputStream encapsulates a byte buffer to write data into. The + * function {@link ByteBufferOutputStream#onBufferFull(byte[], int, int)} + * should be overriden to handle the case when the size of data exceeds the size + * of buffer. The default behavior is to throw an exception. + */ +class ByteBufferOutputStream extends OutputStream { + + private final byte[] b = new byte[1]; + private byte[] interBuffer; + private int interBufferDataSize; + protected ByteBuffer buffer; + + public void clear() { + if (this.buffer != null) { + this.buffer.clear(); + } + interBufferDataSize = 0; + } + + /** + * Sets the buffer for the stream. + * @param buffer byte buffer to hold within. + */ + public void setBuffer(ByteBuffer buffer) { + this.buffer = buffer; + this.interBufferDataSize = 0; + int interSize = Math.min(buffer.capacity()/2, 8192); + if(interBuffer == null){ + interBuffer = new byte[interSize]; + } + + } + + @Override + public void write(int b) throws IOException { + if (interBufferDataSize < interBuffer.length - 1) { + interBuffer[interBufferDataSize++] = (byte) (b & 0xFF); + return; + } + + this.b[0] = (byte) (b & 0xFF); + write(this.b); + } + + @Override + public void write(byte[] b) throws IOException { + write(b, 0, b.length); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + if (len >= interBuffer.length) { + /* + * If the request length exceeds the size of the output buffer, flush the + * output buffer and then write the data directly. In this way buffered + * streams will cascade harmlessly. + */ + flushBuffer(); + writeInternal(b, off, len); + return; + } + if (len > interBuffer.length - interBufferDataSize) { + flushBuffer(); + } + System.arraycopy(b, off, interBuffer, interBufferDataSize, len); + interBufferDataSize += len; + } + + private void writeInternal(byte[] b, int off, int len) throws IOException { + + if (len <= buffer.remaining() || onBufferFull(b, off, len)) { + buffer.put(b, off, len); + } + } + + /** + * Action to take when the data to be written exceeds the size of the byte + * buffer inside. + * + * @return + * @throws IOException + */ + protected boolean onBufferFull(byte[] b, int off, int len) throws IOException { + return true; + } + + @Override + public void flush() throws IOException { + flushBuffer(); + onFlush(); + } + + /** + * Called when the byte buffer stream is closed. + * @throws IOException + */ + protected void onFlush() throws IOException { + + } + + /** Flush the internal buffer */ + private void flushBuffer() throws IOException { + if (interBufferDataSize > 0) { + writeInternal(interBuffer, 0, interBufferDataSize); + interBufferDataSize = 0; + } + } + + public ByteBuffer getBuffer() { + return this.buffer; + } + +} Index: core/src/main/java/org/apache/hama/Constants.java =================================================================== --- core/src/main/java/org/apache/hama/Constants.java (revision 1441692) +++ core/src/main/java/org/apache/hama/Constants.java (working copy) @@ -64,12 +64,12 @@ public static final String MAX_TASK_ATTEMPTS = "bsp.tasks.max.attempts"; public static final String MAX_TASKS_PER_JOB = "bsp.max.tasks.per.job"; - + public static final String COMBINER_CLASS = "bsp.combiner.class"; public static final int DEFAULT_MAX_TASK_ATTEMPTS = 2; - - //////////////////////////////////////// + + // ////////////////////////////////////// // Task scheduler related constants // ////////////////////////////////////// @@ -97,13 +97,22 @@ // ///////////////////////////////////////////// // Job configuration related parameters. // ///////////////////////////////////////////// - public static final String JOB_INPUT_DIR = "bsp.input.dir"; - public static final String JOB_PEERS_COUNT = "bsp.peers.num"; - public static final String INPUT_FORMAT_CLASS = "bsp.input.format.class"; + public static final String JOB_INPUT_DIR = "bsp.input.dir"; + public static final String JOB_PEERS_COUNT = "bsp.peers.num"; + public static final String INPUT_FORMAT_CLASS = "bsp.input.format.class"; public static final String OUTPUT_FORMAT_CLASS = "bsp.output.format.class"; - + public static final String MESSAGE_CLASS = "bsp.message.type.class"; // ///////////////////////////////////////////// + // Messaging related parameters. + // ///////////////////////////////////////////// + public static final int BUFFER_DEFAULT_SIZE = 16 * 1024; + public static final String BYTEBUFFER_SIZE = "bsp.message.bytebuffer.size"; + public static final String BYTEBUFFER_DIRECT = "bsp.message.bytebuffer.direct"; + public static final boolean BYTEBUFFER_DIRECT_DEFAULT = true; + public static final String DATA_SPILL_PATH = "bsp.data.spill.location"; + + // ///////////////////////////////////////////// // Constants related to partitioning // ///////////////////////////////////////////// public static final String RUNTIME_PARTITIONING_DIR = "bsp.partitioning.dir"; @@ -112,7 +121,6 @@ public static final String RUNTIME_DESIRED_PEERS_COUNT = "desired.num.of.tasks"; public static final String RUNTIME_PARTITION_RECORDCONVERTER = "bsp.runtime.partition.recordconverter"; - // ///////////////////////////////////// // Constants for ZooKeeper // /////////////////////////////////////