diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 4393a2825e..0df56d1efe 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2748,6 +2748,10 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "has had a transaction done on it since the last major compaction. So decreasing this\n" + "value will increase the load on the NameNode."), + HIVE_COMPACTOR_REQUEST_QUEUE("hive.compactor.request.queue", 1, + "Enables parallelization of the checkForCompaction operation, that includes many file metadata checks\n" + + "and may be expensive"), + HIVE_COMPACTOR_DELTA_NUM_THRESHOLD("hive.compactor.delta.num.threshold", 10, "Number of delta directories in a table or partition that will trigger a minor\n" + "compaction."), diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 7a0e32463d..0a2bb2312e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils; import org.apache.hadoop.hive.metastore.txn.TxnStore; @@ -51,10 +52,14 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.Collections; -import java.util.HashMap; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -68,7 +73,7 @@ static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME); static final private String COMPACTORTHRESHOLD_PREFIX = "compactorthreshold."; - private Map tblNameOwnersCache = new HashMap<>(); + private Executor compactionExecutor; private long checkInterval; private long prevStart = -1; @@ -101,78 +106,37 @@ public void run() { //todo: add method to only get current i.e. skip history - more efficient ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest()); + Set potentials = txnHandler.findPotentialCompactions(abortedThreshold, compactionInterval) - .stream().filter(ci -> checkCompactionElig(ci)).collect(Collectors.toSet()); + .stream().filter(ci -> checkCompactionElig(ci, currentCompactions)).collect(Collectors.toSet()); LOG.debug("Found " + potentials.size() + " potential compactions, " + - "checking to see if we should compact any of them"); + "checking to see if we should compact any of them"); + + Map tblNameOwners = new ConcurrentHashMap<>(); + List compactions = new ArrayList<>(); + for (CompactionInfo ci : potentials) { - // Disable minor compaction for query based compactor - if (!ci.isMajorCompaction() && HiveConf.getBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED)) { - LOG.debug("Not compacting: " + ci.getFullPartitionName() - + ", as query based compaction currently does not " + "support minor compactions."); - continue; - } - LOG.info("Checking to see if we should compact " + ci.getFullPartitionName()); try { - Table t = resolveTable(ci); - - // Check if we already have initiated or are working on a compaction for this partition - // or table. If so, skip it. If we are just waiting on cleaning we can still check, - // as it may be time to compact again even though we haven't cleaned. - //todo: this is not robust. You can easily run Alter Table to start a compaction between - //the time currentCompactions is generated and now - if (lookForCurrentCompactions(currentCompactions, ci)) { - LOG.debug("Found currently initiated or working compaction for " + - ci.getFullPartitionName() + " so we will not initiate another compaction"); - continue; - } - if(txnHandler.checkFailedCompactions(ci)) { - LOG.warn("Will not initiate compaction for " + ci.getFullPartitionName() + " since last " - + HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD + " attempts to compact it failed."); - txnHandler.markFailed(ci); - continue; - } - - // Figure out who we should run the file operations as Partition p = resolvePartition(ci); if (p == null && ci.partName != null) { LOG.info("Can't find partition " + ci.getFullPartitionName() + - ", assuming it has been dropped and moving on."); - continue; + ", assuming it has been dropped and moving on."); + return; } - ValidTxnList validTxnList = TxnCommonUtils - .createValidReadTxnList(txnHandler.getOpenTxns(), 0); - conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString()); - // The response will have one entry per table and hence we get only one ValidWriteIdList - String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName()); - GetValidWriteIdsRequest rqst - = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName)); - rqst.setValidTxnList(validTxnList.writeToString()); - final ValidWriteIdList tblValidWriteIds = TxnUtils.createValidCompactWriteIdList( - txnHandler.getValidWriteIds(rqst).getTblValidWriteIds().get(0)); - - StorageDescriptor sd = resolveStorageDescriptor(t, p); - String runAs = tblNameOwnersCache.get(fullTableName); - if (runAs == null) { - LOG.debug("unable to find the table owner in the cache for table "+ fullTableName + " " + - "will determine user based on table location"); - runAs = findUserToRunAs(sd.getLocation(), t); - tblNameOwnersCache.put(fullTableName, runAs); - } - /*Future thought: checkForCompaction will check a lot of file metadata and may be expensive. - * Long term we should consider having a thread pool here and running checkForCompactionS - * in parallel*/ - CompactionType compactionNeeded - = checkForCompaction(ci, tblValidWriteIds, sd, t.getParameters(), runAs); - if (compactionNeeded != null) requestCompaction(ci, runAs, compactionNeeded); + compactions.add(CompletableFuture.runAsync( + ThrowingRunnable.unchecked(() -> + /* checkForCompaction includes many file metadata checks and may be expensive. + * Therefore, using a thread pool here and running checkForCompactions in parallel */ + scheduleCompactionIfRequired(ci, t, p, tblNameOwners)), compactionExecutor)); } catch (Throwable t) { - LOG.error("Caught exception while trying to determine if we should compact " + - ci + ". Marking failed to avoid repeated failures, " + - "" + StringUtils.stringifyException(t)); + LOG.error("Caught exception while trying to determine if we should compact {}. " + + "Marking failed to avoid repeated failures, {}", ci, t); txnHandler.markFailed(ci); } } + CompletableFuture.allOf(compactions.toArray(new CompletableFuture[0])) + .join(); // Check for timed out remote workers. recoverFailedCompactions(true); @@ -193,8 +157,9 @@ public void run() { } long elapsedTime = System.currentTimeMillis() - startedAt; - if (elapsedTime >= checkInterval || stop.get()) continue; - else Thread.sleep(checkInterval - elapsedTime); + if (elapsedTime < checkInterval && !stop.get()) { + Thread.sleep(checkInterval - elapsedTime); + } } while (!stop.get()); } catch (Throwable t) { @@ -203,12 +168,56 @@ public void run() { } } + private void scheduleCompactionIfRequired(CompactionInfo ci, Table t, Partition p, + Map tblNameOwners) throws MetaException { + try { + ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList( + txnHandler.getOpenTxns(), 0); + conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString()); + + // The response will have one entry per table and hence we get only one ValidWriteIdList + String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName()); + GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName)); + rqst.setValidTxnList(validTxnList.writeToString()); + + final ValidWriteIdList tblValidWriteIds = TxnUtils.createValidCompactWriteIdList( + txnHandler.getValidWriteIds(rqst).getTblValidWriteIds().get(0)); + + // Figure out who we should run the file operations as + StorageDescriptor sd = resolveStorageDescriptor(t, p); + tblNameOwners.putIfAbsent(fullTableName, findUserToRunAs(sd.getLocation(), t)); + String runAs = tblNameOwners.get(fullTableName); + + CompactionType ctype = checkForCompaction(ci, tblValidWriteIds, sd, t.getParameters(), runAs); + if (ctype != null) { + requestCompaction(ci, runAs, ctype); + } + } catch (Throwable ex) { + LOG.error("Caught exception while trying to determine if we should compact {}. " + + "Marking failed to avoid repeated failures, {}", ci, ex); + txnHandler.markFailed(ci); + } + } + + private interface ThrowingRunnable { + void run() throws E; + + static Runnable unchecked(ThrowingRunnable r) { + return () -> { + try { + r.run(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }; + } + } @Override public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception { super.init(stop, looped); - checkInterval = - conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL, TimeUnit.MILLISECONDS) ; + checkInterval = conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL, TimeUnit.MILLISECONDS); + compactionExecutor = Executors.newFixedThreadPool(conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_REQUEST_QUEUE)); } private void recoverFailedCompactions(boolean remoteOnly) throws MetaException { @@ -395,13 +404,31 @@ private static boolean checkDynPartitioning(Table t, CompactionInfo ci){ return false; } - private boolean checkCompactionElig(CompactionInfo ci){ - Table t = null; + private boolean checkCompactionElig(CompactionInfo ci, ShowCompactResponse compactions) { + // Disable minor compaction for query based compactor + if (!ci.isMajorCompaction() && HiveConf.getBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED)) { + LOG.debug("Not compacting: " + ci.getFullPartitionName() + + ", as query based compaction currently does not " + "support minor compactions."); + return false; + } + LOG.info("Checking to see if we should compact " + ci.getFullPartitionName()); + + // Check if we already have initiated or are working on a compaction for this partition + // or table. If so, skip it. If we are just waiting on cleaning we can still check, + // as it may be time to compact again even though we haven't cleaned. + // todo: this is not robust. You can easily run `alter table` to start a compaction between + // the time currentCompactions is generated and now + if (lookForCurrentCompactions(compactions, ci)) { + LOG.debug("Found currently initiated or working compaction for " + + ci.getFullPartitionName() + " so we will not initiate another compaction"); + return false; + } + try { - t = resolveTable(ci); + Table t = resolveTable(ci); if (t == null) { LOG.info("Can't find table " + ci.getFullTableName() + ", assuming it's a temp " + - "table or has been dropped and moving on."); + "table or has been dropped and moving on."); return false; } @@ -411,17 +438,21 @@ private boolean checkCompactionElig(CompactionInfo ci){ if (noAutoCompactSet(t)) { LOG.info("Table " + tableName(t) + " marked " + hive_metastoreConstants.TABLE_NO_AUTO_COMPACT + - "=true so we will not compact it."); + "=true so we will not compact it."); return false; - } else if (replIsCompactionDisabledForTable(t)) { - return false; - } else if (checkDynPartitioning(t, ci)) { + } else if (replIsCompactionDisabledForTable(t) || checkDynPartitioning(t, ci)) { return false; } + if (txnHandler.checkFailedCompactions(ci)) { + LOG.warn("Will not initiate compaction for " + ci.getFullPartitionName() + " since last " + + MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD + " attempts to compact it failed."); + txnHandler.markFailed(ci); + return false; + } } catch (Throwable e) { - LOG.error("Caught Exception while checking compactiton eligibility " + - StringUtils.stringifyException(e)); + LOG.error("Caught exception while checking compaction eligibility " + + StringUtils.stringifyException(e)); } return true; } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java index 564839324f..1151466f8c 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; import org.apache.hadoop.hive.metastore.api.LockComponent; import org.apache.hadoop.hive.metastore.api.LockLevel; +import org.apache.hadoop.hive.metastore.api.LockState; import org.apache.hadoop.hive.metastore.api.LockRequest; import org.apache.hadoop.hive.metastore.api.LockResponse; import org.apache.hadoop.hive.metastore.api.LockType; @@ -769,6 +770,44 @@ public void dropPartition() throws Exception { List compacts = rsp.getCompacts(); Assert.assertEquals(0, compacts.size()); } + + @Test + public void processCompactionCandidatesInParallel() throws Exception { + Table t = newTable("default", "dp", true); + List components = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + Partition p = newPartition(t, "part" + (i + 1)); + addBaseFile(t, p, 20L, 20); + addDeltaFile(t, p, 21L, 22L, 2); + addDeltaFile(t, p, 23L, 24L, 2); + + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default"); + comp.setTablename("dp"); + comp.setPartitionname("ds=part" + (i + 1)); + comp.setOperationType(DataOperationType.UPDATE); + components.add(comp); + } + burnThroughTransactions("default", "dp", 23); + long txnid = openTxn(); + + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + Assert.assertEquals(LockState.ACQUIRED, res.getState()); + + long writeid = allocateWriteId("default", "dp", txnid); + Assert.assertEquals(24, writeid); + txnHandler.commitTxn(new CommitTxnRequest(txnid)); + + conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_REQUEST_QUEUE, 3); + startInitiator(); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(10, compacts.size()); + } + @Override boolean useHive130DeltaDirName() { return false;