Index: common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java =================================================================== --- common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java (revision 1292876) +++ common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java (working copy) @@ -33,7 +33,7 @@ * (ii) Using the incrementCounter method to increment and set named * parameters in one go, rather than having to make a get and then a set. * (iii) Using the startScope and endScope methods to start and end - * named "scopes" that record the number of times they've been + * named "scopes" that record the number of times they've been * instantiated and amount of time(in milliseconds) spent inside * the scopes. */ @@ -52,6 +52,7 @@ Long startTime = null; String numCounter = null; String timeCounter = null; + String avgTimeCounter = null; //disable default ctor - so that it can't be created without a name @SuppressWarnings("unused") @@ -67,6 +68,7 @@ this.name = name; this.numCounter = name + ".n"; this.timeCounter = name + ".t"; + this.avgTimeCounter = name + ".avg_t"; open(); } @@ -77,7 +79,6 @@ */ public void open() throws IOException { if (!isOpen) { - Metrics.incrementCounter(numCounter); isOpen = true; startTime = System.currentTimeMillis(); } else { @@ -92,7 +93,13 @@ public void close() throws IOException { if (isOpen) { Long endTime = System.currentTimeMillis(); - Metrics.incrementCounter(timeCounter, endTime - startTime); + synchronized(metrics) { + Long num = Metrics.incrementCounter(numCounter); + Long time = Metrics.incrementCounter(timeCounter, endTime - startTime); + if (num != null && time != null) { + Metrics.set(avgTimeCounter, Double.valueOf(time.doubleValue() / num.doubleValue())); + } + } } else { throw new IOException("Scope named " + name + " is not open, cannot be closed."); } @@ -116,7 +123,7 @@ static MetricsMBean metrics = new MetricsMBeanImpl(); - static ThreadLocal> threadLocalScopes + static ThreadLocal> threadLocalScopes = new ThreadLocal>() { @Override protected synchronized HashMap initialValue() { @@ -138,29 +145,37 @@ } } - public static void incrementCounter(String name) throws IOException{ + public static Long incrementCounter(String name) throws IOException{ if (!initialized) { - return; + return null; } - incrementCounter(name,Long.valueOf(1)); + return incrementCounter(name,Long.valueOf(1)); } - public static void incrementCounter(String name, long increment) throws IOException{ + public static Long incrementCounter(String name, long increment) throws IOException{ if (!initialized) { - return; + return null; } - if (!metrics.hasKey(name)) { - set(name,Long.valueOf(increment)); - } else { - set(name, ((Long)get(name)) + increment); + Long value; + synchronized(metrics) { + if (!metrics.hasKey(name)) { + value = Long.valueOf(increment); + set(name, value); + } else { + value = ((Long)get(name)) + increment; + set(name, value); + } } + return value; } public static void set(String name, Object value) throws IOException{ if (!initialized) { return; } - metrics.put(name,value); + synchronized(metrics) { + metrics.put(name,value); + } } public static Object get(String name) throws IOException{ Index: common/src/java/org/apache/hadoop/hive/common/metrics/MetricsMBeanImpl.java =================================================================== --- common/src/java/org/apache/hadoop/hive/common/metrics/MetricsMBeanImpl.java (revision 1292876) +++ common/src/java/org/apache/hadoop/hive/common/metrics/MetricsMBeanImpl.java (working copy) @@ -42,7 +42,8 @@ boolean dirtyAttributeInfoCache = true; MBeanConstructorInfo[] ctors = null; - MBeanOperationInfo[] ops = null; + MBeanOperationInfo[] ops = {new MBeanOperationInfo("reset", + "Sets the values of all Attributes to 0", null, "void", MBeanOperationInfo.ACTION)}; MBeanNotificationInfo[] notifs = null; @Override @@ -88,10 +89,13 @@ } @Override - public Object invoke(String arg0, Object[] arg1, String[] arg2) + public Object invoke(String name, Object[] args, String[] signature) throws MBeanException, ReflectionException { - // no invocations. - return null; + if (name.equals("reset")) { + reset(); + return null; + } + throw new ReflectionException(new NoSuchMethodException(name)); } @Override @@ -112,7 +116,7 @@ setAttribute(attr); attributesSet.add(attr); } catch (AttributeNotFoundException e) { - // ignore exception - we simply don't add this attribute + // ignore exception - we simply don't add this attribute // back in to the resultant set. } catch (InvalidAttributeValueException e) { // ditto @@ -152,4 +156,11 @@ } } + public void reset() { + synchronized(metricsMap) { + for (String key : metricsMap.keySet()) { + metricsMap.put(key, Long.valueOf(0)); + } + } + } }