From 125287639b0c093887b777e4267420bbd295d23f Mon Sep 17 00:00:00 2001 From: Andrey Gura Date: Wed, 1 Apr 2015 21:02:20 +0300 Subject: [PATCH] ignite-366 Metrics for caches should work in clustered mode --- .../main/java/org/apache/ignite/IgniteCache.java | 9 + .../java/org/apache/ignite/cache/CacheMetrics.java | 7 + .../managers/discovery/GridDiscoveryManager.java | 23 ++ .../processors/cache/CacheMetricsImpl.java | 10 +- .../processors/cache/CacheMetricsMXBeanImpl.java | 5 + .../processors/cache/CacheMetricsSnapshot.java | 237 +++++++++++++++++++- .../processors/cache/IgniteCacheProxy.java | 26 +++ .../spi/discovery/DiscoveryMetricsProvider.java | 10 + .../spi/discovery/tcp/TcpClientDiscoverySpi.java | 29 ++- .../ignite/spi/discovery/tcp/TcpDiscoverySpi.java | 45 +++- .../discovery/tcp/internal/TcpDiscoveryNode.java | 73 ++++++- .../tcp/messages/TcpDiscoveryHeartbeatMessage.java | 65 ++++++ .../cache/CacheMetricsForClusterGroupSelfTest.java | 239 +++++++++++++++++++++ .../junits/spi/GridSpiAbstractTest.java | 6 + .../IgniteCacheMetricsSelfTestSuite.java | 4 + 15 files changed, 766 insertions(+), 22 deletions(-) create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java index 5c5bb25..cc0805e 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java @@ -22,6 +22,7 @@ import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cache.affinity.rendezvous.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.cache.store.*; +import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.lang.*; import org.apache.ignite.mxbean.*; @@ -514,6 +515,14 @@ public interface IgniteCache extends javax.cache.Cache, IgniteAsyncS public CacheMetrics metrics(); /** + * Gets snapshot metrics for caches in cluster group. + * + * @param grp Cluster group. + * @return Cache metrics. + */ + public CacheMetrics metrics(ClusterGroup grp); + + /** * Gets MxBean for this cache. * * @return MxBean. diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java index 0d87326..dad0ddd 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java @@ -147,6 +147,13 @@ public interface CacheMetrics { public String name(); /** + * Gets ID of this cache. + * + * @return Cache ID. + */ + public int id(); + + /** * Gets number of entries that was swapped to disk. * * @return Number of entries that was swapped to disk. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 04ff423..76a3dce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.managers.discovery; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; @@ -26,6 +27,7 @@ import org.apache.ignite.internal.managers.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.jobmetrics.*; import org.apache.ignite.internal.processors.security.*; import org.apache.ignite.internal.util.*; @@ -644,6 +646,27 @@ public class GridDiscoveryManager extends GridManagerAdapter { return nm; } + + /** {@inheritDoc} */ + @Override public Map cacheMetrics() { + Collection> caches = ctx.cache().internalCaches(); + + if (F.isEmpty(caches)) + return Collections.emptyMap(); + + Map metrics = null; + + for (GridCacheAdapter cache : caches) { + if (cache.configuration().isStatisticsEnabled()) { + if (metrics == null) + metrics = U.newHashMap(caches.size()); + + metrics.put(cache.context().cacheId(), cache.metrics()); + } + } + + return metrics == null ? Collections.emptyMap() : metrics; + } }; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java index deebab4..8d9d02b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java @@ -110,6 +110,10 @@ public class CacheMetricsImpl implements CacheMetrics { this.delegate = delegate; } + /** {@inheritDoc} */ + @Override public int id() { + return cctx.cacheId(); + } /** {@inheritDoc} */ @Override public String name() { @@ -353,9 +357,8 @@ public class CacheMetricsImpl implements CacheMetrics { long misses0 = misses.get(); long reads0 = reads.get(); - if (misses0 == 0) { + if (misses0 == 0) return 0; - } return (float) misses0 / reads0 * 100.0f; } @@ -468,9 +471,8 @@ public class CacheMetricsImpl implements CacheMetrics { txCommits.incrementAndGet(); commitTimeNanos.addAndGet(duration); - if (delegate != null) { + if (delegate != null) delegate.onTxCommit(duration); - } } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java index e9d547c..3dd206b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java @@ -39,6 +39,11 @@ class CacheMetricsMXBeanImpl implements CacheMetricsMXBean { } /** {@inheritDoc} */ + @Override public int id() { + return cache.context().cacheId(); + } + + /** {@inheritDoc} */ @Override public String name() { return cache.metrics0().name(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java index 0391f4e..5ed7c73 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java @@ -20,10 +20,16 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.cache.*; import org.apache.ignite.internal.util.typedef.internal.*; +import java.io.*; +import java.util.*; + /** * Metrics snapshot. */ -class CacheMetricsSnapshot implements CacheMetrics { +public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + /** Number of reads. */ private long reads = 0; @@ -63,6 +69,9 @@ class CacheMetricsSnapshot implements CacheMetrics { /** Commit transaction time taken nanos. */ private float rollbackAvgTimeNanos = 0; + /** Cache ID. */ + private int id; + /** Cache name */ private String cacheName; @@ -178,6 +187,13 @@ class CacheMetricsSnapshot implements CacheMetrics { private boolean isWriteThrough; /** + * Default constructor. + */ + public CacheMetricsSnapshot() { + // No-op. + } + + /** * Create snapshot for given metrics. * * @param m Cache metrics. @@ -198,6 +214,7 @@ class CacheMetricsSnapshot implements CacheMetrics { commitAvgTimeNanos = m.getAverageTxCommitTime(); rollbackAvgTimeNanos = m.getAverageTxRollbackTime(); + id = m.id(); cacheName = m.name(); overflowSize = m.getOverflowSize(); offHeapEntriesCount = m.getOffHeapEntriesCount(); @@ -239,6 +256,132 @@ class CacheMetricsSnapshot implements CacheMetrics { isWriteThrough = m.isWriteThrough(); } + /** + * Constructs merged cache metrics. + * + * @param loc Metrics for cache on local node. + * @param metrics Metrics for merge. + */ + public CacheMetricsSnapshot(CacheMetrics loc, Collection metrics) { + id = loc.id(); + cacheName = loc.name(); + isEmpty = loc.isEmpty(); + isWriteBehindEnabled = loc.isWriteBehindEnabled(); + writeBehindFlushSize = loc.getWriteBehindFlushSize(); + writeBehindFlushThreadCount = loc.getWriteBehindFlushThreadCount(); + writeBehindFlushFrequency = loc.getWriteBehindFlushFrequency(); + writeBehindStoreBatchSize = loc.getWriteBehindStoreBatchSize(); + writeBehindBufferSize = loc.getWriteBehindBufferSize(); + size = loc.getSize(); + keySize = loc.getKeySize(); + + keyType = loc.getKeyType(); + valueType = loc.getValueType(); + isStoreByValue = loc.isStoreByValue(); + isStatisticsEnabled = loc.isStatisticsEnabled(); + isManagementEnabled = loc.isManagementEnabled(); + isReadThrough = loc.isReadThrough(); + isWriteThrough = loc.isWriteThrough(); + + for (CacheMetrics e : metrics) { + reads += e.getCacheGets(); + puts += e.getCachePuts(); + hits += e.getCacheHits(); + misses += e.getCacheHits(); + txCommits += e.getCacheTxCommits(); + txRollbacks += e.getCacheTxRollbacks(); + evicts += e.getCacheEvictions(); + removes += e.getCacheRemovals(); + + putAvgTimeNanos += e.getAveragePutTime(); + getAvgTimeNanos += e.getAverageGetTime(); + removeAvgTimeNanos += e.getAverageRemoveTime(); + commitAvgTimeNanos += e.getAverageTxCommitTime(); + rollbackAvgTimeNanos += e.getAverageTxRollbackTime(); + + if (e.getOverflowSize() > -1) + overflowSize += e.getOverflowSize(); + else + overflowSize = -1; + + offHeapEntriesCount += e.getOffHeapEntriesCount(); + offHeapAllocatedSize += e.getOffHeapAllocatedSize(); + + if (e.getDhtEvictQueueCurrentSize() > -1) + dhtEvictQueueCurrentSize += e.getDhtEvictQueueCurrentSize(); + else + dhtEvictQueueCurrentSize = -1; + + txThreadMapSize += e.getTxThreadMapSize(); + txXidMapSize += e.getTxXidMapSize(); + txCommitQueueSize += e.getTxCommitQueueSize(); + txPrepareQueueSize += e.getTxPrepareQueueSize(); + txStartVersionCountsSize += e.getTxStartVersionCountsSize(); + txCommittedVersionsSize += e.getTxCommittedVersionsSize(); + txRolledbackVersionsSize += e.getTxRolledbackVersionsSize(); + + if (e.getTxDhtThreadMapSize() > -1) + txDhtThreadMapSize += e.getTxDhtThreadMapSize(); + else + txDhtThreadMapSize = -1; + + if (e.getTxDhtXidMapSize() > -1) + txDhtXidMapSize += e.getTxDhtXidMapSize(); + else + txDhtXidMapSize = -1; + + if (e.getTxDhtCommitQueueSize() > -1) + txDhtCommitQueueSize += e.getTxDhtCommitQueueSize(); + else + txDhtCommitQueueSize = -1; + + if (e.getTxDhtPrepareQueueSize() > -1) + txDhtPrepareQueueSize += e.getTxDhtPrepareQueueSize(); + else + txDhtPrepareQueueSize = -1; + + if (e.getTxDhtStartVersionCountsSize() > -1) + txDhtStartVersionCountsSize += e.getTxDhtStartVersionCountsSize(); + else + txDhtStartVersionCountsSize = -1; + + if (e.getTxDhtCommittedVersionsSize() > -1) + txDhtCommittedVersionsSize += e.getTxDhtCommittedVersionsSize(); + else + txDhtCommittedVersionsSize = -1; + + if (e.getTxDhtRolledbackVersionsSize() > -1) + txDhtRolledbackVersionsSize += e.getTxDhtRolledbackVersionsSize(); + else + txDhtRolledbackVersionsSize = -1; + + if (e.getWriteBehindTotalCriticalOverflowCount() > -1) + writeBehindTotalCriticalOverflowCount += e.getWriteBehindTotalCriticalOverflowCount(); + else + writeBehindTotalCriticalOverflowCount = -1; + + if (e.getWriteBehindCriticalOverflowCount() > -1) + writeBehindCriticalOverflowCount += e.getWriteBehindCriticalOverflowCount(); + else + writeBehindCriticalOverflowCount = -1; + + if (e.getWriteBehindErrorRetryCount() > -1) + writeBehindErrorRetryCount += e.getWriteBehindErrorRetryCount(); + else + writeBehindErrorRetryCount = -1; + } + + int size = metrics.size(); + + if (size > 1) { + putAvgTimeNanos /= size; + getAvgTimeNanos /= size; + removeAvgTimeNanos /= size; + commitAvgTimeNanos /= size; + rollbackAvgTimeNanos /= size; + } + } + /** {@inheritDoc} */ @Override public long getCacheHits() { return hits; @@ -259,9 +402,8 @@ class CacheMetricsSnapshot implements CacheMetrics { /** {@inheritDoc} */ @Override public float getCacheMissPercentage() { - if (misses == 0 || reads == 0) { + if (misses == 0 || reads == 0) return 0; - } return (float) misses / reads * 100.0f; } @@ -327,6 +469,11 @@ class CacheMetricsSnapshot implements CacheMetrics { } /** {@inheritDoc} */ + @Override public int id() { + return id; + } + + /** {@inheritDoc} */ @Override public long getOverflowSize() { return overflowSize; } @@ -515,4 +662,88 @@ class CacheMetricsSnapshot implements CacheMetrics { @Override public String toString() { return S.toString(CacheMetricsSnapshot.class, this); } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(id); + + out.writeLong(reads); + out.writeLong(puts); + out.writeLong(hits); + out.writeLong(misses); + out.writeLong(txCommits); + out.writeLong(txRollbacks); + out.writeLong(evicts); + out.writeLong(removes); + + out.writeFloat(putAvgTimeNanos); + out.writeFloat(getAvgTimeNanos); + out.writeFloat(removeAvgTimeNanos); + out.writeFloat(commitAvgTimeNanos); + out.writeFloat(rollbackAvgTimeNanos); + + out.writeLong(overflowSize); + out.writeLong(offHeapEntriesCount); + out.writeLong(offHeapAllocatedSize); + out.writeInt(dhtEvictQueueCurrentSize); + out.writeInt(txThreadMapSize); + out.writeInt(txXidMapSize); + out.writeInt(txCommitQueueSize); + out.writeInt(txPrepareQueueSize); + out.writeInt(txStartVersionCountsSize); + out.writeInt(txCommittedVersionsSize); + out.writeInt(txRolledbackVersionsSize); + out.writeInt(txDhtThreadMapSize); + out.writeInt(txDhtXidMapSize); + out.writeInt(txDhtCommitQueueSize); + out.writeInt(txDhtPrepareQueueSize); + out.writeInt(txDhtStartVersionCountsSize); + out.writeInt(txDhtCommittedVersionsSize); + out.writeInt(txDhtRolledbackVersionsSize); + out.writeInt(writeBehindTotalCriticalOverflowCount); + out.writeInt(writeBehindCriticalOverflowCount); + out.writeInt(writeBehindErrorRetryCount); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + id = in.readInt(); + + reads = in.readLong(); + puts = in.readLong(); + hits = in.readLong(); + misses = in.readLong(); + txCommits = in.readLong(); + txRollbacks = in.readLong(); + evicts = in.readLong(); + removes = in.readLong(); + + putAvgTimeNanos = in.readFloat(); + getAvgTimeNanos = in.readFloat(); + removeAvgTimeNanos = in.readFloat(); + commitAvgTimeNanos = in.readFloat(); + rollbackAvgTimeNanos = in.readFloat(); + + overflowSize = in.readLong(); + offHeapEntriesCount = in.readLong(); + offHeapAllocatedSize = in.readLong(); + dhtEvictQueueCurrentSize = in.readInt(); + txThreadMapSize = in.readInt(); + txXidMapSize = in.readInt(); + txCommitQueueSize = in.readInt(); + txPrepareQueueSize = in.readInt(); + txStartVersionCountsSize = in.readInt(); + txCommittedVersionsSize = in.readInt(); + txRolledbackVersionsSize = in.readInt(); + txDhtThreadMapSize = in.readInt(); + txDhtXidMapSize = in.readInt(); + txDhtCommitQueueSize = in.readInt(); + txDhtPrepareQueueSize = in.readInt(); + txDhtStartVersionCountsSize = in.readInt(); + txDhtCommittedVersionsSize = in.readInt(); + txDhtRolledbackVersionsSize = in.readInt(); + writeBehindTotalCriticalOverflowCount = in.readInt(); + writeBehindCriticalOverflowCount = in.readInt(); + writeBehindErrorRetryCount = in.readInt(); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index dfc3ef4..dda1c7b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -33,6 +33,7 @@ import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.mxbean.*; +import org.apache.ignite.spi.discovery.tcp.internal.*; import org.jetbrains.annotations.*; import javax.cache.*; @@ -140,6 +141,31 @@ public class IgniteCacheProxy extends AsyncSupportAdapter prev = gate.enter(prj); + + try { + List metrics = new ArrayList<>(grp.nodes().size()); + + for (ClusterNode node : grp.nodes()) { + Map nodeCacheMetrics = ((TcpDiscoveryNode)node).cacheMetrics(); + + if (nodeCacheMetrics != null) { + CacheMetrics e = nodeCacheMetrics.get(context().cacheId()); + + if (e != null) + metrics.add(e); + } + } + + return new CacheMetricsSnapshot(ctx.cache().metrics(), metrics); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ @Override public CacheMetricsMXBean mxBean() { GridCacheProjectionImpl prev = gate.enter(prj); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java index 4a03278..c2bdc53 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java @@ -17,9 +17,12 @@ package org.apache.ignite.spi.discovery; +import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.util.tostring.*; +import java.util.*; + /** * Provides metrics to discovery SPI. It is responsibility of discovery SPI * to make sure that all nodes have updated metrics data about each other. @@ -36,4 +39,11 @@ public interface DiscoveryMetricsProvider { * @return Up to date metrics data about local node. */ public ClusterMetrics metrics(); + + /** + * Returns metrics data about all caches on local node. + * + * @return metrics data about all caches on local node. + */ + public Map cacheMetrics(); } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java index bf69efb..5d8a285 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java @@ -18,6 +18,7 @@ package org.apache.ignite.spi.discovery.tcp; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; @@ -1067,7 +1068,11 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp Socket sock0 = sock; if (sock0 != null) { - msg.setMetrics(getLocalNodeId(), metricsProvider.metrics()); + UUID nodeId = ignite.configuration().getNodeId(); + + msg.setMetrics(nodeId, metricsProvider.metrics()); + + msg.setCacheMetrics(nodeId, metricsProvider.cacheMetrics()); try { writeToSocket(sock0, msg); @@ -1094,16 +1099,21 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp log.debug("Received heartbeat response: " + msg); } else { - if (msg.hasMetrics()) { - long tstamp = U.currentTimeMillis(); + long tstamp = U.currentTimeMillis(); + if (msg.hasMetrics()) { for (Map.Entry e : msg.metrics().entrySet()) { + UUID nodeId = e.getKey(); + MetricsSet metricsSet = e.getValue(); - updateMetrics(e.getKey(), metricsSet.metrics(), tstamp); + Map cacheMetrics = msg.hasCacheMetrics() ? + msg.cacheMetrics().get(nodeId) : Collections.emptyMap(); + + updateMetrics(nodeId, metricsSet.metrics(), cacheMetrics, tstamp); for (T2 t : metricsSet.clientMetrics()) - updateMetrics(t.get1(), t.get2(), tstamp); + updateMetrics(t.get1(), t.get2(), cacheMetrics, tstamp); } } } @@ -1151,16 +1161,23 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp /** * @param nodeId Node ID. * @param metrics Metrics. + * @param cacheMetrics Cache metrics. * @param tstamp Timestamp. */ - private void updateMetrics(UUID nodeId, ClusterMetrics metrics, long tstamp) { + private void updateMetrics(UUID nodeId, + ClusterMetrics metrics, + Map cacheMetrics, + long tstamp) + { assert nodeId != null; assert metrics != null; + assert cacheMetrics != null; TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode : rmtNodes.get(nodeId); if (node != null && node.visible()) { node.setMetrics(metrics); + node.setCacheMetrics(cacheMetrics); node.lastUpdateTime(tstamp); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index bad8837..1707666 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -18,6 +18,7 @@ package org.apache.ignite.spi.discovery.tcp; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.events.DiscoveryEvent; @@ -4240,7 +4241,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov return; } - if (locNodeId.equals(msg.creatorNodeId()) && !msg.hasMetrics(locNodeId) && msg.senderNodeId() != null) { + if (locNodeId.equals(msg.creatorNodeId()) && !hasMetrics(msg, locNodeId) && msg.senderNodeId() != null) { if (log.isDebugEnabled()) log.debug("Discarding heartbeat message that has made two passes: " + msg); @@ -4252,21 +4253,27 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (spiStateCopy() == CONNECTED) { if (msg.hasMetrics()) { for (Map.Entry e : msg.metrics().entrySet()) { + UUID nodeId = e.getKey(); + MetricsSet metricsSet = e.getValue(); - updateMetrics(e.getKey(), metricsSet.metrics(), tstamp); + Map cacheMetrics = msg.hasCacheMetrics() ? + msg.cacheMetrics().get(nodeId) : Collections.emptyMap(); + + updateMetrics(nodeId, metricsSet.metrics(), cacheMetrics, tstamp); for (T2 t : metricsSet.clientMetrics()) - updateMetrics(t.get1(), t.get2(), tstamp); + updateMetrics(t.get1(), t.get2(), cacheMetrics, tstamp); } } } if (ring.hasRemoteNodes()) { if ((locNodeId.equals(msg.creatorNodeId()) && msg.senderNodeId() == null || - !msg.hasMetrics(locNodeId)) && spiStateCopy() == CONNECTED) { + !hasMetrics(msg, locNodeId)) && spiStateCopy() == CONNECTED) { // Message is on its first ring or just created on coordinator. msg.setMetrics(locNodeId, metricsProvider.metrics()); + msg.setCacheMetrics(locNodeId, metricsProvider.cacheMetrics()); for (Map.Entry e : clientMsgWorkers.entrySet()) { UUID nodeId = e.getKey(); @@ -4280,7 +4287,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov } else { // Message is on its second ring. - msg.removeMetrics(locNodeId); + removeMetrics(msg, locNodeId); Collection clientNodeIds = msg.clientNodeIds(); @@ -4313,16 +4320,23 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov /** * @param nodeId Node ID. * @param metrics Metrics. + * @param cacheMetrics Cache metrics. * @param tstamp Timestamp. */ - private void updateMetrics(UUID nodeId, ClusterMetrics metrics, long tstamp) { + private void updateMetrics(UUID nodeId, + ClusterMetrics metrics, + Map cacheMetrics, + long tstamp) + { assert nodeId != null; assert metrics != null; + assert cacheMetrics != null; TcpDiscoveryNode node = ring.node(nodeId); if (node != null) { node.setMetrics(metrics); + node.setCacheMetrics(cacheMetrics); node.lastUpdateTime(tstamp); @@ -4333,6 +4347,13 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov } /** + * @param msg Message. + */ + private boolean hasMetrics(TcpDiscoveryHeartbeatMessage msg, UUID nodeId) { + return msg.hasMetrics(nodeId) || msg.hasCacheMetrics(nodeId); + } + + /** * Processes discard message and discards previously registered pending messages. * * @param msg Discard message. @@ -5114,9 +5135,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (hbMsg.creatorNodeId().equals(nodeId)) { metrics = hbMsg.metrics().get(nodeId).metrics(); - hbMsg.removeMetrics(nodeId); + removeMetrics(hbMsg, nodeId); assert !hbMsg.hasMetrics(); + assert !hbMsg.hasCacheMetrics(); } } @@ -5162,4 +5184,13 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov U.closeQuiet(sock); } } + + /** + * @param msg Message. + * @param nodeId Node ID. + */ + private static void removeMetrics(TcpDiscoveryHeartbeatMessage msg, UUID nodeId) { + msg.removeMetrics(nodeId); + msg.removeCacheMetrics(nodeId); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java index 450dd8c..50555e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java @@ -17,8 +17,10 @@ package org.apache.ignite.spi.discovery.tcp.internal; +import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; @@ -73,6 +75,10 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste @GridToStringExclude private volatile ClusterMetrics metrics; + /** Node cache metrics. */ + @GridToStringExclude + private volatile Map cacheMetrics; + /** Node order in the topology. */ private volatile long order; @@ -192,8 +198,13 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste /** {@inheritDoc} */ @Override public ClusterMetrics metrics() { - if (metricsProvider != null) - metrics = metricsProvider.metrics(); + if (metricsProvider != null) { + ClusterMetrics metrics0 = metricsProvider.metrics(); + + metrics = metrics0; + + return metrics0; + } return metrics; } @@ -210,6 +221,39 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste } /** + * Gets collections of cache metrics for this node. Note that node cache metrics are constantly updated + * and provide up to date information about caches. + *

+ * Cache metrics are updated with some delay which is directly related to heartbeat + * frequency. For example, when used with default + * {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi} the update will happen every {@code 2} seconds. + * + * @return Runtime metrics snapshots for this node. + */ + public Map cacheMetrics() { + if (metricsProvider != null) { + Map cacheMetrics0 = metricsProvider.cacheMetrics(); + + cacheMetrics = cacheMetrics0; + + return cacheMetrics0; + } + + return cacheMetrics; + } + + /** + * Sets node cache metrics. + * + * @param cacheMetrics Cache metrics. + */ + public void setCacheMetrics(Map cacheMetrics) { + assert cacheMetrics != null; + + this.cacheMetrics = cacheMetrics; + } + + /** * @return Internal order. */ public long internalOrder() { @@ -397,8 +441,11 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste U.writeCollection(out, hostNames); out.writeInt(discPort); + // Cluster metrics byte[] mtr = null; + ClusterMetrics metrics = this.metrics; + if (metrics != null) { mtr = new byte[ClusterMetricsSnapshot.METRICS_SIZE]; @@ -407,6 +454,15 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste U.writeByteArray(out, mtr); + // Cache metrics + Map cacheMetrics = this.cacheMetrics; + + out.writeInt(cacheMetrics == null ? 0 : cacheMetrics.size()); + + if (!F.isEmpty(cacheMetrics)) + for (Map.Entry m : cacheMetrics.entrySet()) + out.writeObject(m.getValue()); + out.writeLong(order); out.writeLong(intOrder); out.writeObject(ver); @@ -426,11 +482,24 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste consistentId = U.consistentId(addrs, discPort); + // Cluster metrics byte[] mtr = U.readByteArray(in); if (mtr != null) metrics = ClusterMetricsSnapshot.deserialize(mtr, 0); + // Cache metrics + int size = in.readInt(); + + Map cacheMetrics = + size > 0 ? U.newHashMap(size) : Collections.emptyMap(); + + for (int i = 0; i < size; i++) { + CacheMetricsSnapshot m = (CacheMetricsSnapshot) in.readObject(); + + cacheMetrics.put(m.id(), m); + } + order = in.readLong(); intOrder = in.readLong(); ver = (IgniteProductVersion)in.readObject(); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java index 8b07ba4..bafde9f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java @@ -17,6 +17,7 @@ package org.apache.ignite.spi.discovery.tcp.messages; +import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.tostring.*; @@ -52,6 +53,17 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { /** Client node IDs. */ private final Collection clientNodeIds = new HashSet<>(); + /** Cahce metrics by node. */ + @GridToStringExclude + private final Map> cacheMetrics = new HashMap<>(); + + /** + * Public default no-arg constructor for {@link Externalizable} interface. + */ + public TcpDiscoveryHeartbeatMessage() { + // No-op. + } + /** * Constructor. * @@ -76,6 +88,21 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { } /** + * Sets cache metrics for particular node. + * + * @param nodeId Node ID. + * @param metrics Node cache metrics. + */ + public void setCacheMetrics(UUID nodeId, Map metrics) { + assert nodeId != null; + assert metrics != null; + assert !this.cacheMetrics.containsKey(nodeId); + + if (!F.isEmpty(metrics)) + this.cacheMetrics.put(nodeId, metrics); + } + + /** * Sets metrics for a client node. * * @param nodeId Server node ID. @@ -103,6 +130,17 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { } /** + * Removes cache metrics for particular node from the message. + * + * @param nodeId Node ID. + */ + public void removeCacheMetrics(UUID nodeId) { + assert nodeId != null; + + cacheMetrics.remove(nodeId); + } + + /** * Gets metrics map. * * @return Metrics map. @@ -112,6 +150,15 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { } /** + * Gets cache metrics map. + * + * @return Cache metrics map. + */ + public Map> cacheMetrics() { + return cacheMetrics; + } + + /** * @return {@code True} if this message contains metrics. */ public boolean hasMetrics() { @@ -119,6 +166,13 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { } /** + * @return {@code True} this message contains cache metrics. + */ + public boolean hasCacheMetrics() { + return !cacheMetrics.isEmpty(); + } + + /** * @return {@code True} if this message contains metrics. */ public boolean hasMetrics(UUID nodeId) { @@ -128,6 +182,17 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { } /** + * @param nodeId Node ID. + * + * @return {@code True} if this message contains cache metrics for particular node. + */ + public boolean hasCacheMetrics(UUID nodeId) { + assert nodeId != null; + + return cacheMetrics.get(nodeId) != null; + } + + /** * Gets client node IDs for particular node. * * @return Client node IDs. diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java new file mode 100644 index 0000000..98800ba --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.internal.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.events.EventType.*; + +/** + * Test for cluster wide cache metrics. + */ +public class CacheMetricsForClusterGroupSelfTest extends GridCommonAbstractTest { + /** Grid count. */ + private static final int GRID_CNT = 3; + + /** Cache 1 name. */ + private static final String CACHE1 = "cache1"; + + /** Cache 2 name. */ + private static final String CACHE2 = "cache2"; + + /** Entry count cache 1. */ + private static final int ENTRY_CNT_CACHE1 = 1000; + + /** Entry count cache 2. */ + private static final int ENTRY_CNT_CACHE2 = 500; + + /** Cache 1. */ + private IgniteCache cache1; + + /** Cache 2. */ + private IgniteCache cache2; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrids(GRID_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * Test cluster group metrics in case of statistics enabled. + */ + public void testMetricsStatisticsEnabled() throws Exception { + createCaches(true); + + populateCacheData(cache1, ENTRY_CNT_CACHE1); + populateCacheData(cache2, ENTRY_CNT_CACHE2); + + readCacheData(cache1, ENTRY_CNT_CACHE1); + readCacheData(cache2, ENTRY_CNT_CACHE2); + + awaitMetricsUpdate(); + + Collection nodes = grid(0).cluster().forRemotes().nodes(); + + for (ClusterNode node : nodes) { + Map metrics = ((TcpDiscoveryNode) node).cacheMetrics(); + assertNotNull(metrics); + assertFalse(metrics.isEmpty()); + } + + assertMetrics(cache1); + assertMetrics(cache2); + + closeCaches(); + } + + /** + * Test cluster group metrics in case of statistics disabled. + */ + public void testMetricsStatisticsDisabled() throws Exception { + createCaches(false); + + populateCacheData(cache1, ENTRY_CNT_CACHE1); + populateCacheData(cache2, ENTRY_CNT_CACHE2); + + readCacheData(cache1, ENTRY_CNT_CACHE1); + readCacheData(cache2, ENTRY_CNT_CACHE2); + + awaitMetricsUpdate(); + + Collection nodes = grid(0).cluster().forRemotes().nodes(); + + for (ClusterNode node : nodes) { + Map metrics = ((TcpDiscoveryNode) node).cacheMetrics(); + assertNotNull(metrics); + assertTrue(metrics.isEmpty()); + } + + assertMetrics(cache1); + assertMetrics(cache2); + + closeCaches(); + } + + /** + * @param statisticsEnabled Statistics enabled. + */ + private void createCaches(boolean statisticsEnabled) { + CacheConfiguration ccfg1 = defaultCacheConfiguration(); + ccfg1.setName(CACHE1); + ccfg1.setStatisticsEnabled(statisticsEnabled); + + CacheConfiguration ccfg2 = defaultCacheConfiguration(); + ccfg2.setName(CACHE2); + ccfg2.setStatisticsEnabled(statisticsEnabled); + + cache1 = grid(0).getOrCreateCache(ccfg1); + cache2 = grid(0).getOrCreateCache(ccfg2); + } + + /** + * Closes caches. + */ + private void closeCaches() { + cache1.close(); + cache2.close(); + } + + /** + * Wait for {@link EventType#EVT_NODE_METRICS_UPDATED} event will be receieved. + */ + private void awaitMetricsUpdate() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(GRID_CNT * 2); + + IgnitePredicate lsnr = new IgnitePredicate() { + @Override public boolean apply(Event ignore) { + latch.countDown(); + + return true; + } + }; + + for (int i = 0; i < GRID_CNT; i++) + grid(i).events().localListen(lsnr, EVT_NODE_METRICS_UPDATED); + + latch.await(); + } + + /** + * @param cache Cache. + * @param cnt Count. + */ + private void populateCacheData(IgniteCache cache, int cnt) { + for (int i = 0; i < cnt; i++) + cache.put(i, i); + } + + /** + * @param cache Cache. + * @param cnt Count. + */ + private void readCacheData(IgniteCache cache, int cnt) { + for (int i = 0; i < cnt; i++) + cache.get(i); + } + + /** + * @param cache Cache. + */ + private void assertMetrics(IgniteCache cache) { + CacheMetrics[] ms = new CacheMetrics[GRID_CNT]; + + for (int i = 0; i < GRID_CNT; i++) { + CacheMetrics metrics = cache.metrics(grid(i).cluster().forCacheNodes(cache.getName())); + + for (int j = 0; j < GRID_CNT; j++) + ms[j] = grid(j).cache(cache.getName()).metrics(); + + // Static metrics + for (int j = 0; j < GRID_CNT; j++) + assertEquals(metrics.id(), ms[j].id()); + + for (int j = 0; j < GRID_CNT; j++) + assertEquals(metrics.name(), ms[j].name()); + + // Dynamic metrics + assertEquals(metrics.getCacheGets(), sum(ms, new IgniteClosure() { + @Override public Long apply(CacheMetrics input) { + return input.getCacheGets(); + } + })); + + assertEquals(metrics.getCachePuts(), sum(ms, new IgniteClosure() { + @Override public Long apply(CacheMetrics input) { + return input.getCachePuts(); + } + })); + + assertEquals(metrics.getCacheHits(), sum(ms, new IgniteClosure() { + @Override public Long apply(CacheMetrics input) { + return input.getCacheHits(); + } + })); + } + } + + /** + * @param ms Milliseconds. + * @param f Function. + */ + private long sum(CacheMetrics[] ms, IgniteClosure f) { + long res = 0; + + for (int i = 0; i < GRID_CNT; i++) + res += f.apply(ms[i]); + + return res; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java index 7898c3d..e0acde9 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.testframework.junits.spi; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -348,6 +349,11 @@ public abstract class GridSpiAbstractTest extends GridAbstr return new DiscoveryMetricsProvider() { /** {@inheritDoc} */ @Override public ClusterMetrics metrics() { return new ClusterMetricsSnapshot(); } + + /** {@inheritDoc} */ + @Override public Map cacheMetrics() { + return Collections.emptyMap(); + } }; } diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java index 511afec..1adf55f 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java @@ -18,6 +18,7 @@ package org.apache.ignite.testsuites; import junit.framework.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.cache.distributed.replicated.*; import org.apache.ignite.internal.processors.cache.local.*; @@ -47,6 +48,9 @@ public class IgniteCacheMetricsSelfTestSuite extends TestSuite { suite.addTestSuite(GridCacheAtomicPartitionedTckMetricsSelfTestImpl.class); suite.addTestSuite(GridCacheAtomicLocalTckMetricsSelfTestImpl.class); + // Cluster wide metrics. + suite.addTestSuite(CacheMetricsForClusterGroupSelfTest.class); + return suite; } } -- 1.9.5.msysgit.0