diff --git a/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-log4j.properties b/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-log4j.properties
new file mode 100644
index 0000000..82684b3
--- /dev/null
+++ b/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-log4j.properties
@@ -0,0 +1,88 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Define some default values that can be overridden by system properties
+hive.log.threshold=ALL
+hive.root.logger=DEBUG,DRFA
+hive.log.dir=/tmp/ekoifman
+hive.log.file=hive.log
+
+# Define the root logger to the system property "hive.root.logger".
+log4j.rootLogger=${hive.root.logger}, EventCounter
+
+# Logging Threshold
+log4j.threshold=${hive.log.threshold}
+
+#
+# Daily Rolling File Appender
+#
+# Use the PidDailyRollingFileAppender class instead if you want to use separate log files
+# for different CLI sessions.
+#
+# log4j.appender.DRFA=org.apache.hadoop.hive.ql.log.PidDailyRollingFileAppender
+
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+
+log4j.appender.DRFA.File=${hive.log.dir}/${hive.log.file}
+
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+# Debugging Pattern format
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p [%t]: %c{2} (%F:%M(%L)) - %m%n
+
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} [%t]: %p %c{2}: %m%n
+log4j.appender.console.encoding=UTF-8
+
+#custom logging levels
+#log4j.logger.xxx=DEBUG
+
+#
+# Event Counter Appender
+# Sends counts of logging messages at different severity levels to Hadoop Metrics.
+#
+log4j.appender.EventCounter=org.apache.hadoop.hive.shims.HiveEventCounter
+
+
+log4j.category.DataNucleus=ERROR,DRFA
+log4j.category.Datastore=ERROR,DRFA
+log4j.category.Datastore.Schema=ERROR,DRFA
+log4j.category.JPOX.Datastore=ERROR,DRFA
+log4j.category.JPOX.Plugin=ERROR,DRFA
+log4j.category.JPOX.MetaData=ERROR,DRFA
+log4j.category.JPOX.Query=ERROR,DRFA
+log4j.category.JPOX.General=ERROR,DRFA
+log4j.category.JPOX.Enhancer=ERROR,DRFA
+
+
+# Silence useless ZK logs
+log4j.logger.org.apache.zookeeper.server.NIOServerCnxn=WARN,DRFA
+log4j.logger.org.apache.zookeeper.ClientCnxnSocketNIO=WARN,DRFA
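Note that everything prefixed hive.* above is only a default: log4j 1.2 resolves ${...} placeholders from JVM system properties before falling back to the values in the file, so the root logger and log location can be redirected per process. A minimal sketch of that mechanism (class name and file path are illustrative, assuming log4j 1.2 on the classpath):

```java
// Sketch: overriding the defaults above via system properties before log4j
// reads the file. System properties win over the in-file hive.* defaults.
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;

public class Log4jOverrideSketch {
  public static void main(String[] args) {
    System.setProperty("hive.root.logger", "INFO,console"); // overrides DEBUG,DRFA
    System.setProperty("hive.log.dir", "/tmp");
    PropertyConfigurator.configure("hive-log4j.properties");
    Logger.getLogger(Log4jOverrideSketch.class).info("routed to stderr by the console appender");
  }
}
```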
diff --git a/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml b/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml
new file mode 100644
index 0000000..70ccc31
--- /dev/null
+++ b/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<configuration>
+
+  <property>
+    <name>javax.jdo.option.ConnectionURL</name>
+    <value>jdbc:mysql://localhost/metastore</value>
+  </property>
+
+  <property>
+    <name>javax.jdo.option.ConnectionUserName</name>
+    <value>hive</value>
+  </property>
+
+  <property>
+    <name>javax.jdo.option.ConnectionPassword</name>
+    <value>hive</value>
+  </property>
+
+  <property>
+    <name>javax.jdo.option.ConnectionDriverName</name>
+    <value>com.mysql.jdbc.Driver</value>
+  </property>
+
+  <property>
+    <name>datanucleus.autoCreateSchema</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>datanucleus.fixedDatastore</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>hive.support.concurrency</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>hive.txn.manager</name>
+    <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
+  </property>
+
+  <property>
+    <name>hive.enforce.bucketing</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>hive.exec.dynamic.partition.mode</name>
+    <value>nonstrict</value>
+  </property>
+
+</configuration>
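Since a misconfigured metastore DB tends to surface as opaque DataNucleus errors, it can help to verify the MySQL side on its own first. A minimal connectivity check, assuming the mysql-connector-java jar (see MYSQL_CLIENT_JAR in env.sh below) is on the classpath; the URL and credentials mirror the properties above:

```java
// Sketch: verify the metastore's backing MySQL database is reachable with the
// values from hive-site.mysql.xml before starting the metastore itself.
import java.sql.Connection;
import java.sql.DriverManager;

public class MetastoreDbCheck {
  public static void main(String[] args) throws Exception {
    Class.forName("com.mysql.jdbc.Driver"); // same driver class as the config
    Connection c = DriverManager.getConnection(
        "jdbc:mysql://localhost/metastore", "hive", "hive");
    try {
      System.out.println("metastore DB reachable: " + c.isValid(5));
    } finally {
      c.close();
    }
  }
}
```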
diff --git a/hcatalog/src/test/e2e/templeton/deployers/env.sh b/hcatalog/src/test/e2e/templeton/deployers/env.sh
index 958ced8..6cdeb04 100755
--- a/hcatalog/src/test/e2e/templeton/deployers/env.sh
+++ b/hcatalog/src/test/e2e/templeton/deployers/env.sh
@@ -50,6 +50,11 @@ if [ -z ${HADOOP_HOME} ]; then
export HADOOP_HOME=/Users/${USER}/dev/hwxhadoop/hadoop-dist/target/hadoop-${HADOOP_VERSION}
fi
+if [ -z ${MYSQL_CLIENT_JAR} ]; then
+  #if using a MySQL-backed metastore
+ export MYSQL_CLIENT_JAR=/Users/${USER}/dev/mysql-connector-java-5.1.30/mysql-connector-java-5.1.30-bin.jar
+fi
+
export TEZ_CLIENT_HOME=/Users/ekoifman/dev/apache-tez-client-${TEZ_VERSION}
#Make sure Pig is built for the Hadoop version you are running
export PIG_TAR_PATH=/Users/${USER}/dev/pig-${PIG_VERSION}-src/build
diff --git a/hcatalog/src/test/e2e/templeton/deployers/start_hive_services.sh b/hcatalog/src/test/e2e/templeton/deployers/start_hive_services.sh
index 0ead10a..a2248e5 100755
--- a/hcatalog/src/test/e2e/templeton/deployers/start_hive_services.sh
+++ b/hcatalog/src/test/e2e/templeton/deployers/start_hive_services.sh
@@ -25,10 +25,17 @@
source ./env.sh
#decide which DB to run against
-cp ${PROJ_HOME}/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.xml ${HIVE_HOME}/conf/hive-site.xml
+#Derby
+#cp ${PROJ_HOME}/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.xml ${HIVE_HOME}/conf/hive-site.xml
+cp ${PROJ_HOME}/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml ${HIVE_HOME}/conf/hive-site.xml
#cp ${PROJ_HOME}/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mssql.xml ${HIVE_HOME}/conf/hive-site.xml
cp ${PROJ_HOME}/hcatalog/src/test/e2e/templeton/deployers/config/webhcat/webhcat-site.xml ${HIVE_HOME}/hcatalog/etc/webhcat/webhcat-site.xml
+cp ${PROJ_HOME}/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-log4j.properties ${HIVE_HOME}/conf/hive-log4j.properties
+
+if [ -f ${MYSQL_CLIENT_JAR} ]; then
+ cp ${MYSQL_CLIENT_JAR} ${HIVE_HOME}/lib
+fi
if [ -d ${WEBHCAT_LOG_DIR} ]; then
rm -Rf ${WEBHCAT_LOG_DIR};
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 216a61c..0c4fe49 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -533,18 +533,29 @@ public void unlock(UnlockRequest rqst)
}
}
+ /**
+ * Used to sort entries in {@link org.apache.hadoop.hive.metastore.api.ShowLocksResponse}.
+ */
+ private static class LockInfoExt extends LockInfo {
+ private final ShowLocksResponseElement e;
+ LockInfoExt(ShowLocksResponseElement e, long intLockId) {
+ super(e, intLockId);
+ this.e = e;
+ }
+ }
public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException {
try {
Connection dbConn = null;
ShowLocksResponse rsp = new ShowLocksResponse();
List<ShowLocksResponseElement> elems = new ArrayList<ShowLocksResponseElement>();
+ List<LockInfoExt> sortedList = new ArrayList<LockInfoExt>();
Statement stmt = null;
try {
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
stmt = dbConn.createStatement();
String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " +
- "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host from HIVE_LOCKS";
+ "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host, hl_lock_int_id from HIVE_LOCKS";
LOG.debug("Doing to execute query <" + s + ">");
ResultSet rs = stmt.executeQuery(s);
while (rs.next()) {
@@ -572,7 +583,7 @@ public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException {
if (!rs.wasNull()) e.setAcquiredat(acquiredAt);
e.setUser(rs.getString(10));
e.setHostname(rs.getString(11));
- elems.add(e);
+ sortedList.add(new LockInfoExt(e, rs.getLong(12)));
}
LOG.debug("Going to rollback");
dbConn.rollback();
@@ -584,6 +595,12 @@ public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException {
closeStmt(stmt);
closeDbConn(dbConn);
}
+ //this ensures that "SHOW LOCKS" prints the locks in the same order as they are examined
+ //by checkLock() - makes diagnostics easier.
+ Collections.sort(sortedList, new LockInfoComparator());
+ for(LockInfoExt lockInfoExt : sortedList) {
+ elems.add(lockInfoExt.e);
+ }
rsp.setLocks(elems);
return rsp;
} catch (RetryException e) {
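The showLocks() change above is a decorate-sort-undecorate: each ShowLocksResponseElement is wrapped (LockInfoExt) with the fields the comparator reads, the wrappers are sorted once, and the elements are unwrapped in order. A standalone sketch of the idiom, with illustrative types in place of Hive's private classes:

```java
// Sketch of decorate-sort-undecorate: wrap each element with its sort key,
// sort with one comparator, unwrap in the resulting order.
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class DecorateSortSketch {
  static class Keyed {
    final long key;
    final String value;
    Keyed(long key, String value) { this.key = key; this.value = value; }
  }

  public static void main(String[] args) {
    List<Keyed> wrapped = new ArrayList<Keyed>();
    wrapped.add(new Keyed(2L, "b"));
    wrapped.add(new Keyed(1L, "a"));
    Collections.sort(wrapped, new Comparator<Keyed>() {
      public int compare(Keyed x, Keyed y) {
        return x.key < y.key ? -1 : (x.key > y.key ? 1 : 0);
      }
    });
    List<String> elems = new ArrayList<String>();
    for (Keyed k : wrapped) {
      elems.add(k.value); // same order the comparator imposes
    }
    System.out.println(elems); // prints [a, b]
  }
}
```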
@@ -1086,14 +1103,14 @@ protected DatabaseProduct determineDatabaseProduct(Connection conn) throws MetaE
}
private static class LockInfo {
- long extLockId;
- long intLockId;
- long txnId;
- String db;
- String table;
- String partition;
- LockState state;
- LockType type;
+ private final long extLockId;
+ private final long intLockId;
+ private final long txnId;
+ private final String db;
+ private final String table;
+ private final String partition;
+ private final LockState state;
+ private final LockType type;
// Assumes the result set is set to a valid row
LockInfo(ResultSet rs) throws SQLException {
@@ -1107,12 +1124,27 @@ protected DatabaseProduct determineDatabaseProduct(Connection conn) throws MetaE
switch (rs.getString("hl_lock_state").charAt(0)) {
case LOCK_WAITING: state = LockState.WAITING; break;
case LOCK_ACQUIRED: state = LockState.ACQUIRED; break;
+ default:
+ state = null;
}
switch (rs.getString("hl_lock_type").charAt(0)) {
case LOCK_EXCLUSIVE: type = LockType.EXCLUSIVE; break;
case LOCK_SHARED: type = LockType.SHARED_READ; break;
case LOCK_SEMI_SHARED: type = LockType.SHARED_WRITE; break;
+ default:
+ type = null;
}
+ txnId = rs.getLong("hl_txnid");
+ }
+ LockInfo(ShowLocksResponseElement e, long intLockId) {
+ extLockId = e.getLockid();
+ this.intLockId = intLockId;
+ db = e.getDbname();
+ table = e.getTablename();
+ partition = e.getPartname();
+ state = e.getState();
+ type = e.getType();
+ txnId = e.getTxnid();
}
public boolean equals(Object other) {
@@ -1130,15 +1162,22 @@ public String toString() {
partition + " state:" + (state == null ? "null" : state.toString())
+ " type:" + (type == null ? "null" : type.toString());
}
+ private boolean isDbLock() {
+ return db != null && table == null && partition == null;
+ }
+ private boolean isTableLock() {
+ return db != null && table != null && partition == null;
+ }
}
private static class LockInfoComparator implements Comparator<LockInfo> {
+ private static final LockTypeComparator lockTypeComparator = new LockTypeComparator();
public boolean equals(Object other) {
return this == other;
}
public int compare(LockInfo info1, LockInfo info2) {
- // We sort by state (acquired vs waiting) and then by extLockId.
+ // We sort by state (acquired vs waiting), then by LockType, then by id.
if (info1.state == LockState.ACQUIRED &&
info2.state != LockState.ACQUIRED) {
return -1;
@@ -1147,6 +1186,11 @@ public int compare(LockInfo info1, LockInfo info2) {
info2.state == LockState.ACQUIRED) {
return 1;
}
+
+ int sortByType = lockTypeComparator.compare(info1.type, info2.type);
+ if(sortByType != 0) {
+ return sortByType;
+ }
if (info1.extLockId < info2.extLockId) {
return -1;
} else if (info1.extLockId > info2.extLockId) {
@@ -1163,6 +1207,41 @@ public int compare(LockInfo info1, LockInfo info2) {
}
}
+ /**
+ * Sort more restrictive locks after less restrictive ones
+ */
+ private final static class LockTypeComparator implements Comparator<LockType> {
+ public boolean equals(Object other) {
+ return this == other;
+ }
+ public int compare(LockType t1, LockType t2) {
+ switch (t1) {
+ case EXCLUSIVE:
+ if(t2 == LockType.EXCLUSIVE) {
+ return 0;
+ }
+ return 1;
+ case SHARED_WRITE:
+ switch (t2) {
+ case EXCLUSIVE:
+ return -1;
+ case SHARED_WRITE:
+ return 0;
+ case SHARED_READ:
+ return 1;
+ default:
+ throw new RuntimeException("Unexpected LockType: " + t2);
+ }
+ case SHARED_READ:
+ if(t2 == LockType.SHARED_READ) {
+ return 0;
+ }
+ return -1;
+ default:
+ throw new RuntimeException("Unexpected LockType: " + t1);
+ }
+ }
+ }
private enum LockAction {ACQUIRE, WAIT, KEEP_LOOKING}
// A jump table to figure out whether to wait, acquire,
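Taken together, LockInfoComparator and LockTypeComparator order locks by state (ACQUIRED before WAITING), then by type (least restrictive first), then by lock id. A self-contained sketch of that contract; the local enums stand in for Hive's Thrift types, and their declaration order encodes restrictiveness here, whereas the patch uses an explicit switch so the ordering cannot drift if the Thrift enum is ever reordered:

```java
// Sketch of the combined ordering: state, then lock type (least restrictive
// first), then extLockId. Local enums stand in for Hive's Thrift types.
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class LockOrderSketch {
  enum State { ACQUIRED, WAITING }                    // ACQUIRED sorts first
  enum Type { SHARED_READ, SHARED_WRITE, EXCLUSIVE }  // least restrictive first

  static class Lock {
    final State state; final Type type; final long extLockId;
    Lock(State s, Type t, long id) { state = s; type = t; extLockId = id; }
    public String toString() { return state + "/" + type + "/" + extLockId; }
  }

  public static void main(String[] args) {
    List<Lock> locks = new ArrayList<Lock>();
    locks.add(new Lock(State.WAITING, Type.EXCLUSIVE, 7));
    locks.add(new Lock(State.ACQUIRED, Type.EXCLUSIVE, 5));
    locks.add(new Lock(State.ACQUIRED, Type.SHARED_READ, 9));
    Collections.sort(locks, new Comparator<Lock>() {
      public int compare(Lock a, Lock b) {
        int c = a.state.compareTo(b.state);
        if (c != 0) return c;
        c = a.type.compareTo(b.type);
        if (c != 0) return c;
        return a.extLockId < b.extLockId ? -1 : (a.extLockId > b.extLockId ? 1 : 0);
      }
    });
    // prints: [ACQUIRED/SHARED_READ/9, ACQUIRED/EXCLUSIVE/5, WAITING/EXCLUSIVE/7]
    System.out.println(locks);
  }
}
```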
@@ -1362,11 +1441,11 @@ private LockResponse checkLock(Connection dbConn,
LockResponse response = new LockResponse();
response.setLockid(extLockId);
- LOG.debug("Setting savepoint");
+ LOG.debug("checkLock(): Setting savepoint. extLockId=" + extLockId);
Savepoint save = dbConn.setSavepoint();
StringBuilder query = new StringBuilder("select hl_lock_ext_id, " +
"hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " +
- "hl_lock_type from HIVE_LOCKS where hl_db in (");
+ "hl_lock_type, hl_txnid from HIVE_LOCKS where hl_db in (");
Set<String> strings = new HashSet<String>(locksBeingChecked.size());
for (LockInfo info : locksBeingChecked) {
@@ -1431,19 +1510,26 @@ private LockResponse checkLock(Connection dbConn,
query.append("))");
}
}
+ query.append(" and hl_lock_ext_id <= ").append(extLockId);
LOG.debug("Going to execute query <" + query.toString() + ">");
Statement stmt = null;
try {
stmt = dbConn.createStatement();
ResultSet rs = stmt.executeQuery(query.toString());
- SortedSet lockSet = new TreeSet(new LockInfoComparator());
+ SortedSet<LockInfo> lockSet = new TreeSet<LockInfo>(new LockInfoComparator());
while (rs.next()) {
lockSet.add(new LockInfo(rs));
}
// Turn the tree set into an array so we can move back and forth easily
// in it.
- LockInfo[] locks = (LockInfo[])lockSet.toArray(new LockInfo[1]);
+ LockInfo[] locks = lockSet.toArray(new LockInfo[lockSet.size()]);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Locks to check(full): ");
+ for(LockInfo info : locks) {
+ LOG.debug(" " + info);
+ }
+ }
for (LockInfo info : locksBeingChecked) {
// Find the lock record we're checking
@@ -1496,22 +1582,27 @@ private LockResponse checkLock(Connection dbConn,
// We've found something that matches what we're trying to lock,
// so figure out if we can lock it too.
- switch (jumpTable.get(locks[index].type).get(locks[i].type).get
- (locks[i].state)) {
+ LockAction lockAction = jumpTable.get(locks[index].type).get(locks[i].type).get(locks[i].state);
+ LOG.debug("desired Lock: " + info + " checked Lock: " + locks[i] + " action: " + lockAction);
+ switch (lockAction) {
+ case WAIT:
+ if(!ignoreConflict(info, locks[i])) {
+ wait(dbConn, save);
+ if (alwaysCommit) {
+ // In the case where lockNoWait has been called we don't want to commit because
+ // it's going to roll everything back. In every other case we want to commit here.
+ LOG.debug("Going to commit");
+ dbConn.commit();
+ }
+ response.setState(LockState.WAITING);
+ LOG.debug("Lock(" + info + ") waiting for Lock(" + locks[i] + ")");
+ return response;
+ }
+ //fall through to ACQUIRE
case ACQUIRE:
acquire(dbConn, stmt, extLockId, info.intLockId);
acquired = true;
break;
- case WAIT:
- wait(dbConn, save);
- if (alwaysCommit) {
- // In the case where lockNoWait has been called we don't want to commit because
- // it's going to roll everything back. In every other case we want to commit here.
- LOG.debug("Going to commit");
- dbConn.commit();
- }
- response.setState(LockState.WAITING);
- return response;
case KEEP_LOOKING:
continue;
}
@@ -1534,6 +1625,19 @@ private LockResponse checkLock(Connection dbConn,
return response;
}
+ /**
+ * The {@link #jumpTable} only deals with LockState/LockType. In some cases that's not
+ * sufficient. For example, an EXCLUSIVE lock on a partition should prevent SHARED_READ
+ * on the table, but there is no reason for an EXCLUSIVE lock on a table to prevent
+ * SHARED_READ on a database.
+ */
+ private boolean ignoreConflict(LockInfo desiredLock, LockInfo existingLock) {
+ return (desiredLock.isDbLock() && desiredLock.type == LockType.SHARED_READ &&
+ existingLock.isTableLock() && existingLock.type == LockType.EXCLUSIVE) ||
+ (existingLock.isDbLock() && existingLock.type == LockType.SHARED_READ &&
+ desiredLock.isTableLock() && desiredLock.type == LockType.EXCLUSIVE);
+ }
+
private void wait(Connection dbConn, Savepoint save) throws SQLException {
// Need to rollback because we did a select that acquired locks but we didn't
// actually update anything. Also, we may have locked some locks as
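A standalone restatement of the new rule with worked cases: only the database-level SHARED_READ versus table-level EXCLUSIVE pairing (in either direction) is waved through; every other combination still goes through the jump table. Names and types here are illustrative:

```java
// Sketch of the ignoreConflict() rule: a db-level SHARED_READ and a
// table-level EXCLUSIVE never block each other; all else is unaffected.
public class IgnoreConflictSketch {
  enum Type { SHARED_READ, SHARED_WRITE, EXCLUSIVE }
  enum Scope { DB, TABLE, PARTITION }

  static boolean ignoreConflict(Scope dScope, Type dType, Scope eScope, Type eType) {
    return (dScope == Scope.DB && dType == Type.SHARED_READ
              && eScope == Scope.TABLE && eType == Type.EXCLUSIVE)
        || (eScope == Scope.DB && eType == Type.SHARED_READ
              && dScope == Scope.TABLE && dType == Type.EXCLUSIVE);
  }

  public static void main(String[] args) {
    // CREATE TABLE (EXCLUSIVE on the new table) vs a read holding SHARED_READ
    // on the database: not a real conflict.
    System.out.println(ignoreConflict(Scope.TABLE, Type.EXCLUSIVE,
                                      Scope.DB, Type.SHARED_READ));    // true
    // EXCLUSIVE on a table vs SHARED_READ on the same table: still conflicts.
    System.out.println(ignoreConflict(Scope.TABLE, Type.EXCLUSIVE,
                                      Scope.TABLE, Type.SHARED_READ)); // false
  }
}
```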
@@ -1654,7 +1758,7 @@ private long getTxnIdFromLockId(Connection dbConn, long extLockId)
try {
stmt = dbConn.createStatement();
String s = "select hl_lock_ext_id, hl_lock_int_id, hl_db, hl_table, " +
- "hl_partition, hl_lock_state, hl_lock_type from HIVE_LOCKS where " +
+ "hl_partition, hl_lock_state, hl_lock_type, hl_txnid from HIVE_LOCKS where " +
"hl_lock_ext_id = " + extLockId;
LOG.debug("Going to execute query <" + s + ">");
ResultSet rs = stmt.executeQuery(s);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index d2eed88..54c83a1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1317,7 +1317,7 @@ public int execute() throws CommandNeedRetryException {
maxthreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.EXECPARALLETHREADNUMBER);
try {
- LOG.info("Starting command: " + queryStr);
+ LOG.info("Starting command(queryId=" + queryId + "): " + queryStr);
plan.setStarted();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
index de7d414..a399b4e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
@@ -74,10 +74,12 @@ public HiveLock lock(HiveLockObject key, HiveLockMode mode,
* @param lock lock request
* @throws LockException
*/
-  List<HiveLock> lock(LockRequest lock) throws LockException {
+  List<HiveLock> lock(LockRequest lock, String queryId) throws LockException {
try {
- LOG.debug("Requesting lock");
+ LOG.debug("Requesting: queryId=" + queryId + " " + lock);
LockResponse res = client.lock(lock);
+ //link lockId to queryId
+ LOG.debug("Response " + res);
while (res.getState() == LockState.WAITING) {
backoff();
res = client.checkLock(res.getLockid());
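The loop above is a poll-with-backoff: while the server reports WAITING, sleep and re-check the lock. A generic sketch of the shape, assuming a capped exponential delay (Hive's backoff() may be tuned differently; all names here are illustrative):

```java
// Sketch of the poll-with-backoff shape in DbLockManager.lock(): the sleep
// doubles between checks up to a cap until the lock leaves WAITING.
public class BackoffPollSketch {
  interface LockClient { String checkLock(long lockId); } // "WAITING" or "ACQUIRED"

  static String waitForLock(LockClient client, long lockId) throws InterruptedException {
    long sleepMs = 100;
    final long maxSleepMs = 5000;
    String state = client.checkLock(lockId);
    while ("WAITING".equals(state)) {
      Thread.sleep(sleepMs);
      sleepMs = Math.min(sleepMs * 2, maxSleepMs); // capped exponential backoff
      state = client.checkLock(lockId);
    }
    return state;
  }

  public static void main(String[] args) throws InterruptedException {
    final int[] polls = {0};
    String state = waitForLock(new LockClient() {
      public String checkLock(long lockId) {
        return ++polls[0] < 3 ? "WAITING" : "ACQUIRED";
      }
    }, 42L);
    System.out.println(state + " after " + polls[0] + " checks");
  }
}
```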
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index d11fabd..53bfdf4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -94,7 +94,8 @@ public void acquireLocks(QueryPlan plan, Context ctx, String username) throws Lo
boolean atLeastOneLock = false;
LockRequestBuilder rqstBuilder = new LockRequestBuilder();
- LOG.debug("Setting lock request transaction to " + txnId);
+ //link queryId to txnId
+ LOG.debug("Setting lock request transaction to " + txnId + " for queryId=" + plan.getQueryId());
rqstBuilder.setTransactionId(txnId)
.setUser(username);
@@ -206,9 +207,12 @@ public void acquireLocks(QueryPlan plan, Context ctx, String username) throws Lo
// Make sure we need locks. It's possible there's nothing to lock in
// this operation.
- if (!atLeastOneLock) return;
+ if (!atLeastOneLock) {
+ LOG.debug("No locks needed for queryId" + plan.getQueryId());
+ return;
+ }
-    List<HiveLock> locks = lockMgr.lock(rqstBuilder.build());
+    List<HiveLock> locks = lockMgr.lock(rqstBuilder.build(), plan.getQueryId());
ctx.setHiveLocks(locks);
}