diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index ea5aed8..d0d9f6c 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1592,6 +1592,11 @@
     HIVE_SSL_PROTOCOL_BLACKLIST("hive.ssl.protocol.blacklist", "SSLv2,SSLv2Hello,SSLv3",
         "SSL Versions to disable for all Hive Servers"),
 
+    // General Thrift configs (Thrift configs common to Metastore and HiveServer2)
+    HIVE_THRIFT_SASL_MESSAGE_LIMIT("hive.thrift.sasl.message.limit", 104857600,
+        "If the length of an incoming SASL message is greater than this, regard it as invalid " +
+        "and close the transport. A value of zero or less disables the check. Default is 100MB."),
+
     // HiveServer2 specific configs
     HIVE_SERVER2_MAX_START_ATTEMPTS("hive.server2.max.start.attempts", 30L, new RangeValidator(0L, null),
         "Number of times HiveServer2 will attempt to start before exiting, sleeping 60 seconds " +
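
Note: 104857600 is 100 * 1024 * 1024, i.e. the advertised 100MB default. A minimal
sketch of how server code reads the new knob once this patch is applied (the class
name is illustrative, not part of the patch):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

    public class SaslLimitCheck {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // A value of zero or less means the limit is not enforced.
        int limit = conf.getIntVar(ConfVars.HIVE_THRIFT_SASL_MESSAGE_LIMIT);
        System.out.println("limit=" + limit + " bytes, enforced=" + (limit > 0));
      }
    }
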
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index d1ef305..4c6cad6 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -5741,9 +5741,9 @@ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge,
    * configuration overrides
    * @throws Throwable
    */
-  public static void startMetaStore(int port, HadoopThriftAuthBridge bridge,
-      HiveConf conf, Lock startLock, Condition startCondition,
-      MetaStoreThread.BooleanPointer startedServing) throws Throwable {
+  public static void startMetaStore(int port, HadoopThriftAuthBridge bridge, HiveConf conf,
+      Lock startLock, Condition startCondition, MetaStoreThread.BooleanPointer startedServing)
+      throws Throwable {
     try {
       isMetaStoreRemote = true;
       // Server will create new threads up to max as necessary. After an idle
@@ -5755,61 +5755,56 @@ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge,
       boolean useFramedTransport = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_FRAMED_TRANSPORT);
       useSasl = conf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL);
 
-      TServerTransport serverTransport = tcpKeepAlive ?
-          new TServerSocketKeepAlive(port) : new TServerSocket(port);
+      TServerTransport serverTransport =
+          tcpKeepAlive ? new TServerSocketKeepAlive(port) : new TServerSocket(port);
 
       TProcessor processor;
       TTransportFactory transFactory;
-      HMSHandler baseHandler = new HiveMetaStore.HMSHandler("new db based metaserver", conf,
-          false);
+      HMSHandler baseHandler = new HiveMetaStore.HMSHandler("new db based metaserver", conf, false);
       IHMSHandler handler = newRetryingHMSHandler(baseHandler, conf);
       if (useSasl) {
         // we are in secure mode.
         if (useFramedTransport) {
           throw new HiveMetaException("Framed transport is not supported with SASL enabled.");
         }
-        saslServer = bridge.createServer(
-            conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_KEYTAB_FILE),
-            conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL));
+        int saslMessageLimit = conf.getIntVar(ConfVars.HIVE_THRIFT_SASL_MESSAGE_LIMIT);
+        saslServer =
+            bridge.createServer(conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_KEYTAB_FILE),
+                conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL));
         // start delegation token manager
-        saslServer.startDelegationTokenSecretManager(conf, baseHandler.getMS(), ServerMode.METASTORE);
-        transFactory = saslServer.createTransportFactory(
-                MetaStoreUtils.getMetaStoreSaslProperties(conf));
-        processor = saslServer.wrapProcessor(
-          new ThriftHiveMetastore.Processor<IHMSHandler>(handler));
+        saslServer.startDelegationTokenSecretManager(conf, baseHandler.getMS(),
+            ServerMode.METASTORE);
+        transFactory =
+            saslServer.createTransportFactory(MetaStoreUtils.getMetaStoreSaslProperties(conf),
+                saslMessageLimit);
+        processor =
+            saslServer.wrapProcessor(new ThriftHiveMetastore.Processor<IHMSHandler>(handler));
         LOG.info("Starting DB backed MetaStore Server in Secure Mode");
       } else {
         // we are in unsecure mode.
         if (conf.getBoolVar(ConfVars.METASTORE_EXECUTE_SET_UGI)) {
-          transFactory = useFramedTransport ?
-              new ChainedTTransportFactory(new TFramedTransport.Factory(),
-                  new TUGIContainingTransport.Factory())
-              : new TUGIContainingTransport.Factory();
-
+          transFactory =
+              useFramedTransport ? new ChainedTTransportFactory(new TFramedTransport.Factory(),
+                  new TUGIContainingTransport.Factory()) : new TUGIContainingTransport.Factory();
           processor = new TUGIBasedProcessor<IHMSHandler>(handler);
           LOG.info("Starting DB backed MetaStore Server with SetUGI enabled");
         } else {
-          transFactory = useFramedTransport ?
-              new TFramedTransport.Factory() : new TTransportFactory();
+          transFactory =
+              useFramedTransport ? new TFramedTransport.Factory() : new TTransportFactory();
           processor = new TSetIpAddressProcessor<IHMSHandler>(handler);
           LOG.info("Starting DB backed MetaStore Server");
         }
       }
 
-      TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverTransport)
-          .processor(processor)
-          .transportFactory(transFactory)
-          .protocolFactory(new TBinaryProtocol.Factory())
-          .minWorkerThreads(minWorkerThreads)
-          .maxWorkerThreads(maxWorkerThreads);
+      TThreadPoolServer.Args args =
+          new TThreadPoolServer.Args(serverTransport).processor(processor)
+              .transportFactory(transFactory).protocolFactory(new TBinaryProtocol.Factory())
+              .minWorkerThreads(minWorkerThreads).maxWorkerThreads(maxWorkerThreads);
 
       TServer tServer = new TThreadPoolServer(args);
-      HMSHandler.LOG.info("Started the new metaserver on port [" + port
-          + "]...");
-      HMSHandler.LOG.info("Options.minWorkerThreads = "
-          + minWorkerThreads);
-      HMSHandler.LOG.info("Options.maxWorkerThreads = "
-          + maxWorkerThreads);
+      HMSHandler.LOG.info("Started the new metaserver on port [" + port + "]...");
+      HMSHandler.LOG.info("Options.minWorkerThreads = " + minWorkerThreads);
+      HMSHandler.LOG.info("Options.maxWorkerThreads = " + maxWorkerThreads);
       HMSHandler.LOG.info("TCP keepalive = " + tcpKeepAlive);
 
       if (startLock != null) {
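
The limit is wired in on the SASL path only; the unsecured and framed transports
behave as before. For orientation, a condensed sketch (assuming Thrift 0.9.x; the
helper name is illustrative) of the TThreadPoolServer bootstrap that startMetaStore
performs, which accepts any of the transport factories built above:

    import org.apache.thrift.TProcessor;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.server.TServer;
    import org.apache.thrift.server.TThreadPoolServer;
    import org.apache.thrift.transport.TServerSocket;
    import org.apache.thrift.transport.TTransportFactory;

    public class MiniThriftServer {
      // Same wiring as the patched startMetaStore: a SASL factory built with a
      // message limit drops in wherever a TTransportFactory is accepted.
      public static TServer serve(int port, TProcessor processor,
          TTransportFactory transFactory) throws Exception {
        TThreadPoolServer.Args args = new TThreadPoolServer.Args(new TServerSocket(port))
            .processor(processor)
            .transportFactory(transFactory)
            .protocolFactory(new TBinaryProtocol.Factory());
        return new TThreadPoolServer(args);
      }
    }
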
diff --git a/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java b/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java
index 23ba79c..b4fc77b 100644
--- a/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java
+++ b/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java
@@ -18,7 +18,6 @@
 package org.apache.hive.service.auth;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
@@ -28,7 +27,6 @@
 import java.util.Map;
 
 import javax.net.ssl.SSLServerSocket;
-import javax.security.auth.login.LoginException;
 import javax.security.sasl.Sasl;
 
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -57,30 +55,28 @@
 public class HiveAuthFactory {
   private static final Logger LOG = LoggerFactory.getLogger(HiveAuthFactory.class);
 
-  public enum AuthTypes {
-    NOSASL("NOSASL"),
-    NONE("NONE"),
-    LDAP("LDAP"),
-    KERBEROS("KERBEROS"),
-    CUSTOM("CUSTOM"),
-    PAM("PAM");
-
-    private final String authType;
-
-    AuthTypes(String authType) {
-      this.authType = authType;
-    }
-
-    public String getAuthName() {
-      return authType;
-    }
+  public static enum AuthTypes {
+    NOSASL, NONE, LDAP, KERBEROS, CUSTOM, PAM
   }
 
+  public static enum TransTypes {
+    HTTP {
+      AuthTypes getDefaultAuthType() {
+        return AuthTypes.NOSASL;
+      }
+    },
+    BINARY {
+      AuthTypes getDefaultAuthType() {
+        return AuthTypes.NONE;
+      }
+    };
+    abstract AuthTypes getDefaultAuthType();
+  }
+
-  private HadoopThriftAuthBridge.Server saslServer;
-  private String authTypeStr;
-  private final String transportMode;
+  private final HadoopThriftAuthBridge.Server saslServer;
+  private final AuthTypes authType;
+  private final TransTypes transportType;
+  private final int saslMessageLimit;
   private final HiveConf conf;
 
   public static final String HS2_PROXY_USER = "hive.server2.proxy.user";
 
@@ -88,30 +84,28 @@ public String getAuthName() {
   public HiveAuthFactory(HiveConf conf) throws TTransportException {
     this.conf = conf;
-    transportMode = conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE);
-    authTypeStr = conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION);
-
-    // In http mode we use NOSASL as the default auth type
-    if ("http".equalsIgnoreCase(transportMode)) {
-      if (authTypeStr == null) {
-        authTypeStr = AuthTypes.NOSASL.getAuthName();
+    saslMessageLimit = conf.getIntVar(ConfVars.HIVE_THRIFT_SASL_MESSAGE_LIMIT);
+    String transTypeStr = conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE);
+    String authTypeStr = conf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION);
+    transportType = TransTypes.valueOf(transTypeStr.toUpperCase());
+    authType =
+        authTypeStr == null ? transportType.getDefaultAuthType() : AuthTypes.valueOf(authTypeStr
+            .toUpperCase());
+    if (transportType == TransTypes.BINARY
+        && authTypeStr.equalsIgnoreCase(AuthTypes.KERBEROS.name())
+        && ShimLoader.getHadoopShims().isSecureShimImpl()) {
+      saslServer =
+          ShimLoader.getHadoopThriftAuthBridge().createServer(
+              conf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB),
+              conf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL));
+      // start delegation token manager
+      try {
+        saslServer.startDelegationTokenSecretManager(conf, null, ServerMode.HIVESERVER2);
+      } catch (Exception e) {
+        throw new TTransportException("Failed to start token manager", e);
       }
     } else {
-      if (authTypeStr == null) {
-        authTypeStr = AuthTypes.NONE.getAuthName();
-      }
-      if (authTypeStr.equalsIgnoreCase(AuthTypes.KERBEROS.getAuthName())
-          && ShimLoader.getHadoopShims().isSecureShimImpl()) {
-        saslServer = ShimLoader.getHadoopThriftAuthBridge()
-            .createServer(conf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB),
-                conf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL));
-        // start delegation token manager
-        try {
-          saslServer.startDelegationTokenSecretManager(conf, null, ServerMode.HIVESERVER2);
-        } catch (IOException e) {
-          throw new TTransportException("Failed to start token manager", e);
-        }
-      }
+      saslServer = null;
     }
   }
 
@@ -123,42 +117,28 @@ public HiveAuthFactory(HiveConf conf) throws TTransportException {
     return saslProps;
   }
 
-  public TTransportFactory getAuthTransFactory() throws LoginException {
-    TTransportFactory transportFactory;
-    if (authTypeStr.equalsIgnoreCase(AuthTypes.KERBEROS.getAuthName())) {
-      try {
-        transportFactory = saslServer.createTransportFactory(getSaslProperties());
-      } catch (TTransportException e) {
-        throw new LoginException(e.getMessage());
-      }
-    } else if (authTypeStr.equalsIgnoreCase(AuthTypes.NONE.getAuthName())) {
-      transportFactory = PlainSaslHelper.getPlainTransportFactory(authTypeStr);
-    } else if (authTypeStr.equalsIgnoreCase(AuthTypes.LDAP.getAuthName())) {
-      transportFactory = PlainSaslHelper.getPlainTransportFactory(authTypeStr);
-    } else if (authTypeStr.equalsIgnoreCase(AuthTypes.PAM.getAuthName())) {
-      transportFactory = PlainSaslHelper.getPlainTransportFactory(authTypeStr);
-    } else if (authTypeStr.equalsIgnoreCase(AuthTypes.NOSASL.getAuthName())) {
-      transportFactory = new TTransportFactory();
-    } else if (authTypeStr.equalsIgnoreCase(AuthTypes.CUSTOM.getAuthName())) {
-      transportFactory = PlainSaslHelper.getPlainTransportFactory(authTypeStr);
-    } else {
-      throw new LoginException("Unsupported authentication type " + authTypeStr);
+  public TTransportFactory getAuthTransFactory() throws Exception {
+    if (authType == AuthTypes.KERBEROS) {
+      return saslServer.createTransportFactory(getSaslProperties(), saslMessageLimit);
     }
-    return transportFactory;
+    if (authType == AuthTypes.NOSASL) {
+      return new TTransportFactory();
+    }
+    return PlainSaslHelper.getPlainTransportFactory(authType.name(), saslMessageLimit);
   }
 
   /**
    * Returns the thrift processor factory for HiveServer2 running in binary mode
+   *
    * @param service
    * @return
    * @throws LoginException
    */
-  public TProcessorFactory getAuthProcFactory(ThriftCLIService service) throws LoginException {
-    if (authTypeStr.equalsIgnoreCase(AuthTypes.KERBEROS.getAuthName())) {
+  public TProcessorFactory getAuthProcFactory(ThriftCLIService service) {
+    if (authType == AuthTypes.KERBEROS) {
       return KerberosSaslHelper.getKerberosProcessorFactory(saslServer, service);
-    } else {
-      return PlainSaslHelper.getPlainProcessorFactory(service);
     }
+    return PlainSaslHelper.getPlainProcessorFactory(service);
   }
 
   public String getRemoteUser() {
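
The string-keyed constants and getAuthName() are gone; the enum names now serve
directly, and the per-transport default moves into TransTypes as a constant-specific
method. The pattern in isolation (the resolve() helper is hypothetical, added here
only to show the lookup):

    // Mirrors the patched HiveAuthFactory enums; resolve() is not in the patch.
    enum AuthTypes { NOSASL, NONE, LDAP, KERBEROS, CUSTOM, PAM }

    enum TransTypes {
      HTTP {
        AuthTypes getDefaultAuthType() { return AuthTypes.NOSASL; }
      },
      BINARY {
        AuthTypes getDefaultAuthType() { return AuthTypes.NONE; }
      };
      abstract AuthTypes getDefaultAuthType();

      static AuthTypes resolve(String transport, String auth) {
        TransTypes type = TransTypes.valueOf(transport.toUpperCase());
        return auth == null ? type.getDefaultAuthType() : AuthTypes.valueOf(auth.toUpperCase());
      }
    }

resolve("http", null) yields NOSASL and resolve("binary", null) yields NONE, matching
the old string comparisons while letting the compiler reject unknown values.
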
diff --git a/service/src/java/org/apache/hive/service/auth/PlainSaslHelper.java b/service/src/java/org/apache/hive/service/auth/PlainSaslHelper.java
index afc1441..a7b0814 100644
--- a/service/src/java/org/apache/hive/service/auth/PlainSaslHelper.java
+++ b/service/src/java/org/apache/hive/service/auth/PlainSaslHelper.java
@@ -30,6 +30,7 @@
 import javax.security.sasl.AuthorizeCallback;
 import javax.security.sasl.SaslException;
 
+import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
 import org.apache.hive.service.auth.AuthenticationProviderFactory.AuthMethods;
 import org.apache.hive.service.auth.PlainSaslServer.SaslPlainProvider;
 import org.apache.hive.service.cli.thrift.TCLIService.Iface;
@@ -42,7 +43,6 @@
 import org.apache.thrift.transport.TTransportFactory;
 
 public final class PlainSaslHelper {
-
   public static TProcessorFactory getPlainProcessorFactory(ThriftCLIService service) {
     return new SQLPlainProcessorFactory(service);
   }
@@ -52,15 +52,18 @@ public static TProcessorFactory getPlainProcessorFactory(ThriftCLIService servic
     Security.addProvider(new SaslPlainProvider());
   }
 
-  public static TTransportFactory getPlainTransportFactory(String authTypeStr)
-      throws LoginException {
+  public static TTransportFactory getPlainTransportFactory(String authTypeStr, int saslMessageLimit)
+      throws LoginException, AuthenticationException {
+    if (saslMessageLimit > 0) {
+      HadoopThriftAuthBridge.HiveThriftTransportFactory factory =
+          new HadoopThriftAuthBridge.HiveThriftTransportFactory(saslMessageLimit);
+      factory.addServerDefinition("PLAIN", authTypeStr, null, new HashMap<String, String>(),
+          new PlainServerCallbackHandler(authTypeStr));
+      return factory;
+    }
     TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
-    try {
-      saslFactory.addServerDefinition("PLAIN", authTypeStr, null, new HashMap<String, String>(),
+    saslFactory.addServerDefinition("PLAIN", authTypeStr, null, new HashMap<String, String>(),
         new PlainServerCallbackHandler(authTypeStr));
-    } catch (AuthenticationException e) {
-      throw new LoginException("Error setting callback handler" + e);
-    }
     return saslFactory;
   }
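
With a positive limit, the PLAIN mechanism is registered on the new bounded factory;
otherwise the stock TSaslServerTransport.Factory is used unchanged. For manual
testing against either, a sketch of a PLAIN client (host, port, and credentials are
placeholders; this assumes the JDK's built-in PLAIN SASL client mechanism):

    import java.util.HashMap;
    import javax.security.auth.callback.*;
    import org.apache.thrift.transport.TSaslClientTransport;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;

    public class PlainSaslClientDemo {
      // Opens the kind of PLAIN SASL connection the factory above accepts.
      public static TTransport open(String host, int port, final String user,
          final String pass) throws Exception {
        CallbackHandler handler = new CallbackHandler() {
          public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
            for (Callback c : callbacks) {
              if (c instanceof NameCallback) {
                ((NameCallback) c).setName(user);
              } else if (c instanceof PasswordCallback) {
                ((PasswordCallback) c).setPassword(pass.toCharArray());
              } else {
                throw new UnsupportedCallbackException(c);
              }
            }
          }
        };
        TTransport sasl = new TSaslClientTransport("PLAIN", null, null, null,
            new HashMap<String, String>(), handler, new TSocket(host, port));
        sasl.open();
        return sasl;
      }
    }
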
diff --git a/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java b/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java
index 624ac6b..3e3dab3 100644
--- a/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java
+++ b/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java
@@ -100,7 +100,8 @@ public Client createClientWithConf(String authMethod) {
   }
 
   @Override
-  public Server createServer(String keytabFile, String principalConf) throws TTransportException {
+  public Server createServer(String keytabFile, String principalConf)
+      throws TTransportException {
     return new Server(keytabFile, principalConf);
   }
 
@@ -328,6 +329,7 @@ public Server() throws TTransportException {
       throw new TTransportException(ioe);
     }
   }
+
   /**
    * Create a server with a kerberos keytab/principal.
    */
@@ -339,7 +341,6 @@ protected Server(String keytabFile, String principalConf)
     if (principalConf == null || principalConf.isEmpty()) {
       throw new TTransportException("No principal specified");
     }
-
     // Login from the keytab
     String kerberosName;
     try {
@@ -355,15 +356,14 @@ protected Server(String keytabFile, String principalConf)
   }
 
   /**
-   * Create a TTransportFactory that, upon connection of a client socket,
-   * negotiates a Kerberized SASL transport. The resulting TTransportFactory
-   * can be passed as both the input and output transport factory when
-   * instantiating a TThreadPoolServer, for example.
+   * Create a TTransportFactory that, upon connection of a client socket, negotiates a Kerberized
+   * SASL transport. The resulting TTransportFactory can be passed as both the input and output
+   * transport factory when instantiating a TThreadPoolServer, for example.
    *
    * @param saslProps Map of SASL properties
    */
   @Override
-  public TTransportFactory createTransportFactory(Map<String, String> saslProps)
+  public TTransportFactory createTransportFactory(Map<String, String> saslProps, int saslMessageLimit)
       throws TTransportException {
     // Parse out the kerberos principal, host, realm.
     String kerberosName = realUgi.getUserName();
@@ -371,17 +371,23 @@ public TTransportFactory createTransportFactory(Map<String, String> saslProps)
     if (names.length != 3) {
       throw new TTransportException("Kerberos principal should have 3 parts: " + kerberosName);
     }
-
+    if (saslMessageLimit > 0) {
+      HadoopThriftAuthBridge.HiveThriftTransportFactory factory =
+          new HadoopThriftAuthBridge.HiveThriftTransportFactory(saslMessageLimit);
+      factory.addServerDefinition(AuthMethod.KERBEROS.getMechanismName(), names[0], names[1],
+          saslProps, new SaslRpcServer.SaslGssCallbackHandler());
+      factory.addServerDefinition(AuthMethod.DIGEST.getMechanismName(), null,
+          SaslRpcServer.SASL_DEFAULT_REALM, saslProps, new SaslDigestCallbackHandler(
+              secretManager));
+      return new TUGIAssumingTransportFactory(factory, realUgi);
+    }
     TSaslServerTransport.Factory transFactory = new TSaslServerTransport.Factory();
-    transFactory.addServerDefinition(
-        AuthMethod.KERBEROS.getMechanismName(),
-        names[0], names[1],  // two parts of kerberos principal
-        saslProps,
-        new SaslRpcServer.SaslGssCallbackHandler());
-    transFactory.addServerDefinition(AuthMethod.DIGEST.getMechanismName(),
-        null, SaslRpcServer.SASL_DEFAULT_REALM,
-        saslProps, new SaslDigestCallbackHandler(secretManager));
-
+    transFactory.addServerDefinition(AuthMethod.KERBEROS.getMechanismName(), names[0], names[1],
+        saslProps, new SaslRpcServer.SaslGssCallbackHandler());
+    transFactory
+        .addServerDefinition(AuthMethod.DIGEST.getMechanismName(), null,
+            SaslRpcServer.SASL_DEFAULT_REALM, saslProps, new SaslDigestCallbackHandler(
+                secretManager));
     return new TUGIAssumingTransportFactory(transFactory, realUgi);
   }
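
Both server definitions now route through the bounded factory when the limit is
positive: Kerberos clients negotiate GSSAPI and delegation-token clients negotiate
DIGEST-MD5. A quick way to confirm the mechanism names (assuming Hadoop's
SaslRpcServer is on the classpath; class name is illustrative):

    import org.apache.hadoop.security.SaslRpcServer.AuthMethod;

    public class MechanismNames {
      public static void main(String[] args) {
        System.out.println(AuthMethod.KERBEROS.getMechanismName()); // GSSAPI
        System.out.println(AuthMethod.DIGEST.getMechanismName());   // DIGEST-MD5
      }
    }
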
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java
index d011c67..8bb3708 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java
@@ -19,15 +19,26 @@
 package org.apache.hadoop.hive.thrift;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.ref.WeakReference;
 import java.net.InetAddress;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.WeakHashMap;
+
+import javax.security.auth.callback.CallbackHandler;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.EncodingUtils;
 import org.apache.thrift.TProcessor;
+import org.apache.thrift.transport.TSaslServerTransport;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.thrift.transport.TTransportFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class is only overridden by the secure hadoop shim. It allows
@@ -58,8 +69,7 @@
         "The current version of Hadoop does not support Authentication");
   }
 
-  public Server createServer(String keytabFile, String principalConf)
-      throws TTransportException {
+  public Server createServer(String keytabFile, String principalConf) throws TTransportException {
     throw new UnsupportedOperationException(
         "The current version of Hadoop does not support Authentication");
   }
@@ -102,7 +112,9 @@ public abstract TTransport createClientTransport(
     public enum ServerMode {
       HIVESERVER2, METASTORE
     };
-    public abstract TTransportFactory createTransportFactory(Map<String, String> saslProps) throws TTransportException;
+
+    public abstract TTransportFactory createTransportFactory(Map<String, String> saslProps,
+        int saslMessageLimit) throws TTransportException;
     public abstract TProcessor wrapProcessor(TProcessor processor);
     public abstract TProcessor wrapNonAssumingProcessor(TProcessor processor);
     public abstract InetAddress getRemoteAddress();
@@ -117,5 +129,103 @@ public abstract String getDelegationTokenWithService(String owner, String renewe
     public abstract void cancelDelegationToken(String tokenStrForm) throws IOException;
     public abstract String getUserFromToken(String tokenStr) throws IOException;
   }
+
+  public static class HiveThriftTransportFactory extends TTransportFactory {
+    private static final Logger LOGGER = LoggerFactory.getLogger(TSaslServerTransport.class);
+    private final int saslMessageLimit;
+
+    public HiveThriftTransportFactory(int saslMessageLimit) {
+      this.saslMessageLimit = saslMessageLimit;
+    }
+
+    private static class TSaslServerDefinition {
+      public String mechanism;
+      public String protocol;
+      public String serverName;
+      public Map<String, String> props;
+      public CallbackHandler cbh;
+
+      public TSaslServerDefinition(String mechanism, String protocol, String serverName,
+          Map<String, String> props, CallbackHandler cbh) {
+        this.mechanism = mechanism;
+        this.protocol = protocol;
+        this.serverName = serverName;
+        this.props = props;
+        this.cbh = cbh;
+      }
+    }
+
+    private static Map<TTransport, WeakReference<TSaslServerTransport>> transportMap = Collections
+        .synchronizedMap(new WeakHashMap<TTransport, WeakReference<TSaslServerTransport>>());
+    private Map<String, TSaslServerDefinition> serverDefinitionMap =
+        new HashMap<String, TSaslServerDefinition>();
+
+    public void addServerDefinition(String mechanism, String protocol, String serverName,
+        Map<String, String> props, CallbackHandler cbh) {
+      serverDefinitionMap.put(mechanism, new TSaslServerDefinition(mechanism, protocol, serverName,
+          props, cbh));
+    }
+
+    @Override
+    public TTransport getTransport(TTransport base) {
+      WeakReference<TSaslServerTransport> ret = transportMap.get(base);
+      TSaslServerTransport transport = ret == null ? null : ret.get();
+      if (transport == null) {
+        LOGGER.debug("transport map does not contain key {}", base);
+        transport = newSaslTransport(base);
+        try {
+          transport.open();
+        } catch (TTransportException e) {
+          LOGGER.debug("failed to open server transport", e);
+          throw new RuntimeException(e);
+        }
+        transportMap.put(base, new WeakReference<TSaslServerTransport>(transport));
+      } else {
+        LOGGER.debug("transport map does contain key {}", base);
+      }
+      return transport;
+    }
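
getTransport() reproduces the per-socket caching that TSaslServerTransport.Factory
does internally: keys are held weakly by the WeakHashMap and values are
WeakReferences, so a collected socket drops its wrapped transport without any
explicit eviction. The idiom in isolation (a generic sketch, not part of the patch):

    import java.lang.ref.WeakReference;
    import java.util.Collections;
    import java.util.Map;
    import java.util.WeakHashMap;

    public class WeakCache<K, V> {
      // Neither the key nor the cached value pins the other in memory.
      private final Map<K, WeakReference<V>> map =
          Collections.synchronizedMap(new WeakHashMap<K, WeakReference<V>>());

      public V get(K key) {
        WeakReference<V> ref = map.get(key);
        return ref == null ? null : ref.get();
      }

      public void put(K key, V value) {
        map.put(key, new WeakReference<V>(value));
      }
    }
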
+
+    private TSaslServerTransport newSaslTransport(final TTransport base) {
+      TSaslServerTransport transport = new TSaslServerTransport(base) {
+        private final byte[] messageHeader = new byte[STATUS_BYTES + PAYLOAD_LENGTH_BYTES];
+
+        @Override
+        protected SaslResponse receiveSaslMessage() throws TTransportException {
+          underlyingTransport.readAll(messageHeader, 0, messageHeader.length);
+          byte statusByte = messageHeader[0];
+          int length = EncodingUtils.decodeBigEndian(messageHeader, STATUS_BYTES);
+          if (length > saslMessageLimit) {
+            base.close();
+            throw new TTransportException("Sasl message is too big (" + length + " bytes)");
+          }
+          byte[] payload = new byte[length];
+          underlyingTransport.readAll(payload, 0, payload.length);
+          NegotiationStatus status = NegotiationStatus.byValue(statusByte);
+          if (status == null) {
+            sendAndThrowMessage(NegotiationStatus.ERROR, "Invalid status " + statusByte);
+          } else if (status == NegotiationStatus.BAD || status == NegotiationStatus.ERROR) {
+            try {
+              String remoteMessage = new String(payload, "UTF-8");
+              throw new TTransportException("Peer indicated failure: " + remoteMessage);
+            } catch (UnsupportedEncodingException e) {
+              throw new TTransportException(e);
+            }
+          }
+          if (LOGGER.isDebugEnabled())
+            LOGGER.debug(getRole() + ": Received message with status {} and payload length {}",
+                status, payload.length);
+          return new SaslResponse(status, payload);
+        }
+      };
+      for (Map.Entry<String, TSaslServerDefinition> entry : serverDefinitionMap.entrySet()) {
+        TSaslServerDefinition definition = entry.getValue();
+        transport.addServerDefinition(entry.getKey(), definition.protocol, definition.serverName,
+            definition.props, definition.cbh);
+      }
+      return transport;
+    }
+  }
 }
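
The overridden receiveSaslMessage() is the point of the patch: a SASL negotiation
frame is one status byte followed by a four-byte big-endian payload length, and the
declared length is now validated before the payload buffer is allocated. A
self-contained sketch of the same check (constants copied from TSaslTransport; the
sample bytes are illustrative):

    import org.apache.thrift.EncodingUtils;

    public class SaslFrameCheck {
      static final int STATUS_BYTES = 1;
      static final int PAYLOAD_LENGTH_BYTES = 4;

      // Decode the declared payload length and reject it before allocating,
      // mirroring the check in receiveSaslMessage() above.
      static int checkedLength(byte[] header, int limit) {
        int length = EncodingUtils.decodeBigEndian(header, STATUS_BYTES);
        if (length > limit) {
          throw new IllegalArgumentException("Sasl message is too big (" + length + " bytes)");
        }
        return length;
      }

      public static void main(String[] args) {
        // Garbage such as the ASCII bytes "GET " read as a length decodes to
        // 1195725856 (~1.1GB), the kind of allocation the limit now refuses.
        byte[] header = {0, 'G', 'E', 'T', ' '};
        try {
          checkedLength(header, 104857600);
        } catch (IllegalArgumentException e) {
          System.out.println(e.getMessage());
        }
      }
    }
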