diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index ee2c0f3e23..aa59bd14e2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -287,14 +287,14 @@ void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor Path path = stat.getFileStatus().getPath(); //note that originalFiles are all original files recursively not dirs dirsToSearch.add(path); - LOG.debug("Adding original file " + path + " to dirs to search"); + LOG.debug("Adding original file {} to dirs to search.", path); } // Set base to the location so that the input format reads the original files. baseDir = new Path(sd.getLocation()); } } else { // add our base to the list of directories to search for files in. - LOG.debug("Adding base directory " + baseDir + " to dirs to search"); + LOG.debug("Adding base directory {} to dirs to search.", baseDir); dirsToSearch.add(baseDir); } } @@ -363,7 +363,7 @@ private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compa long minTxn = Long.MAX_VALUE; long maxTxn = Long.MIN_VALUE; for (AcidUtils.ParsedDelta delta : parsedDeltas) { - LOG.debug("Adding delta " + delta.getPath() + " to directories to search"); + LOG.debug("Adding delta {} to directories to search", delta.getPath()); dirsToSearch.add(delta.getPath()); deltaDirs.add(delta.getPath()); minTxn = Math.min(minTxn, delta.getMinWriteId()); @@ -375,6 +375,7 @@ private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compa job.set(DIRS_TO_SEARCH, dirsToSearch.toString()); job.setLong(MIN_TXN, minTxn); job.setLong(MAX_TXN, maxTxn); + LOG.debug("Compactor job configuration {}", job.toString()); // Add tokens for all the file system in the input path. ArrayList dirs = new ArrayList<>(); @@ -389,26 +390,25 @@ private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compa mrJob = job; } - LOG.info("Submitting " + compactionType + " compaction job '" + - job.getJobName() + "' to " + job.getQueueName() + " queue. " + - "(current delta dirs count=" + curDirNumber + - ", obsolete delta dirs count=" + obsoleteDirNumber + ". TxnIdRange[" + minTxn + "," + maxTxn + "]"); + LOG.info("Submitting {} compaction job '{}' to {} queue. (current delta dirs count = {}, obsolete delta dirs " + + "count = {}. TxnIdRange[{}, {}]", compactionType, job.getJobName(), + job.getQueueName(), curDirNumber, obsoleteDirNumber, minTxn, maxTxn); JobClient jc = null; try { jc = new JobClient(job); RunningJob rj = jc.submitJob(job); - LOG.info("Submitted compaction job '" + job.getJobName() + - "' with jobID=" + rj.getID() + " compaction ID=" + id); + LOG.info("Submitted compaction job '{}' with jobId = {} compaction ID = {}", job.getJobName(), + rj.getID(), id); try { msc.setHadoopJobid(rj.getID().toString(), id); } catch (TException e) { - LOG.warn("Error setting hadoop job, jobId=" + rj.getID().toString() - + " compactionId=" + id, e); + LOG.warn("Error setting hadoop job, jobId = {}, compactionId = {}. {}", rj.getID(), id, + StringUtils.stringifyException(e)); } rj.waitForCompletion(); if (!rj.isSuccessful()) { - throw new IOException((compactionType == CompactionType.MAJOR ? "Major" : "Minor") + - " compactor job failed for " + jobName + "! Hadoop JobId: " + rj.getID()); + LOG.error("{} compactor job failed for {}! Hadoop JobId: {}", compactionType, jobName, rj.getID()); + throw new IOException((compactionType == CompactionType.MAJOR ? "Major" : "Minor") + " compactor job failed for " + jobName + "! Hadoop JobId: " + rj.getID());
} } finally { if (jc!=null) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java index 10681c0202..d76fe4728d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java @@ -53,6 +53,7 @@ @Override void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor, ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException { + LOG.debug("Running query-based major compaction."); AcidUtils .setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(table.getParameters())); AcidUtils.Directory dir = AcidUtils @@ -115,7 +116,7 @@ void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageD try { DriverUtils.runOnDriver(conf, user, sessionState, "drop table if exists " + tmpTableName); } catch (HiveException e) { - LOG.error("Unable to delete drop temp table {} which was created for running major compaction", tmpTableName); + LOG.error("Unable to delete temp table {} which was created for major compaction.", tmpTableName, e); LOG.error(ExceptionUtils.getStackTrace(e)); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java index 27a3ce8d2d..7ad8434f4f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java @@ -68,8 +68,8 @@ @Override void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor, ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException { - LOG.debug("Going to delete directories for aborted transactions for MM table " + table.getDbName() + "." + table .getTableName()); + LOG.debug("Going to delete directories for aborted transactions for MM table {}.{}", + table.getDbName(), table.getTableName()); AcidUtils.Directory dir = AcidUtils .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false, table.getParameters(), false); @@ -78,7 +78,7 @@ void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageD // Then, actually do the compaction. if (!compactionInfo.isMajorCompaction()) { // Not supported for MM tables right now. - LOG.info("Not compacting " + storageDescriptor.getLocation() + "; not a major compaction"); + LOG.info("Not compacting {}; not a major compaction", storageDescriptor.getLocation()); return; } @@ -108,7 +108,7 @@ void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageD String query = buildMmCompactionCtQuery(tmpTableName, table, partition == null ?
table.getSd() : partition.getSd(), baseLocation.toString()); - LOG.info("Compacting a MM table into " + query); + LOG.info("Compacting a MM table into {}", query); try { DriverUtils.runOnDriver(driverConf, user, sessionState, query); break; @@ -123,7 +123,7 @@ void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageD } } String query = buildMmCompactionQuery(table, partition, tmpTableName); - LOG.info("Compacting a MM table via " + query); + LOG.info("Compacting a MM table via {}", query); long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(hiveConf); DriverUtils.runOnDriver(driverConf, user, sessionState, query, writeIds, compactorTxnId); commitMmCompaction(tmpLocation, storageDescriptor.getLocation(), hiveConf, writeIds, compactorTxnId); @@ -141,10 +141,10 @@ private void removeFilesForMmTable(HiveConf conf, AcidUtils.Directory dir) throw if (filesToDelete.size() < 1) { return; } - LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + dir); + LOG.info("About to remove {} aborted directories from {}", filesToDelete.size(), dir); FileSystem fs = filesToDelete.get(0).getFileSystem(conf); for (Path dead : filesToDelete) { - LOG.debug("Going to delete path " + dead.toString()); + LOG.debug("Going to delete path {}", dead.toString()); fs.delete(dead, true); } } @@ -286,11 +286,11 @@ private void commitMmCompaction(String from, String to, Configuration conf, Vali .statementId(-1).visibilityTxnId(compactorTxnId); Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent(); if (!fs.exists(fromPath)) { - LOG.info(from + " not found. Assuming 0 splits. Creating " + newBaseDir); + LOG.info("{} not found. Assuming 0 splits. Creating {}", from, newBaseDir); fs.mkdirs(newBaseDir); return; } - LOG.info("Moving contents of " + from + " to " + to); + LOG.info("Moving contents of {} to {}", from, to); FileStatus[] children = fs.listStatus(fromPath); if (children.length != 1) { throw new IOException("Unexpected files in the source: " + Arrays.toString(children)); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java index 80119de22f..42cc254703 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java @@ -88,8 +88,8 @@ static boolean isEnoughToCompact(boolean isMajorCompaction, AcidUtils.Directory } if (!isEnoughToCompact) { - LOG.debug("Not compacting {}; current base: {}, delta files: {}, originals: {}", sd.getLocation(), - dir.getBaseDirectory(), deltaInfo, origCount); + LOG.debug("Not compacting {}; current base: {}, delta files: {}, originals: {}", + sd.getLocation(), dir.getBaseDirectory(), deltaInfo, origCount); } return isEnoughToCompact; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java index 41cb4b64fb..7a4e704cd6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java @@ -27,6 +27,7 @@ */ final class QueryCompactorFactory { + private QueryCompactorFactory() { } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index 749cdb6c27..67a5a03cc6 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -44,7 +44,6 @@ import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.stats.StatsUtils; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.StringUtils; import java.io.IOException; import java.net.InetAddress; @@ -64,8 +63,8 @@ static final private String CLASS_NAME = Worker.class.getName(); static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME); static final private long SLEEP_TIME = 10000; + private static final String COMPACTOR_THREAD_PREFIX = "Compactor_thread"; - private String workerName; private JobConf mrJob; // the MR job for compaction /** @@ -85,7 +84,7 @@ public static String hostname() { // don't go through Initiator for user initiated compactions) @Override public void run() { - LOG.info("Starting Worker thread"); + LOG.info("Starting Worker thread with name {}", getName()); do { boolean launchedJob = false; // Make sure nothing escapes this run method and kills the metastore at large, @@ -96,15 +95,15 @@ public void run() { msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf); } final CompactionInfo ci = CompactionInfo.optionalCompactionInfoStructToInfo( - msc.findNextCompact(workerName)); - LOG.debug("Processing compaction request " + ci); + msc.findNextCompact(getName())); + LOG.debug("Processing compaction request {}", ci); if (ci == null && !stop.get()) { try { Thread.sleep(SLEEP_TIME); continue; } catch (InterruptedException e) { - LOG.warn("Worker thread sleep interrupted " + e.getMessage()); + LOG.warn("Worker thread sleep interrupted", e); continue; } } @@ -114,30 +113,30 @@ public void run() { try { t1 = resolveTable(ci); if (t1 == null) { - LOG.info("Unable to find table " + ci.getFullTableName() + - ", assuming it was dropped and moving on."); + LOG.info("Unable to find table {}, assuming it was dropped and moving on.", ci.getFullTableName()); msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci)); continue; } } catch (MetaException e) { + LOG.debug("Failed to resolve table {}, marking the compaction request as cleaned and moving on.", ci.getFullTableName(), e); msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci)); continue; } // This chicanery is to get around the fact that the table needs to be final in order to // go into the doAs below. final Table t = t1; - + LOG.debug("Successfully resolved table {} for compaction.", ci.getFullTableName()); // Find the partition we will be working with, if there is one. Partition p = null; try { p = resolvePartition(ci); if (p == null && ci.partName != null) { - LOG.info("Unable to find partition " + ci.getFullPartitionName() + - ", assuming it was dropped and moving on."); + LOG.info("Unable to find partition {}, assuming it was dropped and moving on.", ci.getFullPartitionName()); msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci)); continue; } } catch (Exception e) { + LOG.debug("Failed to resolve partition for {}, marking the compaction request as cleaned and moving on.", ci.getFullPartitionName(), e); msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci)); continue; } @@ -147,7 +146,7 @@ public void run() { // Check that the table or partition isn't sorted, as we don't yet support that.
if (sd.getSortCols() != null && !sd.getSortCols().isEmpty()) { - LOG.error("Attempt to compact sorted table "+ci.getFullTableName()+", which is not yet supported!"); + LOG.error("Attempt to compact sorted table {}, which is not yet supported!", ci.getFullTableName()); msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci)); continue; } @@ -162,6 +161,7 @@ public void run() { * this case some of the statements are DDL, even in the future will not be allowed in a * multi-stmt txn. {@link Driver#setCompactionWriteIds(ValidWriteIdList, long)} */ long compactorTxnId = msc.openTxn(ci.runAs, TxnType.COMPACTION); + LOG.debug("Compaction transaction ID {}", compactorTxnId); heartbeater = new CompactionHeartbeater(compactorTxnId, fullTableName, conf); heartbeater.start(); @@ -171,24 +171,27 @@ public void run() { final ValidCompactorWriteIdList tblValidWriteIds = TxnUtils.createValidCompactWriteIdList(msc.getValidWriteIds( Collections.singletonList(fullTableName), validTxnList.writeToString()).get(0)); - LOG.debug("ValidCompactWriteIdList: " + tblValidWriteIds.writeToString()); + LOG.debug("ValidCompactWriteIdList: {}", tblValidWriteIds); conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString()); ci.highestWriteId = tblValidWriteIds.getHighWatermark(); //this writes TXN_COMPONENTS to ensure that if compactorTxnId fails, we keep metadata about //it until after any data written by it are physically removed msc.updateCompactorState(CompactionInfo.compactionInfoToStruct(ci), compactorTxnId); - final StringBuilder jobName = new StringBuilder(workerName); + final StringBuilder jobName = new StringBuilder(getName()); jobName.append("-compactor-"); jobName.append(ci.getFullPartitionName()); - LOG.info("Starting " + ci.type.toString() + " compaction for " + ci.getFullPartitionName() + " in " + JavaUtils.txnIdToString(compactorTxnId)); + LOG.info("Starting {} compaction for table {} in partition {} in transaction {}", + ci.type.toString(), ci.getFullTableName(), ci.getFullPartitionName(), + JavaUtils.txnIdToString(compactorTxnId)); final StatsUpdater su = StatsUpdater.init(ci, msc.findColumnsWithStats( CompactionInfo.compactionInfoToStruct(ci)), conf, runJobAsSelf(ci.runAs) ? ci.runAs : t.getOwner()); final CompactorMR mr = new CompactorMR(); launchedJob = true; try { + LOG.debug("Compaction will be running as user {}", ci.runAs); if (runJobAsSelf(ci.runAs)) { mr.run(conf, jobName.toString(), t, p, sd, tblValidWriteIds, ci, su, msc); } else { @@ -211,19 +214,20 @@ public Object run() throws Exception { } heartbeater.cancel(); msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci)); + LOG.debug("Compaction {} is marked as compacted.", compactorTxnId); msc.commitTxn(compactorTxnId); + LOG.debug("Compaction is committed under transaction ID {}", compactorTxnId); if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) { mrJob = mr.getMrJob(); } } catch (Throwable e) { - LOG.error("Caught exception while trying to compact " + ci + - ". Marking failed to avoid repeated failures, " + StringUtils.stringifyException(e)); + LOG.error("Caught exception while trying to compact " + ci + ". 
Marking compaction as failed to avoid" + + " repeated failures.", e); msc.markFailed(CompactionInfo.compactionInfoToStruct(ci)); msc.abortTxns(Collections.singletonList(compactorTxnId)); } } catch (TException | IOException t) { - LOG.error("Caught an exception in the main loop of compactor worker " + workerName + ", " + - StringUtils.stringifyException(t)); + LOG.error("Caught an exception in the main loop of compactor worker " + getName(), t); if (msc != null) { msc.close(); } @@ -231,11 +235,10 @@ public Object run() throws Exception { try { Thread.sleep(SLEEP_TIME); } catch (InterruptedException e) { - LOG.error("Interrupted while sleeping to instantiate metastore client"); + LOG.error("Interrupted while sleeping to instantiate metastore client", e); } } catch (Throwable t) { - LOG.error("Caught an exception in the main loop of compactor worker " + workerName + ", " + - StringUtils.stringifyException(t)); + LOG.error("Caught an exception in the main loop of compactor worker " + getName(), t); } finally { if(heartbeater != null) { heartbeater.cancel(); @@ -249,6 +252,7 @@ public Object run() throws Exception { try { Thread.sleep(SLEEP_TIME); } catch (InterruptedException e) { + LOG.error("Interrupted while sleeping before retrying compaction " + getName(), e); } } } while (!stop.get()); @@ -257,12 +261,7 @@ public Object run() throws Exception { @Override public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception { super.init(stop, looped); - - StringBuilder name = new StringBuilder(hostname()); - name.append("-"); - name.append(getId()); - this.workerName = name.toString(); - setName(name.toString()); + setName("[" + COMPACTOR_THREAD_PREFIX + "-" + hostname() + "-" + getId() + "]"); } public JobConf getMrJob() {