Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml (revision 5401)
+++ conf/hive-default.xml (working copy)
@@ -485,4 +485,22 @@
Whether sorting is enforced. If true, while inserting into the table, sorting is enforced.
+
+ hive.metastore.ds.connection.url.hook
+
+ Name of the hook to use for retrieving the JDO connection URL. If empty, the value in javax.jdo.option.ConnectionURL is used
+
+
+
+ hive.metastore.ds.retry.attempts
+ 1
+ The number of times to retry a metastore call if there was a connection error
+
+
+
+ hive.metastore.ds.retry.interval
+ 1000
+ The number of milliseconds between metastore retry attempts
+
+
Index: metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (revision 5401)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (working copy)
@@ -28,6 +28,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.ConfigValSecurityException;
@@ -46,6 +47,7 @@
import org.apache.hadoop.hive.metastore.api.Type;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hadoop.hive.metastore.hooks.JDOConnectionURLHook;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
@@ -76,14 +78,26 @@
private String rawStoreClassName;
private final HiveConf hiveConf; // stores datastore (jpox) properties,
// right now they come from jpox.properties
+
private Warehouse wh; // hdfs warehouse
- private final ThreadLocal threadLocalMS = new ThreadLocal() {
+ private final ThreadLocal threadLocalMS =
+ new ThreadLocal() {
@Override
- protected synchronized Object initialValue() {
+ protected synchronized RawStore initialValue() {
return null;
}
};
+ // Thread local configuration is needed as many threads could make changes
+ // to the conf using the connection hook
+ private final ThreadLocal threadLocalConf =
+ new ThreadLocal() {
+ @Override
+ protected synchronized Configuration initialValue() {
+ return null;
+ }
+ };
+
// The next serial number to be assigned
private boolean checkForDefaultDb;
private static int nextSerialNum = 0;
@@ -94,6 +108,12 @@
}
};
+ // Used for retrying JDO calls on datastore failures
+ private int retryInterval = 0;
+ private int retryLimit = 0;
+ private JDOConnectionURLHook urlHook = null;
+ private String urlHookClassName = "";
+
public static Integer get() {
return threadLocalId.get();
}
@@ -128,48 +148,241 @@
alterHandler = (AlterHandler) ReflectionUtils.newInstance(getClass(
alterHandlerName, AlterHandler.class), hiveConf);
wh = new Warehouse(hiveConf);
+
+ retryInterval = HiveConf.getIntVar(hiveConf,
+ HiveConf.ConfVars.METASTOREINTERVAL);
+ retryLimit = HiveConf.getIntVar(hiveConf,
+ HiveConf.ConfVars.METASTOREATTEMPTS);
+ // Using the hook on startup ensures that the hook always has priority
+ // over settings in *.xml. We can use hiveConf as only a single thread
+ // will be calling the constructor.
+ updateConnectionURL(hiveConf, null);
+
createDefaultDB();
return true;
}
+ private String addPrefix(String s) {
+ return threadLocalId.get() + ": " + s;
+ }
+
/**
+ * A Command is a closure used to pass a block of code from individual
+ * functions to executeWithRetry, which centralizes connection error
+ * handling. Command is parameterized on the return type of the function.
+ *
+ * The general transformation is:
+ *
+ * From:
+ * String foo(int a) throws ExceptionB {
+ *
+ * }
+ *
+ * To:
+ * String foo(final int a) throws ExceptionB {
+ * String ret = null;
+ * try {
+ * ret = executeWithRetry(new Command() {
+ * String run(RawStore ms) {
+ *
+ * }
+ * }
+ * } catch (ExceptionB e) {
+ * throw e;
+ * } catch (Exception e) {
+ * // Since run is only supposed to throw ExceptionB it could only
+ * // be a runtime exception
+ * throw (RuntimeException)e;
+ * }
+ * }
+ *
+ * The catch blocks are used to ensure that the exceptions thrown by the
+ * Command follow the function definition.
+ */
+ private static class Command {
+ T run(RawStore ms) throws Exception {
+ return null;
+ }
+ }
+
+ private T executeWithRetry(Command cmd) throws Exception {
+ T ret = null;
+
+ boolean gotNewConnectUrl = false;
+ boolean reloadConf = HiveConf.getBoolVar(hiveConf,
+ HiveConf.ConfVars.METASTOREFORCERELOADCONF);
+
+ if (reloadConf) {
+ updateConnectionURL(getConf(), null);
+ }
+
+ int retryCount = 0;
+ Exception caughtException = null;
+ while(true) {
+ try {
+ RawStore ms = getMS(reloadConf || gotNewConnectUrl);
+ ret = cmd.run(ms);
+ break;
+ } catch (javax.jdo.JDOFatalDataStoreException e) {
+ caughtException = e;
+ } catch (javax.jdo.JDODataStoreException e) {
+ caughtException = e;
+ }
+
+ if (retryCount >= retryLimit) {
+ throw caughtException;
+ }
+
+ assert(retryInterval >= 0);
+ retryCount++;
+ LOG.error(
+ String.format(
+ "JDO datastore error. Retrying metastore command " +
+ "after %d ms (attempt %d of %d)", retryInterval, retryCount, retryLimit));
+ Thread.sleep(retryInterval);
+ // If we have a connection error, the JDO connection URL hook might
+ // provide us with a new URL to access the datastore.
+ String lastUrl = getConnectionURL(getConf());
+ gotNewConnectUrl = updateConnectionURL(getConf(), lastUrl);
+ }
+ return ret;
+ }
+
+ private Configuration getConf() {
+ Configuration conf = threadLocalConf.get();
+ if (conf == null) {
+ conf = new Configuration(hiveConf);
+ threadLocalConf.set(conf);
+ }
+ return conf;
+ }
+
+ /**
+ * Get a cached RawStore.
+ *
* @return
* @throws MetaException
*/
- private RawStore getMS() throws MetaException {
+ private RawStore getMS(boolean reloadConf) throws MetaException {
RawStore ms = threadLocalMS.get();
if (ms == null) {
- LOG.info(threadLocalId.get()
- + ": Opening raw store with implemenation class:"
- + rawStoreClassName);
+ LOG.info(addPrefix("Opening raw store with implemenation class:"
+ + rawStoreClassName));
ms = (RawStore) ReflectionUtils.newInstance(getClass(rawStoreClassName,
- RawStore.class), hiveConf);
+ RawStore.class), getConf());
threadLocalMS.set(ms);
ms = threadLocalMS.get();
}
+
+ if (reloadConf) {
+ ms.setConf(getConf());
+ }
+
return ms;
}
/**
- * create default database if it doesn't exist
- *
- * @throws MetaException
+ * Updates the connection URL in hiveConf using the hook
+ * @return true if a new connection URL was loaded into the thread local
+ * configuration
*/
- private void createDefaultDB() throws MetaException {
- if (HMSHandler.createDefaultDB || !checkForDefaultDb) {
+ private boolean updateConnectionURL(Configuration conf, String badUrl)
+ throws MetaException {
+ String connectUrl = null;
+ String currentUrl = getConnectionURL(conf);
+ try {
+ // We always call init because the hook name in the configuration could
+ // have changed.
+ initConnectionUrlHook();
+ if (urlHook != null) {
+ if (badUrl != null) {
+ urlHook.notifyBadConnectionUrl(badUrl);
+ }
+ connectUrl = urlHook.getJdoConnectionUrl(hiveConf);
+ }
+ } catch (Exception e) {
+ LOG.error("Exception while getting connection URL from the hook: " +
+ e);
+ }
+
+ if (connectUrl != null && !connectUrl.equals(currentUrl)) {
+ LOG.error(addPrefix(
+ String.format("Overriding %s with %s",
+ HiveConf.ConfVars.METASTORECONNECTURLKEY.toString(),
+ connectUrl)));
+ conf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.toString(),
+ connectUrl);
+ return true;
+ }
+ return false;
+
+ }
+
+ private static String getConnectionURL(Configuration conf) {
+ return conf.get(
+ HiveConf.ConfVars.METASTORECONNECTURLKEY.toString(),"");
+ }
+
+ // Multiple threads could try to initialize at the same time.
+ synchronized private void initConnectionUrlHook()
+ throws ClassNotFoundException {
+
+ String className =
+ hiveConf.get(HiveConf.ConfVars.METASTORECONNECTURLHOOK.toString(), "").trim();
+ if (className.equals("")){
+ urlHookClassName = "";
+ urlHook = null;
return;
}
+ boolean urlHookChanged = !urlHookClassName.equals(className);
+ if (urlHook == null || urlHookChanged) {
+ urlHookClassName = className.trim();
+
+ Class > urlHookClass = Class.forName(urlHookClassName, true,
+ JavaUtils.getClassLoader());
+ urlHook = (JDOConnectionURLHook) ReflectionUtils.newInstance(urlHookClass, null);
+ }
+ return;
+ }
+
+ private void createDefaultDB_core(RawStore ms) throws MetaException {
try {
- getMS().getDatabase(MetaStoreUtils.DEFAULT_DATABASE_NAME);
+ ms.getDatabase(MetaStoreUtils.DEFAULT_DATABASE_NAME);
} catch (NoSuchObjectException e) {
- getMS().createDatabase(
+ ms.createDatabase(
new Database(MetaStoreUtils.DEFAULT_DATABASE_NAME, wh
.getDefaultDatabasePath(MetaStoreUtils.DEFAULT_DATABASE_NAME)
.toString()));
}
HMSHandler.createDefaultDB = true;
}
+ /**
+ * create default database if it doesn't exist
+ *
+ * @throws MetaException
+ */
+ private void createDefaultDB() throws MetaException {
+ if (HMSHandler.createDefaultDB || !checkForDefaultDb) {
+ return;
+ }
+ try {
+ executeWithRetry(new Command() {
+ @Override
+ Boolean run(RawStore ms) throws Exception {
+ createDefaultDB_core(ms);
+ return Boolean.TRUE;
+ }
+ });
+ } catch (MetaException e) {
+ throw e;
+ } catch (Exception e) {
+ assert(e instanceof RuntimeException);
+ throw (RuntimeException)e;
+ }
+
+ }
+
private Class> getClass(String rawStoreClassName, Class> class1)
throws MetaException {
try {
@@ -196,58 +409,91 @@
@Override
public void shutdown() {
logStartFunction("Shutting down the object store...");
- try {
- if (threadLocalMS.get() != null) {
- getMS().shutdown();
- }
- } catch (MetaException e) {
- LOG.error("unable to shutdown metastore", e);
+ RawStore ms = threadLocalMS.get();
+ if (ms != null) {
+ ms.shutdown();
}
System.exit(0);
}
- public boolean create_database(String name, String location_uri)
- throws AlreadyExistsException, MetaException {
- incrementCounter("create_database");
- logStartFunction("create_database: " + name);
+ private boolean create_database_core(RawStore ms, final String name,
+ final String location_uri) throws AlreadyExistsException, MetaException {
boolean success = false;
try {
- getMS().openTransaction();
+ ms.openTransaction();
Database db = new Database(name, location_uri);
- if (getMS().createDatabase(db)
+ if (ms.createDatabase(db)
&& wh.mkdirs(wh.getDefaultDatabasePath(name))) {
- success = getMS().commitTransaction();
+ success = ms.commitTransaction();
}
} finally {
if (!success) {
- getMS().rollbackTransaction();
+ ms.rollbackTransaction();
}
}
return success;
}
- public Database get_database(String name) throws NoSuchObjectException,
+ public boolean create_database(final String name, final String location_uri)
+ throws AlreadyExistsException, MetaException {
+ incrementCounter("create_database");
+ logStartFunction("create_database: " + name);
+
+ Boolean ret = null;
+ try {
+ ret = executeWithRetry(new Command() {
+ @Override
+ Boolean run(RawStore ms) throws Exception {
+ boolean success = create_database_core(ms, name, location_uri);
+ return Boolean.valueOf(success);
+ }
+ });
+ } catch (AlreadyExistsException e) {
+ throw e;
+ } catch (MetaException e) {
+ throw e;
+ } catch (Exception e) {
+ assert(e instanceof RuntimeException);
+ throw (RuntimeException)e;
+ }
+
+ return ret.booleanValue();
+ }
+
+ public Database get_database(final String name) throws NoSuchObjectException,
MetaException {
incrementCounter("get_database");
logStartFunction("get_database: " + name);
- return getMS().getDatabase(name);
+
+ Database db = null;
+ try {
+ db = executeWithRetry(new Command() {
+ @Override
+ Database run(RawStore ms) throws Exception {
+ return ms.getDatabase(name);
+ }
+ });
+ } catch (MetaException e) {
+ throw e;
+ } catch (NoSuchObjectException e) {
+ throw e;
+ } catch (Exception e) {
+ assert(e instanceof RuntimeException);
+ throw (RuntimeException)e;
+ }
+ return db;
}
- public boolean drop_database(String name) throws MetaException {
- incrementCounter("drop_database");
- logStartFunction("drop_database: " + name);
- if (name.equalsIgnoreCase(MetaStoreUtils.DEFAULT_DATABASE_NAME)) {
- throw new MetaException("Can't drop default database");
- }
+ private boolean drop_database_core(RawStore ms, final String name) throws MetaException {
boolean success = false;
try {
- getMS().openTransaction();
- if (getMS().dropDatabase(name)) {
- success = getMS().commitTransaction();
+ ms.openTransaction();
+ if (ms.dropDatabase(name)) {
+ success = ms.commitTransaction();
}
} finally {
if (!success) {
- getMS().rollbackTransaction();
+ ms.rollbackTransaction();
} else {
wh.deleteDir(wh.getDefaultDatabasePath(name), true);
// it is not a terrible thing even if the data is not deleted
@@ -256,13 +502,53 @@
return success;
}
+ public boolean drop_database(final String name) throws MetaException {
+ incrementCounter("drop_database");
+ logStartFunction("drop_database: " + name);
+ if (name.equalsIgnoreCase(MetaStoreUtils.DEFAULT_DATABASE_NAME)) {
+ throw new MetaException("Can't drop default database");
+ }
+
+ Boolean ret = null;
+ try {
+ ret = executeWithRetry(new Command() {
+ @Override
+ Boolean run(RawStore ms) throws Exception {
+ boolean success = drop_database_core(ms, name);
+ return Boolean.valueOf(success);
+ }
+ });
+ } catch (MetaException e) {
+ throw e;
+ } catch (Exception e) {
+ assert(e instanceof RuntimeException);
+ throw (RuntimeException)e;
+ }
+ return ret.booleanValue();
+ }
+
public List get_databases() throws MetaException {
incrementCounter("get_databases");
logStartFunction("get_databases");
- return getMS().getDatabases();
+
+ List ret = null;
+ try {
+ ret = executeWithRetry(new Command>() {
+ @Override
+ List run(RawStore ms) throws Exception {
+ return ms.getDatabases();
+ }
+ });
+ } catch (MetaException e) {
+ throw e;
+ } catch (Exception e) {
+ assert(e instanceof RuntimeException);
+ throw (RuntimeException)e;
+ }
+ return ret;
}
- public boolean create_type(Type type) throws AlreadyExistsException,
+ public boolean create_type(final Type type) throws AlreadyExistsException,
MetaException, InvalidObjectException {
incrementCounter("create_type");
logStartFunction("create_type: " + type.getName());
@@ -272,21 +558,70 @@
+ " already exists");
}
- // TODO:pc Validation of types should be done by clients or here????
- return getMS().createType(type);
+ Boolean ret = null;
+ try {
+ ret = executeWithRetry(new Command() {
+ @Override
+ Boolean run(RawStore ms) throws Exception {
+ // TODO:pc Validation of types should be done by clients or here????
+ return Boolean.valueOf(ms.createType(type));
+ }
+ });
+ } catch (AlreadyExistsException e) {
+ throw e;
+ } catch (MetaException e) {
+ throw e;
+ } catch (InvalidObjectException e) {
+ throw e;
+ } catch (Exception e) {
+ assert(e instanceof RuntimeException);
+ throw (RuntimeException)e;
+ }
+
+ return ret.booleanValue();
}
- public Type get_type(String name) throws MetaException {
+ public Type get_type(final String name) throws MetaException {
incrementCounter("get_type");
logStartFunction("get_type: " + name);
- return getMS().getType(name);
+
+ Type ret;
+ try {
+ ret = executeWithRetry(new Command() {
+ @Override
+ Type run(RawStore ms) throws Exception {
+ return ms.getType(name);
+ }
+ });
+ } catch (MetaException e) {
+ throw e;
+ } catch (Exception e) {
+ assert(e instanceof RuntimeException);
+ throw (RuntimeException)e;
+ }
+ return ret;
}
- public boolean drop_type(String name) throws MetaException {
+ public boolean drop_type(final String name) throws MetaException {
incrementCounter("drop_type");
logStartFunction("drop_type: " + name);
- // TODO:pc validate that there are no types that refer to this
- return getMS().dropType(name);
+
+ Boolean ret = null;
+ try {
+ ret = executeWithRetry(new Command() {
+ @Override
+ Boolean run(RawStore ms) throws Exception {
+ // TODO:pc validate that there are no types that refer to this
+ return ms.dropType(name);
+ }
+ });
+ } catch (MetaException e) {
+ throw e;
+ } catch (Exception e) {
+ assert(e instanceof RuntimeException);
+ throw (RuntimeException)e;
+ }
+ return ret;
}
public Map get_type_all(String name) throws MetaException {
@@ -296,11 +631,8 @@
throw new MetaException("Not yet implemented");
}
- public void create_table(Table tbl) throws AlreadyExistsException,
- MetaException, InvalidObjectException {
- incrementCounter("create_table");
- logStartFunction("create_table: db=" + tbl.getDbName() + " tbl="
- + tbl.getTableName());
+ private void create_table_core(final RawStore ms, final Table tbl)
+ throws AlreadyExistsException, MetaException, InvalidObjectException {
if (!MetaStoreUtils.validateName(tbl.getTableName())
|| !MetaStoreUtils.validateColNames(tbl.getSd().getCols())
@@ -313,7 +645,7 @@
Path tblPath = null;
boolean success = false, madeDir = false;
try {
- getMS().openTransaction();
+ ms.openTransaction();
if (!TableType.VIRTUAL_VIEW.toString().equals(tbl.getTableType())) {
if (tbl.getSd().getLocation() == null
|| tbl.getSd().getLocation().isEmpty()) {
@@ -326,7 +658,6 @@
}
tblPath = wh.getDnsPath(new Path(tbl.getSd().getLocation()));
}
-
tbl.getSd().setLocation(tblPath.toString());
}
@@ -340,7 +671,7 @@
if (!wh.isDir(tblPath)) {
if (!wh.mkdirs(tblPath)) {
throw new MetaException(tblPath
- + " is not a directory or unable to create one");
+ + " is not a directory or unable to create one");
}
madeDir = true;
}
@@ -351,12 +682,12 @@
tbl.setCreateTime((int) time);
tbl.putToParameters(Constants.DDL_TIME, Long.toString(time));
- getMS().createTable(tbl);
- success = getMS().commitTransaction();
+ ms.createTable(tbl);
+ success = ms.commitTransaction();
} finally {
if (!success) {
- getMS().rollbackTransaction();
+ ms.rollbackTransaction();
if (madeDir) {
wh.deleteDir(tblPath, true);
}
@@ -364,6 +695,31 @@
}
}
+ public void create_table(final Table tbl) throws AlreadyExistsException,
+ MetaException, InvalidObjectException {
+ incrementCounter("create_table");
+ logStartFunction("create_table: db=" + tbl.getDbName() + " tbl="
+ + tbl.getTableName());
+ try {
+ executeWithRetry(new Command() {
+ @Override
+ Boolean run(RawStore ms) throws Exception {
+ create_table_core(ms, tbl);
+ return Boolean.TRUE;
+ }
+ });
+ } catch (AlreadyExistsException e) {
+ throw e;
+ } catch (MetaException e) {
+ throw e;
+ } catch (InvalidObjectException e) {
+ throw e;
+ } catch (Exception e) {
+ assert(e instanceof RuntimeException);
+ throw (RuntimeException)e;
+ }
+ }
+
public boolean is_table_exists(String dbname, String name)
throws MetaException {
try {
@@ -373,17 +729,17 @@
}
}
- public void drop_table(String dbname, String name, boolean deleteData)
+ private void drop_table_core(final RawStore ms, final String dbname,
+ final String name, final boolean deleteData)
throws NoSuchObjectException, MetaException {
- incrementCounter("drop_table");
- logStartFunction("drop_table", dbname, name);
+
boolean success = false;
boolean isExternal = false;
Path tblPath = null;
Table tbl = null;
isExternal = false;
try {
- getMS().openTransaction();
+ ms.openTransaction();
// drop any partitions
tbl = get_table(dbname, name);
if (tbl == null) {
@@ -396,14 +752,14 @@
if (tbl.getSd().getLocation() != null) {
tblPath = new Path(tbl.getSd().getLocation());
}
- if (!getMS().dropTable(dbname, name)) {
+ if (!ms.dropTable(dbname, name)) {
throw new MetaException("Unable to drop table");
}
tbl = null; // table collections disappear after dropping
- success = getMS().commitTransaction();
+ success = ms.commitTransaction();
} finally {
if (!success) {
- getMS().rollbackTransaction();
+ ms.rollbackTransaction();
} else if (deleteData && (tblPath != null) && !isExternal) {
wh.deleteDir(tblPath, true);
// ok even if the data is not deleted
@@ -411,6 +767,30 @@
}
}
+ public void drop_table(final String dbname, final String name, final boolean deleteData)
+ throws NoSuchObjectException, MetaException {
+ incrementCounter("drop_table");
+ logStartFunction("drop_table", dbname, name);
+
+ try {
+ executeWithRetry(new Command() {
+ @Override
+ Boolean run(RawStore ms) throws Exception {
+ drop_table_core(ms, dbname, name, deleteData);
+ return Boolean.TRUE;
+ }
+ });
+ } catch (NoSuchObjectException e) {
+ throw e;
+ } catch (MetaException e) {
+ throw e;
+ } catch (Exception e) {
+ assert(e instanceof RuntimeException);
+ throw (RuntimeException)e;
+ }
+
+ }
+
/**
* Is this an external table?
*
@@ -422,14 +802,30 @@
return MetaStoreUtils.isExternalTable(table);
}
- public Table get_table(String dbname, String name) throws MetaException,
+ public Table get_table(final String dbname, final String name) throws MetaException,
NoSuchObjectException {
+ Table t = null;
incrementCounter("get_table");
logStartFunction("get_table", dbname, name);
- Table t = getMS().getTable(dbname, name);
- if (t == null) {
- throw new NoSuchObjectException(dbname + "." + name
- + " table not found");
+ try {
+ t = executeWithRetry(new Command() {
+ @Override
+ Table run(RawStore ms) throws Exception {
+ Table t = ms.getTable(dbname, name);
+ if (t == null) {
+ throw new NoSuchObjectException(dbname + "." + name
+ + " table not found");
+ }
+ return t;
+ }
+ });
+ } catch (NoSuchObjectException e) {
+ throw e;
+ } catch (MetaException e) {
+ throw e;
+ } catch (Exception e) {
+ assert(e instanceof RuntimeException);
+ throw (RuntimeException)e;
}
return t;
}
@@ -442,20 +838,21 @@
return false;
}
- private Partition append_partition_common(String dbName, String tableName,
+ private Partition append_partition_common(RawStore ms, String dbName, String tableName,
List part_vals) throws InvalidObjectException,
AlreadyExistsException, MetaException {
+
Partition part = new Partition();
boolean success = false, madeDir = false;
Path partLocation = null;
try {
- getMS().openTransaction();
+ ms.openTransaction();
part = new Partition();
part.setDbName(dbName);
part.setTableName(tableName);
part.setValues(part_vals);
- Table tbl = getMS().getTable(part.getDbName(), part.getTableName());
+ Table tbl = ms.getTable(part.getDbName(), part.getTableName());
if (tbl == null) {
throw new InvalidObjectException(
"Unable to add partition because table or database do not exist");
@@ -485,24 +882,23 @@
part.setCreateTime((int) time);
part.putToParameters(Constants.DDL_TIME, Long.toString(time));
- success = getMS().addPartition(part);
+ success = ms.addPartition(part);
if (success) {
- success = getMS().commitTransaction();
+ success = ms.commitTransaction();
}
} finally {
if (!success) {
- getMS().rollbackTransaction();
+ ms.rollbackTransaction();
if (madeDir) {
wh.deleteDir(partLocation, true);
}
}
}
return part;
-
}
- public Partition append_partition(String dbName, String tableName,
- List part_vals) throws InvalidObjectException,
+ public Partition append_partition(final String dbName, final String tableName,
+ final List part_vals) throws InvalidObjectException,
AlreadyExistsException, MetaException {
incrementCounter("append_partition");
logStartFunction("append_partition", dbName, tableName);
@@ -511,48 +907,90 @@
LOG.debug(part);
}
}
- return append_partition_common(dbName, tableName, part_vals);
+
+ Partition ret = null;
+ try {
+ ret = executeWithRetry(new Command() {
+ @Override
+ Partition run(RawStore ms) throws Exception {
+ return append_partition_common(ms, dbName, tableName, part_vals);
+ }
+ });
+ } catch (MetaException e) {
+ throw e;
+ } catch (InvalidObjectException e) {
+ throw e;
+ } catch (AlreadyExistsException e) {
+ throw e;
+ } catch (Exception e) {
+ assert(e instanceof RuntimeException);
+ throw (RuntimeException)e;
+ }
+ return ret;
}
- public int add_partitions(List parts) throws MetaException,
- InvalidObjectException, AlreadyExistsException {
- incrementCounter("add_partition");
- if (parts.size() == 0) {
- return 0;
- }
+ private int add_partitions_core(final RawStore ms, final List parts)
+ throws MetaException, InvalidObjectException, AlreadyExistsException {
String db = parts.get(0).getDbName();
String tbl = parts.get(0).getTableName();
logStartFunction("add_partitions", db, tbl);
boolean success = false;
try {
- getMS().openTransaction();
+ ms.openTransaction();
for (Partition part : parts) {
add_partition(part);
}
success = true;
- getMS().commitTransaction();
+ ms.commitTransaction();
} finally {
if (!success) {
- getMS().rollbackTransaction();
+ ms.rollbackTransaction();
}
}
return parts.size();
}
- public Partition add_partition(Partition part)
+ public int add_partitions(final List parts) throws MetaException,
+ InvalidObjectException, AlreadyExistsException {
+ incrementCounter("add_partition");
+ if (parts.size() == 0) {
+ return 0;
+ }
+
+ Integer ret = null;
+ try {
+ ret = executeWithRetry(new Command() {
+ @Override
+ Integer run(RawStore ms) throws Exception {
+ int ret = add_partitions_core(ms, parts);
+ return Integer.valueOf(ret);
+ }
+ });
+ } catch (InvalidObjectException e) {
+ throw e;
+ } catch (AlreadyExistsException e) {
+ throw e;
+ } catch (MetaException e) {
+ throw e;
+ } catch (Exception e) {
+ assert(e instanceof RuntimeException);
+ throw (RuntimeException)e;
+ }
+ return ret;
+ }
+
+ private Partition add_partition_core(final RawStore ms, final Partition part)
throws InvalidObjectException, AlreadyExistsException, MetaException {
- incrementCounter("add_partition");
- logStartFunction("add_partition", part.getDbName(), part.getTableName());
boolean success = false, madeDir = false;
Path partLocation = null;
try {
- getMS().openTransaction();
+ ms.openTransaction();
Partition old_part = get_partition(part.getDbName(), part
.getTableName(), part.getValues());
if (old_part != null) {
throw new AlreadyExistsException("Partition already exists:" + part);
}
- Table tbl = getMS().getTable(part.getDbName(), part.getTableName());
+ Table tbl = ms.getTable(part.getDbName(), part.getTableName());
if (tbl == null) {
throw new InvalidObjectException(
"Unable to add partition because table or database do not exist");
@@ -586,11 +1024,11 @@
part.setCreateTime((int) time);
part.putToParameters(Constants.DDL_TIME, Long.toString(time));
- success = getMS().addPartition(part) && getMS().commitTransaction();
+ success = ms.addPartition(part) && ms.commitTransaction();
} finally {
if (!success) {
- getMS().rollbackTransaction();
+ ms.rollbackTransaction();
if (madeDir) {
wh.deleteDir(partLocation, true);
}
@@ -599,13 +1037,42 @@
return part;
}
- private boolean drop_partition_common(String db_name, String tbl_name,
- List part_vals, boolean deleteData) throws MetaException, NoSuchObjectException {
+ public Partition add_partition(final Partition part)
+ throws InvalidObjectException, AlreadyExistsException, MetaException {
+ incrementCounter("add_partition");
+ logStartFunction("add_partition", part.getDbName(), part.getTableName());
+
+ Partition ret = null;
+ try {
+ ret = executeWithRetry(new Command() {
+ @Override
+ Partition run(RawStore ms) throws Exception {
+ return add_partition_core(ms, part);
+ }
+ });
+ } catch (InvalidObjectException e) {
+ throw e;
+ } catch (AlreadyExistsException e) {
+ throw e;
+ } catch (MetaException e) {
+ throw e;
+ } catch (Exception e) {
+ assert(e instanceof RuntimeException);
+ throw (RuntimeException)e;
+ }
+ return ret;
+
+ }
+
+ private boolean drop_partition_common(RawStore ms, String db_name, String tbl_name,
+ List part_vals, final boolean deleteData)
+ throws MetaException, NoSuchObjectException {
+
boolean success = false;
Path partPath = null;
Table tbl = null;
try {
- getMS().openTransaction();
+ ms.openTransaction();
Partition part = get_partition(db_name, tbl_name, part_vals);
if (part == null) {
throw new NoSuchObjectException("Partition doesn't exist. "
@@ -614,15 +1081,15 @@
if (part.getSd() == null || part.getSd().getLocation() == null) {
throw new MetaException("Partition metadata is corrupted");
}
- if (!getMS().dropPartition(db_name, tbl_name, part_vals)) {
+ if (!ms.dropPartition(db_name, tbl_name, part_vals)) {
throw new MetaException("Unable to drop partition");
}
- success = getMS().commitTransaction();
+ success = ms.commitTransaction();
partPath = new Path(part.getSd().getLocation());
tbl = get_table(db_name, tbl_name);
} finally {
if (!success) {
- getMS().rollbackTransaction();
+ ms.rollbackTransaction();
} else if (deleteData && (partPath != null)) {
if (tbl != null && !isExternal(tbl)) {
wh.deleteDir(partPath, true);
@@ -632,53 +1099,145 @@
}
return true;
}
- public boolean drop_partition(String db_name, String tbl_name,
- List part_vals, boolean deleteData)
+ public boolean drop_partition(final String db_name, final String tbl_name,
+ final List part_vals, final boolean deleteData)
throws NoSuchObjectException, MetaException, TException {
incrementCounter("drop_partition");
logStartFunction("drop_partition", db_name, tbl_name);
LOG.info("Partition values:" + part_vals);
- return drop_partition_common(db_name, tbl_name, part_vals, deleteData);
+ Boolean ret = null;
+ try {
+ ret = executeWithRetry(new Command() {
+ @Override
+ Boolean run(RawStore ms) throws Exception {
+ return Boolean.valueOf(
+ drop_partition_common(ms, db_name, tbl_name, part_vals, deleteData));
+ }
+ });
+ } catch (MetaException e) {
+ throw e;
+ } catch (NoSuchObjectException e) {
+ throw e;
+ } catch (TException e) {
+ throw e;
+ } catch (Exception e) {
+ assert(e instanceof RuntimeException);
+ throw (RuntimeException)e;
+ }
+ return ret.booleanValue();
+
}
- public Partition get_partition(String db_name, String tbl_name,
- List part_vals) throws MetaException {
+ public Partition get_partition(final String db_name, final String tbl_name,
+ final List part_vals) throws MetaException {
incrementCounter("get_partition");
logStartFunction("get_partition", db_name, tbl_name);
- return getMS().getPartition(db_name, tbl_name, part_vals);
+
+ Partition ret = null;
+ try {
+ ret = executeWithRetry(new Command() {
+ @Override
+ Partition run(RawStore ms) throws Exception {
+ return ms.getPartition(db_name, tbl_name, part_vals);
+ }
+ });
+ } catch (MetaException e) {
+ throw e;
+ } catch (Exception e) {
+ assert(e instanceof RuntimeException);
+ throw (RuntimeException)e;
+ }
+ return ret;
}
- public List get_partitions(String db_name, String tbl_name,
- short max_parts) throws NoSuchObjectException, MetaException {
+ public List get_partitions(final String db_name, final String tbl_name,
+ final short max_parts) throws NoSuchObjectException, MetaException {
incrementCounter("get_partitions");
logStartFunction("get_partitions", db_name, tbl_name);
- return getMS().getPartitions(db_name, tbl_name, max_parts);
+
+ List ret = null;
+ try {
+ ret = executeWithRetry(new Command>() {
+ @Override
+ List run(RawStore ms) throws Exception {
+ return ms.getPartitions(db_name, tbl_name, max_parts);
+ }
+ });
+ } catch (MetaException e) {
+ throw e;
+ } catch (NoSuchObjectException e) {
+ throw e;
+ } catch (Exception e) {
+ assert(e instanceof RuntimeException);
+ throw (RuntimeException)e;
+ }
+ return ret;
+
}
- public List get_partition_names(String db_name, String tbl_name,
- short max_parts) throws MetaException {
+ public List get_partition_names(final String db_name, final String tbl_name,
+ final short max_parts) throws MetaException {
incrementCounter("get_partition_names");
logStartFunction("get_partition_names", db_name, tbl_name);
- return getMS().listPartitionNames(db_name, tbl_name, max_parts);
+
+ List ret = null;
+ try {
+ ret = executeWithRetry(new Command>() {
+ @Override
+ List run(RawStore ms) throws Exception {
+ return ms.listPartitionNames(db_name, tbl_name, max_parts);
+ }
+ });
+ } catch (MetaException e) {
+ throw e;
+ } catch (Exception e) {
+ assert(e instanceof RuntimeException);
+ throw (RuntimeException)e;
+ }
+ return ret;
}
- public void alter_partition(String db_name, String tbl_name,
- Partition new_part) throws InvalidOperationException, MetaException,
- TException {
- incrementCounter("alter_partition");
- logStartFunction("alter_partition", db_name, tbl_name);
- LOG.info("Partition values:" + new_part.getValues());
+ private void alter_partition_core(final RawStore ms, final String db_name,
+ final String tbl_name, final Partition new_part)
+ throws InvalidOperationException, MetaException, TException {
try {
new_part.putToParameters(Constants.DDL_TIME, Long.toString(System
.currentTimeMillis() / 1000));
- getMS().alterPartition(db_name, tbl_name, new_part);
+ ms.alterPartition(db_name, tbl_name, new_part);
} catch (InvalidObjectException e) {
- LOG.error(StringUtils.stringifyException(e));
throw new InvalidOperationException("alter is not possible");
}
}
+ public void alter_partition(final String db_name, final String tbl_name,
+ final Partition new_part) throws InvalidOperationException, MetaException,
+ TException {
+ incrementCounter("alter_partition");
+ logStartFunction("alter_partition", db_name, tbl_name);
+ LOG.info("Partition values:" + new_part.getValues());
+
+ try {
+ executeWithRetry(new Command() {
+ @Override
+ Boolean run(RawStore ms) throws Exception {
+ alter_partition_core(ms, db_name, tbl_name, new_part);
+ return Boolean.TRUE;
+ }
+ });
+ } catch (InvalidOperationException e) {
+ throw e;
+ } catch (MetaException e) {
+ throw e;
+ } catch (TException e) {
+ throw e;
+ } catch (Exception e) {
+ assert(e instanceof RuntimeException);
+ throw (RuntimeException)e;
+ }
+ return;
+ }
+
public boolean create_index(Index index_def)
throws IndexAlreadyExistsException, MetaException {
incrementCounter("create_index");
@@ -692,21 +1251,54 @@
return "3.0";
}
- public void alter_table(String dbname, String name, Table newTable)
+ public void alter_table(final String dbname, final String name, final Table newTable)
throws InvalidOperationException, MetaException {
incrementCounter("alter_table");
- logStartFunction("truncate_table: db=" + dbname + " tbl=" + name
+ logStartFunction("alter_table: db=" + dbname + " tbl=" + name
+ " newtbl=" + newTable.getTableName());
newTable.putToParameters(Constants.DDL_TIME, Long.toString(System
.currentTimeMillis() / 1000));
- alterHandler.alterTable(getMS(), wh, dbname, name, newTable);
+
+ try {
+ executeWithRetry(new Command() {
+ @Override
+ Boolean run(RawStore ms) throws Exception {
+ alterHandler.alterTable(ms, wh, dbname, name, newTable);
+ return Boolean.TRUE;
+ }
+ });
+ } catch (MetaException e) {
+ throw e;
+ } catch (InvalidOperationException e) {
+ throw e;
+ } catch (Exception e) {
+ assert(e instanceof RuntimeException);
+ throw (RuntimeException)e;
+ }
+
}
- public List get_tables(String dbname, String pattern)
+ public List get_tables(final String dbname, final String pattern)
throws MetaException {
incrementCounter("get_tables");
logStartFunction("get_tables: db=" + dbname + " pat=" + pattern);
- return getMS().getTables(dbname, pattern);
+
+ List ret;
+ try {
+ ret = executeWithRetry(new Command>() {
+ @Override
+ List run(RawStore ms) throws Exception {
+ return ms.getTables(dbname, pattern);
+ }
+ });
+ } catch (MetaException e) {
+ throw e;
+ } catch (Exception e) {
+ assert(e instanceof RuntimeException);
+ throw (RuntimeException)e;
+ }
+ return ret;
+
}
public List get_fields(String db, String tableName)
@@ -813,14 +1405,14 @@
return toReturn;
}
- private List getPartValsFromName(String dbName, String tblName,
+ private List getPartValsFromName(RawStore ms, String dbName, String tblName,
String partName) throws MetaException, InvalidObjectException {
// Unescape the partition name
LinkedHashMap hm = Warehouse.makeSpecFromName(partName);
// getPartition expects partition values in a list. use info from the
// table to put the partition column values in order
- Table t = getMS().getTable(dbName, tblName);
+ Table t = ms.getTable(dbName, tblName);
if (t == null) {
throw new InvalidObjectException(dbName + "." + tblName
+ " table not found");
@@ -830,7 +1422,7 @@
for(FieldSchema field : t.getPartitionKeys()) {
String key = field.getName();
String val = hm.get(key);
- if(val == null) {
+ if (val == null) {
throw new InvalidObjectException("incomplete partition name - missing " + key);
}
partVals.add(val);
@@ -838,54 +1430,128 @@
return partVals;
}
- public Partition get_partition_by_name(String db_name, String tbl_name,
- String part_name) throws MetaException, NoSuchObjectException, TException {
- incrementCounter("get_partition_by_name");
- logStartFunction("get_partition_by_name: db=" + db_name + " tbl="
- + tbl_name + " part=" + part_name);
-
+ private Partition get_partition_by_name_core(final RawStore ms, final String db_name,
+ final String tbl_name, final String part_name)
+ throws MetaException, NoSuchObjectException, TException {
List partVals = null;
try {
- partVals = getPartValsFromName(db_name, tbl_name, part_name);
+ partVals = getPartValsFromName(ms, db_name, tbl_name, part_name);
} catch (InvalidObjectException e) {
throw new NoSuchObjectException(e.getMessage());
}
- Partition p = getMS().getPartition(db_name, tbl_name, partVals);
+ Partition p = ms.getPartition(db_name, tbl_name, partVals);
- if(p == null) {
+ if (p == null) {
throw new NoSuchObjectException(db_name + "." + tbl_name
+ " partition (" + part_name + ") not found");
}
return p;
}
- public Partition append_partition_by_name(String db_name, String tbl_name,
- String part_name) throws InvalidObjectException,
+ public Partition get_partition_by_name(final String db_name,final String tbl_name,
+ final String part_name) throws MetaException, NoSuchObjectException, TException {
+
+ incrementCounter("get_partition_by_name");
+ logStartFunction("get_partition_by_name: db=" + db_name + " tbl="
+ + tbl_name + " part=" + part_name);
+
+ Partition ret = null;
+
+ try {
+ ret = executeWithRetry(new Command() {
+ @Override
+ Partition run(RawStore ms) throws Exception {
+ return get_partition_by_name_core(ms, db_name, tbl_name, part_name);
+ }
+ });
+ } catch (MetaException e) {
+ throw e;
+ } catch (NoSuchObjectException e) {
+ throw e;
+ } catch (TException e) {
+ throw e;
+ } catch (Exception e) {
+ assert(e instanceof RuntimeException);
+ throw (RuntimeException)e;
+ }
+ return ret;
+ }
+
+ public Partition append_partition_by_name(final String db_name, final String tbl_name,
+ final String part_name) throws InvalidObjectException,
AlreadyExistsException, MetaException, TException {
incrementCounter("append_partition_by_name");
logStartFunction("append_partition_by_name: db=" + db_name + " tbl="
+ tbl_name + " part=" + part_name);
- List partVals = getPartValsFromName(db_name, tbl_name, part_name);
- return append_partition_common(db_name, tbl_name, partVals);
+ Partition ret = null;
+ try {
+ ret = executeWithRetry(new Command() {
+ @Override
+ Partition run(RawStore ms) throws Exception {
+ List partVals = getPartValsFromName(ms, db_name, tbl_name, part_name);
+ return append_partition_common(ms, db_name, tbl_name, partVals);
+ }
+ });
+ } catch (InvalidObjectException e) {
+ throw e;
+ } catch (AlreadyExistsException e) {
+ throw e;
+ } catch (MetaException e) {
+ throw e;
+ } catch (TException e) {
+ throw e;
+ } catch (Exception e) {
+ assert(e instanceof RuntimeException);
+ throw (RuntimeException)e;
+ }
+ return ret;
}
+ private boolean drop_partition_by_name_core(final RawStore ms,
+ final String db_name, final String tbl_name, final String part_name,
+ final boolean deleteData) throws NoSuchObjectException,
+ MetaException, TException {
+
+ List partVals = null;
+ try {
+ partVals = getPartValsFromName(ms, db_name, tbl_name, part_name);
+ } catch (InvalidObjectException e) {
+ throw new NoSuchObjectException(e.getMessage());
+ }
+
+ return drop_partition_common(ms, db_name, tbl_name, partVals, deleteData);
+ }
+
@Override
- public boolean drop_partition_by_name(String db_name, String tbl_name,
- String part_name, boolean deleteData) throws NoSuchObjectException,
+ public boolean drop_partition_by_name(final String db_name, final String tbl_name,
+ final String part_name, final boolean deleteData) throws NoSuchObjectException,
MetaException, TException {
incrementCounter("drop_partition_by_name");
logStartFunction("drop_partition_by_name: db=" + db_name + " tbl="
+ tbl_name + " part=" + part_name);
- List partVals = null;
+ Boolean ret = null;
try {
- partVals = getPartValsFromName(db_name, tbl_name, part_name);
- } catch (InvalidObjectException e) {
- throw new NoSuchObjectException(e.getMessage());
+ ret = executeWithRetry(new Command() {
+ @Override
+ Boolean run(RawStore ms) throws Exception {
+ return drop_partition_by_name_core(ms, db_name, tbl_name,
+ part_name, deleteData);
+ }
+ });
+ } catch (NoSuchObjectException e) {
+ throw e;
+ } catch (MetaException e) {
+ throw e;
+ } catch (TException e) {
+ throw e;
+ } catch (Exception e) {
+ assert(e instanceof RuntimeException);
+ throw (RuntimeException)e;
}
- return drop_partition_common(db_name, tbl_name, partVals, deleteData);
+ return ret.booleanValue();
}
@Override
Index: metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (revision 5401)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (working copy)
@@ -25,6 +25,8 @@
import java.util.Map;
import java.util.Properties;
import java.util.Map.Entry;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import javax.jdo.JDOHelper;
import javax.jdo.JDOObjectNotFoundException;
@@ -70,6 +72,8 @@
public class ObjectStore implements RawStore, Configurable {
private static Properties prop = null;
private static PersistenceManagerFactory pmf = null;
+
+ private static Lock pmfPropLock = new ReentrantLock();
private static final Log LOG = LogFactory.getLog(ObjectStore.class.getName());
private static enum TXN_STATUS {
@@ -90,20 +94,48 @@
return hiveConf;
}
+ /**
+   * Called whenever this object is instantiated using ReflectionUtils, and also
+ * on connection retries. In cases of connection retries, conf will usually
+ * contain modified values.
+ */
@SuppressWarnings("nls")
public void setConf(Configuration conf) {
- hiveConf = conf;
- if (isInitialized) {
- return;
- } else {
- initialize();
+ // Although an instance of ObjectStore is accessed by one thread, there may
+ // be many threads with ObjectStore instances. So the static variables
+ // pmf and prop need to be protected with locks.
+ pmfPropLock.lock();
+ try {
+ isInitialized = false;
+ hiveConf = conf;
+ Properties propsFromConf = getDataSourceProps(conf);
+ boolean propsChanged = !propsFromConf.equals(prop);
+
+ if (propsChanged) {
+ pmf = null;
+ prop = null;
+ }
+
+ assert(!isActiveTransaction());
+ shutdown();
+ // Always want to re-create pm as we don't know if it were created by the
+ // most recent instance of the pmf
+ pm = null;
+ openTrasactionCalls = 0;
+ currentTransaction = null;
+ transactionStatus = TXN_STATUS.NO_STATE;
+
+ initialize(propsFromConf);
+
+ if (!isInitialized) {
+ throw new RuntimeException(
+ "Unable to create persistence manager. Check dss.log for details");
+ } else {
+ LOG.info("Initialized ObjectStore");
+ }
+ } finally {
+ pmfPropLock.unlock();
}
- if (!isInitialized) {
- throw new RuntimeException(
- "Unable to create persistence manager. Check dss.log for details");
- } else {
- LOG.info("Initialized ObjectStore");
- }
}
private ClassLoader classLoader;
@@ -115,13 +147,11 @@
}
@SuppressWarnings("nls")
- private void initialize() {
+ private void initialize(Properties dsProps) {
LOG.info("ObjectStore, initialize called");
- initDataSourceProps();
+ prop = dsProps;
pm = getPersistenceManager();
- if (pm != null) {
- isInitialized = true;
- }
+ isInitialized = pm != null;
return;
}
@@ -130,13 +160,10 @@
* in jpox.properties.
*/
@SuppressWarnings("nls")
- private void initDataSourceProps() {
- if (prop != null) {
- return;
- }
- prop = new Properties();
+ private static Properties getDataSourceProps(Configuration conf) {
+ Properties prop = new Properties();
- Iterator> iter = hiveConf.iterator();
+ Iterator> iter = conf.iterator();
while (iter.hasNext()) {
Map.Entry e = iter.next();
if (e.getKey().contains("datanucleus") || e.getKey().contains("jdo")) {
@@ -156,6 +183,7 @@
}
}
}
+ return prop;
}
private static PersistenceManagerFactory getPMF() {
@@ -189,7 +217,7 @@
/**
* Opens a new one or the one already created Every call of this function must
* have corresponding commit or rollback function call
- *
+ *
* @return an active transaction
*/
@@ -210,7 +238,7 @@
/**
* if this is the commit of the first open call then an actual commit is
* called.
- *
+ *
* @return Always returns true
*/
@SuppressWarnings("nls")
Index: metastore/src/java/org/apache/hadoop/hive/metastore/hooks/JDOConnectionURLHook.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/hooks/JDOConnectionURLHook.java (revision 0)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/hooks/JDOConnectionURLHook.java (revision 0)
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.hooks;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * JDOConnectURLHook is used to get the URL that JDO uses to connect to the
+ * database that stores the metastore data. Classes implementing this must be
+ * thread-safe (for Thrift server).
+ */
+public interface JDOConnectionURLHook {
+
+ /**
+ * Gets the connection URL to supply to JDO. In addition to initialization,
+ * this method will be called after a connection failure for each reconnect
+ * attempt.
+ *
+ * @param conf The configuration used to initialize this instance of the HMS
+   * @return the JDO connection URL to use
+ * @throws Exception
+ */
+ public String getJdoConnectionUrl(Configuration conf)
+ throws Exception;
+
+ /**
+   * Notifies this hook that the given connection URL was bad. Can be used to
+   * collect statistics, etc.
+ *
+ * @param url
+ */
+ public void notifyBadConnectionUrl(String url);
+}
Index: metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java (revision 5401)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java (working copy)
@@ -56,7 +56,7 @@
private boolean open = false;
private URI metastoreUris[];
private final boolean standAloneClient = false;
- private HiveMetaHookLoader hookLoader;
+ private final HiveMetaHookLoader hookLoader;
// for thrift connects
private int retries = 5;
@@ -86,7 +86,7 @@
}
// get the number retries
- retries = conf.getInt("hive.metastore.connect.retries", 5);
+ retries = HiveConf.getIntVar(conf, HiveConf.ConfVars.METATORETHRIFTRETRIES);
// user wants file store based configuration
if (conf.getVar(HiveConf.ConfVars.METASTOREURIS) != null) {
@@ -615,7 +615,16 @@
return client.get_partition_by_name(db, tableName, partName);
}
+ public Partition appendPartitionByName(String dbName, String tableName, String partName)
+ throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+ return client.append_partition_by_name(dbName, tableName, partName);
+ }
+ public boolean dropPartitionByName(String dbName, String tableName, String partName, boolean deleteData)
+ throws NoSuchObjectException, MetaException, TException {
+ return client.drop_partition_by_name(dbName, tableName, partName, deleteData);
+ }
+
private HiveMetaHook getHook(Table tbl) throws MetaException {
if (hookLoader == null) {
return null;
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 5401)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -89,11 +89,26 @@
HADOOPJOBNAME("mapred.job.name", null),
HADOOPSPECULATIVEEXECREDUCERS("mapred.reduce.tasks.speculative.execution", false),
- // MetaStore stuff.
+ // Metastore stuff.
METASTOREDIRECTORY("hive.metastore.metadb.dir", ""),
METASTOREWAREHOUSE("hive.metastore.warehouse.dir", ""),
METASTOREURIS("hive.metastore.uris", ""),
+ // Number of times to retry a connection to a Thrift metastore server
+ METATORETHRIFTRETRIES("hive.metastore.connect.retries", ""),
METASTOREPWD("javax.jdo.option.ConnectionPassword", ""),
+ // Class name of JDO connection url hook
+ METASTORECONNECTURLHOOK("hive.metastore.ds.connection.url.hook", ""),
+ // Name of the connection url in the configuration
+ METASTORECONNECTURLKEY("javax.jdo.option.ConnectionURL", ""),
+    // Number of attempts to retry connecting after there is a JDO datastore error
+ METASTOREATTEMPTS("hive.metastore.ds.retry.attempts", 1),
+    // Number of milliseconds to wait between retry attempts
+ METASTOREINTERVAL("hive.metastore.ds.retry.interval", 1000),
+    // Whether to force reloading of the metastore configuration (including
+    // the connection URL) before the next metastore query that accesses the
+    // datastore. Once reloaded, this value is reset to false. Used for
+    // testing only.
+ METASTOREFORCERELOADCONF("hive.metastore.force.reload.conf", false),
// CLI
CLIIGNOREERRORS("hive.cli.errors.ignore", false),
@@ -261,6 +276,7 @@
this.defaultBoolVal = defaultBoolVal;
}
+ @Override
public String toString() {
return varname;
}
Index: contrib/src/test/results/clientnegative/url_hook.q.out
===================================================================
--- contrib/src/test/results/clientnegative/url_hook.q.out (revision 0)
+++ contrib/src/test/results/clientnegative/url_hook.q.out (revision 0)
@@ -0,0 +1 @@
+FAILED: Error in semantic analysis: Unable to fetch table src
Index: contrib/src/test/queries/clientnegative/url_hook.q
===================================================================
--- contrib/src/test/queries/clientnegative/url_hook.q (revision 0)
+++ contrib/src/test/queries/clientnegative/url_hook.q (revision 0)
@@ -0,0 +1,6 @@
+add jar ../build/contrib/hive_contrib.jar;
+set hive.metastore.ds.connection.url.hook=org.apache.hadoop.hive.contrib.metastore.hooks.TestURLHook;
+set hive.metastore.retry.attempts=0;
+set hive.metastore.force.reload.conf=true;
+
+SELECT key FROM src LIMIT 1;
\ No newline at end of file
Index: contrib/src/java/org/apache/hadoop/hive/contrib/metastore/hooks/TestURLHook.java
===================================================================
--- contrib/src/java/org/apache/hadoop/hive/contrib/metastore/hooks/TestURLHook.java (revision 0)
+++ contrib/src/java/org/apache/hadoop/hive/contrib/metastore/hooks/TestURLHook.java (revision 0)
@@ -0,0 +1,22 @@
+package org.apache.hadoop.hive.contrib.metastore.hooks;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.hooks.JDOConnectionURLHook;
+
+/**
+ * Returns a bad JDO connection URL for testing purposes.
+ */
+public class TestURLHook implements JDOConnectionURLHook {
+
+
+ @Override
+ public String getJdoConnectionUrl(Configuration conf) throws Exception {
+ return "BADURL";
+ }
+
+ @Override
+ public void notifyBadConnectionUrl(String url) {
+
+ }
+
+}