diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index 5dfa7ca974..93f4bc2561 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.TableName; import org.apache.hadoop.hive.common.ValidTxnList; @@ -28,9 +27,6 @@ import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidWriteIdList; @@ -44,7 +40,6 @@ import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.StringUtils; import org.apache.hive.common.util.Ref; import java.io.IOException; @@ -52,6 +47,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -61,9 +57,8 @@ * A class to clean directories after compactions. This will run in a separate thread. */ public class Cleaner extends MetaStoreCompactorThread { - static final private String CLASS_NAME = Cleaner.class.getName(); - static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME); - private long cleanerCheckInterval = 0; + + private long cleanerCheckInterval = 0L; private ReplChangeManager replChangeManager; @@ -75,7 +70,7 @@ public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception { @Override public void run() { - if (cleanerCheckInterval == 0) { + if (cleanerCheckInterval == 0L) { cleanerCheckInterval = conf.getTimeVar( HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS); } @@ -86,7 +81,7 @@ public void run() { // first to make sure we go through a complete iteration of the loop before resetting it. boolean setLooped = !looped.get(); TxnStore.MutexAPI.LockHandle handle = null; - long startedAt = -1; + long startedAt = -1L; // Make sure nothing escapes this run method and kills the metastore at large, // so wrap it in a big catch Throwable statement. try { @@ -97,8 +92,7 @@ public void run() { clean(compactionInfo, minOpenTxnId); } } catch (Throwable t) { - LOG.error("Caught an exception in the main loop of compactor cleaner, " + - StringUtils.stringifyException(t)); + LOG.error("Caught an exception in the main loop of compactor cleaner", t); } finally { if (handle != null) { @@ -125,18 +119,17 @@ public void run() { private void clean(CompactionInfo ci, long minOpenTxnGLB) throws MetaException { LOG.info("Starting cleaning for " + ci); try { - Table t = resolveTable(ci); - if (t == null) { + Optional t = resolveTable(ci); + if (!t.isPresent()) { // The table was dropped before we got around to cleaning it. LOG.info("Unable to find table " + ci.getFullTableName() + ", assuming it was dropped." + idWatermark(ci)); txnHandler.markCleaned(ci); return; } - Partition p = null; - if (ci.partName != null) { - p = resolvePartition(ci); - if (p == null) { + Optional p = resolvePartition(ci); + if (!p.isPresent()) { + if (ci.partName != null) { // The partition was dropped before we got around to cleaning it. LOG.info("Unable to find partition " + ci.getFullPartitionName() + ", assuming it was dropped." + idWatermark(ci)); @@ -144,7 +137,7 @@ private void clean(CompactionInfo ci, long minOpenTxnGLB) throws MetaException { return; } } - StorageDescriptor sd = resolveStorageDescriptor(t, p); + StorageDescriptor sd = resolveStorageDescriptor(t.get(), p); final String location = sd.getLocation(); ValidTxnList validTxnList = TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenTxnGLB); @@ -181,7 +174,7 @@ private void clean(CompactionInfo ci, long minOpenTxnGLB) throws MetaException { * useful if there is all of a sudden a flood of aborted txns. (For another day). */ List tblNames = Collections.singletonList( - TableName.getDbTable(t.getDbName(), t.getTableName())); + TableName.getDbTable(t.get().getDbName(), t.get().getTableName())); GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(tblNames); rqst.setValidTxnList(validTxnList.writeToString()); GetValidWriteIdsResponse rsp = txnHandler.getValidWriteIds(rqst); @@ -215,8 +208,7 @@ public Object run() throws Exception { } txnHandler.markCleaned(ci); } catch (Exception e) { - LOG.error("Caught exception when cleaning, unable to complete cleaning of " + ci + " " + - StringUtils.stringifyException(e)); + LOG.error("Caught exception when cleaning, unable to complete cleaning of " + ci, e); txnHandler.markFailed(ci); } } @@ -251,9 +243,8 @@ private void removeFiles(String location, ValidWriteIdList writeIdList, Compacti extraDebugInfo.setCharAt(extraDebugInfo.length() - 1, ']'); LOG.info(idWatermark(ci) + " About to remove " + filesToDelete.size() + " obsolete directories from " + location + ". " + extraDebugInfo.toString()); - if (filesToDelete.size() < 1) { - LOG.warn("Hmm, nothing to delete in the cleaner for directory " + location + - ", that hardly seems right."); + if (filesToDelete.isEmpty()) { + LOG.warn("Nothing to delete in the cleaner for directory: " + location); return; } @@ -262,7 +253,7 @@ private void removeFiles(String location, ValidWriteIdList writeIdList, Compacti Boolean isSourceOfRepl = ReplChangeManager.isSourceOfReplication(db); for (Path dead : filesToDelete) { - LOG.debug("Going to delete path " + dead.toString()); + LOG.debug("Going to delete path {}", dead); if (isSourceOfRepl) { replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, true); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionException.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionException.java new file mode 100644 index 0000000000..54f4626d1b --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.txn.compactor; + +/** + * Signals that an exception of some sort has occurred during compaction. + */ +public class CompactionException extends Exception { + + private static final long serialVersionUID = 1L; + + public CompactionException(String message) { + super(message); + } + + public CompactionException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index ee2c0f3e23..df784b62c3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -78,7 +78,6 @@ import org.apache.hadoop.mapred.TaskAttemptContext; import org.apache.hadoop.mapred.lib.NullOutputFormat; import org.apache.hadoop.mapreduce.security.TokenCache; -import org.apache.hadoop.util.StringUtils; import org.apache.hive.common.util.Ref; import org.apache.parquet.Strings; import org.apache.thrift.TException; @@ -287,14 +286,14 @@ void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor Path path = stat.getFileStatus().getPath(); //note that originalFiles are all original files recursively not dirs dirsToSearch.add(path); - LOG.debug("Adding original file " + path + " to dirs to search"); + LOG.debug("Adding original file {} to dirs to search", path); } // Set base to the location so that the input format reads the original files. baseDir = new Path(sd.getLocation()); } } else { // add our base to the list of directories to search for files in. - LOG.debug("Adding base directory " + baseDir + " to dirs to search"); + LOG.debug("Adding base directory {} to dirs to search", baseDir); dirsToSearch.add(baseDir); } } @@ -363,7 +362,7 @@ private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compa long minTxn = Long.MAX_VALUE; long maxTxn = Long.MIN_VALUE; for (AcidUtils.ParsedDelta delta : parsedDeltas) { - LOG.debug("Adding delta " + delta.getPath() + " to directories to search"); + LOG.debug("Adding delta {} to directories to search", delta.getPath()); dirsToSearch.add(delta.getPath()); deltaDirs.add(delta.getPath()); minTxn = Math.min(minTxn, delta.getMinWriteId()); @@ -441,21 +440,6 @@ private void setColumnTypes(JobConf job, List cols) { HiveConf.setVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName()); } - // Remove the directories for aborted transactions only - private void removeFilesForMmTable(HiveConf conf, Directory dir) throws IOException { - // For MM table, we only want to delete delta dirs for aborted txns. - List filesToDelete = dir.getAbortedDirectories(); - if (filesToDelete.size() < 1) { - return; - } - LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + dir); - FileSystem fs = filesToDelete.get(0).getFileSystem(conf); - for (Path dead : filesToDelete) { - LOG.debug("Going to delete path " + dead.toString()); - fs.delete(dead, true); - } - } - public JobConf getMrJob() { return mrJob; } @@ -510,7 +494,7 @@ public long getLength() throws IOException { @Override public String[] getLocations() throws IOException { - return locations.toArray(new String[locations.size()]); + return locations.toArray(new String[0]); } @Override @@ -543,20 +527,20 @@ public void readFields(DataInput dataInput) throws IOException { locations = new ArrayList(); length = dataInput.readLong(); - LOG.debug("Read length of " + length); + LOG.debug("Read length of {}", length); int numElements = dataInput.readInt(); - LOG.debug("Read numElements of " + numElements); + LOG.debug("Read numElements of {}", numElements); for (int i = 0; i < numElements; i++) { len = dataInput.readInt(); - LOG.debug("Read file length of " + len); + LOG.debug("Read file length of {}", len); buf = new byte[len]; dataInput.readFully(buf); locations.add(new String(buf)); } bucketNum = dataInput.readInt(); - LOG.debug("Read bucket number of " + bucketNum); + LOG.debug("Read bucket number of {}", bucketNum); len = dataInput.readInt(); - LOG.debug("Read base path length of " + len); + LOG.debug("Read base path length of {}", len); if (len > 0) { buf = new byte[len]; dataInput.readFully(buf); @@ -667,8 +651,8 @@ public String toString() { bt.sawBase ? baseDir : null, deltaDirs)); } - LOG.debug("Returning " + splits.size() + " splits"); - return splits.toArray(new InputSplit[splits.size()]); + LOG.debug("Returning {} splits", splits.size()); + return splits.toArray(new InputSplit[0]); } @Override @@ -693,7 +677,7 @@ private void addFileToMap(Matcher matcher, Path file, boolean sawBase, bt = new BucketTracker(); splitToBucketMap.put(bucketNum, bt); } - LOG.debug("Adding " + file.toString() + " to list of files for splits"); + LOG.debug("Adding {} to list of files for splits", file); bt.buckets.add(file); bt.sawBase |= sawBase; } @@ -925,15 +909,9 @@ public String toString() { LOG.error(s); throw new IOException(s); } - } catch (ClassNotFoundException e) { - LOG.error("Unable to instantiate class, " + StringUtils.stringifyException(e)); - throw new IOException(e); - } catch (InstantiationException e) { - LOG.error("Unable to instantiate class, " + StringUtils.stringifyException(e)); - throw new IOException(e); - } catch (IllegalAccessException e) { - LOG.error("Unable to instantiate class, " + StringUtils.stringifyException(e)); - throw new IOException(e); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { + LOG.error("Unable to instantiate class", e); + throw new IOException("Unable to instantiate class", e); } return t; } @@ -971,8 +949,7 @@ public void commitJob(JobContext context) throws IOException { Path tmpLocation = new Path(conf.get(TMP_LOCATION));//this contains base_xxx or delta_xxx_yyy Path finalLocation = new Path(conf.get(FINAL_LOCATION)); FileSystem fs = tmpLocation.getFileSystem(conf); - LOG.debug("Moving contents of " + tmpLocation.toString() + " to " + - finalLocation.toString()); + LOG.debug("Moving contents of {} to {}", tmpLocation, finalLocation); if(!fs.exists(tmpLocation)) { /** * No 'tmpLocation' may happen if job generated created 0 splits, which happens if all @@ -1015,7 +992,7 @@ public void abortJob(JobContext context, int status) throws IOException { JobConf conf = ShimLoader.getHadoopShims().getJobConf(context); Path tmpLocation = new Path(conf.get(TMP_LOCATION)); FileSystem fs = tmpLocation.getFileSystem(conf); - LOG.debug("Removing " + tmpLocation.toString()); + LOG.debug("Removing {}", tmpLocation); fs.delete(tmpLocation, true); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java index 99da86f910..52318edd8f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; @@ -40,7 +39,11 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; import java.util.List; +import java.util.Objects; +import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -49,8 +52,8 @@ * Superclass for all threads in the compactor. */ public abstract class CompactorThread extends Thread implements Configurable { - static final private String CLASS_NAME = CompactorThread.class.getName(); - protected static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); + + protected final Logger LOG = LoggerFactory.getLogger(getClass()); protected HiveConf conf; protected RawStore rs; @@ -89,11 +92,11 @@ public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception { * Find the table being compacted * @param ci compaction info returned from the compaction queue * @return metastore table - * @throws org.apache.hadoop.hive.metastore.api.MetaException if the table cannot be found. + * @throws CompactionException if the table cannot be found. */ - abstract Table resolveTable(CompactionInfo ci) throws MetaException; + abstract Optional
resolveTable(CompactionInfo ci) throws CompactionException; - abstract boolean replIsCompactionDisabledForDatabase(String dbName) throws TException; + abstract boolean replIsCompactionDisabledForDatabase(String dbName) throws CompactionException; /** * Get list of partitions by name. @@ -101,36 +104,41 @@ public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception { * @return list of partitions * @throws MetaException if an error occurs. */ - abstract List getPartitionsByNames(CompactionInfo ci) throws MetaException; + abstract List getPartitionsByNames(CompactionInfo ci) throws CompactionException; /** * Get the partition being compacted. + * * @param ci compaction info returned from the compaction queue - * @return metastore partition, or null if there is not partition in this compaction info - * @throws Exception if underlying calls throw, or if the partition name resolves to more than - * one partition. + * @return a metastore partition, if a partition is present, otherwise an + * empty Optional + * @throws CompactionException if underlying calls to retrieve partiton + * information fails, or if the partition name resolves to more than + * one partition. */ - protected Partition resolvePartition(CompactionInfo ci) throws Exception { - if (ci.partName != null) { - List parts; - try { - parts = getPartitionsByNames(ci); - if (parts == null || parts.size() == 0) { - // The partition got dropped before we went looking for it. - return null; - } - } catch (Exception e) { - LOG.error("Unable to find partition " + ci.getFullPartitionName() + ", " + e.getMessage()); - throw e; - } - if (parts.size() != 1) { - LOG.error(ci.getFullPartitionName() + " does not refer to a single partition. " + parts); - throw new MetaException("Too many partitions for : " + ci.getFullPartitionName()); - } - return parts.get(0); - } else { - return null; + protected Optional resolvePartition(final CompactionInfo ci) throws CompactionException { + Objects.requireNonNull(ci, "CompactionInfo cannot be null"); + if (ci.partName == null) { + return Optional.empty(); } + List parts = Collections.emptyList(); + try { + parts = getPartitionsByNames(ci); + } catch (Exception e) { + throw new CompactionException("Failed to find partition " + ci.getFullPartitionName(), e); + } + + final Iterator iter = parts.iterator(); + if (!iter.hasNext()) { + LOG.debug("The partition got dropped before we went looking for it"); + return Optional.empty(); + } + + final Partition partition = iter.next(); + if (iter.hasNext()) { + throw new CompactionException("Too many partitions for: " + ci.getFullPartitionName()); + } + return Optional.of(partition); } /** @@ -139,8 +147,8 @@ protected Partition resolvePartition(CompactionInfo ci) throws Exception { * @param p table from {@link #resolvePartition(org.apache.hadoop.hive.metastore.txn.CompactionInfo)} * @return metastore storage descriptor. */ - protected StorageDescriptor resolveStorageDescriptor(Table t, Partition p) { - return (p == null) ? t.getSd() : p.getSd(); + protected StorageDescriptor resolveStorageDescriptor(Table t, Optional p) { + return (p.isPresent()) ? p.get().getSd() : t.getSd(); } /** @@ -155,12 +163,12 @@ protected StorageDescriptor resolveStorageDescriptor(Table t, Partition p) { */ protected String findUserToRunAs(String location, Table t) throws IOException, InterruptedException { - LOG.debug("Determining who to run the job as."); + LOG.debug("Determining who to run the job as"); final Path p = new Path(location); final FileSystem fs = p.getFileSystem(conf); try { FileStatus stat = fs.getFileStatus(p); - LOG.debug("Running job as " + stat.getOwner()); + LOG.debug("Running job as {}", stat.getOwner()); return stat.getOwner(); } catch (AccessControlException e) { // TODO not sure this is the right exception @@ -187,13 +195,12 @@ public Object run() throws Exception { } if (wrapper.size() == 1) { - LOG.debug("Running job as " + wrapper.get(0)); + LOG.debug("Running job as {}", wrapper.get(0)); return wrapper.get(0); } } - LOG.error("Unable to stat file " + p + " as either current user(" + UserGroupInformation.getLoginUser() + - ") or table owner(" + t.getOwner() + "), giving up"); - throw new IOException("Unable to stat file: " + p); + throw new IOException("Unable to stat file " + p + " as either current user (" + UserGroupInformation.getLoginUser() + + ") or table owner (" + t.getOwner() + "), giving up"); } /** @@ -215,7 +222,6 @@ protected String tableName(Table t) { public static void initializeAndStartThread(CompactorThread thread, Configuration conf) throws Exception { - LOG.info("Starting compactor thread of type " + thread.getClass().getName()); thread.setConf(conf); thread.setThreadId(nextThreadId.incrementAndGet()); thread.init(new AtomicBoolean(), new AtomicBoolean()); @@ -224,10 +230,6 @@ public static void initializeAndStartThread(CompactorThread thread, protected boolean replIsCompactionDisabledForTable(Table tbl) { // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail. - boolean isCompactDisabled = ReplUtils.isFirstIncPending(tbl.getParameters()); - if (isCompactDisabled) { - LOG.info("Compaction is disabled for table " + tbl.getTableName()); - } - return isCompactDisabled; + return ReplUtils.isFirstIncPending(tbl.getParameters()); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 7a0e32463d..1cb190dc8a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -45,8 +45,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; import org.apache.hive.common.util.Ref; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.security.PrivilegedExceptionAction; @@ -54,6 +52,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -64,8 +63,6 @@ * It's critical that there exactly 1 of these in a given warehouse. */ public class Initiator extends MetaStoreCompactorThread { - static final private String CLASS_NAME = Initiator.class.getName(); - static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME); static final private String COMPACTORTHRESHOLD_PREFIX = "compactorthreshold."; private Map tblNameOwnersCache = new HashMap<>(); @@ -108,14 +105,15 @@ public void run() { for (CompactionInfo ci : potentials) { // Disable minor compaction for query based compactor if (!ci.isMajorCompaction() && HiveConf.getBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED)) { - LOG.debug("Not compacting: " + ci.getFullPartitionName() - + ", as query based compaction currently does not " + "support minor compactions."); + LOG.debug( + "Not compacting: {}, as query based compaction currently does not support minor compactions", + ci.getFullPartitionName()); continue; } LOG.info("Checking to see if we should compact " + ci.getFullPartitionName()); try { - Table t = resolveTable(ci); + Optional
t = resolveTable(ci); // Check if we already have initiated or are working on a compaction for this partition // or table. If so, skip it. If we are just waiting on cleaning we can still check, @@ -135,9 +133,9 @@ public void run() { } // Figure out who we should run the file operations as - Partition p = resolvePartition(ci); - if (p == null && ci.partName != null) { - LOG.info("Can't find partition " + ci.getFullPartitionName() + + Optional p = resolvePartition(ci); + if (!p.isPresent() && ci.partName != null) { + LOG.info("Failed to find partition " + ci.getFullPartitionName() + ", assuming it has been dropped and moving on."); continue; } @@ -145,26 +143,26 @@ public void run() { .createValidReadTxnList(txnHandler.getOpenTxns(), 0); conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString()); // The response will have one entry per table and hence we get only one ValidWriteIdList - String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName()); + String fullTableName = TxnUtils.getFullTableName(t.get().getDbName(), t.get().getTableName()); GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName)); rqst.setValidTxnList(validTxnList.writeToString()); final ValidWriteIdList tblValidWriteIds = TxnUtils.createValidCompactWriteIdList( txnHandler.getValidWriteIds(rqst).getTblValidWriteIds().get(0)); - StorageDescriptor sd = resolveStorageDescriptor(t, p); + StorageDescriptor sd = resolveStorageDescriptor(t.get(), p); String runAs = tblNameOwnersCache.get(fullTableName); if (runAs == null) { LOG.debug("unable to find the table owner in the cache for table "+ fullTableName + " " + "will determine user based on table location"); - runAs = findUserToRunAs(sd.getLocation(), t); + runAs = findUserToRunAs(sd.getLocation(), t.get()); tblNameOwnersCache.put(fullTableName, runAs); } /*Future thought: checkForCompaction will check a lot of file metadata and may be expensive. * Long term we should consider having a thread pool here and running checkForCompactionS * in parallel*/ CompactionType compactionNeeded - = checkForCompaction(ci, tblValidWriteIds, sd, t.getParameters(), runAs); + = checkForCompaction(ci, tblValidWriteIds, sd, t.get().getParameters(), runAs); if (compactionNeeded != null) requestCompaction(ci, runAs, compactionNeeded); } catch (Throwable t) { LOG.error("Caught exception while trying to determine if we should compact " + @@ -183,8 +181,7 @@ public void run() { // Clean TXN_TO_WRITE_ID table for entries under min_uncommitted_txn referred by any open txns. txnHandler.cleanTxnToWriteIdTable(); } catch (Throwable t) { - LOG.error("Initiator loop caught unexpected exception this time through the loop: " + - StringUtils.stringifyException(t)); + LOG.error("Initiator loop caught unexpected exception this time through the loop", t); } finally { if(handle != null) { @@ -198,8 +195,7 @@ public void run() { } while (!stop.get()); } catch (Throwable t) { - LOG.error("Caught an exception in the main loop of compactor initiator, exiting " + - StringUtils.stringifyException(t)); + LOG.error("Caught an exception in the main loop of compactor initiator, exiting", t); } } @@ -283,7 +279,7 @@ private CompactionType determineCompactionType(CompactionInfo ci, ValidWriteIdLi if (base != null) { stat = fs.getFileStatus(base); if (!stat.isDir()) { - LOG.error("Was assuming base " + base.toString() + " is directory, but it's a file!"); + LOG.error("Was assuming base " + base + " is directory, but it's a file!"); return null; } baseSize = sumDirSize(fs, base); @@ -299,7 +295,7 @@ private CompactionType determineCompactionType(CompactionInfo ci, ValidWriteIdLi for (AcidUtils.ParsedDelta delta : deltas) { stat = fs.getFileStatus(delta.getPath()); if (!stat.isDir()) { - LOG.error("Was assuming delta " + delta.getPath().toString() + " is a directory, " + + LOG.error("Was assuming delta " + delta.getPath() + " is a directory, " + "but it's a file!"); return null; } @@ -314,19 +310,12 @@ private CompactionType determineCompactionType(CompactionInfo ci, ValidWriteIdLi float deltaPctThreshold = deltaPctProp == null ? HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD) : Float.parseFloat(deltaPctProp); - boolean bigEnough = (float)deltaSize/(float)baseSize > deltaPctThreshold; - if (LOG.isDebugEnabled()) { - StringBuilder msg = new StringBuilder("delta size: "); - msg.append(deltaSize); - msg.append(" base size: "); - msg.append(baseSize); - msg.append(" threshold: "); - msg.append(deltaPctThreshold); - msg.append(" will major compact: "); - msg.append(bigEnough); - LOG.debug(msg.toString()); + boolean bigEnough = (float) deltaSize / (float) baseSize > deltaPctThreshold; + LOG.debug("delta size: {} base size: {} threshold: {} will major compact: {}", deltaSize, baseSize, + deltaPctThreshold, bigEnough); + if (bigEnough) { + return CompactionType.MAJOR; } - if (bigEnough) return CompactionType.MAJOR; } String deltaNumProp = tblproperties.get(COMPACTORTHRESHOLD_PREFIX + @@ -339,8 +328,8 @@ private CompactionType determineCompactionType(CompactionInfo ci, ValidWriteIdLi return null; } if (AcidUtils.isInsertOnlyTable(tblproperties)) { - LOG.debug("Requesting a major compaction for a MM table; found " + deltas.size() - + " delta files, threshold is " + deltaNumThreshold); + LOG.debug("Requesting a major compaction for a MM table; found {}" + " delta files, threshold is {}", + deltas.size(), deltaNumThreshold); return CompactionType.MAJOR; } // TODO: this log statement looks wrong @@ -352,7 +341,7 @@ private CompactionType determineCompactionType(CompactionInfo ci, ValidWriteIdLi } private long sumDirSize(FileSystem fs, Path dir) throws IOException { - long size = 0; + long size = 0L; FileStatus[] buckets = fs.listStatus(dir, FileUtils.HIDDEN_FILES_PATH_FILTER); for (int i = 0; i < buckets.length; i++) { size += buckets[i].getLen(); @@ -385,43 +374,46 @@ private boolean noAutoCompactSet(Table t) { // Check to see if this is a table level request on a partitioned table. If so, // then it's a dynamic partitioning case and we shouldn't check the table itself. - private static boolean checkDynPartitioning(Table t, CompactionInfo ci){ - if (t.getPartitionKeys() != null && t.getPartitionKeys().size() > 0 && - ci.partName == null) { - LOG.debug("Skipping entry for " + ci.getFullTableName() + " as it is from dynamic" + - " partitioning"); - return true; + private boolean checkDynPartitioning(Table t, CompactionInfo ci){ + if (t.getPartitionKeys() != null && t.getPartitionKeys().size() > 0 && ci.partName == null) { + LOG.debug("Skipping entry for {} as it is from dynamic partitioning", ci.getFullTableName()); + return true; } return false; } - private boolean checkCompactionElig(CompactionInfo ci){ - Table t = null; + private boolean checkCompactionElig(CompactionInfo ci) { try { - t = resolveTable(ci); - if (t == null) { - LOG.info("Can't find table " + ci.getFullTableName() + ", assuming it's a temp " + - "table or has been dropped and moving on."); + Optional
t = resolveTable(ci); + if (!t.isPresent()) { + LOG.info("Failed to find table " + ci.getFullTableName() + ", assuming table is a " + + "temp table or has been dropped. Moving on."); return false; } if (replIsCompactionDisabledForDatabase(ci.dbname)) { + LOG.info("Compaction is disabled for database " + ci.dbname); return false; } - if (noAutoCompactSet(t)) { - LOG.info("Table " + tableName(t) + " marked " + hive_metastoreConstants.TABLE_NO_AUTO_COMPACT + - "=true so we will not compact it."); + if (noAutoCompactSet(t.get())) { + LOG.info("Table " + tableName(t.get()) + " marked " + hive_metastoreConstants.TABLE_NO_AUTO_COMPACT + + "=true so we will not compact it."); return false; - } else if (replIsCompactionDisabledForTable(t)) { + } + + if (replIsCompactionDisabledForTable(t.get())) { + LOG.info("Compaction is disabled for table " + t.get()); return false; - } else if (checkDynPartitioning(t, ci)) { + } + + if (checkDynPartitioning(t.get(), ci)) { + LOG.info("Dynamic partitioning in affect for table " + t.get()); return false; } } catch (Throwable e) { - LOG.error("Caught Exception while checking compactiton eligibility " + - StringUtils.stringifyException(e)); + LOG.error("Caught Exception while checking compactiton eligibility", e); } return true; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java index 10681c0202..e40e84e346 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; -import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -109,14 +108,13 @@ void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageD commitCrudMajorCompaction(tmpLocation, tmpTableName, storageDescriptor.getLocation(), conf, writeIds, compactorTxnId); } catch (HiveException e) { - LOG.error("Error doing query based major compaction", e); - throw new IOException(e); + throw new IOException("Error doing query based major compaction", e); } finally { try { DriverUtils.runOnDriver(conf, user, sessionState, "drop table if exists " + tmpTableName); } catch (HiveException e) { - LOG.error("Unable to delete drop temp table {} which was created for running major compaction", tmpTableName); - LOG.error(ExceptionUtils.getStackTrace(e)); + LOG.error("Unable to delete drop temp table {} which was created for running major compaction", tmpTableName, + e); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java index 8bfb524ac6..dab1a3a21e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java @@ -30,10 +30,10 @@ import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; -import org.apache.thrift.TException; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog; @@ -65,40 +65,35 @@ public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception { MetastoreConf.getVar(conf, MetastoreConf.ConfVars.RAW_STORE_IMPL), threadId); } - @Override Table resolveTable(CompactionInfo ci) throws MetaException { + @Override + Optional
resolveTable(final CompactionInfo ci) throws CompactionException { try { - return rs.getTable(getDefaultCatalog(conf), ci.dbname, ci.tableName); + return Optional.ofNullable(rs.getTable(getDefaultCatalog(conf), ci.dbname, ci.tableName)); } catch (MetaException e) { - LOG.error("Unable to find table " + ci.getFullTableName() + ", " + e.getMessage()); - throw e; + throw new CompactionException("Unable to find table " + ci.getFullTableName(), e); } } - @Override boolean replIsCompactionDisabledForDatabase(String dbName) throws TException { + @Override + boolean replIsCompactionDisabledForDatabase(String dbName) throws CompactionException { try { Database database = rs.getDatabase(getDefaultCatalog(conf), dbName); - // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail. - boolean isReplCompactDisabled = ReplUtils.isFirstIncPending(database.getParameters()); - if (isReplCompactDisabled) { - LOG.info("Compaction is disabled for database " + dbName); - } - return isReplCompactDisabled; + // Compaction is disabled until after first successful incremental load. + // Check HIVE-21197 for more detail. + return ReplUtils.isFirstIncPending(database.getParameters()); } catch (NoSuchObjectException e) { - LOG.info("Unable to find database " + dbName); + LOG.info("Failed to find database " + dbName); return true; } } - @Override List getPartitionsByNames(CompactionInfo ci) throws MetaException { + @Override + List getPartitionsByNames(CompactionInfo ci) throws CompactionException { try { return rs.getPartitionsByNames(getDefaultCatalog(conf), ci.dbname, ci.tableName, Collections.singletonList(ci.partName)); - } catch (MetaException e) { - LOG.error("Unable to get partitions by name for CompactionInfo=" + ci); - throw e; - } catch (NoSuchObjectException e) { - LOG.error("Unable to get partitions by name for CompactionInfo=" + ci); - throw new MetaException(e.toString()); + } catch (NoSuchObjectException | MetaException e) { + throw new CompactionException("Failed to get partition by name", e); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java index 27a3ce8d2d..6fe9109831 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java @@ -68,8 +68,8 @@ @Override void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor, ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException { - LOG.debug("Going to delete directories for aborted transactions for MM table " + table.getDbName() + "." + table - .getTableName()); + LOG.debug("Going to delete directories for aborted transactions for MM table {}.{}", table.getDbName(), + table.getTableName()); AcidUtils.Directory dir = AcidUtils .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false, table.getParameters(), false); @@ -78,7 +78,7 @@ void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageD // Then, actually do the compaction. if (!compactionInfo.isMajorCompaction()) { // Not supported for MM tables right now. - LOG.info("Not compacting " + storageDescriptor.getLocation() + "; not a major compaction"); + LOG.info("Not compacting {}; not a major compaction", storageDescriptor.getLocation()); return; } @@ -129,8 +129,7 @@ void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageD commitMmCompaction(tmpLocation, storageDescriptor.getLocation(), hiveConf, writeIds, compactorTxnId); DriverUtils.runOnDriver(driverConf, user, sessionState, "drop table if exists " + tmpTableName); } catch (HiveException e) { - LOG.error("Error compacting a MM table", e); - throw new IOException(e); + throw new IOException("Error compacting a MM table", e); } } @@ -138,13 +137,13 @@ void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageD private void removeFilesForMmTable(HiveConf conf, AcidUtils.Directory dir) throws IOException { // For MM table, we only want to delete delta dirs for aborted txns. List filesToDelete = dir.getAbortedDirectories(); - if (filesToDelete.size() < 1) { + if (filesToDelete.isEmpty()) { return; } LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + dir); FileSystem fs = filesToDelete.get(0).getFileSystem(conf); for (Path dead : filesToDelete) { - LOG.debug("Going to delete path " + dead.toString()); + LOG.debug("Going to delete path {}", dead); fs.delete(dead, true); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java index 80119de22f..5d39f4a1ca 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java @@ -37,7 +37,7 @@ */ abstract class QueryCompactor { - private static final Logger LOG = LoggerFactory.getLogger(QueryCompactor.class.getName()); + private static final Logger LOG = LoggerFactory.getLogger(QueryCompactor.class); private static final String TMPDIR = "_tmp"; /** diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java index 4235184fec..d4c3c937f3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java @@ -20,7 +20,6 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; @@ -30,6 +29,7 @@ import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog; @@ -47,33 +47,37 @@ public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception { this.msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf); } - @Override Table resolveTable(CompactionInfo ci) throws MetaException { + @Override + Optional
resolveTable(CompactionInfo ci) throws CompactionException { try { - return msc.getTable(getDefaultCatalog(conf), ci.dbname, ci.tableName); + return Optional.ofNullable(msc.getTable(getDefaultCatalog(conf), ci.dbname, ci.tableName)); } catch (TException e) { - LOG.error("Unable to find table " + ci.getFullTableName() + ", " + e.getMessage()); - throw new MetaException(e.toString()); + throw new CompactionException("Unable to find table " + ci.getFullTableName(), e); } } - @Override boolean replIsCompactionDisabledForDatabase(String dbName) throws TException { + @Override + boolean replIsCompactionDisabledForDatabase(String dbName) throws CompactionException { try { Database database = msc.getDatabase(getDefaultCatalog(conf), dbName); - // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail. + // Compaction is disabled until after first successful incremental load. + // Check HIVE-21197 for more detail. return ReplUtils.isFirstIncPending(database.getParameters()); } catch (NoSuchObjectException e) { LOG.info("Unable to find database " + dbName); return true; + } catch (TException e) { + throw new CompactionException("Unable to get database: " + dbName, e); } } - @Override List getPartitionsByNames(CompactionInfo ci) throws MetaException { + @Override + List getPartitionsByNames(CompactionInfo ci) throws CompactionException { try { return msc.getPartitionsByNames(getDefaultCatalog(conf), ci.dbname, ci.tableName, Collections.singletonList(ci.partName)); } catch (TException e) { - LOG.error("Unable to get partitions by name for CompactionInfo=" + ci); - throw new MetaException(e.toString()); + throw new CompactionException("Unable to get partitions by name for CompactionInfo=" + ci, e); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index 3270175a80..bed149a035 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -44,7 +44,6 @@ import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.stats.StatsUtils; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.StringUtils; import java.io.IOException; import java.net.InetAddress; @@ -53,6 +52,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -97,23 +97,23 @@ public void run() { } final CompactionInfo ci = CompactionInfo.optionalCompactionInfoStructToInfo( msc.findNextCompact(workerName)); - LOG.debug("Processing compaction request " + ci); + LOG.debug("Processing compaction request {}", ci); if (ci == null && !stop.get()) { try { Thread.sleep(SLEEP_TIME); continue; } catch (InterruptedException e) { - LOG.warn("Worker thread sleep interrupted " + e.getMessage()); + LOG.warn("Worker thread sleep interrupted", e); continue; } } // Find the table we will be working with. - Table t1 = null; + Optional
t1 = Optional.empty(); try { t1 = resolveTable(ci); - if (t1 == null) { + if (!t1.isPresent()) { LOG.info("Unable to find table " + ci.getFullTableName() + ", assuming it was dropped and moving on."); msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci)); @@ -125,13 +125,13 @@ public void run() { } // This chicanery is to get around the fact that the table needs to be final in order to // go into the doAs below. - final Table t = t1; + final Table t = t1.get(); // Find the partition we will be working with, if there is one. - Partition p = null; + Optional p = Optional.empty(); try { p = resolvePartition(ci); - if (p == null && ci.partName != null) { + if (!p.isPresent() && ci.partName != null) { LOG.info("Unable to find partition " + ci.getFullPartitionName() + ", assuming it was dropped and moving on."); msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci)); @@ -190,11 +190,11 @@ public void run() { launchedJob = true; try { if (runJobAsSelf(ci.runAs)) { - mr.run(conf, jobName.toString(), t, p, sd, tblValidWriteIds, ci, su, msc); + mr.run(conf, jobName.toString(), t, p.orElse(null), sd, tblValidWriteIds, ci, su, msc); } else { UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(), UserGroupInformation.getLoginUser()); - final Partition fp = p; + final Partition fp = p.orElse(null); ugi.doAs(new PrivilegedExceptionAction() { @Override public Object run() throws Exception { @@ -217,13 +217,12 @@ public Object run() throws Exception { } } catch (Throwable e) { LOG.error("Caught exception while trying to compact " + ci + - ". Marking failed to avoid repeated failures, " + StringUtils.stringifyException(e)); + ". Marking failed to avoid repeated failures", e); msc.markFailed(CompactionInfo.compactionInfoToStruct(ci)); msc.abortTxns(Collections.singletonList(compactorTxnId)); } } catch (TException | IOException t) { - LOG.error("Caught an exception in the main loop of compactor worker " + workerName + ", " + - StringUtils.stringifyException(t)); + LOG.error("Caught an exception in the main loop of compactor worker " + workerName, t); if (msc != null) { msc.close(); } @@ -234,8 +233,7 @@ public Object run() throws Exception { LOG.error("Interrupted while sleeping to instantiate metastore client"); } } catch (Throwable t) { - LOG.error("Caught an exception in the main loop of compactor worker " + workerName + ", " + - StringUtils.stringifyException(t)); + LOG.error("Caught an exception in the main loop of compactor worker " + workerName, t); } finally { if(heartbeater != null) { heartbeater.cancel(); @@ -398,7 +396,7 @@ public void run() { Thread.sleep(interval); } } catch (Exception e) { - LOG.error("Error while heartbeating txn {} in {}, error: ", compactorTxnId, Thread.currentThread().getName(), e.getMessage()); + LOG.error("Error while heartbeating txn {}", compactorTxnId, e); } } diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java index a93cc1b7e1..45cd1dcbfe 100644 --- a/service/src/java/org/apache/hive/service/server/HiveServer2.java +++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -1054,6 +1054,7 @@ private void maybeStartCompactorThreads(HiveConf hiveConf) throws Exception { int numWorkers = MetastoreConf.getIntVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS); for (int i = 0; i < numWorkers; i++) { Worker w = new Worker(); + LOG.info("Starting compactor thread of type {}", w.getClass().getName()); CompactorThread.initializeAndStartThread(w, hiveConf); } }