Index: src/test/java/org/apache/hadoop/hbase/util/TestSlidingWindowMetric.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/util/TestSlidingWindowMetric.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/util/TestSlidingWindowMetric.java (revision 0) @@ -0,0 +1,165 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import static org.junit.Assert.*; + +import org.apache.hadoop.hbase.Stoppable; +import org.junit.Test; + +/** + * Tests the sliding window metric tracker. + */ +public class TestSlidingWindowMetric { + + @Test + public void testSlidingByTrigger() throws Exception { + + SlidingWindowMetric metric = + new SlidingWindowMetric(1000, 3, "testSlidingByTrigger", null); + + assertEquals(0, metric.getNumOps()); + assertEquals(0, metric.getAverage(), Double.POSITIVE_INFINITY); + assertEquals(0, metric.getMax()); + assertEquals(Long.MAX_VALUE, metric.getMin()); + + // 3 * 10 + metric.increment(10); + metric.increment(10); + metric.increment(10); + + assertEquals(0, metric.getNumOps()); + assertEquals(0, metric.getAverage(), Double.POSITIVE_INFINITY); + assertEquals(0, metric.getMax()); + assertEquals(Long.MAX_VALUE, metric.getMin()); + + metric.rollBucket(); + + assertEquals(3, metric.getNumOps()); + assertEquals(10f, metric.getAverage(), Double.POSITIVE_INFINITY); + assertEquals(10, metric.getMax()); + assertEquals(10, metric.getMin()); + + // 3 * 20 + metric.increment(20); + metric.increment(20); + metric.increment(20); + + metric.rollBucket(); + + assertEquals(6, metric.getNumOps()); + assertEquals(15f, metric.getAverage(), Double.POSITIVE_INFINITY); + assertEquals(20, metric.getMax()); + assertEquals(10, metric.getMin()); + + // 3 * 30 + metric.increment(30); + metric.increment(30); + metric.increment(30); + + metric.rollBucket(); + + assertEquals(9, metric.getNumOps()); + assertEquals(20f, metric.getAverage(), Double.POSITIVE_INFINITY); + assertEquals(30, metric.getMax()); + assertEquals(10, metric.getMin()); + + // 3 * 40 (and we expect an eviction) + metric.increment(40); + metric.increment(40); + metric.increment(40); + + metric.rollBucket(); + + assertEquals(9, metric.getNumOps()); + assertEquals(30f, metric.getAverage(), Double.POSITIVE_INFINITY); + assertEquals(40, metric.getMax()); + assertEquals(20, metric.getMin()); + } + + @Test + public void testSlidingByTime() throws Exception { + + SlidingWindowMetric metric = + new SlidingWindowMetric(500, 3, "testSlidingByTime", + new Stoppable() { + @Override + public void stop(String why) {} + @Override + public boolean isStopped() {return false;} + }); + metric.start(); + + assertEquals(0, metric.getNumOps()); + assertEquals(0, metric.getAverage(), Double.POSITIVE_INFINITY); + assertEquals(0, metric.getMax()); + assertEquals(Long.MAX_VALUE, metric.getMin()); + + // 3 * 10 + metric.increment(10); + metric.increment(10); + metric.increment(10); + + Thread.sleep(550); + + assertEquals(3, metric.getNumOps()); + assertEquals(10f, metric.getAverage(), Double.POSITIVE_INFINITY); + assertEquals(10, metric.getMax()); + assertEquals(10, metric.getMin()); + + // 3 * 20 + metric.increment(20); + metric.increment(20); + metric.increment(20); + + Thread.sleep(550); + + assertEquals(6, metric.getNumOps()); + assertEquals(15f, metric.getAverage(), Double.POSITIVE_INFINITY); + assertEquals(20, metric.getMax()); + assertEquals(10, metric.getMin()); + + // 3 * 30 + metric.increment(30); + metric.increment(30); + metric.increment(30); + + Thread.sleep(550); + + assertEquals(9, metric.getNumOps()); + assertEquals(20f, metric.getAverage(), Double.POSITIVE_INFINITY); + assertEquals(30, metric.getMax()); + assertEquals(10, metric.getMin()); + + // 3 * 40 (and we expect an eviction) + metric.increment(40); + metric.increment(40); + metric.increment(40); + + Thread.sleep(550); + + assertEquals(9, metric.getNumOps()); + assertEquals(30f, metric.getAverage(), Double.POSITIVE_INFINITY); + assertEquals(40, metric.getMax()); + assertEquals(20, metric.getMin()); + + } + +} Index: src/main/java/org/apache/hadoop/hbase/util/SlidingWindowMetric.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/SlidingWindowMetric.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/util/SlidingWindowMetric.java (revision 0) @@ -0,0 +1,300 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.LinkedList; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.Stoppable; + +/** + * Used to track aggregates (sum/count/avg/max/min) of a metric over a sliding + * window of time. + *

+ * To create, specify a bucket size (in milliseconds) and the number of buckets + * in the window. + *

+ * For example, to track a metric over a one hour window with a granularity + * of one minute, use a bucket size of 60000 and 60 buckets. + *

+ * Implemented as a Chore so this uses an internal background thread. + */ +public class SlidingWindowMetric extends Chore { + + /** Size of each bucket, in milliseconds */ + private final int bucketSize; + + /** Maximum number of buckets in the sliding window */ + private final int maxBuckets; + + /** List of active buckets */ + private final LinkedList buckets = new LinkedList(); + + /** Current number of buckets in the sliding window */ + private int numBuckets; + + /** Currently incremented bucket */ + private Bucket currentBucket; + + /** Current window values, initially all 0s */ + private Bucket totals = new Bucket(); + + /** Read-Write lock for... nothing right now */ + private final ReadWriteLock lock = new ReentrantReadWriteLock(true); + + /** + * Construct a new sliding window metric with the specified bucket size and + * total number of buckets. + *

+ * The metrics emitted will be for a period of time equal to the bucket size + * times the number of buckets (ie. 1 minute bucket size * 60 buckets will + * yield a 1 hour rolling window metric). + *

+ * The sliding window begins when this is initialized via {@link #start()}. + * + * @param bucketSize time span of each bucket, in milliseconds + * @param numBuckets total number of buckets in the sliding window + * @param name metric name (used for naming thread) + * @param stopper stoppable so thread can be shutdown + */ + public SlidingWindowMetric(int bucketSize, int numBuckets, String name, + Stoppable stopper) { + super("SlidingWindowMetric-" + name, bucketSize, stopper); + this.bucketSize = bucketSize; + this.maxBuckets = numBuckets; + this.currentBucket = new Bucket(); + buckets.add(currentBucket); + } + + /** + * Increment the metric by the specified value. + * @param value + */ + public void increment(long value) { + currentBucket.increment(value); + } + + /** + * Gets the number of operations for this metric over the current window. + * @return total number of operations + */ + public long getNumOps() { + return totals.getCount(); + } + + /** + * Gets the average value for this metric over the current window. + * @return average value of operations + */ + public float getAverage() { + return totals.getAverage(); + } + + /** + * Gets the maximum value for this metric over the current window. + * @return maximum value of operations + */ + public long getMax() { + return totals.getMax(); + } + + /** + * Gets the minimum value for this metric over the current window. + * @return minimum value of operations + */ + public long getMin() { + return totals.getMin(); + } + + /** + * The maximum window size for the reported metrics. + * @return maximum size of time window, in milliseconds + */ + public long getMaximumWindowSize() { + return bucketSize * maxBuckets; + } + + /** + * The current window size for the reported metrics. + * @return current size of time window, in milliseconds + */ + public long getCurrentWindowSize() { + return bucketSize * numBuckets; + } + + /** + * Checks whether the time window is full. + *

+ * Can be used to guard emitting this metric until the window has filled + * completely, otherwise could get erratic behavior at the start. + * @return true if the time window is full, false if not + */ + public boolean isWindowFull() { + return numBuckets == maxBuckets; + } + + /** + * Bucket rolling thread. + */ + @Override + protected void chore() { + rollBucket(); + } + + /** + * Roll the time window bucket. Change the current bucket to a new bucket and + * if the window is full, evict the oldest bucket. + */ + void rollBucket() { + + // Make a new bucket and set it as the new current bucket + Bucket nextBucket = new Bucket(); + currentBucket = nextBucket; + + // Update totals with the current set of buckets + updateTotals(); + + // Add new bucket to the end of the list + buckets.add(nextBucket); + + // If we are too big, evict the oldest bucket + if (buckets.size() > maxBuckets) buckets.removeFirst(); + else numBuckets++; + } + + /** + * Roll the time window bucket. Change the current bucket to a new bucket and + * if the window is full, evict the oldest bucket. + */ + void rollBucketOld() { + // Make a new bucket and add it to the end of the list + Bucket nextBucket = new Bucket(); + buckets.add(nextBucket); + + // Grab the write lock and switch the current bucket + lock.writeLock().lock(); + currentBucket = nextBucket; + + // If we are too big, evict the oldest bucket + if (buckets.size() > maxBuckets) buckets.removeFirst(); + else numBuckets++; + + // Unlock + lock.writeLock().unlock(); + + // Update totals outside of lock. Reads currently do not lock and operate + // on AtomicLongs in the bucket. So this will be faster but you could get + // a "partial" read of an update. + updateTotals(); + } + + /** + * Updates the totals bucket which is used for the actual emitted values. + *

+ * Done without a lock so possible a reader gets a partially updated set + * of metrics. + */ + private void updateTotals() { + long count=0, sum=0, max=0, min=Long.MAX_VALUE; + for (Bucket bucket : buckets) { + count += bucket.getCount(); + sum += bucket.getValue(); + if (bucket.getMax() > max) max = bucket.getMax(); + if (bucket.getMin() < min) min = bucket.getMin(); + } + totals.count.set(count); + totals.sum.set(sum); + totals.max.set(max); + totals.min.set(min); + } + + /** + * A time bucket to track the sum/count/max/min of incoming metric increments. + */ + private static class Bucket { + + /** The aggregate value (sum) */ + private AtomicLong sum = new AtomicLong(0); + + /** The number of operations (count) */ + private AtomicLong count = new AtomicLong(0); + + /** The maximum value */ + private AtomicLong max = new AtomicLong(0); + + /** The minimum value */ + private AtomicLong min = new AtomicLong(Long.MAX_VALUE); + + /** + * Increments the bucket by the specified value. Handles tracking of + * sum, count, max, and min. + * @param value + */ + public void increment(long value) { + sum.addAndGet(value); + count.incrementAndGet(); + if (value > max.get()) max.set(value); + if (value < min.get()) min.set(value); + } + + /** + * Gets the aggregate value (total sum) for this metric bucket. + * @return total sum of metric + */ + public long getValue() { + return sum.get(); + } + + /** + * Gets the number of operations for this metric bucket. + * @return total number of operations + */ + public long getCount() { + return count.get(); + } + + /** + * Gets the maximum metric value for this metric bucket. + * @return max value + */ + public long getMax() { + return max.get(); + } + + /** + * Gets the minimum metric value for this metric bucket. + * @return min value + */ + public long getMin() { + return min.get(); + } + + /** + * Gets the average value of an operation for this metric bucket. + * @return average value + */ + public float getAverage() { + return getCount() == 0 ? 0f : (float)getValue() / (float)getCount(); + } + } +} \ No newline at end of file