diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 4d71eb4f4d..30312aa887 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -1229,10 +1229,8 @@ private static void getChildState(FileStatus child, HdfsFileStatusWithId childWi
     } else if (fn.startsWith(DELTA_PREFIX) || fn.startsWith(DELETE_DELTA_PREFIX)) {
       String deltaPrefix = fn.startsWith(DELTA_PREFIX) ? DELTA_PREFIX : DELETE_DELTA_PREFIX;
       ParsedDelta delta = parseDelta(child, deltaPrefix, fs);
-      // Handle aborted deltas. Currently this can only happen for MM tables.
-      if (tblproperties != null && isTransactionalTable(tblproperties) &&
-          ValidWriteIdList.RangeResponse.ALL == writeIdList.isWriteIdRangeAborted(
-              delta.minWriteId, delta.maxWriteId)) {
+      if(ValidWriteIdList.RangeResponse.ALL ==
+          writeIdList.isWriteIdRangeAborted(delta.minWriteId, delta.maxWriteId)) {
         aborted.add(child);
       }
       if (writeIdList.isWriteIdRangeValid(
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 3565616171..6be0ff2147 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -18,8 +18,13 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
 import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.slf4j.Logger;
@@ -32,9 +37,6 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
-import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
-import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -48,13 +50,7 @@
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -67,9 +63,6 @@
   private long cleanerCheckInterval = 0;
 
   private ReplChangeManager replChangeManager;
-  // List of compactions to clean.
-  private Map<Long, Set<Long>> compactId2LockMap = new HashMap<>();
-  private Map<Long, CompactionInfo> compactId2CompactInfoMap = new HashMap<>();
 
   @Override
   public void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException {
@@ -96,95 +89,9 @@ public void run() {
       try {
         handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
         startedAt = System.currentTimeMillis();
-        // First look for all the compactions that are waiting to be cleaned. If we have not
-        // seen an entry before, look for all the locks held on that table or partition and
-        // record them. We will then only clean the partition once all of those locks have been
-        // released. This way we avoid removing the files while they are in use,
-        // while at the same time avoiding starving the cleaner as new readers come along.
-        // This works because we know that any reader who comes along after the worker thread has
-        // done the compaction will read the more up to date version of the data (either in a
-        // newer delta or in a newer base).
-        List<CompactionInfo> toClean = txnHandler.findReadyToClean();
-        {
-          /**
-           * Since there may be more than 1 instance of Cleaner running we may have state info
-           * for items which were cleaned by instances. Here we remove them.
-           *
-           * In the long run if we add end_time to compaction_queue, then we can check that
-           * hive_locks.acquired_at > compaction_queue.end_time + safety_buffer in which case
-           * we know the lock owner is reading files created by this compaction or later.
-           * The advantage is that we don't have to store the locks.
-           */
-          Set<Long> currentToCleanSet = new HashSet<>();
-          for (CompactionInfo ci : toClean) {
-            currentToCleanSet.add(ci.id);
-          }
-          Set<Long> cleanPerformedByOthers = new HashSet<>();
-          for (long id : compactId2CompactInfoMap.keySet()) {
-            if (!currentToCleanSet.contains(id)) {
-              cleanPerformedByOthers.add(id);
-            }
-          }
-          for (long id : cleanPerformedByOthers) {
-            compactId2CompactInfoMap.remove(id);
-            compactId2LockMap.remove(id);
-          }
-        }
-        if (toClean.size() > 0 || compactId2LockMap.size() > 0) {
-          ShowLocksResponse locksResponse = txnHandler.showLocks(new ShowLocksRequest());
-          if(LOG.isDebugEnabled()) {
-            dumpLockState(locksResponse);
-          }
-          for (CompactionInfo ci : toClean) {
-            // Check to see if we have seen this request before. If so, ignore it. If not,
-            // add it to our queue.
-            if (!compactId2LockMap.containsKey(ci.id)) {
-              compactId2LockMap.put(ci.id, findRelatedLocks(ci, locksResponse));
-              compactId2CompactInfoMap.put(ci.id, ci);
-            }
-          }
-
-          // Now, for each entry in the queue, see if all of the associated locks are clear so we
-          // can clean
-          Set<Long> currentLocks = buildCurrentLockSet(locksResponse);
-          List<Long> expiredLocks = new ArrayList<Long>();
-          List<Long> compactionsCleaned = new ArrayList<Long>();
-          try {
-            for (Map.Entry<Long, Set<Long>> queueEntry : compactId2LockMap.entrySet()) {
-              boolean sawLock = false;
-              for (Long lockId : queueEntry.getValue()) {
-                if (currentLocks.contains(lockId)) {
-                  sawLock = true;
-                  break;
-                } else {
-                  expiredLocks.add(lockId);
-                }
-              }
-
-              if (!sawLock) {
-                // Remember to remove this when we're out of the loop,
-                // we can't do it in the loop or we'll get a concurrent modification exception.
-                compactionsCleaned.add(queueEntry.getKey());
-                //Future thought: this may be expensive so consider having a thread pool run in parallel
-                clean(compactId2CompactInfoMap.get(queueEntry.getKey()));
-              } else {
-                // Remove the locks we didn't see so we don't look for them again next time
-                for (Long lockId : expiredLocks) {
-                  queueEntry.getValue().remove(lockId);
-                }
-                LOG.info("Skipping cleaning of " +
-                    idWatermark(compactId2CompactInfoMap.get(queueEntry.getKey())) +
-                    " due to reader present: " + queueEntry.getValue());
-              }
-            }
-          } finally {
-            if (compactionsCleaned.size() > 0) {
-              for (Long compactId : compactionsCleaned) {
-                compactId2LockMap.remove(compactId);
-                compactId2CompactInfoMap.remove(compactId);
-              }
-            }
-          }
+        Long minOpenTxn = txnHandler.findMinHistoryLevel();
+        for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
+          clean(compactionInfo, minOpenTxn);
         }
       } catch (Throwable t) {
         LOG.error("Caught an exception in the main loop of compactor cleaner, " +
@@ -212,41 +119,7 @@ public void run() {
     } while (!stop.get());
   }
 
-  private Set<Long> findRelatedLocks(CompactionInfo ci, ShowLocksResponse locksResponse) {
-    Set<Long> relatedLocks = new HashSet<Long>();
-    for (ShowLocksResponseElement lock : locksResponse.getLocks()) {
-      /**
-       * Hive QL is not case sensitive wrt db/table/column names
-       * Partition names get
-       * normalized (as far as I can tell) by lower casing column name but not partition value.
-       * {@link org.apache.hadoop.hive.metastore.Warehouse#makePartName(List, List, String)}
-       * {@link org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer#getPartSpec(ASTNode)}
-       * Since user input may start out in any case, compare here case-insensitive for db/table
-       * but leave partition name as is.
-       */
-      if (ci.dbname.equalsIgnoreCase(lock.getDbname())) {
-        if ((ci.tableName == null && lock.getTablename() == null) ||
-            (ci.tableName != null && ci.tableName.equalsIgnoreCase(lock.getTablename()))) {
-          if ((ci.partName == null && lock.getPartname() == null) ||
-              (ci.partName != null && ci.partName.equals(lock.getPartname()))) {
-            relatedLocks.add(lock.getLockid());
-          }
-        }
-      }
-    }
-
-    return relatedLocks;
-  }
-
-  private Set<Long> buildCurrentLockSet(ShowLocksResponse locksResponse) {
-    Set<Long> currentLocks = new HashSet<Long>(locksResponse.getLocks().size());
-    for (ShowLocksResponseElement lock : locksResponse.getLocks()) {
-      currentLocks.add(lock.getLockid());
-    }
-    return currentLocks;
-  }
-
-  private void clean(CompactionInfo ci) throws MetaException {
+  private void clean(CompactionInfo ci, Long minOpenTxn) throws MetaException {
     LOG.info("Starting cleaning for " + ci);
     try {
       Table t = resolveTable(ci);
@@ -271,6 +144,24 @@ private void clean(CompactionInfo ci) throws MetaException {
 
       StorageDescriptor sd = resolveStorageDescriptor(t, p);
       final String location = sd.getLocation();
+      ValidReaderWriteIdList minOpenValidWriteIdList = null;
+      if(minOpenTxn != null) {
+        //if minOpenTxn==null => we have no open txns and we'll use CompactionInfo.highestWriteId
+        ValidTxnList validTxnList = TxnUtils.
+            createValidReadTxnList(txnHandler.getOpenTxns(), minOpenTxn);
+        List<String> tblNames = Collections.singletonList(
+            TableName.getDbTable(t.getDbName(), t.getTableName()));
+        GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(
+            tblNames, validTxnList.writeToString());
+        GetValidWriteIdsResponse rsp = txnHandler.getValidWriteIds(rqst);
+        //we could have no write IDs for a table if it was never written to but
+        // since we are in the Cleaner phase of compactions, there must have
+        // been some delta/base dirs
+        assert rsp != null && rsp.getTblValidWriteIdsSize() == 1;
+        //Creating 'reader' list since we are interested in the set of 'obsolete' files
+        minOpenValidWriteIdList = TxnUtils.createValidReaderWriteIdList(
+            rsp.getTblValidWriteIds().get(0));
+      }
       /**
        * Each Compaction only compacts as far as the highest txn id such that all txns below it
        * are resolved (i.e. not opened). This is what "highestWriteId" tracks. This is only tracked
@@ -287,10 +178,13 @@ private void clean(CompactionInfo ci) throws MetaException {
        * Now removeFiles() (more specifically AcidUtils.getAcidState()) will declare D3 to be obsolete
        * unless ValidWriteIdList is "capped" at highestWriteId.
        */
-      final ValidWriteIdList validWriteIdList = (ci.highestWriteId > 0)
-          ? new ValidReaderWriteIdList(ci.getFullTableName(), new long[0], new BitSet(),
-              ci.highestWriteId)
-          : new ValidReaderWriteIdList();
+      final ValidWriteIdList validWriteIdList =
+          (minOpenValidWriteIdList != null) ?
+              minOpenValidWriteIdList :
+              ((ci.highestWriteId > 0) ?
+                  new ValidReaderWriteIdList(ci.getFullTableName(), new long[0],
+                      new BitSet(), ci.highestWriteId) :
+                  new ValidReaderWriteIdList());
 
       if (runJobAsSelf(ci.runAs)) {
         removeFiles(location, validWriteIdList, ci);
@@ -327,7 +221,7 @@ private void removeFiles(String location, ValidWriteIdList writeIdList, Compacti
     Path locPath = new Path(location);
     AcidUtils.Directory dir = AcidUtils.getAcidState(locPath, conf, writeIdList);
     List<FileStatus> obsoleteDirs = dir.getObsolete();
-    List<Path> filesToDelete = new ArrayList(obsoleteDirs.size());
+    List<Path> filesToDelete = new ArrayList<>(obsoleteDirs.size());
     StringBuilder extraDebugInfo = new StringBuilder("[");
     for (FileStatus stat : obsoleteDirs) {
       filesToDelete.add(stat.getPath());
@@ -337,9 +231,6 @@ private void removeFiles(String location, ValidWriteIdList writeIdList, Compacti
       }
     }
     extraDebugInfo.setCharAt(extraDebugInfo.length() - 1, ']');
-    List<Long> compactIds = new ArrayList<>(compactId2CompactInfoMap.keySet());
-    Collections.sort(compactIds);
-    extraDebugInfo.append("compactId2CompactInfoMap.keySet(").append(compactIds).append(")");
     LOG.info(idWatermark(ci) + " About to remove " + filesToDelete.size() +
         " obsolete directories from " + location + ". " + extraDebugInfo.toString());
     if (filesToDelete.size() < 1) {
@@ -359,63 +250,4 @@ private void removeFiles(String location, ValidWriteIdList writeIdList, Compacti
       fs.delete(dead, true);
     }
   }
-  private static class LockComparator implements Comparator<ShowLocksResponseElement> {
-    //sort ascending by resource, nulls first
-    @Override
-    public int compare(ShowLocksResponseElement o1, ShowLocksResponseElement o2) {
-      if(o1 == o2) {
-        return 0;
-      }
-      if(o1 == null) {
-        return -1;
-      }
-      if(o2 == null) {
-        return 1;
-      }
-      int v = o1.getDbname().compareToIgnoreCase(o2.getDbname());
-      if(v != 0) {
-        return v;
-      }
-      if(o1.getTablename() == null) {
-        return -1;
-      }
-      if(o2.getTablename() == null) {
-        return 1;
-      }
-      v = o1.getTablename().compareToIgnoreCase(o2.getTablename());
-      if(v != 0) {
-        return v;
-      }
-      if(o1.getPartname() == null) {
-        return -1;
-      }
-      if(o2.getPartname() == null) {
-        return 1;
-      }
-      v = o1.getPartname().compareToIgnoreCase(o2.getPartname());
-      if(v != 0) {
-        return v;
-      }
-      //if still equal, compare by lock ids
-      v = Long.compare(o1.getLockid(), o2.getLockid());
-      if(v != 0) {
-        return v;
-      }
-      return Long.compare(o1.getLockIdInternal(), o2.getLockIdInternal());
-
-    }
-  }
-  private void dumpLockState(ShowLocksResponse slr) {
-    Iterator<ShowLocksResponseElement> l = slr.getLocksIterator();
-    List<ShowLocksResponseElement> sortedList = new ArrayList<>();
-    while(l.hasNext()) {
-      sortedList.add(l.next());
-    }
-    //sort for readability
-    sortedList.sort(new LockComparator());
-    LOG.info("dumping locks");
-    for(ShowLocksResponseElement lock : sortedList) {
-      LOG.info(lock.toString());
-    }
-  }
 }
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index cbb76d51d6..1f35e5e94f 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -324,6 +324,37 @@ public void markCompacted(CompactionInfo info) throws MetaException {
       return findReadyToClean();
     }
   }
+  @Override
+  public Long findMinHistoryLevel() throws MetaException {
+    Connection dbConn = null;
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        String s = "select MHL_MIN_OPEN_TXNID from MIN_HISTORY_LEVEL";
+        LOG.debug("Going to execute query <" + s + ">");
+        rs = stmt.executeQuery(s);
+        if(rs.next()) {
+          return rs.getLong(1);
+        }
+        return null;//we have no active txns in the system....
+      } catch (SQLException e) {
+        LOG.error("Unable to select MHL_MIN_OPEN_TXNID for cleaning, " + e.getMessage());
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "findMinHistoryLevel");
+        throw new MetaException("Unable to execute findMinHistoryLevel() " +
+            StringUtils.stringifyException(e));
+      } finally {
+        close(rs, stmt, dbConn);
+      }
+    } catch (RetryException e) {
+      return findMinHistoryLevel();
+    }
+
+  }
 
   /**
    * This will remove an entry from the queue after
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 080cc5284b..1e0190f6eb 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -352,6 +352,14 @@ void onRename(String oldCatName, String oldDbName, String oldTabName, String old
   @RetrySemantics.ReadOnly
   List<CompactionInfo> findReadyToClean() throws MetaException;
 
+  /**
+   * Returns the smallest txnid that is seen in open state across all active
+   * transactions in the system.
+   * @return {@code null} if there are no active transactions
+   */
+  @RetrySemantics.ReadOnly
+  Long findMinHistoryLevel() throws MetaException;
+
   /**
    * This will remove an entry from the queue after
    * it has been compacted.
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index bd202edb91..8114076be4 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -181,17 +181,6 @@ public static ValidCompactorWriteIdList createValidCompactWriteIdList(TableValid
     }
   }
 
-  public static ValidReaderWriteIdList updateForCompactionQuery(ValidReaderWriteIdList ids) {
-    // This is based on the existing valid write ID list that was built for a select query;
-    // therefore we assume all the aborted txns, etc. were already accounted for.
-    // All we do is adjust the high watermark to only include contiguous txns.
-    Long minOpenWriteId = ids.getMinOpenWriteId();
-    if (minOpenWriteId != null && minOpenWriteId != Long.MAX_VALUE) {
-      return ids.updateHighWatermark(ids.getMinOpenWriteId() - 1);
-    }
-    return ids;
-  }
-
  /**
   * Get an instance of the TxnStore that is appropriate for this store
   * @param conf configuration
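
Reviewer note, not part of the patch: below is a minimal, self-contained sketch of the precedence the new Cleaner.clean() applies when deciding how far files may be treated as obsolete. The class, enum, and method names in the sketch are invented for illustration. If TxnStore.findMinHistoryLevel() reports an open transaction, the cleaner derives its ValidReaderWriteIdList from the snapshot below that transaction; with no open transactions it falls back to CompactionInfo.highestWriteId, and only if that is unset does it use an unbounded list.

// Illustrative only -- not part of the patch. Models the cap-selection order in
// Cleaner.clean(); all identifiers here are invented for the example.
public class CleanerWatermarkSketch {

  enum Cap { MIN_OPEN_TXN_SNAPSHOT, COMPACTION_HIGHEST_WRITE_ID, UNBOUNDED }

  // minOpenTxn stands in for TxnStore.findMinHistoryLevel() (null == no open txns);
  // highestWriteId stands in for CompactionInfo.highestWriteId (<= 0 == not tracked).
  static Cap chooseCap(Long minOpenTxn, long highestWriteId) {
    if (minOpenTxn != null) {
      // Open readers exist: derive the write-id list from the snapshot below minOpenTxn.
      return Cap.MIN_OPEN_TXN_SNAPSHOT;
    }
    // No open txns: cap at the write id the compaction itself ran against, if known.
    return highestWriteId > 0 ? Cap.COMPACTION_HIGHEST_WRITE_ID : Cap.UNBOUNDED;
  }

  public static void main(String[] args) {
    System.out.println(chooseCap(42L, 100L));  // MIN_OPEN_TXN_SNAPSHOT
    System.out.println(chooseCap(null, 100L)); // COMPACTION_HIGHEST_WRITE_ID
    System.out.println(chooseCap(null, 0L));   // UNBOUNDED
  }
}

The apparent intent is that this snapshot-based cap is what allows the patch to drop the old compactId2LockMap/compactId2CompactInfoMap lock tracking: any still-running reader keeps the minimum open txn low, so files that reader may still need never fall into the obsolete set computed by AcidUtils.getAcidState().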