diff --git a/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-log4j.properties b/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-log4j.properties
new file mode 100644
index 0000000..82684b3
--- /dev/null
+++ b/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-log4j.properties
@@ -0,0 +1,88 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Define some default values that can be overridden by system properties
+hive.log.threshold=ALL
+hive.root.logger=DEBUG,DRFA
+hive.log.dir=/tmp/ekoifman
+hive.log.file=hive.log
+
+# Define the root logger to the system property "hive.root.logger".
+log4j.rootLogger=${hive.root.logger}, EventCounter
+
+# Logging Threshold
+log4j.threshold=${hive.log.threshold}
+
+#
+# Daily Rolling File Appender
+#
+# Use the PidDailyRollingFileAppender class instead if you want to use separate log files
+# for different CLI sessions.
+#
+# log4j.appender.DRFA=org.apache.hadoop.hive.ql.log.PidDailyRollingFileAppender
+
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+
+log4j.appender.DRFA.File=${hive.log.dir}/${hive.log.file}
+
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+# Debugging Pattern format
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p [%t]: %c{2} (%F:%M(%L)) - %m%n
+
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} [%t]: %p %c{2}: %m%n
+log4j.appender.console.encoding=UTF-8
+
+#custom logging levels
+#log4j.logger.xxx=DEBUG
+
+#
+# Event Counter Appender
+# Sends counts of logging messages at different severity levels to Hadoop Metrics.
+#
+log4j.appender.EventCounter=org.apache.hadoop.hive.shims.HiveEventCounter
+
+
+log4j.category.DataNucleus=ERROR,DRFA
+log4j.category.Datastore=ERROR,DRFA
+log4j.category.Datastore.Schema=ERROR,DRFA
+log4j.category.JPOX.Datastore=ERROR,DRFA
+log4j.category.JPOX.Plugin=ERROR,DRFA
+log4j.category.JPOX.MetaData=ERROR,DRFA
+log4j.category.JPOX.Query=ERROR,DRFA
+log4j.category.JPOX.General=ERROR,DRFA
+log4j.category.JPOX.Enhancer=ERROR,DRFA
+
+
+# Silence useless ZK logs
+log4j.logger.org.apache.zookeeper.server.NIOServerCnxn=WARN,DRFA
+log4j.logger.org.apache.zookeeper.ClientCnxnSocketNIO=WARN,DRFA
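The first comment in this file ("Define some default values that can be overridden by system properties") relies on log4j 1.x placeholder resolution: ${hive.root.logger} and friends are looked up in JVM system properties before falling back to the values defined above. A minimal, self-contained sketch of that behavior (the class name and the assumption that hive-log4j.properties sits in the working directory are illustrative, not part of this patch):

import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;

public class Log4jOverrideDemo {
  public static void main(String[] args) {
    // log4j 1.x resolves ${hive.root.logger} from system properties first,
    // so these overrides redirect logging without editing the file.
    System.setProperty("hive.root.logger", "INFO,console");
    System.setProperty("hive.log.dir", "/tmp");
    PropertyConfigurator.configure("hive-log4j.properties");
    Logger.getLogger(Log4jOverrideDemo.class).info("now logging to the console appender");
  }
}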
diff --git a/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml b/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml
new file mode 100644
index 0000000..70ccc31
--- /dev/null
+++ b/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<configuration>
+  <property>
+    <name>javax.jdo.option.ConnectionURL</name>
+    <value>jdbc:mysql://localhost/metastore</value>
+  </property>
+  <property>
+    <name>javax.jdo.option.ConnectionUserName</name>
+    <value>hive</value>
+  </property>
+  <property>
+    <name>javax.jdo.option.ConnectionPassword</name>
+    <value>hive</value>
+  </property>
+  <property>
+    <name>javax.jdo.option.ConnectionDriverName</name>
+    <value>com.mysql.jdbc.Driver</value>
+  </property>
+  <property>
+    <name>datanucleus.autoCreateSchema</name>
+    <value>false</value>
+  </property>
+  <property>
+    <name>datanucleus.fixedDatastore</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>hive.support.concurrency</name>
+    <value>true</value>
+  </property>
+  <property>
+    <name>hive.txn.manager</name>
+    <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
+  </property>
+  <property>
+    <name>hive.enforce.bucketing</name>
+    <value>true</value>
+  </property>
+  <property>
+    <name>hive.exec.dynamic.partition.mode</name>
+    <value>nonstrict</value>
+  </property>
+
+</configuration>
diff --git a/hcatalog/src/test/e2e/templeton/deployers/env.sh b/hcatalog/src/test/e2e/templeton/deployers/env.sh
index 958ced8..6cdeb04 100755
--- a/hcatalog/src/test/e2e/templeton/deployers/env.sh
+++ b/hcatalog/src/test/e2e/templeton/deployers/env.sh
@@ -50,6 +50,11 @@
 if [ -z ${HADOOP_HOME} ]; then
   export HADOOP_HOME=/Users/${USER}/dev/hwxhadoop/hadoop-dist/target/hadoop-${HADOOP_VERSION}
 fi
 
+if [ -z ${MYSQL_CLIENT_JAR} ]; then
+  #if using MySQL backed metastore
+  export MYSQL_CLIENT_JAR=/Users/${USER}/dev/mysql-connector-java-5.1.30/mysql-connector-java-5.1.30-bin.jar
+fi
+
 export TEZ_CLIENT_HOME=/Users/ekoifman/dev/apache-tez-client-${TEZ_VERSION}
 #Make sure Pig is built for the Hadoop version you are running
 export PIG_TAR_PATH=/Users/${USER}/dev/pig-${PIG_VERSION}-src/build
diff --git a/hcatalog/src/test/e2e/templeton/deployers/start_hive_services.sh b/hcatalog/src/test/e2e/templeton/deployers/start_hive_services.sh
index 0ead10a..a2248e5 100755
--- a/hcatalog/src/test/e2e/templeton/deployers/start_hive_services.sh
+++ b/hcatalog/src/test/e2e/templeton/deployers/start_hive_services.sh
@@ -25,10 +25,17 @@
 source ./env.sh
 
 #decide which DB to run against
-cp ${PROJ_HOME}/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.xml ${HIVE_HOME}/conf/hive-site.xml
+#Derby
+#cp ${PROJ_HOME}/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.xml ${HIVE_HOME}/conf/hive-site.xml
+cp ${PROJ_HOME}/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml ${HIVE_HOME}/conf/hive-site.xml
 #cp ${PROJ_HOME}/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mssql.xml ${HIVE_HOME}/conf/hive-site.xml
 
 cp ${PROJ_HOME}/hcatalog/src/test/e2e/templeton/deployers/config/webhcat/webhcat-site.xml ${HIVE_HOME}/hcatalog/etc/webhcat/webhcat-site.xml
+cp ${PROJ_HOME}/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-log4j.properties ${HIVE_HOME}/conf/hive-log4j.properties
+
+if [ -f ${MYSQL_CLIENT_JAR} ]; then
+  cp ${MYSQL_CLIENT_JAR} ${HIVE_HOME}/lib
+fi
 
 if [ -d ${WEBHCAT_LOG_DIR} ]; then
   rm -Rf ${WEBHCAT_LOG_DIR};
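Since start_hive_services.sh now copies hive-site.mysql.xml and the MySQL client jar into place, a quick connectivity check against the configured metastore database can save a failed deploy. A hedged sketch (the class name is illustrative; the driver, URL, and credentials mirror the hive-site.mysql.xml values and MYSQL_CLIENT_JAR above):

import java.sql.Connection;
import java.sql.DriverManager;

public class MetastoreDbCheck {
  public static void main(String[] args) throws Exception {
    // Driver class comes from MYSQL_CLIENT_JAR (mysql-connector-java 5.1.x).
    Class.forName("com.mysql.jdbc.Driver");
    // URL/user/password mirror the javax.jdo.option.* values above.
    try (Connection c = DriverManager.getConnection(
        "jdbc:mysql://localhost/metastore", "hive", "hive")) {
      System.out.println("metastore DB reachable: " + c.isValid(5));
    }
  }
}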
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 216a61c..0c4fe49 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -533,18 +533,29 @@ public void unlock(UnlockRequest rqst)
     }
   }
 
+  /**
+   * Used to sort entries in {@link org.apache.hadoop.hive.metastore.api.ShowLocksResponse}.
+   */
+  private static class LockInfoExt extends LockInfo {
+    private final ShowLocksResponseElement e;
+    LockInfoExt(ShowLocksResponseElement e, long intLockId) {
+      super(e, intLockId);
+      this.e = e;
+    }
+  }
   public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException {
     try {
       Connection dbConn = null;
       ShowLocksResponse rsp = new ShowLocksResponse();
       List<ShowLocksResponseElement> elems = new ArrayList<ShowLocksResponseElement>();
+      List<LockInfoExt> sortedList = new ArrayList<LockInfoExt>();
       Statement stmt = null;
       try {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
 
         String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " +
-          "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host from HIVE_LOCKS";
+          "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host, hl_lock_int_id from HIVE_LOCKS";
         LOG.debug("Doing to execute query <" + s + ">");
         ResultSet rs = stmt.executeQuery(s);
         while (rs.next()) {
@@ -572,7 +583,7 @@ public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException {
           if (!rs.wasNull()) e.setAcquiredat(acquiredAt);
           e.setUser(rs.getString(10));
           e.setHostname(rs.getString(11));
-          elems.add(e);
+          sortedList.add(new LockInfoExt(e, rs.getLong(12)));
         }
         LOG.debug("Going to rollback");
         dbConn.rollback();
@@ -584,6 +595,12 @@ public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException {
         closeStmt(stmt);
         closeDbConn(dbConn);
       }
+      // This ensures that "SHOW LOCKS" prints the locks in the same order as they are examined
+      // by checkLock() - makes diagnostics easier.
+      Collections.sort(sortedList, new LockInfoComparator());
+      for(LockInfoExt lockInfoExt : sortedList) {
+        elems.add(lockInfoExt.e);
+      }
       rsp.setLocks(elems);
       return rsp;
     } catch (RetryException e) {
@@ -1086,14 +1103,14 @@ protected DatabaseProduct determineDatabaseProduct(Connection conn) throws MetaE
   }
 
   private static class LockInfo {
-    long extLockId;
-    long intLockId;
-    long txnId;
-    String db;
-    String table;
-    String partition;
-    LockState state;
-    LockType type;
+    private final long extLockId;
+    private final long intLockId;
+    private final long txnId;
+    private final String db;
+    private final String table;
+    private final String partition;
+    private final LockState state;
+    private final LockType type;
 
     // Assumes the result set is set to a valid row
     LockInfo(ResultSet rs) throws SQLException {
@@ -1107,12 +1124,27 @@ protected DatabaseProduct determineDatabaseProduct(Connection conn) throws MetaE
       switch (rs.getString("hl_lock_state").charAt(0)) {
         case LOCK_WAITING: state = LockState.WAITING; break;
         case LOCK_ACQUIRED: state = LockState.ACQUIRED; break;
+        default:
+          state = null;
       }
       switch (rs.getString("hl_lock_type").charAt(0)) {
         case LOCK_EXCLUSIVE: type = LockType.EXCLUSIVE; break;
         case LOCK_SHARED: type = LockType.SHARED_READ; break;
         case LOCK_SEMI_SHARED: type = LockType.SHARED_WRITE; break;
+        default:
+          type = null;
       }
+      txnId = rs.getLong("hl_txnid");
+    }
+    LockInfo(ShowLocksResponseElement e, long intLockId) {
+      extLockId = e.getLockid();
+      this.intLockId = intLockId;
+      db = e.getDbname();
+      table = e.getTablename();
+      partition = e.getPartname();
+      state = e.getState();
+      type = e.getType();
+      txnId = e.getTxnid();
     }
 
     public boolean equals(Object other) {
@@ -1130,15 +1162,22 @@ public String toString() {
         partition + " state:" + (state == null ? "null" : state.toString()) +
         " type:" + (type == null ? "null" : type.toString());
     }
+    private boolean isDbLock() {
+      return db != null && table == null && partition == null;
+    }
+    private boolean isTableLock() {
+      return db != null && table != null && partition == null;
+    }
   }
 
   private static class LockInfoComparator implements Comparator<LockInfo> {
+    private static final LockTypeComparator lockTypeComparator = new LockTypeComparator();
     public boolean equals(Object other) {
       return this == other;
     }
 
     public int compare(LockInfo info1, LockInfo info2) {
-      // We sort by state (acquired vs waiting) and then by extLockId.
+      // We sort by state (acquired vs waiting), then by LockType, then by id.
       if (info1.state == LockState.ACQUIRED &&
         info2.state != LockState
           .ACQUIRED) {
         return -1;
       }
@@ -1147,6 +1186,11 @@ public int compare(LockInfo info1, LockInfo info2) {
         info2.state == LockState
           .ACQUIRED) {
         return 1;
       }
+
+      int sortByType = lockTypeComparator.compare(info1.type, info2.type);
+      if(sortByType != 0) {
+        return sortByType;
+      }
       if (info1.extLockId < info2.extLockId) {
         return -1;
       } else if (info1.extLockId > info2.extLockId) {
@@ -1163,6 +1207,41 @@ public int compare(LockInfo info1, LockInfo info2) {
     }
   }
 
+  /**
+   * Sort more restrictive locks after less restrictive ones.
+   */
+  private final static class LockTypeComparator implements Comparator<LockType> {
+    public boolean equals(Object other) {
+      return this == other;
+    }
+    public int compare(LockType t1, LockType t2) {
+      switch (t1) {
+        case EXCLUSIVE:
+          if(t2 == LockType.EXCLUSIVE) {
+            return 0;
+          }
+          return 1;
+        case SHARED_WRITE:
+          switch (t2) {
+            case EXCLUSIVE:
+              return -1;
+            case SHARED_WRITE:
+              return 0;
+            case SHARED_READ:
+              return 1;
+            default:
+              throw new RuntimeException("Unexpected LockType: " + t2);
+          }
+        case SHARED_READ:
+          if(t2 == LockType.SHARED_READ) {
+            return 0;
+          }
+          return -1;
+        default:
+          throw new RuntimeException("Unexpected LockType: " + t1);
+      }
+    }
+  }
   private enum LockAction {ACQUIRE, WAIT, KEEP_LOOKING}
 
   // A jump table to figure out whether to wait, acquire,
@@ -1362,11 +1441,11 @@ private LockResponse checkLock(Connection dbConn,
 
     LockResponse response = new LockResponse();
     response.setLockid(extLockId);
 
-    LOG.debug("Setting savepoint");
+    LOG.debug("checkLock(): Setting savepoint. extLockId=" + extLockId);
     Savepoint save = dbConn.setSavepoint();
     StringBuilder query = new StringBuilder("select hl_lock_ext_id, " +
       "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " +
-      "hl_lock_type from HIVE_LOCKS where hl_db in (");
+      "hl_lock_type, hl_txnid from HIVE_LOCKS where hl_db in (");
 
     Set<String> strings = new HashSet<String>(locksBeingChecked.size());
     for (LockInfo info : locksBeingChecked) {
@@ -1431,19 +1510,26 @@ private LockResponse checkLock(Connection dbConn,
         query.append("))");
       }
     }
+    query.append(" and hl_lock_ext_id <= ").append(extLockId);
 
     LOG.debug("Going to execute query <" + query.toString() + ">");
     Statement stmt = null;
     try {
       stmt = dbConn.createStatement();
       ResultSet rs = stmt.executeQuery(query.toString());
-      SortedSet lockSet = new TreeSet(new LockInfoComparator());
+      SortedSet<LockInfo> lockSet = new TreeSet<LockInfo>(new LockInfoComparator());
       while (rs.next()) {
         lockSet.add(new LockInfo(rs));
       }
       // Turn the tree set into an array so we can move back and forth easily
       // in it.
-      LockInfo[] locks = (LockInfo[])lockSet.toArray(new LockInfo[1]);
+      LockInfo[] locks = lockSet.toArray(new LockInfo[lockSet.size()]);
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Locks to check(full): ");
+        for(LockInfo info : locks) {
+          LOG.debug("  " + info);
+        }
+      }
 
       for (LockInfo info : locksBeingChecked) {
         // Find the lock record we're checking
@@ -1496,22 +1582,27 @@ private LockResponse checkLock(Connection dbConn,
 
           // We've found something that matches what we're trying to lock,
           // so figure out if we can lock it too.
-          switch (jumpTable.get(locks[index].type).get(locks[i].type).get
-            (locks[i].state)) {
+          LockAction lockAction = jumpTable.get(locks[index].type).get(locks[i].type).get(locks[i].state);
+          LOG.debug("desired Lock: " + info + " checked Lock: " + locks[i] + " action: " + lockAction);
+          switch (lockAction) {
+            case WAIT:
+              if(!ignoreConflict(info, locks[i])) {
+                wait(dbConn, save);
+                if (alwaysCommit) {
+                  // In the case where lockNoWait has been called we don't want to commit because
+                  // it's going to roll everything back. In every other case we want to commit here.
+                  LOG.debug("Going to commit");
+                  dbConn.commit();
+                }
+                response.setState(LockState.WAITING);
+                LOG.debug("Lock(" + info + ") waiting for Lock(" + locks[i] + ")");
+                return response;
+              }
+              //fall through to ACQUIRE
             case ACQUIRE:
               acquire(dbConn, stmt, extLockId, info.intLockId);
               acquired = true;
               break;
-            case WAIT:
-              wait(dbConn, save);
-              if (alwaysCommit) {
-                // In the case where lockNoWait has been called we don't want to commit because
-                // it's going to roll everything back. In every other case we want to commit here.
-                LOG.debug("Going to commit");
-                dbConn.commit();
-              }
-              response.setState(LockState.WAITING);
-              return response;
             case KEEP_LOOKING:
               continue;
           }
@@ -1534,6 +1625,19 @@ private LockResponse checkLock(Connection dbConn,
     return response;
   }
 
+  /**
+   * The {@link #jumpTable} only deals with LockState/LockType.  In some cases it's not
+   * sufficient.  For example, an EXCLUSIVE lock on a partition should prevent a SHARED_READ
+   * on the table, but there is no reason for an EXCLUSIVE lock on a table to prevent a
+   * SHARED_READ on a database.
+   */
+  private boolean ignoreConflict(LockInfo desiredLock, LockInfo existingLock) {
+    return (desiredLock.isDbLock() && desiredLock.type == LockType.SHARED_READ &&
+      existingLock.isTableLock() && existingLock.type == LockType.EXCLUSIVE) ||
+      (existingLock.isDbLock() && existingLock.type == LockType.SHARED_READ &&
+      desiredLock.isTableLock() && desiredLock.type == LockType.EXCLUSIVE);
+  }
+
   private void wait(Connection dbConn, Savepoint save) throws SQLException {
     // Need to rollback because we did a select that acquired locks but we didn't
     // actually update anything.  Also, we may have locked some locks as
@@ -1654,7 +1758,7 @@ private long getTxnIdFromLockId(Connection dbConn, long extLockId)
     try {
       stmt = dbConn.createStatement();
       String s = "select hl_lock_ext_id, hl_lock_int_id, hl_db, hl_table, " +
-        "hl_partition, hl_lock_state, hl_lock_type from HIVE_LOCKS where " +
+        "hl_partition, hl_lock_state, hl_lock_type, hl_txnid from HIVE_LOCKS where " +
         "hl_lock_ext_id = " + extLockId;
       LOG.debug("Going to execute query <" + s + ">");
       ResultSet rs = stmt.executeQuery(s);
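For reference, LockTypeComparator above orders SHARED_READ < SHARED_WRITE < EXCLUSIVE, i.e. more restrictive lock types sort last, which is what places less restrictive locks earlier in checkLock()'s examination order. The same ordering can be expressed as a numeric rank instead of the patch's nested switch; this standalone sketch (not part of the patch, with a local copy of the enum) is equivalent:

import java.util.Arrays;
import java.util.Comparator;

public class LockOrderDemo {
  enum LockType { SHARED_READ, SHARED_WRITE, EXCLUSIVE }

  // Same ordering as the patch's LockTypeComparator, via an explicit rank.
  static final Comparator<LockType> BY_RESTRICTIVENESS = new Comparator<LockType>() {
    public int compare(LockType t1, LockType t2) {
      return rank(t1) - rank(t2);
    }
    private int rank(LockType t) {
      switch (t) {
        case SHARED_READ:  return 0;
        case SHARED_WRITE: return 1;
        case EXCLUSIVE:    return 2;
        default: throw new RuntimeException("Unexpected LockType: " + t);
      }
    }
  };

  public static void main(String[] args) {
    LockType[] types = { LockType.EXCLUSIVE, LockType.SHARED_READ, LockType.SHARED_WRITE };
    Arrays.sort(types, BY_RESTRICTIVENESS);
    // Prints [SHARED_READ, SHARED_WRITE, EXCLUSIVE]
    System.out.println(Arrays.toString(types));
  }
}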
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index d2eed88..54c83a1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1317,7 +1317,7 @@ public int execute() throws CommandNeedRetryException {
     maxthreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.EXECPARALLETHREADNUMBER);
 
     try {
-      LOG.info("Starting command: " + queryStr);
+      LOG.info("Starting command(queryId=" + queryId + "): " + queryStr);
 
       plan.setStarted();
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
index de7d414..a399b4e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
@@ -74,10 +74,12 @@ public HiveLock lock(HiveLockObject key, HiveLockMode mode,
    * @param lock lock request
    * @throws LockException
    */
-  List<HiveLock> lock(LockRequest lock) throws LockException {
+  List<HiveLock> lock(LockRequest lock, String queryId) throws LockException {
    try {
-      LOG.debug("Requesting lock");
+      LOG.debug("Requesting: queryId=" + queryId + " " + lock);
       LockResponse res = client.lock(lock);
+      // link lockId to queryId
+      LOG.debug("Response " + res);
       while (res.getState() == LockState.WAITING) {
         backoff();
         res = client.checkLock(res.getLockid());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index d11fabd..53bfdf4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -94,7 +94,8 @@ public void acquireLocks(QueryPlan plan, Context ctx, String username) throws Lo
     boolean atLeastOneLock = false;
 
     LockRequestBuilder rqstBuilder = new LockRequestBuilder();
-    LOG.debug("Setting lock request transaction to " + txnId);
+    // link queryId to txnId
+    LOG.debug("Setting lock request transaction to " + txnId + " for queryId=" + plan.getQueryId());
     rqstBuilder.setTransactionId(txnId)
       .setUser(username);
 
@@ -206,9 +207,12 @@ public void acquireLocks(QueryPlan plan, Context ctx, String username) throws Lo
 
     // Make sure we need locks.  It's possible there's nothing to lock in
     // this operation.
-    if (!atLeastOneLock) return;
+    if (!atLeastOneLock) {
+      LOG.debug("No locks needed for queryId=" + plan.getQueryId());
+      return;
+    }
 
-    List<HiveLock> locks = lockMgr.lock(rqstBuilder.build());
+    List<HiveLock> locks = lockMgr.lock(rqstBuilder.build(), plan.getQueryId());
     ctx.setHiveLocks(locks);
   }
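To make the ignoreConflict() exemption concrete: a database-level SHARED_READ and a table-level EXCLUSIVE inside that database are treated as non-conflicting, whichever of the two is being requested; every other pairing still goes through the jump table. A standalone sketch using a hypothetical stand-in for LockInfo (the Lock class and names are illustrative, not the patch's types):

public class IgnoreConflictDemo {
  enum LockType { SHARED_READ, SHARED_WRITE, EXCLUSIVE }

  // Hypothetical stand-in for LockInfo: db is always set; table == null means a
  // database-level lock, partition == null means a table-level lock.
  static class Lock {
    final String db, table, partition;
    final LockType type;
    Lock(String db, String table, String partition, LockType type) {
      this.db = db; this.table = table; this.partition = partition; this.type = type;
    }
    boolean isDbLock()    { return db != null && table == null && partition == null; }
    boolean isTableLock() { return db != null && table != null && partition == null; }
  }

  // Mirrors the shape of the patch's ignoreConflict().
  static boolean ignoreConflict(Lock desired, Lock existing) {
    return (desired.isDbLock() && desired.type == LockType.SHARED_READ &&
            existing.isTableLock() && existing.type == LockType.EXCLUSIVE) ||
           (existing.isDbLock() && existing.type == LockType.SHARED_READ &&
            desired.isTableLock() && desired.type == LockType.EXCLUSIVE);
  }

  public static void main(String[] args) {
    Lock dbRead  = new Lock("db1", null, null, LockType.SHARED_READ);
    Lock tblExcl = new Lock("db1", "t1", null, LockType.EXCLUSIVE);
    System.out.println(ignoreConflict(dbRead, tblExcl)); // true: db read vs table exclusive
    System.out.println(ignoreConflict(tblExcl, dbRead)); // true: same pair, either direction
  }
}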