Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(revision 1037331)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(working copy)
@@ -151,6 +151,9 @@
         "_INTERMEDIATE_ARCHIVED"),
     METASTORE_INT_EXTRACTED("hive.metastore.archive.intermediate.extracted",
         "_INTERMEDIATE_EXTRACTED"),
+    METASTORE_KERBEROS_KEYTAB_FILE("hive.metastore.kerberos.keytab.file", ""),
+    METASTORE_KERBEROS_PRINCIPAL("hive.metastore.kerberos.principal", ""),
+    METASTORE_USE_THRIFT_SASL("hive.metastore.sasl.enabled", false),

     // Default parameters for creating tables
     NEWTABLEDEFAULTPARA("hive.table.parameters.default",""),
Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml	(revision 1037331)
+++ conf/hive-default.xml	(working copy)
@@ -608,6 +608,24 @@
 </property>

 <property>
+  <name>hive.metastore.sasl.enabled</name>
+  <value>false</value>
+  <description>If true, the metastore thrift interface will be secured with SASL. Clients must authenticate with Kerberos.</description>
+</property>
+
+<property>
+  <name>hive.metastore.kerberos.keytab.file</name>
+  <value></value>
+  <description>The path to the Kerberos Keytab file containing the metastore thrift server's service principal.</description>
+</property>
+
+<property>
+  <name>hive.metastore.kerberos.principal</name>
+  <value>hive-metastore/_HOST@EXAMPLE.COM</value>
+  <description>The service principal for the metastore thrift server. The special string _HOST will be replaced automatically with the correct host name.</description>
+</property>
+
+<property>
   <name>hive.optimize.reducededuplication</name>
   <value>true</value>
   <description>Remove extra map-reduce jobs if the data is already clustered by the same key which needs to be used again. This should always be set to true. Since it is a new feature, it has been made configurable.</description>
Index: metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java	(revision 1037331)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java	(working copy)
@@ -57,9 +57,12 @@
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.TThreadPoolServer;
@@ -68,18 +71,18 @@
 import org.apache.thrift.transport.TTransportFactory;

 import com.facebook.fb303.FacebookBase;
-import com.facebook.fb303.FacebookService;
 import com.facebook.fb303.fb_status;

 /**
  * TODO:pc remove application logic to a separate interface.
  */
 public class HiveMetaStore extends ThriftHiveMetastore {
+  public static final Log LOG = LogFactory.getLog(
+      HiveMetaStore.class);

   public static class HMSHandler extends FacebookBase implements
       ThriftHiveMetastore.Iface {
-    public static final Log LOG = LogFactory.getLog(HiveMetaStore.class
-        .getName());
+    public static final Log LOG = HiveMetaStore.LOG;
     private static boolean createDefaultDB = false;
     private String rawStoreClassName;
     private final HiveConf hiveConf; // stores datastore (jpox) properties,
@@ -2185,17 +2188,28 @@
     int minWorkerThreads = conf.getIntVar(HiveConf.ConfVars.METASTORESERVERMINTHREADS);
     int maxWorkerThreads = conf.getIntVar(HiveConf.ConfVars.METASTORESERVERMAXTHREADS);
     boolean tcpKeepAlive = conf.getBoolVar(HiveConf.ConfVars.METASTORE_TCP_KEEP_ALIVE);
+    boolean useSasl = conf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL);

     TServerTransport serverTransport = tcpKeepAlive ?
         new TServerSocketKeepAlive(port) : new TServerSocket(port);
-    FacebookService.Processor processor = new ThriftHiveMetastore.Processor(
-        handler);
+    TProcessor processor = new ThriftHiveMetastore.Processor(handler);
+    TTransportFactory transFactory;
+    if (useSasl) {
+      HadoopThriftAuthBridge.Server authBridge = ShimLoader.getHadoopThriftAuthBridge().createServer(
+          conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_KEYTAB_FILE),
+          conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL));
+      transFactory = authBridge.createTransportFactory();
+      processor = authBridge.wrapProcessor(processor);
+    } else {
+      transFactory = new TTransportFactory();
+    }
+
     TThreadPoolServer.Options options = new TThreadPoolServer.Options();
     options.minWorkerThreads = minWorkerThreads;
     options.maxWorkerThreads = maxWorkerThreads;
     TServer server = new TThreadPoolServer(processor, serverTransport,
-        new TTransportFactory(), new TTransportFactory(),
+        transFactory, transFactory,
         new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), options);
     HMSHandler.LOG.info("Started the new metaserver on port [" + port
         + "]...");
Index: metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java	(revision 1037331)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java	(working copy)
@@ -20,6 +20,7 @@

 import static org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_DATABASE_NAME;

+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
@@ -45,6 +46,8 @@
 import org.apache.hadoop.hive.metastore.api.Type;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TProtocol;
@@ -62,6 +65,7 @@
   private URI metastoreUris[];
   private final boolean standAloneClient = false;
   private final HiveMetaHookLoader hookLoader;
+  private final HiveConf conf;

   // for thrift connects
   private int retries = 5;
@@ -80,8 +84,8 @@
     if (conf == null) {
       conf = new HiveConf(HiveMetaStoreClient.class);
     }
+    this.conf = conf;
-
     boolean localMetaStore = conf.getBoolean("hive.metastore.local", false);
     if (localMetaStore) {
       // instantiate the metastore server handler directly instead of connecting
@@ -167,18 +171,41 @@
   }

   private void openStore(URI store) throws MetaException {
-    open = false;
-    transport = new TSocket(store.getHost(), store.getPort());
-    ((TSocket) transport).setTimeout(20000);
-    TProtocol protocol = new TBinaryProtocol(transport);
-    client = new ThriftHiveMetastore.Client(protocol);
     for (int i = 0; i < retries && !open; ++i) {
-      try {
+      open = false;
+      transport = new TSocket(store.getHost(), store.getPort());
+      ((TSocket) transport).setTimeout(20000);
+
+      // Wrap thrift connection with SASL if enabled.
+      boolean useSasl = conf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL);
+      if (useSasl) {
+        try {
+          HadoopThriftAuthBridge.Client authBridge =
+            ShimLoader.getHadoopThriftAuthBridge().createClient();
+          String principalConfig = conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL);
+          transport = authBridge.createClientTransport(
+              principalConfig, store.getHost(), "KERBEROS", transport);
+        } catch (IOException ioe) {
+          LOG.error("Couldn't create client transport", ioe);
+          throw new MetaException(ioe.toString());
+        }
+      }
+
+      TProtocol protocol = new TBinaryProtocol(transport);
+      client = new ThriftHiveMetastore.Client(protocol);
+
+      try {
         transport.open();
         open = true;
       } catch (TTransportException e) {
+        if (LOG.isDebugEnabled()) {
+          LOG.warn("failed to connect to MetaStore, re-trying...", e);
+        } else {
+          // Don't print full exception trace if DEBUG is not on.
         LOG.warn("failed to connect to MetaStore, re-trying...");
+        }
         try {
           Thread.sleep(1000);
         } catch (InterruptedException ignore) {
Index: shims/build.xml
===================================================================
--- shims/build.xml	(revision 1037331)
+++ shims/build.xml	(working copy)
@@ -32,6 +32,7 @@
+
Index: shims/src/0.20S/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java
===================================================================
--- shims/src/0.20S/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java	(revision 0)
+++ shims/src/0.20S/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java	(revision 0)
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+
+import javax.security.sasl.SaslException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSaslClientTransport;
+import org.apache.thrift.transport.TSaslServerTransport;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+
+/**
+ * Functions that bridge Thrift's SASL transports to Hadoop's
+ * SASL callback handlers and authentication classes.
+ */
+public class HadoopThriftAuthBridge20S extends HadoopThriftAuthBridge {
+  static final Log LOG = LogFactory.getLog(HadoopThriftAuthBridge.class);
+
+  @Override
+  public Client createClient() {
+    return new Client();
+  }
+
+  @Override
+  public Server createServer(String keytabFile, String principalConf) throws TTransportException {
+    return new Server(keytabFile, principalConf);
+  }
+
+  public static class Client extends HadoopThriftAuthBridge.Client {
+    /**
+     * Create a client-side SASL transport that wraps an underlying transport.
+     *
+     * @param principalConfig The Kerberos principal of the target server, as configured.
+     * @param host The server's host name, used to substitute _HOST in the principal.
+     * @param methodStr The authentication method to use. Currently only KERBEROS is
+     * supported.
+     * @param underlyingTransport The underlying transport mechanism, usually a TSocket.
+     */
+    @Override
+    public TTransport createClientTransport(
+        String principalConfig, String host,
+        String methodStr, TTransport underlyingTransport)
+        throws IOException {
+      AuthMethod method = AuthMethod.valueOf(AuthMethod.class, methodStr);
+
+      switch (method) {
+      case KERBEROS:
+        String serverPrincipal = SecurityUtil.getServerPrincipal(principalConfig, host);
+        String names[] = SaslRpcServer.splitKerberosName(serverPrincipal);
+        if (names.length != 3) {
+          throw new IOException(
+              "Kerberos principal name does NOT have the expected hostname part: "
+              + serverPrincipal);
+        }
+        try {
+          TTransport saslTransport = new TSaslClientTransport(
+              method.getMechanismName(),
+              null,
+              names[0], names[1],
+              SaslRpcServer.SASL_PROPS, null,
+              underlyingTransport);
+          return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser());
+        } catch (SaslException se) {
+          throw new IOException("Could not instantiate SASL transport", se);
+        }
+
+      default:
+        throw new IOException("Unsupported authentication method: " + method);
+      }
+    }
+  }
+
+  public static class Server extends HadoopThriftAuthBridge.Server {
+    private final UserGroupInformation realUgi;
+
+    /**
+     * TODO: javadoc
+     */
+    private Server(String keytabFile, String principalConf)
+        throws TTransportException {
+      if (keytabFile == null || keytabFile.isEmpty()) {
+        throw new TTransportException("No keytab specified");
+      }
+      if (principalConf == null || principalConf.isEmpty()) {
+        throw new TTransportException("No principal specified");
+      }
+
+      // Login from the keytab
+      String kerberosName;
+      try {
+        kerberosName = SecurityUtil.getServerPrincipal(
+            principalConf, null);
+        UserGroupInformation.loginUserFromKeytab(
+            kerberosName, keytabFile);
+        realUgi = UserGroupInformation.getLoginUser();
+        assert realUgi.isFromKeytab();
+      } catch (IOException ioe) {
+        throw new TTransportException(ioe);
+      }
+    }
+
+    /**
+     * Create a TTransportFactory that, upon connection of a client socket,
+     * negotiates a Kerberized SASL transport. The resulting TTransportFactory
+     * can be passed as both the input and output transport factory when
+     * instantiating a TThreadPoolServer, for example.
+     *
+     */
+    @Override
+    public TTransportFactory createTransportFactory() throws TTransportException
+    {
+      // Parse out the kerberos principal, host, realm.
+      String kerberosName = realUgi.getUserName();
+      final String names[] = SaslRpcServer.splitKerberosName(kerberosName);
+      if (names.length != 3) {
+        throw new TTransportException("Kerberos principal should have 3 parts: " + kerberosName);
+      }
+
+      TSaslServerTransport.Factory transFactory = new TSaslServerTransport.Factory();
+      transFactory.addServerDefinition(
+          AuthMethod.KERBEROS.getMechanismName(),
+          names[0], names[1],  // two parts of kerberos principal
+          SaslRpcServer.SASL_PROPS,
+          new SaslRpcServer.SaslGssCallbackHandler());
+
+      return new TUGIAssumingTransportFactory(transFactory, realUgi);
+    }
+
+    /**
+     * Wrap a TProcessor in such a way that, before processing any RPC, it
+     * assumes the UserGroupInformation of the user authenticated by
+     * the SASL transport.
+     */
+    @Override
+    public TProcessor wrapProcessor(TProcessor processor) {
+      return new TUGIAssumingProcessor(processor);
+    }
+
+    /**
+     * Processor that pulls the SaslServer object out of the transport, and
+     * assumes the remote user's UGI before calling through to the original
+     * processor.
+     *
+     * This is used on the server side to set the UGI for each specific call.
+     */
+    private class TUGIAssumingProcessor implements TProcessor {
+      final TProcessor wrapped;
+
+      TUGIAssumingProcessor(TProcessor wrapped) {
+        this.wrapped = wrapped;
+      }
+
+      public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
+        TTransport trans = inProt.getTransport();
+        if (!(trans instanceof TSaslServerTransport)) {
+          throw new TException("Unexpected non-SASL transport " + trans.getClass());
+        }
+        TSaslServerTransport saslTrans = (TSaslServerTransport)trans;
+        String authId = saslTrans.getSaslServer().getAuthorizationID();
+
+        try {
+          UserGroupInformation clientUgi = UserGroupInformation.createProxyUser(
+              authId, UserGroupInformation.getLoginUser());
+          return clientUgi.doAs(new PrivilegedExceptionAction<Boolean>() {
+            public Boolean run() {
+              try {
+                return wrapped.process(inProt, outProt);
+              } catch (TException te) {
+                throw new RuntimeException(te);
+              }
+            }
+          });
+        } catch (RuntimeException rte) {
+          if (rte.getCause() instanceof TException) {
+            throw (TException)rte.getCause();
+          }
+          throw rte;
+        } catch (InterruptedException ie) {
+          throw new RuntimeException(ie); // unexpected!
+        } catch (IOException ioe) {
+          throw new RuntimeException(ioe); // unexpected!
+        }
+      }
+    }
+  }
+
+  /**
+   * A TransportFactory that wraps another one, but assumes a specified UGI
+   * before calling through.
+   *
+   * This is used on the server side to assume the server's Principal when accepting
+   * clients.
+   */
+  private static class TUGIAssumingTransportFactory extends TTransportFactory {
+    private final UserGroupInformation ugi;
+    private final TTransportFactory wrapped;
+
+    public TUGIAssumingTransportFactory(TTransportFactory wrapped, UserGroupInformation ugi) {
+      assert wrapped != null;
+      assert ugi != null;
+
+      this.wrapped = wrapped;
+      this.ugi = ugi;
+    }
+
+    @Override
+    public TTransport getTransport(final TTransport trans) {
+      return ugi.doAs(new PrivilegedAction<TTransport>() {
+        public TTransport run() {
+          return wrapped.getTransport(trans);
+        }
+      });
+    }
+  }
+
+  /**
+   * The Thrift SASL transports call Sasl.createSaslServer and Sasl.createSaslClient
+   * inside open(). So, we need to assume the correct UGI when the transport is opened
+   * so that the SASL mechanisms have access to the right principal. This transport
+   * wraps the Sasl transports to set up the right UGI context for open().
+   *
+   * This is used on the client side, where the API explicitly opens a transport to
+   * the server.
+   */
+  private static class TUGIAssumingTransport extends TFilterTransport {
+    private final UserGroupInformation ugi;
+
+    public TUGIAssumingTransport(TTransport wrapped, UserGroupInformation ugi) {
+      super(wrapped);
+      this.ugi = ugi;
+    }
+
+    @Override
+    public void open() throws TTransportException {
+      try {
+        ugi.doAs(new PrivilegedExceptionAction<Void>() {
+          public Void run() {
+            try {
+              wrapped.open();
+            } catch (TTransportException tte) {
+              // Wrap the transport exception in an RTE, since UGI.doAs() then goes
+              // and unwraps this for us out of the doAs block. We then unwrap one
+              // more time in our catch clause to get back the TTE. (ugh)
+              throw new RuntimeException(tte);
+            }
+            return null;
+          }
+        });
+      } catch (IOException ioe) {
+        assert false : "Never thrown!";
+        throw new RuntimeException("Received an ioe we never threw!", ioe);
+      } catch (InterruptedException ie) {
+        assert false : "We never expect to see an InterruptedException thrown in this block";
+        throw new RuntimeException("Received an ie we never threw!", ie);
+      } catch (RuntimeException rte) {
+        if (rte.getCause() instanceof TTransportException) {
+          throw (TTransportException)rte.getCause();
+        } else {
+          throw rte;
+        }
+      }
+    }
+  }
+
+  /**
+   * Transport that simply wraps another transport.
+   * This is the equivalent of FilterInputStream for Thrift transports.
+   */
+  private static class TFilterTransport extends TTransport {
+    protected final TTransport wrapped;
+
+    public TFilterTransport(TTransport wrapped) {
+      this.wrapped = wrapped;
+    }
+
+    @Override
+    public void open() throws TTransportException {
+      wrapped.open();
+    }
+
+    @Override
+    public boolean isOpen() {
+      return wrapped.isOpen();
+    }
+
+    @Override
+    public boolean peek() {
+      return wrapped.peek();
+    }
+
+    @Override
+    public void close() {
+      wrapped.close();
+    }
+
+    @Override
+    public int read(byte[] buf, int off, int len) throws TTransportException {
+      return wrapped.read(buf, off, len);
+    }
+
+    @Override
+    public int readAll(byte[] buf, int off, int len) throws TTransportException {
+      return wrapped.readAll(buf, off, len);
+    }
+
+    @Override
+    public void write(byte[] buf) throws TTransportException {
+      wrapped.write(buf);
+    }
+
+    @Override
+    public void write(byte[] buf, int off, int len) throws TTransportException {
+      wrapped.write(buf, off, len);
+    }
+
+    @Override
+    public void flush() throws TTransportException {
+      wrapped.flush();
+    }
+
+    @Override
+    public byte[] getBuffer() {
+      return wrapped.getBuffer();
+    }
+
+    @Override
+    public int getBufferPosition() {
+      return wrapped.getBufferPosition();
+    }
+
+    @Override
+    public int getBytesRemainingInBuffer() {
+      return wrapped.getBytesRemainingInBuffer();
+    }
+
+    @Override
+    public void consumeBuffer(int len) {
+      wrapped.consumeBuffer(len);
+    }
+  }
+}
Index: shims/src/common/java/org/apache/hadoop/hive/shims/ShimLoader.java
===================================================================
--- shims/src/common/java/org/apache/hadoop/hive/shims/ShimLoader.java	(revision 1037331)
+++ shims/src/common/java/org/apache/hadoop/hive/shims/ShimLoader.java	(working copy)
@@ -20,6 +20,7 @@
 import java.util.HashMap;
 import java.util.Map;

+import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
 import org.apache.hadoop.util.VersionInfo;

 /**
@@ -75,10 +76,23 @@
     return jettyShims;
   }

+  public static synchronized HadoopThriftAuthBridge getHadoopThriftAuthBridge() {
+    if ("0.20S".equals(getMajorVersion())) {
+      return createShim("org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge20S",
+          HadoopThriftAuthBridge.class);
+    } else {
+      return new HadoopThriftAuthBridge();
+    }
+  }
+
   @SuppressWarnings("unchecked")
   private static <T> T loadShims(Map<String, String> classMap, Class<T> xface) {
     String vers = getMajorVersion();
     String className = classMap.get(vers);
+    return createShim(className, xface);
+  }
+
+  private static <T> T createShim(String className, Class<T> xface) {
     try {
       Class<?> clazz = Class.forName(className);
       return xface.cast(clazz.newInstance());
Index: shims/src/common/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java
===================================================================
--- shims/src/common/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java	(revision 0)
+++ shims/src/common/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java	(revision 0)
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import java.io.IOException;
+
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+
+/**
+ * This class is only overridden by the secure hadoop shim. It allows
+ * the Thrift SASL support to bridge to Hadoop's UserGroupInformation
+ * infrastructure.
+ */
+public class HadoopThriftAuthBridge {
+  public Client createClient() {
+    throw new UnsupportedOperationException(
+        "The current version of Hadoop does not support Authentication");
+  }
+
+  public Server createServer(String keytabFile, String principalConf)
+      throws TTransportException {
+    throw new UnsupportedOperationException(
+        "The current version of Hadoop does not support Authentication");
+  }
+
+  public static abstract class Client {
+    public abstract TTransport createClientTransport(
+        String principalConfig, String host,
+        String methodStr, TTransport underlyingTransport)
+        throws IOException;
+  }
+
+  public static abstract class Server {
+    public abstract TTransportFactory createTransportFactory() throws TTransportException;
+    public abstract TProcessor wrapProcessor(TProcessor processor);
+  }
+}
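
For reviewers trying the patch out, the sketch below shows one way a remote client process could exercise the new configuration properties. It is illustrative only and not part of the patch: the metastore URI, the principal value, and the example class name are made-up placeholders, and the client is assumed to already hold a Kerberos TGT (for example from kinit), since the SASL client transport authenticates as UserGroupInformation.getCurrentUser(). On the server side, nothing beyond the new hive-site.xml properties is needed; the metastore reads hive.metastore.sasl.enabled and wraps its processor and transport factory through ShimLoader.getHadoopThriftAuthBridge(), as in the HiveMetaStore.java hunk above.

// Illustrative client-side sketch only; names and values below are placeholders.
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

public class SecureMetastoreClientExample {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf(SecureMetastoreClientExample.class);

    // Point at a remote metastore instead of an embedded one.
    conf.setBoolean("hive.metastore.local", false);
    conf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://metastore.example.com:9083");

    // Enable the SASL transport added by this patch. The client resolves the
    // server principal from this value (with _HOST substituted by the server's
    // host name) and uses the caller's existing Kerberos credentials.
    conf.setBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL, true);
    conf.setVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL,
        "hive-metastore/_HOST@EXAMPLE.COM");

    HiveMetaStoreClient client = new HiveMetaStoreClient(conf);
    try {
      // Any metastore call now goes over the Kerberized SASL connection.
      System.out.println(client.getAllDatabases());
    } finally {
      client.close();
    }
  }
}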