diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java index 017f565..280ebb4 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java @@ -277,6 +277,7 @@ public String toString() { private static class ConnectionImpl implements StreamingConnection { private final IMetaStoreClient msClient; + private final IMetaStoreClient heartbeaterMSClient; private final HiveEndPoint endPt; private final UserGroupInformation ugi; private final String username; @@ -309,6 +310,9 @@ private ConnectionImpl(HiveEndPoint endPoint, UserGroupInformation ugi, } this.secureMode = ugi==null ? false : ugi.hasKerberosCredentials(); this.msClient = getMetaStoreClient(endPoint, conf, secureMode); + // We use a separate metastore client for heartbeat calls to ensure heartbeat RPC calls are + // isolated from the other transaction related RPC calls. + this.heartbeaterMSClient = getMetaStoreClient(endPoint, conf, secureMode); checkEndPoint(endPoint, msClient); if (createPart && !endPoint.partitionVals.isEmpty()) { createPartitionIfNotExists(endPoint, msClient, conf); @@ -366,6 +370,7 @@ private void checkEndPoint(HiveEndPoint endPoint, IMetaStoreClient msClient) public void close() { if (ugi==null) { msClient.close(); + heartbeaterMSClient.close(); return; } try { @@ -374,6 +379,7 @@ public void close() { @Override public Void run() throws Exception { msClient.close(); + heartbeaterMSClient.close(); return null; } } ); @@ -429,8 +435,8 @@ public TransactionBatch run() throws StreamingException, InterruptedException { private TransactionBatch fetchTransactionBatchImpl(int numTransactions, RecordWriter recordWriter) throws StreamingException, TransactionBatchUnAvailable, InterruptedException { - return new TransactionBatchImpl(username, ugi, endPt, numTransactions, msClient - , recordWriter, agentInfo); + return new TransactionBatchImpl(username, ugi, endPt, numTransactions, msClient, + heartbeaterMSClient, recordWriter, agentInfo); } @@ -541,14 +547,14 @@ private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveCo + endPoint.metaStoreUri + ". " + e.getMessage(), e); } } - - } // class ConnectionImpl + private static class TransactionBatchImpl implements TransactionBatch { private final String username; private final UserGroupInformation ugi; private final HiveEndPoint endPt; private final IMetaStoreClient msClient; + private final IMetaStoreClient heartbeaterMSClient; private final RecordWriter recordWriter; private final List txnIds; @@ -572,9 +578,9 @@ private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveCo * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch */ private TransactionBatchImpl(final String user, UserGroupInformation ugi, HiveEndPoint endPt, - final int numTxns, final IMetaStoreClient msClient, RecordWriter recordWriter, - String agentInfo) - throws StreamingException, TransactionBatchUnAvailable, InterruptedException { + final int numTxns, final IMetaStoreClient msClient, + final IMetaStoreClient heartbeaterMSClient, RecordWriter recordWriter, String agentInfo) + throws StreamingException, TransactionBatchUnAvailable, InterruptedException { boolean success = false; try { if ( endPt.partitionVals!=null && !endPt.partitionVals.isEmpty() ) { @@ -588,6 +594,7 @@ private TransactionBatchImpl(final String user, UserGroupInformation ugi, HiveEn this.ugi = ugi; this.endPt = endPt; this.msClient = msClient; + this.heartbeaterMSClient = msClient; this.recordWriter = recordWriter; this.agentInfo = agentInfo; @@ -937,7 +944,7 @@ public void heartbeat() throws StreamingException, HeartBeatFailure { Long first = txnIds.get(currentTxnIndex); Long last = txnIds.get(txnIds.size()-1); try { - HeartbeatTxnRangeResponse resp = msClient.heartbeatTxnRange(first, last); + HeartbeatTxnRangeResponse resp = heartbeaterMSClient.heartbeatTxnRange(first, last); if (!resp.getAborted().isEmpty() || !resp.getNosuch().isEmpty()) { throw new HeartBeatFailure(resp.getAborted(), resp.getNosuch()); } @@ -1045,5 +1052,4 @@ private static void setHiveConf(HiveConf conf, HiveConf.ConfVars var, boolean va conf.setBoolVar(var, value); } - } // class HiveEndPoint diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 5b6f20c..27315e8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -324,7 +324,6 @@ private static Table getTable(WriteEntity we) { return t; } /** - * This is for testing only. Normally client should call {@link #acquireLocks(QueryPlan, Context, String, boolean)} * @param delay time to delay for first heartbeat */ @VisibleForTesting @@ -418,7 +417,13 @@ public void heartbeat() throws LockException { for (HiveLock lock : locks) { long lockId = ((DbLockManager.DbHiveLock)lock).lockId; try { - client.heartbeat(txnId, lockId); + // Try to get the threadlocal metastore client for the heartbeat calls. + // If not set yet (should not happen), heartbeat using DbTxnManager's metastore client + SynchronizedMetaStoreClient heartbeaterClient = Heartbeater.getThreadLocalMSClient(); + if (heartbeaterClient == null) { + heartbeaterClient = client; + } + heartbeaterClient.heartbeat(txnId, lockId); } catch (NoSuchLockException e) { LOG.error("Unable to find lock " + JavaUtils.lockIdToString(lockId)); throw new LockException(e, ErrorMsg.LOCK_NO_SUCH_LOCK, JavaUtils.lockIdToString(lockId)); @@ -445,7 +450,12 @@ public void heartbeat() throws LockException { private Heartbeater startHeartbeat(long initialDelay) throws LockException { long heartbeatInterval = getHeartbeatInterval(conf); assert heartbeatInterval > 0; - Heartbeater heartbeater = new Heartbeater(this, conf); + Heartbeater heartbeater; + try { + heartbeater = new Heartbeater(this, conf); + } catch (MetaException | HiveException e) { + throw new LockException(e); + } // For negative testing purpose.. if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER)) { initialDelay = 0; @@ -601,19 +611,47 @@ private static long getHeartbeatInterval(Configuration conf) throws LockExceptio public static class Heartbeater implements Runnable { private HiveTxnManager txnMgr; private HiveConf conf; - LockException lockException; + // SynchronizedMetaStoreClient object per heartbeater thread. + private static ThreadLocal threadLocalMSClient = + new ThreadLocal() { + + @Override + protected SynchronizedMetaStoreClient initialValue() { + return null; + } + + @Override + public synchronized void remove() { + if (this.get() != null) { + this.get().close(); + } + super.remove(); + } + }; + public LockException getLockException() { return lockException; } + static SynchronizedMetaStoreClient getThreadLocalMSClient() { + return threadLocalMSClient.get(); + } + /** * * @param txnMgr transaction manager for this operation + * @throws HiveException + * @throws MetaException */ - Heartbeater(HiveTxnManager txnMgr, HiveConf conf) { + Heartbeater(HiveTxnManager txnMgr, HiveConf conf) throws HiveException, MetaException { this.txnMgr = txnMgr; this.conf = conf; + Hive db = Hive.get(conf); + // Create new threadlocal synchronized metastore client object for use in hearbeater threads. + // This makes the concurrent use of heartbeat thread safe, and won't cause transaction + // abort due to a long metastore client call blocking the heartbeat call. + threadLocalMSClient.set(new SynchronizedMetaStoreClient(db.getMSC())); lockException = null; } @@ -627,7 +665,6 @@ public void run() { if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER)) { throw new LockException(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER.name() + "=true"); } - LOG.debug("Heartbeating..."); txnMgr.heartbeat(); } catch (LockException e) { @@ -640,7 +677,7 @@ public void run() { /** * Synchronized MetaStoreClient wrapper */ - final class SynchronizedMetaStoreClient { + final static class SynchronizedMetaStoreClient { private final IMetaStoreClient client; SynchronizedMetaStoreClient(IMetaStoreClient client) { this.client = client; @@ -681,5 +718,9 @@ synchronized void unlock(long lockid) throws TException { synchronized ShowLocksResponse showLocks(ShowLocksRequest showLocksRequest) throws TException { return client.showLocks(showLocksRequest); } + + synchronized void close() { + client.close(); + } } }