Index: src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java (revision 1166510) +++ src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java (working copy) @@ -44,13 +44,13 @@ // Mark it as finished task.markComplete("Finished!"); - assertEquals(MonitoredTask.State.COMPLETE, taskFromTm.getState()); + assertEquals(MonitoredTask.State.COMPLETE, task.getState()); // It should still show up in the TaskMonitor list assertEquals(1, tm.getTasks().size()); // If we mark its completion time back a few minutes, it should get gced - ((MonitoredTaskImpl)taskFromTm).expireNow(); + task.expireNow(); assertEquals(0, tm.getTasks().size()); } Index: src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java (revision 1166510) +++ src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java (working copy) @@ -19,28 +19,32 @@ */ package org.apache.hadoop.hbase.monitoring; -public interface MonitoredTask { +import java.io.IOException; +import java.util.Map; + +public interface MonitoredTask extends Cloneable { enum State { RUNNING, + WAITING, COMPLETE, ABORTED; } public abstract long getStartTime(); - public abstract String getDescription(); - public abstract String getStatus(); - + public abstract long getStatusTime(); public abstract State getState(); - + public abstract long getStateTime(); public abstract long getCompletionTimestamp(); public abstract void markComplete(String msg); + public abstract void pause(String msg); + public abstract void resume(String msg); public abstract void abort(String msg); + public abstract void expireNow(); public abstract void setStatus(String status); - public abstract void setDescription(String description); /** @@ -49,5 +53,24 @@ */ public abstract void cleanup(); + /** + * Public exposure of Object.clone() in order to allow clients to easily + * capture current state. + * @returns a copy of the object whose references will not change + */ + public abstract MonitoredTask clone(); -} \ No newline at end of file + /** + * Creates a string map of internal details for extensible exposure of + * monitored tasks. + * @return A Map containing information for this task. + */ + public abstract Map toMap() throws IOException; + + /** + * Creates a JSON object for parseable exposure of monitored tasks. + * @return An encoded JSON object containing information for this task. + */ + public abstract String toJSON() throws IOException; + +} Index: src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java (revision 0) @@ -0,0 +1,44 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.monitoring; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.Writable; + +/** + * A MonitoredTask implementation optimized for use with RPC Handlers + * handling frequent, short duration tasks. String concatenations and object + * allocations are avoided in methods that will be hit by every RPC call. + */ +public interface MonitoredRPCHandler extends MonitoredTask { + public abstract String getRPC(); + public abstract String getRPC(boolean withParams); + public abstract long getRPCPacketLength(); + public abstract String getClient(); + public abstract long getRPCStartTime(); + public abstract long getRPCQueueTime(); + public abstract boolean isRPCRunning(); + public abstract boolean isOperationRunning(); + + public abstract void setRPC(String methodName, Object [] params, + long queueTime); + public abstract void setRPCPacket(Writable param); + public abstract void setConnection(String clientAddress, int remotePort); +} Index: src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java (revision 1166510) +++ src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java (working copy) @@ -19,22 +19,38 @@ */ package org.apache.hadoop.hbase.monitoring; -import com.google.common.annotations.VisibleForTesting; +import org.codehaus.jackson.map.ObjectMapper; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + class MonitoredTaskImpl implements MonitoredTask { private long startTime; - private long completionTimestamp = -1; + private long statusTime; + private long stateTime; - private String status; - private String description; + private volatile String status; + private volatile String description; - private State state = State.RUNNING; + protected volatile State state = State.RUNNING; public MonitoredTaskImpl() { startTime = System.currentTimeMillis(); + statusTime = startTime; + stateTime = startTime; } @Override + public synchronized MonitoredTaskImpl clone() { + try { + return (MonitoredTaskImpl) super.clone(); + } catch (CloneNotSupportedException e) { + throw new AssertionError(); // Won't happen + } + } + + @Override public long getStartTime() { return startTime; } @@ -48,6 +64,11 @@ public String getStatus() { return status; } + + @Override + public long getStatusTime() { + return statusTime; + } @Override public State getState() { @@ -55,29 +76,53 @@ } @Override - public long getCompletionTimestamp() { - return completionTimestamp; + public long getStateTime() { + return stateTime; } @Override + public long getCompletionTimestamp() { + if (state == State.COMPLETE || state == State.ABORTED) { + return stateTime; + } + return -1; + } + + @Override public void markComplete(String status) { - state = State.COMPLETE; + setState(State.COMPLETE); setStatus(status); - completionTimestamp = System.currentTimeMillis(); } @Override + public void pause(String msg) { + setState(State.WAITING); + setStatus(msg); + } + + @Override + public void resume(String msg) { + setState(State.RUNNING); + setStatus(msg); + } + + @Override public void abort(String msg) { setStatus(msg); - state = State.ABORTED; - completionTimestamp = System.currentTimeMillis(); + setState(State.ABORTED); } @Override public void setStatus(String status) { this.status = status; + statusTime = System.currentTimeMillis(); } + protected void setState(State state) { + this.state = state; + stateTime = System.currentTimeMillis(); + } + @Override public void setDescription(String description) { this.description = description; @@ -86,8 +131,7 @@ @Override public void cleanup() { if (state == State.RUNNING) { - state = State.ABORTED; - completionTimestamp = System.currentTimeMillis(); + setState(State.ABORTED); } } @@ -95,8 +139,41 @@ * Force the completion timestamp backwards so that * it expires now. */ - @VisibleForTesting - void expireNow() { - completionTimestamp -= 180 * 1000; + public void expireNow() { + stateTime -= 180 * 1000; } + + @Override + public Map toMap() { + Map map = new HashMap(); + map.put("description", getDescription()); + map.put("status", getStatus()); + map.put("state", getState()); + map.put("starttimems", getStartTime()); + map.put("statustimems", getCompletionTimestamp()); + map.put("statetimems", getCompletionTimestamp()); + return map; + } + + @Override + public String toJSON() throws IOException { + ObjectMapper mapper = new ObjectMapper(); + return mapper.writeValueAsString(toMap()); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(512); + sb.append(getDescription()); + sb.append(": status="); + sb.append(getStatus()); + sb.append(", state="); + sb.append(getState()); + sb.append(", startTime="); + sb.append(getStartTime()); + sb.append(", completionTime="); + sb.append(getCompletionTimestamp()); + return sb.toString(); + } + } Index: src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java (revision 0) @@ -0,0 +1,256 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.monitoring; + +import org.apache.hadoop.hbase.client.Operation; +import org.apache.hadoop.hbase.io.WritableWithSize; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.Writable; + +import org.codehaus.jackson.map.ObjectMapper; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +/** + * A MonitoredTask implementation designed for use with RPC Handlers + * handling frequent, short duration tasks. String concatenations and object + * allocations are avoided in methods that will be hit by every RPC call. + */ +public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl + implements MonitoredRPCHandler { + private String clientAddress; + private int remotePort; + private long rpcQueueTime; + private long rpcStartTime; + private String methodName = ""; + private Object [] params = {}; + private Writable packet; + + public MonitoredRPCHandlerImpl() { + super(); + // in this implementation, WAITING indicates that the handler is not + // actively servicing an RPC call. + setState(State.WAITING); + } + + @Override + public synchronized MonitoredRPCHandlerImpl clone() { + return (MonitoredRPCHandlerImpl) super.clone(); + } + + /** + * Gets the status of this handler; if it is currently servicing an RPC, + * this status will include the RPC information. + * @return a String describing the current status. + */ + @Override + public String getStatus() { + if (getState() != State.RUNNING) { + return super.getStatus(); + } + return super.getStatus() + " from " + getClient() + ": " + getRPC(); + } + + /** + * Accesses the queue time for the currently running RPC on the + * monitored Handler. + * @return the queue timestamp or -1 if there is no RPC currently running. + */ + public long getRPCQueueTime() { + if (getState() != State.RUNNING) { + return -1; + } + return rpcQueueTime; + } + + /** + * Accesses the start time for the currently running RPC on the + * monitored Handler. + * @return the start timestamp or -1 if there is no RPC currently running. + */ + public long getRPCStartTime() { + if (getState() != State.RUNNING) { + return -1; + } + return rpcStartTime; + } + + /** + * Produces a string representation of the method currently being serviced + * by this Handler. + * @return a string representing the method call without parameters + */ + public String getRPC() { + return getRPC(false); + } + + /** + * Produces a string representation of the method currently being serviced + * by this Handler. + * @param withParams toggle inclusion of parameters in the RPC String + * @return A human-readable string representation of the method call. + */ + public synchronized String getRPC(boolean withParams) { + if (getState() != State.RUNNING) { + // no RPC is currently running + return ""; + } + StringBuilder buffer = new StringBuilder(256); + buffer.append(methodName); + if (withParams) { + buffer.append("("); + for (int i = 0; i < params.length; i++) { + if (i != 0) + buffer.append(", "); + buffer.append(params[i]); + } + buffer.append(")"); + } + return buffer.toString(); + } + + /** + * Produces a string representation of the method currently being serviced + * by this Handler. + * @return A human-readable string representation of the method call. + */ + public long getRPCPacketLength() { + if (getState() != State.RUNNING || packet == null) { + // no RPC is currently running, or we don't have an RPC's packet info + return -1L; + } + if (!(packet instanceof WritableWithSize)) { + // the packet passed to us doesn't expose size information + return -1L; + } + return ((WritableWithSize) packet).getWritableSize(); + } + + /** + * If an RPC call is currently running, produces a String representation of + * the connection from which it was received. + * @return A human-readable string representation of the address and port + * of the client. + */ + public String getClient() { + return clientAddress + ":" + remotePort; + } + + /** + * Indicates to the client whether this task is monitoring a currently active + * RPC call. + * @return true if the monitored handler is currently servicing an RPC call. + */ + public boolean isRPCRunning() { + return getState() == State.RUNNING; + } + + /** + * Indicates to the client whether this task is monitoring a currently active + * RPC call to a database command. (as defined by + * o.a.h.h.client.Operation) + * @return true if the monitored handler is currently servicing an RPC call + * to a database command. + */ + public boolean isOperationRunning() { + if(!isRPCRunning()) { + return false; + } + for(Object param : params) { + if (param instanceof Operation) { + return true; + } + } + return false; + } + + /** + * Tells this instance that it is monitoring a new RPC call. + * @param methodName The name of the method that will be called by the RPC. + * @param params The parameters that will be passed to the indicated method. + */ + public synchronized void setRPC(String methodName, Object [] params, + long queueTime) { + this.methodName = methodName; + this.params = params; + this.rpcStartTime = System.currentTimeMillis(); + this.rpcQueueTime = queueTime; + this.state = State.RUNNING; + } + + /** + * Gives this instance a reference to the Writable received by the RPC, so + * that it can later compute its size if asked for it. + * @param param The Writable received by the RPC for this call + */ + public void setRPCPacket(Writable param) { + this.packet = param; + } + + /** + * Registers current handler client details. + * @param clientAddress the address of the current client + * @param remotePort the port from which the client connected + */ + public void setConnection(String clientAddress, int remotePort) { + this.clientAddress = clientAddress; + this.remotePort = remotePort; + } + + public synchronized Map toMap() { + // only include RPC info if the Handler is actively servicing an RPC call + Map map = super.toMap(); + if (getState() != State.RUNNING) { + return map; + } + Map rpcJSON = new HashMap(); + ArrayList paramList = new ArrayList(); + map.put("rpcCall", rpcJSON); + rpcJSON.put("queuetimems", getRPCQueueTime()); + rpcJSON.put("starttimems", getRPCStartTime()); + rpcJSON.put("clientaddress", clientAddress); + rpcJSON.put("remoteport", remotePort); + rpcJSON.put("packetlength", getRPCPacketLength()); + rpcJSON.put("method", methodName); + rpcJSON.put("params", paramList); + for(Object param : params) { + if(param instanceof byte []) { + paramList.add(Bytes.toStringBinary((byte []) param)); + } else if (param instanceof Operation) { + paramList.add(((Operation) param).toMap()); + } else { + paramList.add(param.toString()); + } + } + return map; + } + + @Override + public String toString() { + if (getState() != State.RUNNING) { + return super.toString(); + } + return super.toString() + ", rpcMethod=" + getRPC(); + } + +} Index: src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java (revision 1166510) +++ src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java (working copy) @@ -71,12 +71,23 @@ stat.getClass().getClassLoader(), new Class[] { MonitoredTask.class }, new PassthroughInvocationHandler(stat)); + TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy); + tasks.add(pair); + return proxy; + } + public MonitoredRPCHandler createRPCStatus(String description) { + MonitoredRPCHandler stat = new MonitoredRPCHandlerImpl(); + stat.setDescription(description); + MonitoredRPCHandler proxy = (MonitoredRPCHandler) Proxy.newProxyInstance( + stat.getClass().getClassLoader(), + new Class[] { MonitoredRPCHandler.class }, + new PassthroughInvocationHandler(stat)); TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy); tasks.add(pair); return proxy; } - + private synchronized void purgeExpiredTasks() { int size = 0; @@ -107,11 +118,17 @@ } } + /** + * Produces a list containing copies of the current state of all non-expired + * MonitoredTasks handled by this TaskMonitor. + * @return A complete list of MonitoredTasks. + */ public synchronized List getTasks() { purgeExpiredTasks(); ArrayList ret = Lists.newArrayListWithCapacity(tasks.size()); for (TaskAndWeakRefPair pair : tasks) { - ret.add(pair.get()); + MonitoredTask t = pair.get(); + ret.add(t.clone()); } return ret; } @@ -181,7 +198,8 @@ } /** - * An InvocationHandler that simply passes through calls to the original object. + * An InvocationHandler that simply passes through calls to the original + * object. */ private static class PassthroughInvocationHandler implements InvocationHandler { private T delegatee; Index: src/main/java/org/apache/hadoop/hbase/regionserver/RSStatusServlet.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/RSStatusServlet.java (revision 1166510) +++ src/main/java/org/apache/hadoop/hbase/regionserver/RSStatusServlet.java (working copy) @@ -40,7 +40,12 @@ assert hrs != null : "No RS in context!"; resp.setContentType("text/html"); - new RSStatusTmpl().render(resp.getWriter(), hrs); + RSStatusTmpl tmpl = new RSStatusTmpl(); + if (req.getParameter("format") != null) + tmpl.setFormat(req.getParameter("format")); + if (req.getParameter("filter") != null) + tmpl.setFilter(req.getParameter("filter")); + tmpl.render(resp.getWriter(), hrs); } } Index: src/main/java/org/apache/hadoop/hbase/master/MasterStatusServlet.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/MasterStatusServlet.java (revision 1166510) +++ src/main/java/org/apache/hadoop/hbase/master/MasterStatusServlet.java (working copy) @@ -61,13 +61,17 @@ List servers = master.getServerManager().getOnlineServersList(); response.setContentType("text/html"); - new MasterStatusTmpl() + MasterStatusTmpl tmpl = new MasterStatusTmpl() .setFrags(frags) .setShowAppendWarning(shouldShowAppendWarning(conf)) .setRootLocation(rootLocation) .setMetaLocation(metaLocation) - .setServers(servers) - .render(response.getWriter(), + .setServers(servers); + if (request.getParameter("filter") != null) + tmpl.setFilter(request.getParameter("filter")); + if (request.getParameter("format") != null) + tmpl.setFormat(request.getParameter("format")); + tmpl.render(response.getWriter(), master, admin); } Index: src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java (revision 1166510) +++ src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java (working copy) @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.client.Operation; import org.apache.hadoop.hbase.io.HbaseObjectWritable; +import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Objects; @@ -316,7 +317,7 @@ @Override public Writable call(Class protocol, - Writable param, long receivedTime) + Writable param, long receivedTime, MonitoredRPCHandler status) throws IOException { try { Invocation call = (Invocation)param; @@ -325,6 +326,9 @@ "cause is a version mismatch between client and server."); } if (verbose) log("Call: " + call); + status.setRPC(call.getMethodName(), call.getParameters(), receivedTime); + status.setRPCPacket(param); + status.resume("Servicing call"); Method method = protocol.getMethod(call.getMethodName(), @@ -369,7 +373,8 @@ // when tagging, we let TooLarge trump TooSmall to keep output simple // note that large responses will often also be slow. logResponse(call, (tooLarge ? "TooLarge" : "TooSlow"), - startTime, processingTime, qTime, responseSize); + status.getClient(), startTime, processingTime, qTime, + responseSize); // provides a count of log-reported slow responses if (tooSlow) { rpcMetrics.rpcSlowResponseTime.inc(processingTime); @@ -407,13 +412,14 @@ * client Operations. * @param call The call to log. * @param tag The tag that will be used to indicate this event in the log. + * @param client The address of the client who made this call. * @param startTime The time that the call was initiated, in ms. * @param processingTime The duration that the call took to run, in ms. * @param qTime The duration that the call spent on the queue * prior to being initiated, in ms. * @param responseSize The size in bytes of the response buffer. */ - private void logResponse(Invocation call, String tag, + private void logResponse(Invocation call, String tag, String clientAddress, long startTime, int processingTime, int qTime, long responseSize) throws IOException { Object params[] = call.getParameters(); @@ -425,6 +431,7 @@ responseInfo.put("processingtimems", processingTime); responseInfo.put("queuetimems", qTime); responseInfo.put("responsesize", responseSize); + responseInfo.put("client", clientAddress); responseInfo.put("class", instance.getClass().getSimpleName()); responseInfo.put("method", call.getMethodName()); if (params.length == 2 && instance instanceof HRegionServer && Index: src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (revision 1166510) +++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (working copy) @@ -59,6 +59,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.io.HbaseObjectWritable; import org.apache.hadoop.hbase.io.WritableWithSize; +import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; +import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.util.ByteBufferOutputStream; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; @@ -1029,6 +1031,10 @@ return hostAddress; } + public int getRemotePort() { + return remotePort; + } + public void setLastContact(long lastContact) { this.lastContact = lastContact; } @@ -1176,6 +1182,7 @@ /** Handles queued calls . */ private class Handler extends Thread { private final BlockingQueue myCallQueue; + private MonitoredRPCHandler status; public Handler(final BlockingQueue cq, int instanceNumber) { this.myCallQueue = cq; @@ -1187,15 +1194,21 @@ threadName = "PRI " + threadName; } this.setName(threadName); + this.status = TaskMonitor.get().createRPCStatus(threadName); } @Override public void run() { LOG.info(getName() + ": starting"); + status.setStatus("starting"); SERVER.set(HBaseServer.this); while (running) { try { + status.pause("Waiting for a call"); Call call = myCallQueue.take(); // pop the queue; maybe blocked here + status.setStatus("Setting up call"); + status.setConnection(call.connection.getHostAddress(), + call.connection.getRemotePort()); if (LOG.isDebugEnabled()) LOG.debug(getName() + ": has #" + call.id + " from " + @@ -1209,7 +1222,9 @@ try { if (!started) throw new ServerNotRunningYetException("Server is not running yet"); - value = call(call.connection.protocol, call.param, call.timestamp); // make the call + // make the call + value = call(call.connection.protocol, call.param, call.timestamp, + status); } catch (Throwable e) { LOG.debug(getName()+", call "+call+": error: " + e, e); errorClass = e.getClass().getName(); Index: src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (revision 1166510) +++ src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (working copy) @@ -23,6 +23,7 @@ import com.google.common.base.Function; import org.apache.hadoop.io.Writable; import org.apache.hadoop.hbase.ipc.VersionedProtocol; +import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import java.io.IOException; import java.net.InetSocketAddress; @@ -48,7 +49,7 @@ * @throws java.io.IOException e */ Writable call(Class protocol, - Writable param, long receiveTime) + Writable param, long receiveTime, MonitoredRPCHandler status) throws IOException; int getNumOpenConnections(); Index: src/main/jamon/org/apache/hbase/tmpl/common/TaskMonitorTmpl.jamon =================================================================== --- src/main/jamon/org/apache/hbase/tmpl/common/TaskMonitorTmpl.jamon (revision 1166510) +++ src/main/jamon/org/apache/hbase/tmpl/common/TaskMonitorTmpl.jamon (working copy) @@ -20,40 +20,74 @@ <%import> java.util.*; org.apache.hadoop.hbase.monitoring.*; +org.apache.hadoop.util.StringUtils; <%args> TaskMonitor taskMonitor = TaskMonitor.get(); +String filter = "general"; +String format = "html"; <%java> +List tasks = taskMonitor.getTasks(); +Iterator iter = tasks.iterator(); +// apply requested filter +while (iter.hasNext()) { + MonitoredTask t = iter.next(); + if (filter.equals("general")) { + if (t instanceof MonitoredRPCHandler) + iter.remove(); + } else if (filter.equals("handler")) { + if (!(t instanceof MonitoredRPCHandler)) + iter.remove(); + } else if (filter.equals("rpc")) { + if (!(t instanceof MonitoredRPCHandler) || + !((MonitoredRPCHandler) t).isRPCRunning()) + iter.remove(); + } else if (filter.equals("operation")) { + if (!(t instanceof MonitoredRPCHandler) || + !((MonitoredRPCHandler) t).isOperationRunning()) + iter.remove(); + } +} long now = System.currentTimeMillis(); -List tasks = taskMonitor.getTasks(); Collections.reverse(tasks); - +boolean first = true; -

Currently running tasks

- -<%if tasks.isEmpty()%> -No tasks currently running on this node. +<%if format.equals("json")%> +[<%for MonitoredTask task : tasks%><%if first%><%java>first = false;<%else>,<% task.toJSON() %>] <%else> + +

Recent tasks

+ <%if tasks.isEmpty()%> + No tasks currently running on this node. + <%else> + + + + + + + + <%for MonitoredTask task : tasks %> + + + + + + + +
Start TimeDescriptionStateStatus
<% new Date(task.getStartTime()) %><% task.getDescription() %><% task.getState() %> + (since <% StringUtils.formatTimeDiff(now, task.getStateTime()) %> ago) + <% task.getStatus() %> + (since <% StringUtils.formatTimeDiff(now, task.getStatusTime()) %> + ago)
- - - - - - -<%for MonitoredTask task : tasks %> - - - - - - - -
DescriptionStatusAge
<% task.getDescription() %><% task.getStatus() %><% (int)((now - task.getStartTime())/1000) %>s - <%if task.getCompletionTimestamp() != -1%> - (Completed <% (now - task.getCompletionTimestamp())/1000 %>s ago) -
- - \ No newline at end of file + Index: src/main/jamon/org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon =================================================================== --- src/main/jamon/org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (revision 1166510) +++ src/main/jamon/org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (working copy) @@ -19,6 +19,8 @@ <%args> HRegionServer regionServer; +String filter = "general"; +String format = "html"; <%import> java.util.*; @@ -33,6 +35,10 @@ org.apache.hadoop.hbase.HServerLoad; org.apache.hadoop.hbase.HRegionInfo; +<%if format.equals("json") %> + <& ../common/TaskMonitorTmpl; filter = filter; format = "json" &> + <%java return; %> + <%java> HServerInfo serverInfo = null; try { @@ -73,7 +79,7 @@ Zookeeper Quorum<% regionServer.getZooKeeper().getQuorum() %>Addresses of all registered ZK servers -<& ../common/TaskMonitorTmpl &> +<& ../common/TaskMonitorTmpl; filter = filter &>

Online Regions

<%if (onlineRegions != null && onlineRegions.size() > 0) %> Index: src/main/jamon/org/apache/hbase/tmpl/master/MasterStatusTmpl.jamon =================================================================== --- src/main/jamon/org/apache/hbase/tmpl/master/MasterStatusTmpl.jamon (revision 1166510) +++ src/main/jamon/org/apache/hbase/tmpl/master/MasterStatusTmpl.jamon (working copy) @@ -25,6 +25,8 @@ ServerName metaLocation = null; List servers = null; boolean showAppendWarning = false; +String filter = "general"; +String format = "html"; <%import> java.util.*; @@ -40,6 +42,10 @@ org.apache.hadoop.hbase.client.HConnectionManager; org.apache.hadoop.hbase.HTableDescriptor; +<%if format.equals("json") %> + <& ../common/TaskMonitorTmpl; filter = filter; format = "json" &> + <%java return; %> + @@ -94,7 +100,7 @@ Zookeeper Quorum<% master.getZooKeeperWatcher().getQuorum() %>Addresses of all registered ZK servers. For more, see zk dump. -<& ../common/TaskMonitorTmpl &> +<& ../common/TaskMonitorTmpl; filter = filter &> <%if (rootLocation != null) %> <& catalogTables &> Index: src/main/resources/hbase-webapps/static/hbase.css =================================================================== --- src/main/resources/hbase-webapps/static/hbase.css (revision 1166510) +++ src/main/resources/hbase-webapps/static/hbase.css (working copy) @@ -25,3 +25,9 @@ tr.task-monitor-ABORTED td { background-color: #ffa; } + +tr.task-monitor-WAITING td { + background-color: #ccc; + font-style: italic; +} ++