diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java index 94f0031662..0250c4d4e9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java @@ -224,6 +224,10 @@ public static void initializeAndStartThread(CompactorThread thread, protected boolean replIsCompactionDisabledForTable(Table tbl) { // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail. - return ReplUtils.isFirstIncPending(tbl.getParameters()); + boolean isCompactDisabled = ReplUtils.isFirstIncPending(tbl.getParameters()); + if(isCompactDisabled){ + LOG.info("Compaction is disabled for table " + tbl.getTableName()); + } + return isCompactDisabled; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 5fb255272c..32ac239cbc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -17,6 +17,12 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -45,6 +51,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; import org.apache.hive.common.util.Ref; +import org.apache.thrift.TException; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; @@ -54,8 +61,11 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; /** * A class to initiate compactions. This will run in a separate thread. @@ -66,6 +76,8 @@ static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME); static final private String COMPACTORTHRESHOLD_PREFIX = "compactorthreshold."; + private ListeningExecutorService executor; + volatile ListenableFuture listenableFuture; private long checkInterval; @@ -75,6 +87,9 @@ public void run() { // so wrap it in a big catch Throwable statement. try { recoverFailedCompactions(false); + executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(HiveConf.getIntVar(conf, + HiveConf.ConfVars.METASTORE_FS_HANDLER_THREADS_COUNT), + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("initiator-helper-thread-%d").build())); int abortedThreshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD); @@ -93,9 +108,27 @@ public void run() { startedAt = System.currentTimeMillis(); //todo: add method to only get current i.e. 
skip history - more efficient
         ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
-        Set potentials = txnHandler.findPotentialCompactions(abortedThreshold);
+        // Keep only candidates that are still eligible for compaction. Each predicate
+        // below must be TRUE for entries we want to PROCESS, so every "skip" condition
+        // from the old inline code appears here NEGATED and joined with &&.
+        Set potentials = txnHandler.findPotentialCompactions(abortedThreshold)
+            .stream().filter(ci -> {
+              try {
+                // Compaction is disabled for a replicated database until after the first
+                // successful incremental load (HIVE-21197): keep only when NOT disabled.
+                return !replIsCompactionDisabledForDatabase(ci.dbname);
+              } catch (TException te) {
+                LOG.info("Can't find database " + ci.dbname+" caught exception "+StringUtils.stringifyException(te));
+              }
+              return false;
+            }).filter(ci -> {
+              try {
+                Table t = resolveTable(ci);
+                // Keep the entry only when: the table resolves (null means a temp/dropped
+                // table), auto-compaction is not turned off on the table, replication does
+                // not disable compaction, and this is not a table-level entry on a
+                // partitioned table (the dynamic-partitioning case, which must be skipped).
+                // Note: && also prevents the NPE the old || chain had when t == null.
+                return t != null && !noAutoCompactSet(t) && !replIsCompactionDisabledForTable(t)
+                    && checkDynPartitioning(t, ci);
+              } catch (Exception e) {
+                LOG.info("Can't find table " + ci.getFullTableName() + ", assuming it's a temp " +
+                    "table or has been dropped and moving on.");
+              }
+              return false;
+            }).collect(Collectors.toSet());
         LOG.debug("Found " + potentials.size() + " potential compactions, " +
-            "checking to see if we should compact any of them");
+            "checking to see if we should compact any of them");
         for (CompactionInfo ci : potentials) {
           // Disable minor compaction for query based compactor
           if (!ci.isMajorCompaction() && HiveConf.getBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED)) {
@@ -105,40 +138,8 @@ public void run() {
           }
           LOG.info("Checking to see if we should compact " + ci.getFullPartitionName());
           try {
-            if (replIsCompactionDisabledForDatabase(ci.dbname)) {
-              // Compaction is disabled for replicated database until after first successful incremental load. 
- LOG.info("Compaction is disabled for database " + ci.dbname); - continue; - } Table t = resolveTable(ci); - if (t == null) { - // Most likely this means it's a temp table - LOG.info("Can't find table " + ci.getFullTableName() + ", assuming it's a temp " + - "table or has been dropped and moving on."); - continue; - } - - // check if no compaction set for this table - if (noAutoCompactSet(t)) { - LOG.info("Table " + tableName(t) + " marked " + hive_metastoreConstants.TABLE_NO_AUTO_COMPACT + "=true so we will not compact it."); - continue; - } - - if (replIsCompactionDisabledForTable(t)) { - // Compaction is disabled for replicated table until after first successful incremental load. - LOG.info("Compaction is disabled for table " + ci.getFullTableName()); - continue; - } - - // Check to see if this is a table level request on a partitioned table. If so, - // then it's a dynamic partitioning case and we shouldn't check the table itself. - if (t.getPartitionKeys() != null && t.getPartitionKeys().size() > 0 && - ci.partName == null) { - LOG.debug("Skipping entry for " + ci.getFullTableName() + " as it is from dynamic" + - " partitioning"); - continue; - } // Check if we already have initiated or are working on a compaction for this partition // or table. If so, skip it. If we are just waiting on cleaning we can still check, @@ -177,12 +178,34 @@ public void run() { StorageDescriptor sd = resolveStorageDescriptor(t, p); String runAs = findUserToRunAs(sd.getLocation(), t); - /*Future thought: checkForCompaction will check a lot of file metadata and may be expensive. 
- * Long term we should consider having a thread pool here and running checkForCompactionS
-             * in parallel*/
-            CompactionType compactionNeeded
-                = checkForCompaction(ci, tblValidWriteIds, sd, t.getParameters(), runAs);
-            if (compactionNeeded != null) requestCompaction(ci, runAs, compactionNeeded);
+            // checkForCompaction reads a lot of file metadata and can be expensive, so it
+            // is offloaded to the helper pool; the callback files the compaction request
+            // (or marks the candidate failed) when the check completes.
+            Callable<CompactionType> checkForCompactionTask = new Callable<CompactionType>() {
+              @Override
+              public CompactionType call() throws Exception {
+                return checkForCompaction(ci, tblValidWriteIds, sd, t.getParameters(), runAs);
+              }
+            };
+            listenableFuture = executor.submit(checkForCompactionTask);
+
+            Futures.addCallback(listenableFuture, new FutureCallback<CompactionType>() {
+              public void onSuccess(CompactionType compactionNeeded) {
+                if (compactionNeeded != null) {
+                  try {
+                    requestCompaction(ci, runAs, compactionNeeded);
+                  } catch (MetaException e) {
+                    LOG.error("unable to request the compaction, caught unexpected exception "+StringUtils.stringifyException(e));
+                  }
+                }
+              }
+
+              public void onFailure(Throwable e) {
+                LOG.error("Caught exception while trying to determine if we should compact " + ci +" logging exception"+ StringUtils.stringifyException(e));
+                try {
+                  txnHandler.markFailed(ci);
+                } catch (MetaException ex) {
+                  // Log the markFailed failure itself (ex), not the original error (e)
+                  // which was already logged above.
+                  LOG.error("unable to mark the compaction failure, caught unexpected exception: "+StringUtils.stringifyException(ex));
+                }
+              }
+            });
           } catch (Throwable t) {
             LOG.error("Caught exception while trying to determine if we should compact " + ci + ". 
Marking failed to avoid repeated failures, " +
@@ -217,8 +240,14 @@ public void run() {
     } catch (Throwable t) {
       LOG.error("Caught an exception in the main loop of compactor initiator, exiting " +
           StringUtils.stringifyException(t));
+    } finally {
+      // Stop any in-flight eligibility check and release the helper pool.
+      if (listenableFuture != null) {
+        listenableFuture.cancel(true);
       }
+      // The executor is only created inside the try block, so it can still be null
+      // here if an early failure (e.g. recoverFailedCompactions) brought us in;
+      // guard against an NPE that would mask the original exception.
+      if (executor != null) {
+        executor.shutdownNow();
+      }
     }
+  }
+
   @Override
   public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
@@ -398,4 +427,16 @@ private boolean noAutoCompactSet(Table t) {
     }
     return noAutoCompact != null && noAutoCompact.equalsIgnoreCase("true");
   }
+
+  // Check to see if this is a table level request on a partitioned table. If so,
+  // then it's a dynamic partitioning case and we shouldn't check the table itself.
+  // NOTE: returns FALSE when the entry must be SKIPPED (dynamic partitioning case)
+  // and TRUE when the table-level check may proceed.
+  public boolean checkDynPartitioning(Table t, CompactionInfo ci) {
+    if (t.getPartitionKeys() != null && t.getPartitionKeys().size() > 0 &&
+        ci.partName == null) {
+      LOG.debug("Skipping entry for " + ci.getFullTableName() + " as it is from dynamic" +
+          " partitioning");
+      return false;
+    }
+    return true;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java index a6dd4fa003..efa39c0e5a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java @@ -78,7 +78,11 @@ public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception { try { Database database = rs.getDatabase(getDefaultCatalog(conf), dbName); // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail. 
- return ReplUtils.isFirstIncPending(database.getParameters()); + boolean isReplCompactDisabled = ReplUtils.isFirstIncPending(database.getParameters()); + if(isReplCompactDisabled){ + LOG.info("Compaction is disabled for database " + dbName); + } + return isReplCompactDisabled; } catch (NoSuchObjectException e) { LOG.info("Unable to find database " + dbName); return true;