diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
index 94f0031662..0250c4d4e9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
@@ -224,6 +224,10 @@ public static void initializeAndStartThread(CompactorThread thread,

   protected boolean replIsCompactionDisabledForTable(Table tbl) {
     // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail.
-    return ReplUtils.isFirstIncPending(tbl.getParameters());
+    boolean isCompactDisabled = ReplUtils.isFirstIncPending(tbl.getParameters());
+    if (isCompactDisabled) {
+      LOG.info("Compaction is disabled for table " + tbl.getTableName());
+    }
+    return isCompactDisabled;
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 5fb255272c..8e65a283fd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -17,6 +17,14 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;

+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -45,6 +53,7 @@
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hive.common.util.Ref;
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -54,8 +63,11 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;

 /**
  * A class to initiate compactions.  This will run in a separate thread.
@@ -66,6 +78,15 @@
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
   static final private String COMPACTORTHRESHOLD_PREFIX = "compactorthreshold.";

+  private ListeningExecutorService executor;
+  volatile ListenableFuture<CompactionType> listenableFuture;
+  // bounded cache to store the full table name and its owner
+  private static Cache<String, String> tblNameOwnersCache = CacheBuilder.newBuilder()
+      .initialCapacity(1024)
+      .maximumSize(10240)
+      .concurrencyLevel(Runtime.getRuntime().availableProcessors())
+      .weakKeys().build();
+
   private long checkInterval;
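Aside, not part of the patch: the table-owner cache above is a plain look-aside memoization. Below is a minimal, self-contained sketch of that pattern; the class name and expensiveOwnerLookup are illustrative stand-ins for Initiator and findUserToRunAs. One caveat worth knowing: Guava's weakKeys() switches the cache to identity (==) key comparison, so dynamically built String keys will rarely hit; the sketch therefore relies on maximumSize alone for bounding.

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    public class OwnerCacheSketch {
      // Bounded look-aside cache: full table name -> owner. maximumSize keeps it
      // bounded; weakKeys() is omitted because it would force identity comparison.
      private static final Cache<String, String> OWNERS =
          CacheBuilder.newBuilder().initialCapacity(1024).maximumSize(10240).build();

      static String ownerOf(String fullTableName) {
        String owner = OWNERS.getIfPresent(fullTableName);
        if (owner == null) {
          owner = expensiveOwnerLookup(fullTableName); // e.g. a filesystem stat
          OWNERS.put(fullTableName, owner);
        }
        return owner;
      }

      // Hypothetical stand-in for findUserToRunAs(location, table).
      private static String expensiveOwnerLookup(String fullTableName) {
        return "hive";
      }
    }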
@@ -75,6 +96,9 @@ public void run() {
     // so wrap it in a big catch Throwable statement.
     try {
       recoverFailedCompactions(false);
+      executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(HiveConf.getIntVar(conf,
+          HiveConf.ConfVars.METASTORE_FS_HANDLER_THREADS_COUNT),
+          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("initiator-helper-thread-%d").build()));

       int abortedThreshold = HiveConf.getIntVar(conf,
           HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD);
@@ -93,7 +117,8 @@ public void run() {
           startedAt = System.currentTimeMillis();
           //todo: add method to only get current i.e. skip history - more efficient
           ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
-          Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold);
+          Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold)
+              .stream().filter(ci -> checkCompactionElig(ci)).collect(Collectors.toSet());
           LOG.debug("Found " + potentials.size() + " potential compactions, " +
               "checking to see if we should compact any of them");
           for (CompactionInfo ci : potentials) {
@@ -105,40 +130,8 @@ public void run() {
             }
             LOG.info("Checking to see if we should compact " + ci.getFullPartitionName());
             try {
-              if (replIsCompactionDisabledForDatabase(ci.dbname)) {
-                // Compaction is disabled for replicated database until after first successful incremental load.
-                LOG.info("Compaction is disabled for database " + ci.dbname);
-                continue;
-              }
               Table t = resolveTable(ci);
-              if (t == null) {
-                // Most likely this means it's a temp table
-                LOG.info("Can't find table " + ci.getFullTableName() + ", assuming it's a temp " +
-                    "table or has been dropped and moving on.");
-                continue;
-              }
-
-              // check if no compaction set for this table
-              if (noAutoCompactSet(t)) {
-                LOG.info("Table " + tableName(t) + " marked " + hive_metastoreConstants.TABLE_NO_AUTO_COMPACT +
-                    "=true so we will not compact it.");
-                continue;
-              }
-
-              if (replIsCompactionDisabledForTable(t)) {
-                // Compaction is disabled for replicated table until after first successful incremental load.
-                LOG.info("Compaction is disabled for table " + ci.getFullTableName());
-                continue;
-              }
-
-              // Check to see if this is a table level request on a partitioned table.  If so,
-              // then it's a dynamic partitioning case and we shouldn't check the table itself.
-              if (t.getPartitionKeys() != null && t.getPartitionKeys().size() > 0 &&
-                  ci.partName == null) {
-                LOG.debug("Skipping entry for " + ci.getFullTableName() + " as it is from dynamic" +
-                    " partitioning");
-                continue;
-              }

               // Check if we already have initiated or are working on a compaction for this partition
               // or table.  If so, skip it.  If we are just waiting on cleaning we can still check,
@@ -176,13 +169,42 @@ public void run() {
                   txnHandler.getValidWriteIds(rqst).getTblValidWriteIds().get(0));

               StorageDescriptor sd = resolveStorageDescriptor(t, p);
-              String runAs = findUserToRunAs(sd.getLocation(), t);
-              /*Future thought: checkForCompaction will check a lot of file metadata and may be expensive.
-              * Long term we should consider having a thread pool here and running checkForCompactions
-              * in parallel*/
-              CompactionType compactionNeeded
-                  = checkForCompaction(ci, tblValidWriteIds, sd, t.getParameters(), runAs);
-              if (compactionNeeded != null) requestCompaction(ci, runAs, compactionNeeded);
+              String tblOwner = tblNameOwnersCache.getIfPresent(fullTableName);
+              if (tblOwner == null) {
+                LOG.info("Unable to find the table owner in the cache for table " + fullTableName +
+                    "; will determine user based on table location");
+                tblOwner = findUserToRunAs(sd.getLocation(), t);
+                tblNameOwnersCache.put(fullTableName, tblOwner);
+              }
+              String runAs = tblOwner;
+              Callable<CompactionType> checkForCompactionTask = new Callable<CompactionType>() {
+                @Override
+                public CompactionType call() throws Exception {
+                  return checkForCompaction(ci, tblValidWriteIds, sd, t.getParameters(), runAs);
+                }
+              };
+              listenableFuture = executor.submit(checkForCompactionTask);
+
+              Futures.addCallback(listenableFuture, new FutureCallback<CompactionType>() {
+                @Override
+                public void onSuccess(CompactionType compactionNeeded) {
+                  if (compactionNeeded != null) {
+                    try {
+                      requestCompaction(ci, runAs, compactionNeeded);
+                    } catch (MetaException e) {
+                      LOG.error("Unable to request the compaction, caught unexpected exception " +
+                          StringUtils.stringifyException(e));
+                    }
+                  }
+                }
+
+                @Override
+                public void onFailure(Throwable e) {
+                  LOG.error("Caught exception while trying to determine if we should compact " + ci + ": " +
+                      StringUtils.stringifyException(e));
+                  try {
+                    txnHandler.markFailed(ci);
+                  } catch (MetaException ex) {
+                    LOG.error("Unable to mark the compaction as failed, caught unexpected exception: " +
+                        StringUtils.stringifyException(ex));
+                  }
+                }
+              });
             } catch (Throwable t) {
               LOG.error("Caught exception while trying to determine if we should compact " + ci +
                   ".  Marking failed to avoid repeated failures, " +
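Aside, not part of the patch: the submit-plus-callback flow above, reduced to a runnable sketch. Names are illustrative; the submitted lambda stands in for checkForCompaction. Note that newer Guava releases drop the two-argument Futures.addCallback overload the patch uses, so the sketch passes the callback executor explicitly.

    import com.google.common.util.concurrent.FutureCallback;
    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.ListeningExecutorService;
    import com.google.common.util.concurrent.MoreExecutors;
    import java.util.concurrent.Executors;

    public class CallbackSketch {
      public static void main(String[] args) {
        ListeningExecutorService pool =
            MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4));

        // Stand-in for checkForCompaction: a slow check yielding a decision or null.
        ListenableFuture<String> future = pool.submit(() -> "MAJOR");

        Futures.addCallback(future, new FutureCallback<String>() {
          @Override
          public void onSuccess(String compactionNeeded) {
            if (compactionNeeded != null) {
              System.out.println("would request " + compactionNeeded + " compaction");
            }
          }

          @Override
          public void onFailure(Throwable t) {
            System.err.println("check failed, would mark the entry failed: " + t);
          }
        }, MoreExecutors.directExecutor());

        pool.shutdown();
      }
    }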
@@ -217,8 +239,23 @@ public void run() {
       } catch (Throwable t) {
         LOG.error("Caught an exception in the main loop of compactor initiator, exiting " +
             StringUtils.stringifyException(t));
-      }
+      } finally {
+        if (executor != null) {
+          executor.shutdown();
+          try {
+            executor.awaitTermination(30, TimeUnit.SECONDS);
+          } catch (InterruptedException e) {
+            LOG.info("Interrupted while waiting for pending tasks to finish");
+          }
+          if (listenableFuture != null) {
+            listenableFuture.cancel(true);
+          }
+          executor.shutdownNow();
+          executor = null;
+        }
+      }
     }

   @Override
   public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
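Aside, not part of the patch: the finally block above cancels only the last submitted future and then calls shutdownNow() unconditionally. For comparison, a sketch of the two-phase shutdown idiom from the ExecutorService Javadoc, where shutdownNow() fires only once the grace period lapses; shutdownAndAwait is an illustrative name.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;

    public class ShutdownSketch {
      static void shutdownAndAwait(ExecutorService pool) {
        pool.shutdown(); // stop accepting new tasks, let queued ones drain
        try {
          if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
            pool.shutdownNow(); // grace period expired: interrupt running tasks
          }
        } catch (InterruptedException e) {
          pool.shutdownNow();
          Thread.currentThread().interrupt(); // preserve the interrupt status
        }
      }
    }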
@@ -398,4 +435,44 @@ private boolean noAutoCompactSet(Table t) {
     }
     return noAutoCompact != null && noAutoCompact.equalsIgnoreCase("true");
   }
+
+  // Check to see if this is a table level request on a partitioned table.  If so,
+  // then it's a dynamic partitioning case and we shouldn't check the table itself.
+  public boolean checkDynPartitioning(Table t, CompactionInfo ci) {
+    if (t.getPartitionKeys() != null && t.getPartitionKeys().size() > 0 &&
+        ci.partName == null) {
+      LOG.debug("Skipping entry for " + ci.getFullTableName() + " as it is from dynamic" +
+          " partitioning");
+      return true;
+    }
+    return false;
+  }
+
+  public boolean checkCompactionElig(CompactionInfo ci) {
+    Table t = null;
+    try {
+      t = resolveTable(ci);
+      if (t == null) {
+        // Most likely this means it's a temp table
+        LOG.info("Can't find table " + ci.getFullTableName() + ", assuming it's a temp " +
+            "table or has been dropped and moving on.");
+        return false;
+      }
+
+      if (replIsCompactionDisabledForDatabase(ci.dbname)) {
+        return false;
+      }
+
+      if (noAutoCompactSet(t)) {
+        return false;
+      } else if (replIsCompactionDisabledForTable(t)) {
+        return false;
+      } else if (checkDynPartitioning(t, ci)) {
+        return false;
+      }
+    } catch (Throwable e) {
+      LOG.error("Caught exception while checking compaction eligibility " + StringUtils.stringifyException(e));
+    }
+    return true;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
index a6dd4fa003..efa39c0e5a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
@@ -78,7 +78,11 @@ public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
     try {
       Database database = rs.getDatabase(getDefaultCatalog(conf), dbName);
       // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail.
-      return ReplUtils.isFirstIncPending(database.getParameters());
+      boolean isReplCompactDisabled = ReplUtils.isFirstIncPending(database.getParameters());
+      if (isReplCompactDisabled) {
+        LOG.info("Compaction is disabled for database " + dbName);
+      }
+      return isReplCompactDisabled;
     } catch (NoSuchObjectException e) {
       LOG.info("Unable to find database " + dbName);
       return true;
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorFactory.java b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorFactory.java
index 1c3cad2d33..287561a967 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorFactory.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorFactory.java
@@ -49,7 +49,7 @@
  * same construction parameters should result in exactly the same
  * ObjectInspector.
  */
-public final class ObjectInspectorFactory {
+public final class ObjectInspectorFactory {
   /**
    * ObjectInspectorOptions describes what ObjectInspector to use. JAVA is to
    * use pure JAVA reflection. THRIFT is to use JAVA reflection and filter out