.../hadoop/hbase/io/asyncfs/AsyncFSOutput.java | 28 +++++++++++++------ .../io/asyncfs/FanOutOneBlockAsyncDFSOutput.java | 32 +++++++++++++++++++++- .../regionserver/wal/AsyncProtobufLogWriter.java | 11 +++++--- 3 files changed, 58 insertions(+), 13 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java index 807d82a..37ad651 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java @@ -24,34 +24,40 @@ import java.nio.channels.CompletionHandler; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import java.io.OutputStream; /** * Interface for asynchronous filesystem output stream. */ @InterfaceAudience.Private -public interface AsyncFSOutput extends Closeable { +public abstract class AsyncFSOutput extends OutputStream { /** + * Writes an int to the buffer. Note that you need to call + * {@link #flush(Object, CompletionHandler, boolean)} to flush the buffer manually. + */ + public abstract void write(int b) throws IOException; + /** * Just call write(b, 0, b.length). * @see #write(byte[], int, int) */ - void write(byte[] b); + public abstract void write(byte[] b); /** * Copy the data into the buffer. Note that you need to call * {@link #flush(Object, CompletionHandler, boolean)} to flush the buffer manually. */ - void write(byte[] b, int off, int len); + public abstract void write(byte[] b, int off, int len); /** * Return the current size of buffered data. */ - int buffered(); + abstract int buffered(); /** * Return current pipeline. Empty array if no pipeline. */ - DatanodeInfo[] getPipeline(); + abstract DatanodeInfo[] getPipeline(); /** * Flush the buffer out. 
@@ -59,17 +65,23 @@ public interface AsyncFSOutput extends Closeable { * @param handler will set the acked length as result when completed. * @param sync persistent the data to device */ - void flush(A attachment, final CompletionHandler handler, boolean sync); + public abstract void flush(A attachment, final CompletionHandler handler, boolean sync); /** * The close method when error occurred. */ - void recoverAndClose(CancelableProgressable reporter) throws IOException; + public abstract void recoverAndClose(CancelableProgressable reporter) throws IOException; /** * Close the file. You should call {@link #recoverAndClose(CancelableProgressable)} if this method * throws an exception. */ @Override - void close() throws IOException; + public abstract void close() throws IOException; + + @Override + public void flush() throws IOException { + throw new IllegalStateException( + "call flush(A attachment, final CompletionHandler handler, boolean sync)"); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java index 9aab924..5721acc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java @@ -61,6 +61,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose; import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.CryptoCodec; +import org.apache.hadoop.hbase.util.ByteBufferUtils; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hdfs.DFSClient; @@ -101,7 +103,7 @@ import 
org.apache.hadoop.util.DataChecksum; * */ @InterfaceAudience.Private -public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { +public class FanOutOneBlockAsyncDFSOutput extends AsyncFSOutput { private final Configuration conf; @@ -358,6 +360,19 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { } } + private void writeInt0(int val) { + buf.ensureWritable(Bytes.SIZEOF_INT); + if (cryptoCodec == null) { + buf.writeInt(val); + } else { + ByteBuffer inBuffer = ByteBuffer.allocate(Bytes.SIZEOF_INT); + ByteBufferUtils.putInt(inBuffer, val); + // Check : Should we reset the inBuffer pos to 0? + cryptoCodec.encrypt(inBuffer, buf.nioBuffer(buf.writerIndex(), Bytes.SIZEOF_INT)); + buf.writerIndex(buf.writerIndex() + Bytes.SIZEOF_INT); + } + } + @Override public void write(final byte[] b, final int off, final int len) { if (eventLoop.inEventLoop()) { @@ -374,6 +389,21 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { } @Override + public void write(int b) throws IOException { + if (eventLoop.inEventLoop()) { + writeInt0(b); + } else { + eventLoop.submit(new Runnable() { + + @Override + public void run() { + writeInt0(b); + } + }).syncUninterruptibly(); + } + } + + @Override public int buffered() { if (eventLoop.inEventLoop()) { return buf.readableBytes(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java index e2080ff..32ac2f5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java @@ -32,6 +32,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValueUtil; import 
org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.ByteArrayOutputStream; import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput; @@ -116,12 +117,14 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter implements } length.addAndGet(buf.size()); output.write(buf.getBuffer(), 0, buf.size()); + buf.reset(); try { for (Cell cell : entry.getEdit().getCells()) { - buf.reset(); cellEncoder.write(cell); - length.addAndGet(buf.size()); - output.write(buf.getBuffer(), 0, buf.size()); + // how to get size here?? Just passing true here for now. + // TODO : Better to add a return 'int' on the ExtendedCell#write to know the actual length that was + // written + length.addAndGet(KeyValueUtil.getSerializedSize(cell, true)); } } catch (IOException e) { throw new AssertionError("should not happen", e); @@ -196,6 +199,6 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter implements @Override protected OutputStream getOutputStreamForCellEncoder() { - return buf; + return output; } }