diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 089b88cdb0..d970cc86d0 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2745,7 +2745,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal COMPACTOR_CRUD_QUERY_BASED("hive.compactor.crud.query.based", false, "Means Major compaction on full CRUD tables is done as a query, " + "and minor compaction will be disabled."), - SPLIT_GROUPING_MODE("hive.split.grouping.mode", "query", new StringSet("query", "compactor"), + SPLIT_GROUPING_MODE("hive.split.grouping.mode", "query", new StringSet("query", "compactor"), "This is set to compactor from within the query based compactor. This enables the Tez SplitGrouper " + "to group splits based on their bucket number, so that all rows from different bucket files " + " for the same bucket number can end up in the same bucket file after the compaction."), @@ -4397,7 +4397,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal LLAP_CLIENT_CONSISTENT_SPLITS("hive.llap.client.consistent.splits", true, "Whether to setup split locations to match nodes on which llap daemons are running, " + "preferring one of the locations provided by the split itself. If there is no llap daemon " + - "running on any of those locations (or on the cloud), fall back to a cache affinity to" + + "running on any of those locations (or on the cloud), fall back to a cache affinity to" + " an LLAP node. This is effective only if hive.execution.mode is llap."), LLAP_VALIDATE_ACLS("hive.llap.validate.acls", true, "Whether LLAP should reject permissive ACLs in some cases (e.g. 
its own management\n" + @@ -4438,7 +4438,9 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal LLAP_COLLECT_LOCK_METRICS("hive.llap.lockmetrics.collect", false, "Whether lock metrics (wait times, counts) are collected for LLAP " + "related locks"), - + LLAP_TASK_TIME_SUMMARY( + "hive.llap.task.time.print.summary", false, + "Display queue and runtime of tasks by host for every query executed by the shell."), HIVE_TRIGGER_VALIDATION_INTERVAL("hive.trigger.validation.interval", "500ms", new TimeValidator(TimeUnit.MILLISECONDS), "Interval for validating triggers during execution of a query. Triggers defined in resource plan will get\n" + diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java index e4dfe4e445..1d19948657 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java @@ -20,6 +20,8 @@ import java.util.concurrent.atomic.AtomicLongArray; +import org.apache.hadoop.hive.llap.metrics.MetricsUtils; +import org.apache.tez.common.counters.CounterGroup; import org.apache.tez.common.counters.TezCounters; /** @@ -31,10 +33,12 @@ private LlapWmCounters currentCounter = null; private long currentCounterStartTime = 0; private final AtomicLongArray fixedCounters; + private final boolean addTaskTimeCounters; - public WmFragmentCounters() { + public WmFragmentCounters(boolean addTaskTimeCounters) { // Note: WmFragmentCounters are created before Tez counters are created. 
this.fixedCounters = new AtomicLongArray(LlapWmCounters.values().length); + this.addTaskTimeCounters = addTaskTimeCounters; } public void changeStateQueued(boolean isGuaranteed) { @@ -119,6 +123,21 @@ public void dumpToTezCounters(TezCounters tezCounters, boolean isLast) { for (int i = 0; i < fixedCounters.length(); ++i) { tezCounters.findCounter(LlapWmCounters.values()[i]).setValue(fixedCounters.get(i)); } + + // add queue and runtime (together with task count) on a "per daemon" level + // to the Tez counters. + if (addTaskTimeCounters) { + String hostName = MetricsUtils.getHostName(); + long queued = fixedCounters.get(LlapWmCounters.GUARANTEED_QUEUED_NS.ordinal()) + + fixedCounters.get(LlapWmCounters.SPECULATIVE_QUEUED_NS.ordinal()); + long running = fixedCounters.get(LlapWmCounters.GUARANTEED_RUNNING_NS.ordinal()) + + fixedCounters.get(LlapWmCounters.SPECULATIVE_RUNNING_NS.ordinal()); + + CounterGroup cg = tezCounters.getGroup("LlapTaskRuntimeAgg by daemon"); + cg.findCounter("QueueTime-" + hostName).setValue(queued); + cg.findCounter("RunTime-" + hostName).setValue(running); + cg.findCounter("Count-" + hostName).setValue(1); + } } @Override diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 7a3ca2f965..77db64305b 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -27,12 +27,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.UgiFactory; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.DaemonId; import org.apache.hadoop.hive.llap.LlapNodeId; import org.apache.hadoop.hive.llap.LlapUtil; import org.apache.hadoop.hive.llap.NotTezEventHelper; -import org.apache.hadoop.hive.llap.counters.FragmentCountersMap; 
-import org.apache.hadoop.hive.llap.counters.LlapWmCounters; import org.apache.hadoop.hive.llap.counters.WmFragmentCounters; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; @@ -65,7 +64,6 @@ import org.apache.hadoop.hive.llap.security.LlapSignerImpl; import org.apache.hadoop.hive.llap.tez.Converters; import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils; -import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -73,7 +71,6 @@ import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper; import org.apache.log4j.NDC; -import org.apache.tez.common.counters.TezCounters; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; import org.apache.tez.dag.api.TezConfiguration; @@ -142,7 +139,6 @@ public ContainerRunnerImpl(Configuration conf, int numExecutors, AtomicReference this.executorService = executorService; completionListener = (SchedulerFragmentCompletingListener) executorService; - // Distribute the available memory between the tasks. this.memoryPerExecutor = (long)(totalMemoryAvailableBytes / (float) numExecutors); this.metrics = metrics; @@ -280,8 +276,12 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws Configuration callableConf = new Configuration(getConfig()); UserGroupInformation fsTaskUgi = fsUgiFactory == null ? null : fsUgiFactory.createUgi(); boolean isGuaranteed = request.hasIsGuaranteed() && request.getIsGuaranteed(); + + boolean addTaskTimes = callableConf.getBoolean(ConfVars.TEZ_EXEC_SUMMARY.varname, false) + && callableConf.getBoolean(ConfVars.LLAP_TASK_TIME_SUMMARY.varname, false); + // TODO: ideally we'd register TezCounters here, but it seems impossible before registerTask. 
- WmFragmentCounters wmCounters = new WmFragmentCounters(); + WmFragmentCounters wmCounters = new WmFragmentCounters(addTaskTimes); TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, callableConf, new ExecutionContextImpl(localAddress.get().getHostName()), env, credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler, diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index aaf9674621..82bb06adfd 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -44,7 +44,6 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.hadoop.hive.llap.counters.FragmentCountersMap; import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler; import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener; import org.apache.hadoop.hive.llap.daemon.impl.comparator.LlapQueueComparatorBase; @@ -54,7 +53,6 @@ import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.util.Clock; -import org.apache.tez.common.counters.TezCounters; import org.apache.tez.runtime.task.EndReason; import org.apache.tez.runtime.task.TaskRunner2Result; import org.slf4j.Logger; @@ -680,8 +678,8 @@ void tryScheduleUnderLock(final TaskWrapper taskWrapper) throws RejectedExecutio LOG.info("Attempting to execute {}", taskWrapper); } TaskRunnerCallable task = taskWrapper.getTaskRunnerCallable(); - task.setWmCountersRunning(); ListenableFuture future = executorService.submit(task); + task.setWmCountersRunning(); runningFragmentCount.incrementAndGet(); taskWrapper.setIsInWaitQueue(false);