From fce612e9c190dc0ca059381775093cdcaf5a80f8 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Mon, 21 Mar 2016 15:55:22 +0800 Subject: [PATCH] HBASE-15407 Add SASL support for fan out OutputStream --- .../util/FanOutOneBlockAsyncDFSOutputHelper.java | 134 ++++- .../FanOutOneBlockAsyncDFSOutputSaslHelper.java | 579 +++++++++++++++++++++ .../util/TestFanOutOneBlockAsyncDFSOutput.java | 34 +- .../util/TestSaslFanOutOneBlockAsyncDFSOutput.java | 191 +++++++ 4 files changed, 902 insertions(+), 36 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputSaslHelper.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSaslFanOutOneBlockAsyncDFSOutput.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java index d34bbb0..7b07578 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java @@ -20,18 +20,21 @@ package org.apache.hadoop.hbase.util; import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS; import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; +import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT; import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE; import java.io.IOException; +import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.EnumSet; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -57,6 +60,8 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; import org.apache.hadoop.hdfs.protocol.datatransfer.Op; import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; +import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; @@ -68,11 +73,14 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto; import org.apache.hadoop.hdfs.protocolPB.PBHelper; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import 
org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SaslPropertiesResolver; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; import com.google.common.base.Throwables; @@ -87,6 +95,7 @@ import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; @@ -99,6 +108,7 @@ import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; /** @@ -165,27 +175,73 @@ public class FanOutOneBlockAsyncDFSOutputHelper { // This is used to terminate a recoverFileLease call when FileSystem is already closed. // isClientRunning is not public so we need to use reflection. private interface DFSClientAdaptor { + boolean isClientRunning(DFSClient client); + + SaslPropertiesResolver getSaslPropsResolver(DFSClient client); + + TrustedChannelResolver getTrustedChannelResolver(DFSClient client); + + AtomicBoolean getFallbackToSimpleAuth(DFSClient client); } private static final DFSClientAdaptor DFS_CLIENT_ADAPTOR; private static DFSClientAdaptor createDFSClientAdaptor() { try { - final Method method = DFSClient.class.getDeclaredMethod("isClientRunning"); - method.setAccessible(true); + final Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning"); + isClientRunningMethod.setAccessible(true); + final Field saslPropsResolverField = SaslDataTransferClient.class + .getDeclaredField("saslPropsResolver"); + saslPropsResolverField.setAccessible(true); + final Field trustedChannelResolverField = SaslDataTransferClient.class + .getDeclaredField("trustedChannelResolver"); + trustedChannelResolverField.setAccessible(true); + final Field fallbackToSimpleAuthField = SaslDataTransferClient.class + .getDeclaredField("fallbackToSimpleAuth"); + fallbackToSimpleAuthField.setAccessible(true); return new DFSClientAdaptor() { @Override public boolean isClientRunning(DFSClient client) { try { - return (Boolean) method.invoke(client); + return (Boolean) isClientRunningMethod.invoke(client); } catch (IllegalAccessException | InvocationTargetException e) { throw new RuntimeException(e); } } + + @Override + public SaslPropertiesResolver getSaslPropsResolver(DFSClient client) { + try { + return (SaslPropertiesResolver) saslPropsResolverField + .get(client.getSaslDataTransferClient()); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + } + + @Override + public TrustedChannelResolver getTrustedChannelResolver(DFSClient client) { + try { + return (TrustedChannelResolver) trustedChannelResolverField + .get(client.getSaslDataTransferClient()); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + } + + @Override + public AtomicBoolean getFallbackToSimpleAuth(DFSClient client) { + try { + return (AtomicBoolean) fallbackToSimpleAuthField + .get(client.getSaslDataTransferClient()); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + } }; - } catch (NoSuchMethodException e) { + } catch 
(NoSuchMethodException | NoSuchFieldException e) { throw new Error(e); } } @@ -480,8 +536,12 @@ public class FanOutOneBlockAsyncDFSOutputHelper { } // success ChannelPipeline p = ctx.pipeline(); - while (p.first() != null) { - p.removeFirst(); + for (ChannelHandler handler; (handler = p.removeLast()) != null;) { + // do not remove all handlers because we may have wrap or unwrap handlers at the head + // of the pipeline. + if (handler instanceof IdleStateHandler) { + break; + } } // Disable auto read here. Enable it after we setup the streaming pipeline in // FanOutOneBLockAsyncDFSOutput. @@ -514,7 +574,6 @@ public class FanOutOneBlockAsyncDFSOutputHelper { private static void requestWriteBlock(Channel channel, Enum storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException { - // TODO: SASL negotiation. should be done using a netty Handler. OpWriteBlockProto proto = STORAGE_TYPE_SETTER.set(writeBlockProtoBuilder, storageType).build(); int protoLen = proto.getSerializedSize(); ByteBuf buffer = channel.alloc() @@ -525,9 +584,35 @@ public class FanOutOneBlockAsyncDFSOutputHelper { channel.writeAndFlush(buffer); } - private static List> connectToDataNodes(Configuration conf, String clientName, - LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS, BlockConstructionStage stage, - DataChecksum summer, EventLoop eventLoop) { + private static void initialize(Configuration conf, final Channel channel, + final DatanodeInfo dnInfo, final Enum storageType, + final OpWriteBlockProto.Builder writeBlockProtoBuilder, final int timeoutMs, DFSClient client, + Token accessToken, final Promise promise) { + Promise saslPromise = channel.eventLoop().newPromise(); + SaslPropertiesResolver saslPropsResolver = DFS_CLIENT_ADAPTOR.getSaslPropsResolver(client); + TrustedChannelResolver trustedChannelResolver = DFS_CLIENT_ADAPTOR + .getTrustedChannelResolver(client); + AtomicBoolean fallbackToSimpleAuth = DFS_CLIENT_ADAPTOR.getFallbackToSimpleAuth(client); + trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, saslPropsResolver, + trustedChannelResolver, fallbackToSimpleAuth, accessToken, saslPromise); + saslPromise.addListener(new FutureListener() { + + @Override + public void operationComplete(Future future) throws Exception { + if (future.isSuccess()) { + // setup response processing pipeline first, then send request. + processWriteBlockResponse(channel, dnInfo, promise, timeoutMs); + requestWriteBlock(channel, storageType, writeBlockProtoBuilder); + } else { + promise.tryFailure(future.cause()); + } + } + }); + } + + private static List> connectToDataNodes(final Configuration conf, + final DFSClient client, String clientName, final LocatedBlock locatedBlock, long maxBytesRcvd, + long latestGS, BlockConstructionStage stage, DataChecksum summer, EventLoop eventLoop) { Enum[] storageTypes = locatedBlock.getStorageTypes(); DatanodeInfo[] datanodeInfos = locatedBlock.getLocations(); boolean connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, @@ -561,14 +646,17 @@ public class FanOutOneBlockAsyncDFSOutputHelper { @Override protected void initChannel(Channel ch) throws Exception { - processWriteBlockResponse(ch, dnInfo, promise, timeoutMs); + // we need to get the remote address of the channel, so we can only move on after the + // channel is connected. Leave an empty implementation here because netty does not allow + // a null handler. 
} }).connect(NetUtils.createSocketAddr(dnAddr)).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { - requestWriteBlock(future.channel(), storageType, writeBlockProtoBuilder); + initialize(conf, future.channel(), dnInfo, storageType, writeBlockProtoBuilder, + timeoutMs, client, locatedBlock.getBlockToken(), promise); } else { promise.tryFailure(future.cause()); } @@ -594,13 +682,15 @@ public class FanOutOneBlockAsyncDFSOutputHelper { beginFileLease(client, src, stat.getFileId()); boolean succ = false; LocatedBlock locatedBlock = null; - List datanodeList = new ArrayList<>(); + List> futureList = null; try { DataChecksum summer = createChecksum(client); locatedBlock = namenode.addBlock(src, client.getClientName(), null, null, stat.getFileId(), null); - for (Future future : connectToDataNodes(conf, clientName, locatedBlock, 0L, 0L, - PIPELINE_SETUP_CREATE, summer, eventLoop)) { + List datanodeList = new ArrayList<>(); + futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L, + PIPELINE_SETUP_CREATE, summer, eventLoop); + for (Future future : futureList) { // fail the creation if there are connection failures since we are fail-fast. The upper // layer should retry itself if needed. datanodeList.add(future.syncUninterruptibly().getNow()); @@ -610,8 +700,18 @@ public class FanOutOneBlockAsyncDFSOutputHelper { stat.getFileId(), locatedBlock, eventLoop, datanodeList, summer, ALLOC); } finally { if (!succ) { - for (Channel c : datanodeList) { - c.close(); + if (futureList != null) { + for (Future f : futureList) { + f.addListener(new FutureListener() { + + @Override + public void operationComplete(Future future) throws Exception { + if (future.isSuccess()) { + future.getNow().close(); + } + } + }); + } } endFileLease(client, src, stat.getFileId()); fsUtils.recoverFileLease(dfs, new Path(src), conf, new CancelOnClose(client)); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputSaslHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputSaslHelper.java new file mode 100644 index 0000000..8b5830d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputSaslHelper.java @@ -0,0 +1,579 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY; +import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.NAME_DELIMITER; +import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.requestedQopContainsPrivacy; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.security.GeneralSecurityException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.sasl.RealmCallback; +import javax.security.sasl.RealmChoiceCallback; +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; + +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.crypto.CipherOption; +import org.apache.hadoop.crypto.CipherSuite; +import org.apache.hadoop.crypto.CryptoCodec; +import org.apache.hadoop.crypto.Decryptor; +import org.apache.hadoop.crypto.Encryptor; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; +import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; +import org.apache.hadoop.security.SaslPropertiesResolver; +import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; + +import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.protobuf.ByteString; +import com.google.protobuf.CodedOutputStream; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.codec.MessageToByteEncoder; +import io.netty.handler.codec.protobuf.ProtobufDecoder; 
+import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.concurrent.Promise; + +/** + * Helper class for adding SASL support for {@link FanOutOneBlockAsyncDFSOutput}. + */ +@InterfaceAudience.Private +public class FanOutOneBlockAsyncDFSOutputSaslHelper { + + private static final Log LOG = LogFactory.getLog(FanOutOneBlockAsyncDFSOutputSaslHelper.class); + + private static final String SERVER_NAME = "0"; + private static final String PROTOCOL = "hdfs"; + private static final String MECHANISM = "DIGEST-MD5"; + private static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF; + + /** + * Sets user name and password when asked by the client-side SASL object. + */ + private static final class SaslClientCallbackHandler implements CallbackHandler { + + private final char[] password; + private final String userName; + + /** + * Creates a new SaslClientCallbackHandler. + * @param userName SASL user name + * @param password SASL password + */ + public SaslClientCallbackHandler(String userName, char[] password) { + this.password = password; + this.userName = userName; + } + + @Override + public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException { + NameCallback nc = null; + PasswordCallback pc = null; + RealmCallback rc = null; + for (Callback callback : callbacks) { + if (callback instanceof RealmChoiceCallback) { + continue; + } else if (callback instanceof NameCallback) { + nc = (NameCallback) callback; + } else if (callback instanceof PasswordCallback) { + pc = (PasswordCallback) callback; + } else if (callback instanceof RealmCallback) { + rc = (RealmCallback) callback; + } else { + throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback"); + } + } + if (nc != null) { + nc.setName(userName); + } + if (pc != null) { + pc.setPassword(password); + } + if (rc != null) { + rc.setText(rc.getDefaultText()); + } + } + } + + private static final class SaslNegotiateHandler extends ChannelDuplexHandler { + + private final Configuration conf; + + private final Map saslProps; + + private final SaslClient saslClient; + + private final int timeoutMs; + + private final Promise promise; + + private int step = 0; + + public SaslNegotiateHandler(Configuration conf, String username, char[] password, + Map saslProps, int timeoutMs, Promise promise) throws SaslException { + this.conf = conf; + this.saslProps = saslProps; + this.saslClient = Sasl.createSaslClient(new String[] { MECHANISM }, username, PROTOCOL, + SERVER_NAME, saslProps, new SaslClientCallbackHandler(username, password)); + this.timeoutMs = timeoutMs; + this.promise = promise; + } + + private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload) throws IOException { + sendSaslMessage(ctx, payload, null); + } + + private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload, + List options) throws IOException { + DataTransferEncryptorMessageProto.Builder builder = DataTransferEncryptorMessageProto + .newBuilder(); + builder.setStatus(DataTransferEncryptorStatus.SUCCESS); + if (payload != null) { + builder.setPayload(ByteString.copyFrom(payload)); + } + if (options != null) { + builder.addAllCipherOption(PBHelper.convertCipherOptions(options)); + } + DataTransferEncryptorMessageProto proto = builder.build(); + int size = proto.getSerializedSize(); + size += 
CodedOutputStream.computeRawVarint32Size(size); + ByteBuf buf = ctx.alloc().buffer(size); + proto.writeDelimitedTo(new ByteBufOutputStream(buf)); + ctx.write(buf); + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + ctx.write(ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER)); + sendSaslMessage(ctx, new byte[0]); + ctx.flush(); + step++; + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + saslClient.dispose(); + } + + private void check(DataTransferEncryptorMessageProto proto) throws IOException { + if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) { + throw new InvalidEncryptionKeyException(proto.getMessage()); + } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) { + throw new IOException(proto.getMessage()); + } + } + + private String getNegotiatedQop() { + return (String) saslClient.getNegotiatedProperty(Sasl.QOP); + } + + private boolean isNegotiatedQopPrivacy() { + String qop = getNegotiatedQop(); + return qop != null && "auth-conf".equalsIgnoreCase(qop); + } + + private void checkSaslComplete() throws IOException { + if (!saslClient.isComplete()) { + throw new IOException("Failed to complete SASL handshake"); + } + Set requestedQop = ImmutableSet + .copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); + String negotiatedQop = getNegotiatedQop(); + LOG.debug( + "Verifying QOP, requested QOP = " + requestedQop + ", negotiated QOP = " + negotiatedQop); + if (!requestedQop.contains(negotiatedQop)) { + throw new IOException(String.format("SASL handshake completed, but " + + "channel does not have acceptable quality of protection, " + + "requested = %s, negotiated = %s", + requestedQop, negotiatedQop)); + } + } + + private CipherOption unwrap(CipherOption option) throws IOException { + if (option != null) { + byte[] inKey = option.getInKey(); + if (inKey != null) { + inKey = saslClient.unwrap(inKey, 0, inKey.length); + } + byte[] outKey = option.getOutKey(); + if (outKey != null) { + outKey = saslClient.unwrap(outKey, 0, outKey.length); + } + return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(), outKey, + option.getOutIv()); + } + return null; + } + + private boolean useWrap() { + String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP); + return qop != null && !"auth".equalsIgnoreCase(qop); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof DataTransferEncryptorMessageProto) { + DataTransferEncryptorMessageProto proto = (DataTransferEncryptorMessageProto) msg; + check(proto); + byte[] challenge = proto.getPayload().toByteArray(); + byte[] response = saslClient.evaluateChallenge(challenge); + switch (step) { + case 1: { + List cipherOptions = null; + if (requestedQopContainsPrivacy(saslProps)) { + // Negotiate cipher suites if configured. Currently, the only supported + // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple + // values for future expansion. 
+ String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY); + if (cipherSuites != null && !cipherSuites.isEmpty()) { + if (!cipherSuites.equals(CipherSuite.AES_CTR_NOPADDING.getName())) { + throw new IOException(String.format("Invalid cipher suite, %s=%s", + DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites)); + } + CipherOption option = new CipherOption(CipherSuite.AES_CTR_NOPADDING); + cipherOptions = Lists.newArrayListWithCapacity(1); + cipherOptions.add(option); + } + } + sendSaslMessage(ctx, response, cipherOptions); + ctx.flush(); + step++; + break; + } + case 2: { + assert response == null; + checkSaslComplete(); + List cipherOptions = PBHelper + .convertCipherOptionProtos(proto.getCipherOptionList()); + CipherOption cipherOption = (cipherOptions != null && !cipherOptions.isEmpty()) + ? cipherOptions.get(0) : null; + if (isNegotiatedQopPrivacy()) { + cipherOption = unwrap(cipherOption); + } + ChannelPipeline p = ctx.pipeline(); + while (p.first() != null) { + p.removeFirst(); + } + if (cipherOption != null) { + CryptoCodec codec = CryptoCodec.getInstance(conf, cipherOption.getCipherSuite()); + p.addLast(new EncryptHandler(codec, cipherOption), + new DecryptHandler(codec, cipherOption)); + } else { + if (useWrap()) { + p.addLast(new SaslWrapHandler(saslClient), + new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4), + new SaslUnwrapHandler(saslClient)); + } + } + promise.trySuccess(null); + break; + } + default: + throw new IllegalArgumentException(); + } + } else { + ctx.fireChannelRead(msg); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + promise.tryFailure(cause); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent + && ((IdleStateEvent) evt).state() == IdleState.READER_IDLE) { + promise.tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response")); + } else { + super.userEventTriggered(ctx, evt); + } + } + } + + private static final class SaslUnwrapHandler extends SimpleChannelInboundHandler { + + private final SaslClient saslClient; + + public SaslUnwrapHandler(SaslClient saslClient) { + this.saslClient = saslClient; + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + saslClient.dispose(); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + msg.skipBytes(4); + byte[] b = new byte[msg.readableBytes()]; + msg.readBytes(b); + ctx.fireChannelRead(Unpooled.wrappedBuffer(saslClient.unwrap(b, 0, b.length))); + } + } + + private static final class SaslWrapHandler extends ChannelOutboundHandlerAdapter { + + private final SaslClient saslClient; + + private CompositeByteBuf cBuf; + + public SaslWrapHandler(SaslClient saslClient) { + this.saslClient = saslClient; + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + cBuf = new CompositeByteBuf(ctx.alloc(), false, Integer.MAX_VALUE); + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) + throws Exception { + if (msg instanceof ByteBuf) { + ByteBuf buf = (ByteBuf) msg; + cBuf.addComponent(buf); + cBuf.writerIndex(cBuf.writerIndex() + buf.readableBytes()); + } else { + ctx.write(msg); + } + } + + @Override + public void flush(ChannelHandlerContext ctx) throws Exception { + if (cBuf.isReadable()) { + byte[] b = new byte[cBuf.readableBytes()]; + 
cBuf.readBytes(b); + cBuf.discardReadComponents(); + byte[] wrapped = saslClient.wrap(b, 0, b.length); + ByteBuf buf = ctx.alloc().ioBuffer(4 + wrapped.length); + buf.writeInt(wrapped.length); + buf.writeBytes(wrapped); + ctx.write(buf); + } + ctx.flush(); + } + + @Override + public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + cBuf.release(); + cBuf = null; + } + } + + private static final class DecryptHandler extends SimpleChannelInboundHandler { + + private final Decryptor decryptor; + + public DecryptHandler(CryptoCodec codec, CipherOption cipherOption) + throws GeneralSecurityException, IOException { + byte[] key = cipherOption.getOutKey(); + byte[] iv = Arrays.copyOf(cipherOption.getOutIv(), cipherOption.getOutIv().length); + decryptor = codec.createDecryptor(); + decryptor.init(key, iv); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + ByteBuf inBuf; + boolean release = false; + if (msg.nioBufferCount() == 1) { + inBuf = msg; + } else { + inBuf = ctx.alloc().directBuffer(msg.readableBytes()); + msg.readBytes(inBuf); + release = true; + } + ByteBuffer inBuffer = inBuf.nioBuffer(); + ByteBuf outBuf = ctx.alloc().directBuffer(inBuf.readableBytes()); + ByteBuffer outBuffer = outBuf.nioBuffer(); + decryptor.decrypt(inBuffer, outBuffer); + outBuf.writerIndex(inBuf.readableBytes()); + if (release) { + inBuf.release(); + } + ctx.fireChannelRead(outBuf); + } + } + + private static final class EncryptHandler extends MessageToByteEncoder { + + private final Encryptor encryptor; + + public EncryptHandler(CryptoCodec codec, CipherOption cipherOption) + throws GeneralSecurityException, IOException { + super(false); + byte[] key = cipherOption.getInKey(); + byte[] iv = Arrays.copyOf(cipherOption.getInIv(), cipherOption.getInIv().length); + encryptor = codec.createEncryptor(); + encryptor.init(key, iv); + } + + @Override + protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect) + throws Exception { + if (preferDirect) { + return ctx.alloc().directBuffer(msg.readableBytes()); + } else { + return ctx.alloc().buffer(msg.readableBytes()); + } + } + + @Override + protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception { + ByteBuf inBuf; + boolean release = false; + if (msg.nioBufferCount() == 1) { + inBuf = msg; + } else { + inBuf = ctx.alloc().directBuffer(msg.readableBytes()); + msg.readBytes(inBuf); + release = true; + } + ByteBuffer inBuffer = inBuf.nioBuffer(); + ByteBuffer outBuffer = out.nioBuffer(); + encryptor.encrypt(inBuffer, outBuffer); + out.writerIndex(inBuf.readableBytes()); + if (release) { + inBuf.release(); + } + } + } + + private static String getUserNameFromEncryptionKey(DataEncryptionKey encryptionKey) { + return encryptionKey.keyId + NAME_DELIMITER + encryptionKey.blockPoolId + NAME_DELIMITER + + new String(Base64.encodeBase64(encryptionKey.nonce, false), Charsets.UTF_8); + } + + private static char[] encryptionKeyToPassword(byte[] encryptionKey) { + return new String(Base64.encodeBase64(encryptionKey, false), Charsets.UTF_8).toCharArray(); + } + + private static String buildUsername(Token blockToken) { + return new String(Base64.encodeBase64(blockToken.getIdentifier(), false), Charsets.UTF_8); + } + + private static char[] buildClientPassword(Token blockToken) { + return new String(Base64.encodeBase64(blockToken.getPassword(), false), Charsets.UTF_8) + .toCharArray(); + } + + private static Map 
createSaslPropertiesForEncryption(String encryptionAlgorithm) { + Map saslProps = Maps.newHashMapWithExpectedSize(3); + saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop()); + saslProps.put(Sasl.SERVER_AUTH, "true"); + saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm); + return saslProps; + } + + private static void doSaslNegotiation(Configuration conf, Channel channel, int timeoutMs, + String username, char[] password, Map saslProps, Promise saslPromise) { + try { + channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS), + new ProtobufVarint32FrameDecoder(), + new ProtobufDecoder(DataTransferEncryptorMessageProto.getDefaultInstance()), + new SaslNegotiateHandler(conf, username, password, saslProps, timeoutMs, saslPromise)); + } catch (SaslException e) { + saslPromise.tryFailure(e); + } + } + + static void trySaslNegotiate(Configuration conf, Channel channel, DatanodeInfo dnInfo, + int timeoutMs, DFSClient client, SaslPropertiesResolver saslPropsResolver, + TrustedChannelResolver trustedChannelResolver, AtomicBoolean fallbackToSimpleAuth, + Token accessToken, Promise saslPromise) { + InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress(); + if (trustedChannelResolver.isTrusted() || trustedChannelResolver.isTrusted(addr)) { + saslPromise.trySuccess(null); + return; + } + DataEncryptionKey encryptionKey; + try { + encryptionKey = client.newDataEncryptionKey(); + } catch (IOException e) { + saslPromise.tryFailure(e); + return; + } + if (encryptionKey != null) { + doSaslNegotiation(conf, channel, timeoutMs, getUserNameFromEncryptionKey(encryptionKey), + encryptionKeyToPassword(encryptionKey.encryptionKey), + createSaslPropertiesForEncryption(encryptionKey.encryptionAlgorithm), saslPromise); + } else if (!UserGroupInformation.isSecurityEnabled()) { + saslPromise.trySuccess(null); + } else if (SecurityUtil.isPrivilegedPort(dnInfo.getXferPort())) { + saslPromise.trySuccess(null); + } else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) { + saslPromise.trySuccess(null); + } else if (saslPropsResolver != null) { + doSaslNegotiation(conf, channel, timeoutMs, buildUsername(accessToken), + buildClientPassword(accessToken), saslPropsResolver.getClientProperties(addr), saslPromise); + } else { + saslPromise.trySuccess(null); + } + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFanOutOneBlockAsyncDFSOutput.java index 0e9f42e..538297e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFanOutOneBlockAsyncDFSOutput.java @@ -79,12 +79,12 @@ public class TestFanOutOneBlockAsyncDFSOutput { TEST_UTIL.shutdownMiniDFSCluster(); } - private void writeAndVerify(EventLoop eventLoop, Path f, final FanOutOneBlockAsyncDFSOutput out) - throws IOException, InterruptedException, ExecutionException { + static void writeAndVerify(EventLoop eventLoop, DistributedFileSystem dfs, Path f, + final FanOutOneBlockAsyncDFSOutput out) + throws IOException, InterruptedException, ExecutionException { final byte[] b = new byte[10]; ThreadLocalRandom.current().nextBytes(b); - final FanOutOneBlockAsyncDFSOutputFlushHandler handler = - new FanOutOneBlockAsyncDFSOutputFlushHandler(); + final FanOutOneBlockAsyncDFSOutputFlushHandler handler = new 
FanOutOneBlockAsyncDFSOutputFlushHandler(); eventLoop.execute(new Runnable() { @Override @@ -95,9 +95,9 @@ public class TestFanOutOneBlockAsyncDFSOutput { }); assertEquals(b.length, handler.get()); out.close(); - assertEquals(b.length, FS.getFileStatus(f).getLen()); + assertEquals(b.length, dfs.getFileStatus(f).getLen()); byte[] actual = new byte[b.length]; - try (FSDataInputStream in = FS.open(f)) { + try (FSDataInputStream in = dfs.open(f)) { in.readFully(actual); } assertArrayEquals(b, actual); @@ -107,23 +107,20 @@ public class TestFanOutOneBlockAsyncDFSOutput { public void test() throws IOException, InterruptedException, ExecutionException { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); - final FanOutOneBlockAsyncDFSOutput out = - FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3, - FS.getDefaultBlockSize(), eventLoop); - writeAndVerify(eventLoop, f, out); + final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, + true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop); + writeAndVerify(eventLoop, FS, f, out); } @Test public void testRecover() throws IOException, InterruptedException, ExecutionException { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); - final FanOutOneBlockAsyncDFSOutput out = - FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3, - FS.getDefaultBlockSize(), eventLoop); + final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, + true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop); final byte[] b = new byte[10]; ThreadLocalRandom.current().nextBytes(b); - final FanOutOneBlockAsyncDFSOutputFlushHandler handler = - new FanOutOneBlockAsyncDFSOutputFlushHandler(); + final FanOutOneBlockAsyncDFSOutputFlushHandler handler = new FanOutOneBlockAsyncDFSOutputFlushHandler(); eventLoop.execute(new Runnable() { @Override @@ -164,12 +161,11 @@ public class TestFanOutOneBlockAsyncDFSOutput { public void testHeartbeat() throws IOException, InterruptedException, ExecutionException { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); - final FanOutOneBlockAsyncDFSOutput out = - FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3, - FS.getDefaultBlockSize(), eventLoop); + final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, + true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop); Thread.sleep(READ_TIMEOUT_MS * 2); // the connection to the datanode should still be alive. - writeAndVerify(eventLoop, f, out); + writeAndVerify(eventLoop, FS, f, out); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSaslFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSaslFanOutOneBlockAsyncDFSOutput.java new file mode 100644 index 0000000..8c13828 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSaslFanOutOneBlockAsyncDFSOutput.java @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.*; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.http.ssl.KeyStoreTestUtil; +import org.apache.hadoop.hbase.security.HBaseKerberosUtils; +import org.apache.hadoop.hbase.security.token.TestGenerateDelegationToken; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.http.HttpConfig; +import org.apache.hadoop.minikdc.MiniKdc; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; + +@RunWith(Parameterized.class) +@Category({ MiscTests.class, MediumTests.class }) +public class TestSaslFanOutOneBlockAsyncDFSOutput { + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static DistributedFileSystem FS; + + private static EventLoopGroup EVENT_LOOP_GROUP; + + private static int READ_TIMEOUT_MS = 200000; + + private static final File KEYTAB_FILE = new File( + TEST_UTIL.getDataTestDir("keytab").toUri().getPath()); + + private static MiniKdc KDC; + + private static String HOST = "localhost"; + + private static String USERNAME; + + private static String PRINCIPAL; + + private static String HTTP_PRINCIPAL; + @Rule + public TestName name = new TestName(); + + @Parameter(0) + public String protection; + + @Parameter(1) + public String encryptionAlgorithm; + + @Parameters(name = "{index}: protection={0}, encryption={1}") + public static Iterable data() { + List params = new ArrayList<>(); + for (String protection : Arrays.asList("authentication", "integrity", "privacy")) { + for (String encryptionAlgorithm : Arrays.asList("", "3des", "rc4")) { + params.add(new Object[] { protection, encryptionAlgorithm }); + } + } + return params; + } + + private static void setHdfsSecuredConfiguration(Configuration conf) throws Exception { + // change XXX_USER_NAME_KEY to XXX_KERBEROS_PRINCIPAL_KEY after we drop 
support for hadoop-2.4.1 + conf.set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, PRINCIPAL + "@" + KDC.getRealm()); + conf.set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath()); + conf.set(DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY, PRINCIPAL + "@" + KDC.getRealm()); + conf.set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath()); + conf.set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, + HTTP_PRINCIPAL + "@" + KDC.getRealm()); + conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true); + conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name()); + conf.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0"); + conf.set(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0"); + + File keystoresDir = new File(TEST_UTIL.getDataTestDir("keystore").toUri().getPath()); + keystoresDir.mkdirs(); + String sslConfDir = KeyStoreTestUtil.getClasspathDir(TestGenerateDelegationToken.class); + KeyStoreTestUtil.setupSSLConfig(keystoresDir.getAbsolutePath(), sslConfDir, conf, false); + + conf.setBoolean("ignore.secure.ports.for.testing", true); + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Logger.getLogger("org.apache.hadoop.hdfs.StateChange").setLevel(Level.DEBUG); + Logger.getLogger("BlockStateChange").setLevel(Level.DEBUG); + EVENT_LOOP_GROUP = new NioEventLoopGroup(); + TEST_UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS); + Properties conf = MiniKdc.createConf(); + conf.put(MiniKdc.DEBUG, true); + KDC = new MiniKdc(conf, new File(TEST_UTIL.getDataTestDir("kdc").toUri().getPath())); + KDC.start(); + USERNAME = UserGroupInformation.getLoginUser().getShortUserName(); + PRINCIPAL = USERNAME + "/" + HOST; + HTTP_PRINCIPAL = "HTTP/" + HOST; + KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL, HTTP_PRINCIPAL); + setHdfsSecuredConfiguration(TEST_UTIL.getConfiguration()); + HBaseKerberosUtils.setKeytabFileForTesting(KEYTAB_FILE.getAbsolutePath()); + HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm()); + HBaseKerberosUtils.setSecuredConfiguration(TEST_UTIL.getConfiguration()); + UserGroupInformation.setConfiguration(TEST_UTIL.getConfiguration()); + } + + @AfterClass + public static void tearDownAfterClass() throws IOException, InterruptedException { + if (EVENT_LOOP_GROUP != null) { + EVENT_LOOP_GROUP.shutdownGracefully().sync(); + } + if (KDC != null) { + KDC.stop(); + } + } + + @Before + public void setUp() throws Exception { + TEST_UTIL.getConfiguration().set(DFS_DATA_TRANSFER_PROTECTION_KEY, protection); + if (StringUtils.isBlank(encryptionAlgorithm)) { + TEST_UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, false); + TEST_UTIL.getConfiguration().unset(DFS_DATA_ENCRYPTION_ALGORITHM_KEY); + } else { + TEST_UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, true); + TEST_UTIL.getConfiguration().set(DFS_DATA_ENCRYPTION_ALGORITHM_KEY, encryptionAlgorithm); + } + TEST_UTIL.startMiniDFSCluster(3); + FS = TEST_UTIL.getDFSCluster().getFileSystem(); + } + + @After + public void tearDown() throws IOException { + TEST_UTIL.shutdownMiniDFSCluster(); + } + + private Path getTestFile() { + return new Path("/" + name.getMethodName().replaceAll("[^0-9a-zA-Z]", "_")); + } + + @Test + public void test() throws IOException, InterruptedException, ExecutionException { + Path f = getTestFile(); + EventLoop eventLoop = EVENT_LOOP_GROUP.next(); + final FanOutOneBlockAsyncDFSOutput out = 
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, + true, false, (short) 1, FS.getDefaultBlockSize(), eventLoop); + TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(eventLoop, FS, f, out); + } +} -- 1.9.1
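
Note: below is a minimal usage sketch, distilled from the tests in this patch, of how a caller drives the new secured write path. The createOutput(...) signature is the one the tests use; the write(...) call on FanOutOneBlockAsyncDFSOutput is assumed from the test runnables (the tests additionally flush through a FanOutOneBlockAsyncDFSOutputFlushHandler on the event loop before closing, see writeAndVerify). No extra call is needed for security: SASL negotiation with each datanode, or AES/CTR cipher negotiation when data transfer encryption is enabled, happens inside createOutput() before the WRITE_BLOCK request is sent.

    // Sketch only. Placed in org.apache.hadoop.hbase.util, mirroring the tests,
    // so the helper classes in this patch are reachable.
    package org.apache.hadoop.hbase.util;

    import io.netty.channel.EventLoop;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class SaslFanOutWriteSketch {
      public static void main(String[] args) throws Exception {
        // Assumes the Configuration already carries the secured-cluster keys the
        // test sets up: Kerberos principals and keytabs, plus
        // dfs.data.transfer.protection and, optionally, dfs.encrypt.data.transfer.
        // Also assumes fs.defaultFS points at HDFS, so the cast below succeeds.
        Configuration conf = new Configuration();
        DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
        EventLoopGroup group = new NioEventLoopGroup();
        EventLoop eventLoop = group.next();
        Path f = new Path("/sasl-fan-out-demo");
        // Same call the tests make: overwrite = true, createParent = false,
        // replication = 3, default block size. Each datanode connection completes
        // its SASL handshake before the write-block request goes out.
        FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper
            .createOutput(dfs, f, true, false, (short) 3, dfs.getDefaultBlockSize(), eventLoop);
        try {
          byte[] b = "hello secured pipeline".getBytes("UTF-8");
          out.write(b, 0, b.length); // assumed buffering write, as in the test runnables
          // A real caller would then flush on the event loop and wait for the ack,
          // as writeAndVerify does with FanOutOneBlockAsyncDFSOutputFlushHandler.
        } finally {
          out.close();
        }
        group.shutdownGracefully();
      }
    }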