diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index 5ce8145..8c4c048 100644
--- src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -24,8 +24,12 @@ import java.io.DataInput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
@@ -46,6 +50,8 @@ import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Writable;
 
+import com.google.common.collect.Lists;
+
 /**
  * File format for hbase.
  * A file of sorted key/value pairs. Both keys and values are byte arrays.
@@ -139,10 +145,70 @@ public class HFile {
     DEFAULT_COMPRESSION_ALGORITHM.getName();
 
   // For measuring latency of "typical" reads and writes
-  static volatile AtomicLong readOps = new AtomicLong();
-  static volatile AtomicLong readTimeNano = new AtomicLong();
-  static volatile AtomicLong writeOps = new AtomicLong();
-  static volatile AtomicLong writeTimeNano = new AtomicLong();
+  private static final AtomicLong readOps = new AtomicLong();
+  private static final AtomicLong readTimeNano = new AtomicLong();
+  private static final AtomicLong writeOps = new AtomicLong();
+  private static final AtomicLong writeTimeNano = new AtomicLong();
+
+  // For getting more detailed stats on FS latencies. If, for some reason,
+  // the metrics subsystem stops polling for latencies, we don't want data
+  // to pile up as a memory leak; so, after LATENCY_BUFFER_SIZE items have
+  // been enqueued for processing, fs latency stats will be dropped (and
+  // this behavior will be logged).
+  private static final int LATENCY_BUFFER_SIZE = 5000;
+  private static final BlockingQueue<Long> fsReadLatenciesNanos =
+      new ArrayBlockingQueue<Long>(LATENCY_BUFFER_SIZE);
+  private static final BlockingQueue<Long> fsWriteLatenciesNanos =
+      new ArrayBlockingQueue<Long>(LATENCY_BUFFER_SIZE);
+  private static final AtomicLong lastLoggedDataDrop = new AtomicLong(0);
+
+  // We don't want to fill up the logs with this message, so only log it
+  // once every 30 seconds at most. We also want to avoid locks on the
+  // 'critical path' - hence the CAS.
+  private static void logDroppedLatencyStat() {
+    final long now = System.currentTimeMillis();
+    final long earliestAcceptableLog = now - TimeUnit.SECONDS.toMillis(30L);
+    while (true) {
+      final long lastLog = lastLoggedDataDrop.get();
+      if (lastLog < earliestAcceptableLog) {
+        if (lastLoggedDataDrop.compareAndSet(lastLog, now)) {
+          LOG.warn("Dropping fs latency stats since buffer is full");
+          break;
+        } // otherwise (if the compareAndSet failed) the while loop retries
+      } else {
+        break;
+      }
+    }
+  }
+
+  public static final void offerReadLatency(long latencyNanos) {
+    final boolean stored = fsReadLatenciesNanos.offer(latencyNanos);
+    if (!stored) {
+      logDroppedLatencyStat();
+    }
+
+    readTimeNano.addAndGet(latencyNanos);
+    readOps.incrementAndGet();
+  }
+
+  public static final void offerWriteLatency(long latencyNanos) {
+    final boolean stored = fsWriteLatenciesNanos.offer(latencyNanos);
+    if (!stored) {
+      logDroppedLatencyStat();
+    }
+
+    writeTimeNano.addAndGet(latencyNanos);
+    writeOps.incrementAndGet();
+  }
+
+  public static final Collection<Long> getReadLatenciesNanos() {
+    final List<Long> latencies =
+        Lists.newArrayListWithCapacity(fsReadLatenciesNanos.size());
+    fsReadLatenciesNanos.drainTo(latencies);
+    return latencies;
+  }
+
+  public static final Collection<Long> getWriteLatenciesNanos() {
+    final List<Long> latencies =
+        Lists.newArrayListWithCapacity(fsWriteLatenciesNanos.size());
+    fsWriteLatenciesNanos.drainTo(latencies);
+    return latencies;
+  }
 
   public static final long getReadOps() {
     return readOps.getAndSet(0);
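The HFile change above is deliberately self-contained: the I/O paths offer nanosecond latencies into a bounded queue, the metrics thread drains it once per poll, and samples are dropped (with a rate-limited warning) rather than ever blocking a read or write. A minimal sketch of that pattern in isolation; the class and method names here (LatencyBuffer, record, drain) are hypothetical, not part of this patch:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    // Hypothetical sketch of the drop-when-full buffering used in HFile above.
    class LatencyBuffer {
      private final BlockingQueue<Long> buffer =
          new ArrayBlockingQueue<Long>(5000); // cf. LATENCY_BUFFER_SIZE

      // Producer (I/O path): never blocks; drops the sample when full.
      void record(long latencyNanos) {
        if (!buffer.offer(latencyNanos)) {
          // dropped - HFile logs a rate-limited warning here instead
        }
      }

      // Consumer (metrics poller): hands back everything since the last poll.
      List<Long> drain() {
        final List<Long> out = new ArrayList<Long>(buffer.size());
        buffer.drainTo(out);
        return out;
      }
    }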
diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java
index ee9f44a..0f544c0 100644
--- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java
@@ -238,10 +238,10 @@ public class HFileReaderV1 extends AbstractHFileReader {
           nextOffset - offset, metaBlockIndexReader.getRootBlockDataSize(block),
           true);
       hfileBlock.expectType(BlockType.META);
-
-      HFile.readTimeNano.addAndGet(System.nanoTime() - startTimeNs);
-      HFile.readOps.incrementAndGet();
-
+
+      final long latency = System.nanoTime() - startTimeNs;
+      HFile.offerReadLatency(latency);
+
       // Cache the block
       if (cacheBlock && cacheConf.shouldCacheBlockOnRead(effectiveCategory)) {
         cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock,
@@ -313,9 +313,9 @@ public class HFileReaderV1 extends AbstractHFileReader {
       hfileBlock.expectType(BlockType.DATA);
       ByteBuffer buf = hfileBlock.getBufferWithoutHeader();
 
-      HFile.readTimeNano.addAndGet(System.nanoTime() - startTimeNs);
-      HFile.readOps.incrementAndGet();
-
+      final long latency = System.nanoTime() - startTimeNs;
+      HFile.offerReadLatency(latency);
+
       // Cache the block
       if (cacheBlock && cacheConf.shouldCacheBlockOnRead(
           hfileBlock.getBlockType().getCategory())) {
diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
index 8c20bf8..a3b52d5 100644
--- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
@@ -201,9 +201,9 @@ public class HFileReaderV2 extends AbstractHFileReader {
 
         HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset,
             blockSize, -1, true);
-        HFile.readTimeNano.addAndGet(System.nanoTime() - startTimeNs);
-        HFile.readOps.incrementAndGet();
-
+        final long latency = System.nanoTime() - startTimeNs;
+        HFile.offerReadLatency(latency);
+
         // Cache the block
         if (cacheBlock) {
           cacheConf.getBlockCache().cacheBlock(cacheKey, metaBlock,
@@ -267,9 +267,9 @@ public class HFileReaderV2 extends AbstractHFileReader {
             onDiskBlockSize, -1, pread);
         BlockCategory blockCategory =
             hfileBlock.getBlockType().getCategory();
-        HFile.readTimeNano.addAndGet(System.nanoTime() - startTimeNs);
-        HFile.readOps.incrementAndGet();
-
+        final long latency = System.nanoTime() - startTimeNs;
+        HFile.offerReadLatency(latency);
+
         // Cache the block
         if (cacheBlock && cacheConf.shouldCacheBlockOnRead(
             hfileBlock.getBlockType().getCategory())) {
diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java
index 5b9c230..8ce27fb 100644
--- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java
@@ -195,9 +195,8 @@ public class HFileWriterV1 extends AbstractHFileWriter {
     blockDataSizes.add(Integer.valueOf(size));
     this.totalUncompressedBytes += size;
 
-    HFile.writeTimeNano.addAndGet(System.nanoTime() - startTimeNs);
-    HFile.writeOps.incrementAndGet();
-
+    HFile.offerWriteLatency(System.nanoTime() - startTimeNs);
+
     if (cacheConf.shouldCacheDataOnWrite()) {
       baosDos.flush();
       byte[] bytes = baos.toByteArray();
diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
index 83b7b8a..d6a56da 100644
--- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
@@ -227,9 +227,8 @@ public class HFileWriterV2 extends AbstractHFileWriter {
         onDiskSize);
     totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
 
-    HFile.writeTimeNano.addAndGet(System.nanoTime() - startTimeNs);
-    HFile.writeOps.incrementAndGet();
-
+    HFile.offerWriteLatency(System.nanoTime() - startTimeNs);
+
     if (cacheConf.shouldCacheDataOnWrite()) {
       cacheConf.getBlockCache().cacheBlock(
           HFile.getBlockCacheKey(name, lastDataBlockOffset),
diff --git src/main/java/org/apache/hadoop/hbase/metrics/ExactCounterMetric.java src/main/java/org/apache/hadoop/hbase/metrics/ExactCounterMetric.java
new file mode 100644
index 0000000..073e854
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/metrics/ExactCounterMetric.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.metrics;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.util.MetricsBase;
+import org.apache.hadoop.metrics.util.MetricsRegistry;
+import org.cliffc.high_scale_lib.Counter;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.collect.MapMaker;
+
+public class ExactCounterMetric extends MetricsBase {
+
+  private static final int DEFAULT_TOP_N = 5; // publish stats on the TOP_N items
+
+  private final int topN;
+  private final Map<String, Counter> counts;
+
+  // All access to the 'counts' map should use this lock.
+  // Take a write lock iff you want to guarantee exclusive access
+  // (the map stripes locks internally, so it's already thread safe -
+  // this lock is just so you can take a consistent snapshot of the data).
+  private final ReadWriteLock lock;
+
+  /**
+   * Constructor to create a new counter metric
+   * @param nam the name to publish this metric under
+   * @param registry where the metrics object will be registered
+   * @param description metrics description
+   * @param topN how many 'keys' to publish metrics on
+   */
+  public ExactCounterMetric(final String nam, final MetricsRegistry registry,
+      final String description, int topN) {
+    super(nam, description);
+
+    this.counts = new MapMaker().makeComputingMap(
+        new Function<String, Counter>() {
+          @Override
+          public Counter apply(String input) {
+            return new Counter();
+          }
+        });
+
+    this.lock = new ReentrantReadWriteLock();
+    this.topN = topN;
+
+    if (registry != null) {
+      registry.add(nam, this);
+    }
+  }
+
+  /**
+   * Constructor - create a new exact counter metric
+   * @param nam the name to be used to publish the metric
+   * @param registry where the metrics object will be registered
+   */
+  public ExactCounterMetric(final String nam, MetricsRegistry registry) {
+    this(nam, registry, NO_DESCRIPTION, DEFAULT_TOP_N);
+  }
+
+  public void update(String type) {
+    this.lock.readLock().lock();
+    try {
+      this.counts.get(type).increment();
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+
+  public void update(String type, long count) {
+    this.lock.readLock().lock();
+    try {
+      this.counts.get(type).add(count);
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+
+  public List<Pair<String, Long>> getTop(int n) {
+    final List<Pair<String, Long>> countsSnapshot =
+        Lists.newArrayListWithCapacity(this.counts.size());
+
+    // no updates are allowed while we hold this lock, so move fast
+    this.lock.writeLock().lock();
+    try {
+      for (Entry<String, Counter> entry : this.counts.entrySet()) {
+        countsSnapshot.add(Pair.newPair(entry.getKey(), entry.getValue().get()));
+      }
+    } finally {
+      this.lock.writeLock().unlock();
+    }
+
+    Collections.sort(countsSnapshot, new Comparator<Pair<String, Long>>() {
+      @Override
+      public int compare(Pair<String, Long> a, Pair<String, Long> b) {
+        return b.getSecond().compareTo(a.getSecond());
+      }
+    });
+
+    return countsSnapshot.subList(0, Math.min(n, countsSnapshot.size()));
+  }
+
+  @Override
+  public void pushMetric(MetricsRecord mr) {
+    final List<Pair<String, Long>> topKeys = getTop(Integer.MAX_VALUE);
+    long sum = 0;
+
+    int counter = 0;
+    for (Pair<String, Long> keyCount : topKeys) {
+      counter++;
+      // only push stats on the topN keys
+      if (counter <= this.topN) {
+        mr.setMetric(getName() + "_" + keyCount.getFirst(), keyCount.getSecond());
+      }
+      sum += keyCount.getSecond();
+    }
+    mr.setMetric(getName() + "_map_size", this.counts.size());
+    mr.setMetric(getName() + "_total_count", sum);
+  }
+
+}
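A quick usage sketch for the counter above, mirroring the TestExactCounterMetric test near the end of this patch (a null registry keeps the metric unregistered; the key names are illustrative):

    ExactCounterMetric counter = new ExactCounterMetric("regionAccessCounter", null);
    counter.update("region-a");       // increment "region-a" by 1
    counter.update("region-b", 42L);  // increment "region-b" by 42

    // heaviest-hit keys first:
    for (Pair<String, Long> entry : counter.getTop(5)) {
      System.out.println(entry.getFirst() + " -> " + entry.getSecond());
    }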
diff --git src/main/java/org/apache/hadoop/hbase/metrics/histogram/ExponentiallyDecayingSample.java src/main/java/org/apache/hadoop/hbase/metrics/histogram/ExponentiallyDecayingSample.java
new file mode 100644
index 0000000..210dcaf
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/metrics/histogram/ExponentiallyDecayingSample.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.metrics.histogram;
+
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * An exponentially-decaying random sample of {@code long}s. Uses Cormode et
+ * al's forward-decaying priority reservoir sampling method to produce a
+ * statistically representative sample, exponentially biased towards newer
+ * entries.
+ *
+ * @see Cormode et al. Forward Decay: A Practical Time Decay Model for
+ *      Streaming Systems. ICDE '09
+ */
+public class ExponentiallyDecayingSample implements Sample {
+
+  private static final Random RANDOM = new Random();
+  private static final long RESCALE_THRESHOLD = TimeUnit.HOURS.toNanos(1);
+
+  private final ConcurrentSkipListMap<Double, Long> values;
+  private final ReentrantReadWriteLock lock;
+  private final double alpha;
+  private final int reservoirSize;
+  private final AtomicLong count = new AtomicLong(0);
+  private volatile long startTime;
+  private final AtomicLong nextScaleTime = new AtomicLong(0);
+
+  /**
+   * Creates a new {@link ExponentiallyDecayingSample}.
+   *
+   * @param reservoirSize the number of samples to keep in the sampling
+   *                      reservoir
+   * @param alpha the exponential decay factor; the higher this is, the more
+   *              biased the sample will be towards newer values
+   */
+  public ExponentiallyDecayingSample(int reservoirSize, double alpha) {
+    this.values = new ConcurrentSkipListMap<Double, Long>();
+    this.lock = new ReentrantReadWriteLock();
+    this.alpha = alpha;
+    this.reservoirSize = reservoirSize;
+    clear();
+  }
+
+  @Override
+  public void clear() {
+    lockForRescale();
+    try {
+      values.clear();
+      count.set(0);
+      this.startTime = tick();
+      nextScaleTime.set(System.nanoTime() + RESCALE_THRESHOLD);
+    } finally {
+      unlockForRescale();
+    }
+  }
+
+  @Override
+  public int size() {
+    return (int) Math.min(reservoirSize, count.get());
+  }
+
+  @Override
+  public void update(long value) {
+    update(value, tick());
+  }
+
+  /**
+   * Adds an old value with a fixed timestamp to the sample.
+   *
+   * @param value the value to be added
+   * @param timestamp the epoch timestamp of {@code value} in seconds
+   */
+  public void update(long value, long timestamp) {
+    lockForRegularUsage();
+    try {
+      final double priority = weight(timestamp - startTime)
+          / RANDOM.nextDouble();
+      final long newCount = count.incrementAndGet();
+      if (newCount <= reservoirSize) {
+        values.put(priority, value);
+      } else {
+        Double first = values.firstKey();
+        if (first < priority) {
+          if (values.putIfAbsent(priority, value) == null) {
+            // ensure we always remove an item
+            while (values.remove(first) == null) {
+              first = values.firstKey();
+            }
+          }
+        }
+      }
+    } finally {
+      unlockForRegularUsage();
+    }
+
+    final long now = System.nanoTime();
+    final long next = nextScaleTime.get();
+    if (now >= next) {
+      rescale(now, next);
+    }
+  }
+
+  @Override
+  public Snapshot getSnapshot() {
+    lockForRegularUsage();
+    try {
+      return new Snapshot(values.values());
+    } finally {
+      unlockForRegularUsage();
+    }
+  }
+
+  private long tick() {
+    return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
+  }
+
+  private double weight(long t) {
+    return Math.exp(alpha * t);
+  }
+
+  /* "A common feature of the above techniques—indeed, the key technique that
+   * allows us to track the decayed weights efficiently—is that they maintain
+   * counts and other quantities based on g(ti − L), and only scale by g(t − L)
+   * at query time. But while g(ti − L)/g(t − L) is guaranteed to lie between
+   * zero and one, the intermediate values of g(ti − L) could become very
+   * large. For polynomial functions, these values should not grow too large,
+   * and should be effectively represented in practice by floating point
+   * values without loss of precision. For exponential functions, these
+   * values could grow quite large as new values of (ti − L) become large,
+   * and potentially exceed the capacity of common floating point types.
+   * However, since the values stored by the algorithms are linear
+   * combinations of g values (scaled sums), they can be rescaled relative to
+   * a new landmark. That is, by the analysis of exponential decay in Section
+   * III-A, the choice of L does not affect the final result. We can
+   * therefore multiply each value based on L by a factor of exp(−α(L′ − L)),
+   * and obtain the correct value as if we had instead computed relative to a
+   * new landmark L′ (and then use this new L′ at query time). This can be
+   * done with a linear pass over whatever data structure is being used."
+   */
+  private void rescale(long now, long next) {
+    if (nextScaleTime.compareAndSet(next, now + RESCALE_THRESHOLD)) {
+      lockForRescale();
+      try {
+        final long oldStartTime = startTime;
+        this.startTime = tick();
+        final ArrayList<Double> keys = new ArrayList<Double>(values.keySet());
+        for (Double key : keys) {
+          final Long value = values.remove(key);
+          values.put(key * Math.exp(-alpha * (startTime - oldStartTime)),
+              value);
+        }
+      } finally {
+        unlockForRescale();
+      }
+    }
+  }
+
+  private void lockForRescale() {
+    lock.writeLock().lock();
+  }
+
+  private void unlockForRescale() {
+    lock.writeLock().unlock();
+  }
+
+  private void lockForRegularUsage() {
+    lock.readLock().lock();
+  }
+
+  private void unlockForRegularUsage() {
+    lock.readLock().unlock();
+  }
+}
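To make the priority computation above concrete: taking the landmark startTime as time zero, an item arriving t seconds later gets weight exp(alpha * t), and its reservoir priority is that weight divided by a uniform(0,1] draw. A hypothetical back-of-the-envelope check, not part of the patch:

    // With alpha = 0.015 (the default MetricsHistogram uses below):
    final double alpha = 0.015;
    final double wAtLandmark = Math.exp(alpha * 0);         // 1.0
    final double wFiveMinutesLater = Math.exp(alpha * 300); // ~90.0
    // Priorities are weight / RANDOM.nextDouble(), so an item arriving five
    // minutes after the landmark is roughly 90x more likely to claim a
    // reservoir slot - which is why alpha = 0.015 'strongly biases to the
    // last 5 minutes'. The periodic rescale() above multiplies the stored
    // priorities by exp(-alpha * (L' - L)) so these weights never overflow
    // a double.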
diff --git src/main/java/org/apache/hadoop/hbase/metrics/histogram/MetricsHistogram.java src/main/java/org/apache/hadoop/hbase/metrics/histogram/MetricsHistogram.java
new file mode 100644
index 0000000..e0f77f4
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/metrics/histogram/MetricsHistogram.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.metrics.histogram;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.util.MetricsBase;
+import org.apache.hadoop.metrics.util.MetricsRegistry;
+
+public class MetricsHistogram extends MetricsBase {
+
+  // 1028 items implies 99.9% CI w/ 5% margin of error
+  // (assuming a normal distribution on the underlying data)
+  private static final int DEFAULT_SAMPLE_SIZE = 1028;
+
+  // the bias towards sampling from more recent data.
+  // Per Cormode et al. an alpha of 0.015 strongly biases to the last 5 minutes
+  private static final double DEFAULT_ALPHA = 0.015;
+
+  /**
+   * Constructor to create a new histogram metric
+   * @param nam the name to publish the metric under
+   * @param registry where the metrics object will be registered
+   * @param description metrics description
+   * @param forwardBiased true if you want this histogram to give more
+   *        'weight' to recent data, false if you want all data to have
+   *        uniform weight
+   */
+  public MetricsHistogram(final String nam, final MetricsRegistry registry,
+      final String description, boolean forwardBiased) {
+    super(nam, description);
+
+    this.min = new AtomicLong();
+    this.max = new AtomicLong();
+    this.sum = new AtomicLong();
+    this.sample = forwardBiased
+        ? new ExponentiallyDecayingSample(DEFAULT_SAMPLE_SIZE, DEFAULT_ALPHA)
+        : new UniformSample(DEFAULT_SAMPLE_SIZE);
+
+    this.variance = new AtomicReference<double[]>(new double[]{-1, 0});
+    this.count = new AtomicLong();
+
+    this.clear();
+
+    if (registry != null) {
+      registry.add(nam, this);
+    }
+  }
+
+  /**
+   * Constructor - create a new (forward biased) histogram metric
+   * @param nam the name to publish the metric under
+   * @param registry where the metrics object will be registered
+   * @param description metrics description
+   */
+  public MetricsHistogram(final String nam, MetricsRegistry registry,
+      final String description) {
+    this(nam, registry, description, true);
+  }
+
+  /**
+   * Constructor - create a new (forward biased) histogram metric
+   * @param nam the name to publish the metric under
+   * @param registry where the metrics object will be registered
+   */
+  public MetricsHistogram(final String nam, MetricsRegistry registry) {
+    this(nam, registry, NO_DESCRIPTION);
+  }
+
+  private final Sample sample;
+  private final AtomicLong min;
+  private final AtomicLong max;
+  private final AtomicLong sum;
+
+  // For computing a running variance via Welford's algorithm, without
+  // letting floating point errors accumulate: variance[0] holds the running
+  // mean, variance[1] the running sum of squared deviations.
+  private final AtomicReference<double[]> variance;
+  private final AtomicLong count;
+
+  /**
+   * Clears all recorded values.
+   */
+  public void clear() {
+    this.sample.clear();
+    this.count.set(0);
+    this.max.set(Long.MIN_VALUE);
+    this.min.set(Long.MAX_VALUE);
+    this.sum.set(0);
+    variance.set(new double[]{-1, 0});
+  }
+
+  public void update(int val) {
+    update((long) val);
+  }
+
+  public void update(final long val) {
+    count.incrementAndGet();
+    sample.update(val);
+    setMax(val);
+    setMin(val);
+    sum.getAndAdd(val);
+    updateVariance(val);
+  }
+
+  private void setMax(final long potentialMax) {
+    boolean done = false;
+    while (!done) {
+      final long currentMax = max.get();
+      done = currentMax >= potentialMax
+          || max.compareAndSet(currentMax, potentialMax);
+    }
+  }
+
+  private void setMin(long potentialMin) {
+    boolean done = false;
+    while (!done) {
+      final long currentMin = min.get();
+      done = currentMin <= potentialMin
+          || min.compareAndSet(currentMin, potentialMin);
+    }
+  }
+
+  private void updateVariance(long value) {
+    boolean done = false;
+    while (!done) {
+      final double[] oldValues = variance.get();
+      final double[] newValues = new double[2];
+      if (oldValues[0] == -1) {
+        // first value observed: the mean is the value, no deviation yet
+        newValues[0] = value;
+        newValues[1] = 0;
+      } else {
+        final double oldM = oldValues[0];
+        final double oldS = oldValues[1];
+
+        final double newM = oldM + ((value - oldM) / getCount());
+        final double newS = oldS + ((value - oldM) * (value - newM));
+
+        newValues[0] = newM;
+        newValues[1] = newS;
+      }
+      done = variance.compareAndSet(oldValues, newValues);
+    }
+  }
+
+  public long getCount() {
+    return count.get();
+  }
+
+  public long getMax() {
+    if (getCount() > 0) {
+      return max.get();
+    }
+    return 0L;
+  }
+
+  public long getMin() {
+    if (getCount() > 0) {
+      return min.get();
+    }
+    return 0L;
+  }
+
+  public double getMean() {
+    if (getCount() > 0) {
+      return sum.get() / (double) getCount();
+    }
+    return 0.0;
+  }
+
+  public double getStdDev() {
+    if (getCount() > 0) {
+      return Math.sqrt(getVariance());
+    }
+    return 0.0;
+  }
+
+  public Snapshot getSnapshot() {
+    return sample.getSnapshot();
+  }
+
+  private double getVariance() {
+    if (getCount() <= 1) {
+      return 0.0;
+    }
+    return variance.get()[1] / (getCount() - 1);
+  }
+
+  @Override
+  public void pushMetric(MetricsRecord mr) {
+    final Snapshot s = this.getSnapshot();
+    mr.setMetric(getName() + "_num_ops", this.getCount());
+    mr.setMetric(getName() + "_min", this.getMin());
+    mr.setMetric(getName() + "_max", this.getMax());
+
+    mr.setMetric(getName() + "_mean", (float) this.getMean());
+    mr.setMetric(getName() + "_std_dev", (float) this.getStdDev());
+
+    mr.setMetric(getName() + "_median", (float) s.getMedian());
+    mr.setMetric(getName() + "_75th_percentile",
+        (float) s.get75thPercentile());
+    mr.setMetric(getName() + "_95th_percentile",
+        (float) s.get95thPercentile());
+    mr.setMetric(getName() + "_99th_percentile",
+        (float) s.get99thPercentile());
+  }
+}
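The updateVariance() loop above is Welford's online algorithm: the first array slot holds the running mean M, the second the running sum of squared deviations S, and the sample variance is S / (n - 1). A hypothetical standalone check of the arithmetic:

    // Welford's update as in updateVariance() above: for data {5, 10, 15}
    // the final mean is 10 and the sample variance is 25.
    long[] data = {5, 10, 15};
    double m = 0, s = 0;
    int n = 0;
    for (long x : data) {
      n++;
      final double oldM = m;
      m = oldM + (x - oldM) / n;     // update running mean
      s = s + (x - oldM) * (x - m);  // update sum of squared deviations
    }
    System.out.println("mean=" + m + " variance=" + (s / (n - 1)));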
diff --git src/main/java/org/apache/hadoop/hbase/metrics/histogram/Sample.java src/main/java/org/apache/hadoop/hbase/metrics/histogram/Sample.java
new file mode 100644
index 0000000..55b91d6
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/metrics/histogram/Sample.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.metrics.histogram;
+
+/**
+ * A statistically representative sample of items from a stream.
+ */
+public interface Sample {
+
+  /**
+   * Clears all recorded values.
+   */
+  void clear();
+
+  /**
+   * Returns the number of values recorded.
+   *
+   * @return the number of values recorded
+   */
+  int size();
+
+  /**
+   * Adds a new recorded value to the sample.
+   *
+   * @param value a new recorded value
+   */
+  void update(long value);
+
+  /**
+   * Returns a snapshot of the sample's values.
+   *
+   * @return a snapshot of the sample's values
+   */
+  Snapshot getSnapshot();
+}
diff --git src/main/java/org/apache/hadoop/hbase/metrics/histogram/Snapshot.java src/main/java/org/apache/hadoop/hbase/metrics/histogram/Snapshot.java
new file mode 100644
index 0000000..3f5fd56
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/metrics/histogram/Snapshot.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.metrics.histogram;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * A snapshot of all the information seen in a Sample.
+ */
+public class Snapshot {
+
+  private static final double MEDIAN_Q = 0.5;
+  private static final double P75_Q = 0.75;
+  private static final double P95_Q = 0.95;
+  private static final double P98_Q = 0.98;
+  private static final double P99_Q = 0.99;
+  private static final double P999_Q = 0.999;
+
+  private final double[] values;
+
+  /**
+   * Create a new {@link Snapshot} with the given values.
+   *
+   * @param values an unordered set of values in the sample
+   */
+  public Snapshot(Collection<Long> values) {
+    final Object[] copy = values.toArray();
+    this.values = new double[copy.length];
+    for (int i = 0; i < copy.length; i++) {
+      this.values[i] = (Long) copy[i];
+    }
+    Arrays.sort(this.values);
+  }
+
+  /**
+   * Create a new {@link Snapshot} with the given values.
+   *
+   * @param values an unordered set of values in the sample
+   */
+  public Snapshot(double[] values) {
+    this.values = new double[values.length];
+    System.arraycopy(values, 0, this.values, 0, values.length);
+    Arrays.sort(this.values);
+  }
+
+  /**
+   * Returns the value at the given quantile.
+   *
+   * @param quantile a given quantile, in [0..1]
+   * @return the value in the distribution at that quantile
+   */
+  public double getValue(double quantile) {
+    if (quantile < 0.0 || quantile > 1.0) {
+      throw new IllegalArgumentException(quantile + " is not in [0..1]");
+    }
+
+    if (values.length == 0) {
+      return 0.0;
+    }
+
+    final double pos = quantile * (values.length + 1);
+
+    if (pos < 1) {
+      return values[0];
+    }
+
+    if (pos >= values.length) {
+      return values[values.length - 1];
+    }
+
+    final double lower = values[(int) pos - 1];
+    final double upper = values[(int) pos];
+    return lower + (pos - Math.floor(pos)) * (upper - lower);
+  }
+
+  /**
+   * Returns the number of values in the snapshot.
+   *
+   * @return the number of values in the snapshot
+   */
+  public int size() {
+    return values.length;
+  }
+
+  /**
+   * Returns the median value in the distribution.
+   *
+   * @return the median value in the distribution
+   */
+  public double getMedian() {
+    return getValue(MEDIAN_Q);
+  }
+
+  /**
+   * Returns the value at the 75th percentile in the distribution.
+   *
+   * @return the value at the 75th percentile in the distribution
+   */
+  public double get75thPercentile() {
+    return getValue(P75_Q);
+  }
+
+  /**
+   * Returns the value at the 95th percentile in the distribution.
+   *
+   * @return the value at the 95th percentile in the distribution
+   */
+  public double get95thPercentile() {
+    return getValue(P95_Q);
+  }
+
+  /**
+   * Returns the value at the 98th percentile in the distribution.
+   *
+   * @return the value at the 98th percentile in the distribution
+   */
+  public double get98thPercentile() {
+    return getValue(P98_Q);
+  }
+
+  /**
+   * Returns the value at the 99th percentile in the distribution.
+   *
+   * @return the value at the 99th percentile in the distribution
+   */
+  public double get99thPercentile() {
+    return getValue(P99_Q);
+  }
+
+  /**
+   * Returns the value at the 99.9th percentile in the distribution.
+   *
+   * @return the value at the 99.9th percentile in the distribution
+   */
+  public double get999thPercentile() {
+    return getValue(P999_Q);
+  }
+
+  /**
+   * Returns the entire set of values in the snapshot.
+   *
+   * @return the entire set of values in the snapshot
+   */
+  public double[] getValues() {
+    return Arrays.copyOf(values, values.length);
+  }
+
+  /**
+   * Writes the values of the sample to the given file.
+   *
+   * @param output the file to which the values will be written
+   * @throws IOException if there is an error writing the values
+   */
+  public void dump(File output) throws IOException {
+    final PrintWriter writer = new PrintWriter(output);
+    try {
+      for (double value : values) {
+        writer.printf("%f\n", value);
+      }
+    } finally {
+      writer.close();
+    }
+  }
+}
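Snapshot.getValue() above linearly interpolates between the two order statistics that bracket the requested quantile. A hypothetical hand check, not part of the patch:

    // For values {1, 2, 3, 4, 5} and quantile 0.75:
    //   pos   = 0.75 * (5 + 1) = 4.5
    //   lower = values[3] = 4, upper = values[4] = 5
    //   result = 4 + 0.5 * (5 - 4) = 4.5
    Snapshot snapshot = new Snapshot(new double[] {1, 2, 3, 4, 5});
    assert snapshot.get75thPercentile() == 4.5;
    assert snapshot.getMedian() == 3.0; // pos = 3.0 lands exactly on values[2]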
diff --git src/main/java/org/apache/hadoop/hbase/metrics/histogram/UniformSample.java src/main/java/org/apache/hadoop/hbase/metrics/histogram/UniformSample.java
new file mode 100644
index 0000000..07c0505
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/metrics/histogram/UniformSample.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.metrics.histogram;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongArray;
+
+/**
+ * A random sample of a stream of {@code long}s. Uses Vitter's Algorithm R
+ * to produce a statistically representative sample.
+ *
+ * @see Vitter, "Random Sampling with a Reservoir"
+ */
+public class UniformSample implements Sample {
+
+  private static final Random RANDOM = new Random();
+  private static final int BITS_PER_LONG = 63;
+
+  private final AtomicLong count = new AtomicLong();
+  private final AtomicLongArray values;
+
+  /**
+   * Creates a new {@link UniformSample}.
+   *
+   * @param reservoirSize the number of samples to keep in the sampling
+   *                      reservoir
+   */
+  public UniformSample(int reservoirSize) {
+    this.values = new AtomicLongArray(reservoirSize);
+    clear();
+  }
+
+  @Override
+  public void clear() {
+    for (int i = 0; i < values.length(); i++) {
+      values.set(i, 0);
+    }
+    count.set(0);
+  }
+
+  @Override
+  public int size() {
+    final long c = count.get();
+    if (c > values.length()) {
+      return values.length();
+    }
+    return (int) c;
+  }
+
+  @Override
+  public void update(long value) {
+    final long c = count.incrementAndGet();
+    if (c <= values.length()) {
+      // reservoir not yet full: just append
+      values.set((int) c - 1, value);
+    } else {
+      // reservoir full: keep the new value with probability size/count
+      final long r = nextLong(c);
+      if (r < values.length()) {
+        values.set((int) r, value);
+      }
+    }
+  }
+
+  /**
+   * Get a pseudo-random long uniformly between 0 and n-1. Stolen from
+   * {@link java.util.Random#nextInt()}.
+   *
+   * @param n the bound
+   * @return a value selected randomly from the range {@code [0..n)}
+   */
+  private static long nextLong(long n) {
+    long bits, val;
+    do {
+      bits = RANDOM.nextLong() & (~(1L << BITS_PER_LONG));
+      val = bits % n;
+    } while (bits - val + (n - 1) < 0L);
+    return val;
+  }
+
+  @Override
+  public Snapshot getSnapshot() {
+    final int s = size();
+    final List<Long> copy = new ArrayList<Long>(s);
+    for (int i = 0; i < s; i++) {
+      copy.add(values.get(i));
+    }
+    return new Snapshot(copy);
+  }
+}
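UniformSample.update() above is Vitter's Algorithm R: the first k values fill the reservoir, and the c-th value afterwards claims a random slot with probability k/c, which leaves every value seen so far in the reservoir with equal probability. A small usage sketch:

    // A 2-slot reservoir over 1000 updates: each value survives with
    // probability 2/1000 once the stream is finished.
    UniformSample sample = new UniformSample(2);
    for (long v = 1; v <= 1000; v++) {
      sample.update(v);
    }
    System.out.println(sample.getSnapshot().size()); // prints 2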
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 06ba6f2..6a7c33d 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1779,12 +1779,15 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
 
   /** {@inheritDoc} */
   public Result get(byte[] regionName, Get get) throws IOException {
     checkOpen();
+    final long startTime = System.nanoTime();
     requestCount.incrementAndGet();
     try {
       HRegion region = getRegion(regionName);
       return region.get(get, getLockFromId(get.getLockId()));
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
+    } finally {
+      this.metrics.getLatencies.update(System.nanoTime() - startTime);
     }
   }
 
@@ -1816,6 +1819,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
       throw new IllegalArgumentException("update has null row");
     }
 
+    final long startTime = System.nanoTime();
     checkOpen();
     this.requestCount.incrementAndGet();
     HRegion region = getRegion(regionName);
@@ -1827,6 +1831,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
       region.put(put, getLockFromId(put.getLockId()), writeToWAL);
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
+    } finally {
+      this.metrics.putLatencies.update(System.nanoTime() - startTime);
     }
   }
 
@@ -1834,6 +1840,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
       throws IOException {
     checkOpen();
     HRegion region = null;
+    int i = 0;
+
+    final long startTime = System.nanoTime();
     try {
       region = getRegion(regionName);
       if (!region.getRegionInfo().isMetaTable()) {
@@ -1843,7 +1852,6 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
       @SuppressWarnings("unchecked")
       Pair<Put, Integer>[] putsWithLocks = new Pair[puts.size()];
 
-      int i = 0;
       for (Put p : puts) {
         Integer lock = getLockFromId(p.getLockId());
         putsWithLocks[i++] = new Pair<Put, Integer>(p, lock);
@@ -1859,6 +1867,16 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
       return -1;
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
+    } finally {
+      // going to count this as puts.size() PUTs for latency calculations
+      final long totalTime = System.nanoTime() - startTime;
+      final long putCount = i;
+      if (putCount > 0) {
+        final long perPutTime = totalTime / putCount;
+        for (int request = 0; request < putCount; request++) {
+          this.metrics.putLatencies.update(perPutTime);
+        }
+      }
     }
   }
 
@@ -2249,6 +2265,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
   public void delete(final byte[] regionName, final Delete delete)
       throws IOException {
     checkOpen();
+    final long startTime = System.nanoTime();
     try {
       boolean writeToWAL = delete.getWriteToWAL();
       this.requestCount.incrementAndGet();
@@ -2260,6 +2277,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
       region.delete(delete, lid, writeToWAL);
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
+    } finally {
+      this.metrics.deleteLatencies.update(System.nanoTime() - startTime);
     }
   }
 
@@ -2277,10 +2296,12 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
       int size = deletes.size();
       Integer[] locks = new Integer[size];
       for (Delete delete : deletes) {
+        final long startTime = System.nanoTime();
         this.requestCount.incrementAndGet();
         locks[i] = getLockFromId(delete.getLockId());
         region.delete(delete, locks[i], delete.getWriteToWAL());
         i++;
+        this.metrics.deleteLatencies.update(System.nanoTime() - startTime);
       }
     } catch (WrongRegionException ex) {
       LOG.debug("Batch deletes: " + i, ex);
@@ -2788,6 +2809,15 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
   public HRegion getFromOnlineRegions(final String encodedRegionName) {
     HRegion r = null;
     r = this.onlineRegions.get(encodedRegionName);
+
+    // All accesses to a region (get/put/delete/scan/etc) go through here, so
+    // this is a (very rough) way to determine which regions are most
+    // accessed. Ideally, we'll later break down accesses by operation type,
+    // but this will do for a first pass.
+    if (r != null) {
+      this.metrics.regionAccessCounter.update(encodedRegionName);
+    }
+
     return r;
   }
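The multiPut() accounting above spreads one wall-clock measurement evenly across the batch, so a batch weights the put histogram in proportion to its size. A hypothetical numeric check of that arithmetic (putLatencies is the histogram declared in RegionServerMetrics just below):

    // A batch of 100 puts taking 30 ms total is recorded as 100 samples
    // of 300 microseconds each (values in nanoseconds):
    final long totalTime = 30000000L;
    final long putCount = 100;
    final long perPutTime = totalTime / putCount; // 300000 ns
    for (int request = 0; request < putCount; request++) {
      putLatencies.update(perPutTime);            // as in the finally block above
    }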
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java
index d512686..56df2dd 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java
@@ -22,9 +22,11 @@ package org.apache.hadoop.hbase.regionserver.metrics;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.metrics.ExactCounterMetric;
 import org.apache.hadoop.hbase.metrics.HBaseInfo;
 import org.apache.hadoop.hbase.metrics.MetricsRate;
 import org.apache.hadoop.hbase.metrics.PersistentMetricsTimeVaryingRate;
+import org.apache.hadoop.hbase.metrics.histogram.MetricsHistogram;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Strings;
@@ -113,6 +115,22 @@ public class RegionServerMetrics implements Updater {
    */
   public final MetricsIntValue blockCacheHitCachingRatio =
       new MetricsIntValue("blockCacheHitCachingRatio", registry);
 
+  /**
+   * a latency histogram on 'get' requests
+   */
+  public final MetricsHistogram getLatencies =
+      new MetricsHistogram("getRequestLatency", registry);
+
+  /**
+   * a latency histogram on 'delete' requests
+   */
+  public final MetricsHistogram deleteLatencies =
+      new MetricsHistogram("deleteRequestLatency", registry);
+
+  /**
+   * a latency histogram on 'put' requests
+   */
+  public final MetricsHistogram putLatencies =
+      new MetricsHistogram("putRequestLatency", registry);
+
   /*
    * Count of requests to the regionservers since last call to metrics update
    */
@@ -180,6 +198,24 @@ public class RegionServerMetrics implements Updater {
       new MetricsIntValue("flushQueueSize", registry);
 
   /**
+   * Metrics on the distribution of filesystem read latencies
+   * (improved version of fsReadLatency)
+   */
+  public final MetricsHistogram fsReadLatencyHistogram =
+      new MetricsHistogram("fsReadLatencyHistogram", registry);
+
+  /**
+   * Metrics on the distribution of filesystem write latencies
+   * (improved version of fsWriteLatency)
+   */
+  public final MetricsHistogram fsWriteLatencyHistogram =
+      new MetricsHistogram("fsWriteLatencyHistogram", registry);
+
+  /**
+   * Metrics on the distribution of region operations (how many 'operations'
+   * hit each region). An exact counter suffices since each RegionServer is,
+   * at most, responsible for a few hundred regions; if that weren't the
+   * case, we'd have to use a lossy stream counter.
+   */
+  public final ExactCounterMetric regionAccessCounter =
+      new ExactCounterMetric("regionAccessCounter", registry);
+
+  /**
    * filesystem read latency
    */
   public final MetricsTimeVaryingRate fsReadLatency =
@@ -198,6 +234,11 @@ public class RegionServerMetrics implements Updater {
       new MetricsTimeVaryingRate("fsSyncLatency", registry);
 
   /**
+   * HLog file count
+   */
+  public final MetricsIntValue hlogFileCount =
+      new MetricsIntValue("hlogFileCount", registry);
+
+  /**
    * time each scheduled compaction takes
    */
   protected final PersistentMetricsTimeVaryingRate compactionTime =
@@ -289,7 +330,10 @@ public class RegionServerMetrics implements Updater {
       this.blockCacheHitRatio.pushMetric(this.metricsRecord);
       this.blockCacheHitCachingRatio.pushMetric(this.metricsRecord);
       this.hdfsBlocksLocalityIndex.pushMetric(this.metricsRecord);
-
+      this.putLatencies.pushMetric(this.metricsRecord);
+      this.deleteLatencies.pushMetric(this.metricsRecord);
+      this.getLatencies.pushMetric(this.metricsRecord);
+
       // Mix in HFile and HLog metrics
       // Be careful. Here is code for MTVR from up in hadoop:
       //  public synchronized void inc(final int numOps, final long time) {
@@ -309,9 +353,22 @@ public class RegionServerMetrics implements Updater {
       ops = (int)HLog.getSyncOps();
       if (ops != 0) this.fsSyncLatency.inc(ops, HLog.getSyncTime());
 
+      for (Long latency : HFile.getReadLatenciesNanos()) {
+        this.fsReadLatencyHistogram.update(latency);
+      }
+      for (Long latency : HFile.getWriteLatenciesNanos()) {
+        this.fsWriteLatencyHistogram.update(latency);
+      }
+
+      this.hlogFileCount.set(HLog.getLogCount());
+
+      // push the results
+      this.hlogFileCount.pushMetric(this.metricsRecord);
+
       this.fsReadLatency.pushMetric(this.metricsRecord);
       this.fsWriteLatency.pushMetric(this.metricsRecord);
+      this.fsSyncLatency.pushMetric(this.metricsRecord);
       this.compactionTime.pushMetric(this.metricsRecord);
       this.compactionSize.pushMetric(this.metricsRecord);
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index 4bf47bc..ed69384 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -234,6 +234,7 @@ public class HLog implements Syncable {
    * Keep the number of logs tidy.
    */
   private final int maxLogs;
+
 
   /**
    * Thread that handles optional sync'ing
@@ -267,6 +268,15 @@ public class HLog implements Syncable {
   private static volatile long syncOps;
   private static volatile long syncTime;
 
+  // For measuring the current number of HLog files
+  private static volatile int logCount = 0;
+
+  public static int getLogCount() {
+    // logCount is derived from the 'outputfiles' map, which doesn't include
+    // the current hlog file - hence the + 1
+    return logCount + 1;
+  }
+
   public static long getWriteOps() {
     long ret = writeOps;
     writeOps = 0;
@@ -712,8 +722,9 @@ public class HLog implements Syncable {
     // If too many log files, figure which regions we need to flush.
     // Array is an array of encoded region names.
     byte [][] regions = null;
-    int logCount = this.outputfiles == null? 0: this.outputfiles.size();
-    if (logCount > this.maxLogs && logCount > 0) {
+    int curLogCount = this.outputfiles == null? 0: this.outputfiles.size();
+    logCount = curLogCount;
+    if (curLogCount > this.maxLogs && curLogCount > 0) {
       // This is an array of encoded region names.
       regions = findMemstoresWithEditsEqualOrOlderThan(this.outputfiles.firstKey(),
         this.lastSeqWritten);
@@ -723,7 +734,7 @@ public class HLog implements Syncable {
         if (i > 0) sb.append(", ");
         sb.append(Bytes.toStringBinary(regions[i]));
       }
-      LOG.info("Too many hlogs: logs=" + logCount + ", maxlogs=" +
+      LOG.info("Too many hlogs: logs=" + curLogCount + ", maxlogs=" +
         this.maxLogs + "; forcing flush of " + regions.length + " regions(s): " +
         sb.toString());
     }
+ */ + +package org.apache.hadoop.hbase.metrics; + +import java.util.List; + +import junit.framework.Assert; + +import org.apache.hadoop.hbase.util.Pair; +import org.junit.Test; + +public class TestExactCounterMetric { + + @Test + public void testBasic() { + final ExactCounterMetric counter = new ExactCounterMetric("testCounter", null); + for (int i = 1; i <= 10; i++) { + for (int j = 0; j < i; j++) { + counter.update(i + ""); + } + } + + List> topFive = counter.getTop(5); + Long i = 10L; + for (Pair entry : topFive) { + Assert.assertEquals(i + "", entry.getFirst()); + Assert.assertEquals(i, entry.getSecond()); + i--; + } + } +} diff --git src/test/java/org/apache/hadoop/hbase/metrics/TestMetricsHistogram.java src/test/java/org/apache/hadoop/hbase/metrics/TestMetricsHistogram.java new file mode 100644 index 0000000..001f1c7 --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/metrics/TestMetricsHistogram.java @@ -0,0 +1,77 @@ +package org.apache.hadoop.hbase.metrics; + +import java.util.Arrays; +import java.util.Random; +import java.util.Set; + +import org.apache.hadoop.hbase.metrics.histogram.MetricsHistogram; +import org.apache.hadoop.hbase.metrics.histogram.Snapshot; +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.Sets; + +public class TestMetricsHistogram { + + @Test + public void testBasicUniform() { + MetricsHistogram h = new MetricsHistogram("testHistogram", null); + + for (int i = 0; i < 100; i++) { + h.update(i); + } + + Assert.assertEquals(100, h.getCount()); + Assert.assertEquals(0, h.getMin()); + Assert.assertEquals(99, h.getMax()); + } + + private static int safeIndex(int i, int len) { + if (i < len && i>= 0) { + return i; + } else if (i >= len) { + return len - 1; + } else { + return 0; + } + } + + @Test + public void testRandom() { + final Random r = new Random(); + final MetricsHistogram h = new MetricsHistogram("testHistogram", null); + + final long[] data = new long[1000]; + + for (int i = 0; i < data.length; i++) { + data[i] = (long) (r.nextGaussian() * 10000.0); + h.update(data[i]); + } + + final Snapshot s = h.getSnapshot(); + Arrays.sort(data); + + // as long as the histogram chooses an item with index N+/-slop, we'll accept it + final int slop = 20; + + // make sure the median, 75th percentile and 95th percentile are good + final int medianIndex = data.length / 2; + final long minAcceptableMedian = data[safeIndex(medianIndex - slop, data.length)]; + final long maxAcceptableMedian = data[safeIndex(medianIndex + slop, data.length)]; + Assert.assertTrue(s.getMedian() >= minAcceptableMedian + && s.getMedian() <= maxAcceptableMedian); + + final int seventyFifthIndex = (int) (data.length * 0.75); + final long minAcceptableseventyFifth = data[safeIndex(seventyFifthIndex - slop, data.length)]; + final long maxAcceptableseventyFifth = data[safeIndex(seventyFifthIndex + slop, data.length)]; + Assert.assertTrue(s.get75thPercentile() >= minAcceptableseventyFifth + && s.get75thPercentile() <= maxAcceptableseventyFifth); + + final int ninetyFifthIndex = (int) (data.length * 0.95); + final long minAcceptableninetyFifth = data[safeIndex(ninetyFifthIndex - slop, data.length)]; + final long maxAcceptableninetyFifth = data[safeIndex(ninetyFifthIndex + slop, data.length)]; + Assert.assertTrue(s.get95thPercentile() >= minAcceptableninetyFifth + && s.get95thPercentile() <= maxAcceptableninetyFifth); + + } +}