diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index bb70db4524..deec7c4236 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -276,14 +276,14 @@ void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor Path path = stat.getFileStatus().getPath(); //note that originalFiles are all original files recursively not dirs dirsToSearch.add(path); - LOG.debug("Adding original file " + path + " to dirs to search"); + LOG.debug("Adding original file {} to dirs to search.", path); } // Set base to the location so that the input format reads the original files. baseDir = new Path(sd.getLocation()); } } else { // add our base to the list of directories to search for files in. - LOG.debug("Adding base directory " + baseDir + " to dirs to search"); + LOG.debug("Adding base directory {} to dirs to search.", baseDir); dirsToSearch.add(baseDir); } } @@ -316,7 +316,7 @@ private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compa long minTxn = Long.MAX_VALUE; long maxTxn = Long.MIN_VALUE; for (AcidUtils.ParsedDelta delta : parsedDeltas) { - LOG.debug("Adding delta " + delta.getPath() + " to directories to search"); + LOG.debug("Adding delta {} to directories to search", delta.getPath()); dirsToSearch.add(delta.getPath()); deltaDirs.add(delta.getPath()); minTxn = Math.min(minTxn, delta.getMinWriteId()); @@ -328,6 +328,7 @@ private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compa job.set(DIRS_TO_SEARCH, dirsToSearch.toString()); job.setLong(MIN_TXN, minTxn); job.setLong(MAX_TXN, maxTxn); + LOG.debug("Compactor job configuration {}", job.toString()); // Add tokens for all the file system in the input path. ArrayList dirs = new ArrayList<>(); @@ -342,26 +343,25 @@ private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compa mrJob = job; } - LOG.info("Submitting " + compactionType + " compaction job '" + - job.getJobName() + "' to " + job.getQueueName() + " queue. " + - "(current delta dirs count=" + curDirNumber + - ", obsolete delta dirs count=" + obsoleteDirNumber + ". TxnIdRange[" + minTxn + "," + maxTxn + "]"); + LOG.info("Submitting {} compaction job '{}' to {} queue. (current delta dirs count = {}, obsolete delta dirs " + + "count = {}. TxnIdRange[{}, {}])", compactionType, job.getJobName(), + job.getQueueName(), curDirNumber, obsoleteDirNumber, minTxn, maxTxn); JobClient jc = null; try { jc = new JobClient(job); RunningJob rj = jc.submitJob(job); - LOG.info("Submitted compaction job '" + job.getJobName() + - "' with jobID=" + rj.getID() + " compaction ID=" + id); + LOG.info("Submitted compaction job '{}' with jobId = {} compaction ID = {}", job.getJobName(), + rj.getID(), id); try { msc.setHadoopJobid(rj.getID().toString(), id); } catch (TException e) { - LOG.warn("Error setting hadoop job, jobId=" + rj.getID().toString() - + " compactionId=" + id, e); + LOG.warn("Error setting hadoop job, jobId = {}, compactionId = {}", rj.getID(), id, e); } rj.waitForCompletion(); if (!rj.isSuccessful()) { - throw new IOException((compactionType == CompactionType.MAJOR ? "Major" : "Minor") + - " compactor job failed for " + jobName + "! Hadoop JobId: " + rj.getID()); + throw new IOException(compactionType + " compactor job failed for " + jobName + "! Hadoop JobId: " + rj.getID());
} } finally { if (jc!=null) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java index f238eb5dd0..ea2c9f613f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java @@ -37,7 +37,8 @@ */ final class MajorQueryCompactor extends QueryCompactor { - @Override void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor, + @Override + void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor, ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException { AcidUtils .setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(table.getParameters())); @@ -67,7 +68,8 @@ * {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorSplitGroups(InputSplit[], * Configuration, boolean)}, we will end up with one file per bucket. */ - @Override protected void commitCompaction(String dest, String tmpTableName, HiveConf conf, + @Override + protected void commitCompaction(String dest, String tmpTableName, HiveConf conf, ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException { org.apache.hadoop.hive.ql.metadata.Table tempTable = Hive.get().getTable(tmpTableName); Util.moveContents(new Path(tempTable.getSd().getLocation()), new Path(dest), true, false, conf, actualWriteIds, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java index bad5d00a8d..c809557a18 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java @@ -61,8 +61,8 @@ @Override void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor, ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException { - LOG.debug("Going to delete directories for aborted transactions for MM table " + table.getDbName() + "." + table - .getTableName()); + LOG.debug("Going to delete directories for aborted transactions for MM table {}.{}", + table.getDbName(), table.getTableName()); AcidUtils.Directory dir = AcidUtils .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false, table.getParameters(), false); @@ -71,7 +71,7 @@ // Then, actually do the compaction. if (!compactionInfo.isMajorCompaction()) { // Not supported for MM tables right now.
- LOG.info("Not compacting " + storageDescriptor.getLocation() + "; not a major compaction"); + LOG.info("Not compacting {}; not a major compaction", storageDescriptor.getLocation()); return; } @@ -136,10 +136,10 @@ private void removeFilesForMmTable(HiveConf conf, AcidUtils.Directory dir) throw if (filesToDelete.size() < 1) { return; } - LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + dir); + LOG.info("About to remove {} aborted directories from {}", filesToDelete.size(), dir); FileSystem fs = filesToDelete.get(0).getFileSystem(conf); for (Path dead : filesToDelete) { - LOG.debug("Going to delete path " + dead.toString()); + LOG.debug("Going to delete path {}", dead.toString()); fs.delete(dead, true); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java index d234910490..ad3ce66a83 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java @@ -174,8 +174,8 @@ static boolean isEnoughToCompact(boolean isMajorCompaction, AcidUtils.Directory } if (!isEnoughToCompact) { - LOG.debug("Not compacting {}; current base: {}, delta files: {}, originals: {}", sd.getLocation(), - dir.getBaseDirectory(), deltaInfo, origCount); + LOG.debug("Not compacting {}; current base: {}, delta files: {}, originals: {}", + sd.getLocation(), dir.getBaseDirectory(), deltaInfo, origCount); } return isEnoughToCompact; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java index 2f2bb21a13..a0d7290f3e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java @@ -26,7 +26,6 @@ * Simple factory class, which returns an instance of {@link QueryCompactor}. */ final class QueryCompactorFactory { - /** * Factory class, no need to expose constructor. 
*/ diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index 383969a3a6..012204612e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -47,7 +47,6 @@ import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.stats.StatsUtils; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.StringUtils; import java.io.IOException; import java.net.InetAddress; @@ -67,8 +66,8 @@ static final private String CLASS_NAME = Worker.class.getName(); static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME); static final private long SLEEP_TIME = 10000; + private static final String COMPACTOR_THREAD_PREFIX = "Compactor_thread"; - private String workerName; private JobConf mrJob; // the MR job for compaction /** @@ -88,7 +87,7 @@ public static String hostname() { // don't go through Initiator for user initiated compactions) @Override public void run() { - LOG.info("Starting Worker thread"); + LOG.info("Starting Worker thread with name {}", getName()); do { boolean launchedJob = false; // Make sure nothing escapes this run method and kills the metastore at large, @@ -99,15 +98,15 @@ public void run() { if (msc == null) { msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf); } - ci = CompactionInfo.optionalCompactionInfoStructToInfo(msc.findNextCompact(workerName)); - LOG.debug("Processing compaction request " + ci); + ci = CompactionInfo.optionalCompactionInfoStructToInfo(msc.findNextCompact(getName())); + LOG.debug("Processing compaction request {}", ci); if (ci == null && !stop.get()) { try { Thread.sleep(SLEEP_TIME); continue; } catch (InterruptedException e) { - LOG.warn("Worker thread sleep interrupted " + e.getMessage()); + LOG.warn("Worker thread sleep interrupted", e); continue; } } @@ -117,30 +116,30 @@ public void run() { try { t1 = resolveTable(ci); if (t1 == null) { - LOG.info("Unable to find table " + ci.getFullTableName() + - ", assuming it was dropped and moving on."); + LOG.info("Unable to find table {}, assuming it was dropped and moving on.", ci.getFullTableName()); msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci)); continue; } } catch (MetaException e) { + LOG.debug("Exception while resolving table {}, marking compaction entry as cleaned and moving on.", ci.getFullTableName(), e); msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci)); continue; } // This chicanery is to get around the fact that the table needs to be final in order to // go into the doAs below. final Table t = t1; - + LOG.debug("Successfully resolved table {} for compaction.", ci.getFullTableName()); // Find the partition we will be working with, if there is one. Partition p = null; try { p = resolvePartition(ci); if (p == null && ci.partName != null) { - LOG.info("Unable to find partition " + ci.getFullPartitionName() + - ", assuming it was dropped and moving on."); + LOG.info("Unable to find partition {}, assuming it was dropped and moving on.", ci.getFullPartitionName()); msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci)); continue; } } catch (Exception e) { + LOG.debug("Exception while resolving partition {}, marking compaction entry as cleaned and moving on.", ci.getFullPartitionName(), e); msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci)); continue; } @@ -150,7 +149,7 @@ public void run() { // Check that the table or partition isn't sorted, as we don't yet support that.
if (sd.getSortCols() != null && !sd.getSortCols().isEmpty()) { - LOG.error("Attempt to compact sorted table "+ci.getFullTableName()+", which is not yet supported!"); + LOG.error("Attempt to compact sorted table {}, which is not yet supported!", ci.getFullTableName()); msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci)); continue; } @@ -165,6 +164,7 @@ public void run() { * this case some of the statements are DDL, even in the future will not be allowed in a * multi-stmt txn. {@link Driver#setCompactionWriteIds(ValidWriteIdList, long)} */ long compactorTxnId = msc.openTxn(ci.runAs, TxnType.COMPACTION); + LOG.debug("Opened compaction transaction {}", compactorTxnId); heartbeater = new CompactionHeartbeater(compactorTxnId, fullTableName, conf); heartbeater.start(); @@ -174,14 +174,14 @@ public void run() { final ValidCompactorWriteIdList tblValidWriteIds = TxnUtils.createValidCompactWriteIdList(msc.getValidWriteIds( Collections.singletonList(fullTableName), validTxnList.writeToString()).get(0)); - LOG.debug("ValidCompactWriteIdList: " + tblValidWriteIds.writeToString()); + LOG.debug("ValidCompactWriteIdList: {}", tblValidWriteIds.writeToString()); conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString()); ci.highestWriteId = tblValidWriteIds.getHighWatermark(); //this writes TXN_COMPONENTS to ensure that if compactorTxnId fails, we keep metadata about //it until after any data written by it are physically removed msc.updateCompactorState(CompactionInfo.compactionInfoToStruct(ci), compactorTxnId); - final StringBuilder jobName = new StringBuilder(workerName); + final StringBuilder jobName = new StringBuilder(getName()); jobName.append("-compactor-"); jobName.append(ci.getFullPartitionName()); @@ -198,13 +198,17 @@ public void run() { continue; } - LOG.info("Starting " + ci.type.toString() + " compaction for " + ci.getFullPartitionName() + " in " + JavaUtils.txnIdToString(compactorTxnId)); + LOG.info("Starting {} compaction for {} in {}", ci.type, ci.getFullPartitionName(), + JavaUtils.txnIdToString(compactorTxnId)); + final StatsUpdater su = StatsUpdater.init(ci, msc.findColumnsWithStats( CompactionInfo.compactionInfoToStruct(ci)), conf, runJobAsSelf(ci.runAs) ? ci.runAs : t.getOwner()); final CompactorMR mr = new CompactorMR(); launchedJob = true; try { + LOG.debug("Compaction will run as user {}", ci.runAs); if (runJobAsSelf(ci.runAs)) { mr.run(conf, jobName.toString(), t, p, sd, tblValidWriteIds, ci, su, msc, dir); } else { @@ -228,20 +232,21 @@ public Object run() throws Exception { } heartbeater.cancel(); msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci)); + LOG.debug("Marked compaction {} as compacted.", ci); msc.commitTxn(compactorTxnId); + LOG.debug("Committed compaction transaction {}", compactorTxnId); if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) { mrJob = mr.getMrJob(); } } catch (Throwable e) { - LOG.error("Caught exception while trying to compact " + ci + - ". Marking failed to avoid repeated failures, " + StringUtils.stringifyException(e)); + LOG.error("Caught exception while trying to compact " + ci + ". Marking compaction as failed to avoid repeated failures.", e);
ci.errorMessage = e.getMessage(); msc.markFailed(CompactionInfo.compactionInfoToStruct(ci)); msc.abortTxns(Collections.singletonList(compactorTxnId)); } } catch (TException | IOException t) { - LOG.error("Caught an exception in the main loop of compactor worker " + workerName + ", " + - StringUtils.stringifyException(t)); + LOG.error("Caught an exception in the main loop of compactor worker " + getName(), t); try { if (msc != null && ci != null) { ci.errorMessage = t.getMessage(); @@ -258,11 +263,10 @@ public Object run() throws Exception { try { Thread.sleep(SLEEP_TIME); } catch (InterruptedException e) { - LOG.error("Interrupted while sleeping to instantiate metastore client"); + LOG.error("Interrupted while sleeping to instantiate metastore client", e); } } catch (Throwable t) { - LOG.error("Caught an exception in the main loop of compactor worker " + workerName + ", " + - StringUtils.stringifyException(t)); + LOG.error("Caught an exception in the main loop of compactor worker " + getName(), t); } finally { if(heartbeater != null) { heartbeater.cancel(); @@ -276,6 +280,7 @@ public Object run() throws Exception { try { Thread.sleep(SLEEP_TIME); } catch (InterruptedException e) { + LOG.error("Compactor worker " + getName() + " interrupted while sleeping before the next compaction check", e); } } } while (!stop.get()); @@ -284,12 +289,7 @@ public Object run() throws Exception { @Override public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception { super.init(stop, looped); - - StringBuilder name = new StringBuilder(hostname()); - name.append("-"); - name.append(getId()); - this.workerName = name.toString(); - setName(name.toString()); + setName("[" + COMPACTOR_THREAD_PREFIX + "-" + hostname() + "-" + getId() + "]"); } public JobConf getMrJob() {
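The convention this patch moves toward is SLF4J parameterized logging, with any throwable passed as the trailing argument rather than flattened through StringUtils.stringifyException. Below is a minimal, self-contained sketch of that pattern; the class name LoggingPatternDemo and the sample values are illustrative assumptions and are not part of the patch.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative only: this class does not exist in the patch.
public class LoggingPatternDemo {
  private static final Logger LOG = LoggerFactory.getLogger(LoggingPatternDemo.class);

  public static void main(String[] args) {
    String tableName = "default.acid_tbl";   // hypothetical sample value
    long compactorTxnId = 42L;                // hypothetical sample value

    // Old style: the message is built eagerly via concatenation, even when DEBUG is disabled.
    LOG.debug("Processing compaction request for " + tableName + " in txn " + compactorTxnId);

    // New style: placeholders are substituted only if the DEBUG level is enabled.
    LOG.debug("Processing compaction request for {} in txn {}", tableName, compactorTxnId);

    try {
      throw new IllegalStateException("simulated compaction failure");
    } catch (IllegalStateException e) {
      // Passing the exception as the last argument (after the placeholder values)
      // makes SLF4J print the full stack trace; no StringUtils.stringifyException needed.
      LOG.error("Caught exception while trying to compact {}", tableName, e);
    }
  }
}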