Index: conf/hive-default.xml.template
===================================================================
--- conf/hive-default.xml.template	(revision 1413432)
+++ conf/hive-default.xml.template	(working copy)
@@ -23,7 +23,7 @@
 
-
+
 
 <property>
   <name>mapred.reduce.tasks</name>
@@ -306,6 +306,12 @@
 <property>
+  <name>hive.metastore.failure.retries</name>
+  <value>3</value>
+  <description>Number of retries upon failure of Thrift metastore calls</description>
+</property>
+
+<property>
   <name>hive.metastore.client.connect.retry.delay</name>
   <value>1</value>
   <description>Number of seconds for the client to wait between consecutive connection attempts</description>
@@ -450,7 +456,7 @@
     The main difference between this paramater and hive.optimize.skewjoin is that
     this parameter uses the skew information stored in the metastore to optimize the plan at compile time itself.
-    If there is no skew information in the metadata, this parameter will not have any affect.
+    If there is no skew information in the metadata, this parameter will not have any effect.
     Both hive.optimize.skewjoin.compiletime and hive.optimize.skewjoin should be set to true.
     Ideally, hive.optimize.skewjoin should be renamed as hive.optimize.skewjoin.runtime, but not doing
     so for backward compatibility.
@@ -465,7 +471,7 @@
   <value>false</value>
   <description>Whether to remove the union and push the operators between union and the filesink above
-    union. This avoids an extra scan of the output by union. This is independently useful for union
+    union. This avoids an extra scan of the output by union. This is independently useful for union
    queries, and specially useful when hive.optimize.skewjoin.compiletime is set to true, since an
    extra union is inserted.
@@ -479,7 +485,7 @@
   <name>hive.mapred.supports.subdirectories</name>
   <value>false</value>
   <description>Whether the version of hadoop which is running supports sub-directories for tables/partitions.
-    Many hive optimizations can be applied if the hadoop version supports sub-directories for
+    Many hive optimizations can be applied if the hadoop version supports sub-directories for
    tables/partitions. It was added by MAPREDUCE-1501
@@ -494,9 +500,9 @@
   <name>hive.map.groupby.sorted</name>
   <value>false</value>
-  <description>If the bucketing/sorting properties of the table exactly match the grouping key, whether to
+  <description>If the bucketing/sorting properties of the table exactly match the grouping key, whether to
    perform the group by in the mapper by using BucketizedHiveInputFormat. The only downside to this
-    is that it limits the number of mappers to the number of files.
+    is that it limits the number of mappers to the number of files.
   </description>
@@ -531,7 +537,7 @@
    The algorithm is as follows: At runtime, detect the keys with a large skew. Instead of
    processing those keys, store them temporarily in a hdfs directory. In a follow-up map-reduce
    job, process those skewed keys. The same key need not be skewed for all the tables, and so,
-    the follow-up map-reduce job (for the skewed keys) would be much faster, since it would be a
+    the follow-up map-reduce job (for the skewed keys) would be much faster, since it would be a
    map-join.
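Both new knobs are surfaced to code through HiveConf; the matching ConfVars entries are added in common/src/java/org/apache/hadoop/hive/conf/HiveConf.java near the end of this patch. A minimal sketch of how a client picks them up (the call site itself is illustrative, not part of the patch):

    HiveConf conf = new HiveConf();
    // "hive.metastore.failure.retries": retries for failed Thrift calls (default 3)
    int failureRetries = conf.getIntVar(HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES);
    // "hive.metastore.client.connect.retry.delay": seconds to wait between attempts (default 1)
    int retryDelaySeconds = conf.getIntVar(HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY);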
@@ -542,8 +548,8 @@
    Default directory name used in list bucketing.
    List bucketing feature will create sub-directory for each skewed-value and a default directory
    for non-skewed value. This config specifies the default name for the default directory.
-    Sub-directory is created by list bucketing DML and under partition directory. User doesn't need
-    to know how to construct the canonical path. It just gives user choice if they want to change
+    Sub-directory is created by list bucketing DML and under partition directory. User doesn't need
+    to know how to construct the canonical path. It just gives user choice if they want to change
    the default directory name.
    For example, there are 2 skewed column c1 and c2. 2 skewed value: (1,a) and (2,b). subdirectory:
    /c1=1/c2=a/
@@ -580,7 +586,7 @@
   <name>hive.mapred.mode</name>
   <value>nonstrict</value>
-  <description>The mode in which the hive operations are being performed.
+  <description>The mode in which the hive operations are being performed.
     In strict mode, some risky queries are not allowed to run. They include:
       Cartesian Product.
       No partition being picked up for a query.
@@ -594,8 +600,8 @@
   <name>hive.enforce.bucketmapjoin</name>
   <value>false</value>
   <description>If the user asked for bucketed map-side join, and it cannot be performed,
-    should the query fail or not ? For eg, if the buckets in the tables being joined are
-    not a multiple of each other, bucketed map-side join cannot be performed, and the
+    should the query fail or not? For example, if the buckets in the tables being joined are
+    not a multiple of each other, bucketed map-side join cannot be performed, and the
    query will fail if hive.enforce.bucketmapjoin is set to true.
@@ -1074,8 +1080,8 @@
   <name>hive.stats.reliable</name>
   <value>false</value>
-  <description>Whether queries will fail because stats cannot be collected completely accurately.
-    If this is set to true, reading/writing from/into a partition may fail becuase the stats
+  <description>Whether queries will fail because stats cannot be collected completely accurately.
+    If this is set to true, reading/writing from/into a partition may fail because the stats
    could not be computed accurately.
   </description>
@@ -1253,45 +1259,45 @@
   <name>hive.security.authorization.manager</name>
   <value>org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider</value>
   <description>the hive client authorization manager class name.
-    The user defined authorization class should implement interface org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider.
+    The user defined authorization class should implement interface org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider.
   </description>
 </property>
 
 <property>
   <name>hive.security.authenticator.manager</name>
   <value>org.apache.hadoop.hive.ql.security.HadoopDefaultAuthenticator</value>
-  <description>hive client authenticator manager class name.
+  <description>hive client authenticator manager class name.
    The user defined authenticator should implement interface org.apache.hadoop.hive.ql.security.HiveAuthenticationProvider.
   </description>
 </property>
 
 <property>
   <name>hive.security.authorization.createtable.user.grants</name>
   <value></value>
-  <description>the privileges automatically granted to some users whenever a table gets created.
-    An example like "userX,userY:select;userZ:create" will grant select privilege to userX and userY,
+  <description>the privileges automatically granted to some users whenever a table gets created.
+    An example like "userX,userY:select;userZ:create" will grant select privilege to userX and userY,
    and grant create privilege to userZ whenever a new table created.
   </description>
 </property>
 
 <property>
   <name>hive.security.authorization.createtable.group.grants</name>
   <value></value>
-  <description>the privileges automatically granted to some groups whenever a table gets created.
-    An example like "groupX,groupY:select;groupZ:create" will grant select privilege to groupX and groupY,
+  <description>the privileges automatically granted to some groups whenever a table gets created.
+    An example like "groupX,groupY:select;groupZ:create" will grant select privilege to groupX and groupY,
    and grant create privilege to groupZ whenever a new table created.
   </description>
 </property>
 
 <property>
   <name>hive.security.authorization.createtable.role.grants</name>
   <value></value>
-  <description>the privileges automatically granted to some roles whenever a table gets created.
-    An example like "roleX,roleY:select;roleZ:create" will grant select privilege to roleX and roleY,
+  <description>the privileges automatically granted to some roles whenever a table gets created.
+    An example like "roleX,roleY:select;roleZ:create" will grant select privilege to roleX and roleY,
    and grant create privilege to roleZ whenever a new table created.
   </description>
 </property>
 
 <property>
   <name>hive.security.authorization.createtable.owner.grants</name>
   <value></value>
-  <description>the privileges automatically granted to the owner whenever a table gets created.
+  <description>the privileges automatically granted to the owner whenever a table gets created.
    An example like "select,drop" will grant select and drop privilege to the owner of the table
@@ -1299,7 +1305,7 @@
   <name>hive.metastore.authorization.storage.checks</name>
   <value>false</value>
   <description>Should the metastore do authorization checks against the underlying storage
-    for operations like drop-partition (disallow the drop-partition if the user in
+    for operations like drop-partition (disallow the drop-partition if the user in
    question doesn't have permissions to delete the corresponding directory on the storage).
@@ -1313,7 +1319,7 @@
   <name>hive.index.compact.file.ignore.hdfs</name>
   <value>false</value>
-  <description>True the hdfs location stored in the index file will be igbored at runtime.
+  <description>If true, the hdfs location stored in the index file will be ignored at runtime.
    If the data got moved or the name of the cluster got changed, the index data should still be usable.
@@ -1357,7 +1363,7 @@
   <name>hive.lock.mapred.only.operation</name>
   <value>false</value>
-  <description>This param is to control whether or not only do lock on queries
+  <description>This param controls whether locks are only acquired for queries
    that need to execute at least one mapred job.
@@ -1391,7 +1397,7 @@
   <name>hive.rework.mapredwork</name>
   <value>false</value>
-  <description>should rework the mapred work or not.
+  <description>Whether to rework the mapred work or not.
    This is first introduced by SymlinkTextInputFormat to replace symlink files with real paths at compile time.
@@ -1399,9 +1405,9 @@
   <name>hive.exec.concatenate.check.index</name>
   <value>true</value>
   <description>If this sets to true, hive will throw error when doing
-    'alter table tbl_name [partSpec] concatenate' on a table/partition
-    that has indexes on it. The reason the user want to set this to true
-    is because it can help user to avoid handling all index drop, recreation,
+    'alter table tbl_name [partSpec] concatenate' on a table/partition
+    that has indexes on it. The reason the user wants to set this to true
+    is that it helps the user avoid handling all index drop, recreation, and
    rebuild work. This is very helpful for tables with thousands of partitions.
@@ -1416,14 +1422,14 @@
   <name>hive.io.exception.handlers</name>
   <value></value>
   <description>A list of io exception handler class names. This is used
-    to construct a list exception handlers to handle exceptions thrown
+    to construct a list of exception handlers to handle exceptions thrown
    by record readers
   </description>
 </property>
 
 <property>
   <name>hive.autogen.columnalias.prefix.label</name>
   <value>_c</value>
-  <description>String used as a prefix when auto generating column alias.
+  <description>String used as a prefix when auto generating column alias.
    By default the prefix label will be appended with a column position number to form the column alias.
    Auto generation would happen if an aggregate function is used in a select clause without an explicit alias.
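The three createtable grant lists touched above share one wire format: semicolon-separated entries, each a comma-separated list of principals, a colon, then a comma-separated list of privileges. A small standalone sketch of that documented syntax (the parser itself is hypothetical, not code from this patch):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class GrantListSketch {
      public static void main(String[] args) {
        String grantList = "userX,userY:select;userZ:create";  // example from the description
        Map<String, List<String>> grants = new LinkedHashMap<String, List<String>>();
        for (String entry : grantList.split(";")) {
          String[] parts = entry.split(":");                   // principals : privileges
          for (String principal : parts[0].split(",")) {
            if (!grants.containsKey(principal)) {
              grants.put(principal, new ArrayList<String>());
            }
            grants.get(principal).addAll(Arrays.asList(parts[1].split(",")));
          }
        }
        System.out.println(grants);  // {userX=[select], userY=[select], userZ=[create]}
      }
    }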
@@ -1454,15 +1460,15 @@
   <name>hive.insert.into.multilevel.dirs</name>
   <value>false</value>
-  <description>Where to insert into multilevel directories like
+  <description>Where to insert into multilevel directories like
    "insert directory '/HIVEFT25686/chinna/' from table"
   </description>
 </property>
 
 <property>
   <name>hive.warehouse.subdir.inherit.perms</name>
   <value>false</value>
-  <description>Set this to true if the the table directories should inherit the
-    permission of the warehouse or database directory instead of being created
+  <description>Set this to true if the table directories should inherit the
+    permission of the warehouse or database directory instead of being created
    with the permissions derived from dfs umask
@@ -1515,7 +1521,7 @@
   <name>hive.multi.insert.move.tasks.share.dependencies</name>
   <value>false</value>
-  <description>If this is set all move tasks for tables/partitions (not directories) at the end of a
+  <description>If this is set, all move tasks for tables/partitions (not directories) at the end of a
    multi-insert query will only begin once the dependencies for all these move tasks have been met.
    Advantages: If concurrency is enabled, the locks will only be released once the query has

Index: metastore/src/test/org/apache/hadoop/hive/metastore/TestMarkPartitionRemote.java
===================================================================
--- metastore/src/test/org/apache/hadoop/hive/metastore/TestMarkPartitionRemote.java	(revision 1413432)
+++ metastore/src/test/org/apache/hadoop/hive/metastore/TestMarkPartitionRemote.java	(working copy)
@@ -42,7 +42,7 @@
     t.setDaemon(true);
     t.start();
     hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:29111");
-    hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTRETRIES, 3);
+    hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
     Thread.sleep(30000);
   }
 }

Index: metastore/src/test/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java
===================================================================
--- metastore/src/test/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java	(revision 1413432)
+++ metastore/src/test/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java	(working copy)
@@ -99,7 +99,7 @@
     hiveConf = new HiveConf(this.getClass());
     hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + msPort);
-    hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTRETRIES, 3);
+    hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
     hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
     hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
     hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");

Index: metastore/src/test/org/apache/hadoop/hive/metastore/TestMetaStoreEndFunctionListener.java
===================================================================
--- metastore/src/test/org/apache/hadoop/hive/metastore/TestMetaStoreEndFunctionListener.java	(revision 1413432)
+++ metastore/src/test/org/apache/hadoop/hive/metastore/TestMetaStoreEndFunctionListener.java	(working copy)
@@ -66,7 +66,7 @@
     Thread.sleep(40000);
     hiveConf = new HiveConf(this.getClass());
     hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + msPort);
-    hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTRETRIES, 3);
+    hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
     hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
     hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
     hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
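The tests above all switch from the removed METASTORETHRIFTRETRIES member to its renamed replacement. For reference, the mapping established by the HiveConf.java hunk near the end of this patch (the call is the same pattern the tests use):

    // enum member                       -> property key                      (default)
    // METASTORETHRIFTCONNECTIONRETRIES  -> "hive.metastore.connect.retries"   3, was 5 under METASTORETHRIFTRETRIES
    // METASTORETHRIFTFAILURERETRIES     -> "hive.metastore.failure.retries"   3, new in this patch
    hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);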
===================================================================
--- metastore/src/test/org/apache/hadoop/hive/metastore/TestMetaStoreAuthorization.java	(revision 1413432)
+++ metastore/src/test/org/apache/hadoop/hive/metastore/TestMetaStoreAuthorization.java	(working copy)
@@ -40,7 +40,7 @@
     System.setProperty(HiveConf.ConfVars.METASTORE_AUTHORIZATION_STORAGE_AUTH_CHECKS.varname,
         "true");
     conf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + port);
-    conf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTRETRIES, 3);
+    conf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
     conf.setIntVar(ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, 60);
   }

Index: metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java	(revision 1413432)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java	(working copy)
@@ -20,6 +20,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Constructor;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.util.ArrayList;
@@ -1029,4 +1030,36 @@
       throw new MetaException(rawStoreClassName + " class not found");
     }
   }
+
+  /**
+   * Create an object of the given class.
+   * @param theClass
+   * @param parameterTypes
+   *          an array of parameterTypes for the constructor
+   * @param initargs
+   *          the list of arguments for the constructor
+   */
+  public static <T> T newInstance(Class<T> theClass, Class<?>[] parameterTypes,
+      Object[] initargs) {
+    // Perform some sanity checks on the arguments.
+    if (parameterTypes.length != initargs.length) {
+      throw new IllegalArgumentException(
+          "Number of constructor parameter types doesn't match number of arguments");
+    }
+    for (int i = 0; i < parameterTypes.length; i++) {
+      Class<?> clazz = parameterTypes[i];
+      if (!(clazz.isInstance(initargs[i]))) {
+        throw new IllegalArgumentException("Object : " + initargs[i]
+            + " is not an instance of " + clazz);
+      }
+    }
+
+    try {
+      Constructor<T> meth = theClass.getDeclaredConstructor(parameterTypes);
+      meth.setAccessible(true);
+      return meth.newInstance(initargs);
+    } catch (Exception e) {
+      throw new RuntimeException("Unable to instantiate " + theClass.getName(), e);
+    }
+  }
 }
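RetryingMetaStoreClient (introduced later in this patch) uses this helper to build the wrapped client reflectively. An illustrative call, assuming a HiveConf conf and a HiveMetaHookLoader hookLoader are in scope; it mirrors the constructor call inside RetryingMetaStoreClient:

    IMetaStoreClient base = MetaStoreUtils.newInstance(
        HiveMetaStoreClient.class,
        new Class[] { HiveConf.class, HiveMetaHookLoader.class },
        new Object[] { conf, hookLoader });

The setAccessible(true) in the helper lets it reach non-public constructors, so concrete client classes remain free to hide their constructors behind the proxy factory.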
Index: metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java	(revision 1413432)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java	(working copy)
@@ -33,6 +33,7 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 
 import javax.security.auth.login.LoginException;
 
@@ -122,7 +123,7 @@
     }
 
     // get the number retries
-    retries = HiveConf.getIntVar(conf, HiveConf.ConfVars.METASTORETHRIFTRETRIES);
+    retries = HiveConf.getIntVar(conf, HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES);
     retryDelaySeconds = conf.getIntVar(ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY);
 
     // user wants file store based configuration
@@ -163,6 +164,35 @@
   }
 
   /**
+   * Swaps the first element of the metastoreUris array with a random element from the
+   * remainder of the array.
+   */
+  private void promoteRandomMetaStoreURI() {
+    if (metastoreUris.length <= 1) {
+      return;
+    }
+    Random rng = new Random();
+    int index = rng.nextInt(metastoreUris.length - 1) + 1;
+    URI tmp = metastoreUris[0];
+    metastoreUris[0] = metastoreUris[index];
+    metastoreUris[index] = tmp;
+  }
+
+  public void reconnect() throws MetaException {
+    if (localMetaStore) {
+      // For direct DB connections we don't yet support reestablishing connections.
+      throw new MetaException("For direct MetaStore DB connections, we don't support retries" +
+          " at the client level.");
+    } else {
+      // Swap the first element of the metastoreUris[] with a random element from the rest
+      // of the array. Rationale being that this method will generally be called when the default
+      // connection has died and the default connection is likely to be the first array element.
+      promoteRandomMetaStoreURI();
+      open();
+    }
+  }
+
+  /**
    * @param dbname
    * @param tbl_name
    * @param new_tbl
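reconnect() deliberately reshuffles before reopening: the presumed-dead default URI is demoted and a randomly chosen alternative becomes the first candidate. A standalone sketch of the swap, with plain strings standing in for the URI array (host names are made up):

    import java.util.Arrays;
    import java.util.Random;

    public class PromoteUriSketch {
      public static void main(String[] args) {
        String[] uris = { "thrift://ms1:9083", "thrift://ms2:9083", "thrift://ms3:9083" };
        if (uris.length > 1) {
          // Draw an index uniformly from [1, length) and swap it to the front,
          // exactly as promoteRandomMetaStoreURI does with metastoreUris.
          int index = new Random().nextInt(uris.length - 1) + 1;
          String tmp = uris[0];
          uris[0] = uris[index];
          uris[index] = tmp;
        }
        System.out.println(Arrays.toString(uris));  // ms2 or ms3 now leads
      }
    }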
@@ -196,113 +226,104 @@
   }
 
   private void open() throws MetaException {
-    for (URI store : metastoreUris) {
-      LOG.info("Trying to connect to HiveMetaStore with URI " + store);
-      try {
-        openStore(store);
-      } catch (MetaException e) {
-        LOG.error("Unable to connect to HiveMetaStore with URI " + store, e);
-      }
-      if (isConnected) {
-        LOG.info("Connected to HiveMetaStore with URI " + store);
-        break;
-      }
-    }
-    if (!isConnected) {
-      throw new MetaException("Could not connect to HiveMetaStore using any of the provided URIs: "
-          + Arrays.asList(metastoreUris));
-    }
-  }
-
-  private void openStore(URI store) throws MetaException {
-    isConnected = false;
     TTransportException tte = null;
     HadoopShims shim = ShimLoader.getHadoopShims();
     boolean useSasl = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL);
     boolean useFramedTransport = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_FRAMED_TRANSPORT);
-    transport = new TSocket(store.getHost(), store.getPort());
     int clientSocketTimeout = conf.getIntVar(ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT);
-    transport = new TSocket(store.getHost(), store.getPort(), 1000 * clientSocketTimeout);
 
+    for (int attempt = 0; !isConnected && attempt < retries; ++attempt) {
+      for (URI store : metastoreUris) {
+        LOG.info("Trying to connect to metastore with URI " + store);
+        try {
+          //openStore(store);
+          transport = new TSocket(store.getHost(), store.getPort(), 1000 * clientSocketTimeout);
+          if (useSasl) {
+            // Wrap thrift connection with SASL for secure connection.
+            try {
+              HadoopThriftAuthBridge.Client authBridge =
+                ShimLoader.getHadoopThriftAuthBridge().createClient();
-    if (useSasl) {
-      // Wrap thrift connection with SASL for secure connection.
-      try {
-        HadoopThriftAuthBridge.Client authBridge =
-          ShimLoader.getHadoopThriftAuthBridge().createClient();
 
+              // check if we should use delegation tokens to authenticate
+              // the call below gets hold of the tokens if they are set up by hadoop
+              // this should happen on the map/reduce tasks if the client added the
+              // tokens into hadoop's credential store in the front end during job
+              // submission.
+              String tokenSig = conf.get("hive.metastore.token.signature");
+              // tokenSig could be null
+              tokenStrForm = shim.getTokenStrForm(tokenSig);
-        // check if we should use delegation tokens to authenticate
-        // the call below gets hold of the tokens if they are set up by hadoop
-        // this should happen on the map/reduce tasks if the client added the
-        // tokens into hadoop's credential store in the front end during job
-        // submission.
-        String tokenSig = conf.get("hive.metastore.token.signature");
-        // tokenSig could be null
-        tokenStrForm = shim.getTokenStrForm(tokenSig);
 
+              if(tokenStrForm != null) {
+                // authenticate using delegation tokens via the "DIGEST" mechanism
+                transport = authBridge.createClientTransport(null, store.getHost(),
+                    "DIGEST", tokenStrForm, transport);
+              } else {
+                String principalConfig =
+                    conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL);
+                transport = authBridge.createClientTransport(
+                    principalConfig, store.getHost(), "KERBEROS", null,
+                    transport);
+              }
+            } catch (IOException ioe) {
+              LOG.error("Couldn't create client transport", ioe);
+              throw new MetaException(ioe.toString());
+            }
+          } else if (useFramedTransport) {
+            transport = new TFramedTransport(transport);
+          }
-        if(tokenStrForm != null) {
-          // authenticate using delegation tokens via the "DIGEST" mechanism
-          transport = authBridge.createClientTransport(null, store.getHost(),
-              "DIGEST", tokenStrForm, transport);
-        } else {
-          String principalConfig = conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL);
-          transport = authBridge.createClientTransport(
-              principalConfig, store.getHost(), "KERBEROS", null,
-              transport);
-        }
-      } catch (IOException ioe) {
-        LOG.error("Couldn't create client transport", ioe);
-        throw new MetaException(ioe.toString());
-      }
-    } else if (useFramedTransport) {
-      transport = new TFramedTransport(transport);
-    }
 
+          client = new ThriftHiveMetastore.Client(new TBinaryProtocol(transport));
+          try {
+            transport.open();
+            isConnected = true;
+          } catch (TTransportException e) {
+            tte = e;
+            if (LOG.isDebugEnabled()) {
+              LOG.warn("Failed to connect to the MetaStore Server...", e);
+            } else {
+              // Don't print full exception trace if DEBUG is not on.
+              LOG.warn("Failed to connect to the MetaStore Server...");
+            }
+          }
+
+          if (isConnected && !useSasl && conf.getBoolVar(ConfVars.METASTORE_EXECUTE_SET_UGI)){
+            // Call set_ugi, only in unsecure mode.
+            try {
+              UserGroupInformation ugi = shim.getUGIForConf(conf);
+              client.set_ugi(ugi.getUserName(), Arrays.asList(ugi.getGroupNames()));
+            } catch (LoginException e) {
+              LOG.warn("Failed to do login. set_ugi() is not successful, " +
+                  "Continuing without it.", e);
+            } catch (IOException e) {
+              LOG.warn("Failed to find ugi of client set_ugi() is not successful, " +
+                  "Continuing without it.", e);
+            } catch (TException e) {
+              LOG.warn("set_ugi() not successful, Likely cause: new client talking to old server. " +
+                  "Continuing without it.", e);
+            }
+          }
+        } catch (MetaException e) {
+          LOG.error("Unable to connect to metastore with URI " + store +
+              " in attempt " + attempt, e);
        }
-      client = new ThriftHiveMetastore.Client(new TBinaryProtocol(transport));
-
-      for (int attempt = 0; !isConnected && attempt < retries; ++attempt) {
-        if (attempt > 0 && retryDelaySeconds > 0) {
+        if (isConnected) {
+          break;
+        }
+      }
+      // Wait before launching the next round of connection retries.
+      if (retryDelaySeconds > 0) {
        try {
          LOG.info("Waiting " + retryDelaySeconds + " seconds before next connection attempt.");
          Thread.sleep(retryDelaySeconds * 1000);
        } catch (InterruptedException ignore) {}
      }
-
-      try {
-        transport.open();
-        isConnected = true;
-      } catch (TTransportException e) {
-        tte = e;
-        if (LOG.isDebugEnabled()) {
-          LOG.warn("Failed to connect to the MetaStore Server...", e);
-        } else {
-          // Don't print full exception trace if DEBUG is not on.
- LOG.warn("Failed to connect to the MetaStore Server..."); - } - } } if (!isConnected) { - throw new MetaException("Could not connect to the MetaStore server! Caused by: " + - StringUtils.stringifyException(tte)); + throw new MetaException("Could not connect to meta store using any of the URIs provided"); } - - if (!useSasl && conf.getBoolVar(ConfVars.METASTORE_EXECUTE_SET_UGI)){ - // Call set_ugi, only in unsecure mode. - try { - UserGroupInformation ugi = shim.getUGIForConf(conf); - client.set_ugi(ugi.getUserName(), Arrays.asList(ugi.getGroupNames())); - } catch (LoginException e) { - LOG.warn("Failed to do login. set_ugi() is not successful, Continuing without it.", e); - } catch (IOException e) { - LOG.warn("Failed to find ugi of client set_ugi() is not successful, " + - "Continuing without it.", e); - } catch (TException e) { - LOG.warn("set_ugi() not successful, Likely cause: new client talking to old server. " + - "Continuing without it.", e); - } - } + LOG.info("Connected to metastore."); } public String getTokenStrForm() throws IOException { Index: metastore/src/java/org/apache/hadoop/hive/metastore/RetryingRawStore.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/RetryingRawStore.java (revision 1413432) +++ metastore/src/java/org/apache/hadoop/hive/metastore/RetryingRawStore.java (working copy) @@ -118,15 +118,14 @@ if (e.getCause() instanceof javax.jdo.JDOException) { // Due to reflection, the jdo exception is wrapped in // invocationTargetException - caughtException = e; + caughtException = (javax.jdo.JDOException) e.getCause(); } - else { + else throw e.getCause(); - } } if (retryCount >= retryLimit) { - throw caughtException; + throw caughtException; } assert (retryInterval >= 0); Index: metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java (revision 0) +++ metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java (working copy) @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
Index: metastore/src/java/org/apache/hadoop/hive/metastore/RetryingRawStore.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/RetryingRawStore.java	(revision 1413432)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/RetryingRawStore.java	(working copy)
@@ -118,15 +118,14 @@
         if (e.getCause() instanceof javax.jdo.JDOException) {
           // Due to reflection, the jdo exception is wrapped in
           // invocationTargetException
-          caughtException = e;
+          caughtException = (javax.jdo.JDOException) e.getCause();
         }
-        else {
+        else
           throw e.getCause();
-        }
       }
 
       if (retryCount >= retryLimit) {
-        throw caughtException;
+        throw caughtException;
       }
 
       assert (retryInterval >= 0);

Index: metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java	(revision 0)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java	(working copy)
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.lang.reflect.UndeclaredThrowableException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.thrift.TApplicationException;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.transport.TTransportException;
+
+public class RetryingMetaStoreClient implements InvocationHandler {
+
+  private static final Log LOG = LogFactory.getLog(RetryingMetaStoreClient.class.getName());
+
+  private final IMetaStoreClient base;
+  private final HiveConf hiveConf;
+  private final int retryLimit;
+  private final int retryDelaySeconds;
+
+  protected RetryingMetaStoreClient(HiveConf hiveConf, HiveMetaHookLoader hookLoader,
+      Class<? extends IMetaStoreClient> msClientClass) throws MetaException {
+    this.hiveConf = hiveConf;
+    this.retryLimit = hiveConf.getIntVar(HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES);
+    this.retryDelaySeconds =
+        hiveConf.getIntVar(HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY);
+    this.base = (IMetaStoreClient) MetaStoreUtils.newInstance(msClientClass, new Class[] {
+        HiveConf.class, HiveMetaHookLoader.class}, new Object[] {hiveConf, hookLoader});
+  }
+
+  public static IMetaStoreClient getProxy(HiveConf hiveConf, HiveMetaHookLoader hookLoader,
+      String mscClassName) throws MetaException {
+
+    Class<? extends IMetaStoreClient> baseClass = (Class<? extends IMetaStoreClient>)
+        MetaStoreUtils.getClass(mscClassName);
+
+    RetryingMetaStoreClient handler = new RetryingMetaStoreClient(hiveConf, hookLoader, baseClass);
+
+    return (IMetaStoreClient) Proxy.newProxyInstance(RetryingMetaStoreClient.class.getClassLoader(),
+        baseClass.getInterfaces(), handler);
+  }
+
+  @Override
+  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+    Object ret = null;
+    int retriesMade = 0;
+    TException caughtException = null;
+    while (true) {
+      try {
+        ret = method.invoke(base, args);
+        break;
+      } catch (UndeclaredThrowableException e) {
+        throw e.getCause();
+      } catch (InvocationTargetException e) {
+        if ((e.getCause() instanceof TApplicationException) ||
+            (e.getCause() instanceof TProtocolException) ||
+            (e.getCause() instanceof TTransportException)) {
+          caughtException = (TException) e.getCause();
+        } else if ((e.getCause() instanceof MetaException) &&
+            e.getCause().getMessage().matches(".*JDO[a-zA-Z]*Exception.*")) {
+          caughtException = (MetaException) e.getCause();
+        } else {
+          throw e.getCause();
+        }
+      }
+
+      if (retriesMade >= retryLimit) {
+        throw caughtException;
+      }
+      retriesMade++;
+      LOG.warn("MetaStoreClient lost connection. Attempting to reconnect.",
+          caughtException.getCause());
+      Thread.sleep(retryDelaySeconds * 1000);
+      base.reconnect();
+    }
+    return ret;
+  }
+}
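Together these pieces add call-level retries on top of the existing connection-level ones: getProxy hides the concrete client behind a JDK dynamic proxy, and invoke() classifies each failure, reconnects, and re-issues the call. The call site in Hive.createMetaStoreClient below uses it exactly like this:

    IMetaStoreClient msc = RetryingMetaStoreClient.getProxy(conf, hookLoader,
        HiveMetaStoreClient.class.getName());
    // Any call through msc that fails with TApplicationException, TProtocolException,
    // TTransportException, or a JDO-flavored MetaException is retried up to
    // hive.metastore.failure.retries times; base.reconnect() runs between attempts,
    // promoting a random alternate metastore URI before reopening.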
+ */ + public void reconnect() throws MetaException; + public void close(); /** Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1413432) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy) @@ -90,7 +90,8 @@ HiveConf.ConfVars.METASTOREDIRECTORY, HiveConf.ConfVars.METASTOREWAREHOUSE, HiveConf.ConfVars.METASTOREURIS, - HiveConf.ConfVars.METASTORETHRIFTRETRIES, + HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, + HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES, HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT, HiveConf.ConfVars.METASTOREPWD, @@ -242,7 +243,10 @@ METASTOREWAREHOUSE("hive.metastore.warehouse.dir", "/user/hive/warehouse"), METASTOREURIS("hive.metastore.uris", ""), // Number of times to retry a connection to a Thrift metastore server - METASTORETHRIFTRETRIES("hive.metastore.connect.retries", 5), + METASTORETHRIFTCONNECTIONRETRIES("hive.metastore.connect.retries", 3), + // Number of times to retry a Thrift metastore call upon failure + METASTORETHRIFTFAILURERETRIES("hive.metastore.failure.retries", 3), + // Number of seconds the client should wait between connection attempts METASTORE_CLIENT_CONNECT_RETRY_DELAY("hive.metastore.client.connect.retry.delay", 1), // Socket timeout for the client connection (in seconds) Index: ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (revision 1413432) +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (working copy) @@ -52,6 +52,7 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; @@ -2132,7 +2133,8 @@ } } }; - return new HiveMetaStoreClient(conf, hookLoader); + return RetryingMetaStoreClient.getProxy(conf, hookLoader, + HiveMetaStoreClient.class.getName()); } /**