diff --git hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-log4j.properties hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-log4j.properties
new file mode 100644
index 0000000..82684b3
--- /dev/null
+++ hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-log4j.properties
@@ -0,0 +1,88 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Define some default values that can be overridden by system properties
+hive.log.threshold=ALL
+hive.root.logger=DEBUG,DRFA
+hive.log.dir=/tmp/ekoifman
+hive.log.file=hive.log
+
+# Define the root logger to the system property "hive.root.logger".
+log4j.rootLogger=${hive.root.logger}, EventCounter
+
+# Logging Threshold
+log4j.threshold=${hive.log.threshold}
+
+#
+# Daily Rolling File Appender
+#
+# Use the PidDailyRollingFileAppender class instead if you want to use separate log files
+# for different CLI sessions.
+#
+# log4j.appender.DRFA=org.apache.hadoop.hive.ql.log.PidDailyRollingFileAppender
+
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+
+log4j.appender.DRFA.File=${hive.log.dir}/${hive.log.file}
+
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+# Debugging Pattern format
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p [%t]: %c{2} (%F:%M(%L)) - %m%n
+
+
+#
+# console
+# Add "console" to rootLogger above if you want to use this
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} [%t]: %p %c{2}: %m%n
+log4j.appender.console.encoding=UTF-8
+
+#custom logging levels
+#log4j.logger.xxx=DEBUG
+
+#
+# Event Counter Appender
+# Sends counts of logging messages at different severity levels to Hadoop Metrics.
+#
+log4j.appender.EventCounter=org.apache.hadoop.hive.shims.HiveEventCounter
+
+
+log4j.category.DataNucleus=ERROR,DRFA
+log4j.category.Datastore=ERROR,DRFA
+log4j.category.Datastore.Schema=ERROR,DRFA
+log4j.category.JPOX.Datastore=ERROR,DRFA
+log4j.category.JPOX.Plugin=ERROR,DRFA
+log4j.category.JPOX.MetaData=ERROR,DRFA
+log4j.category.JPOX.Query=ERROR,DRFA
+log4j.category.JPOX.General=ERROR,DRFA
+log4j.category.JPOX.Enhancer=ERROR,DRFA
+
+
+# Silence useless ZK logs
+log4j.logger.org.apache.zookeeper.server.NIOServerCnxn=WARN,DRFA
+log4j.logger.org.apache.zookeeper.ClientCnxnSocketNIO=WARN,DRFA
diff --git hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml
new file mode 100644
index 0000000..70ccc31
--- /dev/null
+++ hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<configuration>
+  <property>
+    <name>javax.jdo.option.ConnectionURL</name>
+    <value>jdbc:mysql://localhost/metastore</value>
+  </property>
+  <property>
+    <name>javax.jdo.option.ConnectionUserName</name>
+    <value>hive</value>
+  </property>
+  <property>
+    <name>javax.jdo.option.ConnectionPassword</name>
+    <value>hive</value>
+  </property>
+  <property>
+    <name>javax.jdo.option.ConnectionDriverName</name>
+    <value>com.mysql.jdbc.Driver</value>
+  </property>
+  <property>
+    <name>datanucleus.autoCreateSchema</name>
+    <value>false</value>
+  </property>
+  <property>
+    <name>datanucleus.fixedDatastore</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>hive.support.concurrency</name>
+    <value>true</value>
+  </property>
+  <property>
+    <name>hive.txn.manager</name>
+    <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
+  </property>
+  <property>
+    <name>hive.enforce.bucketing</name>
+    <value>true</value>
+  </property>
+  <property>
+    <name>hive.exec.dynamic.partition.mode</name>
+    <value>nonstrict</value>
+  </property>
+</configuration>
diff --git hcatalog/src/test/e2e/templeton/deployers/env.sh hcatalog/src/test/e2e/templeton/deployers/env.sh
index 958ced8..6cdeb04 100755
--- hcatalog/src/test/e2e/templeton/deployers/env.sh
+++ hcatalog/src/test/e2e/templeton/deployers/env.sh
@@ -50,6 +50,11 @@ if [ -z ${HADOOP_HOME} ]; then
   export HADOOP_HOME=/Users/${USER}/dev/hwxhadoop/hadoop-dist/target/hadoop-${HADOOP_VERSION}
 fi
 
+if [ -z ${MYSQL_CLIENT_JAR} ]; then
+  #if using MySQL backed metastore
+  export MYSQL_CLIENT_JAR=/Users/${USER}/dev/mysql-connector-java-5.1.30/mysql-connector-java-5.1.30-bin.jar
+fi
+
 export TEZ_CLIENT_HOME=/Users/ekoifman/dev/apache-tez-client-${TEZ_VERSION}
 #Make sure Pig is built for the Hadoop version you are running
 export PIG_TAR_PATH=/Users/${USER}/dev/pig-${PIG_VERSION}-src/build
diff --git hcatalog/src/test/e2e/templeton/deployers/start_hive_services.sh hcatalog/src/test/e2e/templeton/deployers/start_hive_services.sh
index 0ead10a..8cc9353 100755
--- hcatalog/src/test/e2e/templeton/deployers/start_hive_services.sh
+++ hcatalog/src/test/e2e/templeton/deployers/start_hive_services.sh
@@ -25,10 +25,17 @@
 source ./env.sh
 
 #decide which DB to run against
+#Derby
 cp ${PROJ_HOME}/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.xml ${HIVE_HOME}/conf/hive-site.xml
+#cp ${PROJ_HOME}/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml ${HIVE_HOME}/conf/hive-site.xml
 #cp ${PROJ_HOME}/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mssql.xml ${HIVE_HOME}/conf/hive-site.xml
 
 cp ${PROJ_HOME}/hcatalog/src/test/e2e/templeton/deployers/config/webhcat/webhcat-site.xml ${HIVE_HOME}/hcatalog/etc/webhcat/webhcat-site.xml
+cp ${PROJ_HOME}/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-log4j.properties ${HIVE_HOME}/conf/hive-log4j.properties
+
+if [ -f ${MYSQL_CLIENT_JAR} ]; then
+  cp ${MYSQL_CLIENT_JAR} ${HIVE_HOME}/lib
+fi
 
 if [ -d ${WEBHCAT_LOG_DIR} ]; then
   rm -Rf ${WEBHCAT_LOG_DIR};
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 216a61c..f9a742d 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -533,18 +533,29 @@ public void unlock(UnlockRequest rqst)
     }
   }
 
+  /**
+   * Used to sort entries in {@link org.apache.hadoop.hive.metastore.api.ShowLocksResponse}
+   */
+  private static class LockInfoExt extends LockInfo {
+    private final ShowLocksResponseElement e;
+    LockInfoExt(ShowLocksResponseElement e, long intLockId) {
+      super(e, intLockId);
+      this.e = e;
+    }
+  }
   public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException {
     try {
       Connection dbConn = null;
       ShowLocksResponse rsp = new ShowLocksResponse();
       List<ShowLocksResponseElement> elems = new ArrayList<ShowLocksResponseElement>();
+      List<LockInfoExt> sortedList = new ArrayList<LockInfoExt>();
       Statement stmt = null;
       try {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
 
         String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " +
-          "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host from HIVE_LOCKS";
+          "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host, hl_lock_int_id from HIVE_LOCKS";
         LOG.debug("Going to execute query <" + s + ">");
         ResultSet rs = stmt.executeQuery(s);
         while (rs.next()) {
@@ -572,7 +583,7 @@ public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException {
           if (!rs.wasNull()) e.setAcquiredat(acquiredAt);
           e.setUser(rs.getString(10));
           e.setHostname(rs.getString(11));
-          elems.add(e);
+          sortedList.add(new LockInfoExt(e, rs.getLong(12)));
         }
         LOG.debug("Going to rollback");
         dbConn.rollback();
@@ -584,6 +595,12 @@ public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException {
         closeStmt(stmt);
         closeDbConn(dbConn);
       }
+      //this ensures that "SHOW LOCKS" prints the locks in the same order as they are examined
+      //by checkLock() - makes diagnostics easier.
+      Collections.sort(sortedList, new LockInfoComparator());
+      for(LockInfoExt lockInfoExt : sortedList) {
+        elems.add(lockInfoExt.e);
+      }
       rsp.setLocks(elems);
       return rsp;
     } catch (RetryException e) {
@@ -1086,17 +1103,17 @@ protected DatabaseProduct determineDatabaseProduct(Connection conn) throws MetaException {
   }
 
   private static class LockInfo {
-    long extLockId;
-    long intLockId;
-    long txnId;
-    String db;
-    String table;
-    String partition;
-    LockState state;
-    LockType type;
+    private final long extLockId;
+    private final long intLockId;
+    private final long txnId;
+    private final String db;
+    private final String table;
+    private final String partition;
+    private final LockState state;
+    private final LockType type;
 
     // Assumes the result set is set to a valid row
-    LockInfo(ResultSet rs) throws SQLException {
+    LockInfo(ResultSet rs) throws SQLException, MetaException {
       extLockId = rs.getLong("hl_lock_ext_id"); // can't be null
       intLockId = rs.getLong("hl_lock_int_id"); // can't be null
       db = rs.getString("hl_db"); // can't be null
@@ -1107,12 +1124,27 @@ protected DatabaseProduct determineDatabaseProduct(Connection conn) throws MetaException {
       switch (rs.getString("hl_lock_state").charAt(0)) {
         case LOCK_WAITING: state = LockState.WAITING; break;
         case LOCK_ACQUIRED: state = LockState.ACQUIRED; break;
+        default:
+          throw new MetaException("Unknown lock state " + rs.getString("hl_lock_state").charAt(0));
       }
       switch (rs.getString("hl_lock_type").charAt(0)) {
         case LOCK_EXCLUSIVE: type = LockType.EXCLUSIVE; break;
         case LOCK_SHARED: type = LockType.SHARED_READ; break;
         case LOCK_SEMI_SHARED: type = LockType.SHARED_WRITE; break;
+        default:
+          throw new MetaException("Unknown lock type " + rs.getString("hl_lock_type").charAt(0));
       }
+      txnId = rs.getLong("hl_txnid");
+    }
+    LockInfo(ShowLocksResponseElement e, long intLockId) {
+      extLockId = e.getLockid();
+      this.intLockId = intLockId;
+      db = e.getDbname();
+      table = e.getTablename();
+      partition = e.getPartname();
+      state = e.getState();
+      type = e.getType();
+      txnId = e.getTxnid();
     }
 
     public boolean equals(Object other) {
@@ -1130,15 +1162,22 @@ public String toString() {
         partition + " state:" + (state == null ? "null" : state.toString()) +
         " type:" + (type == null ? "null" : type.toString());
     }
+    private boolean isDbLock() {
+      return db != null && table == null && partition == null;
+    }
+    private boolean isTableLock() {
+      return db != null && table != null && partition == null;
+    }
   }
 
   private static class LockInfoComparator implements Comparator<LockInfo> {
+    private static final LockTypeComparator lockTypeComparator = new LockTypeComparator();
     public boolean equals(Object other) {
       return this == other;
     }
 
     public int compare(LockInfo info1, LockInfo info2) {
-      // We sort by state (acquired vs waiting) and then by extLockId.
+      // We sort by state (acquired vs waiting), then by LockType, then by id
       if (info1.state == LockState.ACQUIRED &&
         info2.state != LockState.ACQUIRED) {
         return -1;
@@ -1147,6 +1186,11 @@ public int compare(LockInfo info1, LockInfo info2) {
         info2.state == LockState.ACQUIRED) {
         return 1;
       }
+
+      int sortByType = lockTypeComparator.compare(info1.type, info2.type);
+      if(sortByType != 0) {
+        return sortByType;
+      }
       if (info1.extLockId < info2.extLockId) {
         return -1;
       } else if (info1.extLockId > info2.extLockId) {
@@ -1163,6 +1207,41 @@ public int compare(LockInfo info1, LockInfo info2) {
     }
   }
 
+  /**
+   * Sort more restrictive locks after less restrictive ones
+   */
+  private final static class LockTypeComparator implements Comparator<LockType> {
+    public boolean equals(Object other) {
+      return this == other;
+    }
+    public int compare(LockType t1, LockType t2) {
+      switch (t1) {
+        case EXCLUSIVE:
+          if(t2 == LockType.EXCLUSIVE) {
+            return 0;
+          }
+          return 1;
+        case SHARED_WRITE:
+          switch (t2) {
+            case EXCLUSIVE:
+              return -1;
+            case SHARED_WRITE:
+              return 0;
+            case SHARED_READ:
+              return 1;
+            default:
+              throw new RuntimeException("Unexpected LockType: " + t2);
+          }
+        case SHARED_READ:
+          if(t2 == LockType.SHARED_READ) {
+            return 0;
+          }
+          return -1;
+        default:
+          throw new RuntimeException("Unexpected LockType: " + t1);
+      }
+    }
+  }
   private enum LockAction {ACQUIRE, WAIT, KEEP_LOOKING}
 
   // A jump table to figure out whether to wait, acquire,
@@ -1362,11 +1441,11 @@ private LockResponse checkLock(Connection dbConn,
 
     LockResponse response = new LockResponse();
     response.setLockid(extLockId);
 
-    LOG.debug("Setting savepoint");
+    LOG.debug("checkLock(): Setting savepoint. extLockId=" + extLockId);
     Savepoint save = dbConn.setSavepoint();
     StringBuilder query = new StringBuilder("select hl_lock_ext_id, " +
       "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " +
-      "hl_lock_type from HIVE_LOCKS where hl_db in (");
+      "hl_lock_type, hl_txnid from HIVE_LOCKS where hl_db in (");
 
     Set<String> strings = new HashSet<String>(locksBeingChecked.size());
     for (LockInfo info : locksBeingChecked) {
@@ -1431,19 +1510,26 @@ private LockResponse checkLock(Connection dbConn,
         query.append("))");
       }
     }
+    query.append(" and hl_lock_ext_id <= ").append(extLockId);
 
     LOG.debug("Going to execute query <" + query.toString() + ">");
     Statement stmt = null;
     try {
       stmt = dbConn.createStatement();
       ResultSet rs = stmt.executeQuery(query.toString());
-      SortedSet lockSet = new TreeSet(new LockInfoComparator());
+      SortedSet<LockInfo> lockSet = new TreeSet<LockInfo>(new LockInfoComparator());
       while (rs.next()) {
         lockSet.add(new LockInfo(rs));
       }
       // Turn the tree set into an array so we can move back and forth easily
       // in it.
-      LockInfo[] locks = (LockInfo[])lockSet.toArray(new LockInfo[1]);
+      LockInfo[] locks = lockSet.toArray(new LockInfo[lockSet.size()]);
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Locks to check(full): ");
+        for(LockInfo info : locks) {
+          LOG.debug("  " + info);
+        }
+      }
 
       for (LockInfo info : locksBeingChecked) {
         // Find the lock record we're checking
@@ -1496,22 +1582,27 @@ private LockResponse checkLock(Connection dbConn,
 
           // We've found something that matches what we're trying to lock,
           // so figure out if we can lock it too.
-          switch (jumpTable.get(locks[index].type).get(locks[i].type).get
-            (locks[i].state)) {
+          LockAction lockAction = jumpTable.get(locks[index].type).get(locks[i].type).get(locks[i].state);
+          LOG.debug("desired Lock: " + info + " checked Lock: " + locks[i] + " action: " + lockAction);
+          switch (lockAction) {
+            case WAIT:
+              if(!ignoreConflict(info, locks[i])) {
+                wait(dbConn, save);
+                if (alwaysCommit) {
+                  // In the case where lockNoWait has been called we don't want to commit because
+                  // it's going to roll everything back. In every other case we want to commit here.
+                  LOG.debug("Going to commit");
+                  dbConn.commit();
+                }
+                response.setState(LockState.WAITING);
+                LOG.debug("Lock(" + info + ") waiting for Lock(" + locks[i] + ")");
+                return response;
+              }
+              //fall through to ACQUIRE
             case ACQUIRE:
               acquire(dbConn, stmt, extLockId, info.intLockId);
               acquired = true;
               break;
-            case WAIT:
-              wait(dbConn, save);
-              if (alwaysCommit) {
-                // In the case where lockNoWait has been called we don't want to commit because
-                // it's going to roll everything back. In every other case we want to commit here.
-                LOG.debug("Going to commit");
-                dbConn.commit();
-              }
-              response.setState(LockState.WAITING);
-              return response;
             case KEEP_LOOKING:
               continue;
           }
@@ -1534,6 +1625,19 @@ private LockResponse checkLock(Connection dbConn,
     return response;
   }
 
+  /**
+   * The {@link #jumpTable} only deals with LockState/LockType.  In some cases it's not
+   * sufficient.  For example, an EXCLUSIVE lock on a partition should prevent SHARED_READ
+   * on the table, but there is no reason for an EXCLUSIVE on a table to prevent SHARED_READ
+   * on the database.
+   */
+  private boolean ignoreConflict(LockInfo desiredLock, LockInfo existingLock) {
+    return (desiredLock.isDbLock() && desiredLock.type == LockType.SHARED_READ &&
+      existingLock.isTableLock() && existingLock.type == LockType.EXCLUSIVE) ||
+      (existingLock.isDbLock() && existingLock.type == LockType.SHARED_READ &&
+      desiredLock.isTableLock() && desiredLock.type == LockType.EXCLUSIVE);
+  }
+
   private void wait(Connection dbConn, Savepoint save) throws SQLException {
     // Need to rollback because we did a select that acquired locks but we didn't
     // actually update anything.  Also, we may have locked some locks as
@@ -1654,7 +1758,7 @@ private long getTxnIdFromLockId(Connection dbConn, long extLockId)
     try {
       stmt = dbConn.createStatement();
       String s = "select hl_lock_ext_id, hl_lock_int_id, hl_db, hl_table, " +
-        "hl_partition, hl_lock_state, hl_lock_type from HIVE_LOCKS where " +
+        "hl_partition, hl_lock_state, hl_lock_type, hl_txnid from HIVE_LOCKS where " +
         "hl_lock_ext_id = " + extLockId;
       LOG.debug("Going to execute query <" + s + ">");
       ResultSet rs = stmt.executeQuery(s);
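The ignoreConflict() method above captures the one case the jumpTable cannot express: a SHARED_READ on a database and an EXCLUSIVE on a table inside that database do not truly conflict (e.g. CREATE TABLE vs. INSERT OVERWRITE, exercised by insertOverwriteCreate in TestDbTxnManager2 below). A standalone sketch of the rule with a simplified lock type; all names here are illustrative, not patch code, and partition-level locks are omitted:

    // IgnoreConflictDemo.java - illustrative only; mirrors TxnHandler.ignoreConflict()
    public class IgnoreConflictDemo {
      enum LockType { SHARED_READ, SHARED_WRITE, EXCLUSIVE }
      static class Lock {
        final String db; final String table; final LockType type; // table == null => database-level lock
        Lock(String db, String table, LockType type) { this.db = db; this.table = table; this.type = type; }
        boolean isDbLock() { return db != null && table == null; }
        boolean isTableLock() { return db != null && table != null; }
      }
      // Symmetric: either argument may be the desired lock
      static boolean ignoreConflict(Lock a, Lock b) {
        return (a.isDbLock() && a.type == LockType.SHARED_READ && b.isTableLock() && b.type == LockType.EXCLUSIVE)
            || (b.isDbLock() && b.type == LockType.SHARED_READ && a.isTableLock() && a.type == LockType.EXCLUSIVE);
      }
      public static void main(String[] args) {
        Lock createTable = new Lock("default", null, LockType.SHARED_READ);   // e.g. CREATE TABLE reads the db
        Lock insertOverwrite = new Lock("default", "T3", LockType.EXCLUSIVE); // e.g. INSERT OVERWRITE locks the table
        System.out.println(ignoreConflict(createTable, insertOverwrite));    // true: both may proceed
      }
    }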
diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 64a1ee0..338e755 100644
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1322,7 +1322,7 @@ public int execute() throws CommandNeedRetryException {
     maxthreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.EXECPARALLETHREADNUMBER);
 
     try {
-      LOG.info("Starting command: " + queryStr);
+      LOG.info("Starting command(queryId=" + queryId + "): " + queryStr);
 
       // compile and execute can get called from different threads in case of HS2
       // so clear timing in this thread's Hive object before proceeding.
       Hive.get().clearMetaCallTiming();
diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
index de7d414..805e090 100644
--- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
+++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
@@ -72,12 +72,21 @@ public HiveLock lock(HiveLockObject key, HiveLockMode mode,
    * Send a lock request to the metastore.  This is intended for use by
    * {@link DbTxnManager}.
    * @param lock lock request
+   * @param isBlocking if true, will block until locks have been acquired
+   * @return the result of the lock attempt
    * @throws LockException
    */
-  List<HiveLock> lock(LockRequest lock) throws LockException {
+  LockState lock(LockRequest lock, String queryId, boolean isBlocking, List<HiveLock> acquiredLocks) throws LockException {
     try {
-      LOG.debug("Requesting lock");
+      LOG.debug("Requesting: queryId=" + queryId + " " + lock);
       LockResponse res = client.lock(lock);
+      //link lockId to queryId
+      LOG.debug("Response " + res);
+      if(!isBlocking) {
+        if(res.getState() == LockState.WAITING) {
+          return LockState.WAITING;
+        }
+      }
       while (res.getState() == LockState.WAITING) {
         backoff();
         res = client.checkLock(res.getLockid());
@@ -88,9 +97,8 @@ public HiveLock lock(HiveLockObject key, HiveLockMode mode,
       if (res.getState() != LockState.ACQUIRED) {
         throw new LockException(ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg());
       }
-      List<HiveLock> locks = new ArrayList<HiveLock>(1);
-      locks.add(hl);
-      return locks;
+      acquiredLocks.add(hl);
+      return res.getState();
     } catch (NoSuchTxnException e) {
       LOG.error("Metastore could not find txnid " + lock.getTxnid());
       throw new LockException(ErrorMsg.TXNMGR_NOT_INSTANTIATED.getMsg(), e);
@@ -102,6 +110,20 @@ public HiveLock lock(HiveLockObject key, HiveLockMode mode,
         e);
     }
   }
+  /**
+   * Used to make another attempt to acquire a lock (in WAITING state)
+   * @param extLockId external id of the lock to re-check
+   * @return result of the attempt
+   * @throws LockException
+   */
+  LockState checkLock(long extLockId) throws LockException {
+    try {
+      return client.checkLock(extLockId).getState();
+    } catch (TException e) {
+      throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(),
+        e);
+    }
+  }
 
   @Override
   public void unlock(HiveLock hiveLock) throws LockException {
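With the reworked DbLockManager.lock() above, a non-blocking caller that gets back LockState.WAITING can retry via the new package-private checkLock(). A minimal sketch of that polling pattern, assuming txnMgr/driver/ctx are set up as in TestDbTxnManager2 below; the user name, sleep interval, and the extLockId variable (e.g. discovered via getLocks(), as the tests do) are illustrative:

    // Illustrative polling loop; would live in org.apache.hadoop.hive.ql.lockmgr
    // since acquireLocks(..., boolean) and checkLock(long) are package-private.
    LockState state = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "user", false);
    if (state != null) {                       // null means no locks were needed
      while (state == LockState.WAITING) {
        Thread.sleep(100);                     // back-off policy is up to the caller
        state = ((DbLockManager) txnMgr.getLockManager()).checkLock(extLockId);
      }
    }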
diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index d11fabd..ccbac80 100644
--- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -37,6 +37,7 @@
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.thrift.TException;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -87,6 +88,15 @@ public HiveLockManager getLockManager() throws LockException {
 
   @Override
   public void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException {
+    acquireLocks(plan, ctx, username, true);
+  }
+
+  /**
+   * This is for testing only.  Normally a client should call
+   * {@link #acquireLocks(org.apache.hadoop.hive.ql.QueryPlan, org.apache.hadoop.hive.ql.Context, String)}.
+   * @param isBlocking if false, the method will return immediately; thus the locks may be in LockState.WAITING
+   * @return null if no locks were needed
+   */
+  LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isBlocking) throws LockException {
     init();
     // Make sure we've built the lock manager
     getLockManager();
@@ -94,7 +104,8 @@ public void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException {
     boolean atLeastOneLock = false;
 
     LockRequestBuilder rqstBuilder = new LockRequestBuilder();
-    LOG.debug("Setting lock request transaction to " + txnId);
+    //link queryId to txnId
+    LOG.debug("Setting lock request transaction to " + txnId + " for queryId=" + plan.getQueryId());
     rqstBuilder.setTransactionId(txnId)
       .setUser(username);
 
@@ -206,10 +217,15 @@ public void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException {
 
     // Make sure we need locks.  It's possible there's nothing to lock in
     // this operation.
-    if (!atLeastOneLock) return;
+    if (!atLeastOneLock) {
+      LOG.debug("No locks needed for queryId=" + plan.getQueryId());
+      return null;
+    }
 
-    List<HiveLock> locks = lockMgr.lock(rqstBuilder.build());
+    List<HiveLock> locks = new ArrayList<HiveLock>(1);
+    LockState lockState = lockMgr.lock(rqstBuilder.build(), plan.getQueryId(), isBlocking, locks);
     ctx.setHiveLocks(locks);
+    return lockState;
   }
 
   @Override
diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index 1cd7c32..2fb78fd 100644
--- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -32,6 +32,7 @@
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -39,6 +40,7 @@
 
 /**
  * Unit tests for {@link DbTxnManager}.
+ * See additional tests in {@link org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2}
  */
 public class TestDbTxnManager {
 
diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
new file mode 100644
index 0000000..6a69641
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.lockmgr;
+
+import junit.framework.Assert;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * See additional tests in {@link org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager}.
+ * Tests here focus on which locks are acquired, and in which state, for various statements.
+ */
+public class TestDbTxnManager2 {
+  private static HiveConf conf = new HiveConf(Driver.class);
+  private HiveTxnManager txnMgr;
+  private Context ctx;
+  private Driver driver;
+
+  @BeforeClass
+  public static void setUpClass() throws Exception {
+    TxnDbUtil.setConfValues(conf);
+    conf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+    conf.setBoolVar(HiveConf.ConfVars.HIVEENFORCEBUCKETING, true);
+  }
+  @Before
+  public void setUp() throws Exception {
+    SessionState.start(conf);
+    ctx = new Context(conf);
+    driver = new Driver(conf);
+    driver.init();
+    TxnDbUtil.cleanDb();
+    TxnDbUtil.prepDb();
+    txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    Assert.assertTrue(txnMgr instanceof DbTxnManager);
+  }
+  @After
+  public void tearDown() throws Exception {
+    driver.close();
+    if (txnMgr != null) txnMgr.closeTxnManager();
+    TxnDbUtil.cleanDb();
+    TxnDbUtil.prepDb();
+  }
+  @Test
+  public void createTable() throws Exception {
+    CommandProcessorResponse cpr = driver.compileAndRespond("create table if not exists T (a int, b int)");
+    checkCmdOnDriver(cpr);
+    txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");
+    List<ShowLocksResponseElement> locks = getLocks();
+    Assert.assertEquals("Unexpected lock count", 1, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", null, null, locks.get(0));
+    txnMgr.getLockManager().releaseLocks(ctx.getHiveLocks());
+    Assert.assertEquals("Lock remained", 0, getLocks().size());
+  }
+  @Test
+  public void insertOverwriteCreate() throws Exception {
+    CommandProcessorResponse cpr = driver.run("create table if not exists T2(a int)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("create table if not exists T3(a int)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.compileAndRespond("insert overwrite table T3 select a from T2");
+    checkCmdOnDriver(cpr);
+    txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");
+    List<ShowLocksResponseElement> locks = getLocks();
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T2", null, locks.get(0));
+    checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T3", null, locks.get(1));
+    txnMgr.getLockManager().releaseLocks(ctx.getHiveLocks());
+    Assert.assertEquals("Lock remained", 0, getLocks().size());
+    cpr = driver.run("drop table if exists T2");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("drop table if exists T3");
+    checkCmdOnDriver(cpr);
+  }
+  @Test
+  public void insertOverwritePartitionedCreate() throws Exception {
+    CommandProcessorResponse cpr = driver.run("create table if not exists T4 (name string, gpa double) partitioned by (age int)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("create table if not exists T5(name string, age int, gpa double)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.compileAndRespond("INSERT OVERWRITE TABLE T4 PARTITION (age) SELECT name, age, gpa FROM T5");
+    checkCmdOnDriver(cpr);
+    txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");
+    List<ShowLocksResponseElement> locks = getLocks();
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T5", null, locks.get(0));
+    checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T4", null, locks.get(1));
+    txnMgr.getLockManager().releaseLocks(ctx.getHiveLocks());
+    Assert.assertEquals("Lock remained", 0, getLocks().size());
+    cpr = driver.run("drop table if exists T5");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("drop table if exists T4");
+    checkCmdOnDriver(cpr);
+  }
+  @Test
+  public void basicBlocking() throws Exception {
+    CommandProcessorResponse cpr = driver.run("create table if not exists T6(a int)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.compileAndRespond("select a from T6");
+    checkCmdOnDriver(cpr);
+    txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets S lock on T6
+    List<HiveLock> selectLocks = ctx.getHiveLocks();
+    cpr = driver.compileAndRespond("drop table if exists T6");
+    checkCmdOnDriver(cpr);
+    //tries to get X lock on T6 and gets WAITING state
+    LockState lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Fiddler", false);
+    List<ShowLocksResponseElement> locks = getLocks();
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T6", null, locks.get(0));
+    checkLock(LockType.EXCLUSIVE, LockState.WAITING, "default", "T6", null, locks.get(1));
+    txnMgr.getLockManager().releaseLocks(selectLocks);//release S on T6
+    //attempt to X on T6 again - succeeds
+    lockState = ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(1).getLockid());
+    locks = getLocks();
+    Assert.assertEquals("Unexpected lock count", 1, locks.size());
+    checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T6", null, locks.get(0));
+    List<HiveLock> xLock = new ArrayList<HiveLock>(0);
+    xLock.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
+    txnMgr.getLockManager().releaseLocks(xLock);
+    cpr = driver.run("drop table if exists T6");
+    locks = getLocks();
+    Assert.assertEquals("Unexpected number of locks found", 0, locks.size());
+    checkCmdOnDriver(cpr);
+  }
+  @Test
+  public void lockConflictDbTable() throws Exception {
+    CommandProcessorResponse cpr = driver.run("create database if not exists temp");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("create table if not exists temp.T7(a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    cpr = driver.compileAndRespond("update temp.T7 set a = 5 where b = 6");
+    checkCmdOnDriver(cpr);
+    txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets SS lock on T7
+    List<HiveLock> updateLocks = ctx.getHiveLocks();
+    cpr = driver.compileAndRespond("drop database if exists temp");
+    //tries to get X lock on db temp and gets WAITING state
+    LockState lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Fiddler", false);
+    List<ShowLocksResponseElement> locks = getLocks();
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "temp", "T7", null, locks.get(0));
+    checkLock(LockType.EXCLUSIVE, LockState.WAITING, "temp", null, null, locks.get(1));
+    txnMgr.getLockManager().releaseLocks(updateLocks);
+    lockState = ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(1).getLockid());
+    locks = getLocks();
+    Assert.assertEquals("Unexpected lock count", 1, locks.size());
+    checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "temp", null, null, locks.get(0));
+    List<HiveLock> xLock = new ArrayList<HiveLock>(0);
+    xLock.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
+    txnMgr.getLockManager().releaseLocks(xLock);
+  }
+  @Test
+  public void updateSelectUpdate() throws Exception {
+    CommandProcessorResponse cpr = driver.run("create table T8(a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    cpr = driver.compileAndRespond("delete from T8 where b = 89");
+    checkCmdOnDriver(cpr);
+    txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets SS lock on T8
+    List<HiveLock> deleteLocks = ctx.getHiveLocks();
+    cpr = driver.compileAndRespond("select a from T8");//gets S lock on T8
+    checkCmdOnDriver(cpr);
+    txnMgr.acquireLocks(driver.getPlan(), ctx, "Fiddler");
+    cpr = driver.compileAndRespond("update T8 set a = 1 where b = 1");
+    checkCmdOnDriver(cpr);
+    LockState lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false);//waits for SS lock on T8 from Fifer
+    List<ShowLocksResponseElement> locks = getLocks();
+    Assert.assertEquals("Unexpected lock count", 3, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T8", null, locks.get(0));
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "T8", null, locks.get(1));
+    checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "T8", null, locks.get(2));
+    txnMgr.getLockManager().releaseLocks(deleteLocks);
+    lockState = ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());
+    locks = getLocks();
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T8", null, locks.get(0));
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "T8", null, locks.get(1));
+    List<HiveLock> relLocks = new ArrayList<HiveLock>(2);
+    relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
+    relLocks.add(new DbLockManager.DbHiveLock(locks.get(1).getLockid()));
+    txnMgr.getLockManager().releaseLocks(relLocks);
+    cpr = driver.run("drop table if exists T8");
+    locks = getLocks();
+    Assert.assertEquals("Unexpected number of locks found", 0, locks.size());
+    checkCmdOnDriver(cpr);
+  }
+
+
+  private void checkLock(LockType type, LockState state, String db, String table, String partition, ShowLocksResponseElement l) {
+    Assert.assertEquals(l.toString(), type, l.getType());
+    Assert.assertEquals(l.toString(), state, l.getState());
+    Assert.assertEquals(l.toString(), normalizeCase(db), normalizeCase(l.getDbname()));
+    Assert.assertEquals(l.toString(), normalizeCase(table), normalizeCase(l.getTablename()));
+    Assert.assertEquals(l.toString(), normalizeCase(partition), normalizeCase(l.getPartname()));
+  }
+  private void checkCmdOnDriver(CommandProcessorResponse cpr) {
+    Assert.assertTrue(cpr.toString(), cpr.getResponseCode() == 0);
+  }
+  private String normalizeCase(String s) {
+    return s == null ? null : s.toLowerCase();
+  }
+  private List<ShowLocksResponseElement> getLocks() throws Exception {
+    ShowLocksResponse rsp = ((DbLockManager)txnMgr.getLockManager()).getLocks();
+    return rsp.getLocks();
+  }
+}
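Taken together, LockInfoComparator and LockTypeComparator order locks by state (ACQUIRED before WAITING), then by increasing restrictiveness of type, then by lock id; after this patch both checkLock() and SHOW LOCKS use that order. A standalone illustration of the resulting ordering, simplified to omit the internal lock id tie-breaker; all names here are illustrative, not patch code:

    // LockSortDemo.java - illustrative only; mirrors the combined comparator order
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;

    public class LockSortDemo {
      enum State { ACQUIRED, WAITING }                    // declaration order = sort order
      enum Type { SHARED_READ, SHARED_WRITE, EXCLUSIVE }  // least restrictive first

      static class L {
        final State state; final Type type; final long extLockId;
        L(State state, Type type, long extLockId) { this.state = state; this.type = type; this.extLockId = extLockId; }
        public String toString() { return state + "/" + type + "/" + extLockId; }
      }

      public static void main(String[] args) {
        List<L> locks = new ArrayList<L>(Arrays.asList(
            new L(State.WAITING, Type.EXCLUSIVE, 7),
            new L(State.ACQUIRED, Type.EXCLUSIVE, 5),
            new L(State.ACQUIRED, Type.SHARED_READ, 9)));
        Collections.sort(locks, new Comparator<L>() {
          public int compare(L a, L b) {
            int byState = a.state.compareTo(b.state);      // ACQUIRED before WAITING
            if (byState != 0) return byState;
            int byType = a.type.compareTo(b.type);         // less restrictive types first
            if (byType != 0) return byType;
            return Long.compare(a.extLockId, b.extLockId); // then by external lock id
          }
        });
        // prints [ACQUIRED/SHARED_READ/9, ACQUIRED/EXCLUSIVE/5, WAITING/EXCLUSIVE/7]
        System.out.println(locks);
      }
    }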