diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 91a321d..e4c4fce 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -462,7 +462,7 @@ public void taskKilled(String amLocation, int port, String user, } } - public Set getExecutorStatus() { + public Set getExecutorStatus() { return executorService.getExecutorsStatus(); } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 752e6ee..7beed83 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -526,7 +526,7 @@ public String getLocalDirs() { } @Override - public Set getExecutorsStatus() { + public Set getExecutorsStatus() { return containerRunner.getExecutorStatus(); } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java index d6449db..d4d8ae0 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java @@ -55,7 +55,7 @@ * Executor states. * @return Executor states. */ - public Set getExecutorsStatus(); + public Set getExecutorsStatus(); /** * Gets llap daemon configured executor memory per instance. diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java index fd6234a..1e931b6 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java @@ -24,18 +24,33 @@ */ public interface Scheduler { + public interface ExecutorStatus { + + public abstract String getApplicationId(); + + public abstract String getDagId(); + + public abstract String getFragment(); + + public abstract long getStartTime(); + + public abstract String getThreadName(); + + public abstract boolean isPreemptable(); + + public abstract boolean isWaiting(); + + } + enum SubmissionState { ACCEPTED, // request accepted REJECTED, // request rejected as wait queue is full EVICTED_OTHER; // request accepted but evicted other low priority task } - /** - * Schedule the task or throw RejectedExecutionException if queues are full - * @param t - task to schedule - * @return SubmissionState - */ - SubmissionState schedule(T t); + QueryIdentifier findQueryByFragment(String fragmentId); + + Set getExecutorsStatus(); /** * Attempt to kill the fragment with the specified fragmentId @@ -43,7 +58,10 @@ */ void killFragment(String fragmentId); - Set getExecutorsStatus(); - - QueryIdentifier findQueryByFragment(String fragmentId); + /** + * Schedule the task or throw RejectedExecutionException if queues are full + * @param t - task to schedule + * @return SubmissionState + */ + SubmissionState schedule(T t); } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index 7744611..df1bb37 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -22,7 +22,9 @@ import java.text.SimpleDateFormat; import java.util.Comparator; import java.util.Date; +import java.util.HashMap; import java.util.HashSet; +import java.util.IdentityHashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; @@ -79,6 +81,7 @@ * new tasks. Shutting down of the task executor service can be done gracefully or immediately. */ public class TaskExecutorService extends AbstractService implements Scheduler { + private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorService.class); private static final boolean isInfoEnabled = LOG.isInfoEnabled(); private static final boolean isDebugEnabled = LOG.isDebugEnabled(); @@ -184,40 +187,10 @@ protected SimpleDateFormat initialValue() { }; @Override - public Set getExecutorsStatus() { - // TODO Change this method to make the output easier to parse (parse programmatically) - Set result = new HashSet<>(); - StringBuilder value = new StringBuilder(); + public Set getExecutorsStatus() { + Set result = new HashSet<>(); for (Map.Entry e : knownTasks.entrySet()) { - value.setLength(0); - value.append(e.getKey()); - TaskWrapper task = e.getValue(); - boolean isFirst = true; - TaskRunnerCallable c = task.getTaskRunnerCallable(); - if (c != null && c.getVertexSpec() != null) { - SignableVertexSpec fs = c.getVertexSpec(); - value.append(isFirst ? " (" : ", ").append(fs.getDagName()) - .append("/").append(fs.getVertexName()); - isFirst = false; - } - value.append(isFirst ? " (" : ", "); - if (task.isInWaitQueue()) { - value.append("in queue"); - } else if (c != null) { - long startTime = c.getStartTime(); - if (startTime != 0) { - value.append("started at ").append(sdf.get().format(new Date(startTime))); - } else { - value.append("not started"); - } - } else { - value.append("has no callable"); - } - if (task.isInPreemptionQueue()) { - value.append(", ").append("preemptable"); - } - value.append(")"); - result.add(value.toString()); + result.add(new TaskExecutorStatus(e.getKey(), e.getValue())); } return result; } @@ -832,6 +805,79 @@ public void finishableStateUpdated(boolean finishableState) { } } + public class TaskExecutorStatus implements ExecutorStatus { + + private final String fragment; + private final String vertex; + private final String dag; + private final String query; + private final String application; + private final boolean canFinish; + private final String thread; + private final boolean preemptable; + private final long startTime; + private final boolean waiting; + + public TaskExecutorStatus(String fragment, TaskWrapper task) { + this.fragment = fragment; + TaskRunnerCallable c = task.getTaskRunnerCallable(); + if (c != null) { + this.canFinish = c.canFinish(); + this.thread = c.getThreadName(); + this.vertex = c.getFragmentInfo().getVertexName(); + this.dag = c.getFragmentInfo().getQueryInfo().getDagIdString(); + this.application = c.getFragmentInfo().getQueryInfo().getAppIdString(); + this.query = c.getQueryId(); + } else { + this.canFinish = false; + this.thread = null; + this.dag = null; + this.vertex = null; + this.query = null; + this.application = null; + } + this.preemptable = task.isInPreemptionQueue(); + this.waiting = task.inWaitQueue.get(); + this.startTime = task.getTaskRunnerCallable().getStartTime(); + } + + @Override + public String getThreadName() { + return thread; + } + + @Override + public String getDagId() { + return dag; + } + + @Override + public boolean isPreemptable() { + return preemptable; + } + + @Override + public long getStartTime() { + return startTime; + } + + @Override + public String getFragment() { + return this.fragment; + } + + @Override + public String getApplicationId() { + return this.application; + } + + @Override + public boolean isWaiting() { + return this.waiting; + } + + } + private static class ExecutorThreadFactory implements ThreadFactory { private final ClassLoader classLoader; private final ThreadFactory defaultFactory; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index 87bd5c8..d722774 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -541,4 +541,8 @@ public SignableVertexSpec getVertexSpec() { // TODO: support for binary spec? presumably we'd parse it somewhere earlier return vertex; } + + public String getThreadName() { + return this.threadName; + } } diff --git llap-server/src/main/resources/hive-webapps/llap/cluster.html llap-server/src/main/resources/hive-webapps/llap/cluster.html new file mode 100644 index 0000000..c8d3361 --- /dev/null +++ llap-server/src/main/resources/hive-webapps/llap/cluster.html @@ -0,0 +1,105 @@ + + + + + + LLAP Monitor + + + + + + + + + + + + + + + + +
+
+ +
+ +
+
+

+ LLAP has executed 0 fragments, with 0 failed + fragments, there are 0 fragments pre-empted with an expected wastage of 0 cpu seconds. +

+

+ There are 0 active fragments, with + 0 fragments in the queue. +

+

+ The cache is 0.00% full, with 0 mb and a hit-rate of + 0.00%. +

+

+ Across nodes, the system CPU is at 0.00%, with a system load average of 0. +

+

+ The cluster heap is at 0.00% of its capacity, with the system memory usage at 0.00%. +

+
+
+

0 nodes

+ + + +
+
+
+
+ + diff --git llap-server/src/main/resources/hive-webapps/llap/css/heatmap.css llap-server/src/main/resources/hive-webapps/llap/css/heatmap.css new file mode 100644 index 0000000..c4b451a5 --- /dev/null +++ llap-server/src/main/resources/hive-webapps/llap/css/heatmap.css @@ -0,0 +1,35 @@ +#heatmap-table { +} + +.heatmap-cell { + text-overflow: ellipsis; + overflow: hidden; + height: 64px; + min-width: 64px; + text-align: center; + vertical-align: bottom; +} + +.heatmap-cell a { + color: black; +} + +.heatmap-cell-great { + background-color:rgb(0,204,0) +} +.heatmap-cell-good { + background-color:rgb(159,238,0) +} +.heatmap-cell-ok { + background-color:rgb(255,255,0) +} +.heatmap-cell-worry { + background-color:rgb(255,192,0) +} +.heatmap-cell-bad { + background-color:rgb(255,60,60) +} +.heatmap-cell-unknown { + background-image:-webkit-repeating-linear-gradient(-45deg, #FF1E10, #FF1E10 3px, #ff6c00 3px, #ff6c00 6px); + opacity: 0.2; +} diff --git llap-server/src/main/resources/hive-webapps/llap/index.html llap-server/src/main/resources/hive-webapps/llap/index.html index 31a27a0..781bfd1 100644 --- llap-server/src/main/resources/hive-webapps/llap/index.html +++ llap-server/src/main/resources/hive-webapps/llap/index.html @@ -31,6 +31,7 @@ + @@ -39,6 +40,15 @@ + diff --git llap-server/src/main/resources/hive-webapps/llap/js/main.js llap-server/src/main/resources/hive-webapps/llap/js/main.js new file mode 100644 index 0000000..dc8ad8e --- /dev/null +++ llap-server/src/main/resources/hive-webapps/llap/js/main.js @@ -0,0 +1,12 @@ +$(function() { + var models = [llap.model.JvmMetrics, llap.model.LlapDaemonCacheMetrics, llap.model.LlapDaemonExecutorMetrics, llap.model.LlapDaemonInfo, llap.model.OperatingSystem] + + var views = [llap.view.Hostname, llap.view.Heap, llap.view.Cache, llap.view.Executors, llap.view.Tasks, llap.view.System] + + setInterval(function() { + $.getJSON("/jmx", function(jmx){ + models.forEach(function (m) { m.push(jmx); }); + views.forEach(function (v) { v.refresh(); }); + }); + }, 1000); // Update par sec +}); diff --git llap-server/src/main/resources/hive-webapps/llap/js/metrics.js llap-server/src/main/resources/hive-webapps/llap/js/metrics.js index 7bb2890..a2980c9 100644 --- llap-server/src/main/resources/hive-webapps/llap/js/metrics.js +++ llap-server/src/main/resources/hive-webapps/llap/js/metrics.js @@ -51,7 +51,7 @@ var jmxbean = function(jmx, name) { } -llap.model.JvmMetrics = new function() { +llap.model.JvmMetricsProto = function() { this.name = "Hadoop:service=LlapDaemon,name=JvmMetrics"; this.heap_used = 0; this.heap_max = 0; @@ -73,7 +73,8 @@ llap.model.JvmMetrics = new function() { return this; } -llap.model.LlapDaemonCacheMetrics = new function() { + +llap.model.LlapDaemonCacheMetricsProto = function() { this.name = "Hadoop:service=LlapDaemon,name=LlapDaemonCacheMetrics"; this.hit_rate = trendlist(50); this.fill_rate = trendlist(50); @@ -96,8 +97,7 @@ llap.model.LlapDaemonCacheMetrics = new function() { return this; } - -llap.model.LlapDaemonInfo = new function() { +llap.model.LlapDaemonInfoProto = function() { this.name = "Hadoop:service=LlapDaemon,name=LlapDaemonInfo"; this.active_rate = trendlist(50); this.push = function(jmx) { @@ -108,14 +108,14 @@ llap.model.LlapDaemonInfo = new function() { } } -llap.model.LlapDaemonExecutorMetrics = new function() { +llap.model.LlapDaemonExecutorMetricsProto = function() { this.name = "Hadoop:service=LlapDaemon,name=LlapDaemonExecutorMetrics"; this.queue_rate = trendlist(50); this.push = function(jmx) { var bean = jmxbean(jmx, this.name); this.queue_rate.add(bean["ExecutorNumQueuedRequests"] || 0); this.lost_time = bean["PreemptionTimeLost"] || 0; - this.num_tasks = bean["ExecutorTotalRequestsHandled"]; + this.num_tasks = bean["ExecutorTotalRequestsHandled"] || 0; this.interrupted_tasks = bean["ExecutorTotalInterrupted"] || 0; this.failed_tasks = bean["ExecutorTotalExecutionFailure"] || 0; } @@ -123,7 +123,7 @@ llap.model.LlapDaemonExecutorMetrics = new function() { } -llap.model.OperatingSystem = new function() { +llap.model.OperatingSystemProto = function() { this.name = "java.lang:type=OperatingSystem"; this.sys_cpu_rate = trendlist(50); this.proc_cpu_rate = trendlist(50); @@ -141,6 +141,157 @@ llap.model.OperatingSystem = new function() { } } +llap.model.PeerProto = function(name) { + this.name = name; + var depth = 7; + // todo: use gradients for each box + this.heap_used = trendlist(depth) + this.cache_used = trendlist(depth) + this.hit_rate = trendlist(depth) + this.queue_rate = trendlist(depth) + this.active_rate = trendlist(depth) + this.fragments_failed = trendlist(depth) + this.fragments_preempted = trendlist(depth) + this.system_load = trendlist(depth) + this.system_cpu = trendlist(depth) + this.system_mem = trendlist(depth) + this.models = [new llap.model.JvmMetricsProto(), new llap.model.LlapDaemonCacheMetricsProto(), + new llap.model.LlapDaemonExecutorMetricsProto(), new llap.model.LlapDaemonInfoProto(), + new llap.model.OperatingSystemProto()] + this.host = "" + this.push = function(node) { + this.node = node + this.host = node.host + var jmxport = node["jmx-port"] || 15002 + var jmxurl = "http://"+node.host+":" + jmxport + "/jmx" + this.url = "http://"+node.host+":" + jmxport + var models = this.models // closure-ref + var peer = this // closure-ref + $.getJSON(jmxurl, function(jmx) { + peer.failed = false; + models.forEach(function (m) { m.push(jmx); }); + var model; + // heap + model = peer.models[0]; + peer.heap_used.add(model.heap_rate.peek()) + // cache + model = peer.models[1] + peer.cache_used.add(model.fill_rate.peek()) + peer.hit_rate.add(model.hit_rate.peek()) + // executors + model = peer.models[2] + peer.queue_rate.add(model.queue_rate.peek()) + // tasks + peer.fragments_failed.add(100*(model.failed_tasks/model.num_tasks)); + peer.fragments_preempted.add(100*(model.interrupted_tasks/model.num_tasks)); + // info + model = peer.models[3] + peer.active_rate.add(100*(model.active_rate.peek()/model.executors)) + // os + model = peer.models[4] + peer.system_load.add(100*(model.loadavg_rate.peek()/model.cpu_cores)) + peer.system_cpu.add((model.proc_cpu_rate.peek()+model.sys_cpu_rate.peek())) + peer.system_mem.add(model.used_ram_rate.peek()) + }) +} + + // scaled by 100% + this.get = function(metric) { + if (this.failed) { + return undefined; + } + switch(metric) { + case "heap-used": + return this.heap_used + case "cache-used": + return this.cache_used + case "cache-hits": + return this.hit_rate + case "fragments-failed": + return this.fragments_failed + case "fragments-killed": + return this.fragments_preempted + case "executors-active": + return this.active_rate + case "executors-queue": + return this.queue_rate + case "system-load": + return this.system_load + case "system-cpu": + return this.system_cpu + case "system-ram": + return this.system_mem + } + return undefined; + } + + this.getvalue = function(metric) { + var trend = this.get(metric) + if (!(trend === undefined)) { + return trend.peek() + } + return -1; + } + + var convert = function(metric, n) { + // bigger is better, invert metric + if (metric == "cache-hits") n = 100 - n; + if (n < 0) return "unknown" + if (n <= 25) return "great" + if (n <= 50) return "good" + if (n <= 75) return "ok" + if (n <= 99) return "worry" + return "bad" + } + + this.getquartile = function(metric) { + var n = this.getvalue(metric) + return convert(metric, n) + } + + this.trend = function(metric) { + var trend = this.get(metric) + return trend.map(function(v) { return convert(metric, v) }) + } +} + +llap.model.PeersProto = function() { + this.instances = [] + this.peers = {} + + var uniq = this.instances + this.peers.get = function(key) { + if(!(key in this)) { + this[key] = new llap.model.PeerProto(key) + uniq.push(key) + } + return this[key]; + } + this.push = function(peers) { + var singletons = this.peers; // closure-ref + peers.peers.forEach(function(node) { + singletons.get(node.identity).push(node); + }) + var active = peers.peers.map(function(node) { + return node.identity; + }) + var dead = this.instances.filter(function(name) { return active.indexOf(name) < 0 ; } ) + console.log(dead) + dead.forEach(function(name) { + singletons.get(name).failed = true; + }) + } +} + + +// create objects before the views - the views are bound by closures to these objects +llap.model.JvmMetrics = new llap.model.JvmMetricsProto() +llap.model.LlapDaemonCacheMetrics = new llap.model.LlapDaemonCacheMetricsProto() +llap.model.LlapDaemonInfo = new llap.model.LlapDaemonInfoProto() +llap.model.LlapDaemonExecutorMetrics = new llap.model.LlapDaemonExecutorMetricsProto() +llap.model.OperatingSystem = new llap.model.OperatingSystemProto() +llap.model.Peers = new llap.model.PeersProto() + llap.view.Hostname = new function () { this.refresh = function() { $("#hostname").text(llap.model.JvmMetrics.hostname); @@ -195,7 +346,7 @@ llap.view.Tasks = new function() { $("#fragments-total").text(model.num_tasks); $("#fragments-failed").text(model.failed_tasks); $("#fragments-preempted").text(model.interrupted_tasks); - $("#fragments-preemption-time").text((model.lost_time / 1000.0).toFixed(3)); + $("#fragments-preemption-time").text((model.lost_time / 1000.0).toFixed(3)); } } @@ -219,17 +370,82 @@ llap.view.System = new function() { } } -})(); - -$(function() { - var models = [llap.model.JvmMetrics, llap.model.LlapDaemonCacheMetrics, llap.model.LlapDaemonExecutorMetrics, llap.model.LlapDaemonInfo, llap.model.OperatingSystem] +var gradient = function(states) { + var colours = { + // same as CSS rules + "bad" : "rgb(255,60,60)", + "worry" : " rgb(255,192,0)", + "ok" : "rgb(255,255,0)", + "good" : "rgb(159,238,0)", + "great" : "rgb(0,204,0)", + "unknown" : "rgba(255,60,60,0.3)" + }; + var s = "background: linear-gradient(to bottom"; + var prev = colours["unknown"] + for (var i = 0 ; i < states.length; i++) { + var pct = Math.ceil(100*(i/states.length)); + var colour = colours[states[i]] + if (colour != prev) { + s += ", " + colour + " " + pct + "% "; + prev = colour; + } + } + s += " )"; + return s; + // linear-gradient(to bottom, #1e5799 0%,#2989d8 50%,#207cca 51%,#7db9e8 100%); +} - var views = [llap.view.Hostname, llap.view.Heap, llap.view.Cache, llap.view.Executors, llap.view.Tasks, llap.view.System] +llap.view.Heatmap = new function() { + this.count = 0; + this.refresh = function() { + var metric = $("#heatmap-metric").val() + var model = llap.model.Peers + var count = model.instances.length + $("#peer-count").text(count + " nodes") + var maxCols = 20 + var cols = (count >= maxCols) ? maxCols : count + var rows = Math.max(1,count / maxCols) + if (count != this.count) { + // count can only go up really, the identities are only additive + this.count = count; + var t = $("
") + for (var r = 0; r < rows; r++) { + var tr = $("") + for (var c = 0; c < cols; c++) { + var i = r*cols + c; + console.log(i, model.instances.length) + var td = $("") + if (i < model.instances.length) { + var peer = model.peers[model.instances[i]]; + var div = $("
") + div.attr("id", "heat-"+model.instances[i]) + div.attr("class", "heatmap-cell-unknown") + div.attr("title", peer.host) + var a = $("") + a.attr("href", peer.url) + a.attr("id", "label-"+model.instances[i]) + div.append(a) + td.append(div) + } + tr.append(td) + } + t.append(tr) + } + $("#heatmap-table").replaceWith(t) + } + for (var i = 0; i < model.instances.length; i++) { + var peer = model.peers[model.instances[i]]; + var div = $("#heat-"+model.instances[i]) + var span = $("#label-"+model.instances[i]) + var state = peer.getquartile(metric) + var value = peer.getvalue(metric) === undefined ? "?" : peer.getvalue(metric).toFixed(2) + div.attr("class", "heatmap-cell heatmap-cell-"+state) + if (state !== "unknown") { + div.attr("style", gradient(peer.trend(metric))); + } + span.text(value) + } + } +} - setInterval(function() { - $.getJSON("/jmx", function(jmx){ - models.forEach(function (m) { m.push(jmx); }); - views.forEach(function (v) { v.refresh(); }); - }); - }, 1000); // Update par sec -}); +})(); diff --git llap-server/src/main/resources/hive-webapps/llap/js/overall.js llap-server/src/main/resources/hive-webapps/llap/js/overall.js new file mode 100644 index 0000000..b026d75 --- /dev/null +++ llap-server/src/main/resources/hive-webapps/llap/js/overall.js @@ -0,0 +1,12 @@ +$(function() { + var root = llap.model.Peers + + var views = [llap.view.Heatmap] + + setInterval(function() { + $.getJSON("/peers", function(peers){ + root.push(peers); + views.forEach(function (v) { v.refresh(); }); + }); + }, 1000); // Update per sec +});