diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index 54b616e60c..b614f5b8fe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.TableName; import org.apache.hadoop.hive.common.ValidTxnList; @@ -43,7 +42,6 @@ import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.StringUtils; import org.apache.hive.common.util.Ref; import java.io.IOException; @@ -71,10 +69,13 @@ public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception { super.init(stop, looped); replChangeManager = ReplChangeManager.getInstance(conf); + setName("CompactionCleaner_" + getId()); } @Override public void run() { + LOG.info("Starting Compactor Cleaner thread with name {} and ID {}", getName(), getId()); + if (cleanerCheckInterval == 0) { cleanerCheckInterval = conf.getTimeVar( HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS); @@ -97,8 +98,7 @@ public void run() { clean(compactionInfo, minOpenTxnId); } } catch (Throwable t) { - LOG.error("Caught an exception in the main loop of compactor cleaner, " + - StringUtils.stringifyException(t)); + LOG.error("Caught an exception in the main loop of compactor cleaner", t); } finally { if (handle != null) { @@ -110,26 +110,24 @@ public void run() { } // Now, go back to bed until it's time to do this again long elapsedTime = System.currentTimeMillis() - startedAt; - if (elapsedTime >= cleanerCheckInterval || stop.get()) { - continue; - } else { + if (elapsedTime < cleanerCheckInterval && !stop.get()) { try { Thread.sleep(cleanerCheckInterval - elapsedTime); } catch (InterruptedException ie) { - // What can I do about it? + LOG.error("Caught an exception while compactor cleaner was sleeping.", ie); } } } while (!stop.get()); } private void clean(CompactionInfo ci, long minOpenTxnGLB) throws MetaException { - LOG.info("Starting cleaning for " + ci); + LOG.info("Starting cleaning for {}", ci); try { Table t = resolveTable(ci); if (t == null) { // The table was dropped before we got around to cleaning it. - LOG.info("Unable to find table " + ci.getFullTableName() + ", assuming it was dropped." + - idWatermark(ci)); + LOG.info("Unable to find table {} for compaction cleaning request {}, assuming it was dropped, moving on", + ci.getFullPartitionName(), ci); txnHandler.markCleaned(ci); return; } @@ -138,8 +136,8 @@ private void clean(CompactionInfo ci, long minOpenTxnGLB) throws MetaException { p = resolvePartition(ci); if (p == null) { // The partition was dropped before we got around to cleaning it. - LOG.info("Unable to find partition " + ci.getFullPartitionName() + - ", assuming it was dropped." + idWatermark(ci)); + LOG.info("Unable to find partition {} for compaction cleaning request {}, assuming it was dropped, moving on", + ci.getFullPartitionName(), ci); txnHandler.markCleaned(ci); return; } @@ -150,7 +148,7 @@ private void clean(CompactionInfo ci, long minOpenTxnGLB) throws MetaException { TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenTxnGLB); //save it so that getAcidState() sees it conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString()); - /** + /* * {@code validTxnList} is capped by minOpenTxnGLB so if * {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} sees a base/delta * produced by a compactor, that means every reader that could be active right now see it @@ -196,41 +194,34 @@ private void clean(CompactionInfo ci, long minOpenTxnGLB) throws MetaException { if (runJobAsSelf(ci.runAs)) { removeFiles(location, validWriteIdList, ci); } else { - LOG.info("Cleaning as user " + ci.runAs + " for " + ci.getFullPartitionName()); + LOG.info("Cleaning as user {} for {}", ci.runAs, ci.getFullPartitionName()); UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.runAs, UserGroupInformation.getLoginUser()); - ugi.doAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - removeFiles(location, validWriteIdList, ci); - return null; - } + ugi.doAs((PrivilegedExceptionAction) () -> { + removeFiles(location, validWriteIdList, ci); + return null; }); try { FileSystem.closeAllForUGI(ugi); } catch (IOException exception) { - LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " + - ci.getFullPartitionName() + idWatermark(ci), exception); + LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " + ci, exception); } } txnHandler.markCleaned(ci); } catch (Exception e) { - LOG.error("Caught exception when cleaning, unable to complete cleaning of " + ci + " " + - StringUtils.stringifyException(e)); + LOG.error("Caught exception when cleaning, unable to complete cleaning of " + ci, e); ci.errorMessage = e.getMessage(); txnHandler.markFailed(ci); } } - private static String idWatermark(CompactionInfo ci) { - return " id=" + ci.id; - } + private void removeFiles(String location, ValidWriteIdList writeIdList, CompactionInfo ci) throws IOException, NoSuchObjectException, MetaException { Path locPath = new Path(location); AcidUtils.Directory dir = AcidUtils.getAcidState(locPath.getFileSystem(conf), locPath, conf, writeIdList, Ref.from( false), false, null, false); List obsoleteDirs = dir.getObsolete(); - /** + /* * add anything in 'dir' that only has data from aborted transactions - no one should be * trying to read anything in that dir (except getAcidState() that only reads the name of * this dir itself) @@ -246,15 +237,14 @@ private void removeFiles(String location, ValidWriteIdList writeIdList, Compacti filesToDelete.add(stat); extraDebugInfo.append(stat.getName()).append(","); if(!FileUtils.isPathWithinSubtree(stat, locPath)) { - LOG.info(idWatermark(ci) + " found unexpected file: " + stat); + LOG.info("For compaction cleaner request {} found unexpected file with path {}", ci, stat); } } extraDebugInfo.setCharAt(extraDebugInfo.length() - 1, ']'); - LOG.info(idWatermark(ci) + " About to remove " + filesToDelete.size() + - " obsolete directories from " + location + ". " + extraDebugInfo.toString()); + LOG.info("Compaction cleaner request {} is about to remove {} obsolete directories from {}. {}", ci, + filesToDelete.size(), location, extraDebugInfo); if (filesToDelete.size() < 1) { - LOG.warn("Hmm, nothing to delete in the cleaner for directory " + location + - ", that hardly seems right."); + LOG.warn("Nothing to delete for compaction cleaner {} for directory {}.", ci, location); return; } @@ -263,7 +253,7 @@ private void removeFiles(String location, ValidWriteIdList writeIdList, Compacti Table table = getMSForConf(conf).getTable(getDefaultCatalog(conf), ci.dbname, ci.tableName); for (Path dead : filesToDelete) { - LOG.debug("Going to delete path " + dead.toString()); + LOG.debug("Going to delete path {}", dead); if (ReplChangeManager.shouldEnableCm(db, table)) { replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, true); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 543ec0b991..4612ecd76d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -108,7 +108,6 @@ static final private String BASE_DIR = "hive.compactor.base.dir"; static final private String DELTA_DIRS = "hive.compactor.delta.dirs"; static final private String DIRS_TO_SEARCH = "hive.compactor.dirs.to.search"; - static final private String TMPDIR = "_tmp"; static final private String TBLPROPS_PREFIX = "tblprops."; static final private String COMPACTOR_PREFIX = "compactor."; @@ -159,6 +158,7 @@ private JobConf createBaseJobConf(HiveConf conf, String jobName, Table t, Storag // Set appropriate Acid readers/writers based on the table properties. AcidUtils.setAcidOperationalProperties(job, true, AcidUtils.getAcidOperationalProperties(t.getParameters())); + LOG.debug("Compaction job configuration {}", job); return job; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 37a5862791..cfb7c21261 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -46,7 +46,6 @@ import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.StringUtils; import org.apache.hive.common.util.Ref; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,6 +81,7 @@ @Override public void run() { + LOG.info("Starting Compactor Initiator thread with name {} and ID {}", getName(), getId()); // Make sure nothing escapes this run method and kills the metastore at large, // so wrap it in a big catch Throwable statement. try { @@ -111,11 +111,11 @@ public void run() { Set potentials = txnHandler.findPotentialCompactions(abortedThreshold, compactionInterval) .stream().filter(ci -> checkCompactionElig(ci, currentCompactions)).collect(Collectors.toSet()); - LOG.debug("Found " + potentials.size() + " potential compactions, " + - "checking to see if we should compact any of them"); + LOG.debug("Found {} potential compactions, checking to see if we should compact any of them.", + potentials.size()); Map tblNameOwners = new HashMap<>(); - List compactionList = new ArrayList<>(); + List> compactionList = new ArrayList<>(); if (!potentials.isEmpty()) { ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList( @@ -128,8 +128,8 @@ public void run() { Table t = resolveTable(ci); Partition p = resolvePartition(ci); if (p == null && ci.partName != null) { - LOG.info("Can't find partition " + ci.getFullPartitionName() + - ", assuming it has been dropped and moving on."); + LOG.info("Can't find partition {}, assuming it has been dropped and moving on.", + ci.getFullPartitionName()); continue; } String runAs = resolveUserToRunAs(tblNameOwners, t, p); @@ -156,8 +156,7 @@ public void run() { // Clean TXN_TO_WRITE_ID table for entries under min_uncommitted_txn referred by any open txns. txnHandler.cleanTxnToWriteIdTable(); } catch (Throwable t) { - LOG.error("Initiator loop caught unexpected exception this time through the loop: " + - StringUtils.stringifyException(t)); + LOG.error("Compactor initiator loop caught unexpected exception this time through the loop.", t); } finally { if(handle != null) { @@ -172,8 +171,7 @@ public void run() { } while (!stop.get()); } catch (Throwable t) { - LOG.error("Caught an exception in the main loop of compactor initiator, exiting " + - StringUtils.stringifyException(t)); + LOG.error("Caught an exception in the main loop of compactor initiator, exiting.", t); } } @@ -233,6 +231,7 @@ public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception { super.init(stop, looped); checkInterval = conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL, TimeUnit.MILLISECONDS); compactionExecutor = Executors.newFixedThreadPool(conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_REQUEST_QUEUE)); + setName("CompactionInitiator_" + getId()); } private void recoverFailedCompactions(boolean remoteOnly) throws MetaException { @@ -249,8 +248,8 @@ private boolean lookForCurrentCompactions(ShowCompactResponse compactions, if ((e.getState().equals(TxnStore.WORKING_RESPONSE) || e.getState().equals(TxnStore.INITIATED_RESPONSE)) && e.getDbname().equals(ci.dbname) && e.getTablename().equals(ci.tableName) && - (e.getPartitionname() == null && ci.partName == null || - e.getPartitionname().equals(ci.partName))) { + ((e.getPartitionname() == null && ci.partName == null) || + (e.getPartitionname() != null && e.getPartitionname().equals(ci.partName)))) { return true; } } @@ -266,25 +265,20 @@ private CompactionType checkForCompaction(final CompactionInfo ci, throws IOException, InterruptedException { // If it's marked as too many aborted, we already know we need to compact if (ci.tooManyAborts) { - LOG.debug("Found too many aborted transactions for " + ci.getFullPartitionName() + ", " + - "initiating major compaction"); + LOG.debug("Found too many aborted transaction for {}, initiating major compaction.", ci.getFullPartitionName()); return CompactionType.MAJOR; } if (runJobAsSelf(runAs)) { - return determineCompactionType(ci, writeIds, sd, tblproperties); + return determineCompactionType(writeIds, sd, tblproperties); } else { - LOG.info("Going to initiate as user " + runAs + " for " + ci.getFullPartitionName()); + LOG.info("Going to initiate as user {} for {}.", runAs, ci.getFullPartitionName()); UserGroupInformation ugi = UserGroupInformation.createProxyUser(runAs, UserGroupInformation.getLoginUser()); CompactionType compactionType; try { - compactionType = ugi.doAs(new PrivilegedExceptionAction() { - @Override - public CompactionType run() throws Exception { - return determineCompactionType(ci, writeIds, sd, tblproperties); - } - }); + compactionType = ugi.doAs( + (PrivilegedExceptionAction) () -> determineCompactionType(writeIds, sd, tblproperties)); } finally { try { FileSystem.closeAllForUGI(ugi); @@ -297,9 +291,9 @@ public CompactionType run() throws Exception { } } - private CompactionType determineCompactionType(CompactionInfo ci, ValidWriteIdList writeIds, + private CompactionType determineCompactionType(ValidWriteIdList writeIds, StorageDescriptor sd, Map tblproperties) - throws IOException, InterruptedException { + throws IOException { boolean noBase = false; Path location = new Path(sd.getLocation()); @@ -307,11 +301,11 @@ private CompactionType determineCompactionType(CompactionInfo ci, ValidWriteIdLi AcidUtils.Directory dir = AcidUtils.getAcidState(fs, location, conf, writeIds, Ref.from(false), false, tblproperties, false); Path base = dir.getBaseDirectory(); long baseSize = 0; - FileStatus stat = null; + FileStatus stat; if (base != null) { stat = fs.getFileStatus(base); - if (!stat.isDir()) { - LOG.error("Was assuming base " + base.toString() + " is directory, but it's a file!"); + if (!stat.isDirectory()) { + LOG.error("Was assuming base {} is directory, but it's a file!", base.toString()); return null; } baseSize = sumDirSize(fs, base); @@ -326,9 +320,8 @@ private CompactionType determineCompactionType(CompactionInfo ci, ValidWriteIdLi List deltas = dir.getCurrentDirectories(); for (AcidUtils.ParsedDelta delta : deltas) { stat = fs.getFileStatus(delta.getPath()); - if (!stat.isDir()) { - LOG.error("Was assuming delta " + delta.getPath().toString() + " is a directory, " + - "but it's a file!"); + if (!stat.isDirectory()) { + LOG.error("Was assuming delta {} is a directory, but it's a file!", delta.getPath().toString()); return null; } deltaSize += sumDirSize(fs, delta.getPath()); @@ -343,18 +336,11 @@ private CompactionType determineCompactionType(CompactionInfo ci, ValidWriteIdLi HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD) : Float.parseFloat(deltaPctProp); boolean bigEnough = (float)deltaSize/(float)baseSize > deltaPctThreshold; - if (LOG.isDebugEnabled()) { - StringBuilder msg = new StringBuilder("delta size: "); - msg.append(deltaSize); - msg.append(" base size: "); - msg.append(baseSize); - msg.append(" threshold: "); - msg.append(deltaPctThreshold); - msg.append(" will major compact: "); - msg.append(bigEnough); - LOG.debug(msg.toString()); + if (bigEnough) { + LOG.debug("delta size: {}, base size: {}, threshold: {}, will major compact.", deltaSize, baseSize, + deltaPctThreshold); + return CompactionType.MAJOR; } - if (bigEnough) return CompactionType.MAJOR; } String deltaNumProp = tblproperties.get(COMPACTORTHRESHOLD_PREFIX + @@ -367,14 +353,12 @@ private CompactionType determineCompactionType(CompactionInfo ci, ValidWriteIdLi return null; } if (AcidUtils.isInsertOnlyTable(tblproperties)) { - LOG.debug("Requesting a major compaction for a MM table; found " + deltas.size() - + " delta files, threshold is " + deltaNumThreshold); + LOG.debug("Requesting a major compaction for a MM table; found {} delta files, threshold is {}", + deltas.size(), deltaNumThreshold); return CompactionType.MAJOR; } - // TODO: this log statement looks wrong - LOG.debug("Found " + deltas.size() + " delta files, threshold is " + deltaNumThreshold + - (enough ? "" : "not") + " and no base, requesting " + (noBase ? "major" : "minor") + - " compaction"); + LOG.info("Found {} delta files {} base, threshold is {}, requesting {} compaction", deltas.size(), + noBase ? "without" : "with", deltaNumThreshold, noBase ? "major" : "minor"); // If there's no base file, do a major compaction return noBase ? CompactionType.MAJOR : CompactionType.MINOR; } @@ -382,8 +366,8 @@ private CompactionType determineCompactionType(CompactionInfo ci, ValidWriteIdLi private long sumDirSize(FileSystem fs, Path dir) throws IOException { long size = 0; FileStatus[] buckets = fs.listStatus(dir, FileUtils.HIDDEN_FILES_PATH_FILTER); - for (int i = 0; i < buckets.length; i++) { - size += buckets[i].getLen(); + for (FileStatus bucket : buckets) { + size += bucket.getLen(); } return size; } @@ -416,15 +400,14 @@ private boolean noAutoCompactSet(Table t) { private static boolean checkDynPartitioning(Table t, CompactionInfo ci){ if (t.getPartitionKeys() != null && t.getPartitionKeys().size() > 0 && ci.partName == null) { - LOG.debug("Skipping entry for " + ci.getFullTableName() + " as it is from dynamic" + - " partitioning"); + LOG.debug("Skipping entry for {} as it is from dynamic partitioning", ci.getFullTableName()); return true; } return false; } private boolean checkCompactionElig(CompactionInfo ci, ShowCompactResponse currentCompactions) { - LOG.info("Checking to see if we should compact " + ci.getFullPartitionName()); + LOG.info("Checking to see if we should compact {}", ci.getFullPartitionName()); // Check if we already have initiated or are working on a compaction for this partition // or table. If so, skip it. If we are just waiting on cleaning we can still check, @@ -432,16 +415,16 @@ private boolean checkCompactionElig(CompactionInfo ci, ShowCompactResponse curre // todo: this is not robust. You can easily run `alter table` to start a compaction between // the time currentCompactions is generated and now if (lookForCurrentCompactions(currentCompactions, ci)) { - LOG.debug("Found currently initiated or working compaction for " + - ci.getFullPartitionName() + " so we will not initiate another compaction"); + LOG.debug("Found currently initiated or working compaction for {}, so we will not initiate another compaction", + ci.getFullPartitionName()); return false; } try { Table t = resolveTable(ci); if (t == null) { - LOG.info("Can't find table " + ci.getFullTableName() + ", assuming it's a temp " + - "table or has been dropped and moving on."); + LOG.info("Can't find table {}, assuming it's a temp table or has been dropped, moving on.", + ci.getFullTableName()); return false; } @@ -450,16 +433,16 @@ private boolean checkCompactionElig(CompactionInfo ci, ShowCompactResponse curre } if (noAutoCompactSet(t)) { - LOG.info("Table " + tableName(t) + " marked " + hive_metastoreConstants.TABLE_NO_AUTO_COMPACT + - "=true so we will not compact it."); + LOG.info("Table {} marked with {}=true, initiator will not trigger compaction on it.", tableName(t), + hive_metastoreConstants.TABLE_NO_AUTO_COMPACT); return false; } else if (replIsCompactionDisabledForTable(t) || checkDynPartitioning(t, ci)) { return false; } if (txnHandler.checkFailedCompactions(ci)) { - LOG.warn("Will not initiate compaction for " + ci.getFullPartitionName() + " since last " + - MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD + " attempts to compact it failed."); + LOG.warn("Will not initiate compaction for {} since last {} attempts to compact it failed.", + ci.getFullPartitionName(), MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD); ci.errorMessage = "Compaction is not initiated since last " + MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD + " consecutive compaction attempts failed)"; txnHandler.markFailed(ci); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java index 1bf0beea40..80f7d98877 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java @@ -45,7 +45,7 @@ @Override void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor, - ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException, HiveException { + ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException { LOG.info("Running query based minor compaction"); AcidUtils .setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(table.getParameters())); @@ -190,7 +190,7 @@ private String buildAlterTableQuery(String tableName, AcidUtils.Directory dir, /** * Get a list of compaction queries which fills up the delta/delete-delta temporary result tables. * @param tmpTableBase an unique identifier, which helps to find all the temporary tables - * @param table + * @param table the table to compact * @param validWriteIdList list of valid write IDs. This list is used to filter out aborted/open * transactions * @return list of compaction queries, always non-null diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java index 383891bfad..0e5e3c65d9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java @@ -172,7 +172,7 @@ private String buildAlterTableQuery(String tableName, AcidUtils.Directory dir, * * * @param sourceTmpTableName an unique identifier, which helps to find all the temporary tables - * @param resultTmpTableName + * @param resultTmpTableName an unique identifier, which helps to find the result temporary table * @return list of compaction queries, always non-null */ private List getCompactionQueries(String sourceTmpTableName, String resultTmpTableName, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index a96cf1e731..78ab0018bf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -47,7 +47,6 @@ import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.stats.StatsUtils; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.StringUtils; import java.io.IOException; import java.net.InetAddress; @@ -70,7 +69,6 @@ static final private long SLEEP_TIME = 10000; private static final int NOT_SET = -1; - private String workerName; private JobConf mrJob; // the MR job for compaction /** @@ -90,7 +88,7 @@ public static String hostname() { // don't go through Initiator for user initiated compactions) @Override public void run() { - LOG.info("Starting Worker thread"); + LOG.info("Starting Compactor Worker thread with name {} and ID {}", getName(), getId()); do { boolean launchedJob = false; // Make sure nothing escapes this run method and kills the metastore at large, @@ -102,48 +100,47 @@ public void run() { if (msc == null) { msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf); } - ci = CompactionInfo.optionalCompactionInfoStructToInfo(msc.findNextCompact(workerName)); - LOG.debug("Processing compaction request " + ci); - - if (ci == null && !stop.get()) { + ci = CompactionInfo.optionalCompactionInfoStructToInfo(msc.findNextCompact(getName())); + if (ci == null) { + LOG.info("No new compaction. Sleeping {} worker thread for {}", getName(), SLEEP_TIME); try { Thread.sleep(SLEEP_TIME); continue; } catch (InterruptedException e) { - LOG.warn("Worker thread sleep interrupted " + e.getMessage()); + LOG.warn("Worker thread sleep interrupted ", e); continue; } } + LOG.info("Processing compaction request {}", ci); // Find the table we will be working with. - Table t1 = null; + Table t1; try { t1 = resolveTable(ci); if (t1 == null) { - LOG.info("Unable to find table " + ci.getFullTableName() + - ", assuming it was dropped and moving on."); + LOG.info("Unable to find table {}, assuming it was dropped and moving on.", ci.getFullPartitionName()); msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci)); continue; } } catch (MetaException e) { + LOG.warn("Exception happened during compaction cleaning. Retrying...", e); msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci)); continue; } // This chicanery is to get around the fact that the table needs to be final in order to // go into the doAs below. final Table t = t1; - // Find the partition we will be working with, if there is one. - Partition p = null; + Partition p; try { p = resolvePartition(ci); if (p == null && ci.partName != null) { - LOG.info("Unable to find partition " + ci.getFullPartitionName() + - ", assuming it was dropped and moving on."); + LOG.info("Unable to find partition {}, assuming it was dropped and moving on.", ci.getFullPartitionName()); msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci)); continue; } } catch (Exception e) { + LOG.warn("Exception happened during compaction cleaning. Retrying...", e); msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci)); continue; } @@ -153,7 +150,7 @@ public void run() { // Check that the table or partition isn't sorted, as we don't yet support that. if (sd.getSortCols() != null && !sd.getSortCols().isEmpty()) { - LOG.error("Attempt to compact sorted table "+ci.getFullTableName()+", which is not yet supported!"); + LOG.error("Attempt to compact sorted table {}, which is not yet supported!", ci.getFullPartitionName()); msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci)); continue; } @@ -161,14 +158,13 @@ public void run() { if (ci.runAs == null) { ci.runAs = findUserToRunAs(sd.getLocation(), t); } - /** + /* * we cannot have Worker use HiveTxnManager (which is on ThreadLocal) since * then the Driver would already have the an open txn but then this txn would have * multiple statements in it (for query based compactor) which is not supported (and since * this case some of the statements are DDL, even in the future will not be allowed in a * multi-stmt txn. {@link Driver#setCompactionWriteIds(ValidWriteIdList, long)} */ compactorTxnId = msc.openTxn(ci.runAs, TxnType.COMPACTION); - heartbeater = new CompactionHeartbeater(compactorTxnId, fullTableName, conf); heartbeater.start(); @@ -177,14 +173,14 @@ public void run() { final ValidCompactorWriteIdList tblValidWriteIds = TxnUtils.createValidCompactWriteIdList(msc.getValidWriteIds( Collections.singletonList(fullTableName), validTxnList.writeToString()).get(0)); - LOG.debug("ValidCompactWriteIdList: " + tblValidWriteIds.writeToString()); + LOG.debug("Valid compaction writIDs: {}", tblValidWriteIds.writeToString()); conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString()); ci.highestWriteId = tblValidWriteIds.getHighWatermark(); //this writes TXN_COMPONENTS to ensure that if compactorTxnId fails, we keep metadata about //it until after any data written by it are physically removed msc.updateCompactorState(CompactionInfo.compactionInfoToStruct(ci), compactorTxnId); - final StringBuilder jobName = new StringBuilder(workerName); + final StringBuilder jobName = new StringBuilder(getName()); jobName.append("-compactor-"); jobName.append(ci.getFullPartitionName()); @@ -200,8 +196,8 @@ public void run() { } continue; } - - LOG.info("Starting " + ci.type.toString() + " compaction for " + ci.getFullPartitionName() + " in " + JavaUtils.txnIdToString(compactorTxnId)); + LOG.info("Starting {} compaction for {} in {}", ci.type, ci.getFullPartitionName(), + JavaUtils.txnIdToString(compactorTxnId)); final StatsUpdater su = StatsUpdater.init(ci, msc.findColumnsWithStats( CompactionInfo.compactionInfoToStruct(ci)), conf, runJobAsSelf(ci.runAs) ? ci.runAs : t.getOwner()); @@ -209,20 +205,20 @@ public void run() { launchedJob = true; try { if (runJobAsSelf(ci.runAs)) { + LOG.debug("Running compaction as user {}", ci.runAs); mr.run(conf, jobName.toString(), t, p, sd, tblValidWriteIds, ci, su, msc, dir); } else { UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(), UserGroupInformation.getLoginUser()); final Partition fp = p; final CompactionInfo fci = ci; - ugi.doAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - mr.run(conf, jobName.toString(), t, fp, sd, tblValidWriteIds, fci, su, msc, dir); - return null; - } + LOG.debug("Running compaction as UGI {}", ugi); + ugi.doAs((PrivilegedExceptionAction) () -> { + mr.run(conf, jobName.toString(), t, fp, sd, tblValidWriteIds, fci, su, msc, dir); + return null; }); try { + LOG.debug("Closing all resources for UGI {}", ugi); FileSystem.closeAllForUGI(ugi); } catch (IOException exception) { LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " + @@ -231,20 +227,21 @@ public Object run() throws Exception { } heartbeater.cancel(); msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci)); + LOG.info("{} compaction finished for {} in {}.", ci.type, ci.getFullPartitionName(), + JavaUtils.txnIdToString(compactorTxnId)); if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) { mrJob = mr.getMrJob(); } } catch (Throwable e) { LOG.error("Caught exception while trying to compact " + ci + - ". Marking failed to avoid repeated failures, " + StringUtils.stringifyException(e)); + ". Marking it failed to avoid repeated failures. ", e); ci.errorMessage = e.getMessage(); msc.markFailed(CompactionInfo.compactionInfoToStruct(ci)); msc.abortTxns(Collections.singletonList(compactorTxnId)); compactorTxnId = NOT_SET; } - } catch (TException | IOException t) { - LOG.error("Caught an exception in the main loop of compactor worker " + workerName + ", " + - StringUtils.stringifyException(t)); + } catch (Throwable t) { + LOG.error("Caught an exception in the main loop of compactor worker " + getName(), t); try { if (msc != null && ci != null) { ci.errorMessage = t.getMessage(); @@ -252,22 +249,19 @@ public Object run() throws Exception { compactorTxnId = NOT_SET; } } catch (TException e) { - LOG.error("Caught an exception while trying to mark compaction {} as failed: {}", ci, e); + LOG.error("Caught an exception while trying to mark compaction " + ci + " as failed.", e); } finally { if (msc != null) { msc.close(); msc = null; } } + // Due to the caught exception, sleep a bit before retrying the compaction try { Thread.sleep(SLEEP_TIME); } catch (InterruptedException e) { - LOG.error("Interrupted while sleeping to instantiate metastore client"); + LOG.error("Interrupted while compactor worker thread " + getName() + " was sleeping.", e); } - } catch (Throwable t) { - LOG.error("Caught an exception in the main loop of compactor worker " + workerName + ", " + - StringUtils.stringifyException(t)); - compactorTxnId = NOT_SET; } finally { commitTxnIfSet(compactorTxnId); if (heartbeater != null) { @@ -282,6 +276,7 @@ public Object run() throws Exception { try { Thread.sleep(SLEEP_TIME); } catch (InterruptedException e) { + LOG.error("Interrupted while compactor worker thread " + getName() + " was sleeping.", e); } } } while (!stop.get()); @@ -294,9 +289,7 @@ private void commitTxnIfSet(long compactorTxnId) { msc.commitTxn(compactorTxnId); } } catch (TException e) { - LOG.error( - "Caught an exception while committing compaction in worker " + workerName + ", " - + StringUtils.stringifyException(e)); + LOG.error("Caught an exception while committing compaction in worker " + getName(), e); } } } @@ -304,12 +297,7 @@ private void commitTxnIfSet(long compactorTxnId) { @Override public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception { super.init(stop, looped); - - StringBuilder name = new StringBuilder(hostname()); - name.append("-"); - name.append(getId()); - this.workerName = name.toString(); - setName(name.toString()); + setName("CompactionWorker" + "_" + hostname() + "_" + getId()); } public JobConf getMrJob() { @@ -362,7 +350,7 @@ void gatherStats() { return; } if (columnList.isEmpty()) { - LOG.debug(ci + ": No existing stats found. Will not run analyze."); + LOG.debug("{}: No existing stats found. Will not run analyze.", ci); return;//nothing to do } //e.g. analyze table page_view partition(dt='10/15/2014',country=’US’) @@ -383,7 +371,7 @@ void gatherStats() { sb.append(colName).append(","); } sb.setLength(sb.length() - 1); //remove trailing , - LOG.info(ci + ": running '" + sb.toString() + "'"); + LOG.info("{}: running '{}'", ci, sb); conf.setVar(HiveConf.ConfVars.METASTOREURIS,""); //todo: use DriverUtils.runOnDriver() here @@ -445,7 +433,8 @@ public void run() { Thread.sleep(interval); } } catch (Exception e) { - LOG.error("Error while heartbeating txn {} in {}, error: ", compactorTxnId, Thread.currentThread().getName(), e.getMessage()); + LOG.error("Error while heartbeating txt " + JavaUtils.txnIdToString(compactorTxnId) + " in " + + Thread.currentThread().getName(), e); } }