From 41473947bd6ae5637c71e4da93ddad5ed0a840cb Mon Sep 17 00:00:00 2001
From: huanghaibin
Date: Wed, 1 Apr 2020 10:11:41 +0800
Subject: [PATCH] YARN-10217. Expired SampleStat should be ignored when
 generating SlowPeersReport

---
 .../metrics2/lib/MutableRollingAverages.java  | 30 +++++++++++++++
 .../datanode/TestDataNodePeerMetrics.java     | 38 +++++++++++++++++++
 2 files changed, 68 insertions(+)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRollingAverages.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRollingAverages.java
index 6803d11d1ca..84a058ad0a2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRollingAverages.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRollingAverages.java
@@ -266,6 +266,12 @@ public void close() throws IOException {
       double totalSum = 0;
       long totalCount = 0;
 
+      LinkedBlockingDeque<SumAndCount> deque = entry.getValue();
+
+      if (deque.size() == numWindows && checkAllMembersEqual(deque)) {
+        continue;
+      }
+
       for (final SumAndCount sumAndCount : entry.getValue()) {
         totalCount += sumAndCount.getCount();
         totalSum += sumAndCount.getSum();
@@ -277,4 +283,28 @@ public void close() throws IOException {
     }
     return stats;
   }
+
+  /**
+   * Checks whether all members of the deque have the same sum and count.
+   * If they do, the last metric has filled the whole deque, meaning that no
+   * new metric has been generated for as long as the deque spans, so the
+   * stored metrics are considered expired.
+   *
+   * @param deque the per-window sums and counts to compare.
+   * @return true if all members are the same (or the deque is empty).
+   */
+  private static boolean checkAllMembersEqual(
+      LinkedBlockingDeque<SumAndCount> deque) {
+    if (deque.isEmpty()) {
+      return true;
+    }
+    SumAndCount first = deque.getFirst();
+    for (final SumAndCount sumAndCount : deque) {
+      if (sumAndCount.getCount() != first.getCount()
+          || sumAndCount.getSum() != first.getSum()) {
+        return false;
+      }
+    }
+    return true;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java
index 3caf24d83fc..2cd939ca8eb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java
@@ -24,10 +24,13 @@
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics;
 import org.apache.hadoop.metrics2.lib.MetricsTestHelper;
+import org.apache.hadoop.metrics2.lib.MutableRollingAverages;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
 import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 /**
@@ -80,6 +83,41 @@ public void testGetSendPacketDownstreamAvgInfo() throws Exception {
     }
   }
 
+  @Test(timeout = 30000)
+  public void testRemoveExpiredInfo() throws Exception {
+    final int numWindows = 5;
+    final long scheduleInterval = 1000;
+    final int iterations = 3;
+    final int numSamples = 1000;
+
+    final Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, true);
+
+    final DataNodePeerMetrics peerMetrics =
+        DataNodePeerMetrics.create("Sample-DataNode", conf);
+    MutableRollingAverages rollingAverages =
+        peerMetrics.getSendPacketDownstreamRollingAverages();
+    MetricsTestHelper.replaceRollingAveragesScheduler(rollingAverages,
+        numWindows, scheduleInterval, TimeUnit.MILLISECONDS);
+
+    for (int i = 1; i <= iterations; i++) {
+      final String peerAddr = genPeerAddress();
+      for (int j = 1; j <= numSamples; j++) {
+        /* simulate to get latency of 1 to 1000 ms */
+        final long latency = ThreadLocalRandom.current().nextLong(1, 1000);
+        peerMetrics.addSendPacketDownstream(peerAddr, latency);
+      }
+    }
+
+    GenericTestUtils.waitFor(
+        () -> rollingAverages.getStats(numSamples).size() > 0, 500, 5000);
+    assertEquals(iterations, rollingAverages.getStats(numSamples).size());
+    /* waiting expired info to be removed */
+    GenericTestUtils.waitFor(
+        () -> rollingAverages.getStats(numSamples).isEmpty(), 500, 10000);
+    assertEquals(0, rollingAverages.getStats(numSamples).size());
+  }
+
   /**
    * Simulates to generate different peer addresses, e.g. [84.125.113.65:9801].
    */
-- 
2.20.1