diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index bde78e4..85dd340 100644 --- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -647,13 +647,13 @@ public void testHeartbeat() throws Exception { //todo: this should ideally check Transaction heartbeat as well, but heartbeat //timestamp is not reported yet //GetOpenTxnsInfoResponse txnresp = msClient.showTxns(); - ShowLocksResponse response = msClient.showLocks(); + ShowLocksResponse response = msClient.showLocks(null, null, null, false); Assert.assertEquals("Wrong nubmer of locks: " + response, 1, response.getLocks().size()); ShowLocksResponseElement lock = response.getLocks().get(0); long acquiredAt = lock.getAcquiredat(); long heartbeatAt = lock.getLastheartbeat(); txnBatch.heartbeat(); - response = msClient.showLocks(); + response = msClient.showLocks(null, null, null, false); Assert.assertEquals("Wrong number of locks2: " + response, 1, response.getLocks().size()); lock = response.getLocks().get(0); Assert.assertEquals("Acquired timestamp didn't match", acquiredAt, lock.getAcquiredat()); diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java index cdd12ab..e68b536 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.metastore; +import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.classification.InterfaceAudience; @@ -2045,8 +2046,24 @@ public void unlock(long lockid) } @Override - public ShowLocksResponse showLocks() throws TException { - return client.show_locks(new ShowLocksRequest()); + public ShowLocksResponse showLocks(String dbName, String tableName, Map partSpec, + boolean isExtended) throws TException { + ShowLocksRequest showLocksRequest = new ShowLocksRequest(); + showLocksRequest.setDbname(dbName); + showLocksRequest.setTablename(tableName); + if (partSpec != null) { + List keyList = new ArrayList(); + List valList = new ArrayList(); + for (String partKey : partSpec.keySet()) { + String partVal = partSpec.remove(partKey); + keyList.add(partKey); + valList.add(partVal); + } + String partName = FileUtils.makePartName(keyList, valList); + showLocksRequest.setPartname(partName); + } + showLocksRequest.setIsExtended(isExtended); + return client.show_locks(showLocksRequest); } @Override diff --git metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java index 39cf927..651fd55 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java @@ -1362,10 +1362,16 @@ void unlock(long lockid) /** * Show all currently held and waiting locks. + * @param dbName database name + * @param tableName table name + * @param partSpec partition spec + * @param isExtended whether keyword "EXTENDED" is specified * @return List of currently held and waiting locks. * @throws TException */ - ShowLocksResponse showLocks() throws TException; + ShowLocksResponse showLocks(String dbName, String tableName, Map partSpec, + boolean isExtended) throws TException; + /** * Send a heartbeat to indicate that the client holding these locks (if diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index be3c6de..83cccf6 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -866,6 +866,34 @@ public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException { String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " + "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host, hl_lock_int_id," + "hl_blockedby_ext_id, hl_blockedby_int_id from HIVE_LOCKS"; + + // Some filters may have been specified in the SHOW LOCKS statement. Add them to the query. + String dbName = rqst.getDbname(); + String tableName = rqst.getTablename(); + String partName = rqst.getPartname(); + + StringBuilder filter = new StringBuilder(); + if (dbName != null && !dbName.isEmpty()) { + filter.append("hl_db=").append(quoteString(dbName)); + } + if (tableName != null && !tableName.isEmpty()) { + if (filter.length() > 0) { + filter.append(" and "); + } + filter.append("hl_table=").append(quoteString(tableName)); + } + if (partName != null && !partName.isEmpty()) { + if (filter.length() > 0) { + filter.append(" and "); + } + filter.append("hl_partition=").append(quoteString(partName)); + } + String whereClause = filter.toString(); + + if (!whereClause.isEmpty()) { + s = s + " where " + whereClause; + } + LOG.debug("Doing to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); while (rs.next()) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index b26f09d..c0f8e95 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -2530,6 +2530,14 @@ public int compare(HiveLock o1, HiveLock o2) { return 0; } public static void dumpLockInfo(DataOutputStream os, ShowLocksResponse rsp) throws IOException { + List locks = rsp.getLocks(); + if (locks == null || locks.isEmpty()) { + // No lock, no need to write header + os.writeBytes("No lock is found."); + os.write(terminator); + return; + } + // Write a header os.writeBytes("Lock ID"); os.write(separator); @@ -2556,45 +2564,42 @@ public static void dumpLockInfo(DataOutputStream os, ShowLocksResponse rsp) thro os.writeBytes("Hostname"); os.write(terminator); - List locks = rsp.getLocks(); - if (locks != null) { - for (ShowLocksResponseElement lock : locks) { - if(lock.isSetLockIdInternal()) { - os.writeBytes(Long.toString(lock.getLockid()) + "." + Long.toString(lock.getLockIdInternal())); - } - else { - os.writeBytes(Long.toString(lock.getLockid())); - } - os.write(separator); - os.writeBytes(lock.getDbname()); - os.write(separator); - os.writeBytes((lock.getTablename() == null) ? "NULL" : lock.getTablename()); - os.write(separator); - os.writeBytes((lock.getPartname() == null) ? "NULL" : lock.getPartname()); - os.write(separator); - os.writeBytes(lock.getState().toString()); - os.write(separator); - if(lock.isSetBlockedByExtId()) {//both "blockedby" are either there or not - os.writeBytes(Long.toString(lock.getBlockedByExtId()) + "." + Long.toString(lock.getBlockedByIntId())); - } - else { - os.writeBytes(" ");//12 chars - try to keep cols aligned - } - os.write(separator); - os.writeBytes(lock.getType().toString()); - os.write(separator); - os.writeBytes((lock.getTxnid() == 0) ? "NULL" : Long.toString(lock.getTxnid())); - os.write(separator); - os.writeBytes(Long.toString(lock.getLastheartbeat())); - os.write(separator); - os.writeBytes((lock.getAcquiredat() == 0) ? "NULL" : Long.toString(lock.getAcquiredat())); - os.write(separator); - os.writeBytes(lock.getUser()); - os.write(separator); - os.writeBytes(lock.getHostname()); - os.write(separator); - os.write(terminator); + for (ShowLocksResponseElement lock : locks) { + if(lock.isSetLockIdInternal()) { + os.writeBytes(Long.toString(lock.getLockid()) + "." + Long.toString(lock.getLockIdInternal())); + } + else { + os.writeBytes(Long.toString(lock.getLockid())); + } + os.write(separator); + os.writeBytes(lock.getDbname()); + os.write(separator); + os.writeBytes((lock.getTablename() == null) ? "NULL" : lock.getTablename()); + os.write(separator); + os.writeBytes((lock.getPartname() == null) ? "NULL" : lock.getPartname()); + os.write(separator); + os.writeBytes(lock.getState().toString()); + os.write(separator); + if(lock.isSetBlockedByExtId()) {//both "blockedby" are either there or not + os.writeBytes(Long.toString(lock.getBlockedByExtId()) + "." + Long.toString(lock.getBlockedByIntId())); } + else { + os.writeBytes(" ");//12 chars - try to keep cols aligned + } + os.write(separator); + os.writeBytes(lock.getType().toString()); + os.write(separator); + os.writeBytes((lock.getTxnid() == 0) ? "NULL" : Long.toString(lock.getTxnid())); + os.write(separator); + os.writeBytes(Long.toString(lock.getLastheartbeat())); + os.write(separator); + os.writeBytes((lock.getAcquiredat() == 0) ? "NULL" : Long.toString(lock.getAcquiredat())); + os.write(separator); + os.writeBytes(lock.getUser()); + os.write(separator); + os.writeBytes(lock.getHostname()); + os.write(separator); + os.write(terminator); } } private int showLocksNewFormat(ShowLocksDesc showLocks, HiveLockManager lm) @@ -2606,7 +2611,11 @@ private int showLocksNewFormat(ShowLocksDesc showLocks, HiveLockManager lm) } lockMgr = (DbLockManager)lm; - ShowLocksResponse rsp = lockMgr.getLocks(); + String dbName = showLocks.getDbName(); + String tableName = showLocks.getTableName(); + Map partSpec = showLocks.getPartSpec(); + + ShowLocksResponse rsp = lockMgr.getLocks(dbName, tableName, partSpec, false); // write the results in the file DataOutputStream os = getOutputStream(showLocks.getResFile()); diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java index 7fa57d6..ea95f7f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java @@ -37,6 +37,7 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -277,8 +278,13 @@ public void releaseLocks(List hiveLocks) { } public ShowLocksResponse getLocks() throws LockException { + return getLocks(null, null, null, false); + } + + public ShowLocksResponse getLocks(String dbName, String tableName, Map partSpec, + boolean isExtended) throws LockException { try { - return client.showLocks(); + return client.showLocks(dbName, tableName, partSpec, isExtended); } catch (TException e) { throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e); }