diff --git common/src/java/org/apache/hadoop/hive/common/ServerUtils.java common/src/java/org/apache/hadoop/hive/common/ServerUtils.java index b44f92f..af4e9b2 100644 --- common/src/java/org/apache/hadoop/hive/common/ServerUtils.java +++ common/src/java/org/apache/hadoop/hive/common/ServerUtils.java @@ -65,5 +65,16 @@ public static InetAddress getHostAddress(String hostname) throws UnknownHostExce } return serverIPAddress; } + /** + * @return name of current host + */ + public static String hostname() { + try { + return InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + LOG.error("Unable to resolve my host name " + e.getMessage()); + throw new RuntimeException(e); + } + } } diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java index 73255d2..bea1473 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.metastore.txn; import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -39,6 +40,9 @@ public boolean tooManyAborts = false; /** * {@code 0} means it wasn't set (e.g. in case of upgrades, since ResultSet.getLong() will return 0 if field is NULL) + * See {@link TxnStore#setCompactionHighestTxnId(CompactionInfo, long)} for precise definition. + * See also {@link TxnUtils#createValidCompactTxnList(GetOpenTxnsInfoResponse)} and + * {@link ValidCompactorTxnList#highWatermark} */ public long highestTxnId; byte[] metaInfo; diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index da2b395..b81735d 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -160,6 +160,8 @@ public CompactionInfo findNextToCompact(String workerId) throws MetaException { try { Connection dbConn = null; Statement stmt = null; + //need a separate stmt for executeUpdate() otherwise it will close the ResultSet(HIVE-12725) + Statement updStmt = null; ResultSet rs = null; try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); @@ -173,6 +175,7 @@ public CompactionInfo findNextToCompact(String workerId) throws MetaException { dbConn.rollback(); return null; } + updStmt = dbConn.createStatement(); do { CompactionInfo info = new CompactionInfo(); info.id = rs.getLong(1); @@ -186,7 +189,7 @@ public CompactionInfo findNextToCompact(String workerId) throws MetaException { "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id + " AND cq_state='" + INITIATED_STATE + "'"; LOG.debug("Going to execute update <" + s + ">"); - int updCount = stmt.executeUpdate(s); + int updCount = updStmt.executeUpdate(s); if(updCount == 1) { dbConn.commit(); return info; @@ -210,6 +213,7 @@ public CompactionInfo findNextToCompact(String workerId) throws MetaException { throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { + closeStmt(updStmt); close(rs, stmt, dbConn); } } catch (RetryException e) { @@ -627,6 +631,7 @@ public void revokeTimedoutWorkers(long timeout) throws 
MetaException { /** * Record the highest txn id that the {@code ci} compaction job will pay attention to. + * This is the highest resolved txn id, i.e. such that there are no open txns with lower ids. */ public void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException { Connection dbConn = null; diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java index 5d10b5c..b9788d0 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java @@ -140,6 +140,13 @@ public static void prepDb() throws Exception { " CC_HIGHEST_TXN_ID bigint," + " CC_META_INFO varchar(2048) for bit data," + " CC_HADOOP_JOB_ID varchar(32))"); + + stmt.execute("CREATE TABLE AUX_TABLE (" + + " MT_KEY1 varchar(128) NOT NULL," + + " MT_KEY2 bigint NOT NULL," + + " MT_COMMENT varchar(255)," + + " PRIMARY KEY(MT_KEY1, MT_KEY2)" + + ")"); conn.commit(); } catch (SQLException e) { @@ -185,6 +192,7 @@ public static void cleanDb() throws Exception { dropTable(stmt, "COMPACTION_QUEUE"); dropTable(stmt, "NEXT_COMPACTION_QUEUE_ID"); dropTable(stmt, "COMPLETED_COMPACTIONS"); + dropTable(stmt, "AUX_TABLE"); conn.commit(); } finally { closeResources(conn, stmt, null); diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 53d2bb4..a120f56 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -23,6 +23,8 @@ import org.apache.commons.dbcp.ConnectionFactory; import org.apache.commons.dbcp.DriverManagerConnectionFactory; import org.apache.commons.dbcp.PoolableConnectionFactory; +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.hive.common.ServerUtils; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; import org.apache.hadoop.hive.metastore.Warehouse; @@ -43,6 +45,8 @@ import java.io.IOException; import java.sql.*; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; @@ -85,7 +89,7 @@ */ @InterfaceAudience.Private @InterfaceStability.Evolving -abstract class TxnHandler implements TxnStore { +abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { static final protected char INITIATED_STATE = 'i'; static final protected char WORKING_STATE = 'w'; @@ -136,6 +140,12 @@ * Derby specific concurrency control */ private static final ReentrantLock derbyLock = new ReentrantLock(true); + /** + * Must be static since even in a UT there may be more than one instance of TxnHandler + * (e.g. via Compactor services). + */ + private final static ConcurrentHashMap<String, Semaphore> derbyKey2Lock = new ConcurrentHashMap<>(); + private static final String hostname = ServerUtils.hostname(); // Private methods should never catch SQLException and then throw MetaException. The public // methods depend on SQLException coming back so they can detect and handle deadlocks.
Private @@ -514,7 +524,7 @@ private ConnectionLockIdPair(Connection dbConn, long extLockId) { * @throws MetaException */ private ResultSet lockTransactionRecord(Statement stmt, long txnId, Character txnState) throws SQLException, MetaException { - String query = "select TXN_STATE from TXNS where TXN_ID = " + txnId + (txnState != null ? "AND TXN_STATE=" + quoteChar(txnState) : ""); + String query = "select TXN_STATE from TXNS where TXN_ID = " + txnId + (txnState != null ? " AND TXN_STATE=" + quoteChar(txnState) : ""); ResultSet rs = stmt.executeQuery(addForUpdateClause(query)); if(rs.next()) { return rs; @@ -1372,14 +1382,14 @@ protected Connection getDbConn(int isolationLevel) throws SQLException { } } - void rollbackDBConn(Connection dbConn) { + static void rollbackDBConn(Connection dbConn) { try { if (dbConn != null && !dbConn.isClosed()) dbConn.rollback(); } catch (SQLException e) { LOG.warn("Failed to rollback db connection " + getMessage(e)); } } - protected void closeDbConn(Connection dbConn) { + protected static void closeDbConn(Connection dbConn) { try { if (dbConn != null && !dbConn.isClosed()) { dbConn.close(); @@ -1393,7 +1403,7 @@ protected void closeDbConn(Connection dbConn) { * Close statement instance. * @param stmt statement instance. */ - protected void closeStmt(Statement stmt) { + protected static void closeStmt(Statement stmt) { try { if (stmt != null && !stmt.isClosed()) stmt.close(); } catch (SQLException e) { @@ -1405,7 +1415,7 @@ protected void closeStmt(Statement stmt) { * Close the ResultSet. * @param rs may be {@code null} */ - void close(ResultSet rs) { + static void close(ResultSet rs) { try { if (rs != null && !rs.isClosed()) { rs.close(); @@ -1419,7 +1429,7 @@ void close(ResultSet rs) { /** * Close all 3 JDBC artifacts in order: {@code rs stmt dbConn} */ - void close(ResultSet rs, Statement stmt, Connection dbConn) { + static void close(ResultSet rs, Statement stmt, Connection dbConn) { close(rs); closeStmt(stmt); closeDbConn(dbConn); @@ -2564,6 +2574,40 @@ private static boolean isRetryable(Exception ex) { } return false; } + private boolean isDuplicateKeyError(SQLException ex) { + switch (dbProduct) { + case DERBY: + if("23505".equals(ex.getSQLState())) { + return true; + } + break; + case MYSQL: + if(ex.getErrorCode() == 1022 && "23000".equals(ex.getSQLState())) { + return true; + } + break; + case SQLSERVER: + //2627 is unique constaint violation incl PK, 2601 - unique key + if(ex.getErrorCode() == 2627 && "23000".equals(ex.getSQLState())) { + return true; + } + break; + case ORACLE: + if(ex.getErrorCode() == 1 && "23000".equals(ex.getSQLState())) { + return true; + } + break; + case POSTGRES: + //http://www.postgresql.org/docs/8.1/static/errcodes-appendix.html + if("23505".equals(ex.getSQLState())) { + return true; + } + break; + default: + throw new IllegalArgumentException("Unexpected DB type: " + dbProduct + "; " + getMessage(ex)); + } + return false; + } private static String getMessage(SQLException ex) { return ex.getMessage() + "(SQLState=" + ex.getSQLState() + ",ErrorCode=" + ex.getErrorCode() + ")"; } @@ -2638,4 +2682,115 @@ private void unlockInternal() { derbyLock.unlock(); } } + @Override + public MutexAPI getMutexAPI() { + return this; + } + + @Override + public LockHandle acquireLock(String key) throws MetaException { + /** + * The implementation here is a bit kludgey but done so that code exercised by unit tests + * (which run against Derby which has no support for select for update) is as similar to + * production code as possible. 
+ * In particular, with Derby we always run in a single process with a single metastore and + * the absence of For Update is handled via a Semaphore. The latter would, strictly speaking, + * make the SQL statements below unnecessary (for Derby), but then they would not be tested. + */ + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + try { + try { + String sqlStmt = addForUpdateClause("select MT_COMMENT from AUX_TABLE where MT_KEY1=" + quoteString(key) + " and MT_KEY2=0"); + lockInternal(); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + if(LOG.isDebugEnabled()) { + LOG.debug("About to execute SQL: " + sqlStmt); + } + rs = stmt.executeQuery(sqlStmt); + if (!rs.next()) { + close(rs); + try { + stmt.executeUpdate("insert into AUX_TABLE(MT_KEY1,MT_KEY2) values(" + quoteString(key) + ", 0)"); + dbConn.commit(); + } catch (SQLException ex) { + if (!isDuplicateKeyError(ex)) { + throw new RuntimeException("Unable to lock " + quoteString(key) + " due to: " + getMessage(ex), ex); + } + } + rs = stmt.executeQuery(sqlStmt); + if (!rs.next()) { + throw new IllegalStateException("Unable to lock " + quoteString(key) + ". Expected row in AUX_TABLE is missing."); + } + } + Semaphore derbySemaphore = null; + if(dbProduct == DatabaseProduct.DERBY) { + derbyKey2Lock.putIfAbsent(key, new Semaphore(1)); + derbySemaphore = derbyKey2Lock.get(key); + derbySemaphore.acquire(); + } + LOG.info(quoteString(key) + " locked by " + quoteString(TxnHandler.hostname)); + //OK, so now we have a lock + return new LockHandleImpl(dbConn, stmt, rs, key, derbySemaphore); + } catch (SQLException ex) { + rollbackDBConn(dbConn); + close(rs, stmt, dbConn); + checkRetryable(dbConn, ex, "acquireLock(" + key + ")"); + throw new MetaException("Unable to lock " + quoteString(key) + " due to: " + getMessage(ex) + "; " + StringUtils.stringifyException(ex)); + } + catch(InterruptedException ex) { + rollbackDBConn(dbConn); + close(rs, stmt, dbConn); + throw new MetaException("Unable to lock " + quoteString(key) + " due to: " + ex.getMessage() + StringUtils.stringifyException(ex)); + } + finally { + unlockInternal(); + } + } + catch(RetryException ex) { + acquireLock(key); + } + throw new MetaException("This can't happen because checkRetryable() has a retry limit"); + } + public void acquireLock(String key, LockHandle handle) { + //the idea is that this will use LockHandle.dbConn + throw new NotImplementedException(); + } + private static final class LockHandleImpl implements LockHandle { + private final Connection dbConn; + private final Statement stmt; + private final ResultSet rs; + private final Semaphore derbySemaphore; + private final List<String> keys = new ArrayList<>(); + LockHandleImpl(Connection conn, Statement stmt, ResultSet rs, String key, Semaphore derbySemaphore) { + this.dbConn = conn; + this.stmt = stmt; + this.rs = rs; + this.derbySemaphore = derbySemaphore; + if(derbySemaphore != null) { + //otherwise it may later release a permit acquired by someone else + assert derbySemaphore.availablePermits() == 0 : "Expected locked Semaphore"; + } + keys.add(key); + } + void addKey(String key) { + //keys.add(key); + //would need a list of (stmt,rs) pairs - 1 for each key + throw new NotImplementedException(); + } + + @Override + public void releaseLocks() { + rollbackDBConn(dbConn); + close(rs, stmt, dbConn); + if(derbySemaphore != null) { + derbySemaphore.release(); + } + for(String key : keys) { + LOG.info(quoteString(key) + " unlocked by " + 
quoteString(TxnHandler.hostname)); + } + } + } } diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index 6d738b5..927e9bc 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -47,6 +47,7 @@ @InterfaceStability.Evolving public interface TxnStore { + public static enum MUTEX_KEY {Initiator, Cleaner, HouseKeeper, CompactionHistory} // Compactor states (Should really be enum) static final public String INITIATED_RESPONSE = "initiated"; static final public String WORKING_RESPONSE = "working"; @@ -329,10 +330,40 @@ public void cleanupRecords(HiveObjectType type, Database db, Table table, */ public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException; - @VisibleForTesting public int numLocksInLockTable() throws SQLException, MetaException; @VisibleForTesting long setTimeout(long milliseconds); + + public MutexAPI getMutexAPI(); + + /** + * This is primarily designed to provide coarse-grained mutex support to operations running + * inside the Metastore (of which there could be several instances). The initial goal is to + * ensure that various sub-processes of the Compactor don't step on each other. + * + * In the RDBMS world each {@code LockHandle} uses a java.sql.Connection so use it sparingly. + */ + public static interface MutexAPI { + /** + * The {@code key} is the name of the lock. Will acquire an exclusive lock or block. It returns + * a handle which must be used to release the lock. Each invocation returns a new handle. + */ + public LockHandle acquireLock(String key) throws MetaException; + + /** + * Same as {@link #acquireLock(String)} but takes an already existing handle as input. This + * will associate the lock on {@code key} with the same handle. All locks associated with + * the same handle will be released together. + * @param handle not NULL + */ + public void acquireLock(String key, LockHandle handle) throws MetaException; + public static interface LockHandle { + /** + * Releases all locks associated with this handle. + */ + public void releaseLocks(); + } + } } diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java index 0d90b11..f0c1f05 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java @@ -74,8 +74,8 @@ public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txn int i = 0; for (TxnInfo txn : txns.getOpen_txns()) { if (txn.getState() == TxnState.OPEN) minOpenTxn = Math.min(minOpenTxn, txn.getId()); - exceptions[i++] = txn.getId(); - } + exceptions[i++] = txn.getId();//todo: only add Aborted + }//remove all exceptions < minOpenTxn highWater = minOpenTxn == Long.MAX_VALUE ? 
highWater : minOpenTxn - 1; return new ValidCompactorTxnList(exceptions, -1, highWater); } diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java index 648fd49..30bdfa7 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java @@ -24,7 +24,7 @@ import java.util.Arrays; /** - * And implmentation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor. + * An implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor. * For the purposes of {@link #isTxnRangeValid} this class will view a transaction as valid if it * is committed or aborted. Additionally it will return none if there are any open transactions * below the max transaction given, since we don't want to compact above open transactions. For diff --git metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java index f0d23ba..6dcac58 100644 --- metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java +++ metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java @@ -50,11 +50,13 @@ import org.apache.hadoop.hive.metastore.api.TxnOpenException; import org.apache.hadoop.hive.metastore.api.TxnState; import org.apache.hadoop.hive.metastore.api.UnlockRequest; +import org.apache.hadoop.util.StringUtils; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.core.LoggerContext; import org.apache.logging.log4j.core.config.Configuration; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; @@ -68,6 +70,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.assertFalse; @@ -87,6 +90,7 @@ public TestTxnHandler() throws Exception { TxnDbUtil.setConfValues(conf); + LoggerContext ctx = (LoggerContext) LogManager.getContext(false); Configuration conf = ctx.getConfiguration(); conf.getLoggerConfig(CLASS_NAME).setLevel(Level.DEBUG); @@ -1252,6 +1256,97 @@ public void run() { } } + /** + * This cannot be run against Derby (which is what the unit tests use) but it can run against MySQL. + * 1. add to metastore/pom.xml + * <dependency> + * <groupId>mysql</groupId> + * <artifactId>mysql-connector-java</artifactId> + * <version>5.1.30</version> + * </dependency> + * 2. Hack in the c'tor of this class + * conf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, "jdbc:mysql://localhost/metastore"); + * conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME, "hive"); + * conf.setVar(HiveConf.ConfVars.METASTOREPWD, "hive"); + * conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER, "com.mysql.jdbc.Driver"); + * 3. 
Remove TxnDbUtil.prepDb(); in TxnHandler.checkQFileTestHack() + * + */ + @Ignore("multiple threads wedge Derby") + @Test + public void testMutexAPI() throws Exception { + final TxnStore.MutexAPI api = txnHandler.getMutexAPI(); + final AtomicInteger stepTracker = new AtomicInteger(0); + /** + * counter = 0; + * Thread1 counter=1, lock, wait 3s, check counter(should be 2), counter=3, unlock + * Thread2 counter=2, lock (and block), inc counter, should be 4 + */ + Thread t1 = new Thread("MutexTest1") { + public void run() { + try { + stepTracker.incrementAndGet();//now 1 + TxnStore.MutexAPI.LockHandle handle = api.acquireLock(TxnHandler.MUTEX_KEY.HouseKeeper.name()); + Thread.sleep(4000); + //stepTracker should now be 2 which indicates t2 has started + Assert.assertEquals("Thread2 should have started by now but not done work", 2, stepTracker.get()); + stepTracker.incrementAndGet();//now 3 + handle.releaseLocks(); + } + catch(Exception ex) { + throw new RuntimeException(ex.getMessage(), ex); + } + } + }; + t1.setDaemon(true); + ErrorHandle ueh1 = new ErrorHandle(); + t1.setUncaughtExceptionHandler(ueh1); + Thread t2 = new Thread("MutexTest2") { + public void run() { + try { + stepTracker.incrementAndGet();//now 2 + //this should block until t1 unlocks + TxnStore.MutexAPI.LockHandle handle = api.acquireLock(TxnHandler.MUTEX_KEY.HouseKeeper.name()); + stepTracker.incrementAndGet();//now 4 + Assert.assertEquals(4, stepTracker.get()); + handle.releaseLocks(); + stepTracker.incrementAndGet();//now 5 + } + catch(Exception ex) { + throw new RuntimeException(ex.getMessage(), ex); + } + } + }; + t2.setDaemon(true); + ErrorHandle ueh2 = new ErrorHandle(); + t2.setUncaughtExceptionHandler(ueh2); + t1.start(); + try { + Thread.sleep(1000); + } + catch(InterruptedException ex) { + LOG.info("Sleep was interrupted"); + } + t2.start(); + t1.join(6000);//so that test doesn't block + t2.join(6000); + + if(ueh1.error != null) { + Assert.assertTrue("Unexpected error from t1: " + StringUtils.stringifyException(ueh1.error), false); + } + if (ueh2.error != null) { + Assert.assertTrue("Unexpected error from t2: " + StringUtils.stringifyException(ueh2.error), false); + } + Assert.assertEquals("5 means both threads have completed", 5, stepTracker.get()); + } + private final static class ErrorHandle implements Thread.UncaughtExceptionHandler { + Throwable error = null; + @Override + public void uncaughtException(Thread t, Throwable e) { + LOG.error("Uncaught exception from " + t.getName() + ": " + e.getMessage()); + error = e; + } + } private void updateTxns(Connection conn) throws SQLException { Statement stmt = conn.createStatement(); stmt.executeUpdate("update TXNS set txn_last_heartbeat = txn_last_heartbeat + 1"); diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java index 59c8fe4..5d9e7be 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java @@ -61,7 +61,9 @@ private ObsoleteEntryReaper(HiveConf hiveConf, AtomicInteger isAliveCounter) { @Override public void run() { + TxnStore.MutexAPI.LockHandle handle = null; try { + handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.CompactionHistory.name()); long startTime = System.currentTimeMillis(); txnHandler.purgeCompactionHistory(); int count = isAliveCounter.incrementAndGet(); @@ -70,6 +72,11 @@ public void run() { catch(Throwable t) { 
LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t); } + finally { + if(handle != null) { + handle.releaseLocks(); + } + } } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java index 882562b..13b10de 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java @@ -61,7 +61,9 @@ private TimedoutTxnReaper(HiveConf hiveConf, AtomicInteger isAliveCounter) { } @Override public void run() { + TxnStore.MutexAPI.LockHandle handle = null; try { + handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.HouseKeeper.name()); long startTime = System.currentTimeMillis(); txnHandler.performTimeOuts(); int count = isAliveCounter.incrementAndGet(); @@ -70,6 +72,11 @@ public void run() { catch(Throwable t) { LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t); } + finally { + if(handle != null) { + handle.releaseLocks(); + } + } } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index fbf5481..9ffeaec 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; +import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileStatus; @@ -72,11 +73,13 @@ public void run() { // and if so remembers that and then sets it to true at the end. We have to check here // first to make sure we go through a complete iteration of the loop before resetting it. boolean setLooped = !looped.get(); - long startedAt = System.currentTimeMillis(); + TxnStore.MutexAPI.LockHandle handle = null; + long startedAt = -1; // Make sure nothing escapes this run method and kills the metastore at large, // so wrap it in a big catch Throwable statement. try { - + handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name()); + startedAt = System.currentTimeMillis(); // First look for all the compactions that are waiting to be cleaned. If we have not // seen an entry before, look for all the locks held on that table or partition and // record them. We will then only clean the partition once all of those locks have been @@ -86,6 +89,31 @@ public void run() { // done the compaction will read the more up to date version of the data (either in a // newer delta or in a newer base). List toClean = txnHandler.findReadyToClean(); + { + /** + * Since there may be more than 1 instance of Cleaner running we may have state info + * for items which were cleaned by instances. Here we remove them. + * + * In the long run if we add end_time to compaction_queue, then we can check that + * hive_locks.acquired_at > compaction_queue.end_time + safety_buffer in which case + * we know the lock owner is reading files created by this compaction or later. + * The advantage is that we don't have to store the locks. 
+ */ + Set<Long> currentToCleanSet = new HashSet<>(); + for (CompactionInfo ci : toClean) { + currentToCleanSet.add(ci.id); + } + Set<Long> cleanPerformedByOthers = new HashSet<>(); + for (long id : compactId2CompactInfoMap.keySet()) { + if (!currentToCleanSet.contains(id)) { + cleanPerformedByOthers.add(id); + } + } + for (long id : cleanPerformedByOthers) { + compactId2CompactInfoMap.remove(id); + compactId2LockMap.remove(id); + } + } if (toClean.size() > 0 || compactId2LockMap.size() > 0) { ShowLocksResponse locksResponse = txnHandler.showLocks(new ShowLocksRequest()); @@ -119,6 +147,7 @@ public void run() { // Remember to remove this when we're out of the loop, // we can't do it in the loop or we'll get a concurrent modification exception. compactionsCleaned.add(queueEntry.getKey()); + //Future thought: this may be expensive so consider having a thread pool run these in parallel clean(compactId2CompactInfoMap.get(queueEntry.getKey())); } else { // Remove the locks we didn't see so we don't look for them again next time @@ -140,6 +169,11 @@ public void run() { LOG.error("Caught an exception in the main loop of compactor cleaner, " + StringUtils.stringifyException(t)); } + finally { + if (handle != null) { + handle.releaseLocks(); + } + } if (setLooped) { looped.set(true); } @@ -206,10 +240,24 @@ private void clean(CompactionInfo ci) throws MetaException { StorageDescriptor sd = resolveStorageDescriptor(t, p); final String location = sd.getLocation(); - // Create a bogus validTxnList with a high water mark set to MAX_LONG and no open - // transactions. This assures that all deltas are treated as valid and all we return are - // obsolete files. - final ValidTxnList txnList = new ValidReadTxnList(); + /** + * Each compaction only compacts as far as the highest txn id such that all txns below it + * are resolved (i.e. no longer open). This is what "highestTxnId" tracks. This is only tracked + * since Hive 1.3.0/2.0 - thus it may be 0. See ValidCompactorTxnList and its uses for more info. + * + * We only want to clean up to the highestTxnId - otherwise we risk deleting deltas from + * under an active reader. + * + * Suppose we have deltas D2 D3 for table T, i.e. the last compaction created D3 so now there is a + * clean request for D2. + * Cleaner checks existing locks and finds none. + * Between that check and removeFiles() a query starts (it will be reading D3) and another compaction + * completes which creates D4. + * Now removeFiles() (more specifically AcidUtils.getAcidState()) will declare D3 to be obsolete + * unless ValidTxnList is "capped" at highestTxnId. + */ + final ValidTxnList txnList = ci.highestTxnId > 0 ? + new ValidReadTxnList(new long[0], ci.highestTxnId) : new ValidReadTxnList(); if (runJobAsSelf(ci.runAs)) { removeFiles(location, txnList); @@ -249,7 +297,7 @@ private void removeFiles(String location, ValidTxnList txnList) throws IOExcepti FileSystem fs = filesToDelete.get(0).getFileSystem(conf); for (Path dead : filesToDelete) { - LOG.debug("Doing to delete path " + dead.toString()); + LOG.debug("Going to delete path " + dead.toString()); fs.delete(dead, true); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 3e22548..916d9dc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -74,11 +74,15 @@ public void run() { // much easier. 
The stop value is only for testing anyway and not used when called from // HiveMetaStore. do { - long startedAt = System.currentTimeMillis(); + long startedAt = -1; + TxnStore.MutexAPI.LockHandle handle = null; // Wrap the inner parts of the loop in a catch throwable so that any errors in the loop // don't doom the entire thread. - try {//todo: add method to only get current i.e. skip history - more efficient + try { + handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Initiator.name()); + startedAt = System.currentTimeMillis(); + //todo: add method to only get current i.e. skip history - more efficient ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest()); ValidTxnList txns = TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo()); @@ -114,6 +118,8 @@ public void run() { // Check if we already have initiated or are working on a compaction for this partition // or table. If so, skip it. If we are just waiting on cleaning we can still check, // as it may be time to compact again even though we haven't cleaned. + //todo: this is not robust. You can easily run Alter Table to start a compaction between + //the time currentCompactions is generated and now if (lookForCurrentCompactions(currentCompactions, ci)) { LOG.debug("Found currently initiated or working compaction for " + ci.getFullPartitionName() + " so we will not initiate another compaction"); @@ -134,7 +140,9 @@ public void run() { } StorageDescriptor sd = resolveStorageDescriptor(t, p); String runAs = findUserToRunAs(sd.getLocation(), t); - + /*Future thought: checkForCompaction will check a lot of file metadata and may be expensive. + * Long term we should consider having a thread pool here and running checkForCompactionS + * in parallel*/ CompactionType compactionNeeded = checkForCompaction(ci, txns, sd, runAs); if (compactionNeeded != null) requestCompaction(ci, runAs, compactionNeeded); } catch (Throwable t) { @@ -154,6 +162,11 @@ public void run() { LOG.error("Initiator loop caught unexpected exception this time through the loop: " + StringUtils.stringifyException(t)); } + finally { + if(handle != null) { + handle.releaseLocks(); + } + } long elapsedTime = System.currentTimeMillis() - startedAt; if (elapsedTime >= checkInterval || stop.get()) continue; diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 9b00435..54853fd 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -229,7 +229,7 @@ private static void assertPredicateIsPushed(String ppd, List queryPlan) } Assert.assertFalse("PPD '" + ppd + "' wasn't pushed", true); } - @Ignore("alter table") +// @Ignore("alter table") @Test public void testAlterTable() throws Exception { int[][] tableData = {{1,2}}; @@ -604,7 +604,13 @@ public static void runCleaner(HiveConf hiveConf) throws MetaException { private static void runHouseKeeperService(HouseKeeperService houseKeeperService, HiveConf conf) throws Exception { int lastCount = houseKeeperService.getIsAliveCounter(); houseKeeperService.start(conf); + int maxIter = 10; + int iterCount = 0; while(houseKeeperService.getIsAliveCounter() <= lastCount) { + if(iterCount++ >= maxIter) { + //prevent test hangs + throw new IllegalStateException("HouseKeeper didn't run after " + iterCount + " waits"); + } try { Thread.sleep(100);//make sure it has run at least once } diff --git 
ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java index a3bf9d3..3a6e76e 100644 --- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java @@ -189,7 +189,13 @@ public void testWriteDynamicPartition() throws Exception { private void runReaper() throws Exception { int lastCount = houseKeeperService.getIsAliveCounter(); houseKeeperService.start(conf); + int maxIter = 10; + int iterCount = 0; while(houseKeeperService.getIsAliveCounter() <= lastCount) { + if(iterCount++ >= maxIter) { + //prevent test hangs + throw new IllegalStateException("Reaper didn't run after " + iterCount + " waits"); + } try { Thread.sleep(100);//make sure it has run at least once }
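
The sketches below are reviewer illustrations and are not part of the patch. First, the calling pattern the patch applies in Initiator, Cleaner, AcidHouseKeeperService and AcidCompactionHistoryService: acquire the named mutex before the critical section and release it in a finally block so an exception cannot leak the JDBC connection backing the lock. The MutexCallerSketch class name is invented; TxnStore, MUTEX_KEY and LockHandle are the types added by the patch.

import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.txn.TxnStore;

/**
 * Usage pattern for the new MutexAPI as applied in Initiator/Cleaner/housekeeping services:
 * acquire before the critical section, always release in finally.
 */
public class MutexCallerSketch {
  private final TxnStore txnHandler;

  public MutexCallerSketch(TxnStore txnHandler) {
    this.txnHandler = txnHandler;
  }

  void runOneCycle() throws MetaException {
    TxnStore.MutexAPI.LockHandle handle = null;
    try {
      // Only one Initiator (across all metastore instances) gets past this line at a time.
      handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Initiator.name());
      // ... do the work that must not run concurrently ...
    } finally {
      if (handle != null) {
        handle.releaseLocks();   // returns the JDBC connection backing the lock
      }
    }
  }
}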
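
The updStmt comment in findNextToCompact() references HIVE-12725: per the JDBC spec a Statement supports only one open ResultSet, so calling executeUpdate() on the same Statement that produced the ResultSet being iterated closes that ResultSet. A minimal sketch of the same read-with-one-statement, update-with-another pattern; the WORK_QUEUE table and its columns are invented for illustration and are not part of the Hive schema.

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

/**
 * Illustrates the JDBC rule behind the separate updStmt: re-executing on the same Statement
 * implicitly closes its open ResultSet, so the row scan and the conditional UPDATE must use
 * two Statement objects. Assumes dbConn has autoCommit turned off.
 */
public class SeparateStatementsSketch {
  static Long claimFirstPendingRow(Connection dbConn) throws SQLException {
    try (Statement readStmt = dbConn.createStatement();
         Statement updStmt = dbConn.createStatement()) {
      try (ResultSet rs = readStmt.executeQuery(
          "select ID from WORK_QUEUE where STATE = 'i'")) {
        while (rs.next()) {
          long id = rs.getLong(1);
          // Using readStmt here would close rs (one open ResultSet per Statement).
          int updated = updStmt.executeUpdate(
              "update WORK_QUEUE set STATE = 'w' where ID = " + id + " and STATE = 'i'");
          if (updated == 1) {
            dbConn.commit();
            return id;      // claimed this row
          }
          // someone else claimed it first; keep scanning the still-open ResultSet
        }
      }
      dbConn.rollback();
      return null;
    }
  }
}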
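
A condensed, standalone view of the locking sequence TxnHandler.acquireLock(String) performs against AUX_TABLE: select the key's row with a FOR UPDATE clause, insert it on first use while tolerating the duplicate-key race with other metastore instances, then re-select. This is only a sketch of the idea under simplified assumptions; retry handling, the Derby semaphore fallback, addForUpdateClause() and the per-database codes in isDuplicateKeyError() are omitted, and only the generic duplicate-key SQLStates are checked here.

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

/**
 * Minimal sketch of the AUX_TABLE mutex idea: the row (key, 0) is created on first use, and
 * "select ... for update" on that row serializes every caller asking for the same key.
 */
public class RowLockSketch {
  /** Returns the Connection holding the row lock; caller must rollback/close it to release. */
  static Connection lock(Connection dbConn, String key) throws SQLException {
    dbConn.setAutoCommit(false);
    String select = "select MT_COMMENT from AUX_TABLE where MT_KEY1 = '" + key + "' and MT_KEY2 = 0 for update";
    try (Statement stmt = dbConn.createStatement()) {
      ResultSet rs = stmt.executeQuery(select);
      if (!rs.next()) {
        try (Statement ins = dbConn.createStatement()) {
          ins.executeUpdate("insert into AUX_TABLE(MT_KEY1, MT_KEY2) values('" + key + "', 0)");
          dbConn.commit();
        } catch (SQLException e) {
          // Another instance inserted the row first; a unique-key violation is expected here.
          if (!"23505".equals(e.getSQLState()) && !"23000".equals(e.getSQLState())) {
            throw e;
          }
        }
        rs = stmt.executeQuery(select); // the row exists now; block until we own it
        if (!rs.next()) {
          throw new IllegalStateException("AUX_TABLE row for " + key + " disappeared");
        }
      }
      return dbConn; // transaction stays open; the row lock is held until rollback/commit
    }
  }
}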
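
The patch leaves MutexAPI.acquireLock(String, LockHandle) throwing NotImplementedException, with a note that it would reuse LockHandle.dbConn. Purely as a hypothetical sketch of that idea (the class and method names below are invented, not from the patch): a handle that locks several AUX_TABLE rows in one transaction and releases them together.

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical shape of the unimplemented acquireLock(key, handle): reuse the handle's
 * Connection and keep one (Statement, ResultSet) pair per key so releasing the single
 * transaction drops every row lock at once.
 */
public class MultiKeyLockHandleSketch implements AutoCloseable {
  private final Connection dbConn;
  private final List<Statement> statements = new ArrayList<>();
  private final List<ResultSet> resultSets = new ArrayList<>();

  MultiKeyLockHandleSketch(Connection dbConn) {
    this.dbConn = dbConn;
  }

  /** Locks one more AUX_TABLE row inside the same transaction as the earlier keys. */
  void addKey(String key) throws SQLException {
    Statement stmt = dbConn.createStatement();
    ResultSet rs = stmt.executeQuery(
        "select MT_COMMENT from AUX_TABLE where MT_KEY1 = '" + key + "' and MT_KEY2 = 0 for update");
    if (!rs.next()) {
      stmt.close();
      throw new IllegalStateException("No AUX_TABLE row for " + key);
    }
    statements.add(stmt);
    resultSets.add(rs);
  }

  /** Releasing the single transaction releases every row lock taken through this handle. */
  @Override
  public void close() throws SQLException {
    for (ResultSet rs : resultSets) { rs.close(); }
    for (Statement stmt : statements) { stmt.close(); }
    dbConn.rollback();
    dbConn.close();
  }
}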
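
Why Cleaner now caps the ValidTxnList at CompactionInfo.highestTxnId rather than using a wide-open ValidReadTxnList: anything above the cap is treated as invalid, so a delta created by a later compaction is not used to declare older directories obsolete. The two-argument ValidReadTxnList constructor is the one the patch itself uses; the printed expectations simply restate its behavior.

import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;

/**
 * With the cap, txns above highestTxnId are not "valid", so AcidUtils.getAcidState() will not
 * treat a newer delta (one produced after this clean request was queued) as making older
 * files obsolete.
 */
public class CappedTxnListSketch {
  public static void main(String[] args) {
    long highestTxnId = 3;   // say the compaction being cleaned resolved txns up to 3
    ValidTxnList capped = new ValidReadTxnList(new long[0], highestTxnId);
    ValidTxnList uncapped = new ValidReadTxnList();   // high water mark = Long.MAX_VALUE

    System.out.println(capped.isTxnValid(3));    // true  - covered by the old compaction
    System.out.println(capped.isTxnValid(4));    // false - belongs to a newer delta, leave it alone
    System.out.println(uncapped.isTxnValid(4));  // true  - the old behavior the patch fixes
  }
}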
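
testMutexAPI orders its two threads with Thread.sleep() calls, which is why it needs generous join timeouts. As an alternative technique (not what the patch does), the same ordering can be made deterministic with a CountDownLatch; a plain ReentrantLock stands in for TxnStore.MutexAPI so the sketch runs without a metastore, and anonymous Thread subclasses are used to match the style of the test.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Deterministic variant of the ordering testMutexAPI builds with sleeps: thread 2 only starts
 * its (blocking) acquire once thread 1 already holds the lock.
 */
public class MutexOrderingSketch {
  public static void main(String[] args) throws InterruptedException {
    final ReentrantLock mutex = new ReentrantLock();        // stand-in for api.acquireLock(key)
    final CountDownLatch t1HasLock = new CountDownLatch(1);
    final AtomicInteger step = new AtomicInteger(0);

    Thread t1 = new Thread("MutexTest1") {
      public void run() {
        mutex.lock();
        try {
          step.set(1);
          t1HasLock.countDown();                            // let t2 start its acquire
          Thread.sleep(500);                                // hold the lock for a while
          step.set(2);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          mutex.unlock();
        }
      }
    };
    Thread t2 = new Thread("MutexTest2") {
      public void run() {
        try {
          t1HasLock.await();
          mutex.lock();                                     // blocks until t1 unlocks
          try {
            if (step.get() != 2) {
              throw new AssertionError("t2 acquired the lock before t1 released it");
            }
            step.set(3);
          } finally {
            mutex.unlock();
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    };
    t1.start();
    t2.start();
    t1.join(10000);
    t2.join(10000);
    System.out.println("final step = " + step.get());       // expect 3
  }
}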