diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java index 2fa6d30a9ee..b935f03754f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.api.records.timelineservice; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -54,7 +55,7 @@ // persisted (like a "transient" member). private TimelineMetricOperation realtimeAggregationOp = TimelineMetricOperation.NOP; - + private TimelineMetricAggregator aggregator; private TreeMap values = new TreeMap<>(Collections.reverseOrder()); @@ -66,7 +67,6 @@ public TimelineMetric(Type type) { this.type = type; } - @XmlElement(name = "type") public Type getType() { return type; @@ -115,6 +115,13 @@ public void setRealtimeAggregationOp( } public Map getValues() { + if (aggregator != null) { // indicates this is an aggregated metric + long i = 0; + values.clear(); + for (Number val : aggregator.getAggregatedValues()) { + values.put(i++, val); + } + } return values; } @@ -243,6 +250,9 @@ public long getSingleDataTimestamp() { */ public Number getSingleDataValue() { if (this.type == Type.SINGLE_VALUE) { + if (aggregator != null) { // indicates this is an aggregated metric + return aggregator.getAggregatedValues().get(0); + } if (values.size() == 0) { return null; } else { @@ -254,6 +264,26 @@ public Number getSingleDataValue() { } } + public void setAggregator(TimelineMetricAggregator aggregator) { + this.aggregator = aggregator; + } + + public void aggregate(TimelineMetric incoming) { + aggregator.aggregate(incoming); + } + + /** + * Get aggregated values if this metric is an aggregated metric. + * + * @return Aggregated values, no particular order is guaranteed. + */ + public List getAggregatedValues() { + if (aggregator != null) { + return aggregator.getAggregatedValues(); + } + return null; + } + /** * Aggregate an incoming metric to the base aggregated metric with the given * operation state in a stateless fashion. The assumption here is diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregator.java index a29d58488ba..bdec346437c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregator.java @@ -32,18 +32,16 @@ TimelineMetricAggregator() { needFlush = true; values = new ArrayList<>(); - // Add an initial value, so we can use values.set(0, newVal) later. 
- values.add(0); } - void update(TimelineMetric incoming) { + void aggregate(TimelineMetric incoming) { for (Number value : incoming.getValues().values()) { - doUpdate(value); + doAggregate(value); } needFlush = true; } - protected abstract void doUpdate(Number value); + protected abstract void doAggregate(Number value); List getAggregatedValues() { if (needFlush) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorAvg.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorAvg.java index 30ec3eba4b5..6629cbc3b59 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorAvg.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorAvg.java @@ -1,4 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.yarn.api.records.timelineservice; -public class TimelineMetricAggregatorAvg { +/** + * This class implements the average aggregation function. + */ +public class TimelineMetricAggregatorAvg extends TimelineMetricAggregator { + private long count = 0; + private double sum = 0.0; + + @Override protected void doAggregate(Number value) { + sum += value.doubleValue(); + count++; + } + + @Override protected void doGetAggregatedValues() { + if (values.isEmpty()) { + values.add(sum / count); + } else { + values.set(0, sum / count); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorCount.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorCount.java index e776c5b8cfb..4aabbf84261 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorCount.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorCount.java @@ -17,14 +17,21 @@ */ package org.apache.hadoop.yarn.api.records.timelineservice; +/** + * This class implements the count aggregation function. 
+ */ public class TimelineMetricAggregatorCount extends TimelineMetricAggregator { private long count = 0; - @Override protected void doUpdate(Number value) { + @Override protected void doAggregate(Number value) { count++; } @Override protected void doGetAggregatedValues() { - values.set(0, count); + if (values.isEmpty()) { + values.add(count); + } else { + values.set(0, count); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorMax.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorMax.java index 4936f17ee22..fe5486c1f57 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorMax.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorMax.java @@ -1,4 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.yarn.api.records.timelineservice; -public class TimelineMetricAggregatorMax { +/** + * This class implements max aggregation function. + */ +public class TimelineMetricAggregatorMax extends TimelineMetricAggregator { + private Number max = null; + + @Override protected void doAggregate(Number value) { + if (max == null) { + max = value; + } else { + max = TimelineMetricCalculator.compare(value, max) > 0 ? value : max; + } + } + + @Override protected void doGetAggregatedValues() { + if (values.isEmpty()) { + values.add(max); + } else { + values.set(0, max); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorMaxFreq.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorMaxFreq.java index f2825d64259..33e1012c5f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorMaxFreq.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorMaxFreq.java @@ -1,27 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.yarn.api.records.timelineservice; -import java.util.HashMap; -import java.util.Map; +import java.util.*; -public class TimelineMetricAggregatorMode extends TimelineMetricAggregator { - private static final int DEFAULT_MAX_METRIC_COUNT = Integer.MAX_VALUE; - private long maxMetricCount; - private Map map; +/** + * This class implements maxFrequency aggregation function. + * The result is a collection of metric which occurs most often. + * If the number of metrics reaches the capacity, + * the oldest metric based on the enter order will be removed. + */ +public class TimelineMetricAggregatorMaxFreq extends TimelineMetricAggregator { + private static final byte INFINITE_CAPACITY = -1; + private long capacity; + // Key is Number, value is the frequency of the key. + private Map freqMap; + // If there is no capacity limit, we will use maxFreq and maxFreqValues + // to do the aggregation, otherwise, we use invertedFreqMap and deque. + private long maxFreq = 0L; + private List maxFreqValues; + // Key is frequency, value is a set of numbers whose frequency is key. + private TreeMap> invertedFreqMap; + // Maintain order with deque so we can remove the oldest metrics when needed. + private Deque deque; - public TimelineMetricAggregatorMode() { - this(DEFAULT_MAX_METRIC_COUNT); + TimelineMetricAggregatorMaxFreq() { + this(INFINITE_CAPACITY); } - public TimelineMetricAggregatorMode(long maxMetricCount) { - maxMetricCount = maxMetricCount; - map = new HashMap<>(); + TimelineMetricAggregatorMaxFreq(long capacity) { + this.capacity = capacity; + freqMap = new HashMap<>(); + if (capacity == INFINITE_CAPACITY) { + maxFreqValues = new ArrayList<>(); + } else { + invertedFreqMap = new TreeMap<>(); + deque = new ArrayDeque<>(); + } } - @Override public void update(TimelineMetric incoming) { - map.put(incomingValue, map.getOrDefault(incoming.getSingleDataValue(), 0L) + 1); + @Override protected void doAggregate(Number value) { + if (capacity == INFINITE_CAPACITY) { + doAggregateWithInfinityCapacity(value); + } else { + doAggregateWithCapacity(value); + } } - @Override public void flush(TimelineMetric base) { + private void doAggregateWithInfinityCapacity(Number value) { + long currFreq = freqMap.getOrDefault(value, 0L) + 1; + freqMap.put(value, currFreq); + if (currFreq > maxFreq) { + maxFreq = currFreq; + maxFreqValues.clear(); + maxFreqValues.add(value); + } else if (currFreq == maxFreq) { + maxFreqValues.add(value); + } + } + + private void doAggregateWithCapacity(Number value) { + if (deque.size() == capacity) { + removeTheOldest(); + } + deque.offerLast(value); + long currFreq = freqMap.getOrDefault(value, 0L) + 1; + freqMap.put(value, currFreq); + invertedFreqMap.putIfAbsent(currFreq, new HashSet<>()); + invertedFreqMap.get(currFreq).add(value); + } + + + private void removeTheOldest() { + Number x = deque.pollFirst(); + long oldFreq = freqMap.get(x); + if (oldFreq == 1) { + freqMap.remove(x); + } else { + freqMap.put(x, oldFreq - 1); + } + + // Remove x from the old frequency set. + // Remove entry if corresponding set becomes empty after removal. 
+ // If new frequency of number is greater than zero, + // put the number in the corresponding set. + Set sameFreqSet = invertedFreqMap.get(oldFreq); + sameFreqSet.remove(x); + if (sameFreqSet.isEmpty()) { + invertedFreqMap.remove(oldFreq); + } + long newFreq = oldFreq - 1; + if (newFreq > 0) { + invertedFreqMap.putIfAbsent(newFreq, new HashSet<>()); + invertedFreqMap.get(newFreq).add(x); + } + } + @Override protected void doGetAggregatedValues() { + values.clear(); + if (capacity == INFINITE_CAPACITY) { + values.addAll(maxFreqValues); + } else { + values.addAll(invertedFreqMap.lastEntry().getValue()); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorMedian.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorMedian.java index 78d5f68622f..e273af21bc2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorMedian.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorMedian.java @@ -1,4 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.yarn.api.records.timelineservice; -public class TimelineAggregatorMedian { +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.TreeSet; + +/** + * This class implements the median aggregation function. + * If the number of metrics is odd, the middle value will be the median. + * If the number of metrics is even, the mean of two middle values will be the median. + * If the number of metrics reaches the capacity, + * the oldest metric based on the enter order will be removed. + */ +public class TimelineMetricAggregatorMedian extends TimelineMetricAggregator { + class Node implements Comparable { + long seq; + Number value; + + Node(long seq, Number value) { + this.seq = seq; + this.value = value; + } + + @Override public int compareTo(Node o) { + int comp = TimelineMetricCalculator.compare(value, o.value); + if (comp == 0) { + return seq == o.seq ? 0 : seq < o.seq ? -1 : 1; + } + return comp; + } + + @Override public String toString() { + return value.toString(); + } + } + + private static long seq = 0; + private static final byte INFINITE_CAPACITY = -1; + private long capacity; + // We might need to delete the oldest metric + // when total number of metrics reaches to capacity, + // so here use TreeSet instead of PriorityQueue. + // Different metrics may have same value, + // so here use a wrapped class Node to distinguish different metrics. 
+ // SmallerSet maintains the smaller half metric, functions as a maxHeap. + // LargerSet maintains the larger half metric, functions as a minHeap. + private TreeSet smallerSet; + private TreeSet largerSet; + // Use a deque to maintain order so that we can remove the oldest metric when needed. + private Deque deque; + + TimelineMetricAggregatorMedian() { + this(INFINITE_CAPACITY); + } + + TimelineMetricAggregatorMedian(long capacity) { + this.capacity = capacity; + smallerSet = new TreeSet<>(); + largerSet= new TreeSet<>(); + if (capacity != INFINITE_CAPACITY) { + deque = new ArrayDeque<>(); + } + } + + @Override protected void doAggregate(Number value) { + Node node = new Node(seq++, value); + if (capacity != INFINITE_CAPACITY) { + if (deque.size() == capacity) { + removeTheOldest(); + } + deque.offerLast(node); + } + + if (smallerSet.isEmpty()) { + smallerSet.add(node); + } else if (smallerSet.size() == largerSet.size()) { + Node minOfLargerSet = largerSet.first(); + if (TimelineMetricCalculator.compare(value, minOfLargerSet.value) > 0) { + smallerSet.add(minOfLargerSet); + largerSet.remove(minOfLargerSet); + largerSet.add(node); + } else { + smallerSet.add(node); + } + } else { + Node maxOfSmallerSet = smallerSet.last(); + if (TimelineMetricCalculator.compare(value, maxOfSmallerSet.value) < 0) { + largerSet.add(maxOfSmallerSet); + smallerSet.remove(maxOfSmallerSet); + smallerSet.add(node); + } else { + largerSet.add(node); + } + } + } + + private void removeTheOldest() { + Node oldest = deque.pollFirst(); + if (smallerSet.contains(oldest)) { + smallerSet.remove(oldest); + } else { + largerSet.remove(oldest); + } + // Make sure smallerSet.size() >= largerSet.size() + // and not greater by more than 1. + if (smallerSet.size() < largerSet.size()) { + Node minOfLargerSet = largerSet.first(); + largerSet.remove(minOfLargerSet); + smallerSet.add(minOfLargerSet); + } else if (smallerSet.size() == largerSet.size() + 2) { + Node maxOfSmallerSet = smallerSet.last(); + smallerSet.remove(maxOfSmallerSet); + largerSet.add(maxOfSmallerSet); + } + } + + @Override protected void doGetAggregatedValues() { + double median; + if (smallerSet.size() > largerSet.size()) { + median = smallerSet.last().value.doubleValue(); + } else { + Number maxOfSmallerSet = smallerSet.last().value; + Number minOfLargerSet = largerSet.first().value; + median = TimelineMetricCalculator.sum(maxOfSmallerSet, minOfLargerSet) + .doubleValue() / 2.0; + } + + if (values.isEmpty()) { + values.add(median); + } else { + values.set(0, median); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorMin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorMin.java index 6dc8e445818..fdef7da35f0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorMin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorMin.java @@ -1,4 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.yarn.api.records.timelineservice; -public class TimelineMetricAggregatorMin { +/** + * This class implements min aggregation function. + */ +public class TimelineMetricAggregatorMin extends TimelineMetricAggregator { + private Number min = null; + + @Override protected void doAggregate(Number value) { + if (min == null) { + min = value; + } else { + min = TimelineMetricCalculator.compare(value, min) < 0 ? value : min; + } + } + + @Override protected void doGetAggregatedValues() { + if (values.isEmpty()) { + values.add(min); + } else { + values.set(0, min); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorSum.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorSum.java index 9f58a472805..adf325b1d6f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorSum.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorSum.java @@ -1,4 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.yarn.api.records.timelineservice; -public class TimelineMetricAggregatorSum { +/** + * This class implements sum aggregation function. 
+ */ +public class TimelineMetricAggregatorSum extends TimelineMetricAggregator { + private Number sum; + + @Override protected void doAggregate(Number value) { + sum = TimelineMetricCalculator.sum(sum, value); + } + + @Override protected void doGetAggregatedValues() { + if (values.isEmpty()) { + values.add(sum); + } else { + values.set(0, sum); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorTopKMax.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorTopKMax.java index ea611156760..9604db3d755 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorTopKMax.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorTopKMax.java @@ -1,4 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.yarn.api.records.timelineservice; -public class TimelineMetricAggregatorTopKMax { +import java.util.PriorityQueue; + +/** + * This class implements the topKMax aggregation function. 
+ */ +public class TimelineMetricAggregatorTopKMax extends TimelineMetricAggregator { + private static final int DEFAULT_PARAM_K = 3; + private int k; + private PriorityQueue minHeap; + + TimelineMetricAggregatorTopKMax() { + this(DEFAULT_PARAM_K); + } + + TimelineMetricAggregatorTopKMax(int k) { + this.k = k; + minHeap = new PriorityQueue<>((n1, n2) -> TimelineMetricCalculator.compare(n1, n2)); + } + + @Override protected void doAggregate(Number value) { + if (minHeap.size() < k) { + minHeap.offer(value); + } else if (TimelineMetricCalculator.compare(value, minHeap.peek()) > 0) { + minHeap.poll(); + minHeap.offer(value); + } + } + + @Override protected void doGetAggregatedValues() { + values.clear(); + values.addAll(minHeap); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorTopKMin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorTopKMin.java index 04142325fd1..d5452e2f65f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorTopKMin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricAggregatorTopKMin.java @@ -1,4 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.yarn.api.records.timelineservice; -public class TimelineMetricAggregatorTopKMin { +import java.util.*; + +/** + * This class implements the topKMin aggregation function. 
+ */ +public class TimelineMetricAggregatorTopKMin extends TimelineMetricAggregator { + private static final int DEFAULT_PARAM_K = 3; + private int k; + private PriorityQueue maxHeap; + + TimelineMetricAggregatorTopKMin() { + this(DEFAULT_PARAM_K); + } + + TimelineMetricAggregatorTopKMin(int k) { + this.k = k; + maxHeap = new PriorityQueue<>((n1, n2) -> TimelineMetricCalculator.compare(n2, n1)); + } + + @Override protected void doAggregate(Number value) { + if (maxHeap.size() < k) { + maxHeap.offer(value); + } else if (TimelineMetricCalculator.compare(value, maxHeap.peek()) < 0) { + maxHeap.poll(); + maxHeap.offer(value); + } + } + + @Override protected void doGetAggregatedValues() { + values.clear(); + values.addAll(maxHeap); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricOperation.java index 58e5c3811cf..4f64a3a366d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricOperation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricOperation.java @@ -52,16 +52,30 @@ public TimelineMetric exec(TimelineMetric incoming, public TimelineMetric exec(TimelineMetric incoming, TimelineMetric base, Map state) { if (base == null) { - return incoming; + base = new TimelineMetric(); + base.setAggregator(new TimelineMetricAggregatorMax()); } - Number incomingValue = incoming.getSingleDataValue(); - Number aggregateValue = base.getSingleDataValue(); - if (aggregateValue == null) { - aggregateValue = Long.MIN_VALUE; - } - if (TimelineMetricCalculator.compare(incomingValue, aggregateValue) > 0) { - base.addValue(incoming.getSingleDataTimestamp(), incomingValue); + base.aggregate(incoming); + return base; + } + }, + MIN("MIN") { + /** + * Keep the less value of incoming and base. Stateless operation. 
+ * + * @param incoming Metric a + * @param base Metric b + * @param state Operation state (not used) + * @return the less value of a and b + */ + @Override + public TimelineMetric exec(TimelineMetric incoming, + TimelineMetric base, Map state) { + if (base == null) { + base = new TimelineMetric(); + base.setAggregator(new TimelineMetricAggregatorMin()); } + base.aggregate(incoming); return base; } }, @@ -76,8 +90,7 @@ public TimelineMetric exec(TimelineMetric incoming, */ @Override public TimelineMetric exec(TimelineMetric incoming, - TimelineMetric base, - Map state) { + TimelineMetric base, Map state) { return incoming; } }, @@ -94,56 +107,205 @@ public TimelineMetric exec(TimelineMetric incoming, * @return A metric with value a + b - p */ @Override - public TimelineMetric exec(TimelineMetric incoming, TimelineMetric base, - Map state) { + public TimelineMetric exec(TimelineMetric incoming, + TimelineMetric base, Map state) { if (base == null) { - return incoming; + base = new TimelineMetric(); + base.setAggregator(new TimelineMetricAggregatorSum()); } - Number incomingValue = incoming.getSingleDataValue(); - Number aggregateValue = base.getSingleDataValue(); - Number result - = TimelineMetricCalculator.sum(incomingValue, aggregateValue); - - // If there are previous value in the state, we will take it off from the - // sum + // If there are previous value in the state, + // we will take it off from the incoming. if (state != null) { Object prevMetric = state.get(PREV_METRIC_STATE_KEY); if (prevMetric instanceof TimelineMetric) { - result = TimelineMetricCalculator.sub(result, - ((TimelineMetric) prevMetric).getSingleDataValue()); + incoming.addValue(incoming.getSingleDataTimestamp(), + TimelineMetricCalculator.sub(incoming.getSingleDataValue(), + ((TimelineMetric) prevMetric).getSingleDataValue())); } } - base.addValue(incoming.getSingleDataTimestamp(), result); + base.aggregate(incoming); return base; } }, AVG("AVERAGE") { /** * Return the average value of the incoming metric and the base metric, - * with a given state. Not supported yet. + * with a given state. * * @param incoming Metric a * @param base Metric b * @param state Operation state - * @return Not finished yet + * @return A metric with value (b * state.count + a) / (state.count + 1) */ @Override - public TimelineMetric exec(TimelineMetric incoming, TimelineMetric base, - Map state) { - // Not supported yet - throw new UnsupportedOperationException( - "Unsupported aggregation operation: AVERAGE"); + public TimelineMetric exec(TimelineMetric incoming, + TimelineMetric base, Map state) { + if (base == null) { + base = new TimelineMetric(); + base.addValue(incoming.getSingleDataTimestamp(), 0.0); + base.setAggregator(new TimelineMetricAggregatorAvg()); + } + base.aggregate(incoming); + return base; + } + }, + Count("Count") { + /** + * Increase count by 1. + * @param incoming Metric a (not used) + * @param base Metric b + * @param state Operation state (not used) + * @return Metric b with count increased by 1 + */ + @Override + public TimelineMetric exec(TimelineMetric incoming, + TimelineMetric base, Map state) { + if (base == null) { + base = new TimelineMetric(); + base.setAggregator(new TimelineMetricAggregatorCount()); + } + base.aggregate(incoming); + return base; + } + }, + Median("Median") { + /** + * Update median with the incoming metric. 
+ * @param incoming Metric a
+ * @param base Metric b
+ * @param state Operation state, which may contain CAPACITY
+ * @return Metric b which contains the new median
+ */
+ @Override
+ public TimelineMetric exec(TimelineMetric incoming,
+ TimelineMetric base, Map state) {
+ if (base == null) {
+ Long capacity = null;
+ if (state != null) {
+ Object val = state.get(CAPACITY);
+ if (val instanceof Integer) {
+ capacity = Long.valueOf((Integer)val);
+ } else if (val instanceof Long) {
+ capacity = (Long)val;
+ }
+ }
+ base = new TimelineMetric();
+ if (capacity == null) {
+ base.setAggregator(new TimelineMetricAggregatorMedian());
+ } else {
+ base.setAggregator(new TimelineMetricAggregatorMedian(capacity));
+ }
+ }
+ base.aggregate(incoming);
+ return base;
+ }
+ },
+ TopKMin("TopKMin") {
+ /**
+ * Update top k min with the incoming metric.
+ * @param incoming Metric a
+ * @param base Metric b
+ * @param state Operation state contains PARAM_K
+ * @return Metric b which contains the top k min values
+ */
+ @Override
+ public TimelineMetric exec(TimelineMetric incoming,
+ TimelineMetric base, Map state) {
+ if (base == null) {
+ Integer k = null;
+ if (state != null) {
+ Object kObj = state.get(PARAM_K);
+ if (kObj instanceof Integer) {
+ k = (Integer) kObj;
+ }
+ }
+ base = new TimelineMetric();
+ base.setType(TimelineMetric.Type.TIME_SERIES);
+ if (k != null) {
+ base.setAggregator(new TimelineMetricAggregatorTopKMin(k));
+ } else {
+ base.setAggregator(new TimelineMetricAggregatorTopKMin());
+ }
+ }
+ base.aggregate(incoming);
+ return base;
+ }
+ },
+ TopKMax("TopKMax") {
+ /**
+ * Update top k max with the incoming metric.
+ * @param incoming Metric a
+ * @param base Metric b
+ * @param state Operation state contains PARAM_K
+ * @return Metric b which contains the top k max values
+ */
+ @Override
+ public TimelineMetric exec(TimelineMetric incoming,
+ TimelineMetric base, Map state) {
+ if (base == null) {
+ Integer k = null;
+ if (state != null) {
+ Object kObj = state.get(PARAM_K);
+ if (kObj instanceof Integer) {
+ k = (Integer) kObj;
+ }
+ }
+ base = new TimelineMetric();
+ base.setType(TimelineMetric.Type.TIME_SERIES);
+ if (k != null) {
+ base.setAggregator(new TimelineMetricAggregatorTopKMax(k));
+ } else {
+ base.setAggregator(new TimelineMetricAggregatorTopKMax());
+ }
+ }
+ base.aggregate(incoming);
+ return base;
+ }
+ },
+ MaxFreq("MaxFrequency") {
+ /**
+ * Update the most frequently seen values with the incoming metric.
+ * @param incoming Metric a
+ * @param base Metric b
+ * @param state Operation state, which may contain CAPACITY
+ * @return Metric b which contains the most frequently seen values
+ */
+ @Override
+ public TimelineMetric exec(TimelineMetric incoming,
+ TimelineMetric base, Map state) {
+ if (base == null) {
+ Long capacity = null;
+ if (state != null) {
+ Object val = state.get(CAPACITY);
+ if (val instanceof Integer) {
+ capacity = Long.valueOf((Integer)val);
+ } else if (val instanceof Long) {
+ capacity = (Long)val;
+ }
+ }
+ base = new TimelineMetric();
+ if (capacity == null) {
+ base.setAggregator(new TimelineMetricAggregatorMaxFreq());
+ } else {
+ base.setAggregator(new TimelineMetricAggregatorMaxFreq(capacity));
+ }
+ }
+ base.aggregate(incoming);
+ return base;
}
};
public static final String PREV_METRIC_STATE_KEY = "PREV_METRIC";
+ public static final String PARAM_K = "PARAM_K";
+ public static final String CAPACITY = "CAPACITY";
/**
* Perform the aggregation operation.
* - * @param incoming Incoming metric + * @param incoming Incoming metric * @param aggregate Base aggregation metric - * @param state Operation state + * @param state Operation state * @return Result metric for this aggregation operation */ public TimelineMetric aggregate(TimelineMetric incoming, @@ -157,8 +319,7 @@ public TimelineMetric aggregate(TimelineMetric incoming, opName = opString; } - @Override - public String toString() { + @Override public String toString() { return this.opName; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineMetric.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineMetric.java index 3244bc37c0b..4d50b6a70d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineMetric.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineMetric.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,62 +17,314 @@ */ package org.apache.hadoop.yarn.api.records.timelineservice; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import java.util.HashMap; -import java.util.Map; - import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; - import org.junit.Test; +import java.util.*; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + public class TestTimelineMetric { - @Test - public void testTimelineMetricAggregation() { + @Test public void testTimelineMetricAggregationSum() { long ts = System.currentTimeMillis(); - // single_value metric add against null metric - TimelineMetric m1 = getSingleValueMetric("MEGA_BYTES_MILLIS", - TimelineMetricOperation.SUM, ts, 10000L); + TimelineMetric m1 = + getSingleValueMetric("MEGA_BYTES_MILLIS", TimelineMetricOperation.SUM, + ts, 10000L); TimelineMetric aggregatedMetric = TimelineMetric.aggregateTo(m1, null); assertEquals(10000L, aggregatedMetric.getSingleDataValue()); - TimelineMetric m2 = getSingleValueMetric("MEGA_BYTES_MILLIS", - TimelineMetricOperation.SUM, ts, 20000L); + TimelineMetric m2 = + getSingleValueMetric("MEGA_BYTES_MILLIS", TimelineMetricOperation.SUM, + ts, 20000L); aggregatedMetric = TimelineMetric.aggregateTo(m2, aggregatedMetric); assertEquals(30000L, aggregatedMetric.getSingleDataValue()); - // stateful sum test + // Stateful sum test. 
Map state = new HashMap<>(); state.put(TimelineMetricOperation.PREV_METRIC_STATE_KEY, m2); - TimelineMetric m2New = getSingleValueMetric("MEGA_BYTES_MILLIS", - TimelineMetricOperation.SUM, ts, 10000L); - aggregatedMetric = TimelineMetric.aggregateTo(m2New, aggregatedMetric, - state); + TimelineMetric m2New = + getSingleValueMetric("MEGA_BYTES_MILLIS", TimelineMetricOperation.SUM, + ts, 10000L); + aggregatedMetric = + TimelineMetric.aggregateTo(m2New, aggregatedMetric, state); assertEquals(20000L, aggregatedMetric.getSingleDataValue()); - // single_value metric max against single_value metric - TimelineMetric m3 = getSingleValueMetric("TRANSFER_RATE", - TimelineMetricOperation.MAX, ts, 150L); - TimelineMetric aggregatedMax = TimelineMetric.aggregateTo(m3, null); - assertEquals(150L, aggregatedMax.getSingleDataValue()); - - TimelineMetric m4 = getSingleValueMetric("TRANSFER_RATE", - TimelineMetricOperation.MAX, ts, 170L); - aggregatedMax = TimelineMetric.aggregateTo(m4, aggregatedMax); - assertEquals(170L, aggregatedMax.getSingleDataValue()); - - // single_value metric avg against single_value metric - TimelineMetric m5 = getSingleValueMetric("TRANSFER_RATE", - TimelineMetricOperation.AVG, ts, 150L); - try { - TimelineMetric.aggregateTo(m5, null); - fail("Taking average among metrics is not supported! "); - } catch (UnsupportedOperationException e) { - // Expected - } + } + + @Test public void testTimelineMetricAggregationMax() { + long ts = System.currentTimeMillis(); + TimelineMetric m1 = + getSingleValueMetric("TRANSFER_RATE", TimelineMetricOperation.MAX, ts, + 150L); + TimelineMetric aggregatedMetric = TimelineMetric.aggregateTo(m1, null); + assertEquals(150L, aggregatedMetric.getSingleDataValue()); + + TimelineMetric m2 = + getSingleValueMetric("TRANSFER_RATE", TimelineMetricOperation.MAX, ts, + 170L); + aggregatedMetric = TimelineMetric.aggregateTo(m2, aggregatedMetric); + assertEquals(170L, aggregatedMetric.getSingleDataValue()); + } + + @Test public void testTimelineMetricAggregationMin() { + long ts = System.currentTimeMillis(); + TimelineMetric m1 = + getSingleValueMetric("TRANSFER_RATE", TimelineMetricOperation.MIN, ts, + 150L); + TimelineMetric aggregatedMetric = TimelineMetric.aggregateTo(m1, null); + assertEquals(150L, aggregatedMetric.getSingleDataValue()); + + TimelineMetric m2 = + getSingleValueMetric("TRANSFER_RATE", TimelineMetricOperation.MIN, ts, + 120L); + aggregatedMetric = TimelineMetric.aggregateTo(m2, aggregatedMetric); + assertEquals(120L, aggregatedMetric.getSingleDataValue()); + + TimelineMetric m3 = + getSingleValueMetric("TRANSFER_RATE", TimelineMetricOperation.MIN, ts, + 180L); + aggregatedMetric = TimelineMetric.aggregateTo(m3, aggregatedMetric); + assertEquals(120L, aggregatedMetric.getSingleDataValue()); + } + + @Test public void testTimelineMetricAggregationAverage() { + long ts = System.currentTimeMillis(); + TimelineMetric m1 = + getSingleValueMetric("TRANSFER_RATE", TimelineMetricOperation.AVG, ts, + 150L); + TimelineMetric aggregatedMetric = TimelineMetric.aggregateTo(m1, null); + assertEquals(150.0, aggregatedMetric.getSingleDataValue()); + + TimelineMetric m2 = + getSingleValueMetric("TRANSFER_RATE", TimelineMetricOperation.AVG, ts, + 170L); + aggregatedMetric = TimelineMetric.aggregateTo(m2, aggregatedMetric); + assertEquals(160.0, aggregatedMetric.getSingleDataValue()); + + TimelineMetric m3 = + getSingleValueMetric("TRANSFER_RATE", TimelineMetricOperation.AVG, ts, + -170L); + aggregatedMetric = TimelineMetric.aggregateTo(m3, aggregatedMetric); + 
assertEquals(50.0, aggregatedMetric.getSingleDataValue()); + } + + @Test public void testTimelineMetricAggregationMedian() { + long ts = System.currentTimeMillis(); + Map state = new HashMap<>(); + state.put("CAPACITY", 3); + TimelineMetric m1 = + getSingleValueMetric("TRANSFER_RATE", TimelineMetricOperation.Median, + ts, 150L); + TimelineMetric aggregatedMetric = + TimelineMetric.aggregateTo(m1, null, state); + assertEquals(150.0, aggregatedMetric.getSingleDataValue()); + + TimelineMetric m2 = + getSingleValueMetric("TRANSFER_RATE", TimelineMetricOperation.Median, + ts, 110L); + aggregatedMetric = TimelineMetric.aggregateTo(m2, aggregatedMetric); + assertEquals(130.0, aggregatedMetric.getSingleDataValue()); + + TimelineMetric m3 = + getSingleValueMetric("TRANSFER_RATE", TimelineMetricOperation.Median, + ts, 150L); + aggregatedMetric = TimelineMetric.aggregateTo(m3, aggregatedMetric); + assertEquals(150.0, aggregatedMetric.getSingleDataValue()); + + // With capacity being 3, m1 will be evicted when we feed m4 to the aggregator. + TimelineMetric m4 = + getSingleValueMetric("TRANSFER_RATE", TimelineMetricOperation.Median, + ts, 120L); + aggregatedMetric = TimelineMetric.aggregateTo(m4, aggregatedMetric); + assertEquals(120.0, aggregatedMetric.getSingleDataValue()); + + // Test infinite capacity. + TimelineMetric m5 = + getSingleValueMetric("TRANSFER_RATE", TimelineMetricOperation.Median, + ts, 150L); + aggregatedMetric = + TimelineMetric.aggregateTo(m5, null); + assertEquals(150.0, aggregatedMetric.getSingleDataValue()); + + TimelineMetric m6 = + getSingleValueMetric("TRANSFER_RATE", TimelineMetricOperation.Median, + ts, 120L); + aggregatedMetric = TimelineMetric.aggregateTo(m6, aggregatedMetric); + assertEquals(135.0, aggregatedMetric.getSingleDataValue()); + + TimelineMetric m7 = + getSingleValueMetric("TRANSFER_RATE", TimelineMetricOperation.Median, + ts, 120L); + aggregatedMetric = TimelineMetric.aggregateTo(m7, aggregatedMetric); + assertEquals(120.0, aggregatedMetric.getSingleDataValue()); + } + + @Test public void testTimelineMetricAggregationTopKMin() { + long ts = System.currentTimeMillis(); + Map state = new HashMap<>(); + state.put(TimelineMetricOperation.PARAM_K, 2); + TimelineMetric m1 = + getSingleValueMetric("TRANSFER_RATE", TimelineMetricOperation.TopKMin, + ts, 120L); + TimelineMetric aggregatedMetric = + TimelineMetric.aggregateTo(m1, null, state); + + TimelineMetric m2 = + getSingleValueMetric("TRANSFER_RATE", TimelineMetricOperation.TopKMin, + ts, 100L); + aggregatedMetric = TimelineMetric.aggregateTo(m2, aggregatedMetric); + List topKMin = aggregatedMetric.getAggregatedValues(); + assertEquals(2, topKMin.size()); + assertTrue(topKMin.contains(120L)); + assertTrue(topKMin.contains(100L)); + + TimelineMetric m3 = + getSingleValueMetric("TRANSFER_RATE", TimelineMetricOperation.TopKMin, + ts, 90L); + aggregatedMetric = TimelineMetric.aggregateTo(m3, aggregatedMetric); + topKMin = aggregatedMetric.getAggregatedValues(); + assertEquals(2, topKMin.size()); + assertTrue(topKMin.contains(90L)); + assertTrue(topKMin.contains(100L)); + + TimelineMetric m3Again = + getSingleValueMetric("TRANSFER_RATE", TimelineMetricOperation.TopKMin, + ts, 90L); + aggregatedMetric = TimelineMetric.aggregateTo(m3Again, aggregatedMetric); + topKMin = aggregatedMetric.getAggregatedValues(); + assertEquals(2, topKMin.size()); + assertEquals(90L, topKMin.get(0)); + assertEquals(90L, topKMin.get(1)); + } + + @Test public void testTimelineMetricAggregationTopKMax() { + long ts = 
System.currentTimeMillis();
+ Map state = new HashMap<>();
+ state.put(TimelineMetricOperation.PARAM_K, 2);
+ TimelineMetric m1 =
+ getSingleValueMetric("TRANSFER_RATE", TimelineMetricOperation.TopKMax,
+ ts, 120L);
+ TimelineMetric aggregatedMetric =
+ TimelineMetric.aggregateTo(m1, null, state);
+
+ TimelineMetric m2 =
+ getSingleValueMetric("TRANSFER_RATE", TimelineMetricOperation.TopKMax,
+ ts, 100L);
+ aggregatedMetric = TimelineMetric.aggregateTo(m2, aggregatedMetric);
+ List topKMax = aggregatedMetric.getAggregatedValues();
+ assertEquals(2, topKMax.size());
+ assertTrue(topKMax.contains(100L));
+ assertTrue(topKMax.contains(120L));
+
+ TimelineMetric m3 =
+ getSingleValueMetric("TRANSFER_RATE", TimelineMetricOperation.TopKMax,
+ ts, 90L);
+ aggregatedMetric = TimelineMetric.aggregateTo(m3, aggregatedMetric);
+ topKMax = aggregatedMetric.getAggregatedValues();
+ assertEquals(2, topKMax.size());
+ assertTrue(topKMax.contains(100L));
+ assertTrue(topKMax.contains(120L));
+
+ TimelineMetric m4 =
+ getSingleValueMetric("TRANSFER_RATE", TimelineMetricOperation.TopKMax,
+ ts, 120L);
+ aggregatedMetric = TimelineMetric.aggregateTo(m4, aggregatedMetric, state);
+ topKMax = aggregatedMetric.getAggregatedValues();
+ assertEquals(2, topKMax.size());
+ assertEquals(120L, topKMax.get(0));
+ assertEquals(120L, topKMax.get(1));
+
+ TimelineMetric m5 =
+ getSingleValueMetric("TRANSFER_RATE", TimelineMetricOperation.TopKMax,
+ ts, 190L);
+ aggregatedMetric = TimelineMetric.aggregateTo(m5, aggregatedMetric, state);
+ topKMax = aggregatedMetric.getAggregatedValues();
+ assertEquals(2, topKMax.size());
+ assertTrue(topKMax.contains(190L));
+ assertTrue(topKMax.contains(120L));
+ }
+
+ @Test public void testTimelineMetricAggregationCount() {
+ long ts = System.currentTimeMillis();
+ TimelineMetric m1 =
+ getSingleValueMetric("TRANSFER_RATE", TimelineMetricOperation.Count, ts,
+ 90L);
+ TimelineMetric aggregatedMetric = TimelineMetric.aggregateTo(m1, null);
+ assertEquals(1L, aggregatedMetric.getSingleDataValue());
+
+ TimelineMetric m2 =
+ getSingleValueMetric("TRANSFER_RATE", TimelineMetricOperation.Count, ts,
+ 190L);
+ aggregatedMetric = TimelineMetric.aggregateTo(m2, aggregatedMetric);
+ assertEquals(2L, aggregatedMetric.getSingleDataValue());
+ }
+
+ @Test public void testTimelineMetricAggregationMaxFreq() {
+ long ts = System.currentTimeMillis();
+ Map state = new HashMap<>();
+ state.put("CAPACITY", 3);
+ TimelineMetric m1 =
+ getSingleValueMetric("TRANSFER_RATE", TimelineMetricOperation.MaxFreq,
+ ts, 70L);
+ TimelineMetric aggregatedMetric =
+ TimelineMetric.aggregateTo(m1, null, state);
+ List maxFreqVals = aggregatedMetric.getAggregatedValues();
+ assertEquals(1, maxFreqVals.size());
+ assertTrue(maxFreqVals.contains(70L));
+
+ TimelineMetric m2 =
+ getSingleValueMetric("TRANSFER_RATE", TimelineMetricOperation.MaxFreq,
+ ts, 90L);
+ aggregatedMetric = TimelineMetric.aggregateTo(m2, aggregatedMetric);
+ maxFreqVals = aggregatedMetric.getAggregatedValues();
+ assertEquals(2, maxFreqVals.size());
+ assertTrue(maxFreqVals.contains(70L));
+ assertTrue(maxFreqVals.contains(90L));
+
+ TimelineMetric m3 =
+ getSingleValueMetric("TRANSFER_RATE", TimelineMetricOperation.MaxFreq,
+ ts, 70L);
+ aggregatedMetric = TimelineMetric.aggregateTo(m3, aggregatedMetric);
+ maxFreqVals = aggregatedMetric.getAggregatedValues();
+ assertEquals(1, maxFreqVals.size());
+ assertTrue(maxFreqVals.contains(70L));
+
+ // With capacity being 3, m1 is evicted when we feed m4 to the aggregator.
+ TimelineMetric m4 =
+ getSingleValueMetric("TRANSFER_RATE", TimelineMetricOperation.MaxFreq,
+ ts, 90L);
+ aggregatedMetric = TimelineMetric.aggregateTo(m4, aggregatedMetric);
+ maxFreqVals = aggregatedMetric.getAggregatedValues();
+ assertEquals(1, maxFreqVals.size());
+ assertTrue(maxFreqVals.contains(90L));
+
+ // Test infinite capacity.
+ TimelineMetric m5 =
+ getSingleValueMetric("TRANSFER_RATE", TimelineMetricOperation.MaxFreq,
+ ts, 90L);
+ aggregatedMetric = TimelineMetric.aggregateTo(m5, null);
+ maxFreqVals = aggregatedMetric.getAggregatedValues();
+ assertEquals(1, maxFreqVals.size());
+ assertTrue(maxFreqVals.contains(90L));
+
+ TimelineMetric m6 =
+ getSingleValueMetric("TRANSFER_RATE", TimelineMetricOperation.MaxFreq,
+ ts, 30L);
+ aggregatedMetric = TimelineMetric.aggregateTo(m6, aggregatedMetric);
+ maxFreqVals = aggregatedMetric.getAggregatedValues();
+ assertEquals(2, maxFreqVals.size());
+ assertTrue(maxFreqVals.contains(30L));
+ assertTrue(maxFreqVals.contains(90L));
+
+ TimelineMetric m7 =
+ getSingleValueMetric("TRANSFER_RATE", TimelineMetricOperation.MaxFreq,
+ ts, 30L);
+ aggregatedMetric = TimelineMetric.aggregateTo(m7, aggregatedMetric);
+ maxFreqVals = aggregatedMetric.getAggregatedValues();
+ assertEquals(1, maxFreqVals.size());
+ assertTrue(maxFreqVals.contains(30L));
}

private static TimelineMetric getSingleValueMetric(String id,
@@ -81,7 +333,7 @@ private static TimelineMetric getSingleValueMetric(String id,
m.setId(id);
m.setType(Type.SINGLE_VALUE);
m.setRealtimeAggregationOp(op);
- Map metricValues = new HashMap();
+ Map metricValues = new HashMap<>();
metricValues.put(timestamp, value);
m.setValues(metricValues);
return m;
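For reviewers, a minimal usage sketch (not part of the patch) of how callers are expected to drive the new operations through TimelineMetric.aggregateTo, mirroring what TestTimelineMetric above does. The metric id, the sample value, and the generic types on Map/List are illustrative assumptions.

    // Illustrative sketch only -- assumes this patch is applied.
    // Requires java.util.HashMap, java.util.List, java.util.Map.
    Map<Object, Object> state = new HashMap<>();
    state.put(TimelineMetricOperation.PARAM_K, 2);

    TimelineMetric sample = new TimelineMetric(TimelineMetric.Type.SINGLE_VALUE);
    sample.setId("TRANSFER_RATE");                                     // example id
    sample.setRealtimeAggregationOp(TimelineMetricOperation.TopKMax);
    sample.addValue(System.currentTimeMillis(), 120L);                 // example value

    // The first call creates the aggregated metric and attaches a TopKMax
    // aggregator; later calls fold further samples into the same aggregator.
    TimelineMetric aggregated = TimelineMetric.aggregateTo(sample, null, state);
    List<Number> top2 = aggregated.getAggregatedValues();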