Index: conf/hive-default.xml =================================================================== --- conf/hive-default.xml (revision 5533) +++ conf/hive-default.xml (working copy) @@ -485,4 +485,22 @@ Whether sorting is enforced. If true, while inserting into the table, sorting is enforced. + + hive.metastore.ds.connection.url.hook + + Name of the hook to use for retriving the JDO connection URL. If empty, the value in javax.jdo.option.ConnectionURL is used + + + + hive.metastore.ds.retry.attempts + 1 + The number of times to retry a metastore call if there were a connection error + + + + hive.metastore.ds.retry.interval + 1000 + The number of miliseconds between metastore retry attempts + + Index: metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (revision 5533) +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (working copy) @@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.ConfigValSecurityException; @@ -46,6 +47,7 @@ import org.apache.hadoop.hive.metastore.api.Type; import org.apache.hadoop.hive.metastore.api.UnknownDBException; import org.apache.hadoop.hive.metastore.api.UnknownTableException; +import org.apache.hadoop.hive.metastore.hooks.JDOConnectionURLHook; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeUtils; @@ -76,14 +78,26 @@ private String rawStoreClassName; private final HiveConf hiveConf; // stores datastore (jpox) properties, // right now they come from jpox.properties + private Warehouse wh; // hdfs warehouse - private final ThreadLocal threadLocalMS = new ThreadLocal() { + private final ThreadLocal threadLocalMS = + new ThreadLocal() { @Override - protected synchronized Object initialValue() { + protected synchronized RawStore initialValue() { return null; } }; + // Thread local configuration is needed as many threads could make changes + // to the conf using the connection hook + private final ThreadLocal threadLocalConf = + new ThreadLocal() { + @Override + protected synchronized Configuration initialValue() { + return null; + } + }; + // The next serial number to be assigned private boolean checkForDefaultDb; private static int nextSerialNum = 0; @@ -94,6 +108,12 @@ } }; + // Used for retrying JDO calls on datastore failures + private int retryInterval = 0; + private int retryLimit = 0; + private JDOConnectionURLHook urlHook = null; + private String urlHookClassName = ""; + public static Integer get() { return threadLocalId.get(); } @@ -128,48 +148,240 @@ alterHandler = (AlterHandler) ReflectionUtils.newInstance(getClass( alterHandlerName, AlterHandler.class), hiveConf); wh = new Warehouse(hiveConf); + + retryInterval = HiveConf.getIntVar(hiveConf, + HiveConf.ConfVars.METASTOREINTERVAL); + retryLimit = HiveConf.getIntVar(hiveConf, + HiveConf.ConfVars.METASTOREATTEMPTS); + // Using the hook on startup ensures that the hook always has priority + // over settings in *.xml. We can use hiveConf as only a single thread + // will be calling the constructor. + updateConnectionURL(hiveConf, null); + createDefaultDB(); return true; } + private String addPrefix(String s) { + return threadLocalId.get() + ": " + s; + } + /** + * A Command is a closure used to pass a block of code from individual + * functions to executeWithRetry, which centralizes connection error + * handling. Command is parameterized on the return type of the function. + * + * The general transformation is: + * + * From: + * String foo(int a) throws ExceptionB { + * + * } + * + * To: + * String foo(final int a) throws ExceptionB { + * String ret = null; + * try { + * ret = executeWithRetry(new Command() { + * String run(RawStore ms) { + * + * } + * } + * } catch (ExceptionB e) { + * throw e; + * } catch (Exception e) { + * // Since run is only supposed to throw ExceptionB it could only + * // be a runtime exception + * throw (RuntimeException)e; + * } + * } + * + * The catch blocks are used to ensure that the exceptions thrown by the + * follow the function definition. + */ + private static class Command { + T run(RawStore ms) throws Exception { + return null; + } + } + + private T executeWithRetry(Command cmd) throws Exception { + T ret = null; + + boolean gotNewConnectUrl = false; + boolean reloadConf = HiveConf.getBoolVar(hiveConf, + HiveConf.ConfVars.METASTOREFORCERELOADCONF); + + if (reloadConf) { + updateConnectionURL(getConf(), null); + } + + int retryCount = 0; + Exception caughtException = null; + while(true) { + try { + RawStore ms = getMS(reloadConf || gotNewConnectUrl); + ret = cmd.run(ms); + break; + } catch (javax.jdo.JDOFatalDataStoreException e) { + caughtException = e; + } catch (javax.jdo.JDODataStoreException e) { + caughtException = e; + } + + if (retryCount >= retryLimit) { + throw caughtException; + } + + assert(retryInterval >= 0); + retryCount++; + LOG.error( + String.format( + "JDO datastore error. Retrying metastore command " + + "after %d ms (attempt %d of %d)", retryInterval, retryCount, retryLimit)); + Thread.sleep(retryInterval); + // If we have a connection error, the JDO connection URL hook might + // provide us with a new URL to access the datastore. + String lastUrl = getConnectionURL(getConf()); + gotNewConnectUrl = updateConnectionURL(getConf(), lastUrl); + } + return ret; + } + + private Configuration getConf() { + Configuration conf = threadLocalConf.get(); + if (conf == null) { + conf = new Configuration(hiveConf); + threadLocalConf.set(conf); + } + return conf; + } + + /** + * Get a cached RawStore. + * * @return * @throws MetaException */ - private RawStore getMS() throws MetaException { + private RawStore getMS(boolean reloadConf) throws MetaException { RawStore ms = threadLocalMS.get(); if (ms == null) { - LOG.info(threadLocalId.get() - + ": Opening raw store with implemenation class:" - + rawStoreClassName); + LOG.info(addPrefix("Opening raw store with implemenation class:" + + rawStoreClassName)); ms = (RawStore) ReflectionUtils.newInstance(getClass(rawStoreClassName, - RawStore.class), hiveConf); + RawStore.class), getConf()); threadLocalMS.set(ms); ms = threadLocalMS.get(); } + + if (reloadConf) { + ms.setConf(getConf()); + } + return ms; } /** - * create default database if it doesn't exist - * - * @throws MetaException + * Updates the connection URL in hiveConf using the hook + * @return true if a new connection URL was loaded into the thread local + * configuration */ - private void createDefaultDB() throws MetaException { - if (HMSHandler.createDefaultDB || !checkForDefaultDb) { + private boolean updateConnectionURL(Configuration conf, String badUrl) + throws MetaException { + String connectUrl = null; + String currentUrl = getConnectionURL(conf); + try { + // We always call init because the hook name in the configuration could + // have changed. + initConnectionUrlHook(); + if (urlHook != null) { + if (badUrl != null) { + urlHook.notifyBadConnectionUrl(badUrl); + } + connectUrl = urlHook.getJdoConnectionUrl(hiveConf); + } + } catch (Exception e) { + LOG.error("Exception while getting connection URL from the hook: " + + e); + } + + if (connectUrl != null && !connectUrl.equals(currentUrl)) { + LOG.error(addPrefix( + String.format("Overriding %s with %s", + HiveConf.ConfVars.METASTORECONNECTURLKEY.toString(), + connectUrl))); + conf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.toString(), + connectUrl); + return true; + } + return false; + } + + private static String getConnectionURL(Configuration conf) { + return conf.get( + HiveConf.ConfVars.METASTORECONNECTURLKEY.toString(),""); + } + + // Multiple threads could try to initialize at the same time. + synchronized private void initConnectionUrlHook() + throws ClassNotFoundException { + + String className = + hiveConf.get(HiveConf.ConfVars.METASTORECONNECTURLHOOK.toString(), "").trim(); + if (className.equals("")){ + urlHookClassName = ""; + urlHook = null; return; } + boolean urlHookChanged = !urlHookClassName.equals(className); + if (urlHook == null || urlHookChanged) { + urlHookClassName = className.trim(); + + Class urlHookClass = Class.forName(urlHookClassName, true, + JavaUtils.getClassLoader()); + urlHook = (JDOConnectionURLHook) ReflectionUtils.newInstance(urlHookClass, null); + } + return; + } + + private void createDefaultDB_core(RawStore ms) throws MetaException { try { - getMS().getDatabase(MetaStoreUtils.DEFAULT_DATABASE_NAME); + ms.getDatabase(MetaStoreUtils.DEFAULT_DATABASE_NAME); } catch (NoSuchObjectException e) { - getMS().createDatabase( + ms.createDatabase( new Database(MetaStoreUtils.DEFAULT_DATABASE_NAME, wh .getDefaultDatabasePath(MetaStoreUtils.DEFAULT_DATABASE_NAME) .toString())); } HMSHandler.createDefaultDB = true; } + /** + * create default database if it doesn't exist + * + * @throws MetaException + */ + private void createDefaultDB() throws MetaException { + if (HMSHandler.createDefaultDB || !checkForDefaultDb) { + return; + } + try { + executeWithRetry(new Command() { + @Override + Boolean run(RawStore ms) throws Exception { + createDefaultDB_core(ms); + return Boolean.TRUE; + } + }); + } catch (MetaException e) { + throw e; + } catch (Exception e) { + assert(e instanceof RuntimeException); + throw (RuntimeException)e; + } + + } + private Class getClass(String rawStoreClassName, Class class1) throws MetaException { try { @@ -196,58 +408,91 @@ @Override public void shutdown() { logStartFunction("Shutting down the object store..."); - try { - if (threadLocalMS.get() != null) { - getMS().shutdown(); - } - } catch (MetaException e) { - LOG.error("unable to shutdown metastore", e); + RawStore ms = threadLocalMS.get(); + if (ms != null) { + ms.shutdown(); } System.exit(0); } - public boolean create_database(String name, String location_uri) - throws AlreadyExistsException, MetaException { - incrementCounter("create_database"); - logStartFunction("create_database: " + name); + private boolean create_database_core(RawStore ms, final String name, + final String location_uri) throws AlreadyExistsException, MetaException { boolean success = false; try { - getMS().openTransaction(); + ms.openTransaction(); Database db = new Database(name, location_uri); - if (getMS().createDatabase(db) + if (ms.createDatabase(db) && wh.mkdirs(wh.getDefaultDatabasePath(name))) { - success = getMS().commitTransaction(); + success = ms.commitTransaction(); } } finally { if (!success) { - getMS().rollbackTransaction(); + ms.rollbackTransaction(); } } return success; } - public Database get_database(String name) throws NoSuchObjectException, + public boolean create_database(final String name, final String location_uri) + throws AlreadyExistsException, MetaException { + incrementCounter("create_database"); + logStartFunction("create_database: " + name); + + Boolean ret = null; + try { + ret = executeWithRetry(new Command() { + @Override + Boolean run(RawStore ms) throws Exception { + boolean success = create_database_core(ms, name, location_uri); + return Boolean.valueOf(success); + } + }); + } catch (AlreadyExistsException e) { + throw e; + } catch (MetaException e) { + throw e; + } catch (Exception e) { + assert(e instanceof RuntimeException); + throw (RuntimeException)e; + } + + return ret.booleanValue(); + } + + public Database get_database(final String name) throws NoSuchObjectException, MetaException { incrementCounter("get_database"); logStartFunction("get_database: " + name); - return getMS().getDatabase(name); + + Database db = null; + try { + db = executeWithRetry(new Command() { + @Override + Database run(RawStore ms) throws Exception { + return ms.getDatabase(name); + } + }); + } catch (MetaException e) { + throw e; + } catch (NoSuchObjectException e) { + throw e; + } catch (Exception e) { + assert(e instanceof RuntimeException); + throw (RuntimeException)e; + } + return db; } - public boolean drop_database(String name) throws MetaException { - incrementCounter("drop_database"); - logStartFunction("drop_database: " + name); - if (name.equalsIgnoreCase(MetaStoreUtils.DEFAULT_DATABASE_NAME)) { - throw new MetaException("Can't drop default database"); - } + private boolean drop_database_core(RawStore ms, final String name) throws MetaException { boolean success = false; try { - getMS().openTransaction(); - if (getMS().dropDatabase(name)) { - success = getMS().commitTransaction(); + ms.openTransaction(); + if (ms.dropDatabase(name)) { + success = ms.commitTransaction(); } } finally { if (!success) { - getMS().rollbackTransaction(); + ms.rollbackTransaction(); } else { wh.deleteDir(wh.getDefaultDatabasePath(name), true); // it is not a terrible thing even if the data is not deleted @@ -256,13 +501,53 @@ return success; } + public boolean drop_database(final String name) throws MetaException { + incrementCounter("drop_database"); + logStartFunction("drop_database: " + name); + if (name.equalsIgnoreCase(MetaStoreUtils.DEFAULT_DATABASE_NAME)) { + throw new MetaException("Can't drop default database"); + } + + Boolean ret = null; + try { + ret = executeWithRetry(new Command() { + @Override + Boolean run(RawStore ms) throws Exception { + boolean success = drop_database_core(ms, name); + return Boolean.valueOf(success); + } + }); + } catch (MetaException e) { + throw e; + } catch (Exception e) { + assert(e instanceof RuntimeException); + throw (RuntimeException)e; + } + return ret.booleanValue(); + } + public List get_databases() throws MetaException { incrementCounter("get_databases"); logStartFunction("get_databases"); - return getMS().getDatabases(); + + List ret = null; + try { + ret = executeWithRetry(new Command>() { + @Override + List run(RawStore ms) throws Exception { + return ms.getDatabases(); + } + }); + } catch (MetaException e) { + throw e; + } catch (Exception e) { + assert(e instanceof RuntimeException); + throw (RuntimeException)e; + } + return ret; } - public boolean create_type(Type type) throws AlreadyExistsException, + public boolean create_type(final Type type) throws AlreadyExistsException, MetaException, InvalidObjectException { incrementCounter("create_type"); logStartFunction("create_type: " + type.getName()); @@ -272,21 +557,70 @@ + " already exists"); } - // TODO:pc Validation of types should be done by clients or here???? - return getMS().createType(type); + Boolean ret = null; + try { + ret = executeWithRetry(new Command() { + @Override + Boolean run(RawStore ms) throws Exception { + // TODO:pc Validation of types should be done by clients or here???? + return Boolean.valueOf(ms.createType(type)); + } + }); + } catch (AlreadyExistsException e) { + throw e; + } catch (MetaException e) { + throw e; + } catch (InvalidObjectException e) { + throw e; + } catch (Exception e) { + assert(e instanceof RuntimeException); + throw (RuntimeException)e; + } + + return ret.booleanValue(); } - public Type get_type(String name) throws MetaException { + public Type get_type(final String name) throws MetaException { incrementCounter("get_type"); logStartFunction("get_type: " + name); - return getMS().getType(name); + + Type ret; + try { + ret = executeWithRetry(new Command() { + @Override + Type run(RawStore ms) throws Exception { + return ms.getType(name); + } + }); + } catch (MetaException e) { + throw e; + } catch (Exception e) { + assert(e instanceof RuntimeException); + throw (RuntimeException)e; + } + return ret; } - public boolean drop_type(String name) throws MetaException { + public boolean drop_type(final String name) throws MetaException { incrementCounter("drop_type"); logStartFunction("drop_type: " + name); - // TODO:pc validate that there are no types that refer to this - return getMS().dropType(name); + + Boolean ret = null; + try { + ret = executeWithRetry(new Command() { + @Override + Boolean run(RawStore ms) throws Exception { + // TODO:pc validate that there are no types that refer to this + return ms.dropType(name); + } + }); + } catch (MetaException e) { + throw e; + } catch (Exception e) { + assert(e instanceof RuntimeException); + throw (RuntimeException)e; + } + return ret; } public Map get_type_all(String name) throws MetaException { @@ -296,11 +630,8 @@ throw new MetaException("Not yet implemented"); } - public void create_table(Table tbl) throws AlreadyExistsException, - MetaException, InvalidObjectException { - incrementCounter("create_table"); - logStartFunction("create_table: db=" + tbl.getDbName() + " tbl=" - + tbl.getTableName()); + private void create_table_core(final RawStore ms, final Table tbl) + throws AlreadyExistsException, MetaException, InvalidObjectException { if (!MetaStoreUtils.validateName(tbl.getTableName()) || !MetaStoreUtils.validateColNames(tbl.getSd().getCols()) @@ -313,7 +644,7 @@ Path tblPath = null; boolean success = false, madeDir = false; try { - getMS().openTransaction(); + ms.openTransaction(); if (!TableType.VIRTUAL_VIEW.toString().equals(tbl.getTableType())) { if (tbl.getSd().getLocation() == null || tbl.getSd().getLocation().isEmpty()) { @@ -326,7 +657,6 @@ } tblPath = wh.getDnsPath(new Path(tbl.getSd().getLocation())); } - tbl.getSd().setLocation(tblPath.toString()); } @@ -340,7 +670,7 @@ if (!wh.isDir(tblPath)) { if (!wh.mkdirs(tblPath)) { throw new MetaException(tblPath - + " is not a directory or unable to create one"); + + " is not a directory or unable to create one"); } madeDir = true; } @@ -351,12 +681,12 @@ tbl.setCreateTime((int) time); tbl.putToParameters(Constants.DDL_TIME, Long.toString(time)); - getMS().createTable(tbl); - success = getMS().commitTransaction(); + ms.createTable(tbl); + success = ms.commitTransaction(); } finally { if (!success) { - getMS().rollbackTransaction(); + ms.rollbackTransaction(); if (madeDir) { wh.deleteDir(tblPath, true); } @@ -364,6 +694,31 @@ } } + public void create_table(final Table tbl) throws AlreadyExistsException, + MetaException, InvalidObjectException { + incrementCounter("create_table"); + logStartFunction("create_table: db=" + tbl.getDbName() + " tbl=" + + tbl.getTableName()); + try { + executeWithRetry(new Command() { + @Override + Boolean run(RawStore ms) throws Exception { + create_table_core(ms, tbl); + return Boolean.TRUE; + } + }); + } catch (AlreadyExistsException e) { + throw e; + } catch (MetaException e) { + throw e; + } catch (InvalidObjectException e) { + throw e; + } catch (Exception e) { + assert(e instanceof RuntimeException); + throw (RuntimeException)e; + } + } + public boolean is_table_exists(String dbname, String name) throws MetaException { try { @@ -373,17 +728,17 @@ } } - public void drop_table(String dbname, String name, boolean deleteData) + private void drop_table_core(final RawStore ms, final String dbname, + final String name, final boolean deleteData) throws NoSuchObjectException, MetaException { - incrementCounter("drop_table"); - logStartFunction("drop_table", dbname, name); + boolean success = false; boolean isExternal = false; Path tblPath = null; Table tbl = null; isExternal = false; try { - getMS().openTransaction(); + ms.openTransaction(); // drop any partitions tbl = get_table(dbname, name); if (tbl == null) { @@ -396,14 +751,14 @@ if (tbl.getSd().getLocation() != null) { tblPath = new Path(tbl.getSd().getLocation()); } - if (!getMS().dropTable(dbname, name)) { + if (!ms.dropTable(dbname, name)) { throw new MetaException("Unable to drop table"); } tbl = null; // table collections disappear after dropping - success = getMS().commitTransaction(); + success = ms.commitTransaction(); } finally { if (!success) { - getMS().rollbackTransaction(); + ms.rollbackTransaction(); } else if (deleteData && (tblPath != null) && !isExternal) { wh.deleteDir(tblPath, true); // ok even if the data is not deleted @@ -411,6 +766,30 @@ } } + public void drop_table(final String dbname, final String name, final boolean deleteData) + throws NoSuchObjectException, MetaException { + incrementCounter("drop_table"); + logStartFunction("drop_table", dbname, name); + + try { + executeWithRetry(new Command() { + @Override + Boolean run(RawStore ms) throws Exception { + drop_table_core(ms, dbname, name, deleteData); + return Boolean.TRUE; + } + }); + } catch (NoSuchObjectException e) { + throw e; + } catch (MetaException e) { + throw e; + } catch (Exception e) { + assert(e instanceof RuntimeException); + throw (RuntimeException)e; + } + + } + /** * Is this an external table? * @@ -422,14 +801,30 @@ return MetaStoreUtils.isExternalTable(table); } - public Table get_table(String dbname, String name) throws MetaException, + public Table get_table(final String dbname, final String name) throws MetaException, NoSuchObjectException { + Table t = null; incrementCounter("get_table"); logStartFunction("get_table", dbname, name); - Table t = getMS().getTable(dbname, name); - if (t == null) { - throw new NoSuchObjectException(dbname + "." + name - + " table not found"); + try { + t = executeWithRetry(new Command() { + @Override + Table run(RawStore ms) throws Exception { + Table t = ms.getTable(dbname, name); + if (t == null) { + throw new NoSuchObjectException(dbname + "." + name + + " table not found"); + } + return t; + } + }); + } catch (NoSuchObjectException e) { + throw e; + } catch (MetaException e) { + throw e; + } catch (Exception e) { + assert(e instanceof RuntimeException); + throw (RuntimeException)e; } return t; } @@ -442,20 +837,21 @@ return false; } - private Partition append_partition_common(String dbName, String tableName, + private Partition append_partition_common(RawStore ms, String dbName, String tableName, List part_vals) throws InvalidObjectException, AlreadyExistsException, MetaException { + Partition part = new Partition(); boolean success = false, madeDir = false; Path partLocation = null; try { - getMS().openTransaction(); + ms.openTransaction(); part = new Partition(); part.setDbName(dbName); part.setTableName(tableName); part.setValues(part_vals); - Table tbl = getMS().getTable(part.getDbName(), part.getTableName()); + Table tbl = ms.getTable(part.getDbName(), part.getTableName()); if (tbl == null) { throw new InvalidObjectException( "Unable to add partition because table or database do not exist"); @@ -485,24 +881,23 @@ part.setCreateTime((int) time); part.putToParameters(Constants.DDL_TIME, Long.toString(time)); - success = getMS().addPartition(part); + success = ms.addPartition(part); if (success) { - success = getMS().commitTransaction(); + success = ms.commitTransaction(); } } finally { if (!success) { - getMS().rollbackTransaction(); + ms.rollbackTransaction(); if (madeDir) { wh.deleteDir(partLocation, true); } } } return part; - } - public Partition append_partition(String dbName, String tableName, - List part_vals) throws InvalidObjectException, + public Partition append_partition(final String dbName, final String tableName, + final List part_vals) throws InvalidObjectException, AlreadyExistsException, MetaException { incrementCounter("append_partition"); logStartFunction("append_partition", dbName, tableName); @@ -511,48 +906,90 @@ LOG.debug(part); } } - return append_partition_common(dbName, tableName, part_vals); + + Partition ret = null; + try { + ret = executeWithRetry(new Command() { + @Override + Partition run(RawStore ms) throws Exception { + return append_partition_common(ms, dbName, tableName, part_vals); + } + }); + } catch (MetaException e) { + throw e; + } catch (InvalidObjectException e) { + throw e; + } catch (AlreadyExistsException e) { + throw e; + } catch (Exception e) { + assert(e instanceof RuntimeException); + throw (RuntimeException)e; + } + return ret; } - public int add_partitions(List parts) throws MetaException, - InvalidObjectException, AlreadyExistsException { - incrementCounter("add_partition"); - if (parts.size() == 0) { - return 0; - } + private int add_partitions_core(final RawStore ms, final List parts) + throws MetaException, InvalidObjectException, AlreadyExistsException { String db = parts.get(0).getDbName(); String tbl = parts.get(0).getTableName(); logStartFunction("add_partitions", db, tbl); boolean success = false; try { - getMS().openTransaction(); + ms.openTransaction(); for (Partition part : parts) { add_partition(part); } success = true; - getMS().commitTransaction(); + ms.commitTransaction(); } finally { if (!success) { - getMS().rollbackTransaction(); + ms.rollbackTransaction(); } } return parts.size(); } - public Partition add_partition(Partition part) + public int add_partitions(final List parts) throws MetaException, + InvalidObjectException, AlreadyExistsException { + incrementCounter("add_partition"); + if (parts.size() == 0) { + return 0; + } + + Integer ret = null; + try { + ret = executeWithRetry(new Command() { + @Override + Integer run(RawStore ms) throws Exception { + int ret = add_partitions_core(ms, parts); + return Integer.valueOf(ret); + } + }); + } catch (InvalidObjectException e) { + throw e; + } catch (AlreadyExistsException e) { + throw e; + } catch (MetaException e) { + throw e; + } catch (Exception e) { + assert(e instanceof RuntimeException); + throw (RuntimeException)e; + } + return ret; + } + + private Partition add_partition_core(final RawStore ms, final Partition part) throws InvalidObjectException, AlreadyExistsException, MetaException { - incrementCounter("add_partition"); - logStartFunction("add_partition", part.getDbName(), part.getTableName()); boolean success = false, madeDir = false; Path partLocation = null; try { - getMS().openTransaction(); + ms.openTransaction(); Partition old_part = get_partition(part.getDbName(), part .getTableName(), part.getValues()); if (old_part != null) { throw new AlreadyExistsException("Partition already exists:" + part); } - Table tbl = getMS().getTable(part.getDbName(), part.getTableName()); + Table tbl = ms.getTable(part.getDbName(), part.getTableName()); if (tbl == null) { throw new InvalidObjectException( "Unable to add partition because table or database do not exist"); @@ -586,11 +1023,11 @@ part.setCreateTime((int) time); part.putToParameters(Constants.DDL_TIME, Long.toString(time)); - success = getMS().addPartition(part) && getMS().commitTransaction(); + success = ms.addPartition(part) && ms.commitTransaction(); } finally { if (!success) { - getMS().rollbackTransaction(); + ms.rollbackTransaction(); if (madeDir) { wh.deleteDir(partLocation, true); } @@ -599,13 +1036,42 @@ return part; } - private boolean drop_partition_common(String db_name, String tbl_name, - List part_vals, boolean deleteData) throws MetaException, NoSuchObjectException { + public Partition add_partition(final Partition part) + throws InvalidObjectException, AlreadyExistsException, MetaException { + incrementCounter("add_partition"); + logStartFunction("add_partition", part.getDbName(), part.getTableName()); + + Partition ret = null; + try { + ret = executeWithRetry(new Command() { + @Override + Partition run(RawStore ms) throws Exception { + return add_partition_core(ms, part); + } + }); + } catch (InvalidObjectException e) { + throw e; + } catch (AlreadyExistsException e) { + throw e; + } catch (MetaException e) { + throw e; + } catch (Exception e) { + assert(e instanceof RuntimeException); + throw (RuntimeException)e; + } + return ret; + + } + + private boolean drop_partition_common(RawStore ms, String db_name, String tbl_name, + List part_vals, final boolean deleteData) + throws MetaException, NoSuchObjectException { + boolean success = false; Path partPath = null; Table tbl = null; try { - getMS().openTransaction(); + ms.openTransaction(); Partition part = get_partition(db_name, tbl_name, part_vals); if (part == null) { throw new NoSuchObjectException("Partition doesn't exist. " @@ -614,15 +1080,15 @@ if (part.getSd() == null || part.getSd().getLocation() == null) { throw new MetaException("Partition metadata is corrupted"); } - if (!getMS().dropPartition(db_name, tbl_name, part_vals)) { + if (!ms.dropPartition(db_name, tbl_name, part_vals)) { throw new MetaException("Unable to drop partition"); } - success = getMS().commitTransaction(); + success = ms.commitTransaction(); partPath = new Path(part.getSd().getLocation()); tbl = get_table(db_name, tbl_name); } finally { if (!success) { - getMS().rollbackTransaction(); + ms.rollbackTransaction(); } else if (deleteData && (partPath != null)) { if (tbl != null && !isExternal(tbl)) { wh.deleteDir(partPath, true); @@ -632,53 +1098,145 @@ } return true; } - public boolean drop_partition(String db_name, String tbl_name, - List part_vals, boolean deleteData) + public boolean drop_partition(final String db_name, final String tbl_name, + final List part_vals, final boolean deleteData) throws NoSuchObjectException, MetaException, TException { incrementCounter("drop_partition"); logStartFunction("drop_partition", db_name, tbl_name); LOG.info("Partition values:" + part_vals); - return drop_partition_common(db_name, tbl_name, part_vals, deleteData); + Boolean ret = null; + try { + ret = executeWithRetry(new Command() { + @Override + Boolean run(RawStore ms) throws Exception { + return Boolean.valueOf( + drop_partition_common(ms, db_name, tbl_name, part_vals, deleteData)); + } + }); + } catch (MetaException e) { + throw e; + } catch (NoSuchObjectException e) { + throw e; + } catch (TException e) { + throw e; + } catch (Exception e) { + assert(e instanceof RuntimeException); + throw (RuntimeException)e; + } + return ret.booleanValue(); + } - public Partition get_partition(String db_name, String tbl_name, - List part_vals) throws MetaException { + public Partition get_partition(final String db_name, final String tbl_name, + final List part_vals) throws MetaException { incrementCounter("get_partition"); logStartFunction("get_partition", db_name, tbl_name); - return getMS().getPartition(db_name, tbl_name, part_vals); + + Partition ret = null; + try { + ret = executeWithRetry(new Command() { + @Override + Partition run(RawStore ms) throws Exception { + return ms.getPartition(db_name, tbl_name, part_vals); + } + }); + } catch (MetaException e) { + throw e; + } catch (Exception e) { + assert(e instanceof RuntimeException); + throw (RuntimeException)e; + } + return ret; } - public List get_partitions(String db_name, String tbl_name, - short max_parts) throws NoSuchObjectException, MetaException { + public List get_partitions(final String db_name, final String tbl_name, + final short max_parts) throws NoSuchObjectException, MetaException { incrementCounter("get_partitions"); logStartFunction("get_partitions", db_name, tbl_name); - return getMS().getPartitions(db_name, tbl_name, max_parts); + + List ret = null; + try { + ret = executeWithRetry(new Command>() { + @Override + List run(RawStore ms) throws Exception { + return ms.getPartitions(db_name, tbl_name, max_parts); + } + }); + } catch (MetaException e) { + throw e; + } catch (NoSuchObjectException e) { + throw e; + } catch (Exception e) { + assert(e instanceof RuntimeException); + throw (RuntimeException)e; + } + return ret; + } - public List get_partition_names(String db_name, String tbl_name, - short max_parts) throws MetaException { + public List get_partition_names(final String db_name, final String tbl_name, + final short max_parts) throws MetaException { incrementCounter("get_partition_names"); logStartFunction("get_partition_names", db_name, tbl_name); - return getMS().listPartitionNames(db_name, tbl_name, max_parts); + + List ret = null; + try { + ret = executeWithRetry(new Command>() { + @Override + List run(RawStore ms) throws Exception { + return ms.listPartitionNames(db_name, tbl_name, max_parts); + } + }); + } catch (MetaException e) { + throw e; + } catch (Exception e) { + assert(e instanceof RuntimeException); + throw (RuntimeException)e; + } + return ret; } - public void alter_partition(String db_name, String tbl_name, - Partition new_part) throws InvalidOperationException, MetaException, - TException { - incrementCounter("alter_partition"); - logStartFunction("alter_partition", db_name, tbl_name); - LOG.info("Partition values:" + new_part.getValues()); + private void alter_partition_core(final RawStore ms, final String db_name, + final String tbl_name, final Partition new_part) + throws InvalidOperationException, MetaException, TException { try { new_part.putToParameters(Constants.DDL_TIME, Long.toString(System .currentTimeMillis() / 1000)); - getMS().alterPartition(db_name, tbl_name, new_part); + ms.alterPartition(db_name, tbl_name, new_part); } catch (InvalidObjectException e) { - LOG.error(StringUtils.stringifyException(e)); throw new InvalidOperationException("alter is not possible"); } } + public void alter_partition(final String db_name, final String tbl_name, + final Partition new_part) throws InvalidOperationException, MetaException, + TException { + incrementCounter("alter_partition"); + logStartFunction("alter_partition", db_name, tbl_name); + LOG.info("Partition values:" + new_part.getValues()); + + try { + executeWithRetry(new Command() { + @Override + Boolean run(RawStore ms) throws Exception { + alter_partition_core(ms, db_name, tbl_name, new_part); + return Boolean.TRUE; + } + }); + } catch (InvalidOperationException e) { + throw e; + } catch (MetaException e) { + throw e; + } catch (TException e) { + throw e; + } catch (Exception e) { + assert(e instanceof RuntimeException); + throw (RuntimeException)e; + } + return; + } + public boolean create_index(Index index_def) throws IndexAlreadyExistsException, MetaException { incrementCounter("create_index"); @@ -692,21 +1250,54 @@ return "3.0"; } - public void alter_table(String dbname, String name, Table newTable) + public void alter_table(final String dbname, final String name, final Table newTable) throws InvalidOperationException, MetaException { incrementCounter("alter_table"); - logStartFunction("truncate_table: db=" + dbname + " tbl=" + name + logStartFunction("alter_table: db=" + dbname + " tbl=" + name + " newtbl=" + newTable.getTableName()); newTable.putToParameters(Constants.DDL_TIME, Long.toString(System .currentTimeMillis() / 1000)); - alterHandler.alterTable(getMS(), wh, dbname, name, newTable); + + try { + executeWithRetry(new Command() { + @Override + Boolean run(RawStore ms) throws Exception { + alterHandler.alterTable(ms, wh, dbname, name, newTable); + return Boolean.TRUE; + } + }); + } catch (MetaException e) { + throw e; + } catch (InvalidOperationException e) { + throw e; + } catch (Exception e) { + assert(e instanceof RuntimeException); + throw (RuntimeException)e; + } + } - public List get_tables(String dbname, String pattern) + public List get_tables(final String dbname, final String pattern) throws MetaException { incrementCounter("get_tables"); logStartFunction("get_tables: db=" + dbname + " pat=" + pattern); - return getMS().getTables(dbname, pattern); + + List ret; + try { + ret = executeWithRetry(new Command>() { + @Override + List run(RawStore ms) throws Exception { + return ms.getTables(dbname, pattern); + } + }); + } catch (MetaException e) { + throw e; + } catch (Exception e) { + assert(e instanceof RuntimeException); + throw (RuntimeException)e; + } + return ret; + } public List get_fields(String db, String tableName) @@ -813,14 +1404,14 @@ return toReturn; } - private List getPartValsFromName(String dbName, String tblName, + private List getPartValsFromName(RawStore ms, String dbName, String tblName, String partName) throws MetaException, InvalidObjectException { // Unescape the partition name LinkedHashMap hm = Warehouse.makeSpecFromName(partName); // getPartition expects partition values in a list. use info from the // table to put the partition column values in order - Table t = getMS().getTable(dbName, tblName); + Table t = ms.getTable(dbName, tblName); if (t == null) { throw new InvalidObjectException(dbName + "." + tblName + " table not found"); @@ -830,7 +1421,7 @@ for(FieldSchema field : t.getPartitionKeys()) { String key = field.getName(); String val = hm.get(key); - if(val == null) { + if (val == null) { throw new InvalidObjectException("incomplete partition name - missing " + key); } partVals.add(val); @@ -838,54 +1429,128 @@ return partVals; } - public Partition get_partition_by_name(String db_name, String tbl_name, - String part_name) throws MetaException, NoSuchObjectException, TException { - incrementCounter("get_partition_by_name"); - logStartFunction("get_partition_by_name: db=" + db_name + " tbl=" - + tbl_name + " part=" + part_name); - + private Partition get_partition_by_name_core(final RawStore ms, final String db_name, + final String tbl_name, final String part_name) + throws MetaException, NoSuchObjectException, TException { List partVals = null; try { - partVals = getPartValsFromName(db_name, tbl_name, part_name); + partVals = getPartValsFromName(ms, db_name, tbl_name, part_name); } catch (InvalidObjectException e) { throw new NoSuchObjectException(e.getMessage()); } - Partition p = getMS().getPartition(db_name, tbl_name, partVals); + Partition p = ms.getPartition(db_name, tbl_name, partVals); - if(p == null) { + if (p == null) { throw new NoSuchObjectException(db_name + "." + tbl_name + " partition (" + part_name + ") not found"); } return p; } - public Partition append_partition_by_name(String db_name, String tbl_name, - String part_name) throws InvalidObjectException, + public Partition get_partition_by_name(final String db_name,final String tbl_name, + final String part_name) throws MetaException, NoSuchObjectException, TException { + + incrementCounter("get_partition_by_name"); + logStartFunction("get_partition_by_name: db=" + db_name + " tbl=" + + tbl_name + " part=" + part_name); + + Partition ret = null; + + try { + ret = executeWithRetry(new Command() { + @Override + Partition run(RawStore ms) throws Exception { + return get_partition_by_name_core(ms, db_name, tbl_name, part_name); + } + }); + } catch (MetaException e) { + throw e; + } catch (NoSuchObjectException e) { + throw e; + } catch (TException e) { + throw e; + } catch (Exception e) { + assert(e instanceof RuntimeException); + throw (RuntimeException)e; + } + return ret; + } + + public Partition append_partition_by_name(final String db_name, final String tbl_name, + final String part_name) throws InvalidObjectException, AlreadyExistsException, MetaException, TException { incrementCounter("append_partition_by_name"); logStartFunction("append_partition_by_name: db=" + db_name + " tbl=" + tbl_name + " part=" + part_name); - List partVals = getPartValsFromName(db_name, tbl_name, part_name); - return append_partition_common(db_name, tbl_name, partVals); + Partition ret = null; + try { + ret = executeWithRetry(new Command() { + @Override + Partition run(RawStore ms) throws Exception { + List partVals = getPartValsFromName(ms, db_name, tbl_name, part_name); + return append_partition_common(ms, db_name, tbl_name, partVals); + } + }); + } catch (InvalidObjectException e) { + throw e; + } catch (AlreadyExistsException e) { + throw e; + } catch (MetaException e) { + throw e; + } catch (TException e) { + throw e; + } catch (Exception e) { + assert(e instanceof RuntimeException); + throw (RuntimeException)e; + } + return ret; } + private boolean drop_partition_by_name_core(final RawStore ms, + final String db_name, final String tbl_name, final String part_name, + final boolean deleteData) throws NoSuchObjectException, + MetaException, TException { + + List partVals = null; + try { + partVals = getPartValsFromName(ms, db_name, tbl_name, part_name); + } catch (InvalidObjectException e) { + throw new NoSuchObjectException(e.getMessage()); + } + + return drop_partition_common(ms, db_name, tbl_name, partVals, deleteData); + } + @Override - public boolean drop_partition_by_name(String db_name, String tbl_name, - String part_name, boolean deleteData) throws NoSuchObjectException, + public boolean drop_partition_by_name(final String db_name, final String tbl_name, + final String part_name, final boolean deleteData) throws NoSuchObjectException, MetaException, TException { incrementCounter("drop_partition_by_name"); logStartFunction("drop_partition_by_name: db=" + db_name + " tbl=" + tbl_name + " part=" + part_name); - List partVals = null; + Boolean ret = null; try { - partVals = getPartValsFromName(db_name, tbl_name, part_name); - } catch (InvalidObjectException e) { - throw new NoSuchObjectException(e.getMessage()); + ret = executeWithRetry(new Command() { + @Override + Boolean run(RawStore ms) throws Exception { + return drop_partition_by_name_core(ms, db_name, tbl_name, + part_name, deleteData); + } + }); + } catch (NoSuchObjectException e) { + throw e; + } catch (MetaException e) { + throw e; + } catch (TException e) { + throw e; + } catch (Exception e) { + assert(e instanceof RuntimeException); + throw (RuntimeException)e; } - return drop_partition_common(db_name, tbl_name, partVals, deleteData); + return ret.booleanValue(); } @Override Index: metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (revision 5533) +++ metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (working copy) @@ -25,6 +25,8 @@ import java.util.Map; import java.util.Properties; import java.util.Map.Entry; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import javax.jdo.JDOHelper; import javax.jdo.JDOObjectNotFoundException; @@ -70,6 +72,8 @@ public class ObjectStore implements RawStore, Configurable { private static Properties prop = null; private static PersistenceManagerFactory pmf = null; + + private static Lock pmfPropLock = new ReentrantLock(); private static final Log LOG = LogFactory.getLog(ObjectStore.class.getName()); private static enum TXN_STATUS { @@ -90,20 +94,48 @@ return hiveConf; } + /** + * Called whenever this object is instantiated using ReflectionUils, and also + * on connection retries. In cases of connection retries, conf will usually + * contain modified values. + */ @SuppressWarnings("nls") public void setConf(Configuration conf) { - hiveConf = conf; - if (isInitialized) { - return; - } else { - initialize(); + // Although an instance of ObjectStore is accessed by one thread, there may + // be many threads with ObjectStore instances. So the static variables + // pmf and prop need to be protected with locks. + pmfPropLock.lock(); + try { + isInitialized = false; + hiveConf = conf; + Properties propsFromConf = getDataSourceProps(conf); + boolean propsChanged = !propsFromConf.equals(prop); + + if (propsChanged) { + pmf = null; + prop = null; + } + + assert(!isActiveTransaction()); + shutdown(); + // Always want to re-create pm as we don't know if it were created by the + // most recent instance of the pmf + pm = null; + openTrasactionCalls = 0; + currentTransaction = null; + transactionStatus = TXN_STATUS.NO_STATE; + + initialize(propsFromConf); + + if (!isInitialized) { + throw new RuntimeException( + "Unable to create persistence manager. Check dss.log for details"); + } else { + LOG.info("Initialized ObjectStore"); + } + } finally { + pmfPropLock.unlock(); } - if (!isInitialized) { - throw new RuntimeException( - "Unable to create persistence manager. Check dss.log for details"); - } else { - LOG.info("Initialized ObjectStore"); - } } private ClassLoader classLoader; @@ -115,13 +147,11 @@ } @SuppressWarnings("nls") - private void initialize() { + private void initialize(Properties dsProps) { LOG.info("ObjectStore, initialize called"); - initDataSourceProps(); + prop = dsProps; pm = getPersistenceManager(); - if (pm != null) { - isInitialized = true; - } + isInitialized = pm != null; return; } @@ -130,13 +160,10 @@ * in jpox.properties. */ @SuppressWarnings("nls") - private void initDataSourceProps() { - if (prop != null) { - return; - } - prop = new Properties(); + private static Properties getDataSourceProps(Configuration conf) { + Properties prop = new Properties(); - Iterator> iter = hiveConf.iterator(); + Iterator> iter = conf.iterator(); while (iter.hasNext()) { Map.Entry e = iter.next(); if (e.getKey().contains("datanucleus") || e.getKey().contains("jdo")) { @@ -156,6 +183,7 @@ } } } + return prop; } private static PersistenceManagerFactory getPMF() { @@ -189,7 +217,7 @@ /** * Opens a new one or the one already created Every call of this function must * have corresponding commit or rollback function call - * + * * @return an active transaction */ @@ -210,7 +238,7 @@ /** * if this is the commit of the first open call then an actual commit is * called. - * + * * @return Always returns true */ @SuppressWarnings("nls") Index: metastore/src/java/org/apache/hadoop/hive/metastore/hooks/JDOConnectionURLHook.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/hooks/JDOConnectionURLHook.java (revision 0) +++ metastore/src/java/org/apache/hadoop/hive/metastore/hooks/JDOConnectionURLHook.java (revision 0) @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.hooks; + +import org.apache.hadoop.conf.Configuration; + +/** + * JDOConnectURLHook is used to get the URL that JDO uses to connect to the + * database that stores the metastore data. Classes implementing this must be + * thread-safe (for Thrift server). + */ +public interface JDOConnectionURLHook { + + /** + * Gets the connection URL to supply to JDO. In addition to initialization, + * this method will be called after a connection failure for each reconnect + * attempt. + * + * @param conf The configuration used to initialize this instance of the HMS + * @return + * @throws Exception + */ + public String getJdoConnectionUrl(Configuration conf) + throws Exception; + + /** + * Alerts this that the connection URL was bad. Can be used to collect stats, + * etc. + * + * @param url + */ + public void notifyBadConnectionUrl(String url); +} Index: metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java (revision 5533) +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java (working copy) @@ -56,7 +56,7 @@ private boolean open = false; private URI metastoreUris[]; private final boolean standAloneClient = false; - private HiveMetaHookLoader hookLoader; + private final HiveMetaHookLoader hookLoader; // for thrift connects private int retries = 5; @@ -86,7 +86,7 @@ } // get the number retries - retries = conf.getInt("hive.metastore.connect.retries", 5); + retries = HiveConf.getIntVar(conf, HiveConf.ConfVars.METATORETHRIFTRETRIES); // user wants file store based configuration if (conf.getVar(HiveConf.ConfVars.METASTOREURIS) != null) { @@ -615,7 +615,16 @@ return client.get_partition_by_name(db, tableName, partName); } + public Partition appendPartitionByName(String dbName, String tableName, String partName) + throws InvalidObjectException, AlreadyExistsException, MetaException, TException { + return client.append_partition_by_name(dbName, tableName, partName); + } + public boolean dropPartitionByName(String dbName, String tableName, String partName, boolean deleteData) + throws NoSuchObjectException, MetaException, TException { + return client.drop_partition_by_name(dbName, tableName, partName, deleteData); + } + private HiveMetaHook getHook(Table tbl) throws MetaException { if (hookLoader == null) { return null; Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 5533) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy) @@ -51,7 +51,14 @@ public static final HiveConf.ConfVars[] metaVars = { HiveConf.ConfVars.METASTOREDIRECTORY, HiveConf.ConfVars.METASTOREWAREHOUSE, - HiveConf.ConfVars.METASTOREURIS + HiveConf.ConfVars.METASTOREURIS, + HiveConf.ConfVars.METATORETHRIFTRETRIES, + HiveConf.ConfVars.METASTOREPWD, + HiveConf.ConfVars.METASTORECONNECTURLHOOK, + HiveConf.ConfVars.METASTORECONNECTURLKEY, + HiveConf.ConfVars.METASTOREATTEMPTS, + HiveConf.ConfVars.METASTOREINTERVAL, + HiveConf.ConfVars.METASTOREFORCERELOADCONF, }; /** @@ -89,11 +96,27 @@ HADOOPJOBNAME("mapred.job.name", null), HADOOPSPECULATIVEEXECREDUCERS("mapred.reduce.tasks.speculative.execution", false), - // MetaStore stuff. + // Metastore stuff. Be sure to update HiveConf.metaVars when you add + // something here! METASTOREDIRECTORY("hive.metastore.metadb.dir", ""), METASTOREWAREHOUSE("hive.metastore.warehouse.dir", ""), METASTOREURIS("hive.metastore.uris", ""), + // Number of times to retry a connection to a Thrift metastore server + METATORETHRIFTRETRIES("hive.metastore.connect.retries", ""), METASTOREPWD("javax.jdo.option.ConnectionPassword", ""), + // Class name of JDO connection url hook + METASTORECONNECTURLHOOK("hive.metastore.ds.connection.url.hook", ""), + // Name of the connection url in the configuration + METASTORECONNECTURLKEY("javax.jdo.option.ConnectionURL", ""), + // Number of attempts to retry connecting after there is a JDO datastore err + METASTOREATTEMPTS("hive.metastore.ds.retry.attempts", 1), + // Number of miliseconds to wait between attepting + METASTOREINTERVAL("hive.metastore.ds.retry.interval", 1000), + // Whether to force reloading of the metastore configuration (including + // the connection URL, before the next metastore query that accesses the + // datastore. Once reloaded, the this value is reset to false. Used for + // testing only. + METASTOREFORCERELOADCONF("hive.metastore.force.reload.conf", false), // CLI CLIIGNOREERRORS("hive.cli.errors.ignore", false), @@ -261,6 +284,7 @@ this.defaultBoolVal = defaultBoolVal; } + @Override public String toString() { return varname; } Index: contrib/src/test/results/clientpositive/url_hook.q.out =================================================================== --- contrib/src/test/results/clientpositive/url_hook.q.out (revision 0) +++ contrib/src/test/results/clientpositive/url_hook.q.out (revision 0) @@ -0,0 +1,19 @@ +PREHOOK: query: SHOW TABLES 'src' +PREHOOK: type: SHOWTABLES +POSTHOOK: query: SHOW TABLES 'src' +POSTHOOK: type: SHOWTABLES +src +PREHOOK: query: SHOW TABLES 'src' +PREHOOK: type: SHOWTABLES +POSTHOOK: query: SHOW TABLES 'src' +POSTHOOK: type: SHOWTABLES +PREHOOK: query: SHOW TABLES 'src' +PREHOOK: type: SHOWTABLES +POSTHOOK: query: SHOW TABLES 'src' +POSTHOOK: type: SHOWTABLES +src +PREHOOK: query: SHOW TABLES 'src' +PREHOOK: type: SHOWTABLES +POSTHOOK: query: SHOW TABLES 'src' +POSTHOOK: type: SHOWTABLES +src Index: contrib/src/test/queries/clientpositive/url_hook.q =================================================================== --- contrib/src/test/queries/clientpositive/url_hook.q (revision 0) +++ contrib/src/test/queries/clientpositive/url_hook.q (revision 0) @@ -0,0 +1,9 @@ +add jar ../build/contrib/hive_contrib.jar; +set hive.metastore.force.reload.conf=true; +SHOW TABLES 'src'; +set hive.metastore.ds.connection.url.hook=org.apache.hadoop.hive.contrib.metastore.hooks.TestURLHook; +SHOW TABLES 'src'; +SHOW TABLES 'src'; +set hive.metastore.force.reload.conf=false; +set hive.metastore.ds.connection.url.hook=; +SHOW TABLES 'src'; Index: contrib/src/java/org/apache/hadoop/hive/contrib/metastore/hooks/TestURLHook.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/metastore/hooks/TestURLHook.java (revision 0) +++ contrib/src/java/org/apache/hadoop/hive/contrib/metastore/hooks/TestURLHook.java (revision 0) @@ -0,0 +1,30 @@ +package org.apache.hadoop.hive.contrib.metastore.hooks; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.hooks.JDOConnectionURLHook; + +/** + * First returns a url for a blank DB, then returns a URL for the original DB. + * For testing the feature in url_hook.q + */ +public class TestURLHook implements JDOConnectionURLHook { + + static String originalUrl = null; + @Override + public String getJdoConnectionUrl(Configuration conf) throws Exception { + if (originalUrl == null) { + originalUrl = conf.get(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, ""); + return "jdbc:derby:;databaseName=../build/test/junit_metastore_db_blank;create=true"; + } else { + return originalUrl; + } + + } + + @Override + public void notifyBadConnectionUrl(String url) { + + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (revision 5533) +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (working copy) @@ -26,7 +26,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -50,7 +49,6 @@ import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.thrift.TException; @@ -99,8 +97,9 @@ Hive db = hiveDB.get(); if (db != null) { for (HiveConf.ConfVars oneVar : HiveConf.metaVars) { - String oldVar = db.getConf().getVar(oneVar); - String newVar = c.getVar(oneVar); + // Since metaVars are all of different types, use string for comparison + String oldVar = db.getConf().get(oneVar.varname, ""); + String newVar = c.get(oneVar.varname, ""); if (oldVar.compareToIgnoreCase(newVar) != 0) { needsRefresh = true; break;