diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index cbd2b8d175..fc53ed397e 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2751,6 +2751,10 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "has had a transaction done on it since the last major compaction. So decreasing this\n" +
         "value will increase the load on the NameNode."),
 
+    HIVE_COMPACTOR_REQUEST_QUEUE("hive.compactor.request.queue", 1,
+        "Enables parallelization of the checkForCompaction operation, which includes many file metadata checks\n" +
+        "and may be expensive."),
+
     HIVE_COMPACTOR_DELTA_NUM_THRESHOLD("hive.compactor.delta.num.threshold", 10,
         "Number of delta directories in a table or partition that will trigger a minor\n" +
         "compaction."),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 6017fd31b1..dedc990d0f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -29,6 +30,7 @@
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
@@ -36,6 +38,7 @@
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -50,11 +53,15 @@
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
@@ -68,7 +75,7 @@
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
   static final private String COMPACTORTHRESHOLD_PREFIX = "compactorthreshold.";
 
-  private Map<String, String> tblNameOwnersCache = new HashMap<>();
+  private Executor compactionExecutor;
 
   private long checkInterval;
   private long prevStart = -1;
@@ -101,72 +108,43 @@ public void run() {
        //todo: add method to only get current i.e. skip history - more efficient
        ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
+
        Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold, compactionInterval)
-            .stream().filter(ci -> checkCompactionElig(ci)).collect(Collectors.toSet());
+            .stream().filter(ci -> checkCompactionElig(ci, currentCompactions)).collect(Collectors.toSet());
        LOG.debug("Found " + potentials.size() + " potential compactions, " +
            "checking to see if we should compact any of them");
-        for (CompactionInfo ci : potentials) {
-          LOG.info("Checking to see if we should compact " + ci.getFullPartitionName());
-          try {
-            Table t = resolveTable(ci);
+
+        Map<String, String> tblNameOwners = new HashMap<>();
+        List<CompletableFuture<Void>> compactionList = new ArrayList<>();
 
-            // Check if we already have initiated or are working on a compaction for this partition
-            // or table. If so, skip it. If we are just waiting on cleaning we can still check,
-            // as it may be time to compact again even though we haven't cleaned.
-            //todo: this is not robust. You can easily run Alter Table to start a compaction between
-            //the time currentCompactions is generated and now
-            if (lookForCurrentCompactions(currentCompactions, ci)) {
-              LOG.debug("Found currently initiated or working compaction for " +
-                  ci.getFullPartitionName() + " so we will not initiate another compaction");
-              continue;
-            }
-            if(txnHandler.checkFailedCompactions(ci)) {
-              LOG.warn("Will not initiate compaction for " + ci.getFullPartitionName() + " since last "
-                  + HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD + " attempts to compact it failed.");
-              txnHandler.markFailed(ci);
-              continue;
-            }
+        if (!potentials.isEmpty()) {
+          ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(
+              txnHandler.getOpenTxns(), 0);
+          conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+        }
 
-            // Figure out who we should run the file operations as
+        for (CompactionInfo ci : potentials) {
+          try {
+            Table t = resolveTable(ci);
            Partition p = resolvePartition(ci);
            if (p == null && ci.partName != null) {
              LOG.info("Can't find partition " + ci.getFullPartitionName() +
                  ", assuming it has been dropped and moving on.");
              continue;
            }
-            ValidTxnList validTxnList = TxnCommonUtils
-                .createValidReadTxnList(txnHandler.getOpenTxns(), 0);
-            conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
-            // The response will have one entry per table and hence we get only one ValidWriteIdList
-            String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName());
-            GetValidWriteIdsRequest rqst
-                = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName));
-            rqst.setValidTxnList(validTxnList.writeToString());
-            final ValidWriteIdList tblValidWriteIds = TxnUtils.createValidCompactWriteIdList(
-                txnHandler.getValidWriteIds(rqst).getTblValidWriteIds().get(0));
-
-            StorageDescriptor sd = resolveStorageDescriptor(t, p);
-            String runAs = tblNameOwnersCache.get(fullTableName);
-            if (runAs == null) {
-              LOG.debug("unable to find the table owner in the cache for table " + fullTableName + " " +
-                  "will determine user based on table location");
-              runAs = findUserToRunAs(sd.getLocation(), t);
-              tblNameOwnersCache.put(fullTableName, runAs);
-            }
-            /*Future thought: checkForCompaction will check a lot of file metadata and may be expensive.
-             * Long term we should consider having a thread pool here and running checkForCompactionS
-             * in parallel*/
-            CompactionType compactionNeeded
-                = checkForCompaction(ci, tblValidWriteIds, sd, t.getParameters(), runAs);
-            if (compactionNeeded != null) requestCompaction(ci, runAs, compactionNeeded);
+            String runAs = resolveUserToRunAs(tblNameOwners, t, p);
+            /* checkForCompaction includes many file metadata checks and may be expensive.
+             * Therefore, using a thread pool here and running checkForCompactions in parallel */
+            compactionList.add(CompletableFuture.runAsync(ThrowingRunnable.unchecked(() ->
+                scheduleCompactionIfRequired(ci, t, p, runAs)), compactionExecutor));
          } catch (Throwable t) {
-            LOG.error("Caught exception while trying to determine if we should compact " +
-                ci + ". Marking failed to avoid repeated failures, " +
-                "" + StringUtils.stringifyException(t));
+            LOG.error("Caught exception while trying to determine if we should compact {}. " +
+                "Marking failed to avoid repeated failures, {}", ci, t);
            txnHandler.markFailed(ci);
          }
        }
+        CompletableFuture.allOf(compactionList.toArray(new CompletableFuture[0]))
+            .join();
 
        // Check for timed out remote workers.
        recoverFailedCompactions(true);
@@ -187,8 +165,9 @@ public void run() {
        }
 
        long elapsedTime = System.currentTimeMillis() - startedAt;
-        if (elapsedTime >= checkInterval || stop.get()) continue;
-        else Thread.sleep(checkInterval - elapsedTime);
+        if (elapsedTime < checkInterval && !stop.get()) {
+          Thread.sleep(checkInterval - elapsedTime);
+        }
      } while (!stop.get());
    } catch (Throwable t) {
@@ -197,12 +176,62 @@ public void run() {
    }
  }
 
+  private void scheduleCompactionIfRequired(CompactionInfo ci, Table t, Partition p, String runAs)
+      throws MetaException {
+    StorageDescriptor sd = resolveStorageDescriptor(t, p);
+    try {
+      ValidWriteIdList validWriteIds = resolveValidWriteIds(t);
+      CompactionType type = checkForCompaction(ci, validWriteIds, sd, t.getParameters(), runAs);
+      if (type != null) {
+        requestCompaction(ci, runAs, type);
+      }
+    } catch (Throwable ex) {
+      LOG.error("Caught exception while trying to determine if we should compact {}. " +
+          "Marking failed to avoid repeated failures, {}", ci, ex);
+      txnHandler.markFailed(ci);
+    }
+  }
+
+  private ValidWriteIdList resolveValidWriteIds(Table t) throws NoSuchTxnException, MetaException {
+    ValidTxnList validTxnList = new ValidReadTxnList(conf.get(ValidTxnList.VALID_TXNS_KEY));
+    // The response will have one entry per table and hence we get only one ValidWriteIdList
+    String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName());
+    GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName));
+    rqst.setValidTxnList(validTxnList.writeToString());
+
+    return TxnUtils.createValidCompactWriteIdList(
+        txnHandler.getValidWriteIds(rqst).getTblValidWriteIds().get(0));
+  }
+
+  private String resolveUserToRunAs(Map<String, String> cache, Table t, Partition p)
+      throws IOException, InterruptedException {
+    // Figure out who we should run the file operations as
+    String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName());
+    StorageDescriptor sd = resolveStorageDescriptor(t, p);
+
+    cache.putIfAbsent(fullTableName, findUserToRunAs(sd.getLocation(), t));
+    return cache.get(fullTableName);
+  }
+
+  private interface ThrowingRunnable<E extends Exception> {
+    void run() throws E;
+
+    static <E extends Exception> Runnable unchecked(ThrowingRunnable<E> r) {
+      return () -> {
+        try {
+          r.run();
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      };
+    }
+  }
+
  @Override
  public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
    super.init(stop, looped);
-    checkInterval =
-        conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL, TimeUnit.MILLISECONDS) ;
+    checkInterval = conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
+    compactionExecutor = Executors.newFixedThreadPool(conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_REQUEST_QUEUE));
  }
 
  private void recoverFailedCompactions(boolean remoteOnly) throws MetaException {
@@ -393,13 +422,25 @@ private static boolean checkDynPartitioning(Table t, CompactionInfo ci){
    return false;
  }
 
-  private boolean checkCompactionElig(CompactionInfo ci){
-    Table t = null;
+  private boolean checkCompactionElig(CompactionInfo ci, ShowCompactResponse currentCompactions) {
+    LOG.info("Checking to see if we should compact " + ci.getFullPartitionName());
+
+    // Check if we already have initiated or are working on a compaction for this partition
+    // or table. If so, skip it. If we are just waiting on cleaning we can still check,
+    // as it may be time to compact again even though we haven't cleaned.
+    // todo: this is not robust. You can easily run `alter table` to start a compaction between
+    // the time currentCompactions is generated and now
+    if (lookForCurrentCompactions(currentCompactions, ci)) {
+      LOG.debug("Found currently initiated or working compaction for " +
+          ci.getFullPartitionName() + " so we will not initiate another compaction");
+      return false;
+    }
+
    try {
-      t = resolveTable(ci);
+      Table t = resolveTable(ci);
      if (t == null) {
        LOG.info("Can't find table " + ci.getFullTableName() + ", assuming it's a temp " +
-          "table or has been dropped and moving on.");
+            "table or has been dropped and moving on.");
        return false;
      }
@@ -409,17 +450,21 @@ private boolean checkCompactionElig(CompactionInfo ci){
      if (noAutoCompactSet(t)) {
        LOG.info("Table " + tableName(t) + " marked " + hive_metastoreConstants.TABLE_NO_AUTO_COMPACT +
-          "=true so we will not compact it.");
-        return false;
-      } else if (replIsCompactionDisabledForTable(t)) {
+            "=true so we will not compact it.");
        return false;
-      } else if (checkDynPartitioning(t, ci)) {
+      } else if (replIsCompactionDisabledForTable(t) || checkDynPartitioning(t, ci)) {
        return false;
      }
+      if (txnHandler.checkFailedCompactions(ci)) {
+        LOG.warn("Will not initiate compaction for " + ci.getFullPartitionName() + " since last " +
+            MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD + " attempts to compact it failed.");
+        txnHandler.markFailed(ci);
+        return false;
+      }
    } catch (Throwable e) {
-      LOG.error("Caught Exception while checking compactiton eligibility " +
-          StringUtils.stringifyException(e));
+      LOG.error("Caught exception while checking compaction eligibility " +
+          StringUtils.stringifyException(e));
    }
    return true;
  }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index 564839324f..1151466f8c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
 import org.apache.hadoop.hive.metastore.api.LockComponent;
 import org.apache.hadoop.hive.metastore.api.LockLevel;
+import org.apache.hadoop.hive.metastore.api.LockState;
 import org.apache.hadoop.hive.metastore.api.LockRequest;
 import org.apache.hadoop.hive.metastore.api.LockResponse;
 import org.apache.hadoop.hive.metastore.api.LockType;
@@ -769,6 +770,44 @@ public void dropPartition() throws Exception {
    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
    Assert.assertEquals(0, compacts.size());
  }
+
+  @Test
+  public void processCompactionCandidatesInParallel() throws Exception {
+    Table t = newTable("default", "dp", true);
+    List<LockComponent> components = new ArrayList<>();
+
+    for (int i = 0; i < 10; i++) {
+      Partition p = newPartition(t, "part" + (i + 1));
+      addBaseFile(t, p, 20L, 20);
+      addDeltaFile(t, p, 21L, 22L, 2);
+      addDeltaFile(t, p, 23L, 24L, 2);
+
+      LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default");
+      comp.setTablename("dp");
+      comp.setPartitionname("ds=part" + (i + 1));
+      comp.setOperationType(DataOperationType.UPDATE);
+      components.add(comp);
+    }
+    burnThroughTransactions("default", "dp", 23);
+    long txnid = openTxn();
+
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    LockResponse res = txnHandler.lock(req);
+    Assert.assertEquals(LockState.ACQUIRED, res.getState());
+
+    long writeid = allocateWriteId("default", "dp", txnid);
+    Assert.assertEquals(24, writeid);
+    txnHandler.commitTxn(new CommitTxnRequest(txnid));
+
+    conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_REQUEST_QUEUE, 3);
+    startInitiator();
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(10, compacts.size());
+  }
+
  @Override
  boolean useHive130DeltaDirName() {
    return false;
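Note: the following is an illustrative, self-contained sketch and not part of the patch. It only shows the concurrency pattern the Initiator now relies on: a fixed-size pool sized by hive.compactor.request.queue, checked-exception tasks wrapped through a ThrowingRunnable-style helper and submitted with CompletableFuture.runAsync, and an allOf(...).join() barrier before the loop continues. The class and method names below (ParallelCheckSketch, expensiveCheck) are invented for the example and are not Hive APIs.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative sketch only (not part of the patch).
public class ParallelCheckSketch {

  // Same shape as the patch's private ThrowingRunnable helper: lets a lambda that
  // throws a checked exception be handed to CompletableFuture.runAsync as a Runnable.
  interface ThrowingRunnable<E extends Exception> {
    void run() throws E;

    static <E extends Exception> Runnable unchecked(ThrowingRunnable<E> r) {
      return () -> {
        try {
          r.run();
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      };
    }
  }

  // Stand-in for checkForCompaction(): declares a checked exception on purpose.
  static void expensiveCheck(int partition) throws Exception {
    Thread.sleep(100); // simulate file metadata checks
    System.out.println("checked partition " + partition);
  }

  public static void main(String[] args) {
    int poolSize = 3; // plays the role of hive.compactor.request.queue
    ExecutorService executor = Executors.newFixedThreadPool(poolSize);
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    try {
      for (int i = 0; i < 10; i++) {
        final int partition = i;
        futures.add(CompletableFuture.runAsync(
            ThrowingRunnable.unchecked(() -> expensiveCheck(partition)), executor));
      }
      // Wait for every check, as the Initiator does before recoverFailedCompactions().
      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    } finally {
      executor.shutdown();
    }
  }
}

In a deployment this would presumably be enabled by raising hive.compactor.request.queue above its default of 1 in the configuration of the metastore service that runs the compactor Initiator thread; with the default value the checks still run through the single-threaded executor, matching the previous sequential behavior.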