diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java index 6b7dc5b..7212599 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java @@ -206,8 +206,10 @@ public class AsyncRpcChannelImpl implements AsyncRpcChannel { * @param ch channel to start connection on */ private void startHBaseConnection(Channel ch) { - ch.pipeline().addLast("frameDecoder", - new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); + ch.pipeline().addLast("beforeUnwrapDecoder", + new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 0)); + ch.pipeline().addLast("afterUnwrapDecoder", + new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); ch.pipeline().addLast(new AsyncServerResponseHandler(this)); try { writeChannelHeader(ch).addListener(new GenericFutureListener() { @@ -264,7 +266,7 @@ public class AsyncRpcChannelImpl implements AsyncRpcChannel { client.fallbackAllowed, client.conf.get("hbase.rpc.protection", SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)), - getChannelHeaderBytes(authMethod), + getChannelHeaderBytes(authMethod), client.conf, new SaslClientHandler.SaslExceptionHandler() { @Override public void handle(int retryCount, Random random, Throwable cause) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java index dc05af1..d040819 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java @@ -620,7 +620,7 @@ public class RpcClientImpl extends AbstractRpcClient { saslRpcClient = new HBaseSaslRpcClient(authMethod, token, serverPrincipal, fallbackAllowed, 
conf.get("hbase.rpc.protection", QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT))); - return saslRpcClient.saslConnect(in2, out2); + return saslRpcClient.saslConnect(in2, out2, conf); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java index ce32ed9..e9872e0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java @@ -22,10 +22,14 @@ import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.FilterInputStream; +import java.io.FilterOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.ByteBuffer; import java.util.Map; +import java.util.Properties; import javax.security.auth.callback.Callback; import javax.security.auth.callback.CallbackHandler; @@ -38,9 +42,14 @@ import javax.security.sasl.Sasl; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslException; +import org.apache.commons.crypto.cipher.CryptoCipherFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.crypto.aes.SaslAES; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.security.SaslInputStream; @@ -61,6 +70,9 @@ public class HBaseSaslRpcClient { private final SaslClient saslClient; private final boolean fallbackAllowed; protected final Map saslProps; + private boolean isAesEncryption; + private SaslAES 
saslAES; + /** * Create a HBaseSaslRpcClient for an authentication method * @@ -169,12 +181,11 @@ public class HBaseSaslRpcClient { * to simple Auth. * @throws IOException */ - public boolean saslConnect(InputStream inS, OutputStream outS) + public boolean saslConnect(InputStream inS, OutputStream outS, Configuration conf) throws IOException { DataInputStream inStream = new DataInputStream(new BufferedInputStream(inS)); DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream( outS)); - try { byte[] saslToken = new byte[0]; if (saslClient.hasInitialResponse()) @@ -231,6 +242,9 @@ public class HBaseSaslRpcClient { LOG.debug("SASL client context established. Negotiated QoP: " + saslClient.getNegotiatedProperty(Sasl.QOP)); } + // complete the sasl negotiation, do the negotiation for + // aes encryption if necessary + negotiateAes(inS, outS, conf); return true; } catch (IOException e) { try { @@ -243,6 +257,63 @@ public class HBaseSaslRpcClient { } /** + * Negotiates AES cipher based on complete sasl client. The keys need to be + * decrypted by sasl client. + */ + private void negotiateAes(InputStream inS, OutputStream outS, + Configuration conf) throws IOException { + String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP); + boolean isSaslWrapEnabled = + SaslUtil.QualityOfProtection.PRIVACY.getSaslQop().equalsIgnoreCase(qop); + boolean isSaslAesEnabled = conf.getBoolean( + HConstants.CRYPTO_SASL_ENCRYPTION_AES_ENABLED_CONF_KEY, + HConstants.CRYPTO_SASL_ENCRYPTION_AES_ENABLED_DEFAULT); + + // return if doesn't use sasl encryption or aes for encryption + if (!isSaslWrapEnabled || !isSaslAesEnabled) { + return; + } + + // start the negotiation process, client send the cipher mode to server, + // server will compute the iv & key and back to client. 
+ // initial the message for SaslCipherMeta, set the cipher mode + String transformation = conf.get(HConstants.CRYPTO_SASL_ENCRYPTION_AES_CIPHER_TRANSFORM_CONF_KEY, + HConstants.CRYPTO_SASL_ENCRYPTION_AES_CIPHER_TRANSFORM_CTR); + RPCProtos.SaslCipherMeta.Builder builder = RPCProtos.SaslCipherMeta.newBuilder(); + builder.setTransformation(transformation); + byte[] saslCipherMetaBytes = builder.build().toByteArray(); + // wrap the SaslCipherMeta message with sasl client + byte[] wrapped = saslClient.wrap(saslCipherMetaBytes, 0, saslCipherMetaBytes.length); + DataOutputStream dos = new DataOutputStream(outS); + dos.writeInt(wrapped.length); + dos.write(wrapped, 0, wrapped.length); + dos.flush(); + + // read the SaslCipherMeta message from server + DataInputStream inStream = new DataInputStream(inS); + int len = inStream.readInt(); + byte[] buff = new byte[len]; + inStream.readFully(buff); + + // unwrap the SaslCipherMeta message with sasl client + byte[] unwrappedResponse = saslClient.unwrap(buff, 0, buff.length); + RPCProtos.SaslCipherMeta saslCipherMeta = + RPCProtos.SaslCipherMeta.parseFrom(unwrappedResponse); + Properties properties = new Properties(); + // the property for cipher class + properties.setProperty(CryptoCipherFactory.CLASSES_KEY, + conf.get(HConstants.CRYPTO_SASL_ENCRYPTION_AES_CIPHER_CLASS_KEY, + HConstants.CRYPTO_SASL_ENCRYPTION_AES_CIPHER_CLASS_DEFAULT)); + // create SaslAES for client + saslAES = new SaslAES(transformation, properties, + saslCipherMeta.getInKey().toByteArray(), + saslCipherMeta.getOutKey().toByteArray(), + saslCipherMeta.getInIv().toByteArray(), + saslCipherMeta.getOutIv().toByteArray()); + isAesEncryption = true; + } + + /** * Get a SASL wrapped InputStream. Can be called only after saslConnect() has * been called. 
* @@ -255,9 +326,59 @@ public class HBaseSaslRpcClient { if (!saslClient.isComplete()) { throw new IOException("Sasl authentication exchange hasn't completed yet"); } + if (isAesEncryption) { + return new WrappedInputStream(in); + } return new SaslInputStream(in, saslClient); } + class WrappedInputStream extends FilterInputStream { + private ByteBuffer unwrappedRpcBuffer = ByteBuffer.allocate(0); + public WrappedInputStream(InputStream in) throws IOException { + super(in); + } + + @Override + public int read() throws IOException { + byte[] b = new byte[1]; + int n = read(b, 0, 1); + return (n != -1) ? b[0] : -1; + } + + @Override + public int read(byte b[]) throws IOException { + return read(b, 0, b.length); + } + + @Override + public synchronized int read(byte[] buf, int off, int len) throws IOException { + // fill the buffer with the next RPC message + if (unwrappedRpcBuffer.remaining() == 0) { + readNextRpcPacket(); + } + // satisfy as much of the request as possible + int readLen = Math.min(len, unwrappedRpcBuffer.remaining()); + unwrappedRpcBuffer.get(buf, off, readLen); + return readLen; + } + + // all messages must be wrapped by saslAES, else an exception is thrown + private void readNextRpcPacket() throws IOException { + LOG.debug("reading next wrapped RPC packet"); + DataInputStream dis = new DataInputStream(in); + int rpcLen = dis.readInt(); + byte[] rpcBuf = new byte[rpcLen]; + dis.readFully(rpcBuf); + + // unwrap with saslAES + rpcBuf = saslAES.unwrap(rpcBuf, 0, rpcBuf.length); + if (LOG.isDebugEnabled()) { + LOG.debug("unwrapping token of length:" + rpcBuf.length); + } + unwrappedRpcBuffer = ByteBuffer.wrap(rpcBuf); + } + } + /** * Get a SASL wrapped OutputStream. Can be called only after saslConnect() has * been called. 
@@ -271,9 +392,31 @@ public class HBaseSaslRpcClient { if (!saslClient.isComplete()) { throw new IOException("Sasl authentication exchange hasn't completed yet"); } + if (isAesEncryption) { + return new WrappedOutputStream(out); + } return new SaslOutputStream(out, saslClient); } + class WrappedOutputStream extends FilterOutputStream { + public WrappedOutputStream(OutputStream out) throws IOException { + super(out); + } + @Override + public void write(byte[] buf, int off, int len) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("wrapping token of length:" + len); + } + + // wrap with saslAES + byte[] wrapped = saslAES.wrap(buf, off, len); + DataOutputStream dob = new DataOutputStream(out); + dob.writeInt(wrapped.length); + dob.write(wrapped, 0, wrapped.length); + dob.flush(); + } + } + /** Release resources used by wrapped saslClient */ public void dispose() throws SaslException { saslClient.dispose(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java index d583e20..e24c2e9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java @@ -25,9 +25,14 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; +import org.apache.commons.crypto.cipher.CryptoCipherFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.crypto.aes.SaslAES; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.security.UserGroupInformation; 
import org.apache.hadoop.security.token.Token; @@ -39,10 +44,10 @@ import javax.security.sasl.SaslClient; import javax.security.sasl.SaslException; import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.security.PrivilegedExceptionAction; import java.util.Map; +import java.util.Properties; import java.util.Random; /** @@ -66,6 +71,10 @@ public class SaslClientHandler extends ChannelDuplexHandler { private byte[] saslToken; private byte[] connectionHeader; private boolean firstRead = true; + private boolean isAesEncryption; + private boolean isWaitAesNegotiate; + private SaslAES saslAES; + private final Configuration conf; private int retryCount = 0; private Random random; @@ -82,8 +91,9 @@ public class SaslClientHandler extends ChannelDuplexHandler { */ public SaslClientHandler(UserGroupInformation ticket, AuthMethod method, Token token, String serverPrincipal, boolean fallbackAllowed, - String rpcProtection, byte[] connectionHeader, SaslExceptionHandler exceptionHandler, + String rpcProtection, byte[] connectionHeader, Configuration conf, SaslExceptionHandler exceptionHandler, SaslSuccessfulConnectHandler successfulConnectHandler) throws IOException { + this.conf = conf; this.ticket = ticket; this.fallbackAllowed = fallbackAllowed; this.connectionHeader = connectionHeader; @@ -97,7 +107,7 @@ public class SaslClientHandler extends ChannelDuplexHandler { if (LOG.isDebugEnabled()) LOG.debug("Creating SASL " + AuthMethod.DIGEST.getMechanismName() + " client to authenticate to service at " + token.getService()); - saslClient = createDigestSaslClient(new String[] { AuthMethod.DIGEST.getMechanismName() }, + saslClient = createDigestSaslClient(new String[] {AuthMethod.DIGEST.getMechanismName()}, SaslUtil.SASL_DEFAULT_REALM, new HBaseSaslRpcClient.SaslClientCallbackHandler(token)); break; case KERBEROS: @@ -113,7 +123,7 @@ public class SaslClientHandler extends ChannelDuplexHandler { throw new IOException( "Kerberos principal 
does not have the expected format: " + serverPrincipal); } - saslClient = createKerberosSaslClient(new String[] { AuthMethod.KERBEROS.getMechanismName() }, + saslClient = createKerberosSaslClient(new String[] {AuthMethod.KERBEROS.getMechanismName()}, names[0], names[1]); break; default: @@ -128,7 +138,7 @@ public class SaslClientHandler extends ChannelDuplexHandler { * Create a Digest Sasl client */ protected SaslClient createDigestSaslClient(String[] mechanismNames, String saslDefaultRealm, - CallbackHandler saslClientCallbackHandler) throws IOException { + CallbackHandler saslClientCallbackHandler) throws IOException { return Sasl.createSaslClient(mechanismNames, null, null, saslDefaultRealm, saslProps, saslClientCallbackHandler); } @@ -140,7 +150,7 @@ public class SaslClientHandler extends ChannelDuplexHandler { * @param userSecondPart second part of username */ protected SaslClient createKerberosSaslClient(String[] mechanismNames, String userFirstPart, - String userSecondPart) throws IOException { + String userSecondPart) throws IOException { return Sasl .createSaslClient(mechanismNames, null, userFirstPart, userSecondPart, saslProps, null); @@ -232,16 +242,24 @@ public class SaslClientHandler extends ChannelDuplexHandler { ctx.pipeline().remove(this); successfulConnectHandler.onSuccess(ctx.channel()); } else { - byte[] wrappedCH = saslClient.wrap(connectionHeader, 0, connectionHeader.length); - // write connection header - writeSaslToken(ctx, wrappedCH); - successfulConnectHandler.onSaslProtectionSucess(ctx.channel()); + requestNegotiateAes(ctx, conf); + if (!isWaitAesNegotiate) { + // negotiate and use sasl default encryption + postEncryptionNegotiation(ctx); + } } } } // Normal wrapped reading else { try { + if (isWaitAesNegotiate) { + // process the aes negotiation response from server + negotiateAesWithResponse(in, conf); + isWaitAesNegotiate = false; + postEncryptionNegotiation(ctx); + return; + } int length = in.readInt(); if (LOG.isDebugEnabled()) { 
LOG.debug("Actual length is " + length); @@ -256,7 +274,13 @@ public class SaslClientHandler extends ChannelDuplexHandler { try { ByteBuf b = ctx.channel().alloc().buffer(saslToken.length); - b.writeBytes(saslClient.unwrap(saslToken, 0, saslToken.length)); + byte[] unwrapped; + if (isAesEncryption) { + unwrapped = saslAES.unwrap(saslToken, 0, saslToken.length); + } else { + unwrapped = saslClient.unwrap(saslToken, 0, saslToken.length); + } + b.writeBytes(unwrapped); ctx.fireChannelRead(b); } catch (SaslException se) { @@ -270,6 +294,75 @@ public class SaslClientHandler extends ChannelDuplexHandler { } } + private void postEncryptionNegotiation(ChannelHandlerContext ctx) throws SaslException { + byte[] wrappedCH; + if (isAesEncryption) { + wrappedCH = saslAES.wrap(connectionHeader, 0, connectionHeader.length); + } else { + wrappedCH = saslClient.wrap(connectionHeader, 0, connectionHeader.length); + } + // write connection header + writeSaslToken(ctx, wrappedCH); + successfulConnectHandler.onSaslProtectionSucess(ctx.channel()); + } + + /** + * Negotiates AES cipher based on complete sasl client. The keys need to be + * decrypted by sasl client. + */ + private void requestNegotiateAes(final ChannelHandlerContext ctx, + Configuration conf) throws IOException { + String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP); + boolean isSaslWrapEnabled = + SaslUtil.QualityOfProtection.PRIVACY.getSaslQop().equalsIgnoreCase(qop); + boolean isSaslAesEnabled = conf.getBoolean( + HConstants.CRYPTO_SASL_ENCRYPTION_AES_ENABLED_CONF_KEY, + HConstants.CRYPTO_SASL_ENCRYPTION_AES_ENABLED_DEFAULT); + + // return if doesn't use sasl encryption or aes for encryption + if (!isSaslWrapEnabled || !isSaslAesEnabled) { + return; + } + + // start the negotiation process, client send the cipher mode to server, + // server will compute the iv & key and back to client. 
+ // initial the message for SaslCipherMeta, set the cipher mode + String transformation = conf.get(HConstants.CRYPTO_SASL_ENCRYPTION_AES_CIPHER_TRANSFORM_CONF_KEY, + HConstants.CRYPTO_SASL_ENCRYPTION_AES_CIPHER_TRANSFORM_CTR); + RPCProtos.SaslCipherMeta.Builder builder = RPCProtos.SaslCipherMeta.newBuilder(); + builder.setTransformation(transformation); + byte[] saslCipherMetaBytes = builder.build().toByteArray(); + // wrap the SaslCipherMeta message with sasl client + byte[] wrapped = saslClient.wrap(saslCipherMetaBytes, 0, saslCipherMetaBytes.length); + writeSaslToken(ctx, wrapped); + isWaitAesNegotiate = true; + } + + private void negotiateAesWithResponse(ByteBuf in, Configuration conf) + throws IOException { + // read the SaslCipherMeta message from server + int len = in.readInt(); + byte[] buff = new byte[len]; + in.readBytes(buff); + + // unwrap the SaslCipherMeta message with sasl client + byte[] unwrappedResponse = saslClient.unwrap(buff, 0, buff.length); + RPCProtos.SaslCipherMeta saslCipherMeta = + RPCProtos.SaslCipherMeta.parseFrom(unwrappedResponse); + Properties properties = new Properties(); + // the property for cipher class + properties.setProperty(CryptoCipherFactory.CLASSES_KEY, + conf.get(HConstants.CRYPTO_SASL_ENCRYPTION_AES_CIPHER_CLASS_KEY, + HConstants.CRYPTO_SASL_ENCRYPTION_AES_CIPHER_CLASS_DEFAULT)); + // create SaslAES for client + saslAES = new SaslAES(saslCipherMeta.getTransformation(), properties, + saslCipherMeta.getInKey().toByteArray(), + saslCipherMeta.getOutKey().toByteArray(), + saslCipherMeta.getInIv().toByteArray(), + saslCipherMeta.getOutIv().toByteArray()); + isAesEncryption = true; + } + private void writeSaslToken(final ChannelHandlerContext ctx, byte[] saslToken) { ByteBuf b = ctx.alloc().buffer(4 + saslToken.length); b.writeInt(saslToken.length); @@ -295,7 +388,8 @@ public class SaslClientHandler extends ChannelDuplexHandler { } } - @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) + 
@Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { saslClient.dispose(); @@ -321,7 +415,11 @@ public class SaslClientHandler extends ChannelDuplexHandler { in.release(); try { - saslToken = saslClient.wrap(unwrapped, 0, unwrapped.length); + if (isAesEncryption) { + saslToken = saslAES.wrap(unwrapped, 0, unwrapped.length); + } else { + saslToken = saslClient.wrap(unwrapped, 0, unwrapped.length); + } } catch (SaslException se) { try { saslClient.dispose(); @@ -336,7 +434,8 @@ public class SaslClientHandler extends ChannelDuplexHandler { out.writeBytes(saslToken, 0, saslToken.length); ctx.write(out).addListener(new ChannelFutureListener() { - @Override public void operationComplete(ChannelFuture future) throws Exception { + @Override + public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { exceptionCaught(ctx, future.cause()); } diff --git a/hbase-common/pom.xml b/hbase-common/pom.xml index 5b43553..9e5c87b 100644 --- a/hbase-common/pom.xml +++ b/hbase-common/pom.xml @@ -279,6 +279,10 @@ org.apache.htrace htrace-core + + org.apache.commons + commons-crypto + diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 4c499a2..46483c3 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1180,10 +1180,43 @@ public final class HConstants { /** Configuration key for the algorithm used for creating jks key, a string */ public static final String CRYPTO_KEY_ALGORITHM_CONF_KEY = "hbase.crypto.key.algorithm"; - /** Configuration key for the name of the alternate cipher algorithm for the cluster, a string */ + /** Configuration key for the name of the alternate cipher algorithm for the cluster */ public static final String CRYPTO_ALTERNATE_KEY_ALGORITHM_CONF_KEY = 
"hbase.crypto.alternate.key.algorithm"; + /** Configuration key for if enable AES with Sasl encryption */ + public static final String CRYPTO_SASL_ENCRYPTION_AES_ENABLED_CONF_KEY = + "hbase.crypto.sasl.encryption.aes.enabled"; + + public static final boolean CRYPTO_SASL_ENCRYPTION_AES_ENABLED_DEFAULT = false; + + /** Configuration key for the transformation of AES cipher */ + public static final String CRYPTO_SASL_ENCRYPTION_AES_CIPHER_TRANSFORM_CONF_KEY = + "hbase.crypto.sasl.encryption.aes.cipher.transform"; + + public static final String CRYPTO_SASL_ENCRYPTION_AES_CIPHER_TRANSFORM_CTR = + "AES/CTR/NoPadding"; + + /** Configuration key for the random of AES cipher */ + public static final String CRYPTO_SASL_ENCRYPTION_RANDOM_CONF_KEY = + "hbase.crypto.sasl.encryption.aes.crypto.random"; + + public static final String CRYPTO_SASL_ENCRYPTION_RANDOM_DEFAULT = + "org.apache.commons.crypto.random.JavaCryptoRandom"; + + /** Configuration key for the key size of AES cipher */ + public static final String CRYPTO_SASL_ENCRYPTION_AES_CIPHER_KEY_SIZE_CONF_KEY = + "hbase.crypto.sasl.encryption.aes.cipher.keySizeBits"; + + public static final int CRYPTO_SASL_ENCRYPTION_AES_CIPHER_KEY_SIZE_DEFAULT = 128; + + /** Configuration key for the class of AES cipher */ + public static final String CRYPTO_SASL_ENCRYPTION_AES_CIPHER_CLASS_KEY = + "hbase.crypto.sasl.encryption.aes.cipher.class"; + + public static final String CRYPTO_SASL_ENCRYPTION_AES_CIPHER_CLASS_DEFAULT = + "org.apache.commons.crypto.cipher.JceCipher"; + /** Configuration key for enabling WAL encryption, a boolean */ public static final String ENABLE_WAL_ENCRYPTION = "hbase.regionserver.wal.encryption"; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/aes/SaslAES.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/aes/SaslAES.java new file mode 100644 index 0000000..f9dcda3 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/aes/SaslAES.java @@ -0,0 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io.crypto.aes;

import org.apache.commons.crypto.cipher.CryptoCipher;
import org.apache.commons.crypto.utils.Utils;
import org.apache.hadoop.hbase.HConstants;

import javax.crypto.Cipher;
import javax.crypto.Mac;
import javax.crypto.SecretKey;
import javax.crypto.ShortBufferException;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import javax.security.sasl.SaslException;
import java.io.IOException;
import java.security.InvalidAlgorithmParameterException;
import java.security.InvalidKeyException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Properties;

/**
 * AES encryption and decryption for SASL-wrapped RPC messages.
 *
 * Wire format of a wrapped message: AES-CTR ciphertext of (msg || 10-byte
 * HMAC-MD5), followed by a 4-byte cleartext sequence number that is folded
 * into the MAC. Only the AES/CTR/NoPadding transformation is supported.
 */
public class SaslAES {

  /** Number of bytes of the truncated HMAC-MD5 digest appended to each message. */
  private static final int MAC_LENGTH = 10;
  /** Number of bytes used to encode the message sequence number. */
  private static final int SEQ_NUM_LENGTH = 4;

  private final CryptoCipher encryptor;
  private final CryptoCipher decryptor;

  private final Integrity integrity;

  /**
   * Creates a SaslAES instance from the negotiated transformation and key/IV material.
   * @param transformation cipher transformation; only AES/CTR/NoPadding is supported
   * @param properties properties selecting the commons-crypto cipher implementation
   * @param inKey key used to decrypt and authenticate inbound messages
   * @param outKey key used to encrypt and authenticate outbound messages
   * @param inIv IV for the inbound (decrypt) cipher
   * @param outIv IV for the outbound (encrypt) cipher
   * @throws IOException if the transformation is unsupported or a cipher fails to initialize
   */
  public SaslAES(String transformation, Properties properties,
      byte[] inKey, byte[] outKey, byte[] inIv, byte[] outIv) throws IOException {
    checkTransformation(transformation);
    // outbound cipher
    encryptor = Utils.getCipherInstance(transformation, properties);
    try {
      SecretKeySpec outKeySpec = new SecretKeySpec(outKey, "AES");
      IvParameterSpec outIvSpec = new IvParameterSpec(outIv);
      encryptor.init(Cipher.ENCRYPT_MODE, outKeySpec, outIvSpec);
    } catch (InvalidKeyException | InvalidAlgorithmParameterException e) {
      throw new IOException("Failed to initialize encryptor", e);
    }

    // inbound cipher
    decryptor = Utils.getCipherInstance(transformation, properties);
    try {
      SecretKeySpec inKeySpec = new SecretKeySpec(inKey, "AES");
      IvParameterSpec inIvSpec = new IvParameterSpec(inIv);
      decryptor.init(Cipher.DECRYPT_MODE, inKeySpec, inIvSpec);
    } catch (InvalidKeyException | InvalidAlgorithmParameterException e) {
      throw new IOException("Failed to initialize decryptor", e);
    }

    integrity = new Integrity(outKey, inKey);
  }

  /**
   * Encrypts input data. The result composes of encrypted (msg, mac) and the
   * cleartext sequence number used when computing the mac.
   * @param data the input byte array
   * @param offset the offset in input where the input starts
   * @param len the input length
   * @return the new encrypted byte array
   * @throws SaslException if error happens
   */
  public byte[] wrap(byte[] data, int offset, int len) throws SaslException {
    // MAC over (seqNum || msg), computed on the plaintext before encryption;
    // incMySeqNum only advances the counter, the bytes already captured in
    // the seqNum buffer by getHMAC stay those of THIS message.
    byte[] mac = integrity.getHMAC(data, offset, len);
    integrity.incMySeqNum();

    // encrypt msg followed by its mac
    byte[] encrypted = new byte[len + MAC_LENGTH];
    try {
      int n = encryptor.update(data, offset, len, encrypted, 0);
      encryptor.update(mac, 0, MAC_LENGTH, encrypted, n);
    } catch (ShortBufferException sbe) {
      // CTR mode emits exactly as many bytes as it consumes, so the output
      // buffer is always large enough; this should not happen.
      throw new SaslException("Error happens during encrypt data", sbe);
    }

    // append the cleartext sequence number that was folded into the mac
    byte[] wrapped = new byte[encrypted.length + SEQ_NUM_LENGTH];
    System.arraycopy(encrypted, 0, wrapped, 0, encrypted.length);
    System.arraycopy(integrity.getSeqNum(), 0, wrapped, encrypted.length, SEQ_NUM_LENGTH);

    return wrapped;
  }

  /**
   * Decrypts input data. The input composes of encrypted (msg, mac) and the
   * cleartext sequence number. The result is msg.
   * @param data the input byte array
   * @param offset the offset in input where the input starts
   * @param len the input length
   * @return the new decrypted byte array
   * @throws SaslException if the message is malformed, the MAC does not match,
   *         or the sequence number is out of order
   */
  public byte[] unwrap(byte[] data, int offset, int len) throws SaslException {
    // A valid message carries at least the mac and the sequence number;
    // anything shorter would otherwise cause a negative array size below.
    if (len < MAC_LENGTH + SEQ_NUM_LENGTH) {
      throw new SaslException("Invalid length of wrapped message: " + len);
    }

    // split into ciphertext and cleartext seqNum, then decrypt
    byte[] decrypted = new byte[len - SEQ_NUM_LENGTH];
    byte[] peerSeqNum = new byte[SEQ_NUM_LENGTH];
    try {
      decryptor.update(data, offset, len - SEQ_NUM_LENGTH, decrypted, 0);
    } catch (ShortBufferException sbe) {
      // this should not happen (CTR output size == input size)
      throw new SaslException("Error happens during decrypt data", sbe);
    }
    System.arraycopy(data, offset + decrypted.length, peerSeqNum, 0, SEQ_NUM_LENGTH);

    // split plaintext into msg and mac
    byte[] msg = new byte[decrypted.length - MAC_LENGTH];
    byte[] mac = new byte[MAC_LENGTH];
    System.arraycopy(decrypted, 0, msg, 0, msg.length);
    System.arraycopy(decrypted, msg.length, mac, 0, MAC_LENGTH);

    // check mac integrity and msg sequence
    if (!integrity.compareHMAC(mac, peerSeqNum, msg, 0, msg.length)) {
      throw new SaslException("Unmatched MAC");
    }
    if (!integrity.comparePeerSeqNum(peerSeqNum)) {
      throw new SaslException("Out of order sequencing of messages. Got: "
          + integrity.byteToInt(peerSeqNum) + " Expected: " + integrity.peerSeqNum);
    }
    integrity.incPeerSeqNum();

    return msg;
  }

  /**
   * Rejects any transformation other than AES/CTR/NoPadding, the only mode
   * the wrap/unwrap buffer arithmetic above is valid for.
   */
  private void checkTransformation(String transformation) throws IOException {
    if (HConstants.CRYPTO_SASL_ENCRYPTION_AES_CIPHER_TRANSFORM_CTR.equals(transformation)) {
      return;
    }
    throw new IOException("AES cipher transformation is not supported: " + transformation);
  }

  /**
   * Helper class for providing integrity protection: truncated HMAC-MD5 over
   * (sequence number || message), plus per-direction sequence counters.
   */
  private static class Integrity {

    private int mySeqNum = 0;
    private int peerSeqNum = 0;
    // Big-endian encoding of mySeqNum for the message currently being wrapped.
    private byte[] seqNum = new byte[4];

    private byte[] myKey;
    private byte[] peerKey;

    Integrity(byte[] outKey, byte[] inKey) {
      myKey = outKey;
      peerKey = inKey;
    }

    /** Computes the MAC for an outbound message, capturing the current seqNum. */
    byte[] getHMAC(byte[] msg, int start, int len) throws SaslException {
      intToByte(mySeqNum);
      return calculateHMAC(myKey, seqNum, msg, start, len);
    }

    /**
     * Verifies an inbound MAC. Uses MessageDigest.isEqual, a constant-time
     * comparison, so MAC verification does not leak timing information.
     */
    boolean compareHMAC(byte[] expectedHMAC, byte[] peerSeqNum, byte[] msg, int start,
        int len) throws SaslException {
      byte[] mac = calculateHMAC(peerKey, peerSeqNum, msg, start, len);
      return MessageDigest.isEqual(mac, expectedHMAC);
    }

    /** Checks that the peer's sequence number matches the one expected next. */
    boolean comparePeerSeqNum(byte[] peerSeqNum) {
      return this.peerSeqNum == byteToInt(peerSeqNum);
    }

    byte[] getSeqNum() {
      return seqNum;
    }

    void incMySeqNum() {
      mySeqNum++;
    }

    void incPeerSeqNum() {
      peerSeqNum++;
    }

    private byte[] calculateHMAC(byte[] key, byte[] seqNum, byte[] msg, int start,
        int len) throws SaslException {
      byte[] seqAndMsg = new byte[4 + len];
      System.arraycopy(seqNum, 0, seqAndMsg, 0, 4);
      System.arraycopy(msg, start, seqAndMsg, 4, len);

      try {
        SecretKey keyKi = new SecretKeySpec(key, "HmacMD5");
        Mac m = Mac.getInstance("HmacMD5");
        m.init(keyKi);
        m.update(seqAndMsg);
        byte[] hmac = m.doFinal();

        // First 10 bytes of the HMAC-MD5 digest
        byte[] macBuffer = new byte[10];
        System.arraycopy(hmac, 0, macBuffer, 0, 10);

        return macBuffer;
      } catch (InvalidKeyException e) {
        throw new SaslException("Invalid bytes used for key of HMAC-MD5 hash.", e);
      } catch (NoSuchAlgorithmException e) {
        throw new SaslException("Error creating instance of MD5 MAC algorithm", e);
      }
    }

    /** Encodes num big-endian into the shared seqNum buffer. */
    private void intToByte(int num) {
      for (int i = 3; i >= 0; i--) {
        seqNum[i] = (byte) (num & 0xff);
        num >>>= 8;
      }
    }

    /** Decodes a 4-byte big-endian sequence number. */
    private int byteToInt(byte[] seqNum) {
      int answer = 0;
      for (int i = 0; i < 4; i++) {
        answer <<= 8;
        answer |= ((int) seqNum[i] & 0xff);
      }
      return answer;
    }
  }
}
+   **
+   * Cipher meta for SASL
+   * 
+ */ + public static final class SaslCipherMeta extends + com.google.protobuf.GeneratedMessage + implements SaslCipherMetaOrBuilder { + // Use SaslCipherMeta.newBuilder() to construct. + private SaslCipherMeta(com.google.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private SaslCipherMeta(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final SaslCipherMeta defaultInstance; + public static SaslCipherMeta getDefaultInstance() { + return defaultInstance; + } + + public SaslCipherMeta getDefaultInstanceForType() { + return defaultInstance; + } + + private final com.google.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private SaslCipherMeta( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 10: { + bitField0_ |= 0x00000001; + transformation_ = input.readBytes(); + break; + } + case 18: { + bitField0_ |= 0x00000002; + inKey_ = input.readBytes(); + break; + } + case 26: { + bitField0_ |= 0x00000004; + inIv_ = input.readBytes(); + break; + } + case 34: { + bitField0_ |= 0x00000008; + outKey_ = input.readBytes(); + break; + } + case 42: { + bitField0_ |= 0x00000010; + outIv_ = input.readBytes(); + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { 
+ throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.RPCProtos.internal_static_hbase_pb_SaslCipherMeta_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.RPCProtos.internal_static_hbase_pb_SaslCipherMeta_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.apache.hadoop.hbase.protobuf.generated.RPCProtos.SaslCipherMeta.class, org.apache.hadoop.hbase.protobuf.generated.RPCProtos.SaslCipherMeta.Builder.class); + } + + public static com.google.protobuf.Parser PARSER = + new com.google.protobuf.AbstractParser() { + public SaslCipherMeta parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new SaslCipherMeta(input, extensionRegistry); + } + }; + + @java.lang.Override + public com.google.protobuf.Parser getParserForType() { + return PARSER; + } + + private int bitField0_; + // required string transformation = 1; + public static final int TRANSFORMATION_FIELD_NUMBER = 1; + private java.lang.Object transformation_; + /** + * required string transformation = 1; + */ + public boolean hasTransformation() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required string transformation = 1; + */ + public java.lang.String getTransformation() { + java.lang.Object ref = transformation_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + 
(com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + transformation_ = s; + } + return s; + } + } + /** + * required string transformation = 1; + */ + public com.google.protobuf.ByteString + getTransformationBytes() { + java.lang.Object ref = transformation_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + transformation_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + // optional bytes inKey = 2; + public static final int INKEY_FIELD_NUMBER = 2; + private com.google.protobuf.ByteString inKey_; + /** + * optional bytes inKey = 2; + */ + public boolean hasInKey() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * optional bytes inKey = 2; + */ + public com.google.protobuf.ByteString getInKey() { + return inKey_; + } + + // optional bytes inIv = 3; + public static final int INIV_FIELD_NUMBER = 3; + private com.google.protobuf.ByteString inIv_; + /** + * optional bytes inIv = 3; + */ + public boolean hasInIv() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional bytes inIv = 3; + */ + public com.google.protobuf.ByteString getInIv() { + return inIv_; + } + + // optional bytes outKey = 4; + public static final int OUTKEY_FIELD_NUMBER = 4; + private com.google.protobuf.ByteString outKey_; + /** + * optional bytes outKey = 4; + */ + public boolean hasOutKey() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * optional bytes outKey = 4; + */ + public com.google.protobuf.ByteString getOutKey() { + return outKey_; + } + + // optional bytes outIv = 5; + public static final int OUTIV_FIELD_NUMBER = 5; + private com.google.protobuf.ByteString outIv_; + /** + * optional bytes outIv = 5; + */ + public boolean hasOutIv() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + /** + * optional bytes outIv = 5; + */ + public 
com.google.protobuf.ByteString getOutIv() { + return outIv_; + } + + private void initFields() { + transformation_ = ""; + inKey_ = com.google.protobuf.ByteString.EMPTY; + inIv_ = com.google.protobuf.ByteString.EMPTY; + outKey_ = com.google.protobuf.ByteString.EMPTY; + outIv_ = com.google.protobuf.ByteString.EMPTY; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasTransformation()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBytes(1, getTransformationBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeBytes(2, inKey_); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeBytes(3, inIv_); + } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + output.writeBytes(4, outKey_); + } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + output.writeBytes(5, outIv_); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(1, getTransformationBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(2, inKey_); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(3, inIv_); + } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(4, outKey_); + } + if (((bitField0_ & 
0x00000010) == 0x00000010)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(5, outIv_); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.RPCProtos.SaslCipherMeta)) { + return super.equals(obj); + } + org.apache.hadoop.hbase.protobuf.generated.RPCProtos.SaslCipherMeta other = (org.apache.hadoop.hbase.protobuf.generated.RPCProtos.SaslCipherMeta) obj; + + boolean result = true; + result = result && (hasTransformation() == other.hasTransformation()); + if (hasTransformation()) { + result = result && getTransformation() + .equals(other.getTransformation()); + } + result = result && (hasInKey() == other.hasInKey()); + if (hasInKey()) { + result = result && getInKey() + .equals(other.getInKey()); + } + result = result && (hasInIv() == other.hasInIv()); + if (hasInIv()) { + result = result && getInIv() + .equals(other.getInIv()); + } + result = result && (hasOutKey() == other.hasOutKey()); + if (hasOutKey()) { + result = result && getOutKey() + .equals(other.getOutKey()); + } + result = result && (hasOutIv() == other.hasOutIv()); + if (hasOutIv()) { + result = result && getOutIv() + .equals(other.getOutIv()); + } + result = result && + getUnknownFields().equals(other.getUnknownFields()); + return result; + } + + private int memoizedHashCode = 0; + @java.lang.Override + public int hashCode() { + if (memoizedHashCode != 0) { + return memoizedHashCode; + } + int hash = 41; + hash = (19 * hash) + getDescriptorForType().hashCode(); + if (hasTransformation()) { + hash = (37 * hash) + TRANSFORMATION_FIELD_NUMBER; + hash = (53 * 
hash) + getTransformation().hashCode(); + } + if (hasInKey()) { + hash = (37 * hash) + INKEY_FIELD_NUMBER; + hash = (53 * hash) + getInKey().hashCode(); + } + if (hasInIv()) { + hash = (37 * hash) + INIV_FIELD_NUMBER; + hash = (53 * hash) + getInIv().hashCode(); + } + if (hasOutKey()) { + hash = (37 * hash) + OUTKEY_FIELD_NUMBER; + hash = (53 * hash) + getOutKey().hashCode(); + } + if (hasOutIv()) { + hash = (37 * hash) + OUTIV_FIELD_NUMBER; + hash = (53 * hash) + getOutIv().hashCode(); + } + hash = (29 * hash) + getUnknownFields().hashCode(); + memoizedHashCode = hash; + return hash; + } + + public static org.apache.hadoop.hbase.protobuf.generated.RPCProtos.SaslCipherMeta parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.apache.hadoop.hbase.protobuf.generated.RPCProtos.SaslCipherMeta parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.apache.hadoop.hbase.protobuf.generated.RPCProtos.SaslCipherMeta parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.apache.hadoop.hbase.protobuf.generated.RPCProtos.SaslCipherMeta parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.apache.hadoop.hbase.protobuf.generated.RPCProtos.SaslCipherMeta parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.apache.hadoop.hbase.protobuf.generated.RPCProtos.SaslCipherMeta parseFrom( + java.io.InputStream input, + 
com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static org.apache.hadoop.hbase.protobuf.generated.RPCProtos.SaslCipherMeta parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static org.apache.hadoop.hbase.protobuf.generated.RPCProtos.SaslCipherMeta parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static org.apache.hadoop.hbase.protobuf.generated.RPCProtos.SaslCipherMeta parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.apache.hadoop.hbase.protobuf.generated.RPCProtos.SaslCipherMeta parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.RPCProtos.SaslCipherMeta prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code hbase.pb.SaslCipherMeta} + * + *
+     **
+     * Cipher meta for SASL
+     * 
+ */ + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements org.apache.hadoop.hbase.protobuf.generated.RPCProtos.SaslCipherMetaOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.RPCProtos.internal_static_hbase_pb_SaslCipherMeta_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.RPCProtos.internal_static_hbase_pb_SaslCipherMeta_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.apache.hadoop.hbase.protobuf.generated.RPCProtos.SaslCipherMeta.class, org.apache.hadoop.hbase.protobuf.generated.RPCProtos.SaslCipherMeta.Builder.class); + } + + // Construct using org.apache.hadoop.hbase.protobuf.generated.RPCProtos.SaslCipherMeta.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + transformation_ = ""; + bitField0_ = (bitField0_ & ~0x00000001); + inKey_ = com.google.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00000002); + inIv_ = com.google.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00000004); + outKey_ = com.google.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00000008); + outIv_ = com.google.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00000010); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return 
org.apache.hadoop.hbase.protobuf.generated.RPCProtos.internal_static_hbase_pb_SaslCipherMeta_descriptor; + } + + public org.apache.hadoop.hbase.protobuf.generated.RPCProtos.SaslCipherMeta getDefaultInstanceForType() { + return org.apache.hadoop.hbase.protobuf.generated.RPCProtos.SaslCipherMeta.getDefaultInstance(); + } + + public org.apache.hadoop.hbase.protobuf.generated.RPCProtos.SaslCipherMeta build() { + org.apache.hadoop.hbase.protobuf.generated.RPCProtos.SaslCipherMeta result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public org.apache.hadoop.hbase.protobuf.generated.RPCProtos.SaslCipherMeta buildPartial() { + org.apache.hadoop.hbase.protobuf.generated.RPCProtos.SaslCipherMeta result = new org.apache.hadoop.hbase.protobuf.generated.RPCProtos.SaslCipherMeta(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.transformation_ = transformation_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.inKey_ = inKey_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.inIv_ = inIv_; + if (((from_bitField0_ & 0x00000008) == 0x00000008)) { + to_bitField0_ |= 0x00000008; + } + result.outKey_ = outKey_; + if (((from_bitField0_ & 0x00000010) == 0x00000010)) { + to_bitField0_ |= 0x00000010; + } + result.outIv_ = outIv_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.apache.hadoop.hbase.protobuf.generated.RPCProtos.SaslCipherMeta) { + return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.RPCProtos.SaslCipherMeta)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder 
mergeFrom(org.apache.hadoop.hbase.protobuf.generated.RPCProtos.SaslCipherMeta other) { + if (other == org.apache.hadoop.hbase.protobuf.generated.RPCProtos.SaslCipherMeta.getDefaultInstance()) return this; + if (other.hasTransformation()) { + bitField0_ |= 0x00000001; + transformation_ = other.transformation_; + onChanged(); + } + if (other.hasInKey()) { + setInKey(other.getInKey()); + } + if (other.hasInIv()) { + setInIv(other.getInIv()); + } + if (other.hasOutKey()) { + setOutKey(other.getOutKey()); + } + if (other.hasOutIv()) { + setOutIv(other.getOutIv()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasTransformation()) { + + return false; + } + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + org.apache.hadoop.hbase.protobuf.generated.RPCProtos.SaslCipherMeta parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (org.apache.hadoop.hbase.protobuf.generated.RPCProtos.SaslCipherMeta) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // required string transformation = 1; + private java.lang.Object transformation_ = ""; + /** + * required string transformation = 1; + */ + public boolean hasTransformation() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required string transformation = 1; + */ + public java.lang.String getTransformation() { + java.lang.Object ref = transformation_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((com.google.protobuf.ByteString) ref) + .toStringUtf8(); + transformation_ = s; + return s; + } else { + return (java.lang.String) ref; + } 
+ } + /** + * required string transformation = 1; + */ + public com.google.protobuf.ByteString + getTransformationBytes() { + java.lang.Object ref = transformation_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + transformation_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * required string transformation = 1; + */ + public Builder setTransformation( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + transformation_ = value; + onChanged(); + return this; + } + /** + * required string transformation = 1; + */ + public Builder clearTransformation() { + bitField0_ = (bitField0_ & ~0x00000001); + transformation_ = getDefaultInstance().getTransformation(); + onChanged(); + return this; + } + /** + * required string transformation = 1; + */ + public Builder setTransformationBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + transformation_ = value; + onChanged(); + return this; + } + + // optional bytes inKey = 2; + private com.google.protobuf.ByteString inKey_ = com.google.protobuf.ByteString.EMPTY; + /** + * optional bytes inKey = 2; + */ + public boolean hasInKey() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * optional bytes inKey = 2; + */ + public com.google.protobuf.ByteString getInKey() { + return inKey_; + } + /** + * optional bytes inKey = 2; + */ + public Builder setInKey(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + inKey_ = value; + onChanged(); + return this; + } + /** + * optional bytes inKey = 2; + */ + public Builder clearInKey() { + bitField0_ = (bitField0_ & ~0x00000002); + inKey_ = getDefaultInstance().getInKey(); + onChanged(); + return this; + } 
+ + // optional bytes inIv = 3; + private com.google.protobuf.ByteString inIv_ = com.google.protobuf.ByteString.EMPTY; + /** + * optional bytes inIv = 3; + */ + public boolean hasInIv() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional bytes inIv = 3; + */ + public com.google.protobuf.ByteString getInIv() { + return inIv_; + } + /** + * optional bytes inIv = 3; + */ + public Builder setInIv(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000004; + inIv_ = value; + onChanged(); + return this; + } + /** + * optional bytes inIv = 3; + */ + public Builder clearInIv() { + bitField0_ = (bitField0_ & ~0x00000004); + inIv_ = getDefaultInstance().getInIv(); + onChanged(); + return this; + } + + // optional bytes outKey = 4; + private com.google.protobuf.ByteString outKey_ = com.google.protobuf.ByteString.EMPTY; + /** + * optional bytes outKey = 4; + */ + public boolean hasOutKey() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * optional bytes outKey = 4; + */ + public com.google.protobuf.ByteString getOutKey() { + return outKey_; + } + /** + * optional bytes outKey = 4; + */ + public Builder setOutKey(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000008; + outKey_ = value; + onChanged(); + return this; + } + /** + * optional bytes outKey = 4; + */ + public Builder clearOutKey() { + bitField0_ = (bitField0_ & ~0x00000008); + outKey_ = getDefaultInstance().getOutKey(); + onChanged(); + return this; + } + + // optional bytes outIv = 5; + private com.google.protobuf.ByteString outIv_ = com.google.protobuf.ByteString.EMPTY; + /** + * optional bytes outIv = 5; + */ + public boolean hasOutIv() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + /** + * optional bytes outIv = 5; + */ + public com.google.protobuf.ByteString getOutIv() { + return outIv_; + } + /** + * optional 
bytes outIv = 5; + */ + public Builder setOutIv(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000010; + outIv_ = value; + onChanged(); + return this; + } + /** + * optional bytes outIv = 5; + */ + public Builder clearOutIv() { + bitField0_ = (bitField0_ & ~0x00000010); + outIv_ = getDefaultInstance().getOutIv(); + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:hbase.pb.SaslCipherMeta) + } + + static { + defaultInstance = new SaslCipherMeta(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:hbase.pb.SaslCipherMeta) + } + public interface RequestHeaderOrBuilder extends com.google.protobuf.MessageOrBuilder { @@ -6202,6 +7101,11 @@ public final class RPCProtos { com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_hbase_pb_ExceptionResponse_fieldAccessorTable; private static com.google.protobuf.Descriptors.Descriptor + internal_static_hbase_pb_SaslCipherMeta_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_hbase_pb_SaslCipherMeta_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor internal_static_hbase_pb_RequestHeader_descriptor; private static com.google.protobuf.GeneratedMessage.FieldAccessorTable @@ -6231,17 +7135,20 @@ public final class RPCProtos { "llBlockMeta\022\016\n\006length\030\001 \001(\r\"|\n\021Exception" + "Response\022\034\n\024exception_class_name\030\001 \001(\t\022\023", "\n\013stack_trace\030\002 \001(\t\022\020\n\010hostname\030\003 \001(\t\022\014\n" + - "\004port\030\004 \001(\005\022\024\n\014do_not_retry\030\005 \001(\010\"\311\001\n\rRe" + - "questHeader\022\017\n\007call_id\030\001 \001(\r\022&\n\ntrace_in" + - "fo\030\002 \001(\0132\022.hbase.pb.RPCTInfo\022\023\n\013method_n" + - "ame\030\003 \001(\t\022\025\n\rrequest_param\030\004 \001(\010\0220\n\017cell" + - "_block_meta\030\005 
\001(\0132\027.hbase.pb.CellBlockMe" + - "ta\022\020\n\010priority\030\006 \001(\r\022\017\n\007timeout\030\007 \001(\r\"\203\001" + - "\n\016ResponseHeader\022\017\n\007call_id\030\001 \001(\r\022.\n\texc" + - "eption\030\002 \001(\0132\033.hbase.pb.ExceptionRespons" + - "e\0220\n\017cell_block_meta\030\003 \001(\0132\027.hbase.pb.Ce", - "llBlockMetaB<\n*org.apache.hadoop.hbase.p" + - "rotobuf.generatedB\tRPCProtosH\001\240\001\001" + "\004port\030\004 \001(\005\022\024\n\014do_not_retry\030\005 \001(\010\"d\n\016Sas" + + "lCipherMeta\022\026\n\016transformation\030\001 \002(\t\022\r\n\005i" + + "nKey\030\002 \001(\014\022\014\n\004inIv\030\003 \001(\014\022\016\n\006outKey\030\004 \001(\014" + + "\022\r\n\005outIv\030\005 \001(\014\"\311\001\n\rRequestHeader\022\017\n\007cal" + + "l_id\030\001 \001(\r\022&\n\ntrace_info\030\002 \001(\0132\022.hbase.p" + + "b.RPCTInfo\022\023\n\013method_name\030\003 \001(\t\022\025\n\rreque" + + "st_param\030\004 \001(\010\0220\n\017cell_block_meta\030\005 \001(\0132" + + "\027.hbase.pb.CellBlockMeta\022\020\n\010priority\030\006 \001" + + "(\r\022\017\n\007timeout\030\007 \001(\r\"\203\001\n\016ResponseHeader\022\017", + "\n\007call_id\030\001 \001(\r\022.\n\texception\030\002 \001(\0132\033.hba" + + "se.pb.ExceptionResponse\0220\n\017cell_block_me" + + "ta\030\003 \001(\0132\027.hbase.pb.CellBlockMetaB<\n*org" + + ".apache.hadoop.hbase.protobuf.generatedB" + + "\tRPCProtosH\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -6272,14 +7179,20 @@ public final class RPCProtos { com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_hbase_pb_ExceptionResponse_descriptor, new java.lang.String[] { "ExceptionClassName", "StackTrace", "Hostname", "Port", "DoNotRetry", }); - internal_static_hbase_pb_RequestHeader_descriptor = + internal_static_hbase_pb_SaslCipherMeta_descriptor = 
getDescriptor().getMessageTypes().get(4); + internal_static_hbase_pb_SaslCipherMeta_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_hbase_pb_SaslCipherMeta_descriptor, + new java.lang.String[] { "Transformation", "InKey", "InIv", "OutKey", "OutIv", }); + internal_static_hbase_pb_RequestHeader_descriptor = + getDescriptor().getMessageTypes().get(5); internal_static_hbase_pb_RequestHeader_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_hbase_pb_RequestHeader_descriptor, new java.lang.String[] { "CallId", "TraceInfo", "MethodName", "RequestParam", "CellBlockMeta", "Priority", "Timeout", }); internal_static_hbase_pb_ResponseHeader_descriptor = - getDescriptor().getMessageTypes().get(5); + getDescriptor().getMessageTypes().get(6); internal_static_hbase_pb_ResponseHeader_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_hbase_pb_ResponseHeader_descriptor, diff --git a/hbase-protocol/src/main/protobuf/RPC.proto b/hbase-protocol/src/main/protobuf/RPC.proto index 8413d25..c314164 100644 --- a/hbase-protocol/src/main/protobuf/RPC.proto +++ b/hbase-protocol/src/main/protobuf/RPC.proto @@ -112,6 +112,17 @@ message ExceptionResponse { optional bool do_not_retry = 5; } +/** + * Cipher meta for SASL + */ +message SaslCipherMeta { + required string transformation = 1; + optional bytes inKey = 2; + optional bytes inIv = 3; + optional bytes outKey = 4; + optional bytes outIv = 5; +} + // Header sent making a request. 
message RequestHeader { // Monotonically increasing call_id to keep track of RPC requests and their response diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 4b27924..1490664 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -42,6 +42,7 @@ import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.channels.WritableByteChannel; +import java.security.GeneralSecurityException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; @@ -50,6 +51,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.Timer; import java.util.TimerTask; @@ -66,6 +68,10 @@ import javax.security.sasl.Sasl; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; +import com.google.protobuf.ByteString; +import org.apache.commons.crypto.cipher.CryptoCipherFactory; +import org.apache.commons.crypto.random.CryptoRandom; +import org.apache.commons.crypto.random.CryptoRandomFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -86,10 +92,12 @@ import org.apache.hadoop.hbase.io.ByteBufferInputStream; import org.apache.hadoop.hbase.io.ByteBufferListOutputStream; import org.apache.hadoop.hbase.io.ByteBufferOutputStream; import org.apache.hadoop.hbase.io.ByteBufferPool; +import org.apache.hadoop.hbase.io.crypto.aes.SaslAES; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import 
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.VersionInfo; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse; @@ -414,6 +422,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { this.response = new BufferChain(response); } + protected synchronized void setSaslAesResponse(ByteBuffer response) { + this.response = new BufferChain(response); + } + protected synchronized void setResponse(Object m, final CellScanner cells, Throwable t, String errorMsg) { if (this.isError) return; @@ -546,9 +558,16 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { byte [] responseBytes = bc.getBytes(); byte [] token; // synchronization may be needed since there can be multiple Handler - // threads using saslServer to wrap responses. - synchronized (connection.saslServer) { - token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length); + // threads using saslServer or saslAES to wrap responses. + if (connection.useAesWrap) { + // wrap with saslAES + synchronized (connection.saslAES) { + token = connection.saslAES.wrap(responseBytes, 0, responseBytes.length); + } + } else { + synchronized (connection.saslServer) { + token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length); + } } if (LOG.isTraceEnabled()) { LOG.trace("Adding saslServer wrapped token of size " + token.length @@ -1224,13 +1243,16 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { private AuthMethod authMethod; private boolean saslContextEstablished; + private boolean saslWaitAesNegotiation; private boolean skipInitialSaslHandshake; private ByteBuffer unwrappedData; // When is this set? FindBugs wants to know! 
Says NP private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4); boolean useSasl; SaslServer saslServer; + private SaslAES saslAES; private boolean useWrap = false; + private boolean useAesWrap = false; // Fake 'call' for failed authorization response private static final int AUTHORIZATION_FAILED_CALLID = -1; private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null, null, @@ -1241,6 +1263,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { private static final int SASL_CALLID = -33; private final Call saslCall = new Call(SASL_CALLID, null, null, null, null, null, this, null, 0, null, null, 0); + // Fake 'call' for SASL aes encryption context setup + private static final int SASL_AES_CALLID = -34; + private final Call saslAesCall = new Call(SASL_AES_CALLID, null, null, null, + null, null, this, null, 0, null, null, 0); // was authentication allowed with a fallback to simple auth private boolean authenticatedWithFallback; @@ -1351,7 +1377,13 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { processOneRpc(saslToken); } else { byte[] b = saslToken.array(); - byte [] plaintextData = saslServer.unwrap(b, saslToken.position(), saslToken.limit()); + byte [] plaintextData; + if (useAesWrap) { + // unwrap with saslAES + plaintextData = saslAES.unwrap(b, saslToken.position(), saslToken.limit()); + } else { + plaintextData = saslServer.unwrap(b, saslToken.position(), saslToken.limit()); + } processUnwrappedData(plaintextData); } } else { @@ -1403,7 +1435,24 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { LOG.debug("Have read input token of size " + saslToken.limit() + " for processing by saslServer.evaluateResponse()"); } - replyToken = saslServer.evaluateResponse(saslToken.array()); + // do the negotiation for sasl encryption with aes + if (saslWaitAesNegotiation) { + // get the cipher meta data from client, the data is wrapped by + // sasl 
default encryption + RPCProtos.SaslCipherMeta saslCipherMeta = + RPCProtos.SaslCipherMeta.parseFrom(saslServer.unwrap( + saslToken.array(), 0, saslToken.limit())); + // generate the key, iv + byte[] saslCipherMetaBytes = negotiateAes(saslCipherMeta); + // send the cipher data to client to finish the aes negotiation + doSaslAesReply(saslCipherMetaBytes); + saslContextEstablished = true; + saslWaitAesNegotiation = false; + useAesWrap = true; + return; + } else { + replyToken = saslServer.evaluateResponse(saslToken.array()); + } } catch (IOException e) { IOException sendToClient = e; Throwable cause = e; @@ -1433,6 +1482,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { if (saslServer.isComplete()) { String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP); useWrap = qop != null && !"auth".equalsIgnoreCase(qop); + boolean isEncryption = SaslUtil.QualityOfProtection.PRIVACY + .getSaslQop().equalsIgnoreCase(qop); ugi = getAuthorizedUgi(saslServer.getAuthorizationID()); if (LOG.isDebugEnabled()) { LOG.debug("SASL server context established. 
Authenticated client: " @@ -1441,7 +1492,16 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } metrics.authenticationSuccess(); AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi); - saslContextEstablished = true; + boolean isAesEncryption = isEncryption && conf.getBoolean( + HConstants.CRYPTO_SASL_ENCRYPTION_AES_ENABLED_CONF_KEY, + HConstants.CRYPTO_SASL_ENCRYPTION_AES_ENABLED_DEFAULT); + // if use aes for sasl encryption, wait and negotiate with client + // set saslContextEstablished = true after negotiation is finished + if (isAesEncryption) { + saslWaitAesNegotiation = true; + } else { + saslContextEstablished = true; + } } } } @@ -1478,6 +1538,33 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } } + /** + * No protobuf encoding of raw sasl messages + */ + private void doSaslAesReply(byte[] wrappedCipherMetaData) throws IOException { + ByteBufferOutputStream response = null; + DataOutputStream out = null; + try { + // Size the buffer to hold the wrapped cipher meta data plus the + // 4-byte length prefix that is written before it.
+ response = new ByteBufferOutputStream(wrappedCipherMetaData.length + 4); + out = new DataOutputStream(response); + out.writeInt(wrappedCipherMetaData.length); + out.write(wrappedCipherMetaData); + + saslAesCall.setSaslAesResponse(response.getByteBuffer()); + saslAesCall.responder = responder; + saslAesCall.sendResponseIfReady(); + } finally { + if (response != null) { + response.close(); + } + if (out != null) { + out.close(); + } + } + } + private void disposeSasl() { if (saslServer != null) { try { @@ -1722,8 +1809,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort + " with unknown version info"); } - - } /** @@ -1788,7 +1873,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } } - private void processOneRpc(ByteBuffer buf) throws IOException, InterruptedException { if (connectionHeaderRead) { processRequest(buf); @@ -1912,6 +1996,71 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } } + /** + + * Negotiates AES based on complete saslServer. The keys need to be encrypted by + + * sasl server. 
+ + */ + private byte[] negotiateAes(RPCProtos.SaslCipherMeta saslCipherMeta) + throws IOException { + // receive cipher mode from client + String transformation = saslCipherMeta.getTransformation(); + Properties properties = new Properties(); + // the property for SecureRandomFactory + properties.setProperty(CryptoRandomFactory.CLASSES_KEY, + conf.get(HConstants.CRYPTO_SASL_ENCRYPTION_RANDOM_CONF_KEY, + HConstants.CRYPTO_SASL_ENCRYPTION_RANDOM_DEFAULT)); + // the property for cipher class + properties.setProperty(CryptoCipherFactory.CLASSES_KEY, + conf.get(HConstants.CRYPTO_SASL_ENCRYPTION_AES_CIPHER_CLASS_KEY, + HConstants.CRYPTO_SASL_ENCRYPTION_AES_CIPHER_CLASS_DEFAULT)); + + int cipherKeyBits = conf.getInt( + HConstants.CRYPTO_SASL_ENCRYPTION_AES_CIPHER_KEY_SIZE_CONF_KEY, + HConstants.CRYPTO_SASL_ENCRYPTION_AES_CIPHER_KEY_SIZE_DEFAULT); + // generate key and iv + if (cipherKeyBits % 8 != 0) { + throw new IllegalArgumentException("The AES cipher key size in bits" + + " should be a multiple of byte"); + } + int len = cipherKeyBits / 8; + byte[] inKey = new byte[len]; + byte[] outKey = new byte[len]; + byte[] inIv = new byte[len]; + byte[] outIv = new byte[len]; + + try { + // generate the cipher meta data with SecureRandom + CryptoRandom secureRandom = CryptoRandomFactory.getCryptoRandom(properties); + secureRandom.nextBytes(inKey); + secureRandom.nextBytes(outKey); + secureRandom.nextBytes(inIv); + secureRandom.nextBytes(outIv); + } catch (GeneralSecurityException gse) { + throw new IOException(gse.getMessage(), gse); + } + + // create SaslAES for server + saslAES = new SaslAES(transformation, properties, + inKey, outKey, inIv, outIv); + // create SaslCipherMeta and send to client, for client, + // the [inKey, outKey], [inIv, outIv] should be reversed + RPCProtos.SaslCipherMeta.Builder builder = + RPCProtos.SaslCipherMeta.newBuilder(); + builder.setTransformation(transformation); + builder.setInIv(getByteString(outIv)); + builder.setInKey(getByteString(outKey)); 
+ builder.setOutIv(getByteString(inIv)); + builder.setOutKey(getByteString(inKey)); + byte[] saslCipherMetaBytes = builder.build().toByteArray(); + // encrypt the aes cipher meta data with sasl server + return saslServer.wrap(saslCipherMetaBytes, 0, saslCipherMetaBytes.length); + } + + private ByteString getByteString(byte[] bytes) { + // return singleton to reduce object allocation + return (bytes.length == 0) ? ByteString.EMPTY : ByteString.copyFrom(bytes); + } + private boolean authorizeConnection() throws IOException { try { // If auth method is DIGEST, the token was obtained by the diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java index 4637a01..065b22f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java @@ -192,7 +192,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput { setHdfsSecuredConfiguration(TEST_UTIL.getConfiguration()); HBaseKerberosUtils.setKeytabFileForTesting(KEYTAB_FILE.getAbsolutePath()); HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm()); - HBaseKerberosUtils.setSecuredConfiguration(TEST_UTIL.getConfiguration()); + HBaseKerberosUtils.setSecuredConfiguration(TEST_UTIL.getConfiguration(), false); UserGroupInformation.setConfiguration(TEST_UTIL.getConfiguration()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java index 385b7b0..0fb7d4f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java @@ -32,7 +32,6 
@@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Properties; -import java.util.concurrent.ThreadLocalRandom; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; @@ -138,6 +137,8 @@ public abstract class AbstractTestSecureIPC { abstract Class getRpcClientClass(); + abstract boolean isAesEnabled(); + @Rule public ExpectedException exception = ExpectedException.none(); @@ -166,9 +167,9 @@ public abstract class AbstractTestSecureIPC { krbKeytab = getKeytabFileForTesting(); krbPrincipal = getPrincipalForTesting(); ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal); - clientConf = getSecuredConfiguration(); + clientConf = getSecuredConfiguration(isAesEnabled()); clientConf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, getRpcClientClass().getName()); - serverConf = getSecuredConfiguration(); + serverConf = getSecuredConfiguration(isAesEnabled()); } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/HBaseKerberosUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/HBaseKerberosUtils.java index 237efe9..991423b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/HBaseKerberosUtils.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/HBaseKerberosUtils.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.security; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import com.google.common.base.Strings; @@ -67,18 +68,21 @@ public class HBaseKerberosUtils { return conf; } - public static Configuration getSecuredConfiguration() { + public static Configuration getSecuredConfiguration(boolean isAesEnabled) { Configuration conf = HBaseConfiguration.create(); - setSecuredConfiguration(conf); + 
setSecuredConfiguration(conf, isAesEnabled); return conf; } - public static void setSecuredConfiguration(Configuration conf) { + public static void setSecuredConfiguration(Configuration conf, boolean isAesEnabled) { conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); conf.set(User.HBASE_SECURITY_CONF_KEY, "kerberos"); conf.setBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, true); conf.set(KRB_KEYTAB_FILE, System.getProperty(KRB_KEYTAB_FILE)); conf.set(KRB_PRINCIPAL, System.getProperty(KRB_PRINCIPAL)); conf.set(MASTER_KRB_PRINCIPAL, System.getProperty(KRB_PRINCIPAL)); + if (isAesEnabled) { + conf.set(HConstants.CRYPTO_SASL_ENCRYPTION_AES_ENABLED_CONF_KEY, "true"); + } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestAsynSecureIPCWithAes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestAsynSecureIPCWithAes.java new file mode 100644 index 0000000..09382ed --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestAsynSecureIPCWithAes.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.security; + +import org.apache.hadoop.hbase.ipc.AsyncRpcClient; +import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.testclassification.SecurityTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.experimental.categories.Category; + +@Category({ SecurityTests.class, SmallTests.class }) +public class TestAsynSecureIPCWithAes extends AbstractTestSecureIPC { + Class getRpcClientClass() { + return AsyncRpcClient.class; + } + + boolean isAesEnabled() { + return true; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestAsyncSecureIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestAsyncSecureIPC.java index ea37915..37342bb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestAsyncSecureIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestAsyncSecureIPC.java @@ -30,4 +30,8 @@ public class TestAsyncSecureIPC extends AbstractTestSecureIPC { Class getRpcClientClass() { return AsyncRpcClient.class; } + + boolean isAesEnabled() { + return false; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java index 98ea221..0a7a53a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java @@ -30,4 +30,8 @@ public class TestSecureIPC extends AbstractTestSecureIPC { Class getRpcClientClass() { return RpcClientImpl.class; } + + boolean isAesEnabled() { + return false; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPCWithAes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPCWithAes.java new file mode 100644 index 0000000..525d85d --- /dev/null +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPCWithAes.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.security; + +import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.ipc.RpcClientImpl; +import org.apache.hadoop.hbase.testclassification.SecurityTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.experimental.categories.Category; + +@Category({ SecurityTests.class, SmallTests.class }) +public class TestSecureIPCWithAes extends AbstractTestSecureIPC { + + Class getRpcClientClass() { + return RpcClientImpl.class; + } + + boolean isAesEnabled() { + return true; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestUsersOperationsWithSecureHadoop.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestUsersOperationsWithSecureHadoop.java index 0226d49..9861696 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestUsersOperationsWithSecureHadoop.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestUsersOperationsWithSecureHadoop.java @@ -99,7 +99,7 @@ public class 
TestUsersOperationsWithSecureHadoop { assertNotNull("KerberosKeytab was not specified", nnKeyTab); assertNotNull("KerberosPrincipal was not specified", dnPrincipal); - conf = getSecuredConfiguration(); + conf = getSecuredConfiguration(false); UserGroupInformation.setConfiguration(conf); User.login(conf, HBaseKerberosUtils.KRB_KEYTAB_FILE, HBaseKerberosUtils.KRB_PRINCIPAL, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/SecureTestCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/SecureTestCluster.java index a469537..bf6e61e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/SecureTestCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/SecureTestCluster.java @@ -97,7 +97,7 @@ public class SecureTestCluster { HBaseKerberosUtils.setKeytabFileForTesting(KEYTAB_FILE.getAbsolutePath()); HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm()); - HBaseKerberosUtils.setSecuredConfiguration(TEST_UTIL.getConfiguration()); + HBaseKerberosUtils.setSecuredConfiguration(TEST_UTIL.getConfiguration(), false); setHdfsSecuredConfiguration(TEST_UTIL.getConfiguration()); UserGroupInformation.setConfiguration(TEST_UTIL.getConfiguration()); TEST_UTIL.getConfiguration().setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, diff --git a/pom.xml b/pom.xml index e1d7181..75ca6e8 100644 --- a/pom.xml +++ b/pom.xml @@ -1306,6 +1306,7 @@ 2.11.6 1.46 1.0.0-RC2 + 1.0.0 2.4 1.8 @@ -1874,6 +1875,11 @@ kerb-simplekdc ${kerby.version} + + org.apache.commons + commons-crypto + ${commons-crypto.version} +