From 56362100caafb9fdf272025a1f0821ecd566c753 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Tue, 3 May 2016 17:54:52 +0800 Subject: [PATCH] HBASE-15743 Add Transparent Data Encryption support for FanOutOneBlockAsyncDFSOutput --- .../io/asyncfs/FanOutOneBlockAsyncDFSOutput.java | 24 ++- .../FanOutOneBlockAsyncDFSOutputHelper.java | 9 +- .../FanOutOneBlockAsyncDFSOutputSaslHelper.java | 234 ++++++++++++--------- .../TestSaslFanOutOneBlockAsyncDFSOutput.java | 35 ++- 4 files changed, 196 insertions(+), 106 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java index 7d6a676..8dd7f5e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java @@ -43,6 +43,7 @@ import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.channels.CompletionHandler; import java.util.ArrayDeque; import java.util.Collection; @@ -58,6 +59,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose; +import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.CryptoCodec; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hdfs.DFSClient; @@ -119,6 +121,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { private final LocatedBlock locatedBlock; + private final CryptoCodec cryptoCodec; + private final EventLoop eventLoop; private final List datanodeList; @@ -317,8 +321,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { FanOutOneBlockAsyncDFSOutput(Configuration conf, FSUtils fsUtils, DistributedFileSystem dfs, DFSClient client, ClientProtocol namenode, String clientName, String src, long fileId, - LocatedBlock locatedBlock, EventLoop eventLoop, List datanodeList, - DataChecksum summer, ByteBufAllocator alloc) { + LocatedBlock locatedBlock, CryptoCodec cryptoCodec, EventLoop eventLoop, + List datanodeList, DataChecksum summer, ByteBufAllocator alloc) { this.conf = conf; this.fsUtils = fsUtils; this.dfs = dfs; @@ -328,6 +332,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { this.clientName = clientName; this.src = src; this.locatedBlock = locatedBlock; + this.cryptoCodec = cryptoCodec; this.eventLoop = eventLoop; this.datanodeList = datanodeList; this.summer = summer; @@ -342,16 +347,27 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { write(b, 0, b.length); } + private void write0(byte[] b, final int off, final int len) { + buf.ensureWritable(len); + if (cryptoCodec == null) { + buf.writeBytes(b, off, len); + } else { + ByteBuffer inBuffer = ByteBuffer.wrap(b, off, len); + cryptoCodec.encrypt(inBuffer, buf.nioBuffer(buf.writerIndex(), len)); + buf.writerIndex(buf.writerIndex() + len); + } + } + @Override public void write(final byte[] b, final int off, final int len) { if (eventLoop.inEventLoop()) { - buf.ensureWritable(len).writeBytes(b, off, len); + write0(b, off, len); } else { eventLoop.submit(new Runnable() { @Override public void run() { - buf.ensureWritable(len).writeBytes(b, off, len); + write0(b, off, len); } }).syncUninterruptibly(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java index 4f9058c..93aca0a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java @@ -21,7 +21,7 @@ import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS; import static io.netty.handler.timeout.IdleState.READER_IDLE; import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; -import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate; +import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.*; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT; @@ -672,9 +672,12 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { // layer should retry itself if needed. datanodeList.add(future.syncUninterruptibly().getNow()); } + CryptoCodec cryptocodec = createCryptoCodec(conf, stat, client); + FanOutOneBlockAsyncDFSOutput output = new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, + client, namenode, clientName, src, stat.getFileId(), locatedBlock, cryptocodec, eventLoop, + datanodeList, summer, ALLOC); succ = true; - return new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName, - src, stat.getFileId(), locatedBlock, eventLoop, datanodeList, summer, ALLOC); + return output; } finally { if (!succ) { if (futureList != null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java index 22c4e04..2e8253d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java @@ -54,6 +54,7 @@ import java.lang.reflect.Method; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.security.GeneralSecurityException; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -76,9 +77,15 @@ import org.apache.commons.codec.binary.Base64; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.crypto.key.KeyProvider; +import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion; +import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; +import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion; +import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto; @@ -111,7 +118,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { @VisibleForTesting static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY = - "dfs.encrypt.data.transfer.cipher.suites"; + "dfs.encrypt.data.transfer.cipher.suites"; @VisibleForTesting static final String AES_CTR_NOPADDING = "AES/CTR/NoPadding"; @@ -129,7 +136,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { private static final SaslAdaptor SASL_ADAPTOR; - private interface CipherHelper { + private interface CipherOptionHelper { List getCipherOptions(Configuration conf) throws IOException; @@ -150,9 +157,9 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { byte[] getOutIv(Object cipherOption); } - private static final CipherHelper CIPHER_HELPER; + private static final CipherOptionHelper CIPHER_OPTION_HELPER; - private static final class CryptoCodec { + static final class CryptoCodec { private static final Method CREATE_CODEC; @@ -215,23 +222,34 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { private final Object decryptor; public CryptoCodec(Configuration conf, Object cipherOption) { - Object codec; try { - codec = CREATE_CODEC.invoke(null, conf, CIPHER_HELPER.getCipherSuite(cipherOption)); + Object codec = CREATE_CODEC.invoke(null, conf, + CIPHER_OPTION_HELPER.getCipherSuite(cipherOption)); encryptor = CREATE_ENCRYPTOR.invoke(codec); - byte[] encKey = CIPHER_HELPER.getInKey(cipherOption); - byte[] encIv = CIPHER_HELPER.getInIv(cipherOption); + byte[] encKey = CIPHER_OPTION_HELPER.getInKey(cipherOption); + byte[] encIv = CIPHER_OPTION_HELPER.getInIv(cipherOption); INIT_ENCRYPTOR.invoke(encryptor, encKey, Arrays.copyOf(encIv, encIv.length)); decryptor = CREATE_DECRYPTOR.invoke(codec); - byte[] decKey = CIPHER_HELPER.getOutKey(cipherOption); - byte[] decIv = CIPHER_HELPER.getOutIv(cipherOption); + byte[] decKey = CIPHER_OPTION_HELPER.getOutKey(cipherOption); + byte[] decIv = CIPHER_OPTION_HELPER.getOutIv(cipherOption); INIT_DECRYPTOR.invoke(decryptor, decKey, Arrays.copyOf(decIv, decIv.length)); } catch (IllegalAccessException | InvocationTargetException e) { throw new RuntimeException(e); } } + public CryptoCodec(Configuration conf, Object cipherSuite, byte[] encKey, byte[] encIv) { + try { + Object codec = CREATE_CODEC.invoke(null, conf, cipherSuite); + encryptor = CREATE_ENCRYPTOR.invoke(codec); + INIT_ENCRYPTOR.invoke(encryptor, encKey, encIv); + decryptor = null; + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + public void encrypt(ByteBuffer inBuffer, ByteBuffer outBuffer) { try { ENCRYPT.invoke(encryptor, inBuffer, outBuffer); @@ -251,17 +269,17 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { private static SaslAdaptor createSaslAdaptor27(Class saslDataTransferClientClass) throws NoSuchFieldException, NoSuchMethodException { - final Field saslPropsResolverField = - saslDataTransferClientClass.getDeclaredField("saslPropsResolver"); + final Field saslPropsResolverField = saslDataTransferClientClass + .getDeclaredField("saslPropsResolver"); saslPropsResolverField.setAccessible(true); - final Field trustedChannelResolverField = - saslDataTransferClientClass.getDeclaredField("trustedChannelResolver"); + final Field trustedChannelResolverField = saslDataTransferClientClass + .getDeclaredField("trustedChannelResolver"); trustedChannelResolverField.setAccessible(true); - final Field fallbackToSimpleAuthField = - saslDataTransferClientClass.getDeclaredField("fallbackToSimpleAuth"); + final Field fallbackToSimpleAuthField = saslDataTransferClientClass + .getDeclaredField("fallbackToSimpleAuth"); fallbackToSimpleAuthField.setAccessible(true); - final Method getSaslDataTransferClientMethod = - DFSClient.class.getMethod("getSaslDataTransferClient"); + final Method getSaslDataTransferClientMethod = DFSClient.class + .getMethod("getSaslDataTransferClient"); final Method newDataEncryptionKeyMethod = DFSClient.class.getMethod("newDataEncryptionKey"); return new SaslAdaptor() { @@ -288,8 +306,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { @Override public AtomicBoolean getFallbackToSimpleAuth(DFSClient client) { try { - return (AtomicBoolean) fallbackToSimpleAuthField.get(getSaslDataTransferClientMethod - .invoke(client)); + return (AtomicBoolean) fallbackToSimpleAuthField + .get(getSaslDataTransferClientMethod.invoke(client)); } catch (IllegalAccessException | InvocationTargetException e) { throw new RuntimeException(e); } @@ -308,8 +326,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { private static SaslAdaptor createSaslAdaptor25() { try { - final Field trustedChannelResolverField = - DFSClient.class.getDeclaredField("trustedChannelResolver"); + final Field trustedChannelResolverField = DFSClient.class + .getDeclaredField("trustedChannelResolver"); trustedChannelResolverField.setAccessible(true); final Method getDataEncryptionKeyMethod = DFSClient.class.getMethod("getDataEncryptionKey"); return new SaslAdaptor() { @@ -351,8 +369,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { private static SaslAdaptor createSaslAdaptor() { Class saslDataTransferClientClass = null; try { - saslDataTransferClientClass = - Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient"); + saslDataTransferClientClass = Class + .forName("org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient"); } catch (ClassNotFoundException e) { LOG.warn("No SaslDataTransferClient class found, should be hadoop 2.5-"); } @@ -364,8 +382,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { } } - private static CipherHelper createCipherHelper25() { - return new CipherHelper() { + private static CipherOptionHelper createCipherHelper25() { + return new CipherOptionHelper() { @Override public byte[] getOutKey(Object cipherOption) { @@ -410,18 +428,17 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { }; } - private static CipherHelper createCipherHelper27(Class cipherOptionClass) + private static CipherOptionHelper createCipherHelper27(Class cipherOptionClass) throws ClassNotFoundException, NoSuchMethodException { @SuppressWarnings("rawtypes") - Class cipherSuiteClass = - Class.forName("org.apache.hadoop.crypto.CipherSuite").asSubclass(Enum.class); + Class cipherSuiteClass = Class.forName("org.apache.hadoop.crypto.CipherSuite") + .asSubclass(Enum.class); @SuppressWarnings("unchecked") final Enum aesCipherSuite = Enum.valueOf(cipherSuiteClass, "AES_CTR_NOPADDING"); - final Constructor cipherOptionConstructor = - cipherOptionClass.getConstructor(cipherSuiteClass); - final Constructor cipherOptionWithKeyAndIvConstructor = - cipherOptionClass.getConstructor(cipherSuiteClass, byte[].class, byte[].class, - byte[].class, byte[].class); + final Constructor cipherOptionConstructor = cipherOptionClass + .getConstructor(cipherSuiteClass); + final Constructor cipherOptionWithKeyAndIvConstructor = cipherOptionClass + .getConstructor(cipherSuiteClass, byte[].class, byte[].class, byte[].class, byte[].class); final Method getCipherSuiteMethod = cipherOptionClass.getMethod("getCipherSuite"); final Method getInKeyMethod = cipherOptionClass.getMethod("getInKey"); @@ -429,16 +446,15 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { final Method getOutKeyMethod = cipherOptionClass.getMethod("getOutKey"); final Method getOutIvMethod = cipherOptionClass.getMethod("getOutIv"); - final Method convertCipherOptionsMethod = - PBHelper.class.getMethod("convertCipherOptions", List.class); - final Method convertCipherOptionProtosMethod = - PBHelper.class.getMethod("convertCipherOptionProtos", List.class); - final Method addAllCipherOptionMethod = - DataTransferEncryptorMessageProto.Builder.class.getMethod("addAllCipherOption", - Iterable.class); - final Method getCipherOptionListMethod = - DataTransferEncryptorMessageProto.class.getMethod("getCipherOptionList"); - return new CipherHelper() { + final Method convertCipherOptionsMethod = PBHelper.class.getMethod("convertCipherOptions", + List.class); + final Method convertCipherOptionProtosMethod = PBHelper.class + .getMethod("convertCipherOptionProtos", List.class); + final Method addAllCipherOptionMethod = DataTransferEncryptorMessageProto.Builder.class + .getMethod("addAllCipherOption", Iterable.class); + final Method getCipherOptionListMethod = DataTransferEncryptorMessageProto.class + .getMethod("getCipherOptionList"); + return new CipherOptionHelper() { @Override public byte[] getOutKey(Object cipherOption) { @@ -532,9 +548,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException { List cipherOptions; try { - cipherOptions = - (List) convertCipherOptionProtosMethod.invoke(null, - getCipherOptionListMethod.invoke(proto)); + cipherOptions = (List) convertCipherOptionProtosMethod.invoke(null, + getCipherOptionListMethod.invoke(proto)); } catch (IllegalAccessException | InvocationTargetException e) { throw new RuntimeException(e); } @@ -557,7 +572,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { }; } - private static CipherHelper createCipherHelper() { + private static CipherOptionHelper createCipherHelper() { Class cipherOptionClass; try { cipherOptionClass = Class.forName("org.apache.hadoop.crypto.CipherOption"); @@ -574,7 +589,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { static { SASL_ADAPTOR = createSaslAdaptor(); - CIPHER_HELPER = createCipherHelper(); + CIPHER_OPTION_HELPER = createCipherHelper(); } /** @@ -643,9 +658,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { Map saslProps, int timeoutMs, Promise promise) throws SaslException { this.conf = conf; this.saslProps = saslProps; - this.saslClient = - Sasl.createSaslClient(new String[] { MECHANISM }, username, PROTOCOL, SERVER_NAME, - saslProps, new SaslClientCallbackHandler(username, password)); + this.saslClient = Sasl.createSaslClient(new String[] { MECHANISM }, username, PROTOCOL, + SERVER_NAME, saslProps, new SaslClientCallbackHandler(username, password)); this.timeoutMs = timeoutMs; this.promise = promise; } @@ -656,14 +670,14 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload, List options) throws IOException { - DataTransferEncryptorMessageProto.Builder builder = - DataTransferEncryptorMessageProto.newBuilder(); + DataTransferEncryptorMessageProto.Builder builder = DataTransferEncryptorMessageProto + .newBuilder(); builder.setStatus(DataTransferEncryptorStatus.SUCCESS); if (payload != null) { builder.setPayload(ByteString.copyFrom(payload)); } if (options != null) { - CIPHER_HELPER.addCipherOptions(builder, options); + CIPHER_OPTION_HELPER.addCipherOptions(builder, options); } DataTransferEncryptorMessageProto proto = builder.build(); int size = proto.getSerializedSize(); @@ -704,8 +718,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { } private boolean requestedQopContainsPrivacy() { - Set requestedQop = - ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); + Set requestedQop = ImmutableSet + .copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); return requestedQop.contains("auth-conf"); } @@ -713,15 +727,16 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { if (!saslClient.isComplete()) { throw new IOException("Failed to complete SASL handshake"); } - Set requestedQop = - ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); + Set requestedQop = ImmutableSet + .copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); String negotiatedQop = getNegotiatedQop(); - LOG.debug("Verifying QOP, requested QOP = " + requestedQop + ", negotiated QOP = " - + negotiatedQop); + LOG.debug( + "Verifying QOP, requested QOP = " + requestedQop + ", negotiated QOP = " + negotiatedQop); if (!requestedQop.contains(negotiatedQop)) { throw new IOException(String.format("SASL handshake completed, but " + "channel does not have acceptable quality of protection, " - + "requested = %s, negotiated = %s", requestedQop, negotiatedQop)); + + "requested = %s, negotiated = %s", + requestedQop, negotiatedQop)); } } @@ -738,39 +753,40 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { byte[] challenge = proto.getPayload().toByteArray(); byte[] response = saslClient.evaluateChallenge(challenge); switch (step) { - case 1: { - List cipherOptions = null; - if (requestedQopContainsPrivacy()) { - cipherOptions = CIPHER_HELPER.getCipherOptions(conf); - } - sendSaslMessage(ctx, response, cipherOptions); - ctx.flush(); - step++; - break; + case 1: { + List cipherOptions = null; + if (requestedQopContainsPrivacy()) { + cipherOptions = CIPHER_OPTION_HELPER.getCipherOptions(conf); } - case 2: { - assert response == null; - checkSaslComplete(); - Object cipherOption = - CIPHER_HELPER.getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient); - ChannelPipeline p = ctx.pipeline(); - while (p.first() != null) { - p.removeFirst(); - } - if (cipherOption != null) { - CryptoCodec codec = new CryptoCodec(conf, cipherOption); - p.addLast(new EncryptHandler(codec), new DecryptHandler(codec)); - } else { - if (useWrap()) { - p.addLast(new SaslWrapHandler(saslClient), new LengthFieldBasedFrameDecoder( - Integer.MAX_VALUE, 0, 4), new SaslUnwrapHandler(saslClient)); - } + sendSaslMessage(ctx, response, cipherOptions); + ctx.flush(); + step++; + break; + } + case 2: { + assert response == null; + checkSaslComplete(); + Object cipherOption = CIPHER_OPTION_HELPER.getCipherOption(proto, + isNegotiatedQopPrivacy(), saslClient); + ChannelPipeline p = ctx.pipeline(); + while (p.first() != null) { + p.removeFirst(); + } + if (cipherOption != null) { + CryptoCodec codec = new CryptoCodec(conf, cipherOption); + p.addLast(new EncryptHandler(codec), new DecryptHandler(codec)); + } else { + if (useWrap()) { + p.addLast(new SaslWrapHandler(saslClient), + new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4), + new SaslUnwrapHandler(saslClient)); } - promise.trySuccess(null); - break; } - default: - throw new IllegalArgumentException("Unrecognized negotiation step: " + step); + promise.trySuccess(null); + break; + } + default: + throw new IllegalArgumentException("Unrecognized negotiation step: " + step); } } else { ctx.fireChannelRead(msg); @@ -992,8 +1008,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { } if (encryptionKey != null) { if (LOG.isDebugEnabled()) { - LOG.debug("SASL client doing encrypted handshake for addr = " + addr + ", datanodeId = " - + dnInfo); + LOG.debug( + "SASL client doing encrypted handshake for addr = " + addr + ", datanodeId = " + dnInfo); } doSaslNegotiation(conf, channel, timeoutMs, getUserNameFromEncryptionKey(encryptionKey), encryptionKeyToPassword(encryptionKey.encryptionKey), @@ -1018,8 +1034,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { saslPromise.trySuccess(null); } else if (saslPropsResolver != null) { if (LOG.isDebugEnabled()) { - LOG.debug("SASL client doing general handshake for addr = " + addr + ", datanodeId = " - + dnInfo); + LOG.debug( + "SASL client doing general handshake for addr = " + addr + ", datanodeId = " + dnInfo); } doSaslNegotiation(conf, channel, timeoutMs, buildUsername(accessToken), buildClientPassword(accessToken), saslPropsResolver.getClientProperties(addr), saslPromise); @@ -1035,4 +1051,30 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { } } + private static KeyVersion decryptEncryptedDataEncryptionKey(FileEncryptionInfo feInfo, + DFSClient client) throws IOException { + KeyProvider provider = client.getKeyProvider(); + if (provider == null) { + throw new IOException("No KeyProvider is configured, cannot access" + " an encrypted file"); + } + EncryptedKeyVersion ekv = EncryptedKeyVersion.createForDecryption(feInfo.getKeyName(), + feInfo.getEzKeyVersionName(), feInfo.getIV(), feInfo.getEncryptedDataEncryptionKey()); + try { + KeyProviderCryptoExtension cryptoProvider = KeyProviderCryptoExtension + .createKeyProviderCryptoExtension(provider); + return cryptoProvider.decryptEncryptedKey(ekv); + } catch (GeneralSecurityException e) { + throw new IOException(e); + } + } + + static CryptoCodec createCryptoCodec(Configuration conf, HdfsFileStatus stat, DFSClient client) + throws IOException { + if (stat.getFileEncryptionInfo() == null) { + return null; + } + FileEncryptionInfo feInfo = stat.getFileEncryptionInfo(); + KeyVersion decrypted = decryptEncryptedDataEncryptionKey(feInfo, client); + return new CryptoCodec(conf, feInfo.getCipherSuite(), decrypted.getMaterial(), feInfo.getIV()); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java index 545a39e..47f5f45 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java @@ -29,6 +29,7 @@ import io.netty.channel.nio.NioEventLoopGroup; import java.io.File; import java.io.IOException; +import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -37,6 +38,10 @@ import java.util.concurrent.ExecutionException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.crypto.key.JavaKeyStoreProvider; +import org.apache.hadoop.crypto.key.KeyProvider; +import org.apache.hadoop.crypto.key.KeyProvider.Options; +import org.apache.hadoop.crypto.key.KeyProviderFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.http.ssl.KeyStoreTestUtil; @@ -86,6 +91,8 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput { private static String PRINCIPAL; private static String HTTP_PRINCIPAL; + + private static String TEST_KEY_NAME = "test_key"; @Rule public TestName name = new TestName(); @@ -98,13 +105,19 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput { @Parameter(2) public String cipherSuite; - @Parameters(name = "{index}: protection={0}, encryption={1}, cipherSuite={2}") + @Parameter(3) + public boolean useTransparentEncryption; + + @Parameters(name = "{index}: protection={0}, encryption={1}, cipherSuite={2}, transparent_enc={3}") public static Iterable data() { List params = new ArrayList<>(); for (String protection : Arrays.asList("authentication", "integrity", "privacy")) { for (String encryptionAlgorithm : Arrays.asList("", "3des", "rc4")) { for (String cipherSuite : Arrays.asList("", AES_CTR_NOPADDING)) { - params.add(new Object[] { protection, encryptionAlgorithm, cipherSuite }); + for (boolean useTransparentEncryption : Arrays.asList(false, true)) { + params.add(new Object[] { protection, encryptionAlgorithm, cipherSuite, + useTransparentEncryption }); + } } } } @@ -130,6 +143,15 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput { KeyStoreTestUtil.setupSSLConfig(keystoresDir.getAbsolutePath(), sslConfDir, conf, false); conf.setBoolean("ignore.secure.ports.for.testing", true); + + // setup key provider + URI keyProviderUri = new URI(JavaKeyStoreProvider.SCHEME_NAME + "://file" + + TEST_UTIL.getDataTestDir("test.jks").toUri().toString()); + conf.set("dfs.encryption.key.provider.uri", keyProviderUri.toString()); + KeyProvider keyProvider = KeyProviderFactory.get(keyProviderUri, conf); + keyProvider.createKey(TEST_KEY_NAME, new Options(conf)); + keyProvider.flush(); + keyProvider.close(); } @BeforeClass @@ -161,6 +183,8 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput { } } + private Path testDirOnTestFs; + @Before public void setUp() throws Exception { TEST_UTIL.getConfiguration().set("dfs.data.transfer.protection", protection); @@ -182,6 +206,11 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput { TEST_UTIL.startMiniDFSCluster(3); FS = TEST_UTIL.getDFSCluster().getFileSystem(); + testDirOnTestFs = new Path("/" + name.getMethodName().replaceAll("[^0-9a-zA-Z]", "_")); + FS.mkdirs(testDirOnTestFs); + if (useTransparentEncryption) { + FS.createEncryptionZone(testDirOnTestFs, TEST_KEY_NAME); + } } @After @@ -190,7 +219,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput { } private Path getTestFile() { - return new Path("/" + name.getMethodName().replaceAll("[^0-9a-zA-Z]", "_")); + return new Path(testDirOnTestFs, "test"); } @Test -- 1.9.1