diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index ee2c0f3e23..fc5a30c3bf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -214,6 +214,8 @@ private void overrideMRProps(JobConf job, Map properties) {
   void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor sd, ValidWriteIdList writeIds,
            CompactionInfo ci, Worker.StatsUpdater su, IMetaStoreClient msc) throws IOException {
+    final String threadName = Thread.currentThread().getName();
+
     if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) {
       throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true");
     }
@@ -257,7 +259,7 @@ void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor
        * Thus, force N minor compactions first to reduce number of deltas and then follow up with
        * the compaction actually requested in {@link ci} which now needs to compact a lot fewer deltas
        */
-      LOG.warn(parsedDeltas.size() + " delta files found for " + ci.getFullPartitionName()
+      LOG.warn(threadName + ";" + parsedDeltas.size() + " delta files found for " + ci.getFullPartitionName()
           + " located at " + sd.getLocation() + "! This is likely a sign of misconfiguration, "
           + "especially if this message repeats. Check that compaction is running properly. Check for any "
           + "runaway/mis-configured process writing to ACID tables, especially using Streaming Ingest API.");
@@ -287,14 +289,14 @@ void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor
           Path path = stat.getFileStatus().getPath();
           //note that originalFiles are all original files recursively not dirs
           dirsToSearch.add(path);
-          LOG.debug("Adding original file " + path + " to dirs to search");
+          LOG.debug("{};Adding original file {} to dirs to search.", threadName, path);
         }
         // Set base to the location so that the input format reads the original files.
         baseDir = new Path(sd.getLocation());
       }
     } else {
       // add our base to the list of directories to search for files in.
- LOG.debug("Adding base directory " + baseDir + " to dirs to search"); + LOG.debug("{};Adding base directory {} to dirs to seatch.", threadName, baseDir); dirsToSearch.add(baseDir); } } @@ -351,6 +353,7 @@ private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compa List parsedDeltas, int curDirNumber, int obsoleteDirNumber, HiveConf hiveConf, IMetaStoreClient msc, long id, String jobName) throws IOException { + final String threadName = Thread.currentThread().getName(); job.setBoolean(IS_MAJOR, compactionType == CompactionType.MAJOR); if(dirsToSearch == null) { dirsToSearch = new StringableList(); @@ -363,7 +366,7 @@ private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compa long minTxn = Long.MAX_VALUE; long maxTxn = Long.MIN_VALUE; for (AcidUtils.ParsedDelta delta : parsedDeltas) { - LOG.debug("Adding delta " + delta.getPath() + " to directories to search"); + LOG.debug("{};Adding delta {} to directories to search", threadName, delta.getPath()); dirsToSearch.add(delta.getPath()); deltaDirs.add(delta.getPath()); minTxn = Math.min(minTxn, delta.getMinWriteId()); @@ -375,6 +378,7 @@ private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compa job.set(DIRS_TO_SEARCH, dirsToSearch.toString()); job.setLong(MIN_TXN, minTxn); job.setLong(MAX_TXN, maxTxn); + LOG.debug("{};Compactor job configuration {}", threadName, job.toString()); // Add tokens for all the file system in the input path. ArrayList dirs = new ArrayList<>(); @@ -389,26 +393,25 @@ private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compa mrJob = job; } - LOG.info("Submitting " + compactionType + " compaction job '" + - job.getJobName() + "' to " + job.getQueueName() + " queue. " + - "(current delta dirs count=" + curDirNumber + - ", obsolete delta dirs count=" + obsoleteDirNumber + ". TxnIdRange[" + minTxn + "," + maxTxn + "]"); + LOG.info("{};Submitting {} compaction job '{}' to {} queue.(current delta dirs count = {}, obsolete delta dirs " + + "count = {}. TxnIdRange[{}, {}]", threadName, compactionType, job.getJobName(), + job.getQueueName(), curDirNumber, obsoleteDirNumber, minTxn, maxTxn); JobClient jc = null; try { jc = new JobClient(job); RunningJob rj = jc.submitJob(job); - LOG.info("Submitted compaction job '" + job.getJobName() + - "' with jobID=" + rj.getID() + " compaction ID=" + id); + LOG.info("{};Submitted compaction job '{}' with jobId = {} compaction ID = {}", threadName, job.getJobName(), + rj.getID(), id); try { msc.setHadoopJobid(rj.getID().toString(), id); } catch (TException e) { - LOG.warn("Error setting hadoop job, jobId=" + rj.getID().toString() - + " compactionId=" + id, e); + LOG.warn("{};Error setting hadoop job, jobId = {}, compactionId = {}. {}", threadName, rj.getID(), id, + StringUtils.stringifyException(e)); } rj.waitForCompletion(); if (!rj.isSuccessful()) { - throw new IOException((compactionType == CompactionType.MAJOR ? "Major" : "Minor") + - " compactor job failed for " + jobName + "! Hadoop JobId: " + rj.getID()); + LOG.error("{};{} compactor job failed for {}! 
Hadoop JobId: {}", threadName, compactionType, jobName, + rj.getID()); } } finally { if (jc!=null) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java index 10681c0202..689e74b151 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.StringUtils; import org.apache.hive.common.util.Ref; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,9 +51,10 @@ private static final Logger LOG = LoggerFactory.getLogger(MajorQueryCompactor.class.getName()); - @Override - void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor, + @Override void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor, ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException { + final String threadName = Thread.currentThread().getName(); + LOG.debug("{};Running query based major compaction.", threadName); AcidUtils .setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(table.getParameters())); AcidUtils.Directory dir = AcidUtils @@ -79,7 +81,7 @@ void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageD try { // Create a temporary table under the temp location --> db/tbl/ptn/_tmp_1234/db.tmp_compactor_tbl_1234 String query = buildCrudMajorCompactionCreateTableQuery(tmpTableName, table); - LOG.info("Running major compaction query into temp table with create definition: {}", query); + LOG.info("{};Running major compaction query into temp table with create definition: {}", threadName, query); try { DriverUtils.runOnDriver(conf, user, sessionState, query); } catch (Exception ex) { @@ -92,7 +94,7 @@ void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageD } } query = buildCrudMajorCompactionQuery(table, partition, tmpTableName); - LOG.info("Running major compaction via query: {}", query); + LOG.info("{};Running major compaction via query: {}", threadName, query); /* * This will create bucket files like: * db/db_tmp_compactor_tbl_1234/00000_0 @@ -109,13 +111,14 @@ void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageD commitCrudMajorCompaction(tmpLocation, tmpTableName, storageDescriptor.getLocation(), conf, writeIds, compactorTxnId); } catch (HiveException e) { - LOG.error("Error doing query based major compaction", e); + LOG.error(threadName + ";Error doing query based major compaction", e); throw new IOException(e); } finally { try { DriverUtils.runOnDriver(conf, user, sessionState, "drop table if exists " + tmpTableName); } catch (HiveException e) { - LOG.error("Unable to delete drop temp table {} which was created for running major compaction", tmpTableName); + LOG.error("{};Unable to delete drop temp table {} which was created for running major compaction. 
{}", + threadName, tmpTableName, StringUtils.stringifyException(e)); LOG.error(ExceptionUtils.getStackTrace(e)); } } @@ -197,11 +200,11 @@ private void commitCrudMajorCompaction(String from, String tmpTableName, String .statementId(-1); Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent(); if (!fs.exists(fromPath)) { - LOG.info("{} not found. Assuming 0 splits. Creating {}", from, newBaseDir); + LOG.info("{};{} not found. Assuming 0 splits. Creating {}", Thread.currentThread().getName(), from, newBaseDir); fs.mkdirs(newBaseDir); return; } - LOG.info("Moving contents of {} to {}", tmpTablePath, to); + LOG.info("{};Moving contents of {} to {}", Thread.currentThread().getName(), tmpTablePath, to); /* * Currently mapping file with name 0000_0 to bucket_00000, 0000_1 to bucket_00001 and so on * TODO/ToThink: diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java index f7e0a85c1f..374693cf92 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java @@ -68,8 +68,9 @@ @Override void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor, ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException { - LOG.debug("Going to delete directories for aborted transactions for MM table " + table.getDbName() + "." + table - .getTableName()); + final String threadName = Thread.currentThread().getName(); + LOG.debug("{}; Going to delete directories for aborted transactions for MM table {}.{}", threadName, + table.getDbName(), table.getTableName()); AcidUtils.Directory dir = AcidUtils .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false, table.getParameters(), false); @@ -78,7 +79,7 @@ void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageD // Then, actually do the compaction. if (!compactionInfo.isMajorCompaction()) { // Not supported for MM tables right now. - LOG.info("Not compacting " + storageDescriptor.getLocation() + "; not a major compaction"); + LOG.info("{}; Not compacting {}; not a major compaction", threadName, storageDescriptor.getLocation()); return; } @@ -108,7 +109,7 @@ void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageD String query = buildMmCompactionCtQuery(tmpTableName, table, partition == null ? 
table.getSd() : partition.getSd(), baseLocation.toString()); - LOG.info("Compacting a MM table into " + query); + LOG.info("{} Compacting a MM table into {}", threadName, query); try { DriverUtils.runOnDriver(driverConf, user, sessionState, query); break; @@ -123,13 +124,13 @@ void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageD } } String query = buildMmCompactionQuery(table, partition, tmpTableName); - LOG.info("Compacting a MM table via " + query); + LOG.info("{};Compacting a MM table via {}", threadName, query); long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(hiveConf); DriverUtils.runOnDriver(driverConf, user, sessionState, query, writeIds, compactorTxnId); commitMmCompaction(tmpLocation, storageDescriptor.getLocation(), hiveConf, writeIds, compactorTxnId); DriverUtils.runOnDriver(driverConf, user, sessionState, "drop table if exists " + tmpTableName); } catch (HiveException e) { - LOG.error("Error compacting a MM table", e); + LOG.error("{};Error compacting a MM table {}", threadName, StringUtils.stringifyException(e)); throw new IOException(e); } } @@ -141,10 +142,11 @@ private void removeFilesForMmTable(HiveConf conf, AcidUtils.Directory dir) throw if (filesToDelete.size() < 1) { return; } - LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + dir); + LOG.info("{};About to remove {} aborted directories from {}", Thread.currentThread().getName(), + filesToDelete.size(), dir); FileSystem fs = filesToDelete.get(0).getFileSystem(conf); for (Path dead : filesToDelete) { - LOG.debug("Going to delete path " + dead.toString()); + LOG.debug("{};Going to delete path {}", Thread.currentThread().getName(), dead.toString()); fs.delete(dead, true); } } @@ -286,13 +288,14 @@ private void commitMmCompaction(String from, String to, Configuration conf, Vali .statementId(-1).visibilityTxnId(compactorTxnId); Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent(); if (!fs.exists(fromPath)) { - LOG.info(from + " not found. Assuming 0 splits. Creating " + newBaseDir); + LOG.info("{};{} not found. Assuming 0 splits. 
Creating {}", Thread.currentThread().getName(), from, newBaseDir); fs.mkdirs(newBaseDir); return; } - LOG.info("Moving contents of " + from + " to " + to); + LOG.info("{};Moving contents of {} to {}", Thread.currentThread().getName(), from, to); FileStatus[] children = fs.listStatus(fromPath); if (children.length != 1) { + LOG.error("{};Too many files in the source {}", Thread.currentThread().getName(), Arrays.toString(children)); throw new IOException("Unexpected files in the source: " + Arrays.toString(children)); } FileStatus dirPath = children[0]; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java index 80119de22f..cebf919f07 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java @@ -88,8 +88,8 @@ static boolean isEnoughToCompact(boolean isMajorCompaction, AcidUtils.Directory } if (!isEnoughToCompact) { - LOG.debug("Not compacting {}; current base: {}, delta files: {}, originals: {}", sd.getLocation(), - dir.getBaseDirectory(), deltaInfo, origCount); + LOG.debug("{};Not compacting {}; current base: {}, delta files: {}, originals: {}", + Thread.currentThread().getName(), sd.getLocation(), dir.getBaseDirectory(), deltaInfo, origCount); } return isEnoughToCompact; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java index 41cb4b64fb..738203b4d0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java @@ -43,7 +43,8 @@ static QueryCompactor getQueryCompactor(Table table, HiveConf configuration, Com if (compactionInfo.isMajorCompaction()) { return new MajorQueryCompactor(); } else { - throw new RuntimeException("Query based compaction is not currently supported for minor compactions"); + throw new RuntimeException(Thread.currentThread().getName() + + "Query based compaction is not currently supported for minor compactions"); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index 3270175a80..55a9942086 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -64,8 +64,8 @@ static final private String CLASS_NAME = Worker.class.getName(); static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME); static final private long SLEEP_TIME = 10000; + private static final String COMPACTOR_THREAD_PREFIX = "Compactor_thread"; - private String workerName; private JobConf mrJob; // the MR job for compaction /** @@ -85,7 +85,7 @@ public static String hostname() { // don't go through Initiator for user initiated compactions) @Override public void run() { - LOG.info("Starting Worker thread"); + LOG.info("{};Starting Worker thread", getName()); do { boolean launchedJob = false; // Make sure nothing escapes this run method and kills the metastore at large, @@ -96,15 +96,15 @@ public void run() { msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf); } final CompactionInfo ci = CompactionInfo.optionalCompactionInfoStructToInfo( - msc.findNextCompact(workerName)); - LOG.debug("Processing compaction request " + ci); + msc.findNextCompact(getName())); + 
LOG.debug("{};Processing compaction request {}", getName(), ci); if (ci == null && !stop.get()) { try { Thread.sleep(SLEEP_TIME); continue; } catch (InterruptedException e) { - LOG.warn("Worker thread sleep interrupted " + e.getMessage()); + LOG.warn("{};Worker thread sleep interrupted ", getName(), e.getMessage()); continue; } } @@ -114,30 +114,32 @@ public void run() { try { t1 = resolveTable(ci); if (t1 == null) { - LOG.info("Unable to find table " + ci.getFullTableName() + - ", assuming it was dropped and moving on."); + LOG.info("{};Unable to find table {}, assuming it was dropped and moving on.", getName(), + ci.getFullTableName()); msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci)); continue; } } catch (MetaException e) { + LOG.debug("{};Exception happened durign compaction cleaning. Retrying...", getName()); msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci)); continue; } // This chicanery is to get around the fact that the table needs to be final in order to // go into the doAs below. final Table t = t1; - + LOG.debug("{};Collection of compaction table {} successfully finished.", getName(), ci.getFullTableName()); // Find the partition we will be working with, if there is one. Partition p = null; try { p = resolvePartition(ci); if (p == null && ci.partName != null) { - LOG.info("Unable to find partition " + ci.getFullPartitionName() + - ", assuming it was dropped and moving on."); + LOG.info("{};Unable to find partition {}, assuming it was dropped and moving on.", getName(), + ci.getFullPartitionName()); msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci)); continue; } } catch (Exception e) { + LOG.debug("{};Exception happened during compaction cleaning. Retrying...", getName()); msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci)); continue; } @@ -147,7 +149,8 @@ public void run() { // Check that the table or partition isn't sorted, as we don't yet support that. if (sd.getSortCols() != null && !sd.getSortCols().isEmpty()) { - LOG.error("Attempt to compact sorted table "+ci.getFullTableName()+", which is not yet supported!"); + LOG.error("{};Attempt to compact sorted table {}, which is not yet supported!", getName(), + ci.getFullTableName()); msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci)); continue; } @@ -162,6 +165,7 @@ public void run() { * this case some of the statements are DDL, even in the future will not be allowed in a * multi-stmt txn. 
{@link Driver#setCompactionWriteIds(ValidWriteIdList, long)} */ long compactorTxnId = msc.openTxn(ci.runAs, TxnType.COMPACTION); + LOG.debug("{};Compaction transaction ID {}", getName(), compactorTxnId); heartbeater = new CompactionHeartbeater(compactorTxnId, fullTableName, conf); heartbeater.start(); @@ -171,24 +175,27 @@ public void run() { final ValidCompactorWriteIdList tblValidWriteIds = TxnUtils.createValidCompactWriteIdList(msc.getValidWriteIds( Collections.singletonList(fullTableName), validTxnList.writeToString()).get(0)); - LOG.debug("ValidCompactWriteIdList: " + tblValidWriteIds.writeToString()); + LOG.debug("{};ValidCompactWriteIdList: {}", getName(), tblValidWriteIds); conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString()); ci.highestWriteId = tblValidWriteIds.getHighWatermark(); //this writes TXN_COMPONENTS to ensure that if compactorTxnId fails, we keep metadata about //it until after any data written by it are physically removed msc.updateCompactorState(CompactionInfo.compactionInfoToStruct(ci), compactorTxnId); - final StringBuilder jobName = new StringBuilder(workerName); + final StringBuilder jobName = new StringBuilder(getName()); jobName.append("-compactor-"); jobName.append(ci.getFullPartitionName()); - LOG.info("Starting " + ci.type.toString() + " compaction for " + ci.getFullPartitionName() + " in " + JavaUtils.txnIdToString(compactorTxnId)); + LOG.info("{};Starting {} compaction for table {} in partition {} in transaction {}", getName(), + ci.type.toString(), ci.getFullTableName(), ci.getFullPartitionName(), + JavaUtils.txnIdToString(compactorTxnId)); final StatsUpdater su = StatsUpdater.init(ci, msc.findColumnsWithStats( CompactionInfo.compactionInfoToStruct(ci)), conf, runJobAsSelf(ci.runAs) ? ci.runAs : t.getOwner()); final CompactorMR mr = new CompactorMR(); launchedJob = true; try { + LOG.debug("{};Compaction will be running as user {}", getName(), ci.runAs); if (runJobAsSelf(ci.runAs)) { mr.run(conf, jobName.toString(), t, p, sd, tblValidWriteIds, ci, su, msc); } else { @@ -205,24 +212,26 @@ public Object run() throws Exception { try { FileSystem.closeAllForUGI(ugi); } catch (IOException exception) { - LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " + + LOG.error(getName() + "Could not clean up file-system handles for UGI: " + ugi + " for " + ci.getFullPartitionName(), exception); } } heartbeater.cancel(); msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci)); + LOG.debug("{};Compaction is marked as compacted.", getName()); msc.commitTxn(compactorTxnId); + LOG.debug("{};Compaction is committed under transaction ID {}", getName(), compactorTxnId); if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) { mrJob = mr.getMrJob(); } } catch (Throwable e) { - LOG.error("Caught exception while trying to compact " + ci + - ". Marking failed to avoid repeated failures, " + StringUtils.stringifyException(e)); + LOG.error("{};Caught exception while trying to compact {}. 
Marking compaction as failed to avoid " + + "repeated failures", getName(), ci, StringUtils.stringifyException(e)); msc.markFailed(CompactionInfo.compactionInfoToStruct(ci)); msc.abortTxns(Collections.singletonList(compactorTxnId)); } } catch (TException | IOException t) { - LOG.error("Caught an exception in the main loop of compactor worker " + workerName + ", " + + LOG.error("{};Caught an exception in the main loop of compactor worker {}", getName(), StringUtils.stringifyException(t)); if (msc != null) { msc.close(); @@ -231,10 +240,10 @@ public Object run() throws Exception { try { Thread.sleep(SLEEP_TIME); } catch (InterruptedException e) { - LOG.error("Interrupted while sleeping to instantiate metastore client"); + LOG.error("{};Interrupted while sleepin to instantiate metastore client", getName()); } } catch (Throwable t) { - LOG.error("Caught an exception in the main loop of compactor worker " + workerName + ", " + + LOG.error("{};Caught an exception in the main loop of compactor worker {}", getName(), StringUtils.stringifyException(t)); } finally { if(heartbeater != null) { @@ -249,6 +258,8 @@ public Object run() throws Exception { try { Thread.sleep(SLEEP_TIME); } catch (InterruptedException e) { + LOG.error("{};Interrupted while sleeping before retrying compaction {}", getName(), + StringUtils.stringifyException(e)); } } } while (!stop.get()); @@ -257,12 +268,7 @@ public Object run() throws Exception { @Override public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception { super.init(stop, looped); - - StringBuilder name = new StringBuilder(hostname()); - name.append("-"); - name.append(getId()); - this.workerName = name.toString(); - setName(name.toString()); + setName("[" + COMPACTOR_THREAD_PREFIX + "-" + hostname() + "-" + getId() + "]"); } public JobConf getMrJob() {