diff --git hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-log4j.properties hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-log4j.properties
new file mode 100644
index 0000000..82684b3
--- /dev/null
+++ hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-log4j.properties
@@ -0,0 +1,88 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Define some default values that can be overridden by system properties
+hive.log.threshold=ALL
+hive.root.logger=DEBUG,DRFA
+hive.log.dir=/tmp/ekoifman
+hive.log.file=hive.log
+
+# Define the root logger to the system property "hadoop.root.logger".
+log4j.rootLogger=${hive.root.logger}, EventCounter
+
+# Logging Threshold
+log4j.threshold=${hive.log.threshold}
+
+#
+# Daily Rolling File Appender
+#
+# Use the PidDailyerRollingFileAppend class instead if you want to use separate log files
+# for different CLI session.
+#
+# log4j.appender.DRFA=org.apache.hadoop.hive.ql.log.PidDailyRollingFileAppender
+
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+
+log4j.appender.DRFA.File=${hive.log.dir}/${hive.log.file}
+
+# Rollver at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+# Debugging Pattern format
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p [%t]: %c{2} (%F:%M(%L)) - %m%n
+
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} [%t]: %p %c{2}: %m%n
+log4j.appender.console.encoding=UTF-8
+
+#custom logging levels
+#log4j.logger.xxx=DEBUG
+
+#
+# Event Counter Appender
+# Sends counts of logging messages at different severity levels to Hadoop Metrics.
+#
+log4j.appender.EventCounter=org.apache.hadoop.hive.shims.HiveEventCounter
+
+
+log4j.category.DataNucleus=ERROR,DRFA
+log4j.category.Datastore=ERROR,DRFA
+log4j.category.Datastore.Schema=ERROR,DRFA
+log4j.category.JPOX.Datastore=ERROR,DRFA
+log4j.category.JPOX.Plugin=ERROR,DRFA
+log4j.category.JPOX.MetaData=ERROR,DRFA
+log4j.category.JPOX.Query=ERROR,DRFA
+log4j.category.JPOX.General=ERROR,DRFA
+log4j.category.JPOX.Enhancer=ERROR,DRFA
+
+
+# Silence useless ZK logs
+log4j.logger.org.apache.zookeeper.server.NIOServerCnxn=WARN,DRFA
+log4j.logger.org.apache.zookeeper.ClientCnxnSocketNIO=WARN,DRFA
diff --git hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml
new file mode 100644
index 0000000..70ccc31
--- /dev/null
+++ hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml
@@ -0,0 +1,77 @@
+
+
+
+
+
+
+
+ javax.jdo.option.ConnectionURL
+ jdbc:mysql://localhost/metastore
+
+
+ javax.jdo.option.ConnectionUserName
+ hive
+
+
+ javax.jdo.option.ConnectionPassword
+ hive
+
+
+ javax.jdo.option.ConnectionDriverName
+ com.mysql.jdbc.Driver
+
+
+ datanucleus.autoCreateSchema
+ false
+
+
+ datanucleus.fixedDatastore
+ true
+
+
+
+ hive.support.concurrency
+ true
+
+
+ hive.txn.manager
+ org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
+
+
+ hive.enforce.bucketing
+ true
+
+
+ hive.exec.dynamic.partition.mode
+ nonstrict
+
+
+
+
+
diff --git hcatalog/src/test/e2e/templeton/deployers/env.sh hcatalog/src/test/e2e/templeton/deployers/env.sh
index 958ced8..6cdeb04 100755
--- hcatalog/src/test/e2e/templeton/deployers/env.sh
+++ hcatalog/src/test/e2e/templeton/deployers/env.sh
@@ -50,6 +50,11 @@ if [ -z ${HADOOP_HOME} ]; then
export HADOOP_HOME=/Users/${USER}/dev/hwxhadoop/hadoop-dist/target/hadoop-${HADOOP_VERSION}
fi
+if [ -z ${MYSQL_CLIENT_JAR} ]; then
+ #if using MySQL backed metastore
+ export MYSQL_CLIENT_JAR=/Users/${USER}/dev/mysql-connector-java-5.1.30/mysql-connector-java-5.1.30-bin.jar
+fi
+
export TEZ_CLIENT_HOME=/Users/ekoifman/dev/apache-tez-client-${TEZ_VERSION}
#Make sure Pig is built for the Hadoop version you are running
export PIG_TAR_PATH=/Users/${USER}/dev/pig-${PIG_VERSION}-src/build
diff --git hcatalog/src/test/e2e/templeton/deployers/start_hive_services.sh hcatalog/src/test/e2e/templeton/deployers/start_hive_services.sh
index 0ead10a..8cc9353 100755
--- hcatalog/src/test/e2e/templeton/deployers/start_hive_services.sh
+++ hcatalog/src/test/e2e/templeton/deployers/start_hive_services.sh
@@ -25,10 +25,17 @@
source ./env.sh
#decide which DB to run against
+#Derby
cp ${PROJ_HOME}/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.xml ${HIVE_HOME}/conf/hive-site.xml
+#cp ${PROJ_HOME}/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml ${HIVE_HOME}/conf/hive-site.xml
#cp ${PROJ_HOME}/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mssql.xml ${HIVE_HOME}/conf/hive-site.xml
cp ${PROJ_HOME}/hcatalog/src/test/e2e/templeton/deployers/config/webhcat/webhcat-site.xml ${HIVE_HOME}/hcatalog/etc/webhcat/webhcat-site.xml
+cp ${PROJ_HOME}/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-log4j.properties ${HIVE_HOME}/conf/hive-log4j.properties
+
+if [ -f ${MYSQL_CLIENT_JAR} ]; then
+ cp ${MYSQL_CLIENT_JAR} ${HIVE_HOME}/lib
+fi
if [ -d ${WEBHCAT_LOG_DIR} ]; then
rm -Rf ${WEBHCAT_LOG_DIR};
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 216a61c..0c4fe49 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -533,18 +533,29 @@ public void unlock(UnlockRequest rqst)
}
}
+ /**
+ * used to sort entries in {@link org.apache.hadoop.hive.metastore.api.ShowLocksResponse}
+ */
+ private static class LockInfoExt extends LockInfo {
+ private final ShowLocksResponseElement e;
+ LockInfoExt(ShowLocksResponseElement e, long intLockId) {
+ super(e, intLockId);
+ this.e = e;
+ }
+ }
public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException {
try {
Connection dbConn = null;
ShowLocksResponse rsp = new ShowLocksResponse();
List elems = new ArrayList();
+ List sortedList = new ArrayList();
Statement stmt = null;
try {
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
stmt = dbConn.createStatement();
String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " +
- "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host from HIVE_LOCKS";
+ "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host, hl_lock_int_id from HIVE_LOCKS";
LOG.debug("Doing to execute query <" + s + ">");
ResultSet rs = stmt.executeQuery(s);
while (rs.next()) {
@@ -572,7 +583,7 @@ public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException {
if (!rs.wasNull()) e.setAcquiredat(acquiredAt);
e.setUser(rs.getString(10));
e.setHostname(rs.getString(11));
- elems.add(e);
+ sortedList.add(new LockInfoExt(e, rs.getLong(12)));
}
LOG.debug("Going to rollback");
dbConn.rollback();
@@ -584,6 +595,12 @@ public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException {
closeStmt(stmt);
closeDbConn(dbConn);
}
+ //this ensures that "SHOW LOCKS" prints the locks in the same order as they are examined
+ //by checkLock() - makes diagnostics easier.
+ Collections.sort(sortedList, new LockInfoComparator());
+ for(LockInfoExt lockInfoExt : sortedList) {
+ elems.add(lockInfoExt.e);
+ }
rsp.setLocks(elems);
return rsp;
} catch (RetryException e) {
@@ -1086,14 +1103,14 @@ protected DatabaseProduct determineDatabaseProduct(Connection conn) throws MetaE
}
private static class LockInfo {
- long extLockId;
- long intLockId;
- long txnId;
- String db;
- String table;
- String partition;
- LockState state;
- LockType type;
+ private final long extLockId;
+ private final long intLockId;
+ private final long txnId;
+ private final String db;
+ private final String table;
+ private final String partition;
+ private final LockState state;
+ private final LockType type;
// Assumes the result set is set to a valid row
LockInfo(ResultSet rs) throws SQLException {
@@ -1107,12 +1124,27 @@ protected DatabaseProduct determineDatabaseProduct(Connection conn) throws MetaE
switch (rs.getString("hl_lock_state").charAt(0)) {
case LOCK_WAITING: state = LockState.WAITING; break;
case LOCK_ACQUIRED: state = LockState.ACQUIRED; break;
+ default:
+ state = null;
}
switch (rs.getString("hl_lock_type").charAt(0)) {
case LOCK_EXCLUSIVE: type = LockType.EXCLUSIVE; break;
case LOCK_SHARED: type = LockType.SHARED_READ; break;
case LOCK_SEMI_SHARED: type = LockType.SHARED_WRITE; break;
+ default:
+ type = null;
}
+ txnId = rs.getLong("hl_txnid");
+ }
+ LockInfo(ShowLocksResponseElement e, long intLockId) {
+ extLockId = e.getLockid();
+ this.intLockId = intLockId;
+ db = e.getDbname();
+ table = e.getTablename();
+ partition = e.getPartname();
+ state = e.getState();
+ type = e.getType();
+ txnId = e.getTxnid();
}
public boolean equals(Object other) {
@@ -1130,15 +1162,22 @@ public String toString() {
partition + " state:" + (state == null ? "null" : state.toString())
+ " type:" + (type == null ? "null" : type.toString());
}
+ private boolean isDbLock() {
+ return db != null && table == null && partition == null;
+ }
+ private boolean isTableLock() {
+ return db != null && table != null && partition == null;
+ }
}
private static class LockInfoComparator implements Comparator {
+ private static final LockTypeComparator lockTypeComparator = new LockTypeComparator();
public boolean equals(Object other) {
return this == other;
}
public int compare(LockInfo info1, LockInfo info2) {
- // We sort by state (acquired vs waiting) and then by extLockId.
+ // We sort by state (acquired vs waiting) and then by LockType, they by id
if (info1.state == LockState.ACQUIRED &&
info2.state != LockState .ACQUIRED) {
return -1;
@@ -1147,6 +1186,11 @@ public int compare(LockInfo info1, LockInfo info2) {
info2.state == LockState .ACQUIRED) {
return 1;
}
+
+ int sortByType = lockTypeComparator.compare(info1.type, info2.type);
+ if(sortByType != 0) {
+ return sortByType;
+ }
if (info1.extLockId < info2.extLockId) {
return -1;
} else if (info1.extLockId > info2.extLockId) {
@@ -1163,6 +1207,41 @@ public int compare(LockInfo info1, LockInfo info2) {
}
}
+ /**
+ * Sort more restrictive locks after less restrictive ones
+ */
+ private final static class LockTypeComparator implements Comparator {
+ public boolean equals(Object other) {
+ return this == other;
+ }
+ public int compare(LockType t1, LockType t2) {
+ switch (t1) {
+ case EXCLUSIVE:
+ if(t2 == LockType.EXCLUSIVE) {
+ return 0;
+ }
+ return 1;
+ case SHARED_WRITE:
+ switch (t2) {
+ case EXCLUSIVE:
+ return -1;
+ case SHARED_WRITE:
+ return 0;
+ case SHARED_READ:
+ return 1;
+ default:
+ throw new RuntimeException("Unexpected LockType: " + t2);
+ }
+ case SHARED_READ:
+ if(t2 == LockType.SHARED_READ) {
+ return 0;
+ }
+ return -1;
+ default:
+ throw new RuntimeException("Unexpected LockType: " + t1);
+ }
+ }
+ }
private enum LockAction {ACQUIRE, WAIT, KEEP_LOOKING}
// A jump table to figure out whether to wait, acquire,
@@ -1362,11 +1441,11 @@ private LockResponse checkLock(Connection dbConn,
LockResponse response = new LockResponse();
response.setLockid(extLockId);
- LOG.debug("Setting savepoint");
+ LOG.debug("checkLock(): Setting savepoint. extLockId=" + extLockId);
Savepoint save = dbConn.setSavepoint();
StringBuilder query = new StringBuilder("select hl_lock_ext_id, " +
"hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " +
- "hl_lock_type from HIVE_LOCKS where hl_db in (");
+ "hl_lock_type, hl_txnid from HIVE_LOCKS where hl_db in (");
Set strings = new HashSet(locksBeingChecked.size());
for (LockInfo info : locksBeingChecked) {
@@ -1431,19 +1510,26 @@ private LockResponse checkLock(Connection dbConn,
query.append("))");
}
}
+ query.append(" and hl_lock_ext_id <= ").append(extLockId);
LOG.debug("Going to execute query <" + query.toString() + ">");
Statement stmt = null;
try {
stmt = dbConn.createStatement();
ResultSet rs = stmt.executeQuery(query.toString());
- SortedSet lockSet = new TreeSet(new LockInfoComparator());
+ SortedSet lockSet = new TreeSet(new LockInfoComparator());
while (rs.next()) {
lockSet.add(new LockInfo(rs));
}
// Turn the tree set into an array so we can move back and forth easily
// in it.
- LockInfo[] locks = (LockInfo[])lockSet.toArray(new LockInfo[1]);
+ LockInfo[] locks = lockSet.toArray(new LockInfo[lockSet.size()]);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Locks to check(full): ");
+ for(LockInfo info : locks) {
+ LOG.debug(" " + info);
+ }
+ }
for (LockInfo info : locksBeingChecked) {
// Find the lock record we're checking
@@ -1496,22 +1582,27 @@ private LockResponse checkLock(Connection dbConn,
// We've found something that matches what we're trying to lock,
// so figure out if we can lock it too.
- switch (jumpTable.get(locks[index].type).get(locks[i].type).get
- (locks[i].state)) {
+ LockAction lockAction = jumpTable.get(locks[index].type).get(locks[i].type).get(locks[i].state);
+ LOG.debug("desired Lock: " + info + " checked Lock: " + locks[i] + " action: " + lockAction);
+ switch (lockAction) {
+ case WAIT:
+ if(!ignoreConflict(info, locks[i])) {
+ wait(dbConn, save);
+ if (alwaysCommit) {
+ // In the case where lockNoWait has been called we don't want to commit because
+ // it's going to roll everything back. In every other case we want to commit here.
+ LOG.debug("Going to commit");
+ dbConn.commit();
+ }
+ response.setState(LockState.WAITING);
+ LOG.debug("Lock(" + info + ") waiting for Lock(" + locks[i] + ")");
+ return response;
+ }
+ //fall through to ACQUIRE
case ACQUIRE:
acquire(dbConn, stmt, extLockId, info.intLockId);
acquired = true;
break;
- case WAIT:
- wait(dbConn, save);
- if (alwaysCommit) {
- // In the case where lockNoWait has been called we don't want to commit because
- // it's going to roll everything back. In every other case we want to commit here.
- LOG.debug("Going to commit");
- dbConn.commit();
- }
- response.setState(LockState.WAITING);
- return response;
case KEEP_LOOKING:
continue;
}
@@ -1534,6 +1625,19 @@ private LockResponse checkLock(Connection dbConn,
return response;
}
+ /**
+ * the {@link #jumpTable} only deals with LockState/LockType. In some cases it's not
+ * sufficient. For example, an EXCLUSIVE lock on partition should prevent SHARED_READ
+ * on the table, but there is no reason for EXCLUSIVE on a table to prevent SHARED_READ
+ * on a database.
+ */
+ private boolean ignoreConflict(LockInfo desiredLock, LockInfo existingLock) {
+ return (desiredLock.isDbLock() && desiredLock.type == LockType.SHARED_READ &&
+ existingLock.isTableLock() && existingLock.type == LockType.EXCLUSIVE) ||
+ (existingLock.isDbLock() && existingLock.type == LockType.SHARED_READ &&
+ desiredLock.isTableLock() && desiredLock.type == LockType.EXCLUSIVE);
+ }
+
private void wait(Connection dbConn, Savepoint save) throws SQLException {
// Need to rollback because we did a select that acquired locks but we didn't
// actually update anything. Also, we may have locked some locks as
@@ -1654,7 +1758,7 @@ private long getTxnIdFromLockId(Connection dbConn, long extLockId)
try {
stmt = dbConn.createStatement();
String s = "select hl_lock_ext_id, hl_lock_int_id, hl_db, hl_table, " +
- "hl_partition, hl_lock_state, hl_lock_type from HIVE_LOCKS where " +
+ "hl_partition, hl_lock_state, hl_lock_type, hl_txnid from HIVE_LOCKS where " +
"hl_lock_ext_id = " + extLockId;
LOG.debug("Going to execute query <" + s + ">");
ResultSet rs = stmt.executeQuery(s);
diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index d2eed88..54c83a1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1317,7 +1317,7 @@ public int execute() throws CommandNeedRetryException {
maxthreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.EXECPARALLETHREADNUMBER);
try {
- LOG.info("Starting command: " + queryStr);
+ LOG.info("Starting command(queryId=" + queryId + "): " + queryStr);
plan.setStarted();
diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
index de7d414..805e090 100644
--- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
+++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
@@ -72,12 +72,21 @@ public HiveLock lock(HiveLockObject key, HiveLockMode mode,
* Send a lock request to the metastore. This is intended for use by
* {@link DbTxnManager}.
* @param lock lock request
+ * @param isBlocking if true, will block until locks have been acquired
* @throws LockException
+ * @return the result of the lock attempt
*/
- List lock(LockRequest lock) throws LockException {
+ LockState lock(LockRequest lock, String queryId, boolean isBlocking, List acquiredLocks) throws LockException {
try {
- LOG.debug("Requesting lock");
+ LOG.debug("Requesting: queryId=" + queryId + " " + lock);
LockResponse res = client.lock(lock);
+ //link lockId to queryId
+ LOG.debug("Response " + res);
+ if(!isBlocking) {
+ if(res.getState() == LockState.WAITING) {
+ return LockState.WAITING;
+ }
+ }
while (res.getState() == LockState.WAITING) {
backoff();
res = client.checkLock(res.getLockid());
@@ -88,9 +97,8 @@ public HiveLock lock(HiveLockObject key, HiveLockMode mode,
if (res.getState() != LockState.ACQUIRED) {
throw new LockException(ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg());
}
- List locks = new ArrayList(1);
- locks.add(hl);
- return locks;
+ acquiredLocks.add(hl);
+ return res.getState();
} catch (NoSuchTxnException e) {
LOG.error("Metastore could not find txnid " + lock.getTxnid());
throw new LockException(ErrorMsg.TXNMGR_NOT_INSTANTIATED.getMsg(), e);
@@ -102,6 +110,20 @@ public HiveLock lock(HiveLockObject key, HiveLockMode mode,
e);
}
}
+ /**
+ * Used to make another attempt to acquire a lock (in Waiting state)
+ * @param extLockId
+ * @return result of the attempt
+ * @throws LockException
+ */
+ LockState checkLock(long extLockId) throws LockException {
+ try {
+ return client.checkLock(extLockId).getState();
+ } catch (TException e) {
+ throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(),
+ e);
+ }
+ }
@Override
public void unlock(HiveLock hiveLock) throws LockException {
diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index d11fabd..ccbac80 100644
--- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -37,6 +37,7 @@
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.thrift.TException;
+import java.util.ArrayList;
import java.util.List;
/**
@@ -87,6 +88,15 @@ public HiveLockManager getLockManager() throws LockException {
@Override
public void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException {
+ acquireLocks(plan, ctx, username, true);
+ }
+
+ /**
+ * This is for testing only. Normally client should call {@link #acquireLocks(org.apache.hadoop.hive.ql.QueryPlan, org.apache.hadoop.hive.ql.Context, String)}
+ * @param isBlocking if false, the method will return immediately; thus the locks may be in LockState.WAITING
+ * @return null if no locks were needed
+ */
+ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isBlocking) throws LockException {
init();
// Make sure we've built the lock manager
getLockManager();
@@ -94,7 +104,8 @@ public void acquireLocks(QueryPlan plan, Context ctx, String username) throws Lo
boolean atLeastOneLock = false;
LockRequestBuilder rqstBuilder = new LockRequestBuilder();
- LOG.debug("Setting lock request transaction to " + txnId);
+ //link queryId to txnId
+ LOG.debug("Setting lock request transaction to " + txnId + " for queryId=" + plan.getQueryId());
rqstBuilder.setTransactionId(txnId)
.setUser(username);
@@ -206,10 +217,15 @@ public void acquireLocks(QueryPlan plan, Context ctx, String username) throws Lo
// Make sure we need locks. It's possible there's nothing to lock in
// this operation.
- if (!atLeastOneLock) return;
+ if (!atLeastOneLock) {
+ LOG.debug("No locks needed for queryId" + plan.getQueryId());
+ return null;
+ }
- List locks = lockMgr.lock(rqstBuilder.build());
+ List locks = new ArrayList(1);
+ LockState lockState = lockMgr.lock(rqstBuilder.build(), plan.getQueryId(), isBlocking, locks);
ctx.setHiveLocks(locks);
+ return lockState;
}
@Override
diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index 1cd7c32..2fb78fd 100644
--- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -32,6 +32,7 @@
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
@@ -39,6 +40,7 @@
/**
* Unit tests for {@link DbTxnManager}.
+ * See additional tests in {@link org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager}
*/
public class TestDbTxnManager {
diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
new file mode 100644
index 0000000..6a69641
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.lockmgr;
+
+import junit.framework.Assert;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * See additional tests in {@link org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager}
+ * Tests here
+ */
+public class TestDbTxnManager2 {
+ private static HiveConf conf = new HiveConf(Driver.class);
+ private HiveTxnManager txnMgr;
+ private Context ctx;
+ private Driver driver;
+
+ @BeforeClass
+ public static void setUpClass() throws Exception {
+ TxnDbUtil.setConfValues(conf);
+ conf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+ conf.setBoolVar(HiveConf.ConfVars.HIVEENFORCEBUCKETING, true);
+ }
+ @Before
+ public void setUp() throws Exception {
+ SessionState.start(conf);
+ ctx = new Context(conf);
+ driver = new Driver(conf);
+ driver.init();
+ TxnDbUtil.cleanDb();
+ TxnDbUtil.prepDb();
+ txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ Assert.assertTrue(txnMgr instanceof DbTxnManager);
+ }
+ @After
+ public void tearDown() throws Exception {
+ driver.close();
+ if (txnMgr != null) txnMgr.closeTxnManager();
+ TxnDbUtil.cleanDb();
+ TxnDbUtil.prepDb();
+ }
+ @Test
+ public void createTable() throws Exception {
+ CommandProcessorResponse cpr = driver.compileAndRespond("create table if not exists T (a int, b int)");
+ checkCmdOnDriver(cpr);
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");
+ List locks = getLocks();
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", null, null, locks.get(0));
+ txnMgr.getLockManager().releaseLocks(ctx.getHiveLocks());
+ Assert.assertEquals("Lock remained", 0, getLocks().size());
+ }
+ @Test
+ public void insertOverwriteCreate() throws Exception {
+ CommandProcessorResponse cpr = driver.run("create table if not exists T2(a int)");
+ checkCmdOnDriver(cpr);
+ cpr = driver.run("create table if not exists T3(a int)");
+ checkCmdOnDriver(cpr);
+ cpr = driver.compileAndRespond("insert overwrite table T3 select a from T2");
+ checkCmdOnDriver(cpr);
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");
+ List locks = getLocks();
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T2", null, locks.get(0));
+ checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T3", null, locks.get(1));
+ txnMgr.getLockManager().releaseLocks(ctx.getHiveLocks());
+ Assert.assertEquals("Lock remained", 0, getLocks().size());
+ cpr = driver.run("drop table if exists T1");
+ checkCmdOnDriver(cpr);
+ cpr = driver.run("drop table if exists T2");
+ checkCmdOnDriver(cpr);
+ }
+ @Test
+ public void insertOverwritePartitionedCreate() throws Exception {
+ CommandProcessorResponse cpr = driver.run("create table if not exists T4 (name string, gpa double) partitioned by (age int)");
+ checkCmdOnDriver(cpr);
+ cpr = driver.run("create table if not exists T5(name string, age int, gpa double)");
+ checkCmdOnDriver(cpr);
+ cpr = driver.compileAndRespond("INSERT OVERWRITE TABLE T4 PARTITION (age) SELECT name, age, gpa FROM T5");
+ checkCmdOnDriver(cpr);
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");
+ List locks = getLocks();
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T5", null, locks.get(0));
+ checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T4", null, locks.get(1));
+ txnMgr.getLockManager().releaseLocks(ctx.getHiveLocks());
+ Assert.assertEquals("Lock remained", 0, getLocks().size());
+ cpr = driver.run("drop table if exists T5");
+ checkCmdOnDriver(cpr);
+ cpr = driver.run("drop table if exists T4");
+ checkCmdOnDriver(cpr);
+ }
+ @Test
+ public void basicBlocking() throws Exception {
+ CommandProcessorResponse cpr = driver.run("create table if not exists T6(a int)");
+ checkCmdOnDriver(cpr);
+ cpr = driver.compileAndRespond("select a from T6");
+ checkCmdOnDriver(cpr);
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets S lock on T6
+ List selectLocks = ctx.getHiveLocks();
+ cpr = driver.compileAndRespond("drop table if exists T6");
+ checkCmdOnDriver(cpr);
+ //tries to get X lock on T1 and gets Waiting state
+ LockState lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Fiddler", false);
+ List locks = getLocks();
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T6", null, locks.get(0));
+ checkLock(LockType.EXCLUSIVE, LockState.WAITING, "default", "T6", null, locks.get(1));
+ txnMgr.getLockManager().releaseLocks(selectLocks);//release S on T6
+ //attempt to X on T6 again - succeed
+ lockState = ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(1).getLockid());
+ locks = getLocks();
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T6", null, locks.get(0));
+ List xLock = new ArrayList(0);
+ xLock.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
+ txnMgr.getLockManager().releaseLocks(xLock);
+ cpr = driver.run("drop table if exists T6");
+ locks = getLocks();
+ Assert.assertEquals("Unexpected number of locks found", 0, locks.size());
+ checkCmdOnDriver(cpr);
+ }
+ @Test
+ public void lockConflictDbTable() throws Exception {
+ CommandProcessorResponse cpr = driver.run("create database if not exists temp");
+ checkCmdOnDriver(cpr);
+ cpr = driver.run("create table if not exists temp.T7(a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ cpr = driver.compileAndRespond("update temp.T7 set a = 5 where b = 6");
+ checkCmdOnDriver(cpr);
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");
+ List updateLocks = ctx.getHiveLocks();
+ cpr = driver.compileAndRespond("drop database if exists temp");
+ LockState lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Fiddler", false);//gets SS lock on T7
+ List locks = getLocks();
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "temp", "T7", null, locks.get(0));
+ checkLock(LockType.EXCLUSIVE, LockState.WAITING, "temp", null, null, locks.get(1));
+ txnMgr.getLockManager().releaseLocks(updateLocks);
+ lockState = ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(1).getLockid());
+ locks = getLocks();
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "temp", null, null, locks.get(0));
+ List xLock = new ArrayList(0);
+ xLock.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
+ txnMgr.getLockManager().releaseLocks(xLock);
+ }
+ @Test
+ public void updateSelectUpdate() throws Exception {
+ CommandProcessorResponse cpr = driver.run("create table T8(a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ cpr = driver.compileAndRespond("delete from T8 where b = 89");
+ checkCmdOnDriver(cpr);
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets SS lock on T8
+ List deleteLocks = ctx.getHiveLocks();
+ cpr = driver.compileAndRespond("select a from T8");//gets S lock on T8
+ checkCmdOnDriver(cpr);
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "Fiddler");
+ cpr = driver.compileAndRespond("update T8 set a = 1 where b = 1");
+ checkCmdOnDriver(cpr);
+ LockState lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false);//waits for SS lock on T8 from fifer
+ List locks = getLocks();
+ Assert.assertEquals("Unexpected lock count", 3, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T8", null, locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "T8", null, locks.get(1));
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "T8", null, locks.get(2));
+ txnMgr.getLockManager().releaseLocks(deleteLocks);
+ lockState = ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());
+ locks = getLocks();
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T8", null, locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "T8", null, locks.get(1));
+ List relLocks = new ArrayList(2);
+ relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
+ relLocks.add(new DbLockManager.DbHiveLock(locks.get(1).getLockid()));
+ txnMgr.getLockManager().releaseLocks(relLocks);
+ cpr = driver.run("drop table if exists T6");
+ locks = getLocks();
+ Assert.assertEquals("Unexpected number of locks found", 0, locks.size());
+ checkCmdOnDriver(cpr);
+ }
+
+
+ private void checkLock(LockType type, LockState state, String db, String table, String partition, ShowLocksResponseElement l) {
+ Assert.assertEquals(l.toString(),l.getType(), type);
+ Assert.assertEquals(l.toString(),l.getState(), state);
+ Assert.assertEquals(l.toString(), normalizeCase(l.getDbname()), normalizeCase(db));
+ Assert.assertEquals(l.toString(), normalizeCase(l.getTablename()), normalizeCase(table));
+ Assert.assertEquals(l.toString(), normalizeCase(l.getPartname()), normalizeCase(partition));
+ }
+ private void checkCmdOnDriver(CommandProcessorResponse cpr) {
+ Assert.assertTrue(cpr.toString(), cpr.getResponseCode() == 0);
+ }
+ private String normalizeCase(String s) {
+ return s == null ? null : s.toLowerCase();
+ }
+ private List getLocks() throws Exception {
+ ShowLocksResponse rsp = ((DbLockManager)txnMgr.getLockManager()).getLocks();
+ return rsp.getLocks();
+ }
+}