diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/RealTimeAggregationOperation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/RealTimeAggregationOperation.java
new file mode 100644
index 0000000..bc54ac5
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/RealTimeAggregationOperation.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import java.util.Map;
+
+/**
+ * Real-time aggregation operations.
+ */
+public enum RealTimeAggregationOperation {
+  NOP("NOP", new Operation() {
+    @Override
+    public TimelineMetric exec(TimelineMetric incoming,
+        TimelineMetric aggregate, Map<Object, Object> state) {
+      return aggregate;
+    }
+  }),
+  MAX("MAX", new Operation() {
+    @Override
+    public TimelineMetric exec(TimelineMetric incoming,
+        TimelineMetric aggregate, Map<Object, Object> state) {
+      if (aggregate == null) {
+        return incoming;
+      }
+      Number incomingValue = incoming.retrieveSingleDataValue();
+      Number aggregateValue = aggregate.retrieveSingleDataValue();
+      if (aggregateValue == null) {
+        aggregateValue = Long.MIN_VALUE;
+      }
+      if (TimelineMetricCalculator.compare(incomingValue, aggregateValue)
+          > 0) {
+        aggregate.addValue(incoming.retrieveSingleDataKey(), incomingValue);
+      }
+      return aggregate;
+    }
+  }),
+  SUM("SUM", new Operation() {
+    // For single value only
+    @Override
+    public TimelineMetric exec(TimelineMetric incoming,
+        TimelineMetric aggregate, Map<Object, Object> state) {
+      if (aggregate == null) {
+        return incoming;
+      }
+      Number incomingValue = incoming.retrieveSingleDataValue();
+      Number aggregateValue = aggregate.retrieveSingleDataValue();
+      Number result
+          = TimelineMetricCalculator.sum(incomingValue, aggregateValue);
+
+      // If there is a previous value in the state, subtract it from the sum
+      if (state != null) {
+        Object prevMetric = state.get(PREV_METRIC_STATE_KEY);
+        if (prevMetric instanceof TimelineMetric) {
+          result = TimelineMetricCalculator.sub(result,
+              ((TimelineMetric) prevMetric).retrieveSingleDataValue());
+        }
+      }
+      aggregate.addValue(incoming.retrieveSingleDataKey(), result);
+      return aggregate;
+    }
+  }),
+  AVG("AVERAGE", new Operation() {
+    @Override
+    public TimelineMetric exec(TimelineMetric incoming,
+        TimelineMetric aggregate, Map<Object, Object> state) {
+      // Not supported yet
+      throw new UnsupportedOperationException(
+          "Unsupported aggregation operation: AVERAGE");
+    }
+  });
+
+  public static final String PREV_METRIC_STATE_KEY = "PREV_METRIC";
+
+  /**
+   * Perform the aggregation operation.
+   *
+   * @param incoming the incoming metric
+   * @param aggregate the aggregate metric to fold the incoming metric into
+   * @param state optional state carried across calls, such as the previously
+   *          reported metric keyed by {@link #PREV_METRIC_STATE_KEY}
+   * @return the updated aggregate metric
+   */
+  public TimelineMetric aggregate(TimelineMetric incoming,
+      TimelineMetric aggregate, Map<Object, Object> state) {
+    return operation.exec(incoming, aggregate, state);
+  }
+
+  private final String opName;
+  private final Operation operation;
+
+  RealTimeAggregationOperation(String opString, Operation aggrOp) {
+    opName = opString;
+    operation = aggrOp;
+  }
+
+  @Override
+  public String toString() {
+    return this.opName;
+  }
+
+  private interface Operation {
+    TimelineMetric exec(TimelineMetric incoming, TimelineMetric aggregate,
+        Map<Object, Object> state);
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineAggregationBasis.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineAggregationBasis.java
new file mode 100644
index 0000000..2f0b3cc
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineAggregationBasis.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * The basis on which timeline metrics are aggregated.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public enum TimelineAggregationBasis {
+  APPLICATION,
+  FLOW,
+  USER
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java
index 8fcc2ae..21f1ae6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java
@@ -30,6 +30,7 @@
   YARN_FLOW_RUN,
   YARN_APPLICATION,
   YARN_APPLICATION_ATTEMPT,
+  YARN_APPLICATION_AGGREGATION,
   YARN_CONTAINER,
   YARN_USER,
   YARN_QUEUE,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java
index 2f60515..8e74ad7 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java
@@ -19,6 +19,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
@@ -48,6 +49,10 @@
   private Type type;
   private String id;
 
+  // By default, do not perform any aggregation operation
+  private RealTimeAggregationOperation realtimeAggregationOp
+      = RealTimeAggregationOperation.NOP;
+
   private Comparator<Long> reverseComparator = new Comparator<Long>() {
     @Override
     public int compare(Long l1, Long l2) {
@@ -83,6 +88,15 @@
   public void setId(String metricId) {
     this.id = metricId;
   }
 
+  public RealTimeAggregationOperation getRealtimeAggregationOp() {
+    return realtimeAggregationOp;
+  }
+
+  public void setRealtimeAggregationOp(
+      final RealTimeAggregationOperation op) {
+    this.realtimeAggregationOp = op;
+  }
+
   // required by JAXB
   @InterfaceAudience.Private
   @XmlElement(name = "values")
@@ -166,11 +180,76 @@ public boolean equals(Object o) {
 
   @Override
   public String toString() {
-    String str = "{id:" + id + ", type:" + type;
-    if (!values.isEmpty()) {
-      str += ", values:" + values;
-    }
-    str += "}";
-    return str;
+    StringBuilder sb = new StringBuilder();
+    sb.append("{id: ").append(id).append(", type: ").append(type)
+        .append(", realtimeAggregationOp: ").append(realtimeAggregationOp)
+        .append("; ").append(values.toString()).append("}");
+    return sb.toString();
+  }
+
+  // Get the latest data point of this metric as a single-value metric
+  public TimelineMetric retrieveLatestSingleValueMetric() {
+    if (this.getType().equals(Type.SINGLE_VALUE)) {
+      return this;
+    } else {
+      TimelineMetric singleValueMetric = new TimelineMetric(Type.SINGLE_VALUE);
+      // values is sorted by reverseComparator, so the latest data point is
+      // under the first key; guard against an empty map since firstKey()
+      // throws rather than returning null
+      if (!this.values.isEmpty()) {
+        Long latestKey = this.values.firstKey();
+        Number latestValue = this.values.get(latestKey);
+        singleValueMetric.addValue(latestKey, latestValue);
+      }
+      return singleValueMetric;
+    }
+  }
+
+  public long retrieveSingleDataKey() {
+    if (this.type.equals(Type.SINGLE_VALUE)) {
+      if (values.size() == 0) {
+        throw new YarnRuntimeException("The values for this timeline metric "
+            + "are empty.");
+      } else {
+        return values.firstKey();
+      }
+    } else {
+      throw new YarnRuntimeException("The type of this timeline metric is "
+          + "not SINGLE_VALUE.");
+    }
+  }
+
+  public Number retrieveSingleDataValue() {
+    if (this.type.equals(Type.SINGLE_VALUE)) {
+      if (values.size() == 0) {
+        return null;
+      } else {
+        return values.get(values.firstKey());
+      }
+    } else {
+      throw new YarnRuntimeException("The type of this timeline metric is "
+          + "not SINGLE_VALUE.");
+    }
+  }
+
+  /**
+   * Aggregate an incoming metric without any aggregation state.
+   *
+   * @param latestMetric the incoming metric
+   * @param baseAggregatedMetric the metric to aggregate into
+   * @return the TimelineMetric after accumulation
+   */
+  public static TimelineMetric accumulateTo(TimelineMetric latestMetric,
+      TimelineMetric baseAggregatedMetric) {
+    return accumulateTo(latestMetric, baseAggregatedMetric, null);
+  }
+
+  /**
+   * Aggregate an incoming metric into a base aggregate metric. The
+   * assumption here is that both baseAggregatedMetric and latestMetric
+   * contain single-value data if they are not null.
+   *
+   * @param latestMetric the incoming metric
+   * @param baseAggregatedMetric the metric to aggregate into
+   * @param state optional state carried across calls, such as the
+   *          previously reported metric
+   * @return the TimelineMetric after accumulation
+   */
+  public static TimelineMetric accumulateTo(TimelineMetric latestMetric,
+      TimelineMetric baseAggregatedMetric, Map<Object, Object> state) {
+    RealTimeAggregationOperation operation
+        = latestMetric.getRealtimeAggregationOp();
+    return operation.aggregate(latestMetric, baseAggregatedMetric, state);
+  }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricCalculator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricCalculator.java
new file mode 100644
index 0000000..b0ecb00
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricCalculator.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+/**
+ * A calculator for basic operations on timeline metric {@link Number}s.
+ */
+public final class TimelineMetricCalculator {
+
+  private TimelineMetricCalculator() {
+    // do nothing.
+  }
+
+  /**
+   * Compare two non-null Numbers.
+   * @param n1 the first number
+   * @param n2 the second number
+   * @return 0 if n1 equals n2, a negative int if n1 is less than n2, a
+   *         positive int otherwise.
+   */
+  public static int compare(Number n1, Number n2) {
+    if (n1 == null || n2 == null) {
+      throw new YarnRuntimeException(
+          "Numbers to be compared shouldn't be null.");
+    }
+
+    if (n1 instanceof Integer || n1 instanceof Long) {
+      if (n1.longValue() == n2.longValue()) {
+        return 0;
+      } else {
+        return (n1.longValue() < n2.longValue()) ? -1 : 1;
+      }
+    }
+
+    if (n1 instanceof Float || n1 instanceof Double) {
+      if (n1.doubleValue() == n2.doubleValue()) {
+        return 0;
+      } else {
+        return (n1.doubleValue() < n2.doubleValue()) ? -1 : 1;
+      }
+    }
+
+    // TODO throw warnings/exceptions for other types of number.
+    throw new YarnRuntimeException("Unsupported types for number comparison: "
+        + n1.getClass().getName() + ", " + n2.getClass().getName());
+  }
+
+  /**
+   * Subtract one Number from another.
+   * @param n1 the number to subtract from
+   * @param n2 the number to subtract
+   * @return a Number representing (n1 - n2).
+   */
+  public static Number sub(Number n1, Number n2) {
+    if (n1 == null) {
+      throw new YarnRuntimeException(
+          "The number to be subtracted from shouldn't be null.");
+    } else if (n2 == null) {
+      return n1;
+    }
+
+    if (n1 instanceof Integer) {
+      // Promote to Long if the int subtraction would overflow
+      if (((n1.intValue() >= 0) &&
+          ((n1.intValue() - Integer.MAX_VALUE) > n2.intValue())) ||
+          ((n1.intValue() < 0) &&
+          ((n1.intValue() - Integer.MIN_VALUE) < n2.intValue()))) {
+        return Long.valueOf(n1.longValue() - n2.longValue());
+      }
+      return Integer.valueOf(n1.intValue() - n2.intValue());
+    }
+
+    if (n1 instanceof Long) {
+      return Long.valueOf(n1.longValue() - n2.longValue());
+    }
+
+    if (n1 instanceof Float) {
+      return Float.valueOf(n1.floatValue() - n2.floatValue());
+    }
+
+    if (n1 instanceof Double) {
+      return Double.valueOf(n1.doubleValue() - n2.doubleValue());
+    }
+
+    // TODO throw warnings/exceptions for other types of number.
+    return null;
+  }
+
+  /**
+   * Sum up two Numbers.
+   * @param n1 the first number
+   * @param n2 the second number
+   * @return a Number representing (n1 + n2).
+   */
+  public static Number sum(Number n1, Number n2) {
+    if (n1 == null) {
+      return n2;
+    } else if (n2 == null) {
+      return n1;
+    }
+
+    if (n1 instanceof Integer) {
+      // Promote to Long if the int addition would overflow
+      if (((n1.intValue() >= 0) &&
+          (Integer.MAX_VALUE - n1.intValue() < n2.intValue())) ||
+          ((n1.intValue() < 0) &&
+          (Integer.MIN_VALUE - n1.intValue() > n2.intValue()))) {
+        return Long.valueOf(n1.longValue() + n2.longValue());
+      }
+      return Integer.valueOf(n1.intValue() + n2.intValue());
+    }
+
+    if (n1 instanceof Long) {
+      return Long.valueOf(n1.longValue() + n2.longValue());
+    }
+
+    if (n1 instanceof Float) {
+      return Float.valueOf(n1.floatValue() + n2.floatValue());
+    }
+
+    if (n1 instanceof Double) {
+      return Double.valueOf(n1.doubleValue() + n2.doubleValue());
+    }
+
+    // TODO throw warnings/exceptions for other types of number.
+    return null;
+  }
+
+  /**
+   * Trapezoidal delta calculation: (n1 + n2) / 2 * time, assuming time is
+   * not null.
+   * @param n1 the previous data point, may be null
+   * @param n2 the current data point
+   * @param time the time interval
+   * @return a Number representing (n1 + n2) * time / 2.
+   */
+  public static Number delta(Number n1, Number n2, Long time) {
+
+    // make sure n2 is not null
+    if (n2 == null) {
+      if (n1 == null) {
+        return null;
+      } else {
+        n2 = n1;
+        n1 = null;
+      }
+    }
+    if (n2 instanceof Integer) {
+      return Long.valueOf(
+          ((n1 == null ? 0 : n1.intValue()) + n2.intValue()) * time / 2);
+    }
+
+    if (n2 instanceof Long) {
+      return Long.valueOf(
+          ((n1 == null ? 0 : n1.longValue()) + n2.longValue()) * time / 2);
+    }
+
+    if (n2 instanceof Float) {
+      return Float.valueOf(
+          ((n1 == null ? 0 : n1.floatValue()) + n2.floatValue()) * time / 2);
+    }
+
+    if (n2 instanceof Double) {
+      return Double.valueOf(
+          ((n1 == null ? 0 : n1.doubleValue()) + n2.doubleValue()) * time / 2);
+    }
+
+    // TODO throw warnings/exceptions for other types of number.
+    return null;
+  }
+
+  /**
+   * Get a zero of the same type as the given number.
+   * @param n1 the number whose type to match
+   * @return a Number representing zero of the same type.
+   */
+  public static Number getZero(Number n1) {
+    if (n1 == null) {
+      throw new YarnRuntimeException("Input number should not be null.");
+    }
+
+    if (n1 instanceof Integer) {
+      return 0;
+    }
+
+    if (n1 instanceof Long) {
+      return 0L;
+    }
+
+    if (n1 instanceof Float) {
+      return 0.0F;
+    }
+
+    if (n1 instanceof Double) {
+      return 0.0D;
+    }
+
+    throw new YarnRuntimeException("Input number type is not supported, we "
+        + "only support Integer, Long, Float, Double.");
+  }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineMetric.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineMetric.java
new file mode 100644
index 0000000..18d3a32
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineMetric.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
+
+import org.junit.Test;
+
+public class TestTimelineMetric {
+
+  @Test
+  public void testAccumulationOnTimelineMetrics() {
+    long ts = System.currentTimeMillis();
+    // single_value metric add against null metric
+    TimelineMetric m1 = getSingleValueMetric("MEGA_BYTES_MILLIS",
+        RealTimeAggregationOperation.SUM, ts, 10000L);
+    TimelineMetric aggregatedMetric = TimelineMetric.accumulateTo(m1, null);
+    assertEquals(10000L, aggregatedMetric.retrieveSingleDataValue());
+
+    TimelineMetric m2 = getSingleValueMetric("MEGA_BYTES_MILLIS",
+        RealTimeAggregationOperation.SUM, ts, 20000L);
+    aggregatedMetric = TimelineMetric.accumulateTo(m2, aggregatedMetric);
+    assertEquals(30000L, aggregatedMetric.retrieveSingleDataValue());
+
+    // stateful sum test
+    Map<Object, Object> state = new HashMap<>();
+    state.put(RealTimeAggregationOperation.PREV_METRIC_STATE_KEY, m2);
+    TimelineMetric m2New = getSingleValueMetric("MEGA_BYTES_MILLIS",
+        RealTimeAggregationOperation.SUM, ts, 10000L);
+    aggregatedMetric = TimelineMetric.accumulateTo(m2New, aggregatedMetric,
+        state);
+    assertEquals(20000L, aggregatedMetric.retrieveSingleDataValue());
+
+    // single_value metric max against single_value metric
+    TimelineMetric m3 = getSingleValueMetric("TRANSFER_RATE",
+        RealTimeAggregationOperation.MAX, ts, 150L);
+    TimelineMetric aggregatedMax = TimelineMetric.accumulateTo(m3, null);
+    assertEquals(150L, aggregatedMax.retrieveSingleDataValue());
+
+    TimelineMetric m4 = getSingleValueMetric("TRANSFER_RATE",
+        RealTimeAggregationOperation.MAX, ts, 170L);
+    aggregatedMax = TimelineMetric.accumulateTo(m4, aggregatedMax);
+    assertEquals(170L, aggregatedMax.retrieveSingleDataValue());
+
+    // single_value metric avg against single_value metric
+    TimelineMetric m5 = getSingleValueMetric("TRANSFER_RATE",
+        RealTimeAggregationOperation.AVG, ts, 150L);
+    try {
+      TimelineMetric.accumulateTo(m5, null);
+      fail("Taking average among metrics is not supported!");
+    } catch (UnsupportedOperationException e) {
+      // Expected
+    }
+  }
+
+  private static TimelineMetric getSingleValueMetric(String id,
+      RealTimeAggregationOperation op, long timestamp, long value) {
+    TimelineMetric m = new TimelineMetric();
+    m.setId(id);
+    m.setType(Type.SINGLE_VALUE);
+    m.setRealtimeAggregationOp(op);
+    Map<Long, Number> metricValues = new HashMap<>();
+    metricValues.put(timestamp, value);
+    m.setValues(metricValues);
+    return m;
+  }
+
+  private static TimelineMetric getTimeSeriesMetric(String id,
+      RealTimeAggregationOperation op, Map<Long, Number> metricValues) {
+    TimelineMetric m = new TimelineMetric();
+    m.setId(id);
+    m.setType(Type.TIME_SERIES);
+    m.setRealtimeAggregationOp(op);
+    m.setValues(metricValues);
+    return m;
+  }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java
index 51ec762..95771bc 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java
@@ -63,14 +63,14 @@ public void testTimelineEntities() throws Exception {
     Iterator<Map.Entry<Long, Number>> itr =
         metric1.getValues().entrySet().iterator();
     Map.Entry<Long, Number> entry = itr.next();
-    Assert.assertEquals(new Long(3L), entry.getKey());
-    Assert.assertEquals(new Double(3.0D), entry.getValue());
+    Assert.assertEquals(new Long(1L), entry.getKey());
+    Assert.assertEquals(new Float(1.0F), entry.getValue());
     entry = itr.next();
     Assert.assertEquals(new Long(2L), entry.getKey());
     Assert.assertEquals(new Integer(2), entry.getValue());
     entry = itr.next();
-    Assert.assertEquals(new Long(1L), entry.getKey());
-    Assert.assertEquals(new Float(1.0F), entry.getValue());
+    Assert.assertEquals(new Long(3L), entry.getKey());
+    Assert.assertEquals(new Double(3.0D), entry.getValue());
     Assert.assertFalse(itr.hasNext());
 
     entity.addMetric(metric1);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
index 4d3dafd..5d215e3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.RealTimeAggregationOperation;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
@@ -119,12 +120,15 @@ public void reportContainerResourceUsage(Container container, Long pmemUsage,
     if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE) {
       TimelineMetric memoryMetric = new TimelineMetric();
       memoryMetric.setId(ContainerMetric.MEMORY.toString());
+      memoryMetric.setRealtimeAggregationOp(RealTimeAggregationOperation.SUM);
       memoryMetric.addValue(currentTimeMillis, pmemUsage);
       entity.addMetric(memoryMetric);
     }
     if (cpuUsagePercentPerCore != ResourceCalculatorProcessTree.UNAVAILABLE) {
       TimelineMetric cpuMetric = new TimelineMetric();
       cpuMetric.setId(ContainerMetric.CPU.toString());
+      // TODO: support average
+      cpuMetric.setRealtimeAggregationOp(RealTimeAggregationOperation.MAX);
       cpuMetric.addValue(currentTimeMillis,
           Math.round(cpuUsagePercentPerCore));
       entity.addMetric(cpuMetric);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
index 15187d1..91ae128 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
@@ -19,7 +19,14 @@
 package org.apache.hadoop.yarn.server.timelineservice.collector;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -27,7 +34,11 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.timelineservice.RealTimeAggregationOperation;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
 
@@ -45,6 +56,17 @@
 
   private TimelineWriter writer;
 
+  private Map<String, AggregationStatus> aggregationGroups = new HashMap<>();
+
+  private ScheduledThreadPoolExecutor executor;
+  private static final int AGGREGATION_EXECUTOR_NUM_THREADS = 2;
+  private static final int AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS = 15;
+
+  public static final String AGGREGATION_ID_POSTFIX = "_AGGR";
+
+  public static final String SEPARATOR = "_";
+
   public TimelineCollector(String name) {
     super(name);
   }
@@ -56,6 +78,14 @@ protected void serviceInit(Configuration conf) throws Exception {
 
   @Override
   protected void serviceStart() throws Exception {
+    // Launch the aggregation thread
+    executor = new ScheduledThreadPoolExecutor(AGGREGATION_EXECUTOR_NUM_THREADS,
+        new ThreadFactoryBuilder()
+            .setNameFormat("TimelineCollector Aggregation thread #%d")
+            .build());
+    executor.scheduleAtFixedRate(new RealTimeAggregator(), 0,
+        AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS, TimeUnit.SECONDS);
+
     super.serviceStart();
   }
 
@@ -68,6 +98,9 @@ protected void setWriter(TimelineWriter w) {
     this.writer = w;
   }
 
+  public abstract TimelineCollectorContext getTimelineEntityContext();
+
+
   /**
    * Handles entity writes. These writes are synchronous and are written to the
    * backing storage without buffering/batching. If any entity already exists,
@@ -90,8 +123,12 @@ public TimelineWriteResponse putEntities(TimelineEntities entities,
       LOG.debug("putEntities(entities=" + entities + ", callerUgi="
           + callerUgi + ")");
     }
     TimelineCollectorContext context = getTimelineEntityContext();
+
+    // Update application metrics for aggregation
+    TimelineCollector.updateAggregateStatusInternal(entities,
+        aggregationGroups);
+
     return writer.write(context.getClusterId(), context.getUserId(),
         context.getFlowName(), context.getFlowVersion(),
         context.getFlowRunId(), context.getAppId(), entities);
@@ -117,6 +154,183 @@ public void putEntitiesAsync(TimelineEntities entities,
     }
   }
 
-  public abstract TimelineCollectorContext getTimelineEntityContext();
+  /**
+   * Aggregate all metrics in a given TimelineEntities instance.
+   *
+   * @param entities the entities whose metrics should be aggregated
+   * @param resultEntityPrefix the prefix for the ids of the result entities
+   * @param resultEntityType the type of the result entities
+   * @return a TimelineEntities instance holding one aggregated entity per
+   *         incoming entity type.
+   */
+  public static TimelineEntities aggregateMetrics(
+      TimelineEntities entities, String resultEntityPrefix,
+      String resultEntityType) {
+    Map<String, AggregationStatus> aggregationGroups = new HashMap<>();
+    TimelineCollector.updateAggregateStatusInternal(entities,
+        aggregationGroups);
+    return TimelineCollector.aggregateInternal(aggregationGroups,
+        resultEntityPrefix, resultEntityType);
+  }
+
+  private static void updateAggregateStatusInternal(
+      TimelineEntities entities,
+      Map<String, AggregationStatus> aggregationGroups) {
+    for (TimelineEntity e : entities.getEntities()) {
+      AggregationStatus aggrGroup = aggregationGroups.get(e.getType());
+      if (aggrGroup == null) {
+        aggrGroup = new AggregationStatus();
+        aggregationGroups.put(e.getType(), aggrGroup);
+      }
+      aggrGroup.update(e);
+    }
+  }
+
+  private static TimelineEntities aggregateInternal(
+      Map<String, AggregationStatus> aggregationGroups, String contextId,
+      String resultEntityType) {
+    TimelineEntities resultEntities = new TimelineEntities();
+    for (Map.Entry<String, AggregationStatus> entry
+        : aggregationGroups.entrySet()) {
+      TimelineEntity result = new TimelineEntity();
+      result.setId(contextId + SEPARATOR + entry.getKey()
+          + AGGREGATION_ID_POSTFIX);
+      result.setType(resultEntityType);
+      entry.getValue().aggregateAllTo(result);
+      resultEntities.addEntity(result);
+    }
+    return resultEntities;
+  }
+
+  private class RealTimeAggregator implements Runnable {
+
+    @Override
+    public void run() {
+      LOG.debug("Real-time aggregating");
+      try {
+        TimelineCollectorContext context = getTimelineEntityContext();
+        TimelineEntities entities = TimelineCollector.aggregateInternal(
+            aggregationGroups, context.toString(),
+            TimelineEntityType.YARN_APPLICATION_AGGREGATION.toString());
+        writer.write(context.getClusterId(), context.getUserId(),
+            context.getFlowName(), context.getFlowVersion(),
+            context.getFlowRunId(), context.getAppId(), entities);
+      } catch (Exception e) {
+        LOG.error("Error aggregating timeline metrics", e);
+      }
+      LOG.debug("Real-time aggregation complete");
+    }
+  }
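[Note, not part of the patch] The collector-side flow above funnels every putEntities call through updateAggregateStatusInternal and flushes the accumulated state every 15 seconds via the RealTimeAggregator task; one thing to watch is that serviceStop does not appear to shut the executor down. For readers following along, here is a minimal, self-contained sketch of the public aggregateMetrics entry point; the entity id, metric id, and values are invented for illustration:

// Sketch: exercising the static aggregation entry point added above.
TimelineEntities input = new TimelineEntities();

TimelineEntity container = new TimelineEntity();
container.setId("container_e01_000001");          // hypothetical id
container.setType(TimelineEntityType.YARN_CONTAINER.toString());

TimelineMetric mem = new TimelineMetric(TimelineMetric.Type.SINGLE_VALUE);
mem.setId("MEMORY");                              // hypothetical metric
mem.setRealtimeAggregationOp(RealTimeAggregationOperation.SUM);
mem.addValue(System.currentTimeMillis(), 1024L);
container.addMetric(mem);
input.addEntity(container);

// One aggregated entity comes back per distinct input entity type, and
// every non-NOP metric is renamed "<id>_AGGR".
TimelineEntities aggregated = TimelineCollector.aggregateMetrics(input,
    "application_1_0001",
    TimelineEntityType.YARN_APPLICATION_AGGREGATION.toString());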
+  // Note: in-memory aggregation is performed in an eventually consistent
+  // fashion.
+  private static class AggregationStatus {
+    // On aggregation, for each metric, aggregate all per-entity accumulated
+    // metrics
+    private Map<TimelineMetric, AggregateTableRow> aggregateTable;
+
+    public AggregationStatus() {
+      aggregateTable = new ConcurrentHashMap<>();
+    }
+
+    public void update(TimelineEntity incoming) {
+      String entityId = incoming.getId();
+      for (TimelineMetric m : incoming.getMetrics()) {
+        // Update aggregateTable
+        AggregateTableRow aggrRow = aggregateTable.get(m);
+        if (aggrRow == null) {
+          aggrRow = new AggregateTableRow();
+          aggregateTable.put(m, aggrRow);
+        }
+        aggrRow.getPerEntityMetrics().put(entityId, m);
+      }
+    }
+
+    public TimelineEntity aggregateTo(TimelineMetric metric,
+        TimelineEntity e) {
+      if (metric.getRealtimeAggregationOp()
+          .equals(RealTimeAggregationOperation.NOP)) {
+        return e;
+      }
+      AggregateTableRow aggrRow = aggregateTable.get(metric);
+      if (aggrRow != null) {
+        TimelineMetric aggrMetric = new TimelineMetric();
+        aggrMetric.setId(metric.getId() + AGGREGATION_ID_POSTFIX);
+        Map<Object, Object> status = new HashMap<>();
+        for (TimelineMetric m : aggrRow.getPerEntityMetrics().values()) {
+          TimelineMetric.accumulateTo(m, aggrMetric, status);
+          if (!m.getRealtimeAggregationOp()
+              .equals(aggrMetric.getRealtimeAggregationOp())) {
+            aggrMetric.setRealtimeAggregationOp(m.getRealtimeAggregationOp());
+          }
+        }
+        // Replace the metric for both the row and the entity
+        synchronized (aggrRow) {
+          aggrRow.setAggrMetric(aggrMetric);
+        }
+        Set<TimelineMetric> metrics = e.getMetrics();
+        metrics.remove(aggrMetric);
+        metrics.add(aggrMetric);
+      }
+      return e;
+    }
+
+    public TimelineEntity aggregateAllTo(TimelineEntity e) {
+      for (TimelineMetric m : aggregateTable.keySet()) {
+        aggregateTo(m, e);
+      }
+      return e;
+    }
+
+    // Right now not using it since the synchronization overhead could be
+    // significant.
+    public TimelineEntity updateThenAggregateTo(TimelineEntity incoming,
+        TimelineEntity e) {
+      String entityId = incoming.getId();
+      for (TimelineMetric m : incoming.getMetrics()) {
+        // Update aggregateTable
+        AggregateTableRow aggrRow = aggregateTable.get(m);
+        if (aggrRow == null) {
+          aggrRow = new AggregateTableRow();
+          aggregateTable.put(m, aggrRow);
+        }
+        // Do aggregation
+        TimelineMetric prev = aggrRow.getPerEntityMetrics().get(entityId);
+        Map<Object, Object> prevState = new HashMap<>();
+        if (prev != null) {
+          prevState.put(RealTimeAggregationOperation.PREV_METRIC_STATE_KEY,
+              prev);
+        }
+        TimelineMetric aggrMetric = TimelineMetric.accumulateTo(m,
+            aggrRow.getAggrMetric(), prevState);
+        // Update the aggregation table and the entity
+        synchronized (aggrRow) {
+          aggrRow.setAggrMetric(aggrMetric);
+        }
+        aggrRow.getPerEntityMetrics().put(entityId, m);
+        Set<TimelineMetric> metrics = e.getMetrics();
+        metrics.remove(aggrMetric);
+        metrics.add(aggrMetric);
+      }
+      return e;
+    }
+  }
+
+  private static class AggregateTableRow {
+    private Map<String, TimelineMetric> perEntityMetrics;
+    private TimelineMetric aggrMetric;
+
+    public AggregateTableRow() {
+      perEntityMetrics = new ConcurrentHashMap<>();
+    }
+
+    public Map<String, TimelineMetric> getPerEntityMetrics() {
+      return perEntityMetrics;
+    }
+
+    public TimelineMetric getAggrMetric() {
+      return aggrMetric;
+    }
+
+    public void setAggrMetric(TimelineMetric m) {
+      aggrMetric = m;
+    }
+  }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
index 0b9549b..af21836 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
@@ -18,11 +18,13 @@
 package org.apache.hadoop.yarn.server.timelineservice.reader;
 
+import java.io.IOException;
 import java.text.DateFormat;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Collections;
 import java.util.Date;
+import java.util.EnumSet;
 import java.util.Locale;
 import java.util.Set;
 import java.util.TimeZone;
@@ -47,6 +49,7 @@
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineAggregationBasis;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index b75007d..7c872a5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -380,16 +380,22 @@ private void storeMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
           Long timestamp = timeseriesEntry.getKey();
           if (isApplication) {
             ApplicationColumnPrefix.METRIC.store(rowKey, applicationTable,
-                metricColumnQualifier, timestamp, timeseriesEntry.getValue());
+                metricColumnQualifier, getMetricPostfix(metric),
+                timestamp, timeseriesEntry.getValue());
           } else {
             EntityColumnPrefix.METRIC.store(rowKey, entityTable,
-                metricColumnQualifier, timestamp, timeseriesEntry.getValue());
+                metricColumnQualifier, getMetricPostfix(metric),
+                timestamp, timeseriesEntry.getValue());
           }
         }
       }
     }
   }
 
+  private String getMetricPostfix(TimelineMetric metric) {
+    return metric.getRealtimeAggregationOp().toString();
+  }
+
   /**
    * Stores the events from the {@linkplain TimelineEvent} object.
    */
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java
index f0b1e47..6a1e086 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java
@@ -24,5 +24,5 @@
  *
  */
 public enum TimelineAggregationTrack {
-  FLOW, USER, QUEUE
+  APP, FLOW, USER, QUEUE
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
index 9120f3d..a28ef4b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
@@ -171,6 +171,25 @@ public void store(byte[] rowKey,
       Long timestamp, Object inputValue, Attribute... attributes)
       throws IOException {
+    store(rowKey, tableMutator, qualifier, null, timestamp, inputValue,
+        attributes);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #store(byte[],
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * TypedBufferedMutator, java.lang.String, java.lang.String, java.lang.Long,
+   * java.lang.Object)
+   */
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<ApplicationTable> tableMutator, String qualifier,
+      String columnPostfix, Long timestamp, Object inputValue,
+      Attribute... attributes)
+      throws IOException {
+
     // Null check
     if (qualifier == null) {
       throw new IOException("Cannot store column with null qualifier in "
@@ -179,6 +198,11 @@ public void store(byte[] rowKey,
 
     byte[] columnQualifier = getColumnPrefixBytes(qualifier);
 
+    if (columnPostfix != null) {
+      columnQualifier = Separator.VALUES.join(columnQualifier,
+          Bytes.toBytes(columnPostfix));
+    }
+
     column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
         attributes);
   }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
index b5fc214..0823a11 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
@@ -318,7 +318,9 @@ public static boolean isApplicationFinished(TimelineEntity te) {
    * @return true if input is an ApplicationEntity, false otherwise
    */
   public static boolean isApplicationEntity(TimelineEntity te) {
-    return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString());
+    return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString())
+        || te.getType().equals(
+            TimelineEntityType.YARN_APPLICATION_AGGREGATION.toString());
   }
 
   /**
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
index f3c7e7f..019f302 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
@@ -145,6 +145,15 @@ public void store(byte[] rowKey,
       TypedBufferedMutator<EntityTable> tableMutator, String qualifier,
       Long timestamp, Object inputValue, Attribute... attributes)
       throws IOException {
+    store(rowKey, tableMutator, qualifier, null, timestamp, inputValue,
+        attributes);
+  }
+
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<EntityTable> tableMutator, String qualifier,
+      String columnPostfix, Long timestamp, Object inputValue,
+      Attribute... attributes)
+      throws IOException {
 
     // Null check
     if (qualifier == null) {
@@ -154,6 +163,11 @@ public void store(byte[] rowKey,
 
     byte[] columnQualifier = getColumnPrefixBytes(qualifier);
 
+    if (columnPostfix != null) {
+      columnQualifier = Separator.VALUES.join(columnQualifier,
+          Bytes.toBytes(columnPostfix));
+    }
+
     column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
         attributes);
   }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
index 281e901..4a50796 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
 
 import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.NavigableSet;
@@ -31,6 +33,7 @@
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.yarn.api.records.timelineservice.RealTimeAggregationOperation;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
@@ -38,6 +41,7 @@
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 
 /**
  * The base class for reading and deserializing timeline entities from the
@@ -268,7 +272,22 @@ protected void readMetrics(TimelineEntity entity, Result result,
     for (Map.Entry<String, NavigableMap<Long, Number>> metricResult:
         metricsResult.entrySet()) {
       TimelineMetric metric = new TimelineMetric();
-      metric.setId(metricResult.getKey());
+      Collection<String> tokens =
+          Separator.VALUES.splitEncoded(metricResult.getKey());
+      if (tokens.size() != 1 && tokens.size() != 2) {
+        throw new IOException(
+            "Invalid metric column name: " + metricResult.getKey());
+      }
+      Iterator<String> idItr = tokens.iterator();
+      String id = idItr.next();
+      // By default, do not perform any real-time aggregation if the
+      // operation field is missing
+      RealTimeAggregationOperation op = RealTimeAggregationOperation.NOP;
+      if (tokens.size() == 2) {
+        String postfix = idItr.next();
+        op = RealTimeAggregationOperation.valueOf(postfix);
+      }
+      metric.setId(id);
+      metric.setRealtimeAggregationOp(op);
       // Simply assume that if the value set contains more than 1 elements, the
       // metric is a TIME_SERIES metric, otherwise, it's a SINGLE_VALUE metric
       metric.setType(metricResult.getValue().size() > 1 ?
           TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE);
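[Note, not part of the patch] The writer appends the operation's string form to the metric column qualifier, and the reader above splits it back out through RealTimeAggregationOperation.valueOf. One caveat worth flagging: valueOf resolves enum constant names, which match toString for NOP, MAX, and SUM but not for AVG (whose display string is "AVERAGE"), so an AVG-tagged column would currently fail to parse; AVG is unsupported anyway, but the round trip deserves a unit test. A hedged sketch of the round trip, reusing the Separator helper from the storage layer with a hypothetical metric id:

// Writer side: the stored qualifier is "<metricId><separator><opName>".
String metricId = "MAP_SLOT_MILLIS";                  // hypothetical id
RealTimeAggregationOperation op = RealTimeAggregationOperation.SUM;
String qualifier = Separator.VALUES.joinEncoded(metricId, op.toString());

// Reader side: split the qualifier and recover the operation, falling
// back to NOP when no postfix is present.
Collection<String> tokens = Separator.VALUES.splitEncoded(qualifier);
Iterator<String> itr = tokens.iterator();
String id = itr.next();
RealTimeAggregationOperation parsed = (tokens.size() == 2)
    ? RealTimeAggregationOperation.valueOf(itr.next())
    : RealTimeAggregationOperation.NOP;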
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
new file mode 100644
index 0000000..a02f20d
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.RealTimeAggregationOperation;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestTimelineCollector {
+
+  private TimelineEntities generateTestEntities(int groups, int entities) {
+    TimelineEntities te = new TimelineEntities();
+    for (int j = 0; j < groups; j++) {
+      for (int i = 0; i < entities; i++) {
+        TimelineEntity entity = new TimelineEntity();
+        String containerId = "container_1000178881110_2002_" + i;
+        entity.setId(containerId);
+        String entityType = "TEST_" + j;
+        entity.setType(entityType);
+        long cTime = 1425016501000L;
+        entity.setCreatedTime(cTime);
+
+        // add metrics
+        Set<TimelineMetric> metrics = new HashSet<>();
+        TimelineMetric m1 = new TimelineMetric();
+        m1.setId("HDFS_BYTES_WRITE");
+        m1.setRealtimeAggregationOp(RealTimeAggregationOperation.SUM);
+        long ts = System.currentTimeMillis();
+        m1.addValue(ts - 20000, 100L);
+        metrics.add(m1);
+
+        TimelineMetric m2 = new TimelineMetric();
+        m2.setId("VCORES_USED");
+        m2.setRealtimeAggregationOp(RealTimeAggregationOperation.SUM);
+        m2.addValue(ts - 20000, 3L);
+        metrics.add(m2);
+
+        // m3 should not show up in the aggregation
+        TimelineMetric m3 = new TimelineMetric();
+        m3.setId("UNRELATED_VALUES");
+        m3.addValue(ts - 20000, 3L);
+        metrics.add(m3);
+
+        TimelineMetric m4 = new TimelineMetric();
+        m4.setId("TXN_FINISH_TIME");
+        m4.setRealtimeAggregationOp(RealTimeAggregationOperation.MAX);
+        m4.addValue(ts - 20000, (long) i);
+        metrics.add(m4);
+
+        entity.addMetrics(metrics);
+        te.addEntity(entity);
+      }
+    }
+
+    return te;
+  }
+
+  @Test
+  public void testAggregation() throws Exception {
+    int groups = 3;
+    int n = 50;
+    TimelineEntities testEntities = generateTestEntities(groups, n);
+    TimelineEntities resultEntities = TimelineCollector.aggregateMetrics(
+        testEntities, "test_result", "TEST_AGGR");
+    assertEquals(groups, resultEntities.getEntities().size());
+
+    for (int i = 0; i < groups; i++) {
+      TimelineEntity result = resultEntities.getEntities().get(i);
+      Set<TimelineMetric> metrics = result.getMetrics();
+      assertEquals(3, metrics.size());
+      for (TimelineMetric m : metrics) {
+        switch (m.getId()) {
+        case "HDFS_BYTES_WRITE":
+          assertEquals(100L * n, m.retrieveSingleDataValue());
+          break;
+        case "VCORES_USED":
+          assertEquals(3L * n, m.retrieveSingleDataValue());
+          break;
+        case "TXN_FINISH_TIME":
+          // MAX-aggregated over i = 0..n-1, so the maximum is n - 1
+          assertEquals((long) n - 1, m.retrieveSingleDataValue());
+          break;
+        }
+      }
+    }
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
index 5ce7d3b..8d65c78 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
@@ -25,11 +25,15 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.RealTimeAggregationOperation;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.junit.Test;
@@ -51,6 +55,26 @@ public void testWriteEntityToFile() throws Exception {
     entity.setCreatedTime(1425016501000L);
     te.addEntity(entity);
 
+    TimelineMetric metric = new TimelineMetric();
+    String metricId = "CPU";
+    metric.setId(metricId);
+    metric.setType(TimelineMetric.Type.SINGLE_VALUE);
+    metric.setRealtimeAggregationOp(RealTimeAggregationOperation.SUM);
+    metric.addValue(1425016501000L, 1234567L);
+
+    TimelineEntity entity2 = new TimelineEntity();
+    String id2 = "metric";
+    String type2 = "app";
+    entity2.setId(id2);
+    entity2.setType(type2);
+    entity2.setCreatedTime(1425016503000L);
+    entity2.addMetric(metric);
+    te.addEntity(entity2);
+
+    Map<String, TimelineMetric> aggregatedMetrics =
+        new HashMap<String, TimelineMetric>();
+    aggregatedMetrics.put(metricId, metric);
+
     FileSystemTimelineWriterImpl fsi = null;
     try {
       fsi = new FileSystemTimelineWriterImpl();
@@ -68,11 +92,27 @@ public void testWriteEntityToFile() throws Exception {
       assertTrue(f.exists() && !f.isDirectory());
       List<String> data = Files.readAllLines(path, StandardCharsets.UTF_8);
       // ensure there's only one entity + 1 new line
-      assertTrue(data.size() == 2);
+      assertTrue("data size is:" + data.size(), data.size() == 2);
       String d = data.get(0);
       // confirm the contents same as what was written
       assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity));
 
+      // verify aggregated metrics
+      String fileName2 = fsi.getOutputRoot() +
+          "/entities/cluster_id/user_id/flow_name/flow_version/12345678/app_id/" +
+          type2 + "/" + id2 +
+          FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+      Path path2 = Paths.get(fileName2);
+      File file = new File(fileName2);
+      assertTrue(file.exists() && !file.isDirectory());
+      List<String> data2 = Files.readAllLines(path2, StandardCharsets.UTF_8);
+      // ensure there's only one entity + 1 new line
+      assertTrue("data size is:" + data2.size(), data2.size() == 2);
+      String metricToString = data2.get(0);
+      // confirm the contents same as what was written
+      assertEquals(metricToString,
+          TimelineUtils.dumpTimelineRecordtoJSON(entity2));
+
       // delete the directory
       File outputDir = new File(fsi.getOutputRoot());
       FileUtils.deleteDirectory(outputDir);
@@ -84,4 +124,5 @@ public void testWriteEntityToFile() throws Exception {
       }
     }
   }
+
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
index 4e07ecf..d2ff068 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
@@ -41,6 +41,7 @@
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.RealTimeAggregationOperation;
 import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
@@ -489,6 +490,26 @@ public void testWriteApplicationToHBase() throws Exception {
     metrics.add(m1);
     entity.addMetrics(metrics);
 
+    // add aggregated metrics
+    TimelineEntity aggEntity = new TimelineEntity();
+    String type = TimelineEntityType.YARN_APPLICATION_AGGREGATION.toString();
+    aggEntity.setId(appId);
+    aggEntity.setType(type);
+    long cTime2 = 1425016502000L;
+    long mTime2 = 1425026902000L;
+    aggEntity.setCreatedTime(cTime2);
+
+    TimelineMetric aggMetric = new TimelineMetric();
+    aggMetric.setId("MEM_USAGE");
+    Map<Long, Number> aggMetricValues = new HashMap<Long, Number>();
+    ts = System.currentTimeMillis();
+    aggMetricValues.put(ts - 120000, 102400000);
+    aggMetric.setType(Type.SINGLE_VALUE);
+    aggMetric.setRealtimeAggregationOp(RealTimeAggregationOperation.SUM);
+    aggMetric.setValues(aggMetricValues);
+    Set<TimelineMetric> aggMetrics = new HashSet<>();
+    aggMetrics.add(aggMetric);
+    entity.addMetrics(aggMetrics);
     te.addEntity(entity);
 
     HBaseTimelineWriterImpl hbi = null;
@@ -514,7 +535,7 @@ public void testWriteApplicationToHBase() throws Exception {
     Result result = new ApplicationTable().getResult(c1, conn, get);
 
     assertTrue(result != null);
-    assertEquals(15, result.size());
+    assertEquals(16, result.size());
 
     // check the row key
     byte[] row1 = result.getRow();
@@ -573,7 +594,9 @@ public void testWriteApplicationToHBase() throws Exception {
 
     NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
         ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result);
 
-    NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
+    NavigableMap<Long, Number> metricMap = metricsResult.get(
+        Separator.VALUES.joinEncoded(m1.getId(),
+            m1.getRealtimeAggregationOp().toString()));
     matchMetrics(metricValues, metricMap);
 
     // read the timeline entity using the reader this time
@@ -602,10 +625,17 @@ public void testWriteApplicationToHBase() throws Exception {
     assertEquals(conf, conf2);
 
     Set<TimelineMetric> metrics2 = e1.getMetrics();
-    assertEquals(metrics, metrics2);
+    assertEquals(2, metrics2.size());
     for (TimelineMetric metric2 : metrics2) {
       Map<Long, Number> metricValues2 = metric2.getValues();
-      matchMetrics(metricValues, metricValues2);
+      assertTrue(metric2.getId().equals("MAP_SLOT_MILLIS") ||
+          metric2.getId().equals("MEM_USAGE"));
+      if (metric2.getId().equals("MAP_SLOT_MILLIS")) {
+        matchMetrics(metricValues, metricValues2);
+      }
+      if (metric2.getId().equals("MEM_USAGE")) {
+        matchMetrics(aggMetricValues, metricValues2);
+      }
     }
   } finally {
     if (hbi != null) {
@@ -674,7 +704,6 @@ public void testWriteEntityToHBase() throws Exception {
     m1.setValues(metricValues);
     metrics.add(m1);
     entity.addMetrics(metrics);
-
     te.addEntity(entity);
 
     HBaseTimelineWriterImpl hbi = null;
@@ -768,7 +797,9 @@ public void testWriteEntityToHBase() throws Exception {
 
     NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
         EntityColumnPrefix.METRIC.readResultsWithTimestamps(result);
 
-    NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
+    NavigableMap<Long, Number> metricMap = metricsResult.get(
+        Separator.VALUES.joinEncoded(m1.getId(),
+            m1.getRealtimeAggregationOp().toString()));
     matchMetrics(metricValues, metricMap);
   }
 }
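[Note, not part of the patch] To close, a self-contained sketch of the PREV_METRIC_STATE_KEY contract that both TestTimelineMetric and AggregationStatus#updateThenAggregateTo rely on: when an entity re-reports a metric, the caller passes the entity's previous report through the state map so SUM retracts the old value before adding the new one. The class name, metric id, and values here are illustrative only.

// Sketch: stateful SUM accumulation with retraction of a previous report.
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.yarn.api.records.timelineservice.RealTimeAggregationOperation;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;

public class StatefulSumSketch {
  public static void main(String[] args) {
    long ts = System.currentTimeMillis();
    TimelineMetric first = report(ts, 10000L);
    TimelineMetric second = report(ts, 20000L);

    // The first call seeds the aggregate; the second folds 20000 in.
    TimelineMetric aggregate = TimelineMetric.accumulateTo(first, null);
    aggregate = TimelineMetric.accumulateTo(second, aggregate);   // 30000

    // The same entity re-reports 10000: pass its previous report through
    // the state map so SUM subtracts 20000 before adding the new value.
    Map<Object, Object> state = new HashMap<>();
    state.put(RealTimeAggregationOperation.PREV_METRIC_STATE_KEY, second);
    TimelineMetric updated = report(ts, 10000L);
    aggregate = TimelineMetric.accumulateTo(updated, aggregate, state);
    System.out.println(aggregate.retrieveSingleDataValue());      // 20000
  }

  private static TimelineMetric report(long ts, long value) {
    TimelineMetric m = new TimelineMetric(TimelineMetric.Type.SINGLE_VALUE);
    m.setId("MEGA_BYTES_MILLIS");
    m.setRealtimeAggregationOp(RealTimeAggregationOperation.SUM);
    m.addValue(ts, value);
    return m;
  }
}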