diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
index b378d40964..1b06b28922 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
@@ -40,6 +40,7 @@
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -120,11 +121,11 @@ protected Partition resolvePartition(CompactionInfo ci) throws Exception {
         return null;
       }
     } catch (Exception e) {
-      LOG.error("Unable to find partition " + ci.getFullPartitionName() + ", " + e.getMessage());
+      LOG.error("Unable to find partition " + ci.getFullPartitionName(), e);
       throw e;
     }
     if (parts.size() != 1) {
-      LOG.error(ci.getFullPartitionName() + " does not refer to a single partition. " + parts);
+      LOG.error(ci.getFullPartitionName() + " does not refer to a single partition. " + Arrays.toString(parts.toArray()));
       throw new MetaException("Too many partitions for : " + ci.getFullPartitionName());
     }
     return parts.get(0);
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java
index 4235184fec..5d7bf0c9da 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java
@@ -51,7 +51,7 @@ public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
     try {
       return msc.getTable(getDefaultCatalog(conf), ci.dbname, ci.tableName);
     } catch (TException e) {
-      LOG.error("Unable to find table " + ci.getFullTableName() + ", " + e.getMessage());
+      LOG.error("Unable to find table " + ci.getFullTableName(), e);
       throw new MetaException(e.toString());
     }
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 8180adcd66..2d655d7c34 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -32,7 +32,6 @@
 import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hive.common.util.Ref;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -47,7 +46,6 @@
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.StringUtils;
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -56,7 +54,12 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
@@ -71,7 +74,6 @@
   private static final int NOT_SET = -1;
 
   private String workerName;
-  private JobConf mrJob; // the MR job for compaction
 
   /**
    * Get the hostname that this worker is run on. Made static and public so that other classes
@@ -86,195 +88,34 @@ public static String hostname() {
       throw new RuntimeException(e);
     }
   }
-//todo: this doesn;t check if compaction is already running (even though Initiator does but we
-// don't go through Initiator for user initiated compactions)
+
+  // TODO: this doesn't check if compaction is already running (even though Initiator does, but we
+  // don't go through Initiator for user initiated compactions)
   @Override
   public void run() {
     LOG.info("Starting Worker thread");
     boolean computeStats = conf.getBoolVar(HiveConf.ConfVars.HIVE_MR_COMPACTOR_GATHER_STATS);
+    long timeout = conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, TimeUnit.MILLISECONDS);
+    boolean launchedJob;
+    ExecutorService executor = getTimeoutHandlingExecutor();
     do {
-      boolean launchedJob = false;
-      // Make sure nothing escapes this run method and kills the metastore at large,
-      // so wrap it in a big catch Throwable statement.
-      CompactionHeartbeater heartbeater = null;
-      CompactionInfo ci = null;
-      long compactorTxnId = NOT_SET;
+      Future<Boolean> singleRun = executor.submit(() -> findNextCompactionAndExecute(computeStats));
       try {
-        if (msc == null) {
-          msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf);
-        }
-        ci = CompactionInfo.optionalCompactionInfoStructToInfo(msc.findNextCompact(workerName));
-        LOG.debug("Processing compaction request " + ci);
-
-        if (ci == null && !stop.get()) {
-          try {
-            Thread.sleep(SLEEP_TIME);
-            continue;
-          } catch (InterruptedException e) {
-            LOG.warn("Worker thread sleep interrupted " + e.getMessage());
-            continue;
-          }
-        }
-
-        // Find the table we will be working with.
-        Table t1 = null;
-        try {
-          t1 = resolveTable(ci);
-          if (t1 == null) {
-            LOG.info("Unable to find table " + ci.getFullTableName() +
-                ", assuming it was dropped and moving on.");
-            msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
-            continue;
-          }
-        } catch (MetaException e) {
-          msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
-          continue;
-        }
-        // This chicanery is to get around the fact that the table needs to be final in order to
-        // go into the doAs below.
-        final Table t = t1;
-
-        // Find the partition we will be working with, if there is one.
-        Partition p = null;
-        try {
-          p = resolvePartition(ci);
-          if (p == null && ci.partName != null) {
-            LOG.info("Unable to find partition " + ci.getFullPartitionName() +
-                ", assuming it was dropped and moving on.");
-            msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
-            continue;
-          }
-        } catch (Exception e) {
-          msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
-          continue;
-        }
-
-        // Find the appropriate storage descriptor
-        final StorageDescriptor sd = resolveStorageDescriptor(t, p);
-
-        // Check that the table or partition isn't sorted, as we don't yet support that.
-        if (sd.getSortCols() != null && !sd.getSortCols().isEmpty()) {
-          LOG.error("Attempt to compact sorted table "+ci.getFullTableName()+", which is not yet supported!");
-          msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
-          continue;
-        }
-        String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName());
-        if (ci.runAs == null) {
-          ci.runAs = findUserToRunAs(sd.getLocation(), t);
-        }
-        /**
-         * we cannot have Worker use HiveTxnManager (which is on ThreadLocal) since
-         * then the Driver would already have the an open txn but then this txn would have
-         * multiple statements in it (for query based compactor) which is not supported (and since
-         * this case some of the statements are DDL, even in the future will not be allowed in a
-         * multi-stmt txn. {@link Driver#setCompactionWriteIds(ValidWriteIdList, long)} */
-        compactorTxnId = msc.openTxn(ci.runAs, TxnType.COMPACTION);
-
-        heartbeater = new CompactionHeartbeater(compactorTxnId, fullTableName, conf);
-        heartbeater.start();
-
-        ValidTxnList validTxnList = msc.getValidTxns(compactorTxnId);
-        //with this ValidWriteIdList is capped at whatever HWM validTxnList has
-        final ValidCompactorWriteIdList tblValidWriteIds =
-            TxnUtils.createValidCompactWriteIdList(msc.getValidWriteIds(
-                Collections.singletonList(fullTableName), validTxnList.writeToString()).get(0));
-        LOG.debug("ValidCompactWriteIdList: " + tblValidWriteIds.writeToString());
-        conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
-
-        ci.highestWriteId = tblValidWriteIds.getHighWatermark();
-        //this writes TXN_COMPONENTS to ensure that if compactorTxnId fails, we keep metadata about
-        //it until after any data written by it are physically removed
-        msc.updateCompactorState(CompactionInfo.compactionInfoToStruct(ci), compactorTxnId);
-        final StringBuilder jobName = new StringBuilder(workerName);
-        jobName.append("-compactor-");
-        jobName.append(ci.getFullPartitionName());
-
-        // Don't start compaction or cleaning if not necessary
-        AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf,
-            tblValidWriteIds, Ref.from(false), true, null, false);
-        if (!isEnoughToCompact(ci.isMajorCompaction(), dir, sd)) {
-          if (needsCleaning(dir, sd)) {
-            msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
-          } else {
-            // do nothing
-            msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
-          }
-          continue;
-        }
-
-        LOG.info("Starting " + ci.type.toString() + " compaction for " + ci.getFullPartitionName() + " in " +
-            JavaUtils.txnIdToString(compactorTxnId) + " with compute stats set to " + computeStats);
-        final StatsUpdater su = computeStats ? StatsUpdater.init(ci, msc.findColumnsWithStats(
-            CompactionInfo.compactionInfoToStruct(ci)), conf,
-            runJobAsSelf(ci.runAs) ? ci.runAs : t.getOwner()) : null;
-        final CompactorMR mr = new CompactorMR();
+        launchedJob = singleRun.get(timeout, TimeUnit.MILLISECONDS);
+      } catch (TimeoutException te) {
+        LOG.info("Timeout while executing compaction", te);
+        // Cancel the job, and recreate the Executor as well, so we can be sure that we have an available thread
+        // even if we cannot interrupt the task somehow. (Trade possible resource hogging for compactor stability)
+        singleRun.cancel(true);
+        executor.shutdownNow();
+        executor = getTimeoutHandlingExecutor();
+        launchedJob = true;
+      } catch (ExecutionException e) {
+        LOG.info("Exception while executing compaction", e);
+        launchedJob = true;
+      } catch (InterruptedException ie) {
+        // Do not do anything - stop should be set anyway
         launchedJob = true;
-        try {
-          if (runJobAsSelf(ci.runAs)) {
-            mr.run(conf, jobName.toString(), t, p, sd, tblValidWriteIds, ci, su, msc, dir);
-          } else {
-            UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(),
-                UserGroupInformation.getLoginUser());
-            final Partition fp = p;
-            final CompactionInfo fci = ci;
-            ugi.doAs(new PrivilegedExceptionAction<Object>() {
-              @Override
-              public Object run() throws Exception {
-                mr.run(conf, jobName.toString(), t, fp, sd, tblValidWriteIds, fci, su, msc, dir);
-                return null;
-              }
-            });
-            try {
-              FileSystem.closeAllForUGI(ugi);
-            } catch (IOException exception) {
-              LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " +
-                  ci.getFullPartitionName(), exception);
-            }
-          }
-          heartbeater.cancel();
-          msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
-          if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
-            mrJob = mr.getMrJob();
-          }
-        } catch (Throwable e) {
-          LOG.error("Caught exception while trying to compact " + ci +
-              ". Marking failed to avoid repeated failures, " + StringUtils.stringifyException(e));
-          ci.errorMessage = e.getMessage();
-          msc.markFailed(CompactionInfo.compactionInfoToStruct(ci));
-          msc.abortTxns(Collections.singletonList(compactorTxnId));
-          compactorTxnId = NOT_SET;
-        }
-      } catch (TException | IOException t) {
-        LOG.error("Caught an exception in the main loop of compactor worker " + workerName + ", " +
-            StringUtils.stringifyException(t));
-        try {
-          if (msc != null && ci != null) {
-            ci.errorMessage = t.getMessage();
-            msc.markFailed(CompactionInfo.compactionInfoToStruct(ci));
-            compactorTxnId = NOT_SET;
-          }
-        } catch (TException e) {
-          LOG.error("Caught an exception while trying to mark compaction {} as failed: {}", ci, e);
-        } finally {
-          if (msc != null) {
-            msc.close();
-            msc = null;
-          }
-        }
-        try {
-          Thread.sleep(SLEEP_TIME);
-        } catch (InterruptedException e) {
-          LOG.error("Interrupted while sleeping to instantiate metastore client");
-        }
-      } catch (Throwable t) {
-        LOG.error("Caught an exception in the main loop of compactor worker " + workerName + ", " +
-            StringUtils.stringifyException(t));
-        compactorTxnId = NOT_SET;
-      } finally {
-        commitTxnIfSet(compactorTxnId);
-        if (heartbeater != null) {
-          heartbeater.cancel();
-        }
       }
 
       // If we didn't try to launch a job it either means there was no work to do or we got
@@ -297,8 +138,7 @@ private void commitTxnIfSet(long compactorTxnId) {
         }
       } catch (TException e) {
         LOG.error(
-            "Caught an exception while committing compaction in worker " + workerName + ", "
-                + StringUtils.stringifyException(e));
+            "Caught an exception while committing compaction in worker " + workerName, e);
       }
     }
   }
@@ -314,10 +154,6 @@ public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
     setName(name.toString());
   }
 
-  public JobConf getMrJob() {
-    return mrJob;
-  }
-
   static final class StatsUpdater {
     static final private Logger LOG = LoggerFactory.getLogger(StatsUpdater.class);
 
@@ -516,4 +352,196 @@ public static boolean needsCleaning(AcidUtils.Directory dir, StorageDescriptor s
     }
     return needsJustCleaning;
   }
+
+  /**
+   * Creates a single threaded executor used for handling timeouts.
+   * The thread settings are inherited from the current thread.
+   * @return Single threaded executor service to be used for timeout handling
+   */
+  private ExecutorService getTimeoutHandlingExecutor() {
+    return Executors.newSingleThreadExecutor((r) -> {
+      Thread masterThread = Thread.currentThread();
+      Thread t = new Thread(masterThread.getThreadGroup(), r, masterThread.getName() + "_executor");
+      t.setDaemon(masterThread.isDaemon());
+      t.setPriority(masterThread.getPriority());
+      return t;
+    });
+  }
+
+  /**
+   * Finds the next compaction and executes it. The main thread might interrupt the execution of this method
+   * in case of timeout.
+   * @param computeStats If true then for MR compaction the stats are regenerated
+   * @return Returns true if there was a compaction in the queue and we started working on it.
+   */
+  private Boolean findNextCompactionAndExecute(boolean computeStats) {
+    // Make sure nothing escapes this run method and kills the metastore at large,
+    // so wrap it in a big catch Throwable statement.
+    CompactionHeartbeater heartbeater = null;
+    CompactionInfo ci = null;
+    long compactorTxnId = NOT_SET;
+    try {
+      if (msc == null) {
+        msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf);
+      }
+      ci = CompactionInfo.optionalCompactionInfoStructToInfo(msc.findNextCompact(workerName));
+      LOG.debug("Processing compaction request " + ci);
+
+      if (ci == null) {
+        return false;
+      }
+
+      // Find the table we will be working with.
+      Table t1;
+      try {
+        t1 = resolveTable(ci);
+        if (t1 == null) {
+          LOG.info("Unable to find table " + ci.getFullTableName() +
+              ", assuming it was dropped and moving on.");
+          msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
+          return false;
+        }
+      } catch (MetaException e) {
+        msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
+        return false;
+      }
+      // This chicanery is to get around the fact that the table needs to be final in order to
+      // go into the doAs below.
+      final Table t = t1;
+      String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName());
+
+      // Find the partition we will be working with, if there is one.
+      Partition p;
+      try {
+        p = resolvePartition(ci);
+        if (p == null && ci.partName != null) {
+          LOG.info("Unable to find partition " + ci.getFullPartitionName() +
+              ", assuming it was dropped and moving on.");
+          msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
+          return false;
+        }
+      } catch (Exception e) {
+        msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
+        return false;
+      }
+
+      // Find the appropriate storage descriptor
+      final StorageDescriptor sd = resolveStorageDescriptor(t, p);
+
+      // Check that the table or partition isn't sorted, as we don't yet support that.
+      if (sd.getSortCols() != null && !sd.getSortCols().isEmpty()) {
+        LOG.error("Attempt to compact sorted table " + ci.getFullTableName() + ", which is not yet supported!");
+        msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
+        return false;
+      }
+
+      if (ci.runAs == null) {
+        ci.runAs = findUserToRunAs(sd.getLocation(), t);
+      }
+
+      /**
+       * we cannot have Worker use HiveTxnManager (which is on ThreadLocal) since
+       * then the Driver would already have an open txn but then this txn would have
+       * multiple statements in it (for query based compactor) which is not supported (and since
+       * this case some of the statements are DDL, even in the future will not be allowed in a
+       * multi-stmt txn). {@link Driver#setCompactionWriteIds(ValidWriteIdList, long)} */
+      compactorTxnId = msc.openTxn(ci.runAs, TxnType.COMPACTION);
+
+      heartbeater = new CompactionHeartbeater(compactorTxnId, fullTableName, conf);
+      heartbeater.start();
+
+      ValidTxnList validTxnList = msc.getValidTxns(compactorTxnId);
+      //with this ValidWriteIdList is capped at whatever HWM validTxnList has
+      final ValidCompactorWriteIdList tblValidWriteIds =
+          TxnUtils.createValidCompactWriteIdList(msc.getValidWriteIds(
+              Collections.singletonList(fullTableName), validTxnList.writeToString()).get(0));
+      LOG.debug("ValidCompactWriteIdList: " + tblValidWriteIds.writeToString());
+      conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+
+      ci.highestWriteId = tblValidWriteIds.getHighWatermark();
+      //this writes TXN_COMPONENTS to ensure that if compactorTxnId fails, we keep metadata about
+      //it until after any data written by it are physically removed
+      msc.updateCompactorState(CompactionInfo.compactionInfoToStruct(ci), compactorTxnId);
+      final StringBuilder jobName = new StringBuilder(workerName);
+      jobName.append("-compactor-");
+      jobName.append(ci.getFullPartitionName());
+
+      // Don't start compaction or cleaning if not necessary
+      AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf,
+          tblValidWriteIds, Ref.from(false), true, null, false);
+      if (!isEnoughToCompact(ci.isMajorCompaction(), dir, sd)) {
+        if (needsCleaning(dir, sd)) {
+          msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
+        } else {
+          // do nothing
+          msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
+        }
+        return false;
+      }
+
+      LOG.info("Starting " + ci.type.toString() + " compaction for " + ci.getFullPartitionName() + " in " +
+          JavaUtils.txnIdToString(compactorTxnId) + " with compute stats set to " + computeStats);
+      final StatsUpdater su = computeStats ? StatsUpdater.init(ci, msc.findColumnsWithStats(
+          CompactionInfo.compactionInfoToStruct(ci)), conf,
+          runJobAsSelf(ci.runAs) ? ci.runAs : t.getOwner()) : null;
+      final CompactorMR mr = new CompactorMR();
+      try {
+        if (runJobAsSelf(ci.runAs)) {
+          mr.run(conf, jobName.toString(), t, p, sd, tblValidWriteIds, ci, su, msc, dir);
+        } else {
+          UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(),
+              UserGroupInformation.getLoginUser());
+          final Partition fp = p;
+          final CompactionInfo fci = ci;
+          ugi.doAs(new PrivilegedExceptionAction<Object>() {
+            @Override
+            public Object run() throws Exception {
+              mr.run(conf, jobName.toString(), t, fp, sd, tblValidWriteIds, fci, su, msc, dir);
+              return null;
+            }
+          });
+          try {
+            FileSystem.closeAllForUGI(ugi);
+          } catch (IOException exception) {
+            LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " +
+                ci.getFullPartitionName(), exception);
+          }
+        }
+        heartbeater.cancel();
+        msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
+      } catch (Throwable e) {
+        LOG.error("Caught exception while trying to compact " + ci +
+            ". Marking failed to avoid repeated failures", e);
+        ci.errorMessage = e.getMessage();
+        msc.markFailed(CompactionInfo.compactionInfoToStruct(ci));
+        msc.abortTxns(Collections.singletonList(compactorTxnId));
+        compactorTxnId = NOT_SET;
+      }
+    } catch (TException | IOException t) {
+      LOG.error("Caught an exception in the main loop of compactor worker " + workerName, t);
+      try {
+        if (msc != null && ci != null) {
+          ci.errorMessage = t.getMessage();
+          msc.markFailed(CompactionInfo.compactionInfoToStruct(ci));
+          compactorTxnId = NOT_SET;
+        }
+      } catch (TException e) {
+        LOG.error("Caught an exception while trying to mark compaction {} as failed: {}", ci, e);
+      } finally {
+        if (msc != null) {
+          msc.close();
+          msc = null;
+        }
+      }
+    } catch (Throwable t) {
+      LOG.error("Caught an exception in the main loop of compactor worker " + workerName, t);
+      compactorTxnId = NOT_SET;
+    } finally {
+      commitTxnIfSet(compactorTxnId);
+      if (heartbeater != null) {
+        heartbeater.cancel();
+      }
+    }
+    return true;
+  }
 }
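
Note for reviewers: the heart of the new run() loop is a submit / get-with-timeout / recreate-executor pattern. The standalone Java sketch below illustrates only that pattern, independent of Hive; the class name TimeoutLoopSketch, the doWork() placeholder, and the hard-coded timings are illustrative assumptions and are not part of this patch.

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Minimal sketch of the timeout handling used in Worker.run(): run one unit of work
 * per loop iteration on a single-threaded executor, bound its runtime with
 * Future.get(timeout), and rebuild the executor after a timeout so the next
 * iteration always has a usable worker thread.
 */
public class TimeoutLoopSketch {

  private ExecutorService executor = newSingleThreadExecutor();

  private static ExecutorService newSingleThreadExecutor() {
    // Inherit daemon flag and priority from the calling thread, mirroring
    // Worker.getTimeoutHandlingExecutor() in the patch.
    Thread caller = Thread.currentThread();
    return Executors.newSingleThreadExecutor(r -> {
      Thread t = new Thread(caller.getThreadGroup(), r, caller.getName() + "_executor");
      t.setDaemon(caller.isDaemon());
      t.setPriority(caller.getPriority());
      return t;
    });
  }

  /** Runs one unit of work, giving up after timeoutMs milliseconds. */
  public boolean runOnce(long timeoutMs) throws InterruptedException {
    Future<Boolean> future = executor.submit(() -> doWork());
    try {
      return future.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (TimeoutException te) {
      // Cancel the task and replace the executor: if the task ignores the interrupt,
      // the old thread may linger, but the calling loop stays responsive.
      future.cancel(true);
      executor.shutdownNow();
      executor = newSingleThreadExecutor();
      return true;
    } catch (ExecutionException ee) {
      // The task threw; treat it as "attempted" so the caller does not spin.
      return true;
    }
  }

  // Placeholder for the real work (finding and executing a compaction).
  private Boolean doWork() throws InterruptedException {
    Thread.sleep(100);
    return true;
  }

  public static void main(String[] args) throws InterruptedException {
    TimeoutLoopSketch sketch = new TimeoutLoopSketch();
    System.out.println("attempted work: " + sketch.runOnce(50));   // times out
    System.out.println("attempted work: " + sketch.runOnce(500));  // completes
    sketch.executor.shutdownNow();
  }
}

Recreating the executor after a timeout reflects the trade-off noted in the in-line comment of the patch: a cancelled task that ignores interruption may keep its old thread alive, but the worker loop always gets a fresh thread for the next attempt.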