Index: src/test/java/org/apache/hadoop/hbase/util/TestSlidingWindowMetric.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/util/TestSlidingWindowMetric.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/util/TestSlidingWindowMetric.java (revision 0) @@ -0,0 +1,165 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import static org.junit.Assert.*; + +import org.apache.hadoop.hbase.Stoppable; +import org.junit.Test; + +/** + * Tests the sliding window metric tracker. + */ +public class TestSlidingWindowMetric { + + @Test + public void testSlidingByTrigger() throws Exception { + + SlidingWindowMetric metric = + new SlidingWindowMetric(1000, 3, "testSlidingByTrigger", null); + + assertEquals(0, metric.getNumOps()); + assertEquals(0, metric.getAverage(), Double.POSITIVE_INFINITY); + assertEquals(0, metric.getMax()); + assertEquals(Long.MAX_VALUE, metric.getMin()); + + // 3 * 10 + metric.increment(10); + metric.increment(10); + metric.increment(10); + + assertEquals(0, metric.getNumOps()); + assertEquals(0, metric.getAverage(), Double.POSITIVE_INFINITY); + assertEquals(0, metric.getMax()); + assertEquals(Long.MAX_VALUE, metric.getMin()); + + metric.rollBucket(); + + assertEquals(3, metric.getNumOps()); + assertEquals(10f, metric.getAverage(), Double.POSITIVE_INFINITY); + assertEquals(10, metric.getMax()); + assertEquals(10, metric.getMin()); + + // 3 * 20 + metric.increment(20); + metric.increment(20); + metric.increment(20); + + metric.rollBucket(); + + assertEquals(6, metric.getNumOps()); + assertEquals(15f, metric.getAverage(), Double.POSITIVE_INFINITY); + assertEquals(20, metric.getMax()); + assertEquals(10, metric.getMin()); + + // 3 * 30 + metric.increment(30); + metric.increment(30); + metric.increment(30); + + metric.rollBucket(); + + assertEquals(9, metric.getNumOps()); + assertEquals(20f, metric.getAverage(), Double.POSITIVE_INFINITY); + assertEquals(30, metric.getMax()); + assertEquals(10, metric.getMin()); + + // 3 * 40 (and we expect an eviction) + metric.increment(40); + metric.increment(40); + metric.increment(40); + + metric.rollBucket(); + + assertEquals(9, metric.getNumOps()); + assertEquals(30f, metric.getAverage(), Double.POSITIVE_INFINITY); + assertEquals(40, metric.getMax()); + assertEquals(20, metric.getMin()); + } + + @Test + public void testSlidingByTime() throws Exception { + + SlidingWindowMetric metric = + new SlidingWindowMetric(500, 3, "testSlidingByTime", + new Stoppable() { + @Override + public void stop(String why) {} + @Override + public boolean isStopped() {return false;} + }); + metric.start(); + + assertEquals(0, metric.getNumOps()); + assertEquals(0, metric.getAverage(), Double.POSITIVE_INFINITY); + assertEquals(0, metric.getMax()); + assertEquals(Long.MAX_VALUE, metric.getMin()); + + // 3 * 10 + metric.increment(10); + metric.increment(10); + metric.increment(10); + + Thread.sleep(550); + + assertEquals(3, metric.getNumOps()); + assertEquals(10f, metric.getAverage(), Double.POSITIVE_INFINITY); + assertEquals(10, metric.getMax()); + assertEquals(10, metric.getMin()); + + // 3 * 20 + metric.increment(20); + metric.increment(20); + metric.increment(20); + + Thread.sleep(550); + + assertEquals(6, metric.getNumOps()); + assertEquals(15f, metric.getAverage(), Double.POSITIVE_INFINITY); + assertEquals(20, metric.getMax()); + assertEquals(10, metric.getMin()); + + // 3 * 30 + metric.increment(30); + metric.increment(30); + metric.increment(30); + + Thread.sleep(550); + + assertEquals(9, metric.getNumOps()); + assertEquals(20f, metric.getAverage(), Double.POSITIVE_INFINITY); + assertEquals(30, metric.getMax()); + assertEquals(10, metric.getMin()); + + // 3 * 40 (and we expect an eviction) + metric.increment(40); + metric.increment(40); + metric.increment(40); + + Thread.sleep(550); + + assertEquals(9, metric.getNumOps()); + assertEquals(30f, metric.getAverage(), Double.POSITIVE_INFINITY); + assertEquals(40, metric.getMax()); + assertEquals(20, metric.getMin()); + + } + +} Index: src/main/java/org/apache/hadoop/hbase/util/SlidingWindowMetric.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/SlidingWindowMetric.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/util/SlidingWindowMetric.java (revision 0) @@ -0,0 +1,300 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.LinkedList; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.Stoppable; + +/** + * Used to track aggregates (sum/count/avg/max/min) of a metric over a sliding + * window of time. + *
+ * To create, specify a bucket size (in milliseconds) and the number of buckets + * in the window. + *
+ * For example, to track a metric over a one hour window with a granularity + * of one minute, use a bucket size of 60000 and 60 buckets. + *
+ * Implemented as a Chore so this uses an internal background thread.
+ */
+public class SlidingWindowMetric extends Chore {
+
+ /** Size of each bucket, in milliseconds */
+ private final int bucketSize;
+
+ /** Maximum number of buckets in the sliding window */
+ private final int maxBuckets;
+
+ /** List of active buckets */
+ private final LinkedList
+ * The metrics emitted will be for a period of time equal to the bucket size
+ * times the number of buckets (ie. 1 minute bucket size * 60 buckets will
+ * yield a 1 hour rolling window metric).
+ *
+ * The sliding window begins when this is initialized via {@link #start()}.
+ *
+ * @param bucketSize time span of each bucket, in milliseconds
+ * @param numBuckets total number of buckets in the sliding window
+ * @param name metric name (used for naming thread)
+ * @param stopper stoppable so thread can be shutdown
+ */
+ public SlidingWindowMetric(int bucketSize, int numBuckets, String name,
+ Stoppable stopper) {
+ super("SlidingWindowMetric-" + name, bucketSize, stopper);
+ this.bucketSize = bucketSize;
+ this.maxBuckets = numBuckets;
+ this.currentBucket = new Bucket();
+ buckets.add(currentBucket);
+ }
+
+ /**
+ * Increment the metric by the specified value.
+ * @param value
+ */
+ public void increment(long value) {
+ currentBucket.increment(value);
+ }
+
+ /**
+ * Gets the number of operations for this metric over the current window.
+ * @return total number of operations
+ */
+ public long getNumOps() {
+ return totals.getCount();
+ }
+
+ /**
+ * Gets the average value for this metric over the current window.
+ * @return average value of operations
+ */
+ public float getAverage() {
+ return totals.getAverage();
+ }
+
+ /**
+ * Gets the maximum value for this metric over the current window.
+ * @return maximum value of operations
+ */
+ public long getMax() {
+ return totals.getMax();
+ }
+
+ /**
+ * Gets the minimum value for this metric over the current window.
+ * @return minimum value of operations
+ */
+ public long getMin() {
+ return totals.getMin();
+ }
+
+ /**
+ * The maximum window size for the reported metrics.
+ * @return maximum size of time window, in milliseconds
+ */
+ public long getMaximumWindowSize() {
+ return bucketSize * maxBuckets;
+ }
+
+ /**
+ * The current window size for the reported metrics.
+ * @return current size of time window, in milliseconds
+ */
+ public long getCurrentWindowSize() {
+ return bucketSize * numBuckets;
+ }
+
+ /**
+ * Checks whether the time window is full.
+ *
+ * Can be used to guard emitting this metric until the window has filled
+ * completely, otherwise could get erratic behavior at the start.
+ * @return true if the time window is full, false if not
+ */
+ public boolean isWindowFull() {
+ return numBuckets == maxBuckets;
+ }
+
+ /**
+ * Bucket rolling thread.
+ */
+ @Override
+ protected void chore() {
+ rollBucket();
+ }
+
+ /**
+ * Roll the time window bucket. Change the current bucket to a new bucket and
+ * if the window is full, evict the oldest bucket.
+ */
+ void rollBucket() {
+
+ // Make a new bucket and set it as the new current bucket
+ Bucket nextBucket = new Bucket();
+ currentBucket = nextBucket;
+
+ // Update totals with the current set of buckets
+ updateTotals();
+
+ // Add new bucket to the end of the list
+ buckets.add(nextBucket);
+
+ // If we are too big, evict the oldest bucket
+ if (buckets.size() > maxBuckets) buckets.removeFirst();
+ else numBuckets++;
+ }
+
+ /**
+ * Roll the time window bucket. Change the current bucket to a new bucket and
+ * if the window is full, evict the oldest bucket.
+ */
+ void rollBucketOld() {
+ // Make a new bucket and add it to the end of the list
+ Bucket nextBucket = new Bucket();
+ buckets.add(nextBucket);
+
+ // Grab the write lock and switch the current bucket
+ lock.writeLock().lock();
+ currentBucket = nextBucket;
+
+ // If we are too big, evict the oldest bucket
+ if (buckets.size() > maxBuckets) buckets.removeFirst();
+ else numBuckets++;
+
+ // Unlock
+ lock.writeLock().unlock();
+
+ // Update totals outside of lock. Reads currently do not lock and operate
+ // on AtomicLongs in the bucket. So this will be faster but you could get
+ // a "partial" read of an update.
+ updateTotals();
+ }
+
+ /**
+ * Updates the totals bucket which is used for the actual emitted values.
+ *
+ * Done without a lock so possible a reader gets a partially updated set
+ * of metrics.
+ */
+ private void updateTotals() {
+ long count=0, sum=0, max=0, min=Long.MAX_VALUE;
+ for (Bucket bucket : buckets) {
+ count += bucket.getCount();
+ sum += bucket.getValue();
+ if (bucket.getMax() > max) max = bucket.getMax();
+ if (bucket.getMin() < min) min = bucket.getMin();
+ }
+ totals.count.set(count);
+ totals.sum.set(sum);
+ totals.max.set(max);
+ totals.min.set(min);
+ }
+
+ /**
+ * A time bucket to track the sum/count/max/min of incoming metric increments.
+ */
+ private static class Bucket {
+
+ /** The aggregate value (sum) */
+ private AtomicLong sum = new AtomicLong(0);
+
+ /** The number of operations (count) */
+ private AtomicLong count = new AtomicLong(0);
+
+ /** The maximum value */
+ private AtomicLong max = new AtomicLong(0);
+
+ /** The minimum value */
+ private AtomicLong min = new AtomicLong(Long.MAX_VALUE);
+
+ /**
+ * Increments the bucket by the specified value. Handles tracking of
+ * sum, count, max, and min.
+ * @param value
+ */
+ public void increment(long value) {
+ sum.addAndGet(value);
+ count.incrementAndGet();
+ if (value > max.get()) max.set(value);
+ if (value < min.get()) min.set(value);
+ }
+
+ /**
+ * Gets the aggregate value (total sum) for this metric bucket.
+ * @return total sum of metric
+ */
+ public long getValue() {
+ return sum.get();
+ }
+
+ /**
+ * Gets the number of operations for this metric bucket.
+ * @return total number of operations
+ */
+ public long getCount() {
+ return count.get();
+ }
+
+ /**
+ * Gets the maximum metric value for this metric bucket.
+ * @return max value
+ */
+ public long getMax() {
+ return max.get();
+ }
+
+ /**
+ * Gets the minimum metric value for this metric bucket.
+ * @return min value
+ */
+ public long getMin() {
+ return min.get();
+ }
+
+ /**
+ * Gets the average value of an operation for this metric bucket.
+ * @return average value
+ */
+ public float getAverage() {
+ return getCount() == 0 ? 0f : (float)getValue() / (float)getCount();
+ }
+ }
+}
\ No newline at end of file