From 9f9e2694c176ed8cb60b2b73debea75b5d8812f7 Mon Sep 17 00:00:00 2001 From: Dave Harvey Date: Mon, 17 Sep 2018 18:11:44 -0400 Subject: [PATCH] # mxbeans_threads2 --- .../org/apache/ignite/internal/IgniteKernal.java | 19 +- ...niteStripedThreadPoolExecutorMXBeanAdapter.java | 286 +++++++++++++++++++++ .../IgniteStripedThreadPoolExecutorMXBean.java | 48 ++++ .../thread/IgniteStripedThreadPoolExecutor.java | 21 +- 4 files changed, 371 insertions(+), 3 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/IgniteStripedThreadPoolExecutorMXBeanAdapter.java create mode 100644 modules/core/src/main/java/org/apache/ignite/mxbean/IgniteStripedThreadPoolExecutorMXBean.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index d74b3aafd7..b6557906ae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -198,6 +198,7 @@ import org.apache.ignite.marshaller.MarshallerUtils; import org.apache.ignite.marshaller.jdk.JdkMarshaller; import org.apache.ignite.mxbean.ClusterMetricsMXBean; import org.apache.ignite.mxbean.IgniteMXBean; +import org.apache.ignite.mxbean.IgniteStripedThreadPoolExecutorMXBean; import org.apache.ignite.mxbean.StripedExecutorMXBean; import org.apache.ignite.mxbean.ThreadPoolMXBean; import org.apache.ignite.mxbean.TransactionMetricsMxBean; @@ -4187,9 +4188,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { registerExecutorMBean("GridClassLoadingExecutor", p2pExecSvc); registerExecutorMBean("GridManagementExecutor", mgmtExecSvc); registerExecutorMBean("GridIgfsExecutor", igfsExecSvc); - registerExecutorMBean("GridDataStreamExecutor", dataStreamExecSvc); registerExecutorMBean("GridAffinityExecutor", affExecSvc); - registerExecutorMBean("GridCallbackExecutor", callbackExecSvc); registerExecutorMBean("GridQueryExecutor", qryExecSvc); registerExecutorMBean("GridSchemaExecutor", schemaExecSvc); @@ -4206,6 +4205,22 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { new StripedExecutorMXBeanAdapter(stripedExecSvc), StripedExecutorMXBean.class); } + + if (dataStreamExecSvc != null) { + // striped executor uses a custom adapter + registerMBean("Thread Pools", + "GridDataStreamExecutor", + new StripedExecutorMXBeanAdapter(dataStreamExecSvc), + StripedExecutorMXBean.class); + } + + if (callbackExecSvc != null) { + // striped executor uses a custom adapter + registerMBean("Thread Pools", + "GridCallbackExecutor", + new IgniteStripedThreadPoolExecutorMXBeanAdapter(callbackExecSvc), + IgniteStripedThreadPoolExecutorMXBean.class); + } if (customExecSvcs != null) { for (Map.Entry entry : customExecSvcs.entrySet()) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteStripedThreadPoolExecutorMXBeanAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteStripedThreadPoolExecutorMXBeanAdapter.java new file mode 100644 index 0000000000..7cc84505c1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteStripedThreadPoolExecutorMXBeanAdapter.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.ignite.internal.util.StripedExecutor; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.mxbean.IgniteStripedThreadPoolExecutorMXBean; +import org.apache.ignite.mxbean.MXBeanDescription; +import org.apache.ignite.mxbean.StripedExecutorMXBean; +import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor; + +/** + * Adapter for {@link IgniteStripedThreadPoolExecutorMXBean} which delegates all method calls to the underlying + * {@link ExecutorService} instance. + */ +public class IgniteStripedThreadPoolExecutorMXBeanAdapter implements IgniteStripedThreadPoolExecutorMXBean { + /** */ + private final IgniteStripedThreadPoolExecutor exec; + + /** + * @param exec Executor service + */ + IgniteStripedThreadPoolExecutorMXBeanAdapter(IgniteStripedThreadPoolExecutor exec) { + assert exec != null; + + this.exec = exec; + } + + /** {@inheritDoc} */ + @Override public int getStripesCount() { + return exec.stripes(); + } + + /** {@inheritDoc} */ + @Override public int getActiveCount() { + int count = 0; + for (int i = 0; i < exec.stripes(); i++) { + ThreadPoolMXBeanAdapter a = new ThreadPoolMXBeanAdapter(exec.stripe(i)); + + count += a.getActiveCount(); + } + + return count; + } + + /** {@inheritDoc} */ + @Override public long getCompletedTaskCount() { + int count = 0; + for (int i = 0; i < exec.stripes(); i++) { + ThreadPoolMXBeanAdapter a = new ThreadPoolMXBeanAdapter(exec.stripe(i)); + + count += a.getCompletedTaskCount(); + } + + return count; + } + + /** {@inheritDoc} */ + @Override public int getCorePoolSize() { + int count = 0; + for (int i = 0; i < exec.stripes(); i++) { + ThreadPoolMXBeanAdapter a = new ThreadPoolMXBeanAdapter(exec.stripe(i)); + + count += a.getCorePoolSize(); + } + + return count; + } + + /** {@inheritDoc} */ + @Override public int getLargestPoolSize() { + int count = 0; + for (int i = 0; i < exec.stripes(); i++) { + ThreadPoolMXBeanAdapter a = new ThreadPoolMXBeanAdapter(exec.stripe(i)); + + if (count < a.getLargestPoolSize()) { + count = a.getLargestPoolSize(); + } + } + + return count; + } + + /** {@inheritDoc} */ + @Override public int getMaximumPoolSize() { + int count = 0; + for (int i = 0; i < exec.stripes(); i++) { + ThreadPoolMXBeanAdapter a = new ThreadPoolMXBeanAdapter(exec.stripe(i)); + + if (count < a.getMaximumPoolSize()) { + count = a.getMaximumPoolSize(); + } + } + + return count; + } + + /** {@inheritDoc} */ + @Override public int getPoolSize() { + int count = 0; + for (int i = 0; i < exec.stripes(); i++) { + ThreadPoolMXBeanAdapter a = new ThreadPoolMXBeanAdapter(exec.stripe(i)); + + count += a.getPoolSize(); + } + + return count; + } + + /** {@inheritDoc} */ + @Override public long getTaskCount() { + int count = 0; + for (int i = 0; i < exec.stripes(); i++) { + ThreadPoolMXBeanAdapter a = new ThreadPoolMXBeanAdapter(exec.stripe(i)); + + count += a.getTaskCount(); + } + + return count; + } + + /** {@inheritDoc} */ + @Override public int getQueueSize() { + int count = 0; + for (int i = 0; i < exec.stripes(); i++) { + ThreadPoolMXBeanAdapter a = new ThreadPoolMXBeanAdapter(exec.stripe(i)); + + count += a.getQueueSize(); + } + + return count; + } + + /** {@inheritDoc} */ + @Override public long getKeepAliveTime() { + return new ThreadPoolMXBeanAdapter(exec.stripe(0)).getKeepAliveTime(); + } + + /** {@inheritDoc} */ + @Override public boolean isShutdown() { + for (int i = 0; i < exec.stripes(); i++) { + ThreadPoolMXBeanAdapter a = new ThreadPoolMXBeanAdapter(exec.stripe(i)); + + if (!a.isShutdown()) { + return false; + } + } + return true; + } + + /** {@inheritDoc} */ + @Override public boolean isTerminated() { + for (int i = 0; i < exec.stripes(); i++) { + ThreadPoolMXBeanAdapter a = new ThreadPoolMXBeanAdapter(exec.stripe(i)); + + if (!a.isTerminated()) { + return false; + } + } + return true; + } + + /** {@inheritDoc} */ + @Override public boolean isTerminating() { + for (int i = 0; i < exec.stripes(); i++) { + ThreadPoolMXBeanAdapter a = new ThreadPoolMXBeanAdapter(exec.stripe(i)); + + if (a.isTerminating()) { + return true; + } + } + + return false; + } + + /** {@inheritDoc} */ + @Override public String getRejectedExecutionHandlerClass() { + return new ThreadPoolMXBeanAdapter(exec.stripe(0)).getRejectedExecutionHandlerClass(); + } + + /** {@inheritDoc} */ + @Override public String getThreadFactoryClass() { + return new ThreadPoolMXBeanAdapter(exec.stripe(0)).getThreadFactoryClass(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteStripedThreadPoolExecutorMXBeanAdapter.class, this, super.toString()); + } + + @MXBeanDescription("Number of completed tasks per stripe.") + public long[] getStripesCompletedTasksCounts() { + long [] result = new long[exec.stripes()]; + + for (int i = 0; i < exec.stripes(); i++) { + ThreadPoolMXBeanAdapter a = new ThreadPoolMXBeanAdapter(exec.stripe(i)); + + result[i] = a.getCompletedTaskCount(); + } + + return result; + } + + /** + * @return Number of active tasks per stripe. + */ + @MXBeanDescription("Number of active tasks per stripe.") + public int[] getStripesActiveCount() { + int [] result = new int[exec.stripes()]; + + for (int i = 0; i < exec.stripes(); i++) { + ThreadPoolMXBeanAdapter a = new ThreadPoolMXBeanAdapter(exec.stripe(i)); + + result[i] = a.getActiveCount(); + } + + return result; + } + + /** + * @return Size of queue per stripe. + */ + @MXBeanDescription("Size of queue per stripe.") + public int[] getStripesQueueSizes() { + int [] result = new int[exec.stripes()]; + + for (int i = 0; i < exec.stripes(); i++) { + ThreadPoolMXBeanAdapter a = new ThreadPoolMXBeanAdapter(exec.stripe(i)); + + result[i] = a.getQueueSize(); + } + + return result; + } + + /** + * @return Size of queue per stripe. + */ + @MXBeanDescription("Size of queue per stripe.") + public int[] getStripesLargestPoolSize() { + int [] result = new int[exec.stripes()]; + + for (int i = 0; i < exec.stripes(); i++) { + ThreadPoolMXBeanAdapter a = new ThreadPoolMXBeanAdapter(exec.stripe(i)); + + result[i] = a.getLargestPoolSize(); + } + + return result; + } + + @Override + public boolean[] getStripesActiveStatuses() { + boolean [] result = new boolean[exec.stripes()]; + + for (int i = 0; i < exec.stripes(); i++) { + ThreadPoolMXBeanAdapter a = new ThreadPoolMXBeanAdapter(exec.stripe(i)); + + result[i] = a.getActiveCount() > 0 ; + } + + return result; + } +} + diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteStripedThreadPoolExecutorMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteStripedThreadPoolExecutorMXBean.java new file mode 100644 index 0000000000..892034d213 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteStripedThreadPoolExecutorMXBean.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.mxbean; + +/** + * MBean that provides access to information about striped threadpool executor service. + */ +@MXBeanDescription("MBean that provides access to information about striped executor service.") +public interface IgniteStripedThreadPoolExecutorMXBean extends ThreadPoolMXBean { + + /** + * @return Stripes count. + */ + @MXBeanDescription("Stripes count.") + public int getStripesCount(); + + + @MXBeanDescription("Number of completed tasks per stripe.") + public long[] getStripesCompletedTasksCounts(); + + /** + * @return Number of active tasks per stripe. + */ + @MXBeanDescription("Number of active tasks per stripe.") + public boolean[] getStripesActiveStatuses(); + + /** + * @return Size of queue per stripe. + */ + @MXBeanDescription("Size of queue per stripe.") + public int[] getStripesQueueSizes(); + +} diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java index 418812f8a2..c52d56366d 100644 --- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java +++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java @@ -53,7 +53,9 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService { ThreadFactory factory = new IgniteThreadFactory(igniteInstanceName, threadNamePrefix, eHnd); for (int i = 0; i < concurrentLvl; i++) - execs[i] = Executors.newSingleThreadExecutor(factory); + // Don't use a single thread pool because it adds a level of obfuscation + // which is unnecessary and breaks the MXBean adapter. + execs[i] = Executors.newFixedThreadPool(1, factory); } /** @@ -175,4 +177,21 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService { @Override public String toString() { return S.toString(IgniteStripedThreadPoolExecutor.class, this); } + + /** + * @return Stripes count. + */ + public int stripes() { + return execs.length; + } + + /** + * @return The executor service for stripe idx or null if out of range + */ + public ExecutorService stripe(int idx) { + if ( idx >= execs.length ) { + return null; + } + return execs[idx]; + } } -- 2.15.1