Index: metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java (revision 1098374) +++ metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java (working copy) @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.metastore.api.Role; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.Type; +import org.apache.hadoop.hive.metastore.commands.MetaStoreCommand; import org.apache.hadoop.hive.metastore.model.MDBPrivilege; import org.apache.hadoop.hive.metastore.model.MGlobalPrivilege; import org.apache.hadoop.hive.metastore.model.MPartitionColumnPrivilege; @@ -67,6 +68,8 @@ */ public abstract void rollbackTransaction(); + public abstract T execute(MetaStoreCommand cmd) throws Exception; + public abstract void createDatabase(Database db) throws InvalidObjectException, MetaException; Index: metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (revision 1098374) +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (working copy) @@ -21,7 +21,6 @@ import static org.apache.commons.lang.StringUtils.join; import static org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_DATABASE_COMMENT; import static org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_DATABASE_NAME; -import static org.apache.hadoop.hive.metastore.MetaStoreUtils.validateName; import java.io.IOException; import java.net.InetAddress; @@ -65,6 +64,8 @@ import org.apache.hadoop.hive.metastore.api.Type; import org.apache.hadoop.hive.metastore.api.UnknownDBException; import org.apache.hadoop.hive.metastore.api.UnknownTableException; +import org.apache.hadoop.hive.metastore.commands.DatabaseCreateCmd; +import org.apache.hadoop.hive.metastore.commands.MetaStoreCommand; import org.apache.hadoop.hive.metastore.hooks.JDOConnectionURLHook; import org.apache.hadoop.hive.metastore.model.MDBPrivilege; import org.apache.hadoop.hive.metastore.model.MGlobalPrivilege; @@ -117,8 +118,18 @@ new ThreadLocal() { @Override protected synchronized RawStore initialValue() { - return null; + LOG.info(addPrefix("Opening raw store with implemenation class:" + + rawStoreClassName)); + RawStore ms; + try { + ms = (RawStore) ReflectionUtils.newInstance(HMSHandler.this.getClass(rawStoreClassName, + RawStore.class), getConf()); + } catch (MetaException e) { + throw new RuntimeException(e); } + ms.setConf(getConf()); + return ms; + } }; // Thread local configuration is needed as many threads could make changes @@ -127,7 +138,7 @@ new ThreadLocal() { @Override protected synchronized Configuration initialValue() { - return null; + return new Configuration(hiveConf); } }; @@ -166,9 +177,9 @@ // The next serial number to be assigned private boolean checkForDefaultDb; private static int nextSerialNum = 0; - private static ThreadLocal threadLocalId = new ThreadLocal() { + private static ThreadLocal threadLocalId = new ThreadLocal() { @Override - protected synchronized Object initialValue() { + protected synchronized Integer initialValue() { return new Integer(nextSerialNum++); } }; @@ -243,7 +254,7 @@ return true; } - private String addPrefix(String s) { + public static String addPrefix(String s) { return threadLocalId.get() + ": " + s; } @@ -328,13 +339,8 @@ } private Configuration getConf() { - Configuration conf = threadLocalConf.get(); - if (conf == null) { - conf = new Configuration(hiveConf); - threadLocalConf.set(conf); + return threadLocalConf.get(); } - return conf; - } /** * Get a cached RawStore. @@ -343,20 +349,11 @@ * @throws MetaException */ private RawStore getMS(boolean reloadConf) throws MetaException { - RawStore ms = threadLocalMS.get(); - if (ms == null) { - LOG.info(addPrefix("Opening raw store with implemenation class:" - + rawStoreClassName)); - ms = (RawStore) ReflectionUtils.newInstance(getClass(rawStoreClassName, - RawStore.class), getConf()); - threadLocalMS.set(ms); - ms = threadLocalMS.get(); - } + RawStore ms = threadLocalMS.get(); if (reloadConf) { ms.setConf(getConf()); } - return ms; } @@ -526,29 +523,6 @@ System.exit(0); } - private void create_database_core(RawStore ms, final Database db) - throws AlreadyExistsException, InvalidObjectException, MetaException, - IOException { - if (!validateName(db.getName())) { - throw new InvalidObjectException(db.getName() + " is not a valid database name"); - } - boolean success = false; - try { - ms.openTransaction(); - if (null == db.getLocationUri()) { - db.setLocationUri(wh.getDefaultDatabasePath(db.getName()).toString()); - } - ms.createDatabase(db); - success = ms.commitTransaction(); - } finally { - if (!success) { - ms.rollbackTransaction(); - } else { - wh.mkdirs(new Path(db.getLocationUri())); - } - } - } - public void create_database(final Database db) throws AlreadyExistsException, InvalidObjectException, MetaException { startFunction("create_database", ": " @@ -563,13 +537,8 @@ } catch (NoSuchObjectException e) { // expected } - executeWithRetry(new Command() { - @Override - Boolean run(RawStore ms) throws Exception { - create_database_core(ms, db); - return Boolean.TRUE; - } - }); + MetaStoreCommand cmd = new DatabaseCreateCmd(db, wh); + threadLocalMS.get().execute(cmd); } catch (AlreadyExistsException e) { throw e; } catch (InvalidObjectException e) { Index: metastore/src/java/org/apache/hadoop/hive/metastore/commands/MetaStoreCommand.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/commands/MetaStoreCommand.java (revision 0) +++ metastore/src/java/org/apache/hadoop/hive/metastore/commands/MetaStoreCommand.java (revision 0) @@ -0,0 +1,8 @@ +package org.apache.hadoop.hive.metastore.commands; + +import org.apache.hadoop.hive.metastore.RawStore; + +public interface MetaStoreCommand { + + public abstract T run(RawStore rs) throws Exception; +} Index: metastore/src/java/org/apache/hadoop/hive/metastore/commands/DatabaseCreateCmd.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/commands/DatabaseCreateCmd.java (revision 0) +++ metastore/src/java/org/apache/hadoop/hive/metastore/commands/DatabaseCreateCmd.java (revision 0) @@ -0,0 +1,44 @@ +package org.apache.hadoop.hive.metastore.commands; + +import static org.apache.hadoop.hive.metastore.MetaStoreUtils.validateName; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.RawStore; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.InvalidObjectException; + +public class DatabaseCreateCmd implements MetaStoreCommand { + + Database db; + Warehouse wh; + + public DatabaseCreateCmd(Database database, Warehouse whouse){ + db = database; + wh = whouse; + } + + @Override + public Boolean run(RawStore ms) throws Exception{ + if (!validateName(db.getName())) { + throw new InvalidObjectException(db.getName() + " is not a valid database name"); + } + boolean success = false; + try { + ms.openTransaction(); + if (null == db.getLocationUri()) { + db.setLocationUri(wh.getDefaultDatabasePath(db.getName()).toString()); + } + ms.createDatabase(db); + success = ms.commitTransaction(); + } finally { + if (!success) { + ms.rollbackTransaction(); + } else { + wh.mkdirs(new Path(db.getLocationUri())); + } + return success; + } + } + +} Index: metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (revision 1098374) +++ metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (working copy) @@ -48,6 +48,7 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -69,6 +70,8 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.Type; +import org.apache.hadoop.hive.metastore.commands.MetaStoreCommand; +import org.apache.hadoop.hive.metastore.hooks.JDOConnectionURLHook; import org.apache.hadoop.hive.metastore.model.MDBPrivilege; import org.apache.hadoop.hive.metastore.model.MDatabase; import org.apache.hadoop.hive.metastore.model.MFieldSchema; @@ -89,6 +92,7 @@ import org.apache.hadoop.hive.metastore.parser.ExpressionTree.ANTLRNoCaseStringStream; import org.apache.hadoop.hive.metastore.parser.FilterLexer; import org.apache.hadoop.hive.metastore.parser.FilterParser; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; /** @@ -137,6 +141,9 @@ return hiveConf; } + private int retryInterval; + private int retryLimit; + /** * Called whenever this object is instantiated using ReflectionUils, and also * on connection retries. In cases of connection retries, conf will usually @@ -170,6 +177,11 @@ initialize(propsFromConf); + retryInterval = HiveConf.getIntVar(conf, + HiveConf.ConfVars.METASTOREINTERVAL); + retryLimit = HiveConf.getIntVar(conf, + HiveConf.ConfVars.METASTOREATTEMPTS); + if (!isInitialized) { throw new RuntimeException( "Unable to create persistence manager. Check dss.log for details"); @@ -3357,4 +3369,104 @@ return mSecurityColumnList; } + @Override + public T execute(MetaStoreCommand cmd) throws Exception{ + + boolean gotNewConnectUrl = false; + int retryCount = 0; + Exception caughtException; + while(true) { + try { + if(gotNewConnectUrl) { + this.setConf(getConf()); } + + return cmd.run(this); + } catch (Exception e) { + caughtException = e; + } + + if (retryCount >= retryLimit) { + throw caughtException; + } + + assert(retryInterval >= 0); + retryCount++; + LOG.error( + String.format( + "JDO datastore error. Retrying metastore command " + + "after %d ms (attempt %d of %d)", retryInterval, retryCount, retryLimit)); + Thread.sleep(retryInterval); + // If we have a connection error, the JDO connection URL hook might + // provide us with a new URL to access the datastore. + String lastUrl = getConnectionURL(getConf()); + gotNewConnectUrl = updateConnectionURL(getConf(), lastUrl); + } + } + + private JDOConnectionURLHook urlHook = null; + private String urlHookClassName = ""; + + /** + * Updates the connection URL in hiveConf using the hook + * @return true if a new connection URL was loaded into the thread local + * configuration + */ + private boolean updateConnectionURL(Configuration conf, String badUrl) + throws MetaException { + String connectUrl = null; + String currentUrl = getConnectionURL(getConf()); + try { + // We always call init because the hook name in the configuration could + // have changed. + initConnectionUrlHook(); + if (urlHook != null) { + if (badUrl != null) { + urlHook.notifyBadConnectionUrl(badUrl); + } + connectUrl = urlHook.getJdoConnectionUrl(conf); + } + } catch (Exception e) { + LOG.error("Exception while getting connection URL from the hook: " + + e); + } + + if (connectUrl != null && !connectUrl.equals(currentUrl)) { + LOG.error(HiveMetaStore.HMSHandler.addPrefix( + String.format("Overriding %s with %s", + HiveConf.ConfVars.METASTORECONNECTURLKEY.toString(), + connectUrl))); + conf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.toString(), + connectUrl); + return true; + } + return false; + } + + private static String getConnectionURL(Configuration conf) { + return conf.get( + HiveConf.ConfVars.METASTORECONNECTURLKEY.toString(), ""); + } + + // Multiple threads could try to initialize at the same time. + synchronized private void initConnectionUrlHook() + throws ClassNotFoundException { + + String className = + getConf().get(HiveConf.ConfVars.METASTORECONNECTURLHOOK.toString(), "").trim(); + if (className.equals("")) { + urlHookClassName = ""; + urlHook = null; + return; + } + boolean urlHookChanged = !urlHookClassName.equals(className); + if (urlHook == null || urlHookChanged) { + urlHookClassName = className.trim(); + + Class urlHookClass = Class.forName(urlHookClassName, true, + JavaUtils.getClassLoader()); + urlHook = (JDOConnectionURLHook) ReflectionUtils.newInstance(urlHookClass, null); + } + return; + } +}