diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
index 73255d2..bea1473 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.metastore.txn;
 
 import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
 
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -39,6 +40,9 @@
   public boolean tooManyAborts = false;
   /**
    * {@code 0} means it wasn't set (e.g. in case of upgrades, since ResultSet.getLong() will return 0 if field is NULL)
+   * See {@link TxnStore#setCompactionHighestTxnId(CompactionInfo, long)} for the precise definition.
+   * See also {@link TxnUtils#createValidCompactTxnList(GetOpenTxnsInfoResponse)} and
+   * {@link ValidCompactorTxnList#highWatermark}.
    */
   public long highestTxnId;
   byte[] metaInfo;
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index da2b395..250a101 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -627,6 +627,7 @@ public void revokeTimedoutWorkers(long timeout) throws MetaException {
 
   /**
    * Record the highest txn id that the {@code ci} compaction job will pay attention to.
+   * This is the highest resolved txn id, i.e. the highest id such that there are no open txns with lower ids.
    */
   public void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException {
     Connection dbConn = null;
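
The "highest resolved txn id" mentioned above mirrors the capping logic that TxnUtils.createValidCompactTxnList applies further down in this patch. A minimal standalone sketch of the idea (illustrative only, not part of the patch; the class and method names are made up):

    // Given the global high water mark and the ids of currently open txns, the compactor
    // must not look above the largest id below which every txn is committed or aborted.
    public class HighestResolvedTxnIdSketch {
      static long highestResolvedTxnId(long highWaterMark, long[] openTxnIds) {
        long minOpen = Long.MAX_VALUE;
        for (long id : openTxnIds) {
          minOpen = Math.min(minOpen, id);
        }
        // If nothing is open, everything up to the high water mark is resolved.
        return minOpen == Long.MAX_VALUE ? highWaterMark : minOpen - 1;
      }
      public static void main(String[] args) {
        // txns 7 and 9 are open, 12 is the highest allocated id => highest resolved id is 6
        System.out.println(highestResolvedTxnId(12, new long[]{7, 9}));   // 6
        System.out.println(highestResolvedTxnId(12, new long[]{}));       // 12
      }
    }
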
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 53d2bb4..95f0ddf 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -18,11 +18,16 @@
 package org.apache.hadoop.hive.metastore.txn;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.util.concurrent.Service;
 import com.jolbox.bonecp.BoneCPConfig;
 import com.jolbox.bonecp.BoneCPDataSource;
+import org.apache.commons.collections.map.HashedMap;
 import org.apache.commons.dbcp.ConnectionFactory;
 import org.apache.commons.dbcp.DriverManagerConnectionFactory;
 import org.apache.commons.dbcp.PoolableConnectionFactory;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.fs.Stat;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.common.classification.InterfaceStability;
 import org.apache.hadoop.hive.metastore.Warehouse;
@@ -43,6 +48,8 @@
 import java.io.IOException;
 import java.sql.*;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -85,7 +92,7 @@
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-abstract class TxnHandler implements TxnStore {
+abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
   static final protected char INITIATED_STATE = 'i';
   static final protected char WORKING_STATE = 'w';
@@ -1372,14 +1379,14 @@ protected Connection getDbConn(int isolationLevel) throws SQLException {
     }
   }
-  void rollbackDBConn(Connection dbConn) {
+  static void rollbackDBConn(Connection dbConn) {
     try {
       if (dbConn != null && !dbConn.isClosed()) dbConn.rollback();
     } catch (SQLException e) {
       LOG.warn("Failed to rollback db connection " + getMessage(e));
     }
   }
-  protected void closeDbConn(Connection dbConn) {
+  protected static void closeDbConn(Connection dbConn) {
     try {
       if (dbConn != null && !dbConn.isClosed()) {
         dbConn.close();
@@ -1393,7 +1400,7 @@ protected void closeDbConn(Connection dbConn) {
    * Close statement instance.
    * @param stmt statement instance.
    */
-  protected void closeStmt(Statement stmt) {
+  protected static void closeStmt(Statement stmt) {
     try {
       if (stmt != null && !stmt.isClosed()) stmt.close();
     } catch (SQLException e) {
@@ -1405,7 +1412,7 @@ protected void closeStmt(Statement stmt) {
    * Close the ResultSet.
    * @param rs may be {@code null}
    */
-  void close(ResultSet rs) {
+  static void close(ResultSet rs) {
     try {
       if (rs != null && !rs.isClosed()) {
         rs.close();
@@ -1419,7 +1426,7 @@ void close(ResultSet rs) {
   /**
    * Close all 3 JDBC artifacts in order: {@code rs stmt dbConn}
    */
-  void close(ResultSet rs, Statement stmt, Connection dbConn) {
+  static void close(ResultSet rs, Statement stmt, Connection dbConn) {
     close(rs);
     closeStmt(stmt);
     closeDbConn(dbConn);
@@ -2564,6 +2571,23 @@ private static boolean isRetryable(Exception ex) {
     }
     return false;
   }
+  private boolean isDuplicateKeyError(SQLException ex) {
+    switch (dbProduct) {
+      case DERBY:
+        if("23505".equals(ex.getSQLState())) {
+          //what about the error code?
+        }
+        break;
+      case MYSQL:
+        if(ex.getErrorCode() == 1022 && "23000".equals(ex.getSQLState())) {
+          return true;
+        }
+        break;
+      default:
+        throw new IllegalArgumentException("Unexpected DB type: " + dbProduct + "; " + getMessage(ex));
+    }
+    return false;
+  }
   private static String getMessage(SQLException ex) {
     return ex.getMessage() + "(SQLState=" + ex.getSQLState() + ",ErrorCode=" + ex.getErrorCode() + ")";
   }
@@ -2638,4 +2662,108 @@ private void unlockInternal() {
       derbyLock.unlock();
     }
   }
+  @Override
+  public MutexAPI getMutexAPI() {
+    return this;
+  }
+
+  /**
+   * todo: should this do retries on SQLException?
+   * @param key
+   * @return
+   */
+  @Override
+  public LockHandle acquireLock(String key) {
+    if(dbProduct == DatabaseProduct.DERBY) {
+      try {
+        derbyKey2Lock.putIfAbsent(key, new Semaphore(1));
+        Semaphore s = derbyKey2Lock.get(key);
+        s.acquire();
+        return new LockHandleDerby(key, s);
+      }
+      catch(InterruptedException ex) {
+        throw new IllegalStateException("Unable to lock " + quoteString(key), ex);
+      }
+    }
+    Connection dbConn = null;
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+      stmt = dbConn.createStatement();
+      rs = stmt.executeQuery(addForUpdateClause("select MT_COMMENT from AUX_TABLE where key1=" + quoteString(key) + " and key2=0"));
+      if(!rs.next()) {
+        try {
+          stmt.executeUpdate("insert into AUX_TABLE values(" + quoteString(key) + ", 0)");
+        }
+        catch(SQLException ex) {
+          if(!isDuplicateKeyError(ex)) {
+            throw new RuntimeException("Unable to lock " + quoteString(key) + " due to: " + getMessage(ex), ex);
+          }
+        }
+        close(rs);
+        rs = stmt.executeQuery(addForUpdateClause("select MT_COMMENT from AUX_TABLE where key1=" + quoteString(key) + " and key2=0"));
+      }
+      if(!rs.next()) {
+        throw new IllegalStateException("Unable to lock " + quoteString(key) + ". Expected row in AUX_TABLE is missing.");
+      }
+      //OK, so now we have a lock
+      return new LockHandleImpl(dbConn, stmt, rs, key);
+    }
+    catch(Exception ex) {
+      close(rs, stmt, dbConn);
+      throw new RuntimeException("Unable to lock " + quoteString(key) + " due to: " +
+        ((ex instanceof SQLException) ? getMessage((SQLException) ex) : ex.getMessage()), ex);
+    }
+  }
+  public void acquireLock(String key, LockHandle handle) {
+    //the idea is that this will use LockHandle.dbConn
+    throw new NotImplementedException();
+  }
+  private static final class LockHandleImpl implements LockHandle {
+    private final Connection dbConn;
+    private final Statement stmt;
+    private final ResultSet rs;
+    private final List<String> keys = new ArrayList<>();
+    LockHandleImpl(Connection conn, Statement stmt, ResultSet rs, String key) {
+      this.dbConn = conn;
+      this.stmt = stmt;
+      this.rs = rs;
+      keys.add(key);
+    }
+    void addKey(String key) {
+      //keys.add(key);
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public void releaseLocks() {
+      rollbackDBConn(dbConn);
+      close(rs, stmt, dbConn);
+    }
+  }
+  private ConcurrentHashMap<String, Semaphore> derbyKey2Lock = new ConcurrentHashMap<>();
+
+  private static final class LockHandleDerby implements LockHandle {
+    private final List<String> keys = new ArrayList<>();
+    private final List<Semaphore> locks = new ArrayList<>();
+    /**
+     * Make sure this is only created after the Semaphore is acquired, else it may later
+     * release a Semaphore it doesn't own.
+     */
+    LockHandleDerby(String key, Semaphore lock) {
+      keys.add(key);
+      locks.add(lock);
+    }
+    void addKey(String key) {
+      throw new NotImplementedException();
+    }
+    @Override
+    public void releaseLocks() {
+      for(Semaphore s : locks) {
+        s.release();
+      }
+    }
+
+  }
 }
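
acquireLock() above relies on a common RDBMS idiom: make sure a row exists for the key (tolerating a duplicate-key failure from a concurrent inserter), then SELECT ... FOR UPDATE that row so the row lock serializes all callers until the owning transaction is rolled back or closed. A condensed sketch of that idiom in plain JDBC follows; it is illustrative only, the table/column names (MUTEX_TABLE, KEY1, VAL) are made up for the sketch, and the patch itself uses AUX_TABLE/MT_COMMENT with quoteString() for quoting:

    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;

    // Sketch of the "insert if absent, then SELECT ... FOR UPDATE" mutex idiom.
    // Assumes a table MUTEX_TABLE(KEY1 VARCHAR PRIMARY KEY, VAL INT) and a trusted key value.
    // The caller must later rollback/close the connection to release the row lock,
    // which is exactly what LockHandleImpl.releaseLocks() does above.
    final class SqlMutexSketch {
      static void lockKey(Connection conn, String key) throws SQLException {
        conn.setAutoCommit(false);  // the row lock lives for the duration of the txn
        try (Statement stmt = conn.createStatement()) {
          ResultSet rs = stmt.executeQuery(
              "select VAL from MUTEX_TABLE where KEY1 = '" + key + "' for update");
          if (!rs.next()) {
            try {
              stmt.executeUpdate("insert into MUTEX_TABLE values('" + key + "', 0)");
            } catch (SQLException e) {
              // a concurrent caller may have inserted the row first; a real implementation
              // re-checks that this really is a duplicate-key error (cf. isDuplicateKeyError)
            }
            // re-select; this blocks until any concurrent holder commits or rolls back
            rs = stmt.executeQuery(
                "select VAL from MUTEX_TABLE where KEY1 = '" + key + "' for update");
          }
        }
      }
    }
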
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 6d738b5..8b84e4d 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -47,6 +47,7 @@
 @InterfaceStability.Evolving
 public interface TxnStore {
 
+  public static enum MUTEX_KEY {Initiator, Cleaner, HouseKeeper, CompactionHistory}
   // Compactor states (Should really be enum)
   static final public String INITIATED_RESPONSE = "initiated";
   static final public String WORKING_RESPONSE = "working";
@@ -329,10 +330,41 @@ public void cleanupRecords(HiveObjectType type, Database db, Table table,
    */
   public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException;
 
-
   @VisibleForTesting
   public int numLocksInLockTable() throws SQLException, MetaException;
 
   @VisibleForTesting
   long setTimeout(long milliseconds);
+
+  public MutexAPI getMutexAPI();
+
+  /**
+   * This is primarily designed to provide coarse-grained mutex support to operations running
+   * inside the Metastore (of which there could be several instances).  The initial goal is to
+   * ensure that various sub-processes of the Compactor don't step on each other.
+   *
+   * In the RDBMS world each {@code LockHandle} uses a java.sql.Connection, so use it sparingly.
+   */
+  public static interface MutexAPI {
+    //todo: define some enum with key names
+    /**
+     * The {@code key} is the name of the lock.  Will acquire an exclusive lock or block.  It returns
+     * a handle which must be used to release the lock.  Each invocation returns a new handle.
+     */
+    public LockHandle acquireLock(String key);
+
+    /**
+     * Same as {@link #acquireLock(String)} but takes an already existing handle as input.  This
+     * will associate the lock on {@code key} with the same handle.  All locks associated with
+     * the same handle will be released together.
+     * @param handle not NULL
+     */
+    public void acquireLock(String key, LockHandle handle);
+    public static interface LockHandle {
+      /**
+       * Releases all locks associated with this handle.
+       */
+      public void releaseLocks();
+    }
+  }
 }
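
A minimal sketch of the call pattern a MutexAPI client is expected to follow; it mirrors the try/finally shape the later hunks add to the housekeeper services (txnHandler here is assumed to be any TxnStore implementation, e.g. one obtained via TxnUtils.getTxnStore(conf)):

    // Sketch only (not part of the patch): acquire the named mutex, do the critical work,
    // and always release in finally so a crashed iteration does not hold the lock.
    void runExclusively(TxnStore txnHandler) {
      TxnStore.MutexAPI.LockHandle handle = null;
      try {
        handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
        // ... critical section: at most one metastore instance executes this at a time ...
      } finally {
        if (handle != null) {
          handle.releaseLocks();
        }
      }
    }
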
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 0d90b11..f0c1f05 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -74,8 +74,8 @@ public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txn
     int i = 0;
     for (TxnInfo txn : txns.getOpen_txns()) {
       if (txn.getState() == TxnState.OPEN) minOpenTxn = Math.min(minOpenTxn, txn.getId());
-      exceptions[i++] = txn.getId();
-    }
+      exceptions[i++] = txn.getId();//todo: only add Aborted
+    }//remove all exceptions < minOpenTxn
     highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1;
     return new ValidCompactorTxnList(exceptions, -1, highWater);
   }
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
index 648fd49..30bdfa7 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
@@ -24,7 +24,7 @@
 import java.util.Arrays;
 
 /**
- * And implmentation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor.
+ * An implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor.
  * For the purposes of {@link #isTxnRangeValid} this class will view a transaction as valid if it
  * is committed or aborted.  Additionally it will return none if there are any open transactions
 * below the max transaction given, since we don't want to compact above open transactions.  For
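
To make the capping concrete: suppose the highest allocated txn id is 12, txns 7 and 9 are still open, and txn 5 was aborted. createValidCompactTxnList() then caps the high watermark at 6 (minOpenTxn - 1), so the compactor ignores everything at or above the lowest open transaction. A hedged sketch with illustrative values (the -1 second argument simply mirrors the patch's own constructor call):

    // What createValidCompactTxnList() would build for:
    //   high water mark = 12, open txns = {7, 9}, aborted txns = {5}
    // minOpenTxn = 7, so the watermark is capped at 6 and the exceptions are {5, 7, 9}.
    ValidTxnList compactTxnList = new ValidCompactorTxnList(new long[]{5, 7, 9}, -1, 6);
    // Deltas containing txns 7..12 fall above this list's high watermark,
    // so the compactor will not merge them in this cycle.
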
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
index 59c8fe4..5d9e7be 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
@@ -61,7 +61,9 @@ private ObsoleteEntryReaper(HiveConf hiveConf, AtomicInteger isAliveCounter) {
 
     @Override
     public void run() {
+      TxnStore.MutexAPI.LockHandle handle = null;
       try {
+        handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.CompactionHistory.name());
         long startTime = System.currentTimeMillis();
         txnHandler.purgeCompactionHistory();
         int count = isAliveCounter.incrementAndGet();
@@ -70,6 +72,11 @@ public void run() {
       catch(Throwable t) {
         LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t);
       }
+      finally {
+        if(handle != null) {
+          handle.releaseLocks();
+        }
+      }
     }
   }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
index 882562b..13b10de 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
@@ -61,7 +61,9 @@ private TimedoutTxnReaper(HiveConf hiveConf, AtomicInteger isAliveCounter) {
     }
     @Override
     public void run() {
+      TxnStore.MutexAPI.LockHandle handle = null;
       try {
+        handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.HouseKeeper.name());
        long startTime = System.currentTimeMillis();
         txnHandler.performTimeOuts();
         int count = isAliveCounter.incrementAndGet();
@@ -70,6 +72,11 @@ public void run() {
       catch(Throwable t) {
         LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t);
       }
+      finally {
+        if(handle != null) {
+          handle.releaseLocks();
+        }
+      }
     }
   }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index fbf5481..cfc840e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileStatus;
@@ -72,11 +73,13 @@ public void run() {
     // and if so remembers that and then sets it to true at the end.  We have to check here
     // first to make sure we go through a complete iteration of the loop before resetting it.
     boolean setLooped = !looped.get();
-    long startedAt = System.currentTimeMillis();
+    TxnStore.MutexAPI.LockHandle handle = null;
+    long startedAt = -1;
     // Make sure nothing escapes this run method and kills the metastore at large,
     // so wrap it in a big catch Throwable statement.
     try {
-
+      handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
+      startedAt = System.currentTimeMillis();
      // First look for all the compactions that are waiting to be cleaned.  If we have not
      // seen an entry before, look for all the locks held on that table or partition and
      // record them.  We will then only clean the partition once all of those locks have been
@@ -86,6 +89,31 @@
       // done the compaction will read the more up to date version of the data (either in a
       // newer delta or in a newer base).
       List<CompactionInfo> toClean = txnHandler.findReadyToClean();
+      {
+        /**
+         * Since there may be more than one instance of the Cleaner running, we may have state info
+         * for items which were already cleaned by other instances.  Here we remove them.
+         *
+         * In the long run, if we add end_time to compaction_queue, then we can check that
+         * hive_locks.acquired_at > compaction_queue.end_time + safety_buffer, in which case
+         * we know the lock owner is reading files created by this compaction or later.
+         * The advantage is that we don't have to store the locks.
+         */
+        Set<Long> currentToCleanSet = new HashSet<>();
+        for (CompactionInfo ci : toClean) {
+          currentToCleanSet.add(ci.id);
+        }
+        Set<Long> cleanPerformedByOthers = new HashSet<>();
+        for (long id : compactId2CompactInfoMap.keySet()) {
+          if (!currentToCleanSet.contains(id)) {
+            cleanPerformedByOthers.add(id);
+          }
+        }
+        for (long id : cleanPerformedByOthers) {
+          compactId2CompactInfoMap.remove(id);
+          compactId2LockMap.remove(id);
+        }
+      }
       if (toClean.size() > 0 || compactId2LockMap.size() > 0) {
         ShowLocksResponse locksResponse = txnHandler.showLocks(new ShowLocksRequest());
 
@@ -140,6 +168,11 @@ public void run() {
       LOG.error("Caught an exception in the main loop of compactor cleaner, " +
           StringUtils.stringifyException(t));
     }
+    finally {
+      if (handle != null) {
+        handle.releaseLocks();
+      }
+    }
     if (setLooped) {
       looped.set(true);
     }
@@ -206,10 +239,24 @@ private void clean(CompactionInfo ci) throws MetaException {
     StorageDescriptor sd = resolveStorageDescriptor(t, p);
     final String location = sd.getLocation();
 
-    // Create a bogus validTxnList with a high water mark set to MAX_LONG and no open
-    // transactions.  This assures that all deltas are treated as valid and all we return are
-    // obsolete files.
-    final ValidTxnList txnList = new ValidReadTxnList();
+    /**
+     * Each compaction only compacts as far as the highest txn id such that all txns below it
+     * are resolved (i.e. not open).  This is what "highestTxnId" tracks.  It is only tracked
+     * since Hive 1.3.0/2.0 - thus it may be 0.  See ValidCompactorTxnList and its uses for more info.
+     *
+     * We only want to clean up to the highestTxnId - otherwise we risk deleting deltas from
+     * under an active reader.
+     *
+     * Suppose we have deltas D2 and D3 for table T, i.e. the last compaction created D3, so now there is a
+     * clean request for D2.
+     * The Cleaner checks existing locks and finds none.
+     * Between that check and removeFiles() a query starts (it will be reading D3) and another compaction
+     * completes which creates D4.
+     * Now removeFiles() (more specifically AcidUtils.getAcidState()) will declare D3 to be obsolete
+     * unless the ValidTxnList is "capped" at highestTxnId.
+     */
+    final ValidTxnList txnList = ci.highestTxnId > 0 ?
+        new ValidReadTxnList(new long[0], ci.highestTxnId) : new ValidReadTxnList();
 
     if (runJobAsSelf(ci.runAs)) {
       removeFiles(location, txnList);
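
The effect of the capping above, shown in isolation with made-up values (ValidReadTxnList treats an id as valid when it is at or below the list's high watermark and not in the exception list):

    // Suppose the compaction behind this clean request recorded highestTxnId = 17,
    // while a later compaction has already written a delta that includes txn 20.
    ValidTxnList capped = new ValidReadTxnList(new long[0], 17);
    capped.isTxnValid(17);   // true  - inside the compacted range this Cleaner is responsible for
    capped.isTxnValid(20);   // false - newer work is out of scope, so its delta cannot make D3 obsolete

    ValidTxnList uncapped = new ValidReadTxnList();   // high watermark = Long.MAX_VALUE
    uncapped.isTxnValid(20); // true - with the old code a newer delta could count as covering D3,
                             // letting removeFiles() delete D3 from under an active reader
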
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 3e22548..b3c7bde 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -74,11 +74,15 @@ public void run() {
       // much easier.  The stop value is only for testing anyway and not used when called from
       // HiveMetaStore.
       do {
-        long startedAt = System.currentTimeMillis();
+        long startedAt = -1;
+        TxnStore.MutexAPI.LockHandle handle = null;
 
         // Wrap the inner parts of the loop in a catch throwable so that any errors in the loop
         // don't doom the entire thread.
-        try {//todo: add method to only get current i.e. skip history - more efficient
+        try {
+          handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Initiator.name());
+          startedAt = System.currentTimeMillis();
+          //todo: add a method to only get current compactions (i.e. skip history) - more efficient
           ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
           ValidTxnList txns = TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
 
@@ -114,6 +118,8 @@ public void run() {
           // Check if we already have initiated or are working on a compaction for this partition
           // or table.  If so, skip it.  If we are just waiting on cleaning we can still check,
           // as it may be time to compact again even though we haven't cleaned.
+          //todo: this is not robust.  You can easily run Alter Table to start a compaction between
+          //the time currentCompactions is generated and now
           if (lookForCurrentCompactions(currentCompactions, ci)) {
             LOG.debug("Found currently initiated or working compaction for " +
                 ci.getFullPartitionName() + " so we will not initiate another compaction");
@@ -134,7 +140,39 @@ public void run() {
           }
           StorageDescriptor sd = resolveStorageDescriptor(t, p);
           String runAs = findUserToRunAs(sd.getLocation(), t);
-
+          /*todo: this may need to read a lot of file metadata, so it could be expensive;
+           we should lock the partition/table in AUX_TABLE before this and unlock
+           after the compaction is scheduled; could also lock the whole 'inside of loop' here
+
+           Note on locking here.  To lock a table, for example, we need to
+           1. insert a row into AUX_TABLE with KEY1 = db/table and probably KEY2 = N, where N is some
+           number that identifies a particular operation.  For example, Initiator is 1.
+           This has the drawback that you can't easily lock all db/table entries because anyone can
+           insert a new (db/table, N) combination for some other N.
+           (also, since both key1/key2 are part of the PK, how do you plan to generate sequences?
+           you can't usually update a PK, so you'd have to select the current value, delete the old row and insert a new one with
+           value + 1.  I'm not even sure that would work with S4U if you locked all key1 rows, since you'd
+           be inserting a new one which would not be locked as it didn't exist at the time of the lock,
+           though they would collide on insert in case of a race....)
+           In any case, Insert will throw in case of a duplicate,
+           so once this row is there we can S4U it to mutex an operation.
+           How do we ever delete this row?  We could delete at the end of the operation... probably a bad idea for many reasons.
+           A. do S4U; if there is a hit, proceed in the same txn;
+           if there is no hit, insert in a separate txn to make the row visible,
+           then go to "do S4U".
+           This means the delete would have to run at some later time (we really should have included an
+           update timestamp column to purge inactive items so that AUX_TABLE doesn't
+           grow out of bounds (imagine millions of partitions)).
+           W/o this we could add a purge process that selects all rows and then deletes them one by one in separate
+           txns once an hour or something....
+
+           We could also just lock the whole Initiator... this way nothing leaks and it's no worse than before in
+           terms of concurrency, though perhaps somewhat inefficient if multiple Initiators keep stepping on each other.
+
+           Actually "whole Initiator" can mean the whole iteration of this loop.  This makes it easy and there is nothing to clean up.
+           It matches the current implementation wrt concurrency.  If we ever need to speed this up by parallelizing ops, we
+           can do it from within the Initiator instance.  For example, we can run checkForCompaction() in a separate thread...
+           */
           CompactionType compactionNeeded = checkForCompaction(ci, txns, sd, runAs);
           if (compactionNeeded != null) requestCompaction(ci, runAs, compactionNeeded);
         } catch (Throwable t) {
@@ -154,6 +192,11 @@ public void run() {
           LOG.error("Initiator loop caught unexpected exception this time through the loop: " +
               StringUtils.stringifyException(t));
         }
+        finally {
+          if(handle != null) {
+            handle.releaseLocks();
+          }
+        }
 
         long elapsedTime = System.currentTimeMillis() - startedAt;
         if (elapsedTime >= checkInterval || stop.get()) continue;