diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 71e51315d3..d83dd0fa0a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -32,7 +32,7 @@ import java.util.Set; import java.util.regex.Pattern; -import org.apache.avro.generic.GenericData; +import com.google.common.base.Strings; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; @@ -57,7 +57,6 @@ import org.apache.hadoop.hive.ql.hooks.Entity; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; -import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta; import org.apache.hadoop.hive.ql.io.orc.OrcFile; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; @@ -1046,6 +1045,22 @@ public static Directory getAcidState(Path directory, Ref useFileIds, boolean ignoreEmptyFiles, Map tblproperties) throws IOException { + ValidTxnList validTxnList = null; + String s = conf.get(ValidTxnList.VALID_TXNS_KEY); + if(!Strings.isNullOrEmpty(s)) { + /** + * getAcidState() is sometimes called on non-transactional tables, e.g. + * OrcInputFileFormat.FileGenerator.callInternal(). e.g. orc_merge3.q In that case + * writeIdList is bogus - doesn't even have a table name. + * see https://issues.apache.org/jira/browse/HIVE-20856. + * + * For now, assert that ValidTxnList.VALID_TXNS_KEY is set only if this is really a read + * of a transactional table. see getChildState() + */ + validTxnList = new ValidReadTxnList(); + validTxnList.readFromString(s); + } + FileSystem fs = directory.getFileSystem(conf); // The following 'deltas' includes all kinds of delta files including insert & delete deltas. 
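A minimal sketch of the conf round-trip the new getAcidState() hunk above relies on: whoever owns the transaction snapshot serializes a ValidTxnList under ValidTxnList.VALID_TXNS_KEY, and getAcidState() rehydrates it, or sees null for reads of non-transactional tables. TxnSnapshotUtil and its method names are illustrative, not part of this patch; the Hive types and the key are the ones used in the hunk.

    import com.google.common.base.Strings;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.common.ValidReadTxnList;
    import org.apache.hadoop.hive.common.ValidTxnList;

    final class TxnSnapshotUtil {
      /** Mirrors the new getAcidState() logic: null means no snapshot was seeded by the caller. */
      static ValidTxnList readTxnSnapshot(Configuration conf) {
        String serialized = conf.get(ValidTxnList.VALID_TXNS_KEY);
        if (Strings.isNullOrEmpty(serialized)) {
          return null; // e.g. a read of a non-transactional table
        }
        ValidTxnList validTxnList = new ValidReadTxnList();
        validTxnList.readFromString(serialized);
        return validTxnList;
      }

      /** The producing side: publish the snapshot so every getAcidState() call sees it. */
      static void publishTxnSnapshot(Configuration conf, ValidTxnList validTxnList) {
        conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
      }

      private TxnSnapshotUtil() {
      }
    }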
final List deltas = new ArrayList(); @@ -1073,13 +1088,13 @@ public static Directory getAcidState(Path directory, if (childrenWithId != null) { for (HdfsFileStatusWithId child : childrenWithId) { getChildState(child.getFileStatus(), child, writeIdList, working, originalDirectories, original, - obsolete, bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs); + obsolete, bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList); } } else { List children = HdfsUtils.listLocatedStatus(fs, directory, hiddenFileFilter); for (FileStatus child : children) { getChildState(child, null, writeIdList, working, originalDirectories, original, obsolete, - bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs); + bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList); } } @@ -1219,7 +1234,7 @@ private static void getChildState(FileStatus child, HdfsFileStatusWithId childWi ValidWriteIdList writeIdList, List working, List originalDirectories, List original, List obsolete, TxnBase bestBase, boolean ignoreEmptyFiles, List aborted, Map tblproperties, - FileSystem fs) throws IOException { + FileSystem fs, ValidTxnList validTxnList) throws IOException { Path p = child.getPath(); String fn = p.getName(); if (!child.isDirectory()) { @@ -1229,6 +1244,10 @@ private static void getChildState(FileStatus child, HdfsFileStatusWithId childWi return; } if (fn.startsWith(BASE_PREFIX)) { + if(validTxnList == null) { + throw new IllegalArgumentException("No ValidTxnList for " + child.getPath()); + } + //todo: check that visibility txn id is valid long writeId = parseBase(p); if(bestBase.oldestBaseWriteId > writeId) { //keep track for error reporting @@ -1250,12 +1269,14 @@ private static void getChildState(FileStatus child, HdfsFileStatusWithId childWi obsolete.add(child); } } else if (fn.startsWith(DELTA_PREFIX) || fn.startsWith(DELETE_DELTA_PREFIX)) { + if(validTxnList == null) { + throw new IllegalArgumentException("No ValidTxnList for " + child.getPath()); + } + //todo: check that visibility txn id is valid String deltaPrefix = fn.startsWith(DELTA_PREFIX) ? DELTA_PREFIX : DELETE_DELTA_PREFIX; ParsedDelta delta = parseDelta(child, deltaPrefix, fs); - // Handle aborted deltas. Currently this can only happen for MM tables. - if (tblproperties != null && isTransactionalTable(tblproperties) && - ValidWriteIdList.RangeResponse.ALL == writeIdList.isWriteIdRangeAborted( - delta.minWriteId, delta.maxWriteId)) { + if(ValidWriteIdList.RangeResponse.ALL == + writeIdList.isWriteIdRangeAborted(delta.minWriteId, delta.maxWriteId)) { aborted.add(child); } if (writeIdList.isWriteIdRangeValid( diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 374e973243..00cdaaff59 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -403,13 +403,13 @@ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isB // Make sure we need locks. It's possible there's nothing to lock in // this operation. if(plan.getInputs().isEmpty() && plan.getOutputs().isEmpty()) { - LOG.debug("No locks needed for queryId" + queryId); + LOG.debug("No locks needed for queryId=" + queryId); return null; } List lockComponents = AcidUtils.makeLockComponents(plan.getOutputs(), plan.getInputs(), conf); //It's possible there's nothing to lock even if we have w/r entities. 
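The simplified aborted-delta handling in the AcidUtils hunk above (the tblproperties guard is dropped) reduces to the predicate below. This is an illustrative extraction under the assumption, enforced by the new IllegalArgumentException guards, that getChildState() only reaches this point for transactional directories; DeltaChecks is a hypothetical name.

    import org.apache.hadoop.hive.common.ValidWriteIdList;

    final class DeltaChecks {
      /** A delta dir is treated as aborted when every write id it covers is aborted in the snapshot. */
      static boolean isFullyAborted(ValidWriteIdList writeIdList, long minWriteId, long maxWriteId) {
        return ValidWriteIdList.RangeResponse.ALL ==
            writeIdList.isWriteIdRangeAborted(minWriteId, maxWriteId);
      }

      private DeltaChecks() {
      }
    }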
if(lockComponents.isEmpty()) { - LOG.debug("No locks needed for queryId" + queryId); + LOG.debug("No locks needed for queryId=" + queryId); return null; } rqstBuilder.addLockComponents(lockComponents); @@ -538,6 +538,9 @@ public void replTableWriteIdState(String validWriteIdList, String dbName, String } } + //todo: this won't work if there is a txn but no locks - won't do anything + //this can be a problem for multi-stmt txns or perhaps even for single stmt + //if a query takes a long time to compile @Override public void heartbeat() throws LockException { List locks; diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java index ba1f1ffe88..12c10273a9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java @@ -262,7 +262,7 @@ void replTableWriteIdState(String validWriteIdList, String dbName, String tableN /** * For resources that support MVCC, the state of the DB must be recorded for the duration of the - * operation/transaction. Returns {@code true} if current statment needs to do this. + * operation/transaction. Returns {@code true} if current statement needs to do this. */ boolean recordSnapshot(QueryPlan queryPlan); diff --git ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index ee7c940d2b..455ffc3887 100644 --- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -501,6 +501,7 @@ public HiveTxnManager getTxnMgr() { * it's not coupled to the executing thread. Since tests run against Derby which often wedges * under concurrent access, tests must use a single thead and simulate concurrent access. 
* For example, {@code TestDbTxnManager2} + * @return previous {@link HiveTxnManager} or null */ @VisibleForTesting public HiveTxnManager setTxnMgr(HiveTxnManager mgr) { diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index 3565616171..66a90965ce 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -17,8 +17,14 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.common.TableName; +import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.metastore.ReplChangeManager; +import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest; +import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse; +import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -32,9 +38,6 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; -import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; -import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.Database; @@ -48,13 +51,7 @@ import java.util.ArrayList; import java.util.BitSet; import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -67,9 +64,6 @@ private long cleanerCheckInterval = 0; private ReplChangeManager replChangeManager; - // List of compactions to clean. - private Map> compactId2LockMap = new HashMap<>(); - private Map compactId2CompactInfoMap = new HashMap<>(); @Override public void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException { @@ -96,95 +90,9 @@ public void run() { try { handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name()); startedAt = System.currentTimeMillis(); - // First look for all the compactions that are waiting to be cleaned. If we have not - // seen an entry before, look for all the locks held on that table or partition and - // record them. We will then only clean the partition once all of those locks have been - // released. This way we avoid removing the files while they are in use, - // while at the same time avoiding starving the cleaner as new readers come along. - // This works because we know that any reader who comes along after the worker thread has - // done the compaction will read the more up to date version of the data (either in a - // newer delta or in a newer base). - List toClean = txnHandler.findReadyToClean(); - { - /** - * Since there may be more than 1 instance of Cleaner running we may have state info - * for items which were cleaned by instances. Here we remove them. 
- * - * In the long run if we add end_time to compaction_queue, then we can check that - * hive_locks.acquired_at > compaction_queue.end_time + safety_buffer in which case - * we know the lock owner is reading files created by this compaction or later. - * The advantage is that we don't have to store the locks. - */ - Set currentToCleanSet = new HashSet<>(); - for (CompactionInfo ci : toClean) { - currentToCleanSet.add(ci.id); - } - Set cleanPerformedByOthers = new HashSet<>(); - for (long id : compactId2CompactInfoMap.keySet()) { - if (!currentToCleanSet.contains(id)) { - cleanPerformedByOthers.add(id); - } - } - for (long id : cleanPerformedByOthers) { - compactId2CompactInfoMap.remove(id); - compactId2LockMap.remove(id); - } - } - if (toClean.size() > 0 || compactId2LockMap.size() > 0) { - ShowLocksResponse locksResponse = txnHandler.showLocks(new ShowLocksRequest()); - if(LOG.isDebugEnabled()) { - dumpLockState(locksResponse); - } - for (CompactionInfo ci : toClean) { - // Check to see if we have seen this request before. If so, ignore it. If not, - // add it to our queue. - if (!compactId2LockMap.containsKey(ci.id)) { - compactId2LockMap.put(ci.id, findRelatedLocks(ci, locksResponse)); - compactId2CompactInfoMap.put(ci.id, ci); - } - } - - // Now, for each entry in the queue, see if all of the associated locks are clear so we - // can clean - Set currentLocks = buildCurrentLockSet(locksResponse); - List expiredLocks = new ArrayList(); - List compactionsCleaned = new ArrayList(); - try { - for (Map.Entry> queueEntry : compactId2LockMap.entrySet()) { - boolean sawLock = false; - for (Long lockId : queueEntry.getValue()) { - if (currentLocks.contains(lockId)) { - sawLock = true; - break; - } else { - expiredLocks.add(lockId); - } - } - - if (!sawLock) { - // Remember to remove this when we're out of the loop, - // we can't do it in the loop or we'll get a concurrent modification exception. - compactionsCleaned.add(queueEntry.getKey()); - //Future thought: this may be expensive so consider having a thread pool run in parallel - clean(compactId2CompactInfoMap.get(queueEntry.getKey())); - } else { - // Remove the locks we didn't see so we don't look for them again next time - for (Long lockId : expiredLocks) { - queueEntry.getValue().remove(lockId); - } - LOG.info("Skipping cleaning of " + - idWatermark(compactId2CompactInfoMap.get(queueEntry.getKey())) + - " due to reader present: " + queueEntry.getValue()); - } - } - } finally { - if (compactionsCleaned.size() > 0) { - for (Long compactId : compactionsCleaned) { - compactId2LockMap.remove(compactId); - compactId2CompactInfoMap.remove(compactId); - } - } - } + long minOpenTxnGLB = txnHandler.findMinOpenTxnGLB(); + for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) { + clean(compactionInfo, minOpenTxnGLB); } } catch (Throwable t) { LOG.error("Caught an exception in the main loop of compactor cleaner, " + @@ -212,41 +120,7 @@ public void run() { } while (!stop.get()); } - private Set findRelatedLocks(CompactionInfo ci, ShowLocksResponse locksResponse) { - Set relatedLocks = new HashSet(); - for (ShowLocksResponseElement lock : locksResponse.getLocks()) { - /** - * Hive QL is not case sensitive wrt db/table/column names - * Partition names get - * normalized (as far as I can tell) by lower casing column name but not partition value. 
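The Cleaner hunks in this file replace the lock tracking above with a snapshot-based rule; the following is a condensed, illustrative recap of the new flow (CleanerFlowSketch and cleanOne are made-up names, while the TxnStore, TxnCommonUtils and AcidUtils calls are the ones the patch uses). Because the snapshot is capped just below the smallest open txn id, any compactor-produced base/delta the Cleaner can see is also visible to every reader that could still be running, so whatever that base/delta shadows is safe to delete.

    import java.util.Collections;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.TableName;
    import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
    import org.apache.hadoop.hive.common.ValidTxnList;
    import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
    import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
    import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
    import org.apache.hadoop.hive.metastore.txn.TxnStore;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    final class CleanerFlowSketch {
      static void cleanOne(TxnStore txnHandler, Configuration conf,
          String dbName, String tableName, Path tableOrPartitionLocation) throws Exception {
        // Everything at or above this txn id may still be read by an open transaction.
        long minOpenTxnGLB = txnHandler.findMinOpenTxnGLB();
        ValidTxnList validTxnList =
            TxnCommonUtils.createValidReadTxnList(txnHandler.getOpenTxns(), minOpenTxnGLB - 1);
        // Seed the conf so that AcidUtils.getAcidState() sees the same snapshot.
        conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());

        // Resolve write ids for this table, capped by the same txn snapshot.
        GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(
            Collections.singletonList(TableName.getDbTable(dbName, tableName)));
        rqst.setValidTxnList(validTxnList.writeToString());
        GetValidWriteIdsResponse rsp = txnHandler.getValidWriteIds(rqst);
        ValidReaderWriteIdList writeIdList =
            TxnCommonUtils.createValidReaderWriteIdList(rsp.getTblValidWriteIds().get(0));

        // Anything getAcidState() declares obsolete under this capped snapshot can be removed.
        AcidUtils.Directory dir =
            AcidUtils.getAcidState(tableOrPartitionLocation, conf, writeIdList);
        for (FileStatus obsolete : dir.getObsolete()) {
          // The real Cleaner batches these and deletes them as the table owner.
          tableOrPartitionLocation.getFileSystem(conf).delete(obsolete.getPath(), true);
        }
      }

      private CleanerFlowSketch() {
      }
    }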
- * {@link org.apache.hadoop.hive.metastore.Warehouse#makePartName(List, List, String)} - * {@link org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer#getPartSpec(ASTNode)} - * Since user input may start out in any case, compare here case-insensitive for db/table - * but leave partition name as is. - */ - if (ci.dbname.equalsIgnoreCase(lock.getDbname())) { - if ((ci.tableName == null && lock.getTablename() == null) || - (ci.tableName != null && ci.tableName.equalsIgnoreCase(lock.getTablename()))) { - if ((ci.partName == null && lock.getPartname() == null) || - (ci.partName != null && ci.partName.equals(lock.getPartname()))) { - relatedLocks.add(lock.getLockid()); - } - } - } - } - - return relatedLocks; - } - - private Set buildCurrentLockSet(ShowLocksResponse locksResponse) { - Set currentLocks = new HashSet(locksResponse.getLocks().size()); - for (ShowLocksResponseElement lock : locksResponse.getLocks()) { - currentLocks.add(lock.getLockid()); - } - return currentLocks; - } - - private void clean(CompactionInfo ci) throws MetaException { + private void clean(CompactionInfo ci, long minOpenTxnGLB) throws MetaException { LOG.info("Starting cleaning for " + ci); try { Table t = resolveTable(ci); @@ -270,27 +144,40 @@ private void clean(CompactionInfo ci) throws MetaException { } StorageDescriptor sd = resolveStorageDescriptor(t, p); final String location = sd.getLocation(); - /** - * Each Compaction only compacts as far as the highest txn id such that all txns below it - * are resolved (i.e. not opened). This is what "highestWriteId" tracks. This is only tracked - * since Hive 1.3.0/2.0 - thus may be 0. See ValidCompactorWriteIdList and uses for more info. - * - * We only want to clean up to the highestWriteId - otherwise we risk deleting deltas from - * under an active reader. + * todo: save this Use Case somewhere + * there are no open txns, then txn 7 starts, compactor makes base_N_c8 + * so if we make HWM=0 below, 8 will be visible but 7 will be reading + * below 8. so we should get current NEXT_TXN_ID, then check minOpen, + * and if there is nothing open, use NEXT_TXN_ID-1=HWM. + */ + ValidTxnList validTxnList = TxnCommonUtils. + //Visibility is capped at largest committed ID below which there are no open txns + createValidReadTxnList(txnHandler.getOpenTxns(), minOpenTxnGLB - 1); + //save it so that getAcidState() sees it + conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString()); + /** + * {@code validTxnList} is capped by minOpenTxnGLB so if + * {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} sees a base/delta + * produced by a compactor, that means every reader that could be active right now see it + * as well. That means if this base/delta shadows some earlier base/delta, the it will be + * used in favor of any files that it shadows. Thus the shadowed files are safe to delete. * - * Suppose we have deltas D2 D3 for table T, i.e. the last compaction created D3 so now there is a - * clean request for D2. - * Cleaner checks existing locks and finds none. - * Between that check and removeFiles() a query starts (it will be reading D3) and another compaction - * completes which creates D4. - * Now removeFiles() (more specifically AcidUtils.getAcidState()) will declare D3 to be obsolete - * unless ValidWriteIdList is "capped" at highestWriteId. + * todo: given the above, COMPACTION_QUEUE.CQ_HIGHEST_WRITE_ID is not needed any more + * Repurpose it to store compactor txn id? */ - final ValidWriteIdList validWriteIdList = (ci.highestWriteId > 0) - ? 
new ValidReaderWriteIdList(ci.getFullTableName(), new long[0], new BitSet(), - ci.highestWriteId) - : new ValidReaderWriteIdList(); + List tblNames = Collections.singletonList( + TableName.getDbTable(t.getDbName(), t.getTableName())); + GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(tblNames); + rqst.setValidTxnList(validTxnList.writeToString()); + GetValidWriteIdsResponse rsp = txnHandler.getValidWriteIds(rqst); + //we could have no write IDs for a table if it was never written to but + // since we are in the Cleaner phase of compactions, there must have + // been some delta/base dirs + assert rsp != null && rsp.getTblValidWriteIdsSize() == 1; + //Creating 'reader' list since we are interested in the set of 'obsolete' files + ValidReaderWriteIdList validWriteIdList = + TxnCommonUtils.createValidReaderWriteIdList(rsp.getTblValidWriteIds().get(0)); if (runJobAsSelf(ci.runAs)) { removeFiles(location, validWriteIdList, ci); @@ -327,7 +214,7 @@ private void removeFiles(String location, ValidWriteIdList writeIdList, Compacti Path locPath = new Path(location); AcidUtils.Directory dir = AcidUtils.getAcidState(locPath, conf, writeIdList); List obsoleteDirs = dir.getObsolete(); - List filesToDelete = new ArrayList(obsoleteDirs.size()); + List filesToDelete = new ArrayList<>(obsoleteDirs.size()); StringBuilder extraDebugInfo = new StringBuilder("["); for (FileStatus stat : obsoleteDirs) { filesToDelete.add(stat.getPath()); @@ -337,9 +224,6 @@ private void removeFiles(String location, ValidWriteIdList writeIdList, Compacti } } extraDebugInfo.setCharAt(extraDebugInfo.length() - 1, ']'); - List compactIds = new ArrayList<>(compactId2CompactInfoMap.keySet()); - Collections.sort(compactIds); - extraDebugInfo.append("compactId2CompactInfoMap.keySet(").append(compactIds).append(")"); LOG.info(idWatermark(ci) + " About to remove " + filesToDelete.size() + " obsolete directories from " + location + ". 
" + extraDebugInfo.toString()); if (filesToDelete.size() < 1) { @@ -359,63 +243,4 @@ private void removeFiles(String location, ValidWriteIdList writeIdList, Compacti fs.delete(dead, true); } } - private static class LockComparator implements Comparator { - //sort ascending by resource, nulls first - @Override - public int compare(ShowLocksResponseElement o1, ShowLocksResponseElement o2) { - if(o1 == o2) { - return 0; - } - if(o1 == null) { - return -1; - } - if(o2 == null) { - return 1; - } - int v = o1.getDbname().compareToIgnoreCase(o2.getDbname()); - if(v != 0) { - return v; - } - if(o1.getTablename() == null) { - return -1; - } - if(o2.getTablename() == null) { - return 1; - } - v = o1.getTablename().compareToIgnoreCase(o2.getTablename()); - if(v != 0) { - return v; - } - if(o1.getPartname() == null) { - return -1; - } - if(o2.getPartname() == null) { - return 1; - } - v = o1.getPartname().compareToIgnoreCase(o2.getPartname()); - if(v != 0) { - return v; - } - //if still equal, compare by lock ids - v = Long.compare(o1.getLockid(), o2.getLockid()); - if(v != 0) { - return v; - } - return Long.compare(o1.getLockIdInternal(), o2.getLockIdInternal()); - - } - } - private void dumpLockState(ShowLocksResponse slr) { - Iterator l = slr.getLocksIterator(); - List sortedList = new ArrayList<>(); - while(l.hasNext()) { - sortedList.add(l.next()); - } - //sort for readability - sortedList.sort(new LockComparator()); - LOG.info("dumping locks"); - for(ShowLocksResponseElement lock : sortedList) { - LOG.info(lock.toString()); - } - } } diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 92c74e1d06..19e841b55b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -42,14 +42,13 @@ import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.common.StringableMap; import org.apache.hadoop.hive.common.ValidCompactorWriteIdList; -import org.apache.hadoop.hive.common.ValidReaderWriteIdList; +import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.SerDeInfo; @@ -59,10 +58,7 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnStore; -import org.apache.hadoop.hive.metastore.txn.TxnUtils; -import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.DriverUtils; -import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.DDLTask; import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; import org.apache.hadoop.hive.ql.io.AcidInputFormat; @@ -74,8 +70,6 @@ import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; -import 
org.apache.hadoop.hive.ql.parse.SemanticException; -import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; @@ -238,6 +232,7 @@ void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor if (AcidUtils.isInsertOnlyTable(t.getParameters())) { if (HiveConf.getBoolVar(conf, ConfVars.HIVE_COMPACTOR_COMPACT_MM)) { + //todo: pass compactorId, etc runMmCompaction(conf, t, p, sd, writeIds, ci); } return; @@ -351,6 +346,8 @@ private void runMmCompaction(HiveConf conf, Table t, Partition p, // Set up the session for driver. conf = new HiveConf(conf); conf.set(ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column"); + //todo: move into DriverUtils.runOnDriver()? have to clone conf in there then + conf.unset(ValidTxnList.VALID_TXNS_KEY); String user = UserGroupInformation.getCurrentUser().getShortUserName(); SessionState sessionState = DriverUtils.setUpSessionState(conf, user, false); @@ -1179,6 +1176,7 @@ public void commitJob(JobContext context) throws IOException { //we have MIN_TXN, MAX_TXN and IS_MAJOR in JobConf so we could figure out exactly what the dir //name is that we want to rename; leave it for another day // TODO: if we expect one dir why don't we enforce it? + //may actually have delta_x_y and delete_delta_x_y for (FileStatus fileStatus : contents) { //newPath is the base/delta dir Path newPath = new Path(finalLocation, fileStatus.getPath().getName()); @@ -1218,7 +1216,7 @@ private void commitMmCompaction(String from, String to, Configuration conf, ValidWriteIdList actualWriteIds) throws IOException { Path fromPath = new Path(from), toPath = new Path(to); FileSystem fs = fromPath.getFileSystem(conf); - // Assume the high watermark can be used as maximum transaction ID. + // Assume the high watermark can be used as maximum transaction ID. todo: is that true? can it be aborted? does it matter for compaction? long maxTxn = actualWriteIds.getHighWatermark(); AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) .writingBase(true).isCompressed(false).maximumWriteId(maxTxn).bucket(0).statementId(-1); diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java index dd0929f2b9..a7bac088e6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java @@ -67,8 +67,9 @@ public void setConf(Configuration configuration) { // TODO MS-SPLIT for now, keep a copy of HiveConf around as we need to call other methods with // it. This should be changed to Configuration once everything that this calls that requires // HiveConf is moved to the standalone metastore. - conf = (configuration instanceof HiveConf) ? 
(HiveConf)configuration : - new HiveConf(configuration, HiveConf.class); + //clone the conf - compactor needs to set properties in it which we don't + // want to bleed into the caller + conf = new HiveConf(configuration, HiveConf.class); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index d9f186cd03..88c6740813 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -18,15 +18,22 @@ package org.apache.hadoop.hive.ql.txn.compactor; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hive.common.ValidCompactorWriteIdList; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; +import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest; +import org.apache.hadoop.hive.metastore.api.HeartbeatRequest; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils; +import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.mapred.JobConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; @@ -58,7 +65,7 @@ static final private long SLEEP_TIME = 5000; static final private int baseThreadNum = 10002; - private String name; + private String workerName; private JobConf mrJob; // the MR job for compaction /** @@ -83,7 +90,7 @@ public void run() { // Make sure nothing escapes this run method and kills the metastore at large, // so wrap it in a big catch Throwable statement. try { - final CompactionInfo ci = txnHandler.findNextToCompact(name); + final CompactionInfo ci = txnHandler.findNextToCompact(workerName); LOG.debug("Processing compaction request " + ci); if (ci == null && !stop.get()) { @@ -144,14 +151,6 @@ public void run() { // Compaction doesn't work under a transaction and hence pass 0 for current txn Id // The response will have one entry per table and hence we get only one OpenWriteIds String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName()); - GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName)); - final ValidWriteIdList tblValidWriteIds = - TxnUtils.createValidCompactWriteIdList(txnHandler.getValidWriteIds(rqst).getTblValidWriteIds().get(0)); - LOG.debug("ValidCompactWriteIdList: " + tblValidWriteIds.writeToString()); - txnHandler.setCompactionHighestWriteId(ci, tblValidWriteIds.getHighWatermark()); - final StringBuilder jobName = new StringBuilder(name); - jobName.append("-compactor-"); - jobName.append(ci.getFullPartitionName()); // Determine who to run as String runAs; @@ -162,9 +161,58 @@ public void run() { runAs = ci.runAs; } - LOG.info("Starting " + ci.type.toString() + " compaction for " + - ci.getFullPartitionName()); + /** + * We need a transaction. 
could call txnHandler directly but then we'd have to set up a hearbeat + * but using {@link HiveTxnManager} creates a Thrift connection to the HMS + * will this cause security checks that could fail? + * on the other hand we run SQL via Driver which certainly uses {@link HiveTxnManager} + final HiveTxnManager txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + * openTxn requires Context() which is set up based on query parse/plan.... + long txnid = txnMgr.openTxn(null, null); + */ + OpenTxnRequest otReq = new OpenTxnRequest(1, runAs, hostname()); + otReq.setAgentInfo(getName());//ThreadName + long compactorTxnId = txnHandler.openTxns(otReq).getTxn_ids().get(0); + //todo: now we can update compaction_queue entry with this id + //also make sure to write to TXN_COMPONENTS so that if txn aborts, we don't delete the metadata about it from TXNS!!!! + + HeartbeatRequest heartbeatRequest = new HeartbeatRequest(); + heartbeatRequest.setTxnid(compactorTxnId); + heartbeatRequest.setLockid(0); + /** + * todo: now set up a thread to do the heartbeat + */ + txnHandler.heartbeat(heartbeatRequest); + + ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(txnHandler.getOpenTxns(), compactorTxnId); + GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName)); + //with this ValidWriteIdList is capped at whatever HWM validTxnList has + rqst.setValidTxnList(validTxnList.writeToString()); + final ValidCompactorWriteIdList tblValidWriteIds = + TxnUtils.createValidCompactWriteIdList(txnHandler.getValidWriteIds(rqst).getTblValidWriteIds().get(0)); + LOG.debug("ValidCompactWriteIdList: " + tblValidWriteIds.writeToString()); + /** + * we could set it in 'conf' but then the queries that compactor runs on the Driver see it and think + * that a txn is already opened (normally it's open txn then lock the snapshot) + * Changing this logic in the driver won't help since some statements compactor runs are create + * table which cannot be used in a multi stmt txn anyway. Also, locking a snapshot before opening txn + * doesn't make much sense - the snapshot should be capped at the current txn. + * So pass it in ValidWriteIdList and use it from compactor code which from here is pass by ref + * Alternatively, set it on conf here, but clone conf and remove it before calling Driver... + * Both are ugly. + * + * it's better to set it in conf and clear before calling Driver - this way every call to getAcidState() will see it + * whether set by Driver or here + */ + conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString()); + + //todo: this is a RDBMS call - so is setRunAs() above - could combine into 1 + txnHandler.setCompactionHighestWriteId(ci, tblValidWriteIds.getHighWatermark()); + final StringBuilder jobName = new StringBuilder(workerName); + jobName.append("-compactor-"); + jobName.append(ci.getFullPartitionName()); + LOG.info("Starting " + ci.type.toString() + " compaction for " + ci.getFullPartitionName()); final StatsUpdater su = StatsUpdater.init(ci, txnHandler.findColumnsWithStats(ci), conf, runJobAsSelf(runAs) ? runAs : t.getOwner()); final CompactorMR mr = new CompactorMR(); @@ -191,6 +239,7 @@ public Object run() throws Exception { } } txnHandler.markCompacted(ci); + txnHandler.commitTxn(new CommitTxnRequest(compactorTxnId)); if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) { mrJob = mr.getMrJob(); } @@ -198,9 +247,10 @@ public Object run() throws Exception { LOG.error("Caught exception while trying to compact " + ci + ". 
Marking failed to avoid repeated failures, " + StringUtils.stringifyException(e)); txnHandler.markFailed(ci); + txnHandler.abortTxn(new AbortTxnRequest(compactorTxnId)); } } catch (Throwable t) { - LOG.error("Caught an exception in the main loop of compactor worker " + name + ", " + + LOG.error("Caught an exception in the main loop of compactor worker " + workerName + ", " + StringUtils.stringifyException(t)); } @@ -223,7 +273,7 @@ public void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException StringBuilder name = new StringBuilder(hostname()); name.append("-"); name.append(getId()); - this.name = name.toString(); + this.workerName = name.toString(); setName(name.toString()); } @@ -250,7 +300,9 @@ public static StatsUpdater init(CompactionInfo ci, List columnListForSta private StatsUpdater(CompactionInfo ci, List columnListForStats, HiveConf conf, String userName) { - this.conf = conf; + this.conf = new HiveConf(conf); + //so that Driver doesn't think it's arleady in a transaction + this.conf.unset(ValidTxnList.VALID_TXNS_KEY); this.userName = userName; this.ci = ci; if (!ci.isMajorCompaction() || columnListForStats == null || columnListForStats.isEmpty()) { @@ -298,6 +350,8 @@ void gatherStats() { sb.setLength(sb.length() - 1); //remove trailing , LOG.info(ci + ": running '" + sb.toString() + "'"); conf.setVar(HiveConf.ConfVars.METASTOREURIS,""); + + //todo: use DriverUtils.runOnDriver() here Driver d = new Driver(new QueryState.Builder().withGenerateNewQueryId(true).withHiveConf(conf).build(), userName); SessionState localSession = null; try { diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 057fd7704c..0f5ee1054d 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -61,6 +61,7 @@ import org.apache.hadoop.hive.ql.txn.compactor.Cleaner; import org.apache.hadoop.hive.ql.txn.compactor.Initiator; import org.apache.hadoop.hive.ql.txn.compactor.Worker; +import org.apache.hadoop.mapred.JobConf; import org.apache.orc.OrcFile; import org.apache.orc.Reader; import org.apache.orc.TypeDescription; @@ -246,14 +247,7 @@ private void testOrcPPD(boolean enablePPD) throws Exception { List rs0 = runStatementOnDriver(query); Assert.assertEquals("Read failed", 0, rs0.size()); runStatementOnDriver("alter table " + Table.ACIDTBL + " compact 'MAJOR'"); - Worker t = new Worker(); - t.setThreadId((int) t.getId()); - t.setConf(hiveConf); - AtomicBoolean stop = new AtomicBoolean(); - AtomicBoolean looped = new AtomicBoolean(); - stop.set(true); - t.init(stop, looped); - t.run(); + runWorker(hiveConf); //now we have base_0001 file int[][] tableData2 = {{1, 7}, {5, 6}, {7, 8}, {9, 10}}; runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData2)); @@ -293,14 +287,7 @@ public void testAlterTable() throws Exception { int[][] tableData = {{1,2}}; runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData)); runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'"); - Worker t = new Worker(); - t.setThreadId((int) t.getId()); - t.setConf(hiveConf); - AtomicBoolean stop = new AtomicBoolean(); - AtomicBoolean looped = new AtomicBoolean(); - stop.set(true); - t.init(stop, looped); - t.run(); + runWorker(hiveConf); int[][] tableData2 = {{5,6}}; runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData2)); List rs1 = 
runStatementOnDriver("select a,b from " + Table.ACIDTBL + " where b > 0 order by a,b"); @@ -1338,18 +1325,11 @@ public void testCompactWithDelete() throws Exception { int[][] tableData = {{1,2},{3,4}}; runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData)); runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'"); - Worker t = new Worker(); - t.setThreadId((int) t.getId()); - t.setConf(hiveConf); - AtomicBoolean stop = new AtomicBoolean(); - AtomicBoolean looped = new AtomicBoolean(); - stop.set(true); - t.init(stop, looped); - t.run(); + runWorker(hiveConf); runStatementOnDriver("delete from " + Table.ACIDTBL + " where b = 4"); runStatementOnDriver("update " + Table.ACIDTBL + " set b = -2 where b = 2"); runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MINOR'"); - t.run(); + runWorker(hiveConf); TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest()); Assert.assertEquals("Unexpected number of compactions in history", 2, resp.getCompactsSize()); diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java index 833e63745f..5dfdab3efa 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java @@ -1,22 +1,26 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.hadoop.hive.ql; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; @@ -25,13 +29,19 @@ import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.io.orc.TestVectorizedOrcAcidRowBatchReader; +import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; +import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.util.HashSet; import java.util.List; +import java.util.Set; + +import static org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2.swapTxnManager; public class TestTxnCommands3 extends TxnCommandsBaseForTests { static final private Logger LOG = LoggerFactory.getLogger(TestTxnCommands3.class); @@ -283,4 +293,117 @@ private void testSdpoBucketed(boolean isVectorized, boolean isSdpo, int bucketin "warehouse/acid_uap/ds=tomorrow/delta_0000003_0000003_0000/bucket_00000"}}; checkResult(expected2, testQuery, isVectorized, "after update", LOG); } + @Test + public void testCleaner2() throws Exception { + MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID, true); + dropTable(new String[] {"T"}); + runStatementOnDriver("create table T (a int, b int) stored as orc"); + runStatementOnDriver("insert into T values(0,2)");//makes delta_1_1 in T1 + runStatementOnDriver("insert into T values(1,4)");//makes delta_2_2 in T2 + + Driver driver2 = new Driver(new QueryState.Builder().withHiveConf(hiveConf).build(), null); + driver2.setMaxRows(10000); + + HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(hiveConf); + HiveTxnManager txnMgr1 = swapTxnManager(txnMgr2); + Driver driver1 = swapDrivers(driver2); + runStatementOnDriver("start transaction");//T3 + /* this select sees + target/warehouse/t/ + ├── delta_0000001_0000001_0000 + │   ├── _orc_acid_version + │   └── bucket_00000 + └── delta_0000002_0000002_0000 + ├── _orc_acid_version + └── bucket_00000*/ + String testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from T"; + String[][] expected = new String[][] { + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2", + "t/delta_0000001_0000001_0000/bucket_00000"}, + {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t4", + "t/delta_0000002_0000002_0000/bucket_00000"}}; + checkResult(expected, testQuery, false, "check data", LOG); + + + txnMgr2 = swapTxnManager(txnMgr1); + driver2 = swapDrivers(driver1); + runStatementOnDriver("alter table T compact 'minor'");//T4 + TestTxnCommands2.runWorker(hiveConf);//makes delta_1_2 & delete_delta_1_2 + /* Now we should have + target/warehouse/t/ + ├── delete_delta_0000001_0000002 + │   ├── _orc_acid_version + │   └── bucket_00000 + ├── delta_0000001_0000001_0000 + │   ├── _orc_acid_version + │   └── bucket_00000 + ├── delta_0000001_0000002 + │   ├── _orc_acid_version + │   └── bucket_00000 + └── delta_0000002_0000002_0000 + ├── _orc_acid_version + └── bucket_00000*/ + FileSystem fs = FileSystem.get(hiveConf); + Path warehousePath = new Path(getWarehouseDir()); + FileStatus[] actualList = fs.listStatus(new Path(warehousePath + "/t"), + FileUtils.HIDDEN_FILES_PATH_FILTER); + + 
String[] expectedList = new String[] { + "/t/delete_delta_0000001_0000002", + "/t/delta_0000001_0000002", + "/t/delta_0000001_0000001_0000", + "/t/delta_0000002_0000002_0000", + }; + checkExpectedFiles(actualList, expectedList, warehousePath.toString()); + /* Now the state should be + target/warehouse/t/ + ├── delete_delta_0000001_0000002 + │   ├── _orc_acid_version + │   └── bucket_00000 + ├── delta_0000001_0000002 + │   ├── _orc_acid_version + │   └── bucket_00000 + └── delta_0000002_0000002_0000 + ├── _orc_acid_version + └── bucket_00000 + The cleaner should not delete delta_2_2 since the txn running 'select * from T' may still be + reading this file (assuming this select statement is still running)*/ + + TestTxnCommands2.runCleaner(hiveConf); + expectedList = new String[] { + "/t/delete_delta_0000001_0000002", + "/t/delta_0000001_0000002", + "/t/delta_0000002_0000002_0000", + }; + actualList = fs.listStatus(new Path(warehousePath + "/t"), + FileUtils.HIDDEN_FILES_PATH_FILTER); + checkExpectedFiles(actualList, expectedList, warehousePath.toString()); + //ok, so this doesn't work - d2 is removed - this actually makes sense since if delta_1_2 is available the assumption is that + //it should be used in favor of delta_1_1 delta_2_2 + //relying on state of LM won't help here either: + /*1. start txn do a select * from T as above + * now the txn does the same select * again and while it's running compactor makes d_1_2 and cleaner runs while the 2nd select * is reading d2_2 + * The new locks will tell the cleaner that it's ok to clean.... + * One would argue it should then not get new locks if it already has the same type of lock on this resource.... + * */ + + + + + //alternatively, make the 2nd txn be a delete and verify that the reader doesn't see it in which case you'd have to do a major compaction + } + private static void checkExpectedFiles(FileStatus[] actualList, String[] expectedList, String filePrefix) throws Exception { + Set expectedSet = new HashSet<>(); + Set unexpectedSet = new HashSet<>(); + for(String f : expectedList) { + expectedSet.add(f); + } + for(FileStatus fs : actualList) { + String endOfPath = fs.getPath().toString().substring(fs.getPath().toString().indexOf(filePrefix) + filePrefix.length()); + if(!expectedSet.remove(endOfPath)) { + unexpectedSet.add(endOfPath); + } + } + Assert.assertTrue("not found set: " + expectedSet + " unexpected set: " + unexpectedSet, expectedSet.isEmpty() && unexpectedSet.isEmpty()); + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java index dab0d982c9..c52de89f12 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java +++ ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java @@ -254,4 +254,9 @@ void dropTable(String[] tabs) throws Exception { d.run("drop table if exists " + tab); } } + Driver swapDrivers(Driver otherDriver) { + Driver tmp = d; + d = otherDriver; + return tmp; + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index 77fe73687a..1990899128 100644 --- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.metastore.txn.AcidWriteSetService; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import 
org.apache.hadoop.hive.ql.TestTxnCommands2; import org.junit.After; import org.junit.Assert; import org.apache.hadoop.hive.common.FileUtils; @@ -87,7 +88,7 @@ private static HiveConf conf = new HiveConf(Driver.class); private HiveTxnManager txnMgr; private Context ctx; - private Driver driver; + private Driver driver, driver2; private TxnStore txnHandler; public TestDbTxnManager2() throws Exception { @@ -103,6 +104,7 @@ public void setUp() throws Exception { SessionState.start(conf); ctx = new Context(conf); driver = new Driver(new QueryState.Builder().withHiveConf(conf).nonIsolated().build(), null); + driver2 = new Driver(new QueryState.Builder().withHiveConf(conf).build(), null); TxnDbUtil.cleanDb(conf); TxnDbUtil.prepDb(conf); SessionState ss = SessionState.get(); @@ -115,6 +117,7 @@ public void setUp() throws Exception { @After public void tearDown() throws Exception { driver.close(); + driver2.close(); if (txnMgr != null) { txnMgr.closeTxnManager(); } @@ -548,10 +551,10 @@ public void testMetastoreTablesCleanup() throws Exception { checkCmdOnDriver(cpr); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='i'"); Assert.assertEquals(1, count); - org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf); + TestTxnCommands2.runWorker(conf); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='r' and CQ_TYPE='i'"); Assert.assertEquals(1, count); - org.apache.hadoop.hive.ql.TestTxnCommands2.runCleaner(conf); + TestTxnCommands2.runCleaner(conf); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11'"); Assert.assertEquals(0, count); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t11' and CC_STATE='s' and CC_TYPE='i'"); @@ -561,10 +564,10 @@ public void testMetastoreTablesCleanup() throws Exception { checkCmdOnDriver(cpr); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='i'"); Assert.assertEquals(1, count); - org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf); + TestTxnCommands2.runWorker(conf); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='r' and CQ_TYPE='i'"); Assert.assertEquals(1, count); - org.apache.hadoop.hive.ql.TestTxnCommands2.runCleaner(conf); + TestTxnCommands2.runCleaner(conf); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p'"); Assert.assertEquals(0, count); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t12p' and CC_STATE='s' and CC_TYPE='i'"); @@ -576,7 +579,7 @@ public void testMetastoreTablesCleanup() throws Exception { checkCmdOnDriver(cpr); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='a'"); Assert.assertEquals(1, count); - org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf); // will fail + TestTxnCommands2.runWorker(conf); // will fail count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE 
where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='a'"); Assert.assertEquals(0, count); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t11' and CC_STATE='f' and CC_TYPE='a'"); @@ -586,7 +589,7 @@ public void testMetastoreTablesCleanup() throws Exception { checkCmdOnDriver(cpr); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='a'"); Assert.assertEquals(1, count); - org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf); // will fail + TestTxnCommands2.runWorker(conf); // will fail count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='a'"); Assert.assertEquals(0, count); count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t12p' and CC_STATE='f' and CC_TYPE='a'"); @@ -824,7 +827,7 @@ public static ShowLocksResponseElement checkLock(LockType expectedType, LockStat * the TxnManager instance in the session (hacky but nothing is actually threading so it allows us * to write good tests) */ - private static HiveTxnManager swapTxnManager(HiveTxnManager txnMgr) { + public static HiveTxnManager swapTxnManager(HiveTxnManager txnMgr) { return SessionState.get().setTxnMgr(txnMgr); } @Test @@ -2556,4 +2559,122 @@ public void testTruncate() throws Exception { Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T", null, locks); } + @Test + public void testCleaner() throws Exception { + dropTable(new String[] {"T"}); + CommandProcessorResponse cpr = driver.run("create table T (a int, b int) stored as" + + " orc tblproperties('transactional'='true')"); + checkCmdOnDriver(cpr); + checkCmdOnDriver(driver.run("insert into T values(0,2),(1,4)")); + checkCmdOnDriver(driver.run("start transaction")); + checkCmdOnDriver(driver.run("insert into T values(0,2),(1,4)")); + + HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); + checkCmdOnDriver(driver2.run("start transaction")); + checkCmdOnDriver(driver2.run("select * from T")); + + swapTxnManager(txnMgr); + checkCmdOnDriver(driver.run("commit")); + checkCmdOnDriver(driver.run("alter table T compact 'minor'")); + TestTxnCommands2.runWorker(conf); + + //so now we should have d1,d2,d1_2 + /* Now we should have d1_d2 from minor compaction and d1 from 1st txn by 'driver' and + d2 from 2nd txn by 'driver' - both committed at this point. 
+ At this point, txn doint 'select * from T' from 'driver2' is still open and it sees the txn + that created d2 as open, so cleaner should not remove d2 + [ + target/warehouse/t/ + ├── delete_delta_0000001_0000002 + │   ├── _orc_acid_version + │   └── bucket_00000 + ├── delta_0000001_0000001_0000 + │   ├── _orc_acid_version + │   └── bucket_00000 + ├── delta_0000001_0000002 + │   ├── _orc_acid_version + │   └── bucket_00000 + └── delta_0000002_0000002_0000 + ├── _orc_acid_version + └── bucket_00000 + */ + TestTxnCommands2.runCleaner(conf); +/* +target/warehouse/t/ +├── delete_delta_0000001_0000002 +│   ├── _orc_acid_version +│   └── bucket_00000 +├── delta_0000001_0000002 +│   ├── _orc_acid_version +│   └── bucket_00000 +└── delta_0000002_0000002_0000 + ├── _orc_acid_version + └── bucket_00000 + */ + System.currentTimeMillis(); + //todo: the state is correct - add checks that the right files are there + + //alternatively, make the 2nd txn be a delete and verify that the reader doesn't see it in which case you'd have to do a major compaction + } + @Test + public void testCleaner2() throws Exception { + dropTable(new String[] {"T"}); + CommandProcessorResponse cpr = driver.run("create table T (a int, b int) stored as" + + " orc tblproperties('transactional'='true')"); + checkCmdOnDriver(cpr); + checkCmdOnDriver(driver.run("insert into T values(0,2)"));//makes delta_1_1 in T1 + checkCmdOnDriver(driver.run("insert into T values(1,4)"));//makes delta_2_2 in T2 + + HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); + checkCmdOnDriver(driver2.run("start transaction"));//T3 + checkCmdOnDriver(driver2.run("select INPUT__FILE__NAME from T")); + List rs = new ArrayList<>(); + driver2.getResults(rs); + /* this select sees + target/warehouse/t/ + ├── delta_0000001_0000002 + │   ├── _orc_acid_version + │   └── bucket_00000 + └── delta_0000002_0000002_0000 + ├── _orc_acid_version + └── bucket_00000*/ + swapTxnManager(txnMgr); + checkCmdOnDriver(driver.run("alter table T compact 'minor'"));//T4 + TestTxnCommands2.runWorker(conf);//makes delta_1_2 & delete_delta_1_2 + /* Now we should have + target/warehouse/t/ + ├── delete_delta_0000001_0000002 + │   ├── _orc_acid_version + │   └── bucket_00000 + ├── delta_0000001_0000001_0000 + │   ├── _orc_acid_version + │   └── bucket_00000 + ├── delta_0000001_0000002 + │   ├── _orc_acid_version + │   └── bucket_00000 + └── delta_0000002_0000002_0000 + ├── _orc_acid_version + └── bucket_00000 + */ + TestTxnCommands2.runCleaner(conf); + /* Now the state should be + target/warehouse/t/ + ├── delete_delta_0000001_0000002 + │   ├── _orc_acid_version + │   └── bucket_00000 + ├── delta_0000001_0000002 + │   ├── _orc_acid_version + │   └── bucket_00000 + └── delta_0000002_0000002_0000 + ├── _orc_acid_version + └── bucket_00000 + The cleaner should not delete delta_2_2 since the txn running 'select * from T' may still be + reading this file (assuming this select statement is still running)*/ + System.currentTimeMillis(); + //todo: the state is correct - add checks that the right files are there + + //alternatively, make the 2nd txn be a delete and verify that the reader doesn't see it in which case you'd have to do a major compaction + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java index ce574b4ac4..467851a590 100644 --- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java +++ 
ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java @@ -18,34 +18,20 @@ package org.apache.hadoop.hive.ql.txn.compactor; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.CompactionRequest; import org.apache.hadoop.hive.metastore.api.CompactionType; -import org.apache.hadoop.hive.metastore.api.DataOperationType; -import org.apache.hadoop.hive.metastore.api.LockComponent; -import org.apache.hadoop.hive.metastore.api.LockLevel; -import org.apache.hadoop.hive.metastore.api.LockRequest; -import org.apache.hadoop.hive.metastore.api.LockResponse; -import org.apache.hadoop.hive.metastore.api.LockType; -import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; -import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; -import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.api.UnlockRequest; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.junit.After; import org.junit.Assert; import org.junit.Test; -import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; /** * Tests for the compactor Cleaner thread @@ -197,231 +183,6 @@ public void cleanupAfterMinorPartitionCompaction() throws Exception { Assert.assertTrue(sawDelta); } - @Test - public void blockedByLockTable() throws Exception { - Table t = newTable("default", "bblt", false); - - addBaseFile(t, null, 20L, 20); - addDeltaFile(t, null, 21L, 22L, 2); - addDeltaFile(t, null, 23L, 24L, 2); - addDeltaFile(t, null, 21L, 24L, 4); - - burnThroughTransactions("default", "bblt", 25); - - CompactionRequest rqst = new CompactionRequest("default", "bblt", CompactionType.MINOR); - txnHandler.compact(rqst); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - txnHandler.markCompacted(ci); - txnHandler.setRunAs(ci.id, System.getProperty("user.name")); - - LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "default"); - comp.setTablename("bblt"); - comp.setOperationType(DataOperationType.SELECT); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - - startCleaner(); - - // Check there are no compactions requests left. 
- ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); - Assert.assertEquals("bblt", compacts.get(0).getTablename()); - Assert.assertEquals(CompactionType.MINOR, compacts.get(0).getType()); - } - - @Test - public void blockedByLockPartition() throws Exception { - Table t = newTable("default", "bblp", true); - Partition p = newPartition(t, "today"); - - addBaseFile(t, p, 20L, 20); - addDeltaFile(t, p, 21L, 22L, 2); - addDeltaFile(t, p, 23L, 24L, 2); - addDeltaFile(t, p, 21L, 24L, 4); - - burnThroughTransactions("default", "bblp", 25); - - CompactionRequest rqst = new CompactionRequest("default", "bblp", CompactionType.MINOR); - rqst.setPartitionname("ds=today"); - txnHandler.compact(rqst); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - txnHandler.markCompacted(ci); - txnHandler.setRunAs(ci.id, System.getProperty("user.name")); - - LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default"); - comp.setTablename("bblp"); - comp.setPartitionname("ds=today"); - comp.setOperationType(DataOperationType.DELETE); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - OpenTxnsResponse resp = txnHandler.openTxns(new OpenTxnRequest(1, "Dracula", "Transylvania")); - req.setTxnid(resp.getTxn_ids().get(0)); - LockResponse res = txnHandler.lock(req); - - startCleaner(); - - // Check there are no compactions requests left. - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); - Assert.assertEquals("bblp", compacts.get(0).getTablename()); - Assert.assertEquals("ds=today", compacts.get(0).getPartitionname()); - Assert.assertEquals(CompactionType.MINOR, compacts.get(0).getType()); - } - - @Test - public void notBlockedBySubsequentLock() throws Exception { - Table t = newTable("default", "bblt", false); - - // Set the run frequency low on this test so it doesn't take long - conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, 100, - TimeUnit.MILLISECONDS); - - addBaseFile(t, null, 20L, 20); - addDeltaFile(t, null, 21L, 22L, 2); - addDeltaFile(t, null, 23L, 24L, 2); - addDeltaFile(t, null, 21L, 24L, 4); - - burnThroughTransactions("default", "bblt", 25); - - CompactionRequest rqst = new CompactionRequest("default", "bblt", CompactionType.MINOR); - txnHandler.compact(rqst); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - txnHandler.markCompacted(ci); - txnHandler.setRunAs(ci.id, System.getProperty("user.name")); - - LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "default"); - comp.setTablename("bblt"); - comp.setOperationType(DataOperationType.INSERT); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - - AtomicBoolean looped = new AtomicBoolean(); - looped.set(false); - startCleaner(looped); - - // Make sure the compactor has a chance to run once - while (!looped.get()) { - Thread.currentThread().sleep(100); - } - - // There should still be one request, as the locks still held. 
- ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - - // obtain a second lock. This shouldn't block cleaner as it was acquired after the initial - // clean request - LockComponent comp2 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "default"); - comp2.setTablename("bblt"); - comp.setOperationType(DataOperationType.SELECT); - List components2 = new ArrayList(1); - components2.add(comp2); - LockRequest req2 = new LockRequest(components, "me", "localhost"); - LockResponse res2 = txnHandler.lock(req2); - - // Unlock the previous lock - txnHandler.unlock(new UnlockRequest(res.getLockid())); - looped.set(false); - - while (!looped.get()) { - Thread.currentThread().sleep(100); - } - stopThread(); - Thread.currentThread().sleep(200); - - - // Check there are no compactions requests left. - rsp = txnHandler.showCompact(new ShowCompactRequest()); - compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); - } - - @Test - public void partitionNotBlockedBySubsequentLock() throws Exception { - Table t = newTable("default", "bblt", true); - Partition p = newPartition(t, "today"); - - // Set the run frequency low on this test so it doesn't take long - conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, 100, - TimeUnit.MILLISECONDS); - - addBaseFile(t, p, 20L, 20); - addDeltaFile(t, p, 21L, 22L, 2); - addDeltaFile(t, p, 23L, 24L, 2); - addDeltaFile(t, p, 21L, 24L, 4); - - burnThroughTransactions("default", "bblt", 25); - - CompactionRequest rqst = new CompactionRequest("default", "bblt", CompactionType.MINOR); - rqst.setPartitionname("ds=today"); - txnHandler.compact(rqst); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - txnHandler.markCompacted(ci); - txnHandler.setRunAs(ci.id, System.getProperty("user.name")); - - LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.PARTITION, "default"); - comp.setTablename("bblt"); - comp.setPartitionname("ds=today"); - comp.setOperationType(DataOperationType.INSERT); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - - AtomicBoolean looped = new AtomicBoolean(); - looped.set(false); - startCleaner(looped); - - // Make sure the compactor has a chance to run once - while (!looped.get()) { - Thread.currentThread().sleep(100); - } - - // There should still be one request, as the locks still held. - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - - - // obtain a second lock. 
This shouldn't block cleaner as it was acquired after the initial - // clean request - LockComponent comp2 = new LockComponent(LockType.SHARED_READ, LockLevel.PARTITION, "default"); - comp2.setTablename("bblt"); - comp2.setPartitionname("ds=today"); - comp.setOperationType(DataOperationType.SELECT); - List components2 = new ArrayList(1); - components2.add(comp2); - LockRequest req2 = new LockRequest(components, "me", "localhost"); - LockResponse res2 = txnHandler.lock(req2); - - // Unlock the previous lock - txnHandler.unlock(new UnlockRequest(res.getLockid())); - looped.set(false); - - while (!looped.get()) { - Thread.currentThread().sleep(100); - } - stopThread(); - Thread.currentThread().sleep(200); - - - // Check there are no compactions requests left. - rsp = txnHandler.showCompact(new ShowCompactRequest()); - compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); - } - @Test public void cleanupAfterMajorPartitionCompactionNoBase() throws Exception { Table t = newTable("default", "campcnb", true); diff --git standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnCommonUtils.java standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnCommonUtils.java index 94cb18dccc..c61a997612 100644 --- standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnCommonUtils.java +++ standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnCommonUtils.java @@ -41,6 +41,7 @@ * @return a valid txn list. */ public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long currentTxn) { + assert currentTxn <= txns.getTxn_high_water_mark(); /* * The highWaterMark should be min(currentTxn,txns.getTxn_high_water_mark()) assuming currentTxn>0 * otherwise if currentTxn=7 and 8 commits before 7, then 7 will see result of 8 which diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index cbb76d51d6..86bd4b965d 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hive.metastore.txn; -import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.common.classification.RetrySemantics; import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.MetaException; @@ -324,6 +323,54 @@ public void markCompacted(CompactionInfo info) throws MetaException { return findReadyToClean(); } } + @Override + public long findMinOpenTxnGLB() throws MetaException { + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + try { + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + return findMinOpenTxnGLB(stmt); + } catch (SQLException e) { + LOG.error("Unable to findMinOpenTxnGLB() due to:" + e.getMessage()); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "findMinOpenTxnGLB"); + throw new MetaException("Unable to execute findMinOpenTxnGLB() " + + StringUtils.stringifyException(e)); + } finally { + close(rs, 
stmt, dbConn); + } + } catch (RetryException e) { + return findMinOpenTxnGLB(); + } + } + + /** + * See doc at {@link TxnStore#findMinOpenTxnGLB()} + */ + private long findMinOpenTxnGLB(Statement stmt) throws MetaException, SQLException { + String s = "select ntxn_next from NEXT_TXN_ID"; + LOG.debug("Going to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + if (!rs.next()) { + throw new MetaException("Transaction tables not properly " + + "initialized, no record found in next_txn_id"); + } + long hwm = rs.getLong(1); + s = "select min(mhl_min_open_txnid) from MIN_HISTORY_LEVEL"; + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + rs.next(); + long minOpenTxnId = rs.getLong(1); + if(rs.wasNull()) { + return hwm; + } + //since generating new txnid uses select for update on single row in NEXT_TXN_ID + assert hwm >= minOpenTxnId : "(hwm, minOpenTxnId)=(" + hwm + "," + minOpenTxnId + ")"; + return minOpenTxnId; + } /** * This will remove an entry from the queue after @@ -480,7 +527,6 @@ public void markCleaned(CompactionInfo info) throws MetaException { markCleaned(info); } } - /** * Clean up entries from TXN_TO_WRITE_ID table less than min_uncommited_txnid as found by * min(NEXT_TXN_ID.ntxn_next, min(MIN_HISTORY_LEVEL.mhl_min_open_txnid), min(Aborted TXNS.txn_id)). @@ -502,42 +548,11 @@ public void cleanTxnToWriteIdTable() throws MetaException { // First need to find the min_uncommitted_txnid which is currently seen by any open transactions. // If there are no txns which are currently open or aborted in the system, then current value of // NEXT_TXN_ID.ntxn_next could be min_uncommitted_txnid. - String s = "select ntxn_next from NEXT_TXN_ID"; - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - if (!rs.next()) { - throw new MetaException("Transaction tables not properly " + - "initialized, no record found in next_txn_id"); - } - long minUncommittedTxnId = rs.getLong(1); - - // If there are any open txns, then the minimum of min_open_txnid from MIN_HISTORY_LEVEL table - // could be the min_uncommitted_txnid if lesser than NEXT_TXN_ID.ntxn_next. - s = "select min(mhl_min_open_txnid) from MIN_HISTORY_LEVEL"; - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - if (rs.next()) { - long minOpenTxnId = rs.getLong(1); - if (minOpenTxnId > 0) { - minUncommittedTxnId = Math.min(minOpenTxnId, minUncommittedTxnId); - } - } - - // If there are aborted txns, then the minimum aborted txnid could be the min_uncommitted_txnid - // if lesser than both NEXT_TXN_ID.ntxn_next and min(MIN_HISTORY_LEVEL .mhl_min_open_txnid). - s = "select min(txn_id) from TXNS where txn_state = " + quoteChar(TXN_ABORTED); - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - if (rs.next()) { - long minAbortedTxnId = rs.getLong(1); - if (minAbortedTxnId > 0) { - minUncommittedTxnId = Math.min(minAbortedTxnId, minUncommittedTxnId); - } - } + long minUncommittedTxnId = findMinOpenTxnGLB(stmt); // As all txns below min_uncommitted_txnid are either committed or empty_aborted, we are allowed // to cleanup the entries less than min_uncommitted_txnid from the TXN_TO_WRITE_ID table. 
- s = "delete from TXN_TO_WRITE_ID where t2w_txnid < " + minUncommittedTxnId; + String s = "delete from TXN_TO_WRITE_ID where t2w_txnid < " + minUncommittedTxnId; LOG.debug("Going to execute delete <" + s + ">"); int rc = stmt.executeUpdate(s); LOG.info("Removed " + rc + " rows from TXN_TO_WRITE_ID with Txn Low-Water-Mark: " + minUncommittedTxnId); diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index 22ce007278..4af0473fb4 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -356,6 +356,16 @@ void onRename(String oldCatName, String oldDbName, String oldTabName, String old @RetrySemantics.ReadOnly List findReadyToClean() throws MetaException; + /** + * Returns the smallest txnid that could be seen in open state across all active transactions in + * the system or {@code NEXT_TXN_ID.NTXN_NEXT} if there are no active transactions, i.e. greatest + * lower bound of open transaction IDs. Even if a transaction is opened concurrently with this + * call it cannot have an id less than what this method returns. + * @return transaction ID + */ + @RetrySemantics.ReadOnly + long findMinOpenTxnGLB() throws MetaException; + /** * This will remove an entry from the queue after * it has been compacted. diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java index 3bb1f0c62c..b11d1c43d6 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java @@ -83,17 +83,6 @@ public static ValidCompactorWriteIdList createValidCompactWriteIdList(TableValid } } - public static ValidReaderWriteIdList updateForCompactionQuery(ValidReaderWriteIdList ids) { - // This is based on the existing valid write ID list that was built for a select query; - // therefore we assume all the aborted txns, etc. were already accounted for. - // All we do is adjust the high watermark to only include contiguous txns. 
- Long minOpenWriteId = ids.getMinOpenWriteId(); - if (minOpenWriteId != null && minOpenWriteId != Long.MAX_VALUE) { - return ids.updateHighWatermark(ids.getMinOpenWriteId() - 1); - } - return ids; - } - /** * Get an instance of the TxnStore that is appropriate for this store * @param conf configuration diff --git standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java index 2c9d98b5fd..2a70ec3e55 100644 --- standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java +++ standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java @@ -186,63 +186,6 @@ public void testTxNWithKeyWrongPrefix() throws Exception { Assert.assertTrue(validTxns.isTxnValid(1)); } - @Test - public void testTxnRange() throws Exception { - ValidTxnList validTxns = client.getValidTxns(); - Assert.assertEquals(ValidTxnList.RangeResponse.NONE, - validTxns.isTxnRangeValid(1L, 3L)); - List tids = client.openTxns("me", 5).getTxn_ids(); - - HeartbeatTxnRangeResponse rsp = client.heartbeatTxnRange(1, 5); - Assert.assertEquals(0, rsp.getNosuch().size()); - Assert.assertEquals(0, rsp.getAborted().size()); - - client.rollbackTxn(1L); - client.commitTxn(2L); - client.commitTxn(3L); - client.commitTxn(4L); - validTxns = client.getValidTxns(); - System.out.println("validTxns = " + validTxns); - Assert.assertEquals(ValidTxnList.RangeResponse.ALL, - validTxns.isTxnRangeValid(2L, 2L)); - Assert.assertEquals(ValidTxnList.RangeResponse.ALL, - validTxns.isTxnRangeValid(2L, 3L)); - Assert.assertEquals(ValidTxnList.RangeResponse.ALL, - validTxns.isTxnRangeValid(2L, 4L)); - Assert.assertEquals(ValidTxnList.RangeResponse.ALL, - validTxns.isTxnRangeValid(3L, 4L)); - - Assert.assertEquals(ValidTxnList.RangeResponse.SOME, - validTxns.isTxnRangeValid(1L, 4L)); - Assert.assertEquals(ValidTxnList.RangeResponse.SOME, - validTxns.isTxnRangeValid(2L, 5L)); - Assert.assertEquals(ValidTxnList.RangeResponse.SOME, - validTxns.isTxnRangeValid(1L, 2L)); - Assert.assertEquals(ValidTxnList.RangeResponse.SOME, - validTxns.isTxnRangeValid(4L, 5L)); - - Assert.assertEquals(ValidTxnList.RangeResponse.NONE, - validTxns.isTxnRangeValid(1L, 1L)); - Assert.assertEquals(ValidTxnList.RangeResponse.NONE, - validTxns.isTxnRangeValid(5L, 10L)); - - validTxns = new ValidReadTxnList("10:5:4,5,6:"); - Assert.assertEquals(ValidTxnList.RangeResponse.NONE, - validTxns.isTxnRangeValid(4,6)); - Assert.assertEquals(ValidTxnList.RangeResponse.ALL, - validTxns.isTxnRangeValid(7, 10)); - Assert.assertEquals(ValidTxnList.RangeResponse.SOME, - validTxns.isTxnRangeValid(7, 11)); - Assert.assertEquals(ValidTxnList.RangeResponse.SOME, - validTxns.isTxnRangeValid(3, 6)); - Assert.assertEquals(ValidTxnList.RangeResponse.SOME, - validTxns.isTxnRangeValid(4, 7)); - Assert.assertEquals(ValidTxnList.RangeResponse.SOME, - validTxns.isTxnRangeValid(1, 12)); - Assert.assertEquals(ValidTxnList.RangeResponse.ALL, - validTxns.isTxnRangeValid(1, 3)); - } - @Test public void testLocks() throws Exception { LockRequestBuilder rqstBuilder = new LockRequestBuilder(); diff --git storage-api/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java storage-api/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java index b8ff03f9c4..856cc3256f 100644 --- storage-api/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java +++ 
storage-api/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java @@ -26,7 +26,7 @@ * This class will view a transaction as valid only if it is committed. Both open and aborted * transactions will be seen as invalid. */ -public class ValidReadTxnList implements ValidTxnList { +public final class ValidReadTxnList implements ValidTxnList { protected long[] exceptions; protected BitSet abortedBits; // BitSet for flagging aborted transactions. Bit is true if aborted, false if open @@ -62,33 +62,6 @@ public boolean isTxnValid(long txnid) { return Arrays.binarySearch(exceptions, txnid) < 0; } - @Override - public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) { - // check the easy cases first - if (minTxnId > highWatermark) { - return RangeResponse.NONE; - } else if (exceptions.length > 0 && exceptions[0] > maxTxnId) { - return RangeResponse.ALL; - } - - // since the exceptions and the range in question overlap, count the - // exceptions in the range - long count = Math.max(0, maxTxnId - highWatermark); - for(long txn: exceptions) { - if (minTxnId <= txn && txn <= maxTxnId) { - count += 1; - } - } - - if (count == 0) { - return RangeResponse.ALL; - } else if (count == (maxTxnId - minTxnId + 1)) { - return RangeResponse.NONE; - } else { - return RangeResponse.SOME; - } - } - @Override public String toString() { return writeToString(); @@ -191,34 +164,5 @@ public boolean isTxnAborted(long txnid) { int index = Arrays.binarySearch(exceptions, txnid); return index >= 0 && abortedBits.get(index); } - - @Override - public RangeResponse isTxnRangeAborted(long minTxnId, long maxTxnId) { - // check the easy cases first - if (highWatermark < minTxnId) { - return RangeResponse.NONE; - } - - int count = 0; // number of aborted txns found in exceptions - - // traverse the aborted txns list, starting at first aborted txn index - for (int i = abortedBits.nextSetBit(0); i >= 0; i = abortedBits.nextSetBit(i + 1)) { - long abortedTxnId = exceptions[i]; - if (abortedTxnId > maxTxnId) { // we've already gone beyond the specified range - break; - } - if (abortedTxnId >= minTxnId && abortedTxnId <= maxTxnId) { - count++; - } - } - - if (count == 0) { - return RangeResponse.NONE; - } else if (count == (maxTxnId - minTxnId + 1)) { - return RangeResponse.ALL; - } else { - return RangeResponse.SOME; - } - } } diff --git storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java index 95a0b56e39..bc8ac0d61b 100644 --- storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java +++ storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java @@ -35,6 +35,12 @@ private long minOpenWriteId = Long.MAX_VALUE; protected long highWatermark; + /** + * This seems like a bad c'tor. It doesn't even have a table name in it and it's used every time + * ValidWriteIdList.VALID_WRITEIDS_KEY is not found in Configuration. + * But, if anything, that would indicate a bug if it was done for an acid read, since it + * considers everything valid - this should not be assumed.
+ */ public ValidReaderWriteIdList() { this(null, new long[0], new BitSet(), Long.MAX_VALUE, Long.MAX_VALUE); } diff --git storage-api/src/java/org/apache/hadoop/hive/common/ValidTxnList.java storage-api/src/java/org/apache/hadoop/hive/common/ValidTxnList.java index d4c3b09730..919a75ede0 100644 --- storage-api/src/java/org/apache/hadoop/hive/common/ValidTxnList.java +++ storage-api/src/java/org/apache/hadoop/hive/common/ValidTxnList.java @@ -46,16 +46,6 @@ */ public boolean isTxnValid(long txnid); - /** - * Find out if a range of transaction ids are valid. Note that valid may have different meanings - * for different implementations, as some will only want to see committed transactions and some - * both committed and aborted. - * @param minTxnId minimum txnid to look for, inclusive - * @param maxTxnId maximum txnid to look for, inclusive - * @return Indicate whether none, some, or all of these transactions are valid. - */ - public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId); - /** * Write this validTxnList into a string. This should produce a string that * can be used by {@link #readFromString(String)} to populate a validTxnsList. @@ -90,14 +80,6 @@ */ public boolean isTxnAborted(long txnid); - /** - * Find out if a range of transaction ids are aborted. - * @param minTxnId minimum txnid to look for, inclusive - * @param maxTxnId maximum txnid to look for, inclusive - * @return Indicate whether none, some, or all of these transactions are aborted. - */ - public RangeResponse isTxnRangeAborted(long minTxnId, long maxTxnId); - /** * Returns smallest Open transaction in this set, {@code null} if there is none. */
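The //todo markers in testCleaner() and testCleaner2() above ask for checks that the right files are left behind. Below is a minimal sketch of such a check, meant to live inside the test class; it assumes the test Configuration named conf used in those tests, and the helper name assertExpectedFiles plus the warehouse path in the usage comment are illustrative only, not part of the patch.

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;

// Hypothetical helper: compares the immediate children of the table directory with the expected names.
static void assertExpectedFiles(Configuration conf, Path tablePath, Set<String> expectedNames)
    throws IOException {
  FileSystem fs = tablePath.getFileSystem(conf);
  Set<String> actualNames = new HashSet<>();
  for (FileStatus status : fs.listStatus(tablePath)) {
    actualNames.add(status.getPath().getName());
  }
  Assert.assertEquals("Unexpected contents of " + tablePath, expectedNames, actualNames);
}

// Possible use for the expected post-cleaner state in testCleaner2() (the path is illustrative):
// assertExpectedFiles(conf, new Path("target/warehouse/t"), new HashSet<>(Arrays.asList(
//     "delete_delta_0000001_0000002", "delta_0000001_0000002", "delta_0000002_0000002_0000")));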
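For the findMinOpenTxnGLB() helper added to CompactionTxnHandler above: it returns min(MIN_HISTORY_LEVEL.mhl_min_open_txnid) when at least one transaction is open, and NEXT_TXN_ID.ntxn_next otherwise, and cleanTxnToWriteIdTable() then deletes TXN_TO_WRITE_ID rows whose t2w_txnid is below that value. The following is a minimal sketch of the same arithmetic on in-memory values (plain Java, no metastore tables; the sample numbers are made up).

import java.util.OptionalLong;
import java.util.stream.LongStream;

public class MinOpenTxnGlbSketch {
  // Mirrors the two queries in findMinOpenTxnGLB(Statement): the minimum open txn id if any txn
  // is open, otherwise the high water mark taken from NEXT_TXN_ID.ntxn_next.
  static long minOpenTxnGlb(long nextTxnId, long[] openTxnIds) {
    OptionalLong minOpen = LongStream.of(openTxnIds).min();
    return minOpen.isPresent() ? minOpen.getAsLong() : nextTxnId;
  }

  public static void main(String[] args) {
    // Next txn id to allocate is 12; txns 7 and 9 are still open -> GLB is 7, so
    // TXN_TO_WRITE_ID rows with t2w_txnid < 7 are safe to delete.
    System.out.println(minOpenTxnGlb(12L, new long[] {7L, 9L})); // 7
    // No open txns -> GLB is the high water mark itself.
    System.out.println(minOpenTxnGlb(12L, new long[0]));         // 12
  }
}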
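With isTxnRangeValid() and isTxnRangeAborted() removed from ValidTxnList and ValidReadTxnList above, range questions are answered one transaction id at a time through isTxnValid() and isTxnAborted(). Below is a small usage sketch, not code from the patch, built on the same serialized list the deleted testTxnRange() used, "10:5:4,5,6:", i.e. high-water-mark 10 with txns 4, 5 and 6 still open.

import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;

public class ValidTxnListSketch {
  public static void main(String[] args) {
    ValidTxnList validTxns = new ValidReadTxnList("10:5:4,5,6:");

    System.out.println(validTxns.isTxnValid(3));   // true  - committed, below the open ids
    System.out.println(validTxns.isTxnValid(5));   // false - still open
    System.out.println(validTxns.isTxnValid(7));   // true  - committed
    System.out.println(validTxns.isTxnValid(11));  // false - above the high-water-mark
    System.out.println(validTxns.isTxnAborted(5)); // false - open, not aborted

    // Replaces the old isTxnRangeValid(7, 10) == RangeResponse.ALL check.
    boolean allValid = true;
    for (long txnId = 7; txnId <= 10; txnId++) {
      allValid &= validTxns.isTxnValid(txnId);
    }
    System.out.println(allValid); // true
  }
}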