diff --git metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java index a6545a9..84e20b6 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java @@ -215,8 +215,8 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl throw caughtException; } retriesMade++; - LOG.warn("MetaStoreClient lost connection. Attempting to reconnect.", - caughtException); + LOG.warn("MetaStoreClient lost connection. Attempting to reconnect (" + retriesMade + " of " + + retryLimit + ") after " + retryDelaySeconds + "s.", caughtException); Thread.sleep(retryDelaySeconds * 1000); } return ret; diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java index 529e64c..c3725ad 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.ql.lockmgr; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.SynchronizedMetaStoreClient; import org.apache.hadoop.hive.ql.exec.DDLTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,10 +42,10 @@ /** * An implementation of HiveLockManager for use with {@link org.apache.hadoop.hive.ql.lockmgr.DbTxnManager}. - * Note, this lock manager is not meant to stand alone. It cannot be used - * without the DbTxnManager. + * Note, this lock manager is not meant to stand alone. It cannot be used without the DbTxnManager. + * See {@link DbTxnManager#getMS()} for important concurrency/metastore access notes. 
*/ -public class DbLockManager implements HiveLockManager{ +public final class DbLockManager implements HiveLockManager{ static final private String CLASS_NAME = DbLockManager.class.getName(); static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME); @@ -54,14 +53,14 @@ private long MAX_SLEEP; //longer term we should always have a txn id and then we won't need to track locks here at all private Set locks; - private SynchronizedMetaStoreClient client; private long nextSleep = 50; private final HiveConf conf; + private final DbTxnManager txnManager; - DbLockManager(SynchronizedMetaStoreClient client, HiveConf conf) { + DbLockManager(HiveConf conf, DbTxnManager txnManager) { locks = new HashSet<>(); - this.client = client; this.conf = conf; + this.txnManager = txnManager; } @Override @@ -100,7 +99,7 @@ LockState lock(LockRequest lock, String queryId, boolean isBlocking, List heartbeatTask = null; private Runnable shutdownRunner = null; private static final int SHUTDOWN_HOOK_PRIORITY = 0; - - // SynchronizedMetaStoreClient object per heartbeater thread. - private static ThreadLocal threadLocalMSClient = - new ThreadLocal() { - - @Override - protected SynchronizedMetaStoreClient initialValue() { - return null; - } - - @Override - public synchronized void remove() { - SynchronizedMetaStoreClient client = this.get(); - if (client != null) { - client.close(); - } - super.remove(); - } - }; - - private static AtomicInteger heartbeaterMSClientCount = new AtomicInteger(0); - private static int heartbeaterThreadPoolSize = 0; - - private static SynchronizedMetaStoreClient getThreadLocalMSClient() { - return threadLocalMSClient.get(); + /** + * We do this on every call to make sure TM uses same MS connection as is used by the caller (Driver, + * SemanticAnalyzer, etc). {@code Hive} instances are cached using ThreadLocal and + * {@link IMetaStoreClient} is cached within {@code Hive} with additional logic. 
Furthermore, this + * ensures that multiple threads are not sharing the same Thrift client (which could happen + * if we had cached {@link IMetaStoreClient} here). + * + * ThreadLocal gets cleaned up automatically when its thread goes away + * https://docs.oracle.com/javase/7/docs/api/java/lang/ThreadLocal.html. This is especially + * important for threads created by {@link #heartbeatExecutorService}. + * + * Embedded {@link DbLockManager} follows the same logic. + * @return IMetaStoreClient + * @throws LockException on any errors + */ + IMetaStoreClient getMS() throws LockException { + try { + return Hive.get(conf).getMSC(); + } + catch(HiveException|MetaException e) { + String msg = "Unable to reach Hive Metastore: " + e.getMessage(); + LOG.error(msg, e); + throw new LockException(e); + } } - DbTxnManager() { shutdownRunner = new Runnable() { @Override @@ -148,7 +148,7 @@ long openTxn(Context ctx, String user, long delay) throws LockException { throw new LockException("Transaction already opened. 
" + JavaUtils.txnIdToString(txnId)); } try { - txnId = client.openTxn(user); + txnId = getMS().openTxn(user); statementId = 0; LOG.debug("Opened " + JavaUtils.txnIdToString(txnId)); ctx.setHeartbeater(startHeartbeat(delay)); @@ -158,11 +158,15 @@ long openTxn(Context ctx, String user, long delay) throws LockException { } } + /** + * we don't expect multiple thread to call this method concurrently but {@link #lockMgr} will + * be read by a different threads that one writing it, thus it's {@code volatile} + */ @Override public HiveLockManager getLockManager() throws LockException { init(); if (lockMgr == null) { - lockMgr = new DbLockManager(client, conf); + lockMgr = new DbLockManager(conf, this); } return lockMgr; } @@ -388,7 +392,7 @@ public void commitTxn() throws LockException { lockMgr.clearLocalLockRecords(); stopHeartbeat(); LOG.debug("Committing txn " + JavaUtils.txnIdToString(txnId)); - client.commitTxn(txnId); + getMS().commitTxn(txnId); } catch (NoSuchTxnException e) { LOG.error("Metastore could not find " + JavaUtils.txnIdToString(txnId)); throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(txnId)); @@ -414,7 +418,7 @@ public void rollbackTxn() throws LockException { lockMgr.clearLocalLockRecords(); stopHeartbeat(); LOG.debug("Rolling back " + JavaUtils.txnIdToString(txnId)); - client.rollbackTxn(txnId); + getMS().rollbackTxn(txnId); } catch (NoSuchTxnException e) { LOG.error("Metastore could not find " + JavaUtils.txnIdToString(txnId)); throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(txnId)); @@ -460,29 +464,11 @@ public void heartbeat() throws LockException { for (HiveLock lock : locks) { long lockId = ((DbLockManager.DbHiveLock)lock).lockId; try { - // Get the threadlocal metastore client for the heartbeat calls. 
- SynchronizedMetaStoreClient heartbeaterClient = getThreadLocalMSClient(); - if (heartbeaterClient == null) { - Hive db; - try { - db = Hive.get(conf); - // Create a new threadlocal synchronized metastore client for use in heartbeater threads. - // This makes the concurrent use of heartbeat thread safe, and won't cause transaction - // abort due to a long metastore client call blocking the heartbeat call. - heartbeaterClient = new SynchronizedMetaStoreClient(db.getMSC()); - threadLocalMSClient.set(heartbeaterClient); - } catch (HiveException e) { - LOG.error("Unable to create new metastore client for heartbeating", e); - throw new LockException(e); - } - // Increment the threadlocal metastore client count - if (heartbeaterMSClientCount.incrementAndGet() >= heartbeaterThreadPoolSize) { - LOG.warn("The number of heartbeater metastore clients - + " - + heartbeaterMSClientCount.get() + ", has exceeded the max limit - " - + heartbeaterThreadPoolSize); - } - } - heartbeaterClient.heartbeat(txnId, lockId); + /** + * This relies on the ThreadLocal caching, which implies that the same {@link IMetaStoreClient}, + * in particular the Thrift connection it uses is never shared between threads + */ + getMS().heartbeat(txnId, lockId); } catch (NoSuchLockException e) { LOG.error("Unable to find lock " + JavaUtils.lockIdToString(lockId)); throw new LockException(e, ErrorMsg.LOCK_NO_SUCH_LOCK, JavaUtils.lockIdToString(lockId)); @@ -554,7 +540,7 @@ private void stopHeartbeat() throws LockException { public ValidTxnList getValidTxns() throws LockException { init(); try { - return client.getValidTxns(txnId); + return getMS().getValidTxns(txnId); } catch (TException e) { throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e); @@ -598,21 +584,10 @@ protected void destruct() { } private void init() throws LockException { - if (client == null) { - if (conf == null) { - throw new RuntimeException("Must call setHiveConf before any other " + - "methods."); - } - try { - 
Hive db = Hive.get(conf); - client = new SynchronizedMetaStoreClient(db.getMSC()); - initHeartbeatExecutorService(); - } catch (MetaException e) { - throw new LockException(ErrorMsg.METASTORE_COULD_NOT_INITIATE.getMsg(), e); - } catch (HiveException e) { - throw new LockException(ErrorMsg.METASTORE_COULD_NOT_INITIATE.getMsg(), e); - } + if (conf == null) { + throw new RuntimeException("Must call setHiveConf before any other methods."); } + initHeartbeatExecutorService(); } private synchronized void initHeartbeatExecutorService() { @@ -620,10 +595,9 @@ private synchronized void initHeartbeatExecutorService() { && !heartbeatExecutorService.isTerminated()) { return; } - heartbeaterThreadPoolSize = - conf.getIntVar(HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE); heartbeatExecutorService = - Executors.newScheduledThreadPool(heartbeaterThreadPoolSize, new ThreadFactory() { + Executors.newScheduledThreadPool( + conf.getIntVar(HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE), new ThreadFactory() { private final AtomicInteger threadCounter = new AtomicInteger(); @Override @@ -635,22 +609,10 @@ public Thread newThread(Runnable r) { } public static class HeartbeaterThread extends Thread { - public HeartbeaterThread(Runnable target, String name) { + HeartbeaterThread(Runnable target, String name) { super(target, name); setDaemon(true); } - - @Override - /** - * We're overriding finalize so that we can do an orderly cleanup of resources held by - * the threadlocal metastore client. - */ - protected void finalize() throws Throwable { - threadLocalMSClient.remove(); - // Adjust the metastore client count - DbTxnManager.heartbeaterMSClientCount.decrementAndGet(); - super.finalize(); - } } @Override