commit bd62f382675a5c698271ccdb45dd17ec9e21b635 Author: Todd Lipcon Date: Mon Sep 20 17:45:22 2010 -0700 Metastore with SASL+Kerberos basically functional Bit of cleanup, add confs Move SASL stuff into shim layer Some refactoring to add inner Client and Server classes Metastore now impersonates clients and creates directories as them diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index bed5b5d..e980ffd 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -150,6 +150,9 @@ public class HiveConf extends Configuration { "_INTERMEDIATE_ARCHIVED"), METASTORE_INT_EXTRACTED("hive.metastore.archive.intermediate.extracted", "_INTERMEDIATE_EXTRACTED"), + METASTORE_KERBEROS_KEYTAB_FILE("hive.metastore.kerberos.keytab.file", ""), + METASTORE_KERBEROS_PRINCIPAL("hive.metastore.kerberos.principal", ""), + METASTORE_USE_THRIFT_SASL("hive.metastore.sasl.enabled", false), // Default parameters for creating tables NEWTABLEDEFAULTPARA("hive.table.parameters.default",""), diff --git conf/hive-default.xml conf/hive-default.xml index f31250e..aaa38b8 100644 --- conf/hive-default.xml +++ conf/hive-default.xml @@ -570,6 +570,24 @@ +<property> + <name>hive.metastore.sasl.enabled</name> + <value>false</value> + <description>If true, the metastore thrift interface will be secured with SASL. Clients must authenticate with Kerberos.</description> +</property> + +<property> + <name>hive.metastore.kerberos.keytab.file</name> + <value></value> + <description>The path to the Kerberos Keytab file containing the metastore thrift server's service principal.</description> +</property> + +<property> + <name>hive.metastore.kerberos.principal</name> + <value>hive-metastore/_HOST@EXAMPLE.COM</value> + <description>The service principal for the metastore thrift server. The special string _HOST will be replaced automatically with the correct host name.</description> +</property> + <property> <name>hive.optimize.reducededuplication</name> <value>true</value> <description>Remove extra map-reduce jobs if the data is already clustered by the same key which needs to be used again. This should always be set to true. Since it is a new feature, it has been made configurable.</description> </property> diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 9c409a9..0259b3c 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -57,9 +57,12 @@ import org.apache.hadoop.hive.metastore.hooks.JDOConnectionURLHook; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.thrift.TException; +import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TThreadPoolServer; @@ -75,11 +78,12 @@ import com.facebook.fb303.fb_status; * TODO:pc remove application logic to a separate interface.
*/ public class HiveMetaStore extends ThriftHiveMetastore { + public static final Log LOG = LogFactory.getLog( + HiveMetaStore.class); public static class HMSHandler extends FacebookBase implements ThriftHiveMetastore.Iface { - public static final Log LOG = LogFactory.getLog(HiveMetaStore.class - .getName()); + public static final Log LOG = HiveMetaStore.LOG; private static boolean createDefaultDB = false; private String rawStoreClassName; private final HiveConf hiveConf; // stores datastore (jpox) properties, @@ -2122,17 +2126,31 @@ public class HiveMetaStore extends ThriftHiveMetastore { int minWorkerThreads = conf.getIntVar(HiveConf.ConfVars.METASTORESERVERMINTHREADS); int maxWorkerThreads = conf.getIntVar(HiveConf.ConfVars.METASTORESERVERMAXTHREADS); boolean tcpKeepAlive = conf.getBoolVar(HiveConf.ConfVars.METASTORE_TCP_KEEP_ALIVE); + boolean useSasl = conf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL); TServerTransport serverTransport = tcpKeepAlive ? new TServerSocketKeepAlive(port) : new TServerSocket(port); - FacebookService.Processor processor = new ThriftHiveMetastore.Processor( + TProcessor processor = new ThriftHiveMetastore.Processor( handler); + + TTransportFactory transFactory; + if (useSasl) { + HadoopThriftAuthBridge.Server authBridge = ShimLoader.getHadoopThriftAuthBridge().createServer( + conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_KEYTAB_FILE), + conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL)); + transFactory = authBridge.createTransportFactory(); + processor = authBridge.wrapProcessor(processor); + } else { + transFactory = new TTransportFactory(); + } + + TThreadPoolServer.Options options = new TThreadPoolServer.Options(); options.minWorkerThreads = minWorkerThreads; options.maxWorkerThreads = maxWorkerThreads; TServer server = new TThreadPoolServer(processor, serverTransport, - new TTransportFactory(), new TTransportFactory(), + transFactory, transFactory, new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), options); HMSHandler.LOG.info("Started the new metaserver on port [" + port + "]..."); diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java index c13f45c..5e66251 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.metastore; import static org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_DATABASE_NAME; +import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; @@ -45,6 +46,8 @@ import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore; import org.apache.hadoop.hive.metastore.api.Type; import org.apache.hadoop.hive.metastore.api.UnknownDBException; import org.apache.hadoop.hive.metastore.api.UnknownTableException; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; @@ -63,6 +66,8 @@ public class HiveMetaStoreClient implements IMetaStoreClient { private final boolean standAloneClient = false; private final HiveMetaHookLoader hookLoader; + private HiveConf conf; + // for thrift connects private int retries = 5; @@ -80,6 +85,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient { if (conf == 
null) { conf = new HiveConf(HiveMetaStoreClient.class); } + this.conf = conf; boolean localMetaStore = conf.getBoolean("hive.metastore.local", false); @@ -167,18 +173,41 @@ public class HiveMetaStoreClient implements IMetaStoreClient { } private void openStore(URI store) throws MetaException { - open = false; - transport = new TSocket(store.getHost(), store.getPort()); - ((TSocket) transport).setTimeout(20000); - TProtocol protocol = new TBinaryProtocol(transport); - client = new ThriftHiveMetastore.Client(protocol); - for (int i = 0; i < retries && !open; ++i) { + + open = false; + transport = new TSocket(store.getHost(), store.getPort()); + ((TSocket) transport).setTimeout(20000); + + // Wrap thrift connection with SASL if enabled. + boolean useSasl = conf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL); + if (useSasl) { + try { + HadoopThriftAuthBridge.Client authBridge = + ShimLoader.getHadoopThriftAuthBridge().createClient(); + String principalConfig = conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL); + transport = authBridge.createClientTransport( + principalConfig, store.getHost(), "KERBEROS", + transport); + } catch (IOException ioe) { + LOG.error("Couldn't create client transport", ioe); + throw new MetaException(ioe.toString()); + } + } + + TProtocol protocol = new TBinaryProtocol(transport); + client = new ThriftHiveMetastore.Client(protocol); + try { transport.open(); open = true; } catch (TTransportException e) { - LOG.warn("failed to connect to MetaStore, re-trying..."); + if (LOG.isDebugEnabled()) { + LOG.warn("failed to connect to MetaStore, re-trying...", e); + } else { + // Don't print full exception trace if DEBUG is not on. + LOG.warn("failed to connect to MetaStore, re-trying..."); + } try { Thread.sleep(1000); } catch (InterruptedException ignore) { diff --git shims/build.xml shims/build.xml index 161578a..eedca2d 100644 --- shims/build.xml +++ shims/build.xml @@ -32,6 +32,7 @@ to call at top-level: ant deploy-contrib compile-core-test + diff --git shims/src/0.20S/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java shims/src/0.20S/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java new file mode 100644 index 0000000..d549014 --- /dev/null +++ shims/src/0.20S/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java @@ -0,0 +1,372 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.thrift; + +import java.security.PrivilegedAction; +import java.security.PrivilegedExceptionAction; +import javax.security.sasl.SaslException; + +import java.io.IOException; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.SaslRpcServer.AuthMethod; +import org.apache.hadoop.security.*; +import org.apache.thrift.TException; +import org.apache.thrift.transport.TSaslClientTransport; +import org.apache.thrift.transport.TSaslServerTransport; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.TProcessor; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TTransportException; +import org.apache.thrift.transport.TTransportFactory; + + +/** + * Functions that bridge Thrift's SASL transports to Hadoop's + * SASL callback handlers and authentication classes. + */ +public class HadoopThriftAuthBridge20S extends HadoopThriftAuthBridge { + static final Log LOG = LogFactory.getLog(HadoopThriftAuthBridge.class); + + @Override + public Client createClient() { + return new Client(); + } + + @Override + public Server createServer(String keytabFile, String principalConf) throws TTransportException { + return new Server(keytabFile, principalConf); + } + + public static class Client extends HadoopThriftAuthBridge.Client { + /** + * Create a client-side SASL transport that wraps an underlying transport. + * + * @param principalConfig The Kerberos principal of the target server, possibly + * containing the _HOST placeholder. + * @param host The server host name, substituted for _HOST in the principal. + * @param methodStr The authentication method to use. Currently only KERBEROS is + * supported. + * @param underlyingTransport The underlying transport mechanism, usually a TSocket. + */ + @Override + public TTransport createClientTransport( + String principalConfig, String host, + String methodStr, TTransport underlyingTransport) + throws IOException { + AuthMethod method = AuthMethod.valueOf(AuthMethod.class, methodStr); + + switch (method) { + case KERBEROS: + String serverPrincipal = SecurityUtil.getServerPrincipal(principalConfig, host); + String names[] = SaslRpcServer.splitKerberosName(serverPrincipal); + if (names.length != 3) { + throw new IOException( + "Kerberos principal name does NOT have the expected hostname part: " + + serverPrincipal); + } + try { + TTransport saslTransport = new TSaslClientTransport( + method.getMechanismName(), + null, + names[0], names[1], + SaslRpcServer.SASL_PROPS, null, + underlyingTransport); + return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser()); + } catch (SaslException se) { + throw new IOException("Could not instantiate SASL transport", se); + } + + default: + throw new IOException("Unsupported authentication method: " + method); + } + } + } + + public static class Server extends HadoopThriftAuthBridge.Server { + private final UserGroupInformation realUgi; + + /** + * Log in from the given keytab as the given server principal, and keep + * the resulting UGI to assume when accepting connections and impersonating clients. + */ + private Server(String keytabFile, String principalConf) + throws TTransportException { + if (keytabFile == null || keytabFile.isEmpty()) { + throw new TTransportException("No keytab specified"); + } + if (principalConf == null || principalConf.isEmpty()) { + throw new TTransportException("No principal specified"); + } + + // Login from the keytab + String kerberosName; + try { + kerberosName = SecurityUtil.getServerPrincipal( + principalConf, null); + realUgi = UserGroupInformation.loginUserFromKeytabAndReturnUGI( + kerberosName, keytabFile); + assert
realUgi.isFromKeytab(); + } catch (IOException ioe) { + throw new TTransportException(ioe); + } + } + + /** + * Create a TTransportFactory that, upon connection of a client socket, + * negotiates a Kerberized SASL transport. The resulting TTransportFactory + * can be passed as both the input and output transport factory when + * instantiating a TThreadPoolServer, for example. + */ + @Override + public TTransportFactory createTransportFactory() throws TTransportException + { + // Parse out the kerberos principal, host, realm. + String kerberosName = realUgi.getUserName(); + final String names[] = SaslRpcServer.splitKerberosName(kerberosName); + if (names.length != 3) { + throw new TTransportException("Kerberos principal should have 3 parts: " + kerberosName); + } + + TSaslServerTransport.Factory transFactory = new TSaslServerTransport.Factory(); + transFactory.addServerDefinition( + AuthMethod.KERBEROS.getMechanismName(), + names[0], names[1], // two parts of kerberos principal + SaslRpcServer.SASL_PROPS, + new SaslRpcServer.SaslGssCallbackHandler()); + + return new TUGIAssumingTransportFactory(transFactory, realUgi); + } + + /** + * Wrap a TProcessor in such a way that, before processing any RPC, it + * assumes the UserGroupInformation of the user authenticated by + * the SASL transport. + */ + @Override + public TProcessor wrapProcessor(TProcessor processor) { + return new TUGIAssumingProcessor(processor); + } + + /** + * Processor that pulls the SaslServer object out of the transport, and + * assumes the remote user's UGI before calling through to the original + * processor. + * + * This is used on the server side to set the UGI for each specific call. + */ + private class TUGIAssumingProcessor implements TProcessor { + final TProcessor wrapped; + + TUGIAssumingProcessor(TProcessor wrapped) { + this.wrapped = wrapped; + } + + public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException { + TTransport trans = inProt.getTransport(); + if (!(trans instanceof TSaslServerTransport)) { + throw new TException("Unexpected non-SASL transport " + trans.getClass()); + } + TSaslServerTransport saslTrans = (TSaslServerTransport)trans; + String authId = saslTrans.getSaslServer().getAuthorizationID(); + LOG.debug("SASL authorization ID: " + authId); + + UserGroupInformation clientUgi = UserGroupInformation.createProxyUser( + authId, realUgi); + + try { + return clientUgi.doAs(new PrivilegedExceptionAction<Boolean>() { + public Boolean run() { + try { + return wrapped.process(inProt, outProt); + } catch (TException te) { + throw new RuntimeException(te); + } + } + }); + } catch (RuntimeException rte) { + if (rte.getCause() instanceof TException) { + throw (TException)rte.getCause(); + } + throw rte; + } catch (InterruptedException ie) { + throw new RuntimeException(ie); // unexpected! + } catch (IOException ioe) { + throw new RuntimeException(ioe); // unexpected! + } + } + } + + } + + /** + * A TransportFactory that wraps another one, but assumes a specified UGI + * before calling through. + * + * This is used on the server side to assume the server's Principal when accepting + * clients.
+ */ + private static class TUGIAssumingTransportFactory extends TTransportFactory { + private final UserGroupInformation ugi; + private final TTransportFactory wrapped; + + public TUGIAssumingTransportFactory(TTransportFactory wrapped, UserGroupInformation ugi) { + assert wrapped != null; + assert ugi != null; + + this.wrapped = wrapped; + this.ugi = ugi; + } + + @Override + public TTransport getTransport(final TTransport trans) { + return ugi.doAs(new PrivilegedAction<TTransport>() { + public TTransport run() { + return wrapped.getTransport(trans); + } + }); + } + } + + /** + * The Thrift SASL transports call Sasl.createSaslServer and Sasl.createSaslClient + * inside open(). So, we need to assume the correct UGI when the transport is opened + * so that the SASL mechanisms have access to the right principal. This transport + * wraps the Sasl transports to set up the right UGI context for open(). + * + * This is used on the client side, where the API explicitly opens a transport to + * the server. + */ + private static class TUGIAssumingTransport extends TFilterTransport { + private final UserGroupInformation ugi; + + public TUGIAssumingTransport(TTransport wrapped, UserGroupInformation ugi) { + super(wrapped); + this.ugi = ugi; + } + + @Override + public void open() throws TTransportException { + try { + ugi.doAs(new PrivilegedExceptionAction<Void>() { + public Void run() { + try { + wrapped.open(); + } catch (TTransportException tte) { + // Wrap the transport exception in an RTE, since UGI.doAs() then goes + // and unwraps this for us out of the doAs block. We then unwrap one + // more time in our catch clause to get back the TTE. (ugh) + throw new RuntimeException(tte); + } + return null; + } + }); + } catch (IOException ioe) { + assert false : "Never thrown!"; + throw new RuntimeException("Received an ioe we never threw!", ioe); + } catch (InterruptedException ie) { + assert false : "We never expect to see an InterruptedException thrown in this block"; + throw new RuntimeException("Received an ie we never threw!", ie); + } catch (RuntimeException rte) { + if (rte.getCause() instanceof TTransportException) { + throw (TTransportException)rte.getCause(); + } else { + throw rte; + } + } + } + } + + /** + * Transport that simply wraps another transport. + * This is the equivalent of FilterInputStream for Thrift transports.
+ */ + private static class TFilterTransport extends TTransport { + protected final TTransport wrapped; + + public TFilterTransport(TTransport wrapped) { + this.wrapped = wrapped; + } + + @Override + public void open() throws TTransportException { + wrapped.open(); + } + + @Override + public boolean isOpen() { + return wrapped.isOpen(); + } + + @Override + public boolean peek() { + return wrapped.peek(); + } + + @Override + public void close() { + wrapped.close(); + } + + @Override + public int read(byte[] buf, int off, int len) throws TTransportException { + return wrapped.read(buf, off, len); + } + + @Override + public int readAll(byte[] buf, int off, int len) throws TTransportException { + return wrapped.readAll(buf, off, len); + } + + @Override + public void write(byte[] buf) throws TTransportException { + wrapped.write(buf); + } + + @Override + public void write(byte[] buf, int off, int len) throws TTransportException { + wrapped.write(buf, off, len); + } + + @Override + public void flush() throws TTransportException { + wrapped.flush(); + } + + @Override + public byte[] getBuffer() { + return wrapped.getBuffer(); + } + + @Override + public int getBufferPosition() { + return wrapped.getBufferPosition(); + } + + @Override + public int getBytesRemainingInBuffer() { + return wrapped.getBytesRemainingInBuffer(); + } + + @Override + public void consumeBuffer(int len) { + wrapped.consumeBuffer(len); + } + } +} diff --git shims/src/0.20S/java/org/apache/thrift/EncodingUtils.java shims/src/0.20S/java/org/apache/thrift/EncodingUtils.java new file mode 100644 index 0000000..072de93 --- /dev/null +++ shims/src/0.20S/java/org/apache/thrift/EncodingUtils.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift; + +/** + * Utility methods for use when encoding/decoding raw data as byte arrays. + */ +public class EncodingUtils { + + /** + * Encode integer as a series of 4 bytes into buf + * starting at position 0 within that buffer. + * + * @param integer + * The integer to encode. + * @param buf + * The buffer to write to. + */ + public static final void encodeBigEndian(final int integer, final byte[] buf) { + encodeBigEndian(integer, buf, 0); + } + + /** + * Encode integer as a series of 4 bytes into buf + * starting at position offset. + * + * @param integer + * The integer to encode. + * @param buf + * The buffer to write to. + * @param offset + * The offset within buf to start the encoding. 
+ */ + public static final void encodeBigEndian(final int integer, final byte[] buf, int offset) { + buf[offset] = (byte) (0xff & (integer >> 24)); + buf[offset + 1] = (byte) (0xff & (integer >> 16)); + buf[offset + 2] = (byte) (0xff & (integer >> 8)); + buf[offset + 3] = (byte) (0xff & (integer)); + } + + /** + * Decode a series of 4 bytes from buf, starting at position 0, + * and interpret them as an integer. + * + * @param buf + * The buffer to read from. + * @return An integer, as read from the buffer. + */ + public static final int decodeBigEndian(final byte[] buf) { + return decodeBigEndian(buf, 0); + } + + /** + * Decode a series of 4 bytes from buf, starting at + * offset, and interpret them as an integer. + * + * @param buf + * The buffer to read from. + * @param offset + * The offset within buf to start the decoding. + * @return An integer, as read from the buffer. + */ + public static final int decodeBigEndian(final byte[] buf, int offset) { + return ((buf[offset] & 0xff) << 24) | ((buf[offset + 1] & 0xff) << 16) + | ((buf[offset + 2] & 0xff) << 8) | ((buf[offset + 3] & 0xff)); + } + +} diff --git shims/src/0.20S/java/org/apache/thrift/package-info.java shims/src/0.20S/java/org/apache/thrift/package-info.java new file mode 100644 index 0000000..5051dd0 --- /dev/null +++ shims/src/0.20S/java/org/apache/thrift/package-info.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Hive is currently built against Thrift 0.4.0, which does not include these + * SASL transports. Once we upgrade to Thrift 0.5.0 or later, this subtree + * should be removed. + */ +package org.apache.thrift; diff --git shims/src/0.20S/java/org/apache/thrift/transport/TSaslClientTransport.java shims/src/0.20S/java/org/apache/thrift/transport/TSaslClientTransport.java new file mode 100644 index 0000000..8c1d0e5 --- /dev/null +++ shims/src/0.20S/java/org/apache/thrift/transport/TSaslClientTransport.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.thrift.transport; + +import java.util.Map; + +import javax.security.auth.callback.CallbackHandler; +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; + +import org.apache.thrift.EncodingUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Wraps another Thrift TTransport, but performs SASL client + * negotiation on the call to open(). This class will wrap ensuing + * communication over it, if a SASL QOP is negotiated with the other party. + */ +public class TSaslClientTransport extends TSaslTransport { + + private static final Logger LOGGER = LoggerFactory.getLogger(TSaslClientTransport.class); + + /** + * The name of the mechanism this client supports. + */ + private final String mechanism; + + /** + * Uses the given SaslClient. + * + * @param saslClient + * The SaslClient to use for the subsequent SASL + * negotiation. + * @param transport + * Transport underlying this one. + */ + public TSaslClientTransport(SaslClient saslClient, TTransport transport) { + super(saslClient, transport); + mechanism = saslClient.getMechanismName(); + } + + /** + * Creates a SaslClient using the given SASL-specific parameters. + * See the Java documentation for Sasl.createSaslClient for the + * details of the parameters. + * + * @param transport + * The underlying Thrift transport. + * @throws SaslException + */ + public TSaslClientTransport(String mechanism, String authorizationId, String protocol, + String serverName, Map<String, String> props, CallbackHandler cbh, TTransport transport) + throws SaslException { + super(Sasl.createSaslClient(new String[] { mechanism }, authorizationId, protocol, serverName, + props, cbh), transport); + this.mechanism = mechanism; + } + + + @Override + protected SaslRole getRole() { + return SaslRole.CLIENT; + } + + /** + * Performs the client side of the initial portion of the Thrift SASL + * protocol. Generates and sends the initial response to the server, including + * which mechanism this client wants to use. + */ + @Override + protected void handleSaslStartMessage() throws TTransportException, SaslException { + SaslClient saslClient = getSaslClient(); + + byte[] initialResponse = new byte[0]; + if (saslClient.hasInitialResponse()) + initialResponse = saslClient.evaluateChallenge(initialResponse); + + LOGGER.debug("Sending mechanism name {} and initial response of length {}", mechanism, + initialResponse.length); + + byte[] mechanismBytes = mechanism.getBytes(); + sendSaslMessage(NegotiationStatus.START, + mechanismBytes); + // Send initial response + sendSaslMessage(saslClient.isComplete() ? NegotiationStatus.COMPLETE : NegotiationStatus.OK, + initialResponse); + underlyingTransport.flush(); + } +} diff --git shims/src/0.20S/java/org/apache/thrift/transport/TSaslServerTransport.java shims/src/0.20S/java/org/apache/thrift/transport/TSaslServerTransport.java new file mode 100644 index 0000000..8abcf36 --- /dev/null +++ shims/src/0.20S/java/org/apache/thrift/transport/TSaslServerTransport.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.transport; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.WeakHashMap; + +import javax.security.auth.callback.CallbackHandler; +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Wraps another Thrift TTransport, but performs SASL server + * negotiation on the call to open(). This class will wrap ensuing + * communication over it, if a SASL QOP is negotiated with the other party. + */ +public class TSaslServerTransport extends TSaslTransport { + + private static final Logger LOGGER = LoggerFactory.getLogger(TSaslServerTransport.class); + + /** + * Mapping from SASL mechanism name -> all the parameters required to + * instantiate a SASL server. + */ + private Map<String, TSaslServerDefinition> serverDefinitionMap = new HashMap<String, TSaslServerDefinition>(); + + /** + * Contains all the parameters used to define a SASL server implementation. + */ + private static class TSaslServerDefinition { + public String mechanism; + public String protocol; + public String serverName; + public Map<String, String> props; + public CallbackHandler cbh; + + public TSaslServerDefinition(String mechanism, String protocol, String serverName, + Map<String, String> props, CallbackHandler cbh) { + this.mechanism = mechanism; + this.protocol = protocol; + this.serverName = serverName; + this.props = props; + this.cbh = cbh; + } + } + + /** + * Uses the given underlying transport. Assumes that addServerDefinition is + * called later. + * + * @param transport + * Transport underlying this one. + */ + public TSaslServerTransport(TTransport transport) { + super(transport); + } + + /** + * Creates a SaslServer using the given SASL-specific parameters. + * See the Java documentation for Sasl.createSaslServer for the + * details of the parameters. + * + * @param transport + * The underlying Thrift transport. + */ + public TSaslServerTransport(String mechanism, String protocol, String serverName, + Map<String, String> props, CallbackHandler cbh, TTransport transport) { + super(transport); + addServerDefinition(mechanism, protocol, serverName, props, cbh); + } + + private TSaslServerTransport(Map<String, TSaslServerDefinition> serverDefinitionMap, TTransport transport) { + super(transport); + this.serverDefinitionMap.putAll(serverDefinitionMap); + } + + /** + * Add a supported server definition to this transport. See the Java + * documentation for Sasl.createSaslServer for the details of the + * parameters. + */ + public void addServerDefinition(String mechanism, String protocol, String serverName, + Map<String, String> props, CallbackHandler cbh) { + serverDefinitionMap.put(mechanism, new TSaslServerDefinition(mechanism, protocol, serverName, + props, cbh)); + } + + @Override + protected SaslRole getRole() { + return SaslRole.SERVER; + } + + /** + * Performs the server side of the initial portion of the Thrift SASL protocol. + * Receives the initial response from the client, creates a SASL server using + * the mechanism requested by the client (if this server supports it), and + * sends the first challenge back to the client.
+ */ + @Override + protected void handleSaslStartMessage() throws TTransportException, SaslException { + SaslResponse message = receiveSaslMessage(); + + LOGGER.debug("Received start message with status {}", message.status); + if (message.status != NegotiationStatus.START) { + sendAndThrowMessage(NegotiationStatus.ERROR, "Expecting START status, received " + message.status); + } + + // Get the mechanism name. + String mechanismName = new String(message.payload); + TSaslServerDefinition serverDefinition = serverDefinitionMap.get(mechanismName); + LOGGER.debug("Received mechanism name '{}'", mechanismName); + + if (serverDefinition == null) { + sendAndThrowMessage(NegotiationStatus.BAD, "Unsupported mechanism type " + mechanismName); + } + SaslServer saslServer = Sasl.createSaslServer(serverDefinition.mechanism, + serverDefinition.protocol, serverDefinition.serverName, serverDefinition.props, + serverDefinition.cbh); + setSaslServer(saslServer); + } + + /** + * TTransportFactory to create + * TSaslServerTransports. Ensures that a given + * underlying TTransport instance receives the same + * TSaslServerTransport. This is kind of an awful hack to work + * around the fact that Thrift is designed assuming that + * TTransport instances are stateless, and thus the existing + * TServers use different TTransport instances for + * input and output. + */ + public static class Factory extends TTransportFactory { + + /** + * This is the implementation of the awful hack described above. + * WeakHashMap is used to ensure that we don't leak memory. + */ + private static Map<TTransport, TSaslServerTransport> transportMap = + Collections.synchronizedMap(new WeakHashMap<TTransport, TSaslServerTransport>()); + + /** + * Mapping from SASL mechanism name -> all the parameters required to + * instantiate a SASL server. + */ + private Map<String, TSaslServerDefinition> serverDefinitionMap = new HashMap<String, TSaslServerDefinition>(); + + /** + * Create a new Factory. Assumes that addServerDefinition will + * be called later. + */ + public Factory() { + super(); + } + + /** + * Create a new Factory, initially with the single server + * definition given. You may still call addServerDefinition + * later. See the Java documentation for Sasl.createSaslServer + * for the details of the parameters. + */ + public Factory(String mechanism, String protocol, String serverName, + Map<String, String> props, CallbackHandler cbh) { + super(); + addServerDefinition(mechanism, protocol, serverName, props, cbh); + } + + /** + * Add a supported server definition to the transports created by this + * factory. See the Java documentation for + * Sasl.createSaslServer for the details of the parameters. + */ + public void addServerDefinition(String mechanism, String protocol, String serverName, + Map<String, String> props, CallbackHandler cbh) { + serverDefinitionMap.put(mechanism, new TSaslServerDefinition(mechanism, protocol, serverName, + props, cbh)); + } + + /** + * Get a new TSaslServerTransport instance, or reuse the + * existing one if a TSaslServerTransport has already been + * created before using the given TTransport as an underlying + * transport. This ensures that a given underlying transport instance + * receives the same TSaslServerTransport.
+ */ + @Override + public TTransport getTransport(TTransport base) { + TSaslServerTransport ret = transportMap.get(base); + if (ret == null) { + LOGGER.debug("transport map does not contain key {}", base); + ret = new TSaslServerTransport(serverDefinitionMap, base); + try { + ret.open(); + } catch (TTransportException e) { + LOGGER.debug("failed to open server transport", e); + throw new RuntimeException(e); + } + transportMap.put(base, ret); + } else { + LOGGER.debug("transport map does contain key {}", base); + } + return ret; + } + } +} diff --git shims/src/0.20S/java/org/apache/thrift/transport/TSaslTransport.java shims/src/0.20S/java/org/apache/thrift/transport/TSaslTransport.java new file mode 100644 index 0000000..2ff8403 --- /dev/null +++ shims/src/0.20S/java/org/apache/thrift/transport/TSaslTransport.java @@ -0,0 +1,540 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.transport; + +import java.io.UnsupportedEncodingException; +import java.util.Arrays; +import java.util.Map; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; + +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; + +import org.apache.thrift.EncodingUtils; +import org.apache.thrift.TByteArrayOutputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A superclass for SASL client/server thrift transports. A subclass need only + * implement the open method. + */ +abstract class TSaslTransport extends TTransport { + + private static final Logger LOGGER = LoggerFactory.getLogger(TSaslTransport.class); + + protected static final int DEFAULT_MAX_LENGTH = 0x7FFFFFFF; + + protected static final int MECHANISM_NAME_BYTES = 1; + protected static final int STATUS_BYTES = 1; + protected static final int PAYLOAD_LENGTH_BYTES = 4; + + protected static enum SaslRole { + SERVER, CLIENT; + } + + /** + * Status bytes used during the initial Thrift SASL handshake. + */ + protected static enum NegotiationStatus { + START((byte)0x01), + OK((byte)0x02), + BAD((byte)0x03), + ERROR((byte)0x04), + COMPLETE((byte)0x05); + + private final byte value; + + private static final Map<Byte, NegotiationStatus> reverseMap = + new HashMap<Byte, NegotiationStatus>(); + static { + for (NegotiationStatus s : NegotiationStatus.class.getEnumConstants()) { + reverseMap.put(s.getValue(), s); + } + } + + private NegotiationStatus(byte val) { + this.value = val; + } + + public byte getValue() { + return value; + } + + public static NegotiationStatus byValue(byte val) { + return reverseMap.get(val); + } + } + + /** + * Transport underlying this one. + */ + protected TTransport underlyingTransport; + + /** + * Either a SASL client or a SASL server.
+ */ + private SaslParticipant sasl; + + /** + * Whether or not we should wrap/unwrap reads/writes. Determined by whether or + * not a QOP is negotiated during the SASL handshake. + */ + private boolean shouldWrap = false; + + /** + * Buffer for input. + */ + private TMemoryInputTransport readBuffer = new TMemoryInputTransport(); + + /** + * Buffer for output. + */ + private final TByteArrayOutputStream writeBuffer = new TByteArrayOutputStream(1024); + + /** + * Create a TSaslTransport. It's assumed that setSaslServer will be called + * later to initialize the SASL endpoint underlying this transport. + * + * @param underlyingTransport + * The thrift transport which this transport is wrapping. + */ + protected TSaslTransport(TTransport underlyingTransport) { + this.underlyingTransport = underlyingTransport; + } + + /** + * Create a TSaslTransport which acts as a client. + * + * @param saslClient + * The SaslClient which this transport will use for SASL + * negotiation. + * @param underlyingTransport + * The thrift transport which this transport is wrapping. + */ + protected TSaslTransport(SaslClient saslClient, TTransport underlyingTransport) { + sasl = new SaslParticipant(saslClient); + this.underlyingTransport = underlyingTransport; + } + + protected void setSaslServer(SaslServer saslServer) { + sasl = new SaslParticipant(saslServer); + } + + // Used to read the status byte and payload length. + private final byte[] messageHeader = new byte[STATUS_BYTES + PAYLOAD_LENGTH_BYTES]; + + /** + * Send a complete Thrift SASL message. + * + * @param status + * The status to send. + * @param payload + * The data to send as the payload of this message. + * @throws TTransportException + */ + protected void sendSaslMessage(NegotiationStatus status, byte[] payload) throws TTransportException { + if (payload == null) + payload = new byte[0]; + + messageHeader[0] = status.getValue(); + EncodingUtils.encodeBigEndian(payload.length, messageHeader, STATUS_BYTES); + + LOGGER.debug(getRole() + ": Writing message with status {} and payload length {}", + status, payload.length); + underlyingTransport.write(messageHeader); + underlyingTransport.write(payload); + underlyingTransport.flush(); + } + + /** + * Read a complete Thrift SASL message. + * + * @return The SASL status and payload from this message. + * @throws TTransportException + * Thrown if there is a failure reading from the underlying + * transport, or if a status code of BAD or ERROR is encountered. 
+ */ + protected SaslResponse receiveSaslMessage() throws TTransportException { + underlyingTransport.readAll(messageHeader, 0, messageHeader.length); + + byte statusByte = messageHeader[0]; + byte[] payload = new byte[EncodingUtils.decodeBigEndian(messageHeader, STATUS_BYTES)]; + underlyingTransport.readAll(payload, 0, payload.length); + + NegotiationStatus status = NegotiationStatus.byValue(statusByte); + if (status == null) { + sendAndThrowMessage(NegotiationStatus.ERROR, "Invalid status " + statusByte); + } else if (status == NegotiationStatus.BAD || status == NegotiationStatus.ERROR) { + try { + String remoteMessage = new String(payload, "UTF-8"); + throw new TTransportException("Peer indicated failure: " + remoteMessage); + } catch (UnsupportedEncodingException e) { + throw new TTransportException(e); + } + } + + LOGGER.debug(getRole() + ": Received message with status {} and payload length {}", + status, payload.length); + return new SaslResponse(status, payload); + } + + /** + * Send a Thrift SASL message with the given status (usually BAD or ERROR) and + * string message, and then throw a TTransportException with the given + * message. + * + * @param status + * The Thrift SASL status code to send. Usually BAD or ERROR. + * @param message + * The optional message to send to the other side. + * @throws TTransportException + * Always thrown with the message provided. + */ + protected void sendAndThrowMessage(NegotiationStatus status, String message) throws TTransportException { + try { + sendSaslMessage(status, message.getBytes()); + } catch (Exception e) { + LOGGER.warn("Could not send failure response", e); + message += "\nAlso, could not send response: " + e.toString(); + } + throw new TTransportException(message); + } + + /** + * Implemented by subclasses to start the Thrift SASL handshake process. When + * this method completes, the SaslParticipant in this class is + * assumed to be initialized. + * + * @throws TTransportException + * @throws SaslException + */ + abstract protected void handleSaslStartMessage() throws TTransportException, SaslException; + + protected abstract SaslRole getRole(); + + /** + * Opens the underlying transport if it's not already open and then performs + * SASL negotiation. If a QOP is negotiated during this SASL handshake, it is + * used for all communication on this transport after this call is complete. + */ + @Override + public void open() throws TTransportException { + LOGGER.debug("opening transport {}", this); + if (sasl != null && sasl.isComplete()) + throw new TTransportException("SASL transport already open"); + + if (!underlyingTransport.isOpen()) + underlyingTransport.open(); + + try { + // Negotiate a SASL mechanism. The client also sends its + // initial response, or an empty one. + handleSaslStartMessage(); + LOGGER.debug("{}: Start message handled", getRole()); + + SaslResponse message = null; + while (!sasl.isComplete()) { + message = receiveSaslMessage(); + if (message.status != NegotiationStatus.COMPLETE && + message.status != NegotiationStatus.OK) { + throw new TTransportException("Expected COMPLETE or OK, got " + message.status); + } + + byte[] challenge = sasl.evaluateChallengeOrResponse(message.payload); + + // If we are the client, and the server indicates COMPLETE, we don't need to + // send back any further response. + if (message.status == NegotiationStatus.COMPLETE && + getRole() == SaslRole.CLIENT) { + LOGGER.debug("{}: All done!", getRole()); + break; + } + + sendSaslMessage(sasl.isComplete() ?
NegotiationStatus.COMPLETE : NegotiationStatus.OK, + challenge); + } + LOGGER.debug("{}: Main negotiation loop complete", getRole()); + + assert sasl.isComplete(); + + // If we're the client, and we're complete, but the server isn't + // complete yet, we need to wait for its response. This will occur + // with ANONYMOUS auth, for example, where we send an initial response + // and are immediately complete. + if (getRole() == SaslRole.CLIENT && + (message == null || message.status == NegotiationStatus.OK)) { + LOGGER.debug("{}: SASL Client receiving last message", getRole()); + message = receiveSaslMessage(); + if (message.status != NegotiationStatus.COMPLETE) { + throw new TTransportException( + "Expected SASL COMPLETE, but got " + message.status); + } + } + } catch (SaslException e) { + try { + sendAndThrowMessage(NegotiationStatus.BAD, e.getMessage()); + } finally { + underlyingTransport.close(); + } + } + + String qop = (String) sasl.getNegotiatedProperty(Sasl.QOP); + if (qop != null && !qop.equalsIgnoreCase("auth")) + shouldWrap = true; + } + + /** + * Get the underlying SaslClient. + * + * @return The SaslClient, or null if this transport + * is backed by a SaslServer. + */ + public SaslClient getSaslClient() { + return sasl.saslClient; + } + + /** + * Get the underlying SaslServer. + * + * @return The SaslServer, or null if this transport + * is backed by a SaslClient. + */ + public SaslServer getSaslServer() { + return sasl.saslServer; + } + + /** + * Read a 4-byte word from the underlying transport and interpret it as an + * integer. + * + * @return The length prefix of the next SASL message to read. + * @throws TTransportException + * Thrown if reading from the underlying transport fails. + */ + protected int readLength() throws TTransportException { + byte[] lenBuf = new byte[4]; + underlyingTransport.readAll(lenBuf, 0, lenBuf.length); + return EncodingUtils.decodeBigEndian(lenBuf); + } + + /** + * Write the given integer as 4 bytes to the underlying transport. + * + * @param length + * The length prefix of the next SASL message to write. + * @throws TTransportException + * Thrown if writing to the underlying transport fails. + */ + protected void writeLength(int length) throws TTransportException { + byte[] lenBuf = new byte[4]; + TFramedTransport.encodeFrameSize(length, lenBuf); + underlyingTransport.write(lenBuf); + } + + // Below is the SASL implementation of the TTransport interface. + + /** + * Closes the underlying transport and disposes of the SASL implementation + * underlying this transport. + */ + @Override + public void close() { + underlyingTransport.close(); + try { + sasl.dispose(); + } catch (SaslException e) { + // Not much we can do here. + } + } + + /** + * True if the underlying transport is open and the SASL handshake is + * complete. + */ + @Override + public boolean isOpen() { + return underlyingTransport.isOpen() && sasl != null && sasl.isComplete(); + } + + /** + * Read from the underlying transport. Unwraps the contents if a QOP was + * negotiated during the SASL handshake. 
+ */ + @Override + public int read(byte[] buf, int off, int len) throws TTransportException { + if (!isOpen()) + throw new TTransportException("SASL authentication not complete"); + + int got = readBuffer.read(buf, off, len); + if (got > 0) { + return got; + } + + // Read another frame of data + try { + readFrame(); + } catch (SaslException e) { + throw new TTransportException(e); + } + + return readBuffer.read(buf, off, len); + } + + /** + * Read a single frame of data from the underlying transport, unwrapping if + * necessary. + * + * @throws TTransportException + * Thrown if there's an error reading from the underlying transport. + * @throws SaslException + * Thrown if there's an error unwrapping the data. + */ + private void readFrame() throws TTransportException, SaslException { + int dataLength = readLength(); + + if (dataLength < 0) + throw new TTransportException("Read a negative frame size (" + dataLength + ")!"); + + byte[] buff = new byte[dataLength]; + LOGGER.debug("{}: reading data length: {}", getRole(), dataLength); + underlyingTransport.readAll(buff, 0, dataLength); + if (shouldWrap) { + buff = sasl.unwrap(buff, 0, buff.length); + LOGGER.debug("data length after unwrap: {}", buff.length); + } + readBuffer.reset(buff); + } + + /** + * Write to the underlying transport. + */ + @Override + public void write(byte[] buf, int off, int len) throws TTransportException { + if (!isOpen()) + throw new TTransportException("SASL authentication not complete"); + + writeBuffer.write(buf, off, len); + } + + /** + * Flushes to the underlying transport. Wraps the contents if a QOP was + * negotiated during the SASL handshake. + */ + @Override + public void flush() throws TTransportException { + byte[] buf = writeBuffer.get(); + int dataLength = writeBuffer.len(); + writeBuffer.reset(); + + if (shouldWrap) { + LOGGER.debug("data length before wrap: {}", dataLength); + try { + buf = sasl.wrap(buf, 0, dataLength); + } catch (SaslException e) { + throw new TTransportException(e); + } + dataLength = buf.length; + } + LOGGER.debug("writing data length: {}", dataLength); + writeLength(dataLength); + underlyingTransport.write(buf, 0, dataLength); + underlyingTransport.flush(); + } + + /** + * Used exclusively by receiveSaslMessage to return both a status and data. + */ + protected static class SaslResponse { + public NegotiationStatus status; + public byte[] payload; + + public SaslResponse(NegotiationStatus status, byte[] payload) { + this.status = status; + this.payload = payload; + } + } + + /** + * Used to abstract over the SaslServer and + * SaslClient classes, which share a lot of their interface, but + * unfortunately don't share a common superclass. + */ + private static class SaslParticipant { + // One of these will always be null.
public SaslServer saslServer; + public SaslClient saslClient; + + public SaslParticipant(SaslServer saslServer) { + this.saslServer = saslServer; + } + + public SaslParticipant(SaslClient saslClient) { + this.saslClient = saslClient; + } + + public byte[] evaluateChallengeOrResponse(byte[] challengeOrResponse) throws SaslException { + if (saslClient != null) { + return saslClient.evaluateChallenge(challengeOrResponse); + } else { + return saslServer.evaluateResponse(challengeOrResponse); + } + } + + public boolean isComplete() { + if (saslClient != null) + return saslClient.isComplete(); + else + return saslServer.isComplete(); + } + + public void dispose() throws SaslException { + if (saslClient != null) + saslClient.dispose(); + else + saslServer.dispose(); + } + + public byte[] unwrap(byte[] buf, int off, int len) throws SaslException { + if (saslClient != null) + return saslClient.unwrap(buf, off, len); + else + return saslServer.unwrap(buf, off, len); + } + + public byte[] wrap(byte[] buf, int off, int len) throws SaslException { + if (saslClient != null) + return saslClient.wrap(buf, off, len); + else + return saslServer.wrap(buf, off, len); + } + + public Object getNegotiatedProperty(String propName) { + if (saslClient != null) + return saslClient.getNegotiatedProperty(propName); + else + return saslServer.getNegotiatedProperty(propName); + } + } +} diff --git shims/src/common/java/org/apache/hadoop/hive/shims/ShimLoader.java shims/src/common/java/org/apache/hadoop/hive/shims/ShimLoader.java index 6eb4768..30a2567 100644 --- shims/src/common/java/org/apache/hadoop/hive/shims/ShimLoader.java +++ shims/src/common/java/org/apache/hadoop/hive/shims/ShimLoader.java @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.hadoop.util.VersionInfo; +import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge; /** * ShimLoader. @@ -81,10 +82,23 @@ public abstract class ShimLoader { return jettyShims; } + public static synchronized HadoopThriftAuthBridge getHadoopThriftAuthBridge() { + if ("0.20S".equals(getMajorVersion())) { + return createShim("org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge20S", + HadoopThriftAuthBridge.class); + } else { + return new HadoopThriftAuthBridge(); + } + } + @SuppressWarnings("unchecked") private static <T> T loadShims(Map<String, String> classMap, Class<T> xface) { String vers = getMajorVersion(); String className = classMap.get(vers); + return createShim(className, xface); + } + + private static <T> T createShim(String className, Class<T> xface) { try { Class<?> clazz = Class.forName(className); return xface.cast(clazz.newInstance()); diff --git shims/src/common/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java shims/src/common/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java new file mode 100644 index 0000000..3524802 --- /dev/null +++ shims/src/common/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.thrift; + +import java.io.IOException; + +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.thrift.transport.TTransportException; +import org.apache.thrift.transport.TTransportFactory; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.TProcessor; + +/** + * This class is only overridden by the secure hadoop shim. It allows + * the Thrift SASL support to bridge to Hadoop's UserGroupInformation + * infrastructure. + */ +public class HadoopThriftAuthBridge { + public Client createClient() { + throw new UnsupportedOperationException( + "The current version of Hadoop does not support Authentication"); + } + + public Server createServer(String keytabFile, String principalConf) + throws TTransportException { + throw new UnsupportedOperationException( + "The current version of Hadoop does not support Authentication"); + } + + + public static abstract class Client { + public abstract TTransport createClientTransport( + String principalConfig, String host, + String methodStr, TTransport underlyingTransport) + throws IOException; + } + + public static abstract class Server { + public abstract TTransportFactory createTransportFactory() throws TTransportException; + public abstract TProcessor wrapProcessor(TProcessor processor); + } +} \ No newline at end of file
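For illustration, a minimal hive-site.xml fragment that would turn on the secure metastore added by this patch. The property names come from the HiveConf entries above; the keytab path and realm are placeholder values for an assumed deployment:

<property>
  <name>hive.metastore.sasl.enabled</name>
  <value>true</value>
</property>
<property>
  <name>hive.metastore.kerberos.keytab.file</name>
  <!-- placeholder path; point at the keytab holding the service principal -->
  <value>/etc/hive/conf/hive-metastore.keytab</value>
</property>
<property>
  <name>hive.metastore.kerberos.principal</name>
  <!-- _HOST is expanded to the local host name via SecurityUtil.getServerPrincipal() -->
  <value>hive-metastore/_HOST@EXAMPLE.COM</value>
</property>

With these set, HiveMetaStore.main() builds its TThreadPoolServer with the SASL transport factory and the UGI-wrapping processor, and HiveMetaStoreClient.openStore() wraps its TSocket in a TSaslClientTransport before calling open().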
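The handshake messages exchanged by TSaslClientTransport and TSaslServerTransport all use the framing implemented in sendSaslMessage()/receiveSaslMessage(): one status byte, then a 4-byte big-endian payload length, then the payload. A small self-contained sketch of that framing, using the EncodingUtils class introduced by this patch (buildSaslMessage and the GSSAPI payload are illustrative, not part of the patch):

import org.apache.thrift.EncodingUtils;

public class SaslFramingSketch {
  // Frame a message the way TSaslTransport.sendSaslMessage() does:
  // [ status (1 byte) | payload length (4 bytes, big-endian) | payload ]
  static byte[] buildSaslMessage(byte status, byte[] payload) {
    byte[] message = new byte[1 + 4 + payload.length];
    message[0] = status;
    EncodingUtils.encodeBigEndian(payload.length, message, 1);
    System.arraycopy(payload, 0, message, 5, payload.length);
    return message;
  }

  public static void main(String[] args) {
    // A START message carrying the mechanism name, as in handleSaslStartMessage().
    byte[] framed = buildSaslMessage((byte) 0x01, "GSSAPI".getBytes());
    // The receiver reads the 5-byte header, then sizes the payload buffer:
    int payloadLength = EncodingUtils.decodeBigEndian(framed, 1);
    System.out.println("status=" + framed[0] + " payload length=" + payloadLength);
  }
}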