diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
index 94f0031662..8884defe4f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
@@ -224,6 +224,10 @@ public static void initializeAndStartThread(CompactorThread thread,
   protected boolean replIsCompactionDisabledForTable(Table tbl) {
     // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail.
-    return ReplUtils.isFirstIncPending(tbl.getParameters());
+    boolean isCompactDisabled = ReplUtils.isFirstIncPending(tbl.getParameters());
+    if (isCompactDisabled) {
+      LOG.debug("Compaction is disabled for table " + tbl.getTableName());
+    }
+    return isCompactDisabled;
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 5fb255272c..9f078ec863 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -51,11 +51,13 @@
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 /**
  * A class to initiate compactions. This will run in a separate thread.
@@ -66,6 +68,8 @@
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
   static final private String COMPACTORTHRESHOLD_PREFIX = "compactorthreshold.";
 
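+  // Cache of fully qualified table name -> user the compaction should run as,
+  // so findUserToRunAs(), which may need to inspect the table location, only
+  // runs once per table for the lifetime of this thread.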
- LOG.info("Compaction is disabled for table " + ci.getFullTableName()); - continue; - } - - // Check to see if this is a table level request on a partitioned table. If so, - // then it's a dynamic partitioning case and we shouldn't check the table itself. - if (t.getPartitionKeys() != null && t.getPartitionKeys().size() > 0 && - ci.partName == null) { - LOG.debug("Skipping entry for " + ci.getFullTableName() + " as it is from dynamic" + - " partitioning"); - continue; - } // Check if we already have initiated or are working on a compaction for this partition // or table. If so, skip it. If we are just waiting on cleaning we can still check, @@ -176,7 +149,13 @@ public void run() { txnHandler.getValidWriteIds(rqst).getTblValidWriteIds().get(0)); StorageDescriptor sd = resolveStorageDescriptor(t, p); - String runAs = findUserToRunAs(sd.getLocation(), t); + String runAs = tblNameOwnersCache.get(fullTableName); + if(runAs == null) { + LOG.debug("unable to find the table owner in the cache for table "+ fullTableName + " " + + "will determine user based on table location"); + runAs = findUserToRunAs(sd.getLocation(), t); + tblNameOwnersCache.put(fullTableName, runAs); + } /*Future thought: checkForCompaction will check a lot of file metadata and may be expensive. * Long term we should consider having a thread pool here and running checkForCompactionS * in parallel*/ @@ -218,7 +197,8 @@ public void run() { LOG.error("Caught an exception in the main loop of compactor initiator, exiting " + StringUtils.stringifyException(t)); } - } +} + @Override public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception { @@ -398,4 +378,44 @@ private boolean noAutoCompactSet(Table t) { } return noAutoCompact != null && noAutoCompact.equalsIgnoreCase("true"); } + + // Check to see if this is a table level request on a partitioned table. If so, + // then it's a dynamic partitioning case and we shouldn't check the table itself. 
+  public boolean checkCompactionElig(CompactionInfo ci) {
+    Table t = null;
+    try {
+      t = resolveTable(ci);
+      if (t == null) {
+        LOG.info("Can't find table " + ci.getFullTableName() + ", assuming it's a temp " +
+            "table or has been dropped and moving on.");
+        return false;
+      }
+
+      if (replIsCompactionDisabledForDatabase(ci.dbname)) {
+        return false;
+      }
+
+      if (noAutoCompactSet(t)) {
+        return false;
+      } else if (replIsCompactionDisabledForTable(t)) {
+        return false;
+      } else if (checkDynPartitioning(t, ci)) {
+        return false;
+      }
+    } catch (Throwable e) {
+      LOG.error("Caught exception while checking compaction eligibility: " +
+          StringUtils.stringifyException(e));
+    }
+    return true;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
index a6dd4fa003..efa39c0e5a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
@@ -78,7 +78,11 @@ public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
     try {
       Database database = rs.getDatabase(getDefaultCatalog(conf), dbName);
       // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail.
-      return ReplUtils.isFirstIncPending(database.getParameters());
+      boolean isReplCompactDisabled = ReplUtils.isFirstIncPending(database.getParameters());
+      if (isReplCompactDisabled) {
+        LOG.info("Compaction is disabled for database " + dbName);
+      }
+      return isReplCompactDisabled;
     } catch (NoSuchObjectException e) {
       LOG.info("Unable to find database " + dbName);
       return true;
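For reference, a minimal self-contained sketch of the two patterns the Initiator changes introduce: pre-filtering candidates with a stream filter before any per-entry work, and memoizing a per-table lookup in a map. All names below (InitiatorPatternsSketch, Candidate, lookupOwner, isEligible) are illustrative stand-ins, not Hive APIs.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class InitiatorPatternsSketch {

  /** Illustrative stand-in for CompactionInfo. */
  static class Candidate {
    final String fullTableName;
    Candidate(String fullTableName) { this.fullTableName = fullTableName; }
  }

  /** Mirrors tblNameOwnersCache: memoizes the expensive owner lookup. */
  private final Map<String, String> ownerCache = new HashMap<>();

  /** Stand-in for findUserToRunAs(); imagine a file system call here. */
  private String lookupOwner(String table) {
    return "hive";
  }

  /** Stand-in for checkCompactionElig(): a cheap check run before any heavy work. */
  private boolean isEligible(Candidate c) {
    return !c.fullTableName.startsWith("tmp_");
  }

  /** Pre-filters candidates the way the patched run() filters potentials. */
  Set<Candidate> select(List<Candidate> potentials) {
    return potentials.stream()
        .filter(this::isEligible)
        .collect(Collectors.toSet());
  }

  /** computeIfAbsent gives the same miss-then-store behavior as the patch's explicit get()/put(). */
  String ownerFor(Candidate c) {
    return ownerCache.computeIfAbsent(c.fullTableName, this::lookupOwner);
  }

  public static void main(String[] args) {
    InitiatorPatternsSketch s = new InitiatorPatternsSketch();
    for (Candidate c : s.select(List.of(
        new Candidate("db.sales"), new Candidate("tmp_db.scratch")))) {
      System.out.println(c.fullTableName + " runs as " + s.ownerFor(c));
    }
  }
}

One design note: because tblNameOwnersCache is a plain field that is never invalidated, an owner change on an existing table will not be picked up until the Initiator restarts; computeIfAbsent, as in the sketch, keeps that trade-off but would make the miss-then-store step atomic on a concurrent map if the cache ever needed to be shared across threads.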