diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index 017f565..280ebb4 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -277,6 +277,7 @@ public String toString() {
 
   private static class ConnectionImpl implements StreamingConnection {
     private final IMetaStoreClient msClient;
+    private final IMetaStoreClient heartbeaterMSClient;
     private final HiveEndPoint endPt;
     private final UserGroupInformation ugi;
     private final String username;
@@ -309,6 +310,9 @@ private ConnectionImpl(HiveEndPoint endPoint, UserGroupInformation ugi,
       }
       this.secureMode = ugi==null ? false : ugi.hasKerberosCredentials();
       this.msClient = getMetaStoreClient(endPoint, conf, secureMode);
+      // We use a separate metastore client for heartbeat calls to ensure heartbeat RPC calls are
+      // isolated from the other transaction related RPC calls.
+      this.heartbeaterMSClient = getMetaStoreClient(endPoint, conf, secureMode);
       checkEndPoint(endPoint, msClient);
       if (createPart && !endPoint.partitionVals.isEmpty()) {
         createPartitionIfNotExists(endPoint, msClient, conf);
@@ -366,6 +370,7 @@ private void checkEndPoint(HiveEndPoint endPoint, IMetaStoreClient msClient)
     public void close() {
       if (ugi==null) {
         msClient.close();
+        heartbeaterMSClient.close();
         return;
       }
       try {
@@ -374,6 +379,7 @@ public void close() {
               @Override
               public Void run() throws Exception {
                 msClient.close();
+                heartbeaterMSClient.close();
                 return null;
               }
             } );
@@ -429,8 +435,8 @@ public TransactionBatch run() throws StreamingException, InterruptedException {
     private TransactionBatch fetchTransactionBatchImpl(int numTransactions,
                                                   RecordWriter recordWriter)
             throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
-      return new TransactionBatchImpl(username, ugi, endPt, numTransactions, msClient
-              , recordWriter, agentInfo);
+      return new TransactionBatchImpl(username, ugi, endPt, numTransactions, msClient,
+          heartbeaterMSClient, recordWriter, agentInfo);
     }
 
 
@@ -541,14 +547,14 @@ private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveCo
                 + endPoint.metaStoreUri + ". " + e.getMessage(), e);
       }
     }
-
-
   } // class ConnectionImpl
+
   private static class TransactionBatchImpl implements TransactionBatch {
     private final String username;
     private final UserGroupInformation ugi;
     private final HiveEndPoint endPt;
     private final IMetaStoreClient msClient;
+    private final IMetaStoreClient heartbeaterMSClient;
     private final RecordWriter recordWriter;
     private final List<Long> txnIds;
 
@@ -572,9 +578,9 @@ private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveCo
      * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch
      */
     private TransactionBatchImpl(final String user, UserGroupInformation ugi, HiveEndPoint endPt,
-              final int numTxns, final IMetaStoreClient msClient, RecordWriter recordWriter,
-              String agentInfo)
-            throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
+        final int numTxns, final IMetaStoreClient msClient,
+        final IMetaStoreClient heartbeaterMSClient, RecordWriter recordWriter, String agentInfo)
+        throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
       boolean success = false;
       try {
         if ( endPt.partitionVals!=null && !endPt.partitionVals.isEmpty() ) {
@@ -588,6 +594,7 @@ private TransactionBatchImpl(final String user, UserGroupInformation ugi, HiveEn
         this.ugi = ugi;
         this.endPt = endPt;
         this.msClient = msClient;
+        this.heartbeaterMSClient = heartbeaterMSClient;
         this.recordWriter = recordWriter;
         this.agentInfo = agentInfo;
 
@@ -937,7 +944,7 @@ public void heartbeat() throws StreamingException, HeartBeatFailure {
       Long first = txnIds.get(currentTxnIndex);
       Long last = txnIds.get(txnIds.size()-1);
       try {
-        HeartbeatTxnRangeResponse resp = msClient.heartbeatTxnRange(first, last);
+        HeartbeatTxnRangeResponse resp = heartbeaterMSClient.heartbeatTxnRange(first, last);
         if (!resp.getAborted().isEmpty() || !resp.getNosuch().isEmpty()) {
           throw new HeartBeatFailure(resp.getAborted(), resp.getNosuch());
         }
@@ -1045,5 +1052,4 @@ private static void setHiveConf(HiveConf conf, HiveConf.ConfVars var, boolean va
 
     conf.setBoolVar(var, value);
   }
-
 } // class HiveEndPoint
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 5b6f20c..4c93669 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -85,6 +85,35 @@
   private Runnable shutdownRunner = null;
   private static final int SHUTDOWN_HOOK_PRIORITY = 0;
 
+  // SynchronizedMetaStoreClient object per heartbeater thread.
+  private static ThreadLocal<SynchronizedMetaStoreClient> threadLocalMSClient =
+      new ThreadLocal<SynchronizedMetaStoreClient>() {
+
+        @Override
+        protected SynchronizedMetaStoreClient initialValue() {
+          return null;
+        }
+
+        @Override
+        public synchronized void remove() {
+          SynchronizedMetaStoreClient client = this.get();
+          if (client != null) {
+            client.close();
+          }
+          super.remove();
+        }
+      };
+
+  private static int heartbeaterMSClientCount = 0;
+  private static final Object heartBeaterClientCountLock = new Object();
+  private int heartbeaterThreadPoolSize = 0;
+
+
+  private static SynchronizedMetaStoreClient getThreadLocalMSClient() {
+    return threadLocalMSClient.get();
+  }
+
+
   DbTxnManager() {
     shutdownRunner = new Runnable() {
       @Override
@@ -324,7 +353,6 @@ private static Table getTable(WriteEntity we) {
     return t;
   }
   /**
-   * This is for testing only. Normally client should call {@link #acquireLocks(QueryPlan, Context, String, boolean)}
    * @param delay time to delay for first heartbeat
    */
   @VisibleForTesting
@@ -418,7 +446,30 @@ public void heartbeat() throws LockException {
     for (HiveLock lock : locks) {
       long lockId = ((DbLockManager.DbHiveLock)lock).lockId;
       try {
-        client.heartbeat(txnId, lockId);
+        // Get the threadlocal metastore client for the heartbeat calls.
+        SynchronizedMetaStoreClient heartbeaterClient = getThreadLocalMSClient();
+        if (heartbeaterClient == null) {
+          Hive db;
+          try {
+            db = Hive.get(conf);
+            // Create a new threadlocal synchronized metastore client for use in heartbeater threads.
+            // This makes the concurrent use of heartbeat thread safe, and won't cause transaction
+            // abort due to a long metastore client call blocking the heartbeat call.
+            heartbeaterClient = new SynchronizedMetaStoreClient(db.getMSC());
+            threadLocalMSClient.set(heartbeaterClient);
+          } catch (HiveException e) {
+            LOG.error("Unable to create new metastore client for heartbeating", e);
+            throw new LockException(e);
+          }
+          // Increment the threadlocal metastore client count
+          synchronized (heartBeaterClientCountLock) {
+            heartbeaterMSClientCount++;
+            if (heartbeaterMSClientCount > heartbeaterThreadPoolSize) {
+              LOG.warn("The number of hearbeater metastore client has exceeded the max limit.");
+            }
+          }
+        }
+        heartbeaterClient.heartbeat(txnId, lockId);
       } catch (NoSuchLockException e) {
         LOG.error("Unable to find lock " + JavaUtils.lockIdToString(lockId));
         throw new LockException(e, ErrorMsg.LOCK_NO_SUCH_LOCK, JavaUtils.lockIdToString(lockId));
@@ -552,24 +603,50 @@ private void init() throws LockException {
   }
 
   private synchronized void initHeartbeatExecutorService() {
-    if (heartbeatExecutorService != null
-        && !heartbeatExecutorService.isShutdown()
+    if (heartbeatExecutorService != null && !heartbeatExecutorService.isShutdown()
         && !heartbeatExecutorService.isTerminated()) {
       return;
     }
-
-    int threadPoolSize = conf.getIntVar(HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE);
+    heartbeaterThreadPoolSize =
+        conf.getIntVar(HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE);
     heartbeatExecutorService =
-        Executors.newScheduledThreadPool(threadPoolSize, new ThreadFactory() {
+        Executors.newScheduledThreadPool(heartbeaterThreadPoolSize, new ThreadFactory() {
           private final AtomicInteger threadCounter = new AtomicInteger();
 
+          @Override
           public Thread newThread(Runnable r) {
-            return new Thread(r, "Heartbeater-" + threadCounter.getAndIncrement());
+            return new HeartbeaterThread(r, "Heartbeater-" + threadCounter.getAndIncrement());
           }
         });
     ((ScheduledThreadPoolExecutor) heartbeatExecutorService).setRemoveOnCancelPolicy(true);
   }
 
+  public static class HeartbeaterThread extends Thread {
+    public HeartbeaterThread(Runnable target, String name) {
+      super(target, name);
+    }
+
+    /**
+     * Runs the target and then closes this thread's metastore client. Cleanup happens here and
+     * not in finalize(): ThreadLocal.remove() only affects the calling thread, and a finalizer
+     * never runs on the (already terminated) thread it finalizes.
+     */
+    @Override
+    public void run() {
+      try {
+        super.run();
+      } finally {
+        if (getThreadLocalMSClient() != null) {
+          threadLocalMSClient.remove();
+          // Adjust the metastore client count
+          synchronized (DbTxnManager.heartBeaterClientCountLock) {
+            DbTxnManager.heartbeaterMSClientCount--;
+          }
+        }
+      }
+    }
+  }
+
   @Override
   public boolean isTxnOpen() {
     return txnId > 0;
@@ -601,12 +678,11 @@ private static long getHeartbeatInterval(Configuration conf) throws LockExceptio
   public static class Heartbeater implements Runnable {
     private HiveTxnManager txnMgr;
     private HiveConf conf;
-
     LockException lockException;
+
     public LockException getLockException() {
       return lockException;
     }
-
     /**
      *
      * @param txnMgr transaction manager for this operation
@@ -627,7 +703,6 @@ public void run() {
         if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER)) {
           throw new LockException(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER.name() + "=true");
         }
-
         LOG.debug("Heartbeating...");
         txnMgr.heartbeat();
       } catch (LockException e) {
@@ -681,5 +756,9 @@ synchronized void unlock(long lockid) throws TException {
     synchronized ShowLocksResponse showLocks(ShowLocksRequest showLocksRequest) throws TException {
       return client.showLocks(showLocksRequest);
     }
+
+    synchronized void close() {
+      client.close();
+    }
   }
 }