From e820d9d35e7b0fda1512f2a987a86cdf1c6422f9 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Sun, 30 Oct 2016 20:03:08 +0800 Subject: [PATCH] HBASE-16968 Refactor FanOutOneBlockAsyncDFSOutput --- .../hadoop/hbase/io/asyncfs/AsyncFSOutput.java | 10 +- .../hbase/io/asyncfs/AsyncFSOutputHelper.java | 28 +- .../io/asyncfs/FanOutOneBlockAsyncDFSOutput.java | 210 +++--- .../FanOutOneBlockAsyncDFSOutputHelper.java | 257 ++------ .../FanOutOneBlockAsyncDFSOutputSaslHelper.java | 702 +++++---------------- .../hadoop/hbase/regionserver/wal/AsyncFSWAL.java | 190 +++--- .../regionserver/wal/AsyncProtobufLogWriter.java | 94 ++- .../org/apache/hadoop/hbase/wal/WALProvider.java | 4 +- .../FanOutOneBlockAsyncDFSOutputFlushHandler.java | 61 -- .../asyncfs/TestFanOutOneBlockAsyncDFSOutput.java | 52 +- .../hbase/io/asyncfs/TestLocalAsyncOutput.java | 4 +- .../TestSaslFanOutOneBlockAsyncDFSOutput.java | 67 +- .../regionserver/wal/TestAsyncProtobufLog.java | 21 +- 13 files changed, 502 insertions(+), 1198 deletions(-) delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputFlushHandler.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java index 0c60d3cf..7d513db 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java @@ -21,6 +21,7 @@ import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.CompletionHandler; +import java.util.concurrent.CompletableFuture; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.CancelableProgressable; @@ -39,8 +40,8 @@ public interface AsyncFSOutput extends Closeable { void write(byte[] b); /** - * Copy the data into the buffer. Note that you need to call - * {@link #flush(Object, CompletionHandler, boolean)} to flush the buffer manually. + * Copy the data into the buffer. Note that you need to call {@link #flush(boolean)} to flush the + * buffer manually. */ void write(byte[] b, int off, int len); @@ -66,11 +67,10 @@ public interface AsyncFSOutput extends Closeable { /** * Flush the buffer out. - * @param attachment will be passed to handler when completed. - * @param handler will set the acked length as result when completed. * @param sync persistent the data to device + * @return A CompletableFuture that hold the acked length after flushing. */ - void flush(A attachment, final CompletionHandler handler, boolean sync); + CompletableFuture flush(boolean sync); /** * The close method when error occurred. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java index c9d4e70..7fe86be 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java @@ -25,7 +25,7 @@ import io.netty.channel.EventLoop; import java.io.IOException; import java.io.InterruptedIOException; import java.nio.ByteBuffer; -import java.nio.channels.CompletionHandler; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -80,11 +80,7 @@ public final class AsyncFSOutputHelper { if (eventLoop.inEventLoop()) { out.write(b, off, len); } else { - eventLoop.submit(new Runnable() { - public void run() { - out.write(b, off, len); - } - }).syncUninterruptibly(); + eventLoop.submit(() -> out.write(b, off, len)).syncUninterruptibly(); } } @@ -103,15 +99,14 @@ public final class AsyncFSOutputHelper { return new DatanodeInfo[0]; } - private void flush0(A attachment, CompletionHandler handler, - boolean sync) { + private void flush0(CompletableFuture future, boolean sync) { try { synchronized (out) { fsOut.write(out.getBuffer(), 0, out.size()); out.reset(); } } catch (IOException e) { - eventLoop.execute(() -> handler.failed(e, attachment)); + eventLoop.execute(() -> future.completeExceptionally(e)); return; } try { @@ -120,17 +115,18 @@ public final class AsyncFSOutputHelper { } else { fsOut.hflush(); } - final long pos = fsOut.getPos(); - eventLoop.execute(() -> handler.completed(pos, attachment)); - } catch (final IOException e) { - eventLoop.execute(() -> handler.failed(e, attachment)); + long pos = fsOut.getPos(); + eventLoop.execute(() -> future.complete(pos)); + } catch (IOException e) { + eventLoop.execute(() -> future.completeExceptionally(e)); } } @Override - public void flush(A attachment, CompletionHandler handler, - boolean sync) { - flushExecutor.execute(() -> flush0(attachment, handler, sync)); + public CompletableFuture flush(boolean sync) { + CompletableFuture future = new CompletableFuture<>(); + flushExecutor.execute(() -> flush0(future, sync)); + return future; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java index 916e534..02ffcd5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java @@ -26,8 +26,6 @@ import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHel import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; -import com.google.common.base.Supplier; - import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; @@ -39,14 +37,11 @@ import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.PromiseCombiner; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.channels.CompletionHandler; import java.util.ArrayDeque; import java.util.Collection; import java.util.Collections; @@ -54,13 +49,15 @@ import java.util.Deque; import java.util.IdentityHashMap; import java.util.List; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.crypto.Encryptor; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose; -import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.CryptoCodec; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hdfs.DFSClient; @@ -87,8 +84,8 @@ import org.apache.hadoop.util.DataChecksum; * need one thread here. But be careful, we do some blocking operations in {@link #close()} and * {@link #recoverAndClose(CancelableProgressable)} methods, so do not call them inside * {@link EventLoop}. And for {@link #write(byte[])} {@link #write(byte[], int, int)}, - * {@link #buffered()} and {@link #flush(Object, CompletionHandler, boolean)}, if you call them - * outside {@link EventLoop}, there will be an extra context-switch. + * {@link #buffered()} and {@link #flush(boolean)}, if you call them outside {@link EventLoop}, + * there will be an extra context-switch. *

* Advantages compare to DFSOutputStream: *

    @@ -125,7 +122,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { private final LocatedBlock locatedBlock; - private final CryptoCodec cryptoCodec; + private final Encryptor encryptor; private final EventLoop eventLoop; @@ -151,8 +148,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { if (replicas.isEmpty()) { this.unfinishedReplicas = Collections.emptySet(); } else { - this.unfinishedReplicas = Collections - .newSetFromMap(new IdentityHashMap(replicas.size())); + this.unfinishedReplicas = + Collections.newSetFromMap(new IdentityHashMap(replicas.size())); this.unfinishedReplicas.addAll(replicas); } } @@ -215,13 +212,9 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { // disable further write, and fail all pending ack. state = State.BROKEN; Throwable error = errorSupplier.get(); - for (Callback c : waitingAckQueue) { - c.promise.tryFailure(error); - } + waitingAckQueue.stream().forEach(c -> c.promise.tryFailure(error)); waitingAckQueue.clear(); - for (Channel ch : datanodeList) { - ch.close(); - } + datanodeList.forEach(ch -> ch.close()); } @Sharable @@ -234,29 +227,16 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { } @Override - protected void channelRead0(final ChannelHandlerContext ctx, PipelineAckProto ack) - throws Exception { - final Status reply = getStatus(ack); + protected void channelRead0(ChannelHandlerContext ctx, PipelineAckProto ack) throws Exception { + Status reply = getStatus(ack); if (reply != Status.SUCCESS) { - failed(ctx.channel(), new Supplier() { - - @Override - public Throwable get() { - return new IOException("Bad response " + reply + " for block " + locatedBlock.getBlock() - + " from datanode " + ctx.channel().remoteAddress()); - } - }); + failed(ctx.channel(), () -> new IOException("Bad response " + reply + " for block " + + locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress())); return; } if (PipelineAck.isRestartOOBStatus(reply)) { - failed(ctx.channel(), new Supplier() { - - @Override - public Throwable get() { - return new IOException("Restart response " + reply + " for block " - + locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress()); - } - }); + failed(ctx.channel(), () -> new IOException("Restart response " + reply + " for block " + + locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress())); return; } if (ack.getSeqno() == HEART_BEAT_SEQNO) { @@ -266,25 +246,14 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { } @Override - public void channelInactive(final ChannelHandlerContext ctx) throws Exception { - failed(ctx.channel(), new Supplier() { - - @Override - public Throwable get() { - return new IOException("Connection to " + ctx.channel().remoteAddress() + " closed"); - } - }); + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + failed(ctx.channel(), + () -> new IOException("Connection to " + ctx.channel().remoteAddress() + " closed")); } @Override - public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause) throws Exception { - failed(ctx.channel(), new Supplier() { - - @Override - public Throwable get() { - return cause; - } - }); + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + failed(ctx.channel(), () -> cause); } @Override @@ -292,13 +261,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { if (evt instanceof IdleStateEvent) { IdleStateEvent e = (IdleStateEvent) evt; if (e.state() == READER_IDLE) { - failed(ctx.channel(), new Supplier() { - - @Override - public Throwable get() { - return new IOException("Timeout(" + timeoutMs + "ms) waiting for response"); - } - }); + failed(ctx.channel(), + () -> new IOException("Timeout(" + timeoutMs + "ms) waiting for response")); } else if (e.state() == WRITER_IDLE) { PacketHeader heartbeat = new PacketHeader(4, 0, HEART_BEAT_SEQNO, false, 0, false); int len = heartbeat.getSerializedSize(); @@ -326,7 +290,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { FanOutOneBlockAsyncDFSOutput(Configuration conf, FSUtils fsUtils, DistributedFileSystem dfs, DFSClient client, ClientProtocol namenode, String clientName, String src, long fileId, - LocatedBlock locatedBlock, CryptoCodec cryptoCodec, EventLoop eventLoop, + LocatedBlock locatedBlock, Encryptor encryptor, EventLoop eventLoop, List datanodeList, DataChecksum summer, ByteBufAllocator alloc) { this.conf = conf; this.fsUtils = fsUtils; @@ -337,7 +301,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { this.clientName = clientName; this.src = src; this.locatedBlock = locatedBlock; - this.cryptoCodec = cryptoCodec; + this.encryptor = encryptor; this.eventLoop = eventLoop; this.datanodeList = datanodeList; this.summer = summer; @@ -350,14 +314,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { private void writeInt0(int i) { buf.ensureWritable(4); - if (cryptoCodec == null) { - buf.writeInt(i); - } else { - ByteBuffer inBuffer = ByteBuffer.allocate(4); - inBuffer.putInt(0, i); - cryptoCodec.encrypt(inBuffer, buf.nioBuffer(buf.writerIndex(), 4)); - buf.writerIndex(buf.writerIndex() + 4); - } + buf.writeInt(i); } @Override @@ -370,14 +327,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { } private void write0(ByteBuffer bb) { - int len = bb.remaining(); - buf.ensureWritable(len); - if (cryptoCodec == null) { - buf.writeBytes(bb); - } else { - cryptoCodec.encrypt(bb, buf.nioBuffer(buf.writerIndex(), len)); - buf.writerIndex(buf.writerIndex() + len); - } + buf.ensureWritable(bb.remaining()); + buf.writeBytes(bb); } @Override @@ -394,19 +345,13 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { write(b, 0, b.length); } - private void write0(byte[] b, final int off, final int len) { + private void write0(byte[] b, int off, int len) { buf.ensureWritable(len); - if (cryptoCodec == null) { - buf.writeBytes(b, off, len); - } else { - ByteBuffer inBuffer = ByteBuffer.wrap(b, off, len); - cryptoCodec.encrypt(inBuffer, buf.nioBuffer(buf.writerIndex(), len)); - buf.writerIndex(buf.writerIndex() + len); - } + buf.writeBytes(b, off, len); } @Override - public void write(final byte[] b, final int off, final int len) { + public void write(byte[] b, int off, int len) { if (eventLoop.inEventLoop()) { write0(b, off, len); } else { @@ -464,27 +409,40 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { return promise; } - private void flush0(final A attachment, final CompletionHandler handler, - boolean syncBlock) { + private void flush0(CompletableFuture future, boolean syncBlock) { if (state != State.STREAMING) { - handler.failed(new IOException("stream already broken"), attachment); + future.completeExceptionally(new IOException("stream already broken")); return; } int dataLen = buf.readableBytes(); - final long lengthAfterFlush = nextPacketOffsetInBlock + dataLen; + if (encryptor != null) { + ByteBuf encryptBuf = alloc.directBuffer(dataLen); + try { + encryptor.encrypt(buf.nioBuffer(buf.readerIndex(), dataLen), + encryptBuf.nioBuffer(0, dataLen)); + } catch (IOException e) { + encryptBuf.release(); + future.completeExceptionally(e); + return; + } + encryptBuf.writerIndex(dataLen); + buf.release(); + buf = encryptBuf; + } + long lengthAfterFlush = nextPacketOffsetInBlock + dataLen; if (lengthAfterFlush == locatedBlock.getBlock().getNumBytes()) { // no new data, just return - handler.completed(locatedBlock.getBlock().getNumBytes(), attachment); + future.complete(locatedBlock.getBlock().getNumBytes()); return; } Callback c = waitingAckQueue.peekLast(); if (c != null && lengthAfterFlush == c.ackedLength) { // just append it to the tail of waiting ack queue,, do not issue new hflush request. - waitingAckQueue.addLast(new Callback(eventLoop. newPromise().addListener(future -> { - if (future.isSuccess()) { - handler.completed(lengthAfterFlush, attachment); + waitingAckQueue.addLast(new Callback(eventLoop. newPromise().addListener(f -> { + if (f.isSuccess()) { + future.complete(lengthAfterFlush); } else { - handler.failed(future.cause(), attachment); + future.completeExceptionally(f.cause()); } }), lengthAfterFlush, Collections. emptyList())); return; @@ -506,11 +464,11 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { } else { promise = flushBuffer(buf.retain(), nextPacketOffsetInBlock, syncBlock); } - promise.addListener(future -> { - if (future.isSuccess()) { - handler.completed(lengthAfterFlush, attachment); + promise.addListener(f -> { + if (f.isSuccess()) { + future.complete(lengthAfterFlush); } else { - handler.failed(future.cause(), attachment); + future.completeExceptionally(f.cause()); } }); int trailingPartialChunkLen = dataLen % summer.getBytesPerChecksum(); @@ -525,23 +483,17 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { /** * Flush the buffer out to datanodes. - * @param attachment will be passed to handler when completed. - * @param handler will set the acked length as result when completed. * @param syncBlock will call hsync if true, otherwise hflush. + * @return A CompletableFuture that hold the acked length after flushing. */ - public void flush(final A attachment, final CompletionHandler handler, - final boolean syncBlock) { + public CompletableFuture flush(boolean syncBlock) { + CompletableFuture future = new CompletableFuture(); if (eventLoop.inEventLoop()) { - flush0(attachment, handler, syncBlock); + flush0(future, syncBlock); } else { - eventLoop.execute(new Runnable() { - - @Override - public void run() { - flush0(attachment, handler, syncBlock); - } - }); + eventLoop.execute(() -> flush0(future, syncBlock)); } + return future; } private void endBlock(Promise promise, long size) { @@ -558,13 +510,11 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { buf.release(); buf = null; int headerLen = header.getSerializedSize(); - ByteBuf headerBuf = alloc.buffer(headerLen); + ByteBuf headerBuf = alloc.directBuffer(headerLen); header.putInBuffer(headerBuf.nioBuffer(0, headerLen)); headerBuf.writerIndex(headerLen); waitingAckQueue.add(new Callback(promise, size, datanodeList)); - for (Channel ch : datanodeList) { - ch.writeAndFlush(headerBuf.duplicate().retain()); - } + datanodeList.forEach(ch -> ch.writeAndFlush(headerBuf.duplicate().retain())); headerBuf.release(); } @@ -574,10 +524,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { @Override public void recoverAndClose(CancelableProgressable reporter) throws IOException { assert !eventLoop.inEventLoop(); - for (Channel ch : datanodeList) { - ch.closeFuture().awaitUninterruptibly(); - } - endFileLease(client, src, fileId); + datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly()); + endFileLease(client, fileId); fsUtils.recoverFileLease(dfs, new Path(src), conf, reporter == null ? new CancelOnClose(client) : reporter); } @@ -589,26 +537,10 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { @Override public void close() throws IOException { assert !eventLoop.inEventLoop(); - final Promise promise = eventLoop.newPromise(); - eventLoop.execute(new Runnable() { - - @Override - public void run() { - endBlock(promise, nextPacketOffsetInBlock + buf.readableBytes()); - } - }); - promise.addListener(new FutureListener() { - - @Override - public void operationComplete(Future future) throws Exception { - for (Channel ch : datanodeList) { - ch.close(); - } - } - }).syncUninterruptibly(); - for (Channel ch : datanodeList) { - ch.closeFuture().awaitUninterruptibly(); - } + Promise promise = eventLoop.newPromise(); + eventLoop.execute(() -> endBlock(promise, nextPacketOffsetInBlock + buf.readableBytes())); + promise.addListener(f -> datanodeList.forEach(ch -> ch.close())).syncUninterruptibly(); + datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly()); completeFile(client, namenode, src, clientName, locatedBlock.getBlock(), fileId); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java index 51c48ce..875ff77 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java @@ -21,7 +21,7 @@ import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS; import static io.netty.handler.timeout.IdleState.READER_IDLE; import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; -import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createCryptoCodec; +import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME; @@ -66,6 +66,8 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.crypto.CryptoProtocolVersion; +import org.apache.hadoop.crypto.Encryptor; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystemLinkResolver; @@ -74,7 +76,6 @@ import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ConnectionUtils; -import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.CryptoCodec; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hdfs.DFSClient; @@ -96,7 +97,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyP import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto.Builder; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto; @@ -143,26 +143,14 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { private static final PipelineAckStatusGetter PIPELINE_ACK_STATUS_GETTER; - // StorageType enum is added in hadoop 2.4, but it is moved to another package in hadoop 2.6 and - // the setter method in OpWriteBlockProto is also added in hadoop 2.6. So we need to skip the - // setStorageType call if it is hadoop 2.5 or before. See createStorageTypeSetter for more - // details. + // StorageType enum is placed under o.a.h.hdfs in hadoop 2.6 and o.a.h.fs in hadoop 2.7. So here + // we need to use reflection to set it.See createStorageTypeSetter for more details. private interface StorageTypeSetter { OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum storageType); } private static final StorageTypeSetter STORAGE_TYPE_SETTER; - // helper class for calling create method on namenode. There is a supportedVersions parameter for - // hadoop 2.6 or after. See createFileCreater for more details. - private interface FileCreater { - HdfsFileStatus create(ClientProtocol namenode, String src, FsPermission masked, - String clientName, EnumSetWritable flag, boolean createParent, - short replication, long blockSize) throws IOException; - } - - private static final FileCreater FILE_CREATER; - // helper class for calling add block method on namenode. There is a addBlockFlags parameter for // hadoop 2.8 or later. See createBlockAdder for more details. private interface BlockAdder { @@ -174,13 +162,11 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { private static final BlockAdder BLOCK_ADDER; - // helper class for add or remove lease from DFSClient. Hadoop 2.4 use src as the Map's key, and - // hadoop 2.5 or after use inodeId. See createLeaseManager for more details. private interface LeaseManager { - void begin(DFSClient client, String src, long inodeId); + void begin(DFSClient client, long inodeId); - void end(DFSClient client, String src, long inodeId); + void end(DFSClient client, long inodeId); } private static final LeaseManager LEASE_MANAGER; @@ -197,7 +183,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { // helper class for convert protos. private interface PBHelper { - ExtendedBlockProto convert(final ExtendedBlock b); + ExtendedBlockProto convert(ExtendedBlock b); TokenProto convert(Token tok); } @@ -212,7 +198,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { private static final ChecksumCreater CHECKSUM_CREATER; private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException { - final Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning"); + Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning"); isClientRunningMethod.setAccessible(true); return new DFSClientAdaptor() { @@ -227,16 +213,16 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { }; } - private static LeaseManager createLeaseManager25() throws NoSuchMethodException { - final Method beginFileLeaseMethod = DFSClient.class.getDeclaredMethod("beginFileLease", - long.class, DFSOutputStream.class); + private static LeaseManager createLeaseManager() throws NoSuchMethodException { + Method beginFileLeaseMethod = + DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class); beginFileLeaseMethod.setAccessible(true); - final Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class); + Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class); endFileLeaseMethod.setAccessible(true); return new LeaseManager() { @Override - public void begin(DFSClient client, String src, long inodeId) { + public void begin(DFSClient client, long inodeId) { try { beginFileLeaseMethod.invoke(client, inodeId, null); } catch (IllegalAccessException | InvocationTargetException e) { @@ -245,7 +231,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { } @Override - public void end(DFSClient client, String src, long inodeId) { + public void end(DFSClient client, long inodeId) { try { endFileLeaseMethod.invoke(client, inodeId); } catch (IllegalAccessException | InvocationTargetException e) { @@ -255,66 +241,28 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { }; } - private static LeaseManager createLeaseManager24() throws NoSuchMethodException { - final Method beginFileLeaseMethod = DFSClient.class.getDeclaredMethod("beginFileLease", - String.class, DFSOutputStream.class); - beginFileLeaseMethod.setAccessible(true); - final Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", - String.class); - endFileLeaseMethod.setAccessible(true); - return new LeaseManager() { - - @Override - public void begin(DFSClient client, String src, long inodeId) { - try { - beginFileLeaseMethod.invoke(client, src, null); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - - @Override - public void end(DFSClient client, String src, long inodeId) { - try { - endFileLeaseMethod.invoke(client, src); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - }; - } - - private static LeaseManager createLeaseManager() throws NoSuchMethodException { - try { - return createLeaseManager25(); - } catch (NoSuchMethodException e) { - LOG.debug("No inodeId related lease methods found, should be hadoop 2.4-", e); - } - return createLeaseManager24(); - } - private static PipelineAckStatusGetter createPipelineAckStatusGetter27() throws NoSuchMethodException { - final Method getFlagListMethod = PipelineAckProto.class.getMethod("getFlagList"); + Method getFlagListMethod = PipelineAckProto.class.getMethod("getFlagList"); @SuppressWarnings("rawtypes") Class ecnClass; try { ecnClass = Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN") .asSubclass(Enum.class); } catch (ClassNotFoundException e) { - final String msg = "Couldn't properly initialize the PipelineAck.ECN class. Please " + String msg = "Couldn't properly initialize the PipelineAck.ECN class. Please " + "update your WAL Provider to not make use of the 'asyncfs' provider. See " + "HBASE-16110 for more information."; LOG.error(msg, e); throw new Error(msg, e); } @SuppressWarnings("unchecked") - final Enum disabledECN = Enum.valueOf(ecnClass, "DISABLED"); - final Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class); - final Method combineHeaderMethod = PipelineAck.class.getMethod("combineHeader", ecnClass, - Status.class); - final Method getStatusFromHeaderMethod = PipelineAck.class.getMethod("getStatusFromHeader", - int.class); + Enum disabledECN = Enum.valueOf(ecnClass, "DISABLED"); + Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class); + Method combineHeaderMethod = + PipelineAck.class.getMethod("combineHeader", ecnClass, Status.class); + Method getStatusFromHeaderMethod = + PipelineAck.class.getMethod("getStatusFromHeader", int.class); return new PipelineAckStatusGetter() { @Override @@ -339,7 +287,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { private static PipelineAckStatusGetter createPipelineAckStatusGetter26() throws NoSuchMethodException { - final Method getStatusMethod = PipelineAckProto.class.getMethod("getStatus", int.class); + Method getStatusMethod = PipelineAckProto.class.getMethod("getStatus", int.class); return new PipelineAckStatusGetter() { @Override @@ -363,30 +311,18 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { return createPipelineAckStatusGetter26(); } - private static StorageTypeSetter createStorageTypeSetter() { - final Method setStorageTypeMethod; - try { - setStorageTypeMethod = OpWriteBlockProto.Builder.class.getMethod("setStorageType", - StorageTypeProto.class); - } catch (NoSuchMethodException e) { - LOG.debug("noSetStorageType method found, should be hadoop 2.5-", e); - return new StorageTypeSetter() { - - @Override - public Builder set(Builder builder, Enum storageType) { - return builder; - } - }; - } + private static StorageTypeSetter createStorageTypeSetter() throws NoSuchMethodException { + Method setStorageTypeMethod = + OpWriteBlockProto.Builder.class.getMethod("setStorageType", StorageTypeProto.class); ImmutableMap.Builder builder = ImmutableMap.builder(); for (StorageTypeProto storageTypeProto : StorageTypeProto.values()) { builder.put(storageTypeProto.name(), storageTypeProto); } - final ImmutableMap name2ProtoEnum = builder.build(); + ImmutableMap name2ProtoEnum = builder.build(); return new StorageTypeSetter() { @Override - public Builder set(Builder builder, Enum storageType) { + public OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum storageType) { Object protoEnum = name2ProtoEnum.get(storageType.name()); try { setStorageTypeMethod.invoke(builder, protoEnum); @@ -398,62 +334,10 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { }; } - private static FileCreater createFileCreater() throws ClassNotFoundException, - NoSuchMethodException, IllegalAccessException, InvocationTargetException { - for (Method method : ClientProtocol.class.getMethods()) { - if (method.getName().equals("create")) { - final Method createMethod = method; - Class[] paramTypes = createMethod.getParameterTypes(); - if (paramTypes[paramTypes.length - 1] == long.class) { - return new FileCreater() { - - @Override - public HdfsFileStatus create(ClientProtocol namenode, String src, FsPermission masked, - String clientName, EnumSetWritable flag, boolean createParent, - short replication, long blockSize) throws IOException { - try { - return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName, flag, - createParent, replication, blockSize); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } catch (InvocationTargetException e) { - Throwables.propagateIfPossible(e.getTargetException(), IOException.class); - throw new RuntimeException(e); - } - } - }; - } else { - Class cryptoProtocolVersionClass = Class - .forName("org.apache.hadoop.crypto.CryptoProtocolVersion"); - Method supportedMethod = cryptoProtocolVersionClass.getMethod("supported"); - final Object supported = supportedMethod.invoke(null); - return new FileCreater() { - - @Override - public HdfsFileStatus create(ClientProtocol namenode, String src, FsPermission masked, - String clientName, EnumSetWritable flag, boolean createParent, - short replication, long blockSize) throws IOException { - try { - return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName, flag, - createParent, replication, blockSize, supported); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } catch (InvocationTargetException e) { - Throwables.propagateIfPossible(e.getTargetException(), IOException.class); - throw new RuntimeException(e); - } - } - }; - } - } - } - throw new NoSuchMethodException("Can not find create method in ClientProtocol"); - } - private static BlockAdder createBlockAdder() throws NoSuchMethodException { for (Method method : ClientProtocol.class.getMethods()) { if (method.getName().equals("addBlock")) { - final Method addBlockMethod = method; + Method addBlockMethod = method; Class[] paramTypes = addBlockMethod.getParameterTypes(); if (paramTypes[paramTypes.length - 1] == String[].class) { return new BlockAdder() { @@ -505,8 +389,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { LOG.debug("No PBHelperClient class found, should be hadoop 2.7-", e); helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class; } - final Method convertEBMethod = helperClass.getMethod("convert", ExtendedBlock.class); - final Method convertTokenMethod = helperClass.getMethod("convert", Token.class); + Method convertEBMethod = helperClass.getMethod("convert", ExtendedBlock.class); + Method convertTokenMethod = helperClass.getMethod("convert", Token.class); return new PBHelper() { @Override @@ -533,7 +417,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { throws NoSuchMethodException { for (Method method : confClass.getMethods()) { if (method.getName().equals("createChecksum")) { - final Method createChecksumMethod = method; + Method createChecksumMethod = method; return new ChecksumCreater() { @Override @@ -552,7 +436,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { private static ChecksumCreater createChecksumCreater27(Class confClass) throws NoSuchMethodException { - final Method createChecksumMethod = confClass.getDeclaredMethod("createChecksum"); + Method createChecksumMethod = confClass.getDeclaredMethod("createChecksum"); createChecksumMethod.setAccessible(true); return new ChecksumCreater() { @@ -597,14 +481,13 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { try { PIPELINE_ACK_STATUS_GETTER = createPipelineAckStatusGetter(); STORAGE_TYPE_SETTER = createStorageTypeSetter(); - FILE_CREATER = createFileCreater(); BLOCK_ADDER = createBlockAdder(); LEASE_MANAGER = createLeaseManager(); DFS_CLIENT_ADAPTOR = createDFSClientAdaptor(); PB_HELPER = createPBHelper(); CHECKSUM_CREATER = createChecksumCreater(); } catch (Exception e) { - final String msg = "Couldn't properly initialize access to HDFS internals. Please " + String msg = "Couldn't properly initialize access to HDFS internals. Please " + "update your WAL Provider to not make use of the 'asyncfs' provider. See " + "HBASE-16110 for more information."; LOG.error(msg, e); @@ -612,12 +495,12 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { } } - static void beginFileLease(DFSClient client, String src, long inodeId) { - LEASE_MANAGER.begin(client, src, inodeId); + static void beginFileLease(DFSClient client, long inodeId) { + LEASE_MANAGER.begin(client, inodeId); } - static void endFileLease(DFSClient client, String src, long inodeId) { - LEASE_MANAGER.end(client, src, inodeId); + static void endFileLease(DFSClient client, long inodeId) { + LEASE_MANAGER.end(client, inodeId); } static DataChecksum createChecksum(DFSClient client) { @@ -628,8 +511,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { return PIPELINE_ACK_STATUS_GETTER.get(ack); } - private static void processWriteBlockResponse(Channel channel, final DatanodeInfo dnInfo, - final Promise promise, final int timeoutMs) { + private static void processWriteBlockResponse(Channel channel, DatanodeInfo dnInfo, + Promise promise, int timeoutMs) { channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS), new ProtobufVarint32FrameDecoder(), new ProtobufDecoder(BlockOpResponseProto.getDefaultInstance()), @@ -693,18 +576,18 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException { OpWriteBlockProto proto = STORAGE_TYPE_SETTER.set(writeBlockProtoBuilder, storageType).build(); int protoLen = proto.getSerializedSize(); - ByteBuf buffer = channel.alloc() - .buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen); + ByteBuf buffer = + channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen); buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); buffer.writeByte(Op.WRITE_BLOCK.code); proto.writeDelimitedTo(new ByteBufOutputStream(buffer)); channel.writeAndFlush(buffer); } - private static void initialize(Configuration conf, final Channel channel, - final DatanodeInfo dnInfo, final Enum storageType, - final OpWriteBlockProto.Builder writeBlockProtoBuilder, final int timeoutMs, DFSClient client, - Token accessToken, final Promise promise) { + private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo, + Enum storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs, + DFSClient client, Token accessToken, Promise promise) + throws IOException { Promise saslPromise = channel.eventLoop().newPromise(); trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise); saslPromise.addListener(new FutureListener() { @@ -722,14 +605,14 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { }); } - private static List> connectToDataNodes(final Configuration conf, - final DFSClient client, String clientName, final LocatedBlock locatedBlock, long maxBytesRcvd, - long latestGS, BlockConstructionStage stage, DataChecksum summer, EventLoop eventLoop) { + private static List> connectToDataNodes(Configuration conf, DFSClient client, + String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS, + BlockConstructionStage stage, DataChecksum summer, EventLoop eventLoop) { Enum[] storageTypes = locatedBlock.getStorageTypes(); DatanodeInfo[] datanodeInfos = locatedBlock.getLocations(); - boolean connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, - DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); - final int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT); + boolean connectToDnViaHostname = + conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); + int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT); ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock()); blockCopy.setNumBytes(locatedBlock.getBlockSize()); ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder() @@ -737,7 +620,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { .setToken(PB_HELPER.convert(locatedBlock.getBlockToken()))) .setClientName(clientName).build(); ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer); - final OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder() + OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder() .setHeader(header).setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name())) .setPipelineSize(1).setMinBytesRcvd(locatedBlock.getBlock().getNumBytes()) .setMaxBytesRcvd(maxBytesRcvd).setLatestGenerationStamp(latestGS) @@ -745,11 +628,9 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build()); List> futureList = new ArrayList<>(datanodeInfos.length); for (int i = 0; i < datanodeInfos.length; i++) { - final DatanodeInfo dnInfo = datanodeInfos[i]; - // Use Enum here because StoregType is moved to another package in hadoop 2.6. Use StorageType - // will cause compilation error for hadoop 2.5 or before. - final Enum storageType = storageTypes[i]; - final Promise promise = eventLoop.newPromise(); + DatanodeInfo dnInfo = datanodeInfos[i]; + Enum storageType = storageTypes[i]; + Promise promise = eventLoop.newPromise(); futureList.add(promise); String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname); new Bootstrap().group(eventLoop).channel(NioSocketChannel.class) @@ -799,11 +680,11 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { ClientProtocol namenode = client.getNamenode(); HdfsFileStatus stat; try { - stat = FILE_CREATER.create(namenode, src, + stat = namenode.create(src, FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName, new EnumSetWritable( overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)), - createParent, replication, blockSize); + createParent, replication, blockSize, CryptoProtocolVersion.supported()); } catch (Exception e) { if (e instanceof RemoteException) { throw (RemoteException) e; @@ -811,7 +692,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { throw new NameNodeException(e); } } - beginFileLease(client, src, stat.getFileId()); + beginFileLease(client, stat.getFileId()); boolean succ = false; LocatedBlock locatedBlock = null; List> futureList = null; @@ -827,10 +708,10 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { // layer should retry itself if needed. datanodeList.add(future.syncUninterruptibly().getNow()); } - CryptoCodec cryptocodec = createCryptoCodec(conf, stat, client); - FanOutOneBlockAsyncDFSOutput output = new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, - client, namenode, clientName, src, stat.getFileId(), locatedBlock, cryptocodec, eventLoop, - datanodeList, summer, ALLOC); + Encryptor encryptor = createEncryptor(conf, stat, client); + FanOutOneBlockAsyncDFSOutput output = + new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName, src, + stat.getFileId(), locatedBlock, encryptor, eventLoop, datanodeList, summer, ALLOC); succ = true; return output; } finally { @@ -848,7 +729,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { }); } } - endFileLease(client, src, stat.getFileId()); + endFileLease(client, stat.getFileId()); fsUtils.recoverFileLease(dfs, new Path(src), conf, new CancelOnClose(client)); } } @@ -859,9 +740,9 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { * inside {@link EventLoop}. * @param eventLoop all connections to datanode will use the same event loop. */ - public static FanOutOneBlockAsyncDFSOutput createOutput(final DistributedFileSystem dfs, Path f, - final boolean overwrite, final boolean createParent, final short replication, - final long blockSize, final EventLoop eventLoop) throws IOException { + public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f, + boolean overwrite, boolean createParent, short replication, long blockSize, + EventLoop eventLoop) throws IOException { return new FileSystemLinkResolver() { @Override @@ -890,7 +771,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { for (int retry = 0;; retry++) { try { if (namenode.complete(src, clientName, block, fileId)) { - endFileLease(client, src, fileId); + endFileLease(client, fileId); return; } else { LOG.warn("complete file " + src + " not finished, retry = " + retry); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java index a222e1b..c0121d8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java @@ -18,12 +18,11 @@ package org.apache.hadoop.hbase.io.asyncfs; import static io.netty.handler.timeout.IdleState.READER_IDLE; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Charsets; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.protobuf.CodedOutputStream; @@ -47,13 +46,13 @@ import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.Promise; import java.io.IOException; -import java.lang.reflect.Constructor; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.security.GeneralSecurityException; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -73,9 +72,17 @@ import javax.security.sasl.SaslClient; import javax.security.sasl.SaslException; import org.apache.commons.codec.binary.Base64; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.crypto.CipherOption; +import org.apache.hadoop.crypto.CipherSuite; +import org.apache.hadoop.crypto.CryptoCodec; +import org.apache.hadoop.crypto.Decryptor; +import org.apache.hadoop.crypto.Encryptor; +import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion; +import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hdfs.DFSClient; @@ -83,9 +90,10 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.Builder; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.security.SaslPropertiesResolver; @@ -110,469 +118,105 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { private static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF; private static final String NAME_DELIMITER = " "; - @VisibleForTesting - static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY = "dfs.encrypt.data.transfer.cipher.suites"; - - @VisibleForTesting - static final String AES_CTR_NOPADDING = "AES/CTR/NoPadding"; - private interface SaslAdaptor { - SaslPropertiesResolver getSaslPropsResolver(DFSClient client); + TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient); - TrustedChannelResolver getTrustedChannelResolver(DFSClient client); + SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient); - AtomicBoolean getFallbackToSimpleAuth(DFSClient client); - - DataEncryptionKey createDataEncryptionKey(DFSClient client); + AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient); } private static final SaslAdaptor SASL_ADAPTOR; - private interface CipherOptionHelper { - - List getCipherOptions(Configuration conf) throws IOException; - - void addCipherOptions(DataTransferEncryptorMessageProto.Builder builder, - List cipherOptions); - - Object getCipherOption(DataTransferEncryptorMessageProto proto, boolean isNegotiatedQopPrivacy, - SaslClient saslClient) throws IOException; - - Object getCipherSuite(Object cipherOption); - - byte[] getInKey(Object cipherOption); + // helper class for convert protos. + private interface PBHelper { - byte[] getInIv(Object cipherOption); + List convertCipherOptions(List options); - byte[] getOutKey(Object cipherOption); - - byte[] getOutIv(Object cipherOption); + List convertCipherOptionProtos(List options); } - private static final CipherOptionHelper CIPHER_OPTION_HELPER; + private static final PBHelper PB_HELPER; private interface TransparentCryptoHelper { - Object getFileEncryptionInfo(HdfsFileStatus stat); - - CryptoCodec createCryptoCodec(Configuration conf, Object feInfo, DFSClient client) + Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, DFSClient client) throws IOException; } private static final TransparentCryptoHelper TRANSPARENT_CRYPTO_HELPER; - static final class CryptoCodec { - - private static final Method CREATE_CODEC; - - private static final Method CREATE_ENCRYPTOR; - - private static final Method CREATE_DECRYPTOR; - - private static final Method INIT_ENCRYPTOR; - - private static final Method INIT_DECRYPTOR; - - private static final Method ENCRYPT; - - private static final Method DECRYPT; - - static { - Class cryptoCodecClass = null; - try { - cryptoCodecClass = Class.forName("org.apache.hadoop.crypto.CryptoCodec"); - } catch (ClassNotFoundException e) { - LOG.debug("No CryptoCodec class found, should be hadoop 2.5-", e); - } - if (cryptoCodecClass != null) { - Method getInstanceMethod = null; - for (Method method : cryptoCodecClass.getMethods()) { - if (method.getName().equals("getInstance") && method.getParameterTypes().length == 2) { - getInstanceMethod = method; - break; - } - } - try { - if (getInstanceMethod == null) { - throw new NoSuchMethodException( - "Can not find suitable getInstance method in CryptoCodec"); - } - CREATE_CODEC = getInstanceMethod; - CREATE_ENCRYPTOR = cryptoCodecClass.getMethod("createEncryptor"); - CREATE_DECRYPTOR = cryptoCodecClass.getMethod("createDecryptor"); - - Class encryptorClass = Class.forName("org.apache.hadoop.crypto.Encryptor"); - INIT_ENCRYPTOR = encryptorClass.getMethod("init", byte[].class, byte[].class); - ENCRYPT = encryptorClass.getMethod("encrypt", ByteBuffer.class, ByteBuffer.class); - - Class decryptorClass = Class.forName("org.apache.hadoop.crypto.Decryptor"); - INIT_DECRYPTOR = decryptorClass.getMethod("init", byte[].class, byte[].class); - DECRYPT = decryptorClass.getMethod("decrypt", ByteBuffer.class, ByteBuffer.class); - } catch (Exception e) { - final String msg = "Couldn't properly initialize access to HDFS internals. Please " - + "update your WAL Provider to not make use of the 'asyncfs' provider. See " - + "HBASE-16110 for more information."; - LOG.error(msg, e); - throw new Error(msg, e); - } - } else { - CREATE_CODEC = null; - CREATE_ENCRYPTOR = null; - CREATE_DECRYPTOR = null; - INIT_ENCRYPTOR = null; - INIT_DECRYPTOR = null; - ENCRYPT = null; - DECRYPT = null; - } - } - - private final Object encryptor; - - private final Object decryptor; - - public CryptoCodec(Configuration conf, Object cipherOption) { - try { - Object codec = CREATE_CODEC.invoke(null, conf, - CIPHER_OPTION_HELPER.getCipherSuite(cipherOption)); - encryptor = CREATE_ENCRYPTOR.invoke(codec); - byte[] encKey = CIPHER_OPTION_HELPER.getInKey(cipherOption); - byte[] encIv = CIPHER_OPTION_HELPER.getInIv(cipherOption); - INIT_ENCRYPTOR.invoke(encryptor, encKey, Arrays.copyOf(encIv, encIv.length)); - - decryptor = CREATE_DECRYPTOR.invoke(codec); - byte[] decKey = CIPHER_OPTION_HELPER.getOutKey(cipherOption); - byte[] decIv = CIPHER_OPTION_HELPER.getOutIv(cipherOption); - INIT_DECRYPTOR.invoke(decryptor, decKey, Arrays.copyOf(decIv, decIv.length)); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - - public CryptoCodec(Configuration conf, Object cipherSuite, byte[] encKey, byte[] encIv) { - try { - Object codec = CREATE_CODEC.invoke(null, conf, cipherSuite); - encryptor = CREATE_ENCRYPTOR.invoke(codec); - INIT_ENCRYPTOR.invoke(encryptor, encKey, encIv); - decryptor = null; - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - - public void encrypt(ByteBuffer inBuffer, ByteBuffer outBuffer) { - try { - ENCRYPT.invoke(encryptor, inBuffer, outBuffer); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - - public void decrypt(ByteBuffer inBuffer, ByteBuffer outBuffer) { - try { - DECRYPT.invoke(decryptor, inBuffer, outBuffer); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - } - - private static SaslAdaptor createSaslAdaptor27(Class saslDataTransferClientClass) + private static SaslAdaptor createSaslAdaptor() throws NoSuchFieldException, NoSuchMethodException { - final Field saslPropsResolverField = saslDataTransferClientClass - .getDeclaredField("saslPropsResolver"); + Field saslPropsResolverField = + SaslDataTransferClient.class.getDeclaredField("saslPropsResolver"); saslPropsResolverField.setAccessible(true); - final Field trustedChannelResolverField = saslDataTransferClientClass - .getDeclaredField("trustedChannelResolver"); + Field trustedChannelResolverField = + SaslDataTransferClient.class.getDeclaredField("trustedChannelResolver"); trustedChannelResolverField.setAccessible(true); - final Field fallbackToSimpleAuthField = saslDataTransferClientClass - .getDeclaredField("fallbackToSimpleAuth"); + Field fallbackToSimpleAuthField = + SaslDataTransferClient.class.getDeclaredField("fallbackToSimpleAuth"); fallbackToSimpleAuthField.setAccessible(true); - final Method getSaslDataTransferClientMethod = DFSClient.class - .getMethod("getSaslDataTransferClient"); - final Method newDataEncryptionKeyMethod = DFSClient.class.getMethod("newDataEncryptionKey"); return new SaslAdaptor() { @Override - public TrustedChannelResolver getTrustedChannelResolver(DFSClient client) { - try { - return (TrustedChannelResolver) trustedChannelResolverField - .get(getSaslDataTransferClientMethod.invoke(client)); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - - @Override - public SaslPropertiesResolver getSaslPropsResolver(DFSClient client) { - try { - return (SaslPropertiesResolver) saslPropsResolverField - .get(getSaslDataTransferClientMethod.invoke(client)); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - - @Override - public AtomicBoolean getFallbackToSimpleAuth(DFSClient client) { + public TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient) { try { - return (AtomicBoolean) fallbackToSimpleAuthField - .get(getSaslDataTransferClientMethod.invoke(client)); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - - @Override - public DataEncryptionKey createDataEncryptionKey(DFSClient client) { - try { - return (DataEncryptionKey) newDataEncryptionKeyMethod.invoke(client); - } catch (IllegalAccessException | InvocationTargetException e) { + return (TrustedChannelResolver) trustedChannelResolverField.get(saslClient); + } catch (IllegalAccessException e) { throw new RuntimeException(e); } } - }; - } - - private static SaslAdaptor createSaslAdaptor25() - throws NoSuchFieldException, NoSuchMethodException { - final Field trustedChannelResolverField = DFSClient.class - .getDeclaredField("trustedChannelResolver"); - trustedChannelResolverField.setAccessible(true); - final Method getDataEncryptionKeyMethod = DFSClient.class.getMethod("getDataEncryptionKey"); - return new SaslAdaptor() { @Override - public TrustedChannelResolver getTrustedChannelResolver(DFSClient client) { + public SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient) { try { - return (TrustedChannelResolver) trustedChannelResolverField.get(client); + return (SaslPropertiesResolver) saslPropsResolverField.get(saslClient); } catch (IllegalAccessException e) { throw new RuntimeException(e); } } @Override - public SaslPropertiesResolver getSaslPropsResolver(DFSClient client) { - return null; - } - - @Override - public AtomicBoolean getFallbackToSimpleAuth(DFSClient client) { - return null; - } - - @Override - public DataEncryptionKey createDataEncryptionKey(DFSClient client) { + public AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient) { try { - return (DataEncryptionKey) getDataEncryptionKeyMethod.invoke(client); - } catch (IllegalAccessException | InvocationTargetException e) { + return (AtomicBoolean) fallbackToSimpleAuthField.get(saslClient); + } catch (IllegalAccessException e) { throw new RuntimeException(e); } } }; } - private static SaslAdaptor createSaslAdaptor() - throws NoSuchFieldException, NoSuchMethodException { + private static PBHelper createPBHelper() throws NoSuchMethodException { + Class helperClass; try { - return createSaslAdaptor27( - Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient")); - } catch (ClassNotFoundException e) { - LOG.debug("No SaslDataTransferClient class found, should be hadoop 2.5-", e); - } - return createSaslAdaptor25(); - } - - private static CipherOptionHelper createCipherHelper25() { - return new CipherOptionHelper() { - - @Override - public byte[] getOutKey(Object cipherOption) { - throw new UnsupportedOperationException(); - } - - @Override - public byte[] getOutIv(Object cipherOption) { - throw new UnsupportedOperationException(); - } - - @Override - public byte[] getInKey(Object cipherOption) { - throw new UnsupportedOperationException(); - } - - @Override - public byte[] getInIv(Object cipherOption) { - throw new UnsupportedOperationException(); - } - - @Override - public Object getCipherSuite(Object cipherOption) { - throw new UnsupportedOperationException(); - } - - @Override - public List getCipherOptions(Configuration conf) { - return null; - } - - @Override - public Object getCipherOption(DataTransferEncryptorMessageProto proto, - boolean isNegotiatedQopPrivacy, SaslClient saslClient) { - return null; - } - - @Override - public void addCipherOptions(Builder builder, List cipherOptions) { - throw new UnsupportedOperationException(); - } - }; - } - - private static CipherOptionHelper createCipherHelper27(Class cipherOptionClass) - throws ClassNotFoundException, NoSuchMethodException { - @SuppressWarnings("rawtypes") - Class cipherSuiteClass = Class.forName("org.apache.hadoop.crypto.CipherSuite") - .asSubclass(Enum.class); - @SuppressWarnings("unchecked") - final Enum aesCipherSuite = Enum.valueOf(cipherSuiteClass, "AES_CTR_NOPADDING"); - final Constructor cipherOptionConstructor = cipherOptionClass - .getConstructor(cipherSuiteClass); - final Constructor cipherOptionWithKeyAndIvConstructor = cipherOptionClass - .getConstructor(cipherSuiteClass, byte[].class, byte[].class, byte[].class, byte[].class); - - final Method getCipherSuiteMethod = cipherOptionClass.getMethod("getCipherSuite"); - final Method getInKeyMethod = cipherOptionClass.getMethod("getInKey"); - final Method getInIvMethod = cipherOptionClass.getMethod("getInIv"); - final Method getOutKeyMethod = cipherOptionClass.getMethod("getOutKey"); - final Method getOutIvMethod = cipherOptionClass.getMethod("getOutIv"); - - Class pbHelperClass; - try { - pbHelperClass = Class.forName("org.apache.hadoop.hdfs.protocolPB.PBHelperClient"); + helperClass = Class.forName("org.apache.hadoop.hdfs.protocolPB.PBHelperClient"); } catch (ClassNotFoundException e) { LOG.debug("No PBHelperClient class found, should be hadoop 2.7-", e); - pbHelperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class; + helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class; } - final Method convertCipherOptionsMethod = pbHelperClass.getMethod("convertCipherOptions", - List.class); - final Method convertCipherOptionProtosMethod = pbHelperClass - .getMethod("convertCipherOptionProtos", List.class); - final Method addAllCipherOptionMethod = DataTransferEncryptorMessageProto.Builder.class - .getMethod("addAllCipherOption", Iterable.class); - final Method getCipherOptionListMethod = DataTransferEncryptorMessageProto.class - .getMethod("getCipherOptionList"); - return new CipherOptionHelper() { - - @Override - public byte[] getOutKey(Object cipherOption) { - try { - return (byte[]) getOutKeyMethod.invoke(cipherOption); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - - @Override - public byte[] getOutIv(Object cipherOption) { - try { - return (byte[]) getOutIvMethod.invoke(cipherOption); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - - @Override - public byte[] getInKey(Object cipherOption) { - try { - return (byte[]) getInKeyMethod.invoke(cipherOption); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - - @Override - public byte[] getInIv(Object cipherOption) { - try { - return (byte[]) getInIvMethod.invoke(cipherOption); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - - @Override - public Object getCipherSuite(Object cipherOption) { - try { - return getCipherSuiteMethod.invoke(cipherOption); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - - @Override - public List getCipherOptions(Configuration conf) throws IOException { - // Negotiate cipher suites if configured. Currently, the only supported - // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple - // values for future expansion. - String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY); - if (cipherSuites == null || cipherSuites.isEmpty()) { - return null; - } - if (!cipherSuites.equals(AES_CTR_NOPADDING)) { - throw new IOException(String.format("Invalid cipher suite, %s=%s", - DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites)); - } - Object option; - try { - option = cipherOptionConstructor.newInstance(aesCipherSuite); - } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - List cipherOptions = Lists.newArrayListWithCapacity(1); - cipherOptions.add(option); - return cipherOptions; - } - - private Object unwrap(Object option, SaslClient saslClient) throws IOException { - byte[] inKey = getInKey(option); - if (inKey != null) { - inKey = saslClient.unwrap(inKey, 0, inKey.length); - } - byte[] outKey = getOutKey(option); - if (outKey != null) { - outKey = saslClient.unwrap(outKey, 0, outKey.length); - } - try { - return cipherOptionWithKeyAndIvConstructor.newInstance(getCipherSuite(option), inKey, - getInIv(option), outKey, getOutIv(option)); - } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } + Method convertCipherOptionsMethod = helperClass.getMethod("convertCipherOptions", List.class); + Method convertCipherOptionProtosMethod = + helperClass.getMethod("convertCipherOptionProtos", List.class); + return new PBHelper() { @SuppressWarnings("unchecked") @Override - public Object getCipherOption(DataTransferEncryptorMessageProto proto, - boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException { - List cipherOptions; + public List convertCipherOptions(List options) { try { - cipherOptions = (List) convertCipherOptionProtosMethod.invoke(null, - getCipherOptionListMethod.invoke(proto)); + return (List) convertCipherOptionsMethod.invoke(null, options); } catch (IllegalAccessException | InvocationTargetException e) { throw new RuntimeException(e); } - if (cipherOptions == null || cipherOptions.isEmpty()) { - return null; - } - Object cipherOption = cipherOptions.get(0); - return isNegotiatedQopPrivacy ? unwrap(cipherOption, saslClient) : cipherOption; } + @SuppressWarnings("unchecked") @Override - public void addCipherOptions(Builder builder, List cipherOptions) { + public List convertCipherOptionProtos(List options) { try { - addAllCipherOptionMethod.invoke(builder, - convertCipherOptionsMethod.invoke(null, cipherOptions)); + return (List) convertCipherOptionProtosMethod.invoke(null, options); } catch (IllegalAccessException | InvocationTargetException e) { throw new RuntimeException(e); } @@ -580,65 +224,28 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { }; } - private static CipherOptionHelper createCipherHelper() - throws ClassNotFoundException, NoSuchMethodException { - Class cipherOptionClass; - try { - cipherOptionClass = Class.forName("org.apache.hadoop.crypto.CipherOption"); - } catch (ClassNotFoundException e) { - LOG.debug("No CipherOption class found, should be hadoop 2.5-", e); - return createCipherHelper25(); - } - return createCipherHelper27(cipherOptionClass); - } - - private static TransparentCryptoHelper createTransparentCryptoHelper25() { - return new TransparentCryptoHelper() { - - @Override - public Object getFileEncryptionInfo(HdfsFileStatus stat) { - return null; - } - - @Override - public CryptoCodec createCryptoCodec(Configuration conf, Object feInfo, DFSClient client) { - throw new UnsupportedOperationException(); - } - }; - } - - private static TransparentCryptoHelper createTransparentCryptoHelper27(Class feInfoClass) - throws NoSuchMethodException, ClassNotFoundException { - final Method getFileEncryptionInfoMethod = HdfsFileStatus.class - .getMethod("getFileEncryptionInfo"); - final Method decryptEncryptedDataEncryptionKeyMethod = DFSClient.class - .getDeclaredMethod("decryptEncryptedDataEncryptionKey", feInfoClass); + private static TransparentCryptoHelper createTransparentCryptoHelper() + throws NoSuchMethodException { + Method decryptEncryptedDataEncryptionKeyMethod = DFSClient.class + .getDeclaredMethod("decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class); decryptEncryptedDataEncryptionKeyMethod.setAccessible(true); - final Method getCipherSuiteMethod = feInfoClass.getMethod("getCipherSuite"); - Class keyVersionClass = Class.forName("org.apache.hadoop.crypto.key.KeyProvider$KeyVersion"); - final Method getMaterialMethod = keyVersionClass.getMethod("getMaterial"); - final Method getIVMethod = feInfoClass.getMethod("getIV"); return new TransparentCryptoHelper() { @Override - public Object getFileEncryptionInfo(HdfsFileStatus stat) { - try { - return getFileEncryptionInfoMethod.invoke(stat); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - - @Override - public CryptoCodec createCryptoCodec(Configuration conf, Object feInfo, DFSClient client) - throws IOException { + public Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, + DFSClient client) throws IOException { try { - Object decrypted = decryptEncryptedDataEncryptionKeyMethod.invoke(client, feInfo); - return new CryptoCodec(conf, getCipherSuiteMethod.invoke(feInfo), - (byte[]) getMaterialMethod.invoke(decrypted), (byte[]) getIVMethod.invoke(feInfo)); + KeyVersion decryptedKey = + (KeyVersion) decryptEncryptedDataEncryptionKeyMethod.invoke(client, feInfo); + CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf, feInfo.getCipherSuite()); + Encryptor encryptor = cryptoCodec.createEncryptor(); + encryptor.init(decryptedKey.getMaterial(), feInfo.getIV()); + return encryptor; } catch (InvocationTargetException e) { Throwables.propagateIfPossible(e.getTargetException(), IOException.class); throw new RuntimeException(e.getTargetException()); + } catch (GeneralSecurityException e) { + throw new IOException(e); } catch (IllegalAccessException e) { throw new RuntimeException(e); } @@ -646,25 +253,13 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { }; } - private static TransparentCryptoHelper createTransparentCryptoHelper() - throws NoSuchMethodException, ClassNotFoundException { - Class feInfoClass; - try { - feInfoClass = Class.forName("org.apache.hadoop.fs.FileEncryptionInfo"); - } catch (ClassNotFoundException e) { - LOG.debug("No FileEncryptionInfo class found, should be hadoop 2.5-", e); - return createTransparentCryptoHelper25(); - } - return createTransparentCryptoHelper27(feInfoClass); - } - static { try { SASL_ADAPTOR = createSaslAdaptor(); - CIPHER_OPTION_HELPER = createCipherHelper(); + PB_HELPER = createPBHelper(); TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper(); } catch (Exception e) { - final String msg = "Couldn't properly initialize access to HDFS internals. Please " + String msg = "Couldn't properly initialize access to HDFS internals. Please " + "update your WAL Provider to not make use of the 'asyncfs' provider. See " + "HBASE-16110 for more information."; LOG.error(msg, e); @@ -748,16 +343,31 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { sendSaslMessage(ctx, payload, null); } - private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload, List options) - throws IOException { - DataTransferEncryptorMessageProto.Builder builder = DataTransferEncryptorMessageProto - .newBuilder(); + private List getCipherOptions() throws IOException { + // Negotiate cipher suites if configured. Currently, the only supported + // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple + // values for future expansion. + String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY); + if (StringUtils.isBlank(cipherSuites)) { + return null; + } + if (!cipherSuites.equals(CipherSuite.AES_CTR_NOPADDING.getName())) { + throw new IOException(String.format("Invalid cipher suite, %s=%s", + DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites)); + } + return Arrays.asList(new CipherOption(CipherSuite.AES_CTR_NOPADDING)); + } + + private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload, + List options) throws IOException { + DataTransferEncryptorMessageProto.Builder builder = + DataTransferEncryptorMessageProto.newBuilder(); builder.setStatus(DataTransferEncryptorStatus.SUCCESS); if (payload != null) { builder.setPayload(ByteStringer.wrap(payload)); } if (options != null) { - CIPHER_OPTION_HELPER.addCipherOptions(builder, options); + builder.addAllCipherOption(PB_HELPER.convertCipherOptions(options)); } DataTransferEncryptorMessageProto proto = builder.build(); int size = proto.getSerializedSize(); @@ -798,8 +408,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { } private boolean requestedQopContainsPrivacy() { - Set requestedQop = ImmutableSet - .copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); + Set requestedQop = + ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); return requestedQop.contains("auth-conf"); } @@ -807,8 +417,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { if (!saslClient.isComplete()) { throw new IOException("Failed to complete SASL handshake"); } - Set requestedQop = ImmutableSet - .copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); + Set requestedQop = + ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); String negotiatedQop = getNegotiatedQop(); LOG.debug( "Verifying QOP, requested QOP = " + requestedQop + ", negotiated QOP = " + negotiatedQop); @@ -825,48 +435,73 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { return qop != null && !"auth".equalsIgnoreCase(qop); } + private CipherOption unwrap(CipherOption option, SaslClient saslClient) throws IOException { + byte[] inKey = option.getInKey(); + if (inKey != null) { + inKey = saslClient.unwrap(inKey, 0, inKey.length); + } + byte[] outKey = option.getOutKey(); + if (outKey != null) { + outKey = saslClient.unwrap(outKey, 0, outKey.length); + } + return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(), outKey, + option.getOutIv()); + } + + private CipherOption getCipherOption(DataTransferEncryptorMessageProto proto, + boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException { + List cipherOptions = + PB_HELPER.convertCipherOptionProtos(proto.getCipherOptionList()); + if (cipherOptions == null || cipherOptions.isEmpty()) { + return null; + } + CipherOption cipherOption = cipherOptions.get(0); + return isNegotiatedQopPrivacy ? unwrap(cipherOption, saslClient) : cipherOption; + } + @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException { + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof DataTransferEncryptorMessageProto) { DataTransferEncryptorMessageProto proto = (DataTransferEncryptorMessageProto) msg; check(proto); byte[] challenge = proto.getPayload().toByteArray(); byte[] response = saslClient.evaluateChallenge(challenge); switch (step) { - case 1: { - List cipherOptions = null; - if (requestedQopContainsPrivacy()) { - cipherOptions = CIPHER_OPTION_HELPER.getCipherOptions(conf); - } - sendSaslMessage(ctx, response, cipherOptions); - ctx.flush(); - step++; - break; - } - case 2: { - assert response == null; - checkSaslComplete(); - Object cipherOption = CIPHER_OPTION_HELPER.getCipherOption(proto, - isNegotiatedQopPrivacy(), saslClient); - ChannelPipeline p = ctx.pipeline(); - while (p.first() != null) { - p.removeFirst(); + case 1: { + List cipherOptions = null; + if (requestedQopContainsPrivacy()) { + cipherOptions = getCipherOptions(); + } + sendSaslMessage(ctx, response, cipherOptions); + ctx.flush(); + step++; + break; } - if (cipherOption != null) { - CryptoCodec codec = new CryptoCodec(conf, cipherOption); - p.addLast(new EncryptHandler(codec), new DecryptHandler(codec)); - } else { - if (useWrap()) { - p.addLast(new SaslWrapHandler(saslClient), - new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4), - new SaslUnwrapHandler(saslClient)); + case 2: { + assert response == null; + checkSaslComplete(); + CipherOption cipherOption = + getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient); + ChannelPipeline p = ctx.pipeline(); + while (p.first() != null) { + p.removeFirst(); + } + if (cipherOption != null) { + CryptoCodec codec = CryptoCodec.getInstance(conf, cipherOption.getCipherSuite()); + p.addLast(new EncryptHandler(codec, cipherOption.getInKey(), cipherOption.getInIv()), + new DecryptHandler(codec, cipherOption.getOutKey(), cipherOption.getOutIv())); + } else { + if (useWrap()) { + p.addLast(new SaslWrapHandler(saslClient), + new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4), + new SaslUnwrapHandler(saslClient)); + } } + promise.trySuccess(null); + break; } - promise.trySuccess(null); - break; - } - default: - throw new IllegalArgumentException("Unrecognized negotiation step: " + step); + default: + throw new IllegalArgumentException("Unrecognized negotiation step: " + step); } } else { ctx.fireChannelRead(msg); @@ -961,10 +596,12 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { private static final class DecryptHandler extends SimpleChannelInboundHandler { - private final CryptoCodec codec; + private final Decryptor decryptor; - public DecryptHandler(CryptoCodec codec) { - this.codec = codec; + public DecryptHandler(CryptoCodec codec, byte[] key, byte[] iv) + throws GeneralSecurityException, IOException { + this.decryptor = codec.createDecryptor(); + this.decryptor.init(key, Arrays.copyOf(iv, iv.length)); } @Override @@ -981,7 +618,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { ByteBuffer inBuffer = inBuf.nioBuffer(); ByteBuf outBuf = ctx.alloc().directBuffer(inBuf.readableBytes()); ByteBuffer outBuffer = outBuf.nioBuffer(0, inBuf.readableBytes()); - codec.decrypt(inBuffer, outBuffer); + decryptor.decrypt(inBuffer, outBuffer); outBuf.writerIndex(inBuf.readableBytes()); if (release) { inBuf.release(); @@ -992,11 +629,12 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { private static final class EncryptHandler extends MessageToByteEncoder { - private final CryptoCodec codec; + private final Encryptor encryptor; - public EncryptHandler(CryptoCodec codec) { - super(false); - this.codec = codec; + public EncryptHandler(CryptoCodec codec, byte[] key, byte[] iv) + throws GeneralSecurityException, IOException { + this.encryptor = codec.createEncryptor(); + this.encryptor.init(key, Arrays.copyOf(iv, iv.length)); } @Override @@ -1022,7 +660,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { } ByteBuffer inBuffer = inBuf.nioBuffer(); ByteBuffer outBuffer = out.nioBuffer(0, inBuf.readableBytes()); - codec.encrypt(inBuffer, outBuffer); + encryptor.encrypt(inBuffer, outBuffer); out.writerIndex(inBuf.readableBytes()); if (release) { inBuf.release(); @@ -1070,22 +708,18 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { static void trySaslNegotiate(Configuration conf, Channel channel, DatanodeInfo dnInfo, int timeoutMs, DFSClient client, Token accessToken, - Promise saslPromise) { - SaslPropertiesResolver saslPropsResolver = SASL_ADAPTOR.getSaslPropsResolver(client); - TrustedChannelResolver trustedChannelResolver = SASL_ADAPTOR.getTrustedChannelResolver(client); - AtomicBoolean fallbackToSimpleAuth = SASL_ADAPTOR.getFallbackToSimpleAuth(client); + Promise saslPromise) throws IOException { + SaslDataTransferClient saslClient = client.getSaslDataTransferClient(); + SaslPropertiesResolver saslPropsResolver = SASL_ADAPTOR.getSaslPropsResolver(saslClient); + TrustedChannelResolver trustedChannelResolver = + SASL_ADAPTOR.getTrustedChannelResolver(saslClient); + AtomicBoolean fallbackToSimpleAuth = SASL_ADAPTOR.getFallbackToSimpleAuth(saslClient); InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress(); if (trustedChannelResolver.isTrusted() || trustedChannelResolver.isTrusted(addr)) { saslPromise.trySuccess(null); return; } - DataEncryptionKey encryptionKey; - try { - encryptionKey = SASL_ADAPTOR.createDataEncryptionKey(client); - } catch (Exception e) { - saslPromise.tryFailure(e); - return; - } + DataEncryptionKey encryptionKey = client.newDataEncryptionKey(); if (encryptionKey != null) { if (LOG.isDebugEnabled()) { LOG.debug( @@ -1131,12 +765,12 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { } } - static CryptoCodec createCryptoCodec(Configuration conf, HdfsFileStatus stat, DFSClient client) + static Encryptor createEncryptor(Configuration conf, HdfsFileStatus stat, DFSClient client) throws IOException { - Object feInfo = TRANSPARENT_CRYPTO_HELPER.getFileEncryptionInfo(stat); + FileEncryptionInfo feInfo = stat.getFileEncryptionInfo(); if (feInfo == null) { return null; } - return TRANSPARENT_CRYPTO_HELPER.createCryptoCodec(conf, feInfo, client); + return TRANSPARENT_CRYPTO_HELPER.createEncryptor(conf, feInfo, client); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index d5bccf0..279a6ae 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -29,7 +29,6 @@ import io.netty.util.concurrent.ScheduledFuture; import java.io.IOException; import java.io.InterruptedIOException; -import java.nio.channels.CompletionHandler; import java.util.ArrayDeque; import java.util.Comparator; import java.util.Deque; @@ -206,9 +205,8 @@ public class AsyncFSWAL extends AbstractFSWAL { private final long logRollerExitedCheckIntervalMs; - private final ExecutorService closeExecutor = Executors - .newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("Close-WAL-Writer-%d").build()); + private final ExecutorService closeExecutor = Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build()); private volatile AsyncFSOutput fsOut; @@ -216,8 +214,8 @@ public class AsyncFSWAL extends AbstractFSWAL { private final Deque unackedEntries = new ArrayDeque(); - private final PriorityQueue syncFutures = new PriorityQueue(11, - SEQ_COMPARATOR); + private final PriorityQueue syncFutures = + new PriorityQueue(11, SEQ_COMPARATOR); private Promise rollPromise; @@ -285,8 +283,8 @@ public class AsyncFSWAL extends AbstractFSWAL { public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir, Configuration conf, List listeners, boolean failIfWALExists, - String prefix, String suffix, EventLoop eventLoop) throws FailedLogCloseException, - IOException { + String prefix, String suffix, EventLoop eventLoop) + throws FailedLogCloseException, IOException { super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); this.eventLoop = eventLoop; int maxHandlersCount = conf.getInt(REGION_SERVER_HANDLER_COUNT, 200); @@ -294,9 +292,8 @@ public class AsyncFSWAL extends AbstractFSWAL { batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE); createMaxRetries = conf.getInt(ASYNC_WAL_CREATE_MAX_RETRIES, DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES); - logRollerExitedCheckIntervalMs = - conf.getLong(ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS, - DEFAULT_ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS); + logRollerExitedCheckIntervalMs = conf.getLong(ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS, + DEFAULT_ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS); rollWriter(); } @@ -310,82 +307,85 @@ public class AsyncFSWAL extends AbstractFSWAL { } } + private void syncFailed(Throwable error) { + LOG.warn("sync failed", error); + // Here we depends on the implementation of FanOutOneBlockAsyncDFSOutput and netty. + // When error occur, FanOutOneBlockAsyncDFSOutput will fail all pending flush requests. It + // is execute inside EventLoop. And in DefaultPromise in netty, it will notifyListener + // directly if it is already in the EventLoop thread. And in the listener method, it will + // call us. So here we know that all failed flush request will call us continuously, and + // before the last one finish, no other task can be executed in EventLoop. So here we are + // safe to use writerBroken as a guard. + // Do not forget to revisit this if we change the implementation of + // FanOutOneBlockAsyncDFSOutput! + synchronized (waitingConsumePayloads) { + if (writerBroken) { + return; + } + // schedule a periodical task to check if log roller is exited. Otherwise the the sync + // request maybe blocked forever since we are still waiting for a new writer to write the + // pending data and sync it... + logRollerExitedChecker = new LogRollerExitedChecker(); + // we are currently in the EventLoop thread, so it is safe to set the future after + // schedule it since the task can not be executed before we release the thread. + logRollerExitedChecker.setFuture(eventLoop.scheduleAtFixedRate(logRollerExitedChecker, + logRollerExitedCheckIntervalMs, logRollerExitedCheckIntervalMs, TimeUnit.MILLISECONDS)); + writerBroken = true; + } + for (Iterator iter = unackedEntries.descendingIterator(); iter.hasNext();) { + waitingAppendEntries.addFirst(iter.next()); + } + highestUnsyncedTxid = highestSyncedTxid.get(); + if (rollPromise != null) { + rollPromise.trySuccess(null); + rollPromise = null; + return; + } + // request a roll. + if (!rollWriterLock.tryLock()) { + return; + } + try { + requestLogRoll(); + } finally { + rollWriterLock.unlock(); + } + } + + private void syncCompleted(AsyncWriter writer, long processedTxid, long startTimeNs) { + highestSyncedTxid.set(processedTxid); + int syncCount = finishSync(true); + for (Iterator iter = unackedEntries.iterator(); iter.hasNext();) { + if (iter.next().getTxid() <= processedTxid) { + iter.remove(); + } else { + break; + } + } + postSync(System.nanoTime() - startTimeNs, syncCount); + tryFinishRoll(); + if (!rollWriterLock.tryLock()) { + return; + } + try { + if (writer.getLength() >= logrollsize) { + requestLogRoll(); + } + } finally { + rollWriterLock.unlock(); + } + } + private void sync(final AsyncWriter writer, final long processedTxid) { fileLengthAtLastSync = writer.getLength(); final long startTimeNs = System.nanoTime(); - writer.sync(new CompletionHandler() { - - @Override - public void completed(Long result, Void attachment) { - highestSyncedTxid.set(processedTxid); - int syncCount = finishSync(true); - for (Iterator iter = unackedEntries.iterator(); iter.hasNext();) { - if (iter.next().getTxid() <= processedTxid) { - iter.remove(); - } else { - break; - } - } - postSync(System.nanoTime() - startTimeNs, syncCount); - tryFinishRoll(); - if (!rollWriterLock.tryLock()) { - return; - } - try { - if (writer.getLength() >= logrollsize) { - requestLogRoll(); - } - } finally { - rollWriterLock.unlock(); - } - } - - @Override - public void failed(Throwable exc, Void attachment) { - LOG.warn("sync failed", exc); - // Here we depends on the implementation of FanOutOneBlockAsyncDFSOutput and netty. - // When error occur, FanOutOneBlockAsyncDFSOutput will fail all pending flush requests. It - // is execute inside EventLoop. And in DefaultPromise in netty, it will notifyListener - // directly if it is already in the EventLoop thread. And in the listener method, it will - // call us. So here we know that all failed flush request will call us continuously, and - // before the last one finish, no other task can be executed in EventLoop. So here we are - // safe to use writerBroken as a guard. - // Do not forget to revisit this if we change the implementation of - // FanOutOneBlockAsyncDFSOutput! - synchronized (waitingConsumePayloads) { - if (writerBroken) { - return; - } - // schedule a periodical task to check if log roller is exited. Otherwise the the sync - // request maybe blocked forever since we are still waiting for a new writer to write the - // pending data and sync it... - logRollerExitedChecker = new LogRollerExitedChecker(); - // we are currently in the EventLoop thread, so it is safe to set the future after - // schedule it since the task can not be executed before we release the thread. - logRollerExitedChecker.setFuture(eventLoop.scheduleAtFixedRate(logRollerExitedChecker, - logRollerExitedCheckIntervalMs, logRollerExitedCheckIntervalMs, TimeUnit.MILLISECONDS)); - writerBroken = true; - } - for (Iterator iter = unackedEntries.descendingIterator(); iter.hasNext();) { - waitingAppendEntries.addFirst(iter.next()); - } - highestUnsyncedTxid = highestSyncedTxid.get(); - if (rollPromise != null) { - rollPromise.trySuccess(null); - rollPromise = null; - return; - } - // request a roll. - if (!rollWriterLock.tryLock()) { - return; - } - try { - requestLogRoll(); - } finally { - rollWriterLock.unlock(); - } + writer.sync().whenComplete((result, error) -> { + if (error != null) { + syncFailed(error); + } else { + syncCompleted(writer, processedTxid, startTimeNs); } - }, null); + }); } private void addTimeAnnotation(SyncFuture future, String annotation) { @@ -457,13 +457,9 @@ public class AsyncFSWAL extends AbstractFSWAL { } } - private static final Comparator SEQ_COMPARATOR = new Comparator() { - - @Override - public int compare(SyncFuture o1, SyncFuture o2) { - int c = Long.compare(o1.getTxid(), o2.getTxid()); - return c != 0 ? c : Integer.compare(System.identityHashCode(o1), System.identityHashCode(o2)); - } + private static final Comparator SEQ_COMPARATOR = (o1, o2) -> { + int c = Long.compare(o1.getTxid(), o2.getTxid()); + return c != 0 ? c : Integer.compare(System.identityHashCode(o1), System.identityHashCode(o2)); }; private final Runnable consumer = new Runnable() { @@ -690,15 +686,11 @@ public class AsyncFSWAL extends AbstractFSWAL { long oldFileLen; if (oldWriter != null) { oldFileLen = oldWriter.getLength(); - closeExecutor.execute(new Runnable() { - - @Override - public void run() { - try { - oldWriter.close(); - } catch (IOException e) { - LOG.warn("close old writer failed", e); - } + closeExecutor.execute(() -> { + try { + oldWriter.close(); + } catch (IOException e) { + LOG.warn("close old writer failed", e); } }); } else { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java index db3088c..314bef0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java @@ -25,7 +25,9 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; import java.nio.ByteBuffer; -import java.nio.channels.CompletionHandler; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -50,50 +52,6 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter private static final Log LOG = LogFactory.getLog(AsyncProtobufLogWriter.class); - private static final class BlockingCompletionHandler implements CompletionHandler { - - private long size; - - private Throwable error; - - private boolean finished; - - @Override - public void completed(Long result, Void attachment) { - synchronized (this) { - size = result.longValue(); - finished = true; - notifyAll(); - } - } - - @Override - public void failed(Throwable exc, Void attachment) { - synchronized (this) { - error = exc; - finished = true; - notifyAll(); - } - } - - public long get() throws IOException { - synchronized (this) { - while (!finished) { - try { - wait(); - } catch (InterruptedException e) { - throw new InterruptedIOException(); - } - } - if (error != null) { - Throwables.propagateIfPossible(error, IOException.class); - throw new RuntimeException(error); - } - return size; - } - } - } - private final EventLoop eventLoop; private AsyncFSOutput output; @@ -166,8 +124,8 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter } @Override - public void sync(CompletionHandler handler, A attachment) { - output.flush(attachment, handler, false); + public CompletableFuture sync() { + return output.flush(false); } @Override @@ -197,10 +155,24 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter this.asyncOutputWrapper = new OutputStreamWrapper(output); } + private long write(Consumer> action) throws IOException { + CompletableFuture future = new CompletableFuture(); + eventLoop.execute(() -> action.accept(future)); + try { + return future.get().longValue(); + } catch (InterruptedException e) { + InterruptedIOException ioe = new InterruptedIOException(); + ioe.initCause(e); + throw ioe; + } catch (ExecutionException e) { + Throwables.propagateIfPossible(e.getCause(), IOException.class); + throw new RuntimeException(e.getCause()); + } + } + @Override protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException { - final BlockingCompletionHandler handler = new BlockingCompletionHandler(); - eventLoop.execute(() -> { + return write(future -> { output.write(magic); try { header.writeDelimitedTo(asyncOutputWrapper); @@ -208,16 +180,19 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter // should not happen throw new AssertionError(e); } - output.flush(null, handler, false); + output.flush(false).whenComplete((len, error) -> { + if (error != null) { + future.completeExceptionally(error); + } else { + future.complete(len); + } + }); }); - return handler.get(); } @Override - protected long writeWALTrailerAndMagic(WALTrailer trailer, final byte[] magic) - throws IOException { - final BlockingCompletionHandler handler = new BlockingCompletionHandler(); - eventLoop.execute(() -> { + protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws IOException { + return write(future -> { try { trailer.writeTo(asyncOutputWrapper); } catch (IOException e) { @@ -226,9 +201,14 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter } output.writeInt(trailer.getSerializedSize()); output.write(magic); - output.flush(null, handler, false); + output.flush(false).whenComplete((len, error) -> { + if (error != null) { + future.completeExceptionally(error); + } else { + future.complete(len); + } + }); }); - return handler.get(); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java index 1a5b140..328f1b6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java @@ -20,8 +20,8 @@ package org.apache.hadoop.hbase.wal; import java.io.Closeable; import java.io.IOException; -import java.nio.channels.CompletionHandler; import java.util.List; +import java.util.concurrent.CompletableFuture; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -85,7 +85,7 @@ public interface WALProvider { } interface AsyncWriter extends Closeable { - void sync(CompletionHandler handler, A attachment); + CompletableFuture sync(); void append(WAL.Entry entry); long getLength(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputFlushHandler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputFlushHandler.java deleted file mode 100644 index 58b5301..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputFlushHandler.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.io.asyncfs; - -import java.nio.channels.CompletionHandler; -import java.util.concurrent.ExecutionException; - -public final class FanOutOneBlockAsyncDFSOutputFlushHandler - implements CompletionHandler { - - private long size; - - private Throwable error; - - private boolean finished; - - @Override - public synchronized void completed(Long result, Void attachment) { - size = result.longValue(); - finished = true; - notifyAll(); - } - - @Override - public synchronized void failed(Throwable exc, Void attachment) { - error = exc; - finished = true; - notifyAll(); - } - - public synchronized long get() throws InterruptedException, ExecutionException { - while (!finished) { - wait(); - } - if (error != null) { - throw new ExecutionException(error); - } - return size; - } - - public void reset() { - size = 0L; - error = null; - finished = false; - } -} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java index a6d3177..7897472 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java @@ -107,17 +107,8 @@ public class TestFanOutOneBlockAsyncDFSOutput { throws IOException, InterruptedException, ExecutionException { final byte[] b = new byte[10]; ThreadLocalRandom.current().nextBytes(b); - final FanOutOneBlockAsyncDFSOutputFlushHandler handler = - new FanOutOneBlockAsyncDFSOutputFlushHandler(); - eventLoop.execute(new Runnable() { - - @Override - public void run() { - out.write(b, 0, b.length); - out.flush(null, handler, false); - } - }); - assertEquals(b.length, handler.get()); + out.write(b, 0, b.length); + assertEquals(b.length, out.flush(false).get().longValue()); out.close(); assertEquals(b.length, dfs.getFileStatus(f).getLen()); byte[] actual = new byte[b.length]; @@ -144,31 +135,14 @@ public class TestFanOutOneBlockAsyncDFSOutput { true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop); final byte[] b = new byte[10]; ThreadLocalRandom.current().nextBytes(b); - final FanOutOneBlockAsyncDFSOutputFlushHandler handler = - new FanOutOneBlockAsyncDFSOutputFlushHandler(); - eventLoop.execute(new Runnable() { - - @Override - public void run() { - out.write(b, 0, b.length); - out.flush(null, handler, false); - } - }); - handler.get(); + out.write(b, 0, b.length); + out.flush(false).get(); // restart one datanode which causes one connection broken TEST_UTIL.getDFSCluster().restartDataNode(0); try { - handler.reset(); - eventLoop.execute(new Runnable() { - - @Override - public void run() { - out.write(b, 0, b.length); - out.flush(null, handler, false); - } - }); + out.write(b, 0, b.length); try { - handler.get(); + out.flush(false).get(); fail("flush should fail"); } catch (ExecutionException e) { // we restarted one datanode so the flush should fail @@ -254,17 +228,9 @@ public class TestFanOutOneBlockAsyncDFSOutput { true, false, (short) 3, 1024 * 1024 * 1024, eventLoop); byte[] b = new byte[50 * 1024 * 1024]; ThreadLocalRandom.current().nextBytes(b); - FanOutOneBlockAsyncDFSOutputFlushHandler handler = - new FanOutOneBlockAsyncDFSOutputFlushHandler(); - eventLoop.execute(new Runnable() { - - @Override - public void run() { - out.write(b); - out.flush(null, handler, false); - } - }); - assertEquals(b.length, handler.get()); + out.write(b); + out.flush(false); + assertEquals(b.length, out.flush(false).get().longValue()); out.close(); assertEquals(b.length, FS.getFileStatus(f).getLen()); byte[] actual = new byte[b.length]; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java index 04cb0ef..6bd2d3c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java @@ -60,10 +60,8 @@ public class TestLocalAsyncOutput { fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP.next()); byte[] b = new byte[10]; ThreadLocalRandom.current().nextBytes(b); - FanOutOneBlockAsyncDFSOutputFlushHandler handler = new FanOutOneBlockAsyncDFSOutputFlushHandler(); out.write(b); - out.flush(null, handler, true); - assertEquals(b.length, handler.get()); + assertEquals(b.length, out.flush(true).get().longValue()); out.close(); assertEquals(b.length, fs.getFileStatus(f).getLen()); byte[] actual = new byte[b.length]; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java index 89c7996..e05d869 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java @@ -17,11 +17,19 @@ */ package org.apache.hadoop.hbase.io.asyncfs; -import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.AES_CTR_NOPADDING; -import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; @@ -38,6 +46,9 @@ import java.util.concurrent.ExecutionException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.crypto.CipherSuite; +import org.apache.hadoop.crypto.key.KeyProvider; +import org.apache.hadoop.crypto.key.KeyProviderFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.http.ssl.KeyStoreTestUtil; @@ -45,7 +56,6 @@ import org.apache.hadoop.hbase.security.HBaseKerberosUtils; import org.apache.hadoop.hbase.security.token.TestGenerateDelegationToken; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MiscTests; -import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.minikdc.MiniKdc; @@ -113,7 +123,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput { List params = new ArrayList<>(); for (String protection : Arrays.asList("authentication", "integrity", "privacy")) { for (String encryptionAlgorithm : Arrays.asList("", "3des", "rc4")) { - for (String cipherSuite : Arrays.asList("", AES_CTR_NOPADDING)) { + for (String cipherSuite : Arrays.asList("", CipherSuite.AES_CTR_NOPADDING.getName())) { for (boolean useTransparentEncryption : Arrays.asList(false, true)) { params.add(new Object[] { protection, encryptionAlgorithm, cipherSuite, useTransparentEncryption }); @@ -125,17 +135,15 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput { } private static void setHdfsSecuredConfiguration(Configuration conf) throws Exception { - // change XXX_USER_NAME_KEY to XXX_KERBEROS_PRINCIPAL_KEY after we drop support for hadoop-2.4.1 - conf.set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, PRINCIPAL + "@" + KDC.getRealm()); - conf.set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath()); - conf.set(DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY, PRINCIPAL + "@" + KDC.getRealm()); - conf.set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath()); - conf.set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, - HTTP_PRINCIPAL + "@" + KDC.getRealm()); - conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true); - conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name()); - conf.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0"); - conf.set(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0"); + conf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, PRINCIPAL + "@" + KDC.getRealm()); + conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath()); + conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, PRINCIPAL + "@" + KDC.getRealm()); + conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath()); + conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, HTTP_PRINCIPAL + "@" + KDC.getRealm()); + conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true); + conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name()); + conf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0"); + conf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0"); File keystoresDir = new File(TEST_UTIL.getDataTestDir("keystore").toUri().getPath()); keystoresDir.mkdirs(); @@ -146,32 +154,13 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput { } private static void setUpKeyProvider(Configuration conf) throws Exception { - Class keyProviderFactoryClass; - try { - keyProviderFactoryClass = Class.forName("org.apache.hadoop.crypto.key.KeyProviderFactory"); - } catch (ClassNotFoundException e) { - // should be hadoop 2.5-, give up - TEST_TRANSPARENT_ENCRYPTION = false; - return; - } - URI keyProviderUri = new URI("jceks://file" + TEST_UTIL.getDataTestDir("test.jks").toUri().toString()); conf.set("dfs.encryption.key.provider.uri", keyProviderUri.toString()); - Method getKeyProviderMethod = - keyProviderFactoryClass.getMethod("get", URI.class, Configuration.class); - Object keyProvider = getKeyProviderMethod.invoke(null, keyProviderUri, conf); - Class keyProviderClass = Class.forName("org.apache.hadoop.crypto.key.KeyProvider"); - Class keyProviderOptionsClass = - Class.forName("org.apache.hadoop.crypto.key.KeyProvider$Options"); - Method createKeyMethod = - keyProviderClass.getMethod("createKey", String.class, keyProviderOptionsClass); - Object options = keyProviderOptionsClass.getConstructor(Configuration.class).newInstance(conf); - createKeyMethod.invoke(keyProvider, TEST_KEY_NAME, options); - Method flushMethod = keyProviderClass.getMethod("flush"); - flushMethod.invoke(keyProvider); - Method closeMethod = keyProviderClass.getMethod("close"); - closeMethod.invoke(keyProvider); + KeyProvider keyProvider = KeyProviderFactory.get(keyProviderUri, conf); + keyProvider.createKey(TEST_KEY_NAME, KeyProvider.options(conf)); + keyProvider.flush(); + keyProvider.close(); } @BeforeClass @@ -231,7 +220,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput { TEST_UTIL.getConfiguration().set(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuite); } - TEST_UTIL.startMiniDFSCluster(3); + TEST_UTIL.startMiniDFSCluster(1); FS = TEST_UTIL.getDFSCluster().getFileSystem(); testDirOnTestFs = new Path("/" + name.getMethodName().replaceAll("[^0-9a-zA-Z]", "_")); FS.mkdirs(testDirOnTestFs); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java index b64d458..72fc4b2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java @@ -17,27 +17,26 @@ */ package org.apache.hadoop.hbase.regionserver.wal; +import com.google.common.base.Throwables; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; + import java.io.IOException; import java.io.InterruptedIOException; import java.util.concurrent.ExecutionException; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; -import org.apache.hadoop.hbase.wal.WAL.Entry; -import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputFlushHandler; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; +import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.experimental.categories.Category; -import com.google.common.base.Throwables; - -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; - @Category({ RegionServerTests.class, MediumTests.class }) public class TestAsyncProtobufLog extends AbstractTestProtobufLog { @@ -68,14 +67,12 @@ public class TestAsyncProtobufLog extends AbstractTestProtobufLog