diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java index 017f565..452cb15 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java @@ -277,6 +277,7 @@ public String toString() { private static class ConnectionImpl implements StreamingConnection { private final IMetaStoreClient msClient; + private final IMetaStoreClient heartbeaterMSClient; private final HiveEndPoint endPt; private final UserGroupInformation ugi; private final String username; @@ -309,6 +310,9 @@ private ConnectionImpl(HiveEndPoint endPoint, UserGroupInformation ugi, } this.secureMode = ugi==null ? false : ugi.hasKerberosCredentials(); this.msClient = getMetaStoreClient(endPoint, conf, secureMode); + // We use a separate metastore client for heartbeat calls to ensure heartbeat RPC calls are + // isolated from the other transaction related RPC calls. 
+ this.heartbeaterMSClient = getMetaStoreClient(endPoint, conf, secureMode); checkEndPoint(endPoint, msClient); if (createPart && !endPoint.partitionVals.isEmpty()) { createPartitionIfNotExists(endPoint, msClient, conf); @@ -366,6 +370,7 @@ private void checkEndPoint(HiveEndPoint endPoint, IMetaStoreClient msClient) public void close() { if (ugi==null) { msClient.close(); + heartbeaterMSClient.close(); return; } try { @@ -374,6 +379,7 @@ public void close() { @Override public Void run() throws Exception { msClient.close(); + heartbeaterMSClient.close(); return null; } } ); @@ -429,8 +435,8 @@ public TransactionBatch run() throws StreamingException, InterruptedException { private TransactionBatch fetchTransactionBatchImpl(int numTransactions, RecordWriter recordWriter) throws StreamingException, TransactionBatchUnAvailable, InterruptedException { - return new TransactionBatchImpl(username, ugi, endPt, numTransactions, msClient - , recordWriter, agentInfo); + return new TransactionBatchImpl(username, ugi, endPt, numTransactions, msClient, + heartbeaterMSClient, recordWriter, agentInfo); } @@ -541,14 +547,14 @@ private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveCo + endPoint.metaStoreUri + ". 
" + e.getMessage(), e); } } - - } // class ConnectionImpl + private static class TransactionBatchImpl implements TransactionBatch { private final String username; private final UserGroupInformation ugi; private final HiveEndPoint endPt; private final IMetaStoreClient msClient; + private final IMetaStoreClient heartbeaterMSClient; private final RecordWriter recordWriter; private final List txnIds; @@ -572,9 +578,9 @@ private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveCo * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch */ private TransactionBatchImpl(final String user, UserGroupInformation ugi, HiveEndPoint endPt, - final int numTxns, final IMetaStoreClient msClient, RecordWriter recordWriter, - String agentInfo) - throws StreamingException, TransactionBatchUnAvailable, InterruptedException { + final int numTxns, final IMetaStoreClient msClient, + final IMetaStoreClient heartbeaterMSClient, RecordWriter recordWriter, String agentInfo) + throws StreamingException, TransactionBatchUnAvailable, InterruptedException { boolean success = false; try { if ( endPt.partitionVals!=null && !endPt.partitionVals.isEmpty() ) { @@ -588,6 +594,7 @@ private TransactionBatchImpl(final String user, UserGroupInformation ugi, HiveEn this.ugi = ugi; this.endPt = endPt; this.msClient = msClient; + this.heartbeaterMSClient = heartbeaterMSClient; this.recordWriter = recordWriter; this.agentInfo = agentInfo; @@ -937,7 +944,7 @@ public void heartbeat() throws StreamingException, HeartBeatFailure { Long first = txnIds.get(currentTxnIndex); Long last = txnIds.get(txnIds.size()-1); try { - HeartbeatTxnRangeResponse resp = msClient.heartbeatTxnRange(first, last); + HeartbeatTxnRangeResponse resp = heartbeaterMSClient.heartbeatTxnRange(first, last); if (!resp.getAborted().isEmpty() || !resp.getNosuch().isEmpty()) { throw new HeartBeatFailure(resp.getAborted(), resp.getNosuch()); } @@ -1045,5 +1052,4 @@ private static void 
setHiveConf(HiveConf conf, HiveConf.ConfVars var, boolean va conf.setBoolVar(var, value); } - } // class HiveEndPoint diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java index 60674eb..f72c379 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java @@ -291,6 +291,9 @@ public static int countQueryAgent(String countQuery) throws Exception { } @VisibleForTesting public static String queryToString(String query) throws Exception { + return queryToString(query, true); + } + public static String queryToString(String query, boolean includeHeader) throws Exception { Connection conn = null; Statement stmt = null; ResultSet rs = null; @@ -300,10 +303,12 @@ public static String queryToString(String query) throws Exception { stmt = conn.createStatement(); rs = stmt.executeQuery(query); ResultSetMetaData rsmd = rs.getMetaData(); - for(int colPos = 1; colPos <= rsmd.getColumnCount(); colPos++) { - sb.append(rsmd.getColumnName(colPos)).append(" "); + if(includeHeader) { + for (int colPos = 1; colPos <= rsmd.getColumnCount(); colPos++) { + sb.append(rsmd.getColumnName(colPos)).append(" "); + } + sb.append('\n'); } - sb.append('\n'); while(rs.next()) { for (int colPos = 1; colPos <= rsmd.getColumnCount(); colPos++) { sb.append(rs.getObject(colPos)).append(" "); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 5b6f20c..7b0369d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -85,6 +85,32 @@ private Runnable shutdownRunner = null; private static final int SHUTDOWN_HOOK_PRIORITY = 0; + // SynchronizedMetaStoreClient object per heartbeater thread. 
+ private static ThreadLocal<SynchronizedMetaStoreClient> threadLocalMSClient = + new ThreadLocal<SynchronizedMetaStoreClient>() { + + @Override + protected SynchronizedMetaStoreClient initialValue() { + return null; + } + + @Override + public synchronized void remove() { + SynchronizedMetaStoreClient client = this.get(); + if (client != null) { + client.close(); + } + super.remove(); + } + }; + + private static AtomicInteger heartbeaterMSClientCount = new AtomicInteger(0); + private int heartbeaterThreadPoolSize = 0; + + private static SynchronizedMetaStoreClient getThreadLocalMSClient() { + return threadLocalMSClient.get(); + } + DbTxnManager() { shutdownRunner = new Runnable() { @Override @@ -324,7 +350,6 @@ private static Table getTable(WriteEntity we) { return t; } /** - * This is for testing only. Normally client should call {@link #acquireLocks(QueryPlan, Context, String, boolean)} * @param delay time to delay for first heartbeat */ @VisibleForTesting @@ -418,7 +443,29 @@ public void heartbeat() throws LockException { for (HiveLock lock : locks) { long lockId = ((DbLockManager.DbHiveLock)lock).lockId; try { - client.heartbeat(txnId, lockId); + // Get the threadlocal metastore client for the heartbeat calls. + SynchronizedMetaStoreClient heartbeaterClient = getThreadLocalMSClient(); + if (heartbeaterClient == null) { + Hive db; + try { + db = Hive.get(conf); + // Create a new threadlocal synchronized metastore client for use in heartbeater threads. + // This makes the concurrent use of heartbeat thread safe, and won't cause transaction + // abort due to a long metastore client call blocking the heartbeat call. 
+ heartbeaterClient = new SynchronizedMetaStoreClient(db.getMSC()); + threadLocalMSClient.set(heartbeaterClient); + } catch (HiveException e) { + LOG.error("Unable to create new metastore client for heartbeating", e); + throw new LockException(e); + } + // Increment the threadlocal metastore client count + if (heartbeaterMSClientCount.incrementAndGet() >= heartbeaterThreadPoolSize) { + LOG.warn("The number of hearbeater metastore clients - + " + + heartbeaterMSClientCount.get() + ", has exceeded the max limit - " + + heartbeaterThreadPoolSize); + } + } + heartbeaterClient.heartbeat(txnId, lockId); } catch (NoSuchLockException e) { LOG.error("Unable to find lock " + JavaUtils.lockIdToString(lockId)); throw new LockException(e, ErrorMsg.LOCK_NO_SUCH_LOCK, JavaUtils.lockIdToString(lockId)); @@ -445,7 +492,7 @@ public void heartbeat() throws LockException { private Heartbeater startHeartbeat(long initialDelay) throws LockException { long heartbeatInterval = getHeartbeatInterval(conf); assert heartbeatInterval > 0; - Heartbeater heartbeater = new Heartbeater(this, conf); + Heartbeater heartbeater = new Heartbeater(this, conf, queryId); // For negative testing purpose.. 
if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER)) { initialDelay = 0; @@ -552,24 +599,42 @@ private void init() throws LockException { } private synchronized void initHeartbeatExecutorService() { - if (heartbeatExecutorService != null - && !heartbeatExecutorService.isShutdown() + if (heartbeatExecutorService != null && !heartbeatExecutorService.isShutdown() && !heartbeatExecutorService.isTerminated()) { return; } - - int threadPoolSize = conf.getIntVar(HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE); + heartbeaterThreadPoolSize = + conf.getIntVar(HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE); heartbeatExecutorService = - Executors.newScheduledThreadPool(threadPoolSize, new ThreadFactory() { + Executors.newScheduledThreadPool(heartbeaterThreadPoolSize, new ThreadFactory() { private final AtomicInteger threadCounter = new AtomicInteger(); + @Override public Thread newThread(Runnable r) { - return new Thread(r, "Heartbeater-" + threadCounter.getAndIncrement()); + return new HeartbeaterThread(r, "Heartbeater-" + threadCounter.getAndIncrement()); } }); ((ScheduledThreadPoolExecutor) heartbeatExecutorService).setRemoveOnCancelPolicy(true); } + public static class HeartbeaterThread extends Thread { + public HeartbeaterThread(Runnable target, String name) { + super(target, name); + } + + @Override + /** + * We're overriding finalize so that we can do an orderly cleanup of resources held by + * the threadlocal metastore client. 
+ */ + protected void finalize() throws Throwable { + threadLocalMSClient.remove(); + // Adjust the metastore client count + DbTxnManager.heartbeaterMSClientCount.decrementAndGet(); + super.finalize(); + } + } + @Override public boolean isTxnOpen() { return txnId > 0; @@ -601,20 +666,21 @@ private static long getHeartbeatInterval(Configuration conf) throws LockExceptio public static class Heartbeater implements Runnable { private HiveTxnManager txnMgr; private HiveConf conf; - LockException lockException; + private final String queryId; + public LockException getLockException() { return lockException; } - /** * * @param txnMgr transaction manager for this operation */ - Heartbeater(HiveTxnManager txnMgr, HiveConf conf) { + Heartbeater(HiveTxnManager txnMgr, HiveConf conf, String queryId) { this.txnMgr = txnMgr; this.conf = conf; lockException = null; + this.queryId = queryId; } /** @@ -627,12 +693,16 @@ public void run() { if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER)) { throw new LockException(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER.name() + "=true"); } - LOG.debug("Heartbeating..."); txnMgr.heartbeat(); } catch (LockException e) { - LOG.error("Failed trying to heartbeat " + e.getMessage()); + LOG.error("Failed trying to heartbeat queryId=" + queryId + ": " + e.getMessage()); lockException = e; + } catch (Throwable t) { + LOG.error("Failed trying to heartbeat queryId=" + queryId + ": " + t.getMessage(), t); + lockException = + new LockException("Failed trying to heartbeat queryId=" + queryId + ": " + + t.getMessage(), t); } } } @@ -681,5 +751,9 @@ synchronized void unlock(long lockid) throws TException { synchronized ShowLocksResponse showLocks(ShowLocksRequest showLocksRequest) throws TException { return client.showLocks(showLocksRequest); } + + synchronized void close() { + client.close(); + } } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java 
b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index 388b7c5..d995e85 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -21,10 +21,13 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; import org.apache.hadoop.hive.metastore.api.LockState; import org.apache.hadoop.hive.metastore.api.LockType; import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; +import org.apache.hadoop.hive.metastore.api.TxnInfo; +import org.apache.hadoop.hive.metastore.api.TxnState; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; @@ -428,26 +431,69 @@ public void testTimeOutReaper() throws Exception { Assert.assertTrue("Actual: " + cpr.getErrorMessage(), cpr.getErrorMessage().contains("Transaction manager has aborted the transaction txnid:1")); //now test that we don't timeout locks we should not - hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 10, TimeUnit.MINUTES); + //heartbeater should be running in the background every 1/2 second + hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.SECONDS); + //hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER, true); runStatementOnDriver("start transaction"); runStatementOnDriver("select count(*) from " + Table.ACIDTBL + " where a = 17"); + pause(750); + TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); + + //since there is txn open, we are heartbeating the txn not individual locks + GetOpenTxnsInfoResponse txnsInfoResponse = txnHandler.getOpenTxnsInfo(); + Assert.assertEquals(2, txnsInfoResponse.getOpen_txns().size()); + TxnInfo txnInfo = null; + for(TxnInfo ti : 
txnsInfoResponse.getOpen_txns()) { + if(ti.getState() == TxnState.OPEN) { + txnInfo = ti; + break; + } + } + Assert.assertNotNull(txnInfo); + Assert.assertEquals(2, txnInfo.getId()); + Assert.assertEquals(TxnState.OPEN, txnInfo.getState()); + String s =TxnDbUtil.queryToString("select TXN_STARTED, TXN_LAST_HEARTBEAT from TXNS where TXN_ID = " + txnInfo.getId(), false); + String[] vals = s.split("\\s+"); + Assert.assertEquals("Didn't get expected timestamps", 2, vals.length); + long lastHeartbeat = Long.parseLong(vals[1]); + //these 2 values are equal when TXN entry is made. Should never be equal after 1st heartbeat, which we + //expect to have happened by now since HIVE_TXN_TIMEOUT=1sec + Assert.assertNotEquals("Didn't see heartbeat happen", Long.parseLong(vals[0]), lastHeartbeat); + ShowLocksResponse slr = txnHandler.showLocks(new ShowLocksRequest()); TestDbTxnManager2.checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", Table.ACIDTBL.name, null, slr.getLocks().get(0)); + pause(750); TestTxnCommands2.runHouseKeeperService(houseKeeperService, hiveConf); + pause(750); slr = txnHandler.showLocks(new ShowLocksRequest()); + Assert.assertEquals("Unexpected lock count: " + slr, 1, slr.getLocks().size()); TestDbTxnManager2.checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", Table.ACIDTBL.name, null, slr.getLocks().get(0)); - Assert.assertEquals("Unexpected lock count", 1, slr.getLocks().size()); + pause(750); TestTxnCommands2.runHouseKeeperService(houseKeeperService, hiveConf); slr = txnHandler.showLocks(new ShowLocksRequest()); + Assert.assertEquals("Unexpected lock count: " + slr, 1, slr.getLocks().size()); TestDbTxnManager2.checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", Table.ACIDTBL.name, null, slr.getLocks().get(0)); - Assert.assertEquals("Unexpected lock count", 1, slr.getLocks().size()); + + //should've done several heartbeats + s =TxnDbUtil.queryToString("select TXN_STARTED, TXN_LAST_HEARTBEAT from TXNS where TXN_ID = " + 
txnInfo.getId(), false); + vals = s.split("\\s+"); + Assert.assertEquals("Didn't get expected timestamps", 2, vals.length); + Assert.assertTrue("Heartbeat didn't progress: (old,new) (" + lastHeartbeat + "," + vals[1]+ ")", + lastHeartbeat < Long.parseLong(vals[1])); runStatementOnDriver("rollback"); slr = txnHandler.showLocks(new ShowLocksRequest()); Assert.assertEquals("Unexpected lock count", 0, slr.getLocks().size()); } + private static void pause(int timeMillis) { + try { + Thread.sleep(timeMillis); + } + catch (InterruptedException e) { + } + } /** * takes raw data and turns it into a string as if from Driver.getResults()