Index: metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java (revision 1379615)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java (working copy)
@@ -20,6 +20,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Constructor;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.util.ArrayList;
@@ -1029,4 +1030,48 @@
       throw new MetaException(rawStoreClassName + " class not found");
     }
   }
+
+  /**
+   * Create an object of the given class by invoking, via reflection, the declared
+   * constructor whose signature matches parameterTypes. The constructor is made
+   * accessible first, so it does not have to be public.
+   *
+   * @param theClass
+   *          the class to instantiate
+   * @param parameterTypes
+   *          an array of parameterTypes for the constructor
+   * @param initargs
+   *          the list of arguments for the constructor; null entries are not type-checked
+   * @return a new instance of theClass
+   * @throws IllegalArgumentException
+   *           if the two arrays differ in length, or a non-null argument is not an
+   *           instance of the corresponding parameter type
+   * @throws RuntimeException
+   *           if the constructor cannot be looked up or invoked (original cause attached)
+   */
+  public static <T> T newInstance(Class<T> theClass, Class<?>[] parameterTypes,
+      Object[] initargs) {
+    // Perform some sanity checks on the arguments.
+    if (parameterTypes.length != initargs.length) {
+      throw new IllegalArgumentException(
+          "Number of constructor parameter types doesn't match number of arguments");
+    }
+    for (int i = 0; i < parameterTypes.length; i++) {
+      Class<?> clazz = parameterTypes[i];
+      // isInstance(null) is always false, yet null is a legal argument for any
+      // reference-typed parameter, so only non-null arguments are checked here.
+      if (initargs[i] != null && !clazz.isInstance(initargs[i])) {
+        throw new IllegalArgumentException("Object : " + initargs[i]
+            + " is not an instance of " + clazz);
+      }
+    }
+
+    try {
+      Constructor<T> meth = theClass.getDeclaredConstructor(parameterTypes);
+      meth.setAccessible(true);
+      return meth.newInstance(initargs);
+    } catch (Exception e) {
+      throw new RuntimeException("Unable to instantiate " + theClass.getName(), e);
+    }
+  }
 }
Index: metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java (revision 0)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java (working copy)
@@ -0,0 +1,97 @@
+package org.apache.hadoop.hive.metastore;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.lang.reflect.UndeclaredThrowableException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.thrift.TException;
+
+/**
+ * InvocationHandler that wraps an IMetaStoreClient and, when a call fails with a
+ * TException, asks the client to reconnect and retries the call up to RETRY_LIMIT times.
+ * Instances are obtained as dynamic proxies through getProxy().
+ */
+public class RetryingMetaStoreClient implements InvocationHandler {
+
+  private static final Log LOG = LogFactory.getLog(RetryingMetaStoreClient.class.getName());
+  // Number of times a failed call is retried after reconnecting.
+  private static final int RETRY_LIMIT = 1;
+
+  private final IMetaStoreClient base;
+  private final HiveConf hiveConf;
+
+  /**
+   * Instantiates the wrapped client through its (HiveConf, HiveMetaHookLoader) constructor.
+   */
+  protected RetryingMetaStoreClient(HiveConf hiveConf, HiveMetaHookLoader hookLoader,
+      Class<? extends IMetaStoreClient> msClientClass) throws MetaException {
+    this.hiveConf = hiveConf;
+    this.base = (IMetaStoreClient) MetaStoreUtils.newInstance(msClientClass, new Class[] {
+        HiveConf.class, HiveMetaHookLoader.class}, new Object[] {hiveConf, hookLoader});
+  }
+
+  /**
+   * Creates a retrying proxy around a new instance of the named client class.
+   *
+   * @param hiveConf configuration handed to the client's constructor
+   * @param hookLoader hook loader handed to the client's constructor
+   * @param mscClassName name of the IMetaStoreClient implementation to instantiate
+   * @return a proxy implementing the interfaces declared by the named class
+   */
+  public static IMetaStoreClient getProxy(HiveConf hiveConf, HiveMetaHookLoader hookLoader,
+      String mscClassName) throws MetaException {
+
+    // The cast cannot be checked at compile time; the class name comes from configuration.
+    @SuppressWarnings("unchecked")
+    Class<? extends IMetaStoreClient> baseClass = (Class<? extends IMetaStoreClient>)
+        MetaStoreUtils.getClass(mscClassName);
+
+    RetryingMetaStoreClient handler = new RetryingMetaStoreClient(hiveConf, hookLoader, baseClass);
+
+    return (IMetaStoreClient) Proxy.newProxyInstance(RetryingMetaStoreClient.class.getClassLoader(),
+        baseClass.getInterfaces(), handler);
+  }
+
+  /**
+   * Forwards the call to the wrapped client. A failure caused by a TException triggers a
+   * reconnect and a retry; once RETRY_LIMIT retries are used up the TException itself is
+   * thrown. Any other failure cause is rethrown immediately.
+   */
+  @Override
+  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+    Object ret = null;
+    int retriesMade = 0;
+    TException caughtException = null;
+    while (true) {
+      try {
+        ret = method.invoke(base, args);
+        break;
+      } catch (UndeclaredThrowableException e) {
+        throw e.getCause();
+      } catch (InvocationTargetException e) {
+        if (e.getCause() instanceof TException) {
+          // Remember the unwrapped cause: rethrowing the InvocationTargetException would
+          // typically surface to proxy callers as an UndeclaredThrowableException.
+          caughtException = (TException) e.getCause();
+        } else {
+          throw e.getCause();
+        }
+      }
+
+      if (retriesMade >= RETRY_LIMIT) {
+        throw caughtException;
+      }
+      retriesMade++;
+      LOG.error("MetaStoreClient lost connection. Attempting to reconnect.");
+      base.connect();
+    }
+    return ret;
+  }
+}
Index: metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java (revision 1379615)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java (working copy)
@@ -50,6 +50,13 @@
  * merged
  */
 public interface IMetaStoreClient {
+
+  /**
+   * Attempts to (re)establish this client's connection to the MetaStore.
+   *
+   * @return True if this MetaStoreClient was able to successfully connect to the MetaStore.
+   */
+  public boolean connect();
 
   public void close();
 
Index: metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java (revision 1379615)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java (working copy)
@@ -32,6 +32,7 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 
 import javax.security.auth.login.LoginException;
 
@@ -75,6 +76,8 @@
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 
+import com.facebook.fb303.fb_status;
+
 /**
  * Hive Metastore Client.
  */
@@ -160,6 +163,61 @@
   }
 
   /**
+   * Swaps the first element of the metastoreUris array with a random element from the
+   * remainder of the array.
+   */
+  private void promoteRandomMetaStoreURI() {
+    if (metastoreUris.length <= 1) {
+      return;
+    }
+    Random rng = new Random();
+    int index = rng.nextInt(metastoreUris.length - 1) + 1;
+    URI tmp = metastoreUris[0];
+    metastoreUris[0] = metastoreUris[index];
+    metastoreUris[index] = tmp;
+  }
+
+  /**
+   * @return true if the underlying client reports the status ALIVE; false if it reports
+   *         any other status or if the status query throws.
+   */
+  public boolean isConnected() {
+    try {
+      return (client.getStatus() == fb_status.ALIVE);
+    } catch (Exception e) {
+      LOG.error("Unable to query status of MetaStore Client", e);
+      return false;
+    }
+  }
+
+  /**
+   * Re-establishes the connection to the MetaStore. For a local MetaStore only the current
+   * connection status is reported; otherwise a randomly chosen alternate URI is moved to the
+   * front of metastoreUris and open() is called.
+   *
+   * @return for a local MetaStore, the result of isConnected(); otherwise true if open()
+   *         completed without an exception, false if it threw.
+   */
+  public boolean connect() {
+    if (localMetaStore) {
+      // For direct DB connections we don't yet support reestablishing connections.
+      return isConnected();
+    } else {
+      try {
+        // Swap the first element of the metastoreUris[] with a random element from the rest
+        // of the array. Rationale being that this method will generally be called when the default
+        // connection has died and the default connection is likely to be the first array element.
+        promoteRandomMetaStoreURI();
+        open();
+        return true;
+      } catch (Exception e) {
+        LOG.error("Unable to connect with MetaStore", e);
+        return false;
+      }
+    }
+  }
+
+  /**
    * @param dbname
    * @param tbl_name
    * @param new_tbl
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1379615)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -240,6 +240,10 @@
     METASTOREURIS("hive.metastore.uris", ""),
     // Number of times to retry a connection to a Thrift metastore server
     METASTORETHRIFTRETRIES("hive.metastore.connect.retries", 5),
+    // The IMetaStoreClient implementation to use.
+    METASTORE_CLIENT_CLS("hive.metastore.client.class",
+        "org.apache.hadoop.hive.metastore.HiveMetaStoreClient"),
+
     // Number of seconds the client should wait between connection attempts
     METASTORE_CLIENT_CONNECT_RETRY_DELAY("hive.metastore.client.connect.retry.delay", 1),
     // Socket timeout for the client connection (in seconds)
Index: ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (revision 1379615)
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (working copy)
@@ -53,6 +53,7 @@
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
@@ -2090,7 +2091,8 @@
           }
         }
       };
-    return new HiveMetaStoreClient(conf, hookLoader);
+    return RetryingMetaStoreClient.getProxy(conf, hookLoader,
+        conf.getVar(HiveConf.ConfVars.METASTORE_CLIENT_CLS));
   }
 
   /**