diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CommitsTracker.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CommitsTracker.java index 0d59322aa4..0fb22c0097 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CommitsTracker.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CommitsTracker.java @@ -19,6 +19,9 @@ package org.apache.jackrabbit.oak.segment; +import static java.lang.String.format; +import static java.util.concurrent.TimeUnit.MINUTES; + import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -26,6 +29,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.stream.Stream; import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap; +import org.apache.jackrabbit.oak.segment.file.Scheduler; /** * A simple tracker for the source of commits (writes) in @@ -36,19 +40,29 @@ import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap; * currently waiting on the commit semaphore * * - * This class delegates thread-safety to its underlying state variables. + * This class delegates thread-safety to its underlying state variables. */ class CommitsTracker { private volatile boolean collectStackTraces; + private Map commitsCountPerThreadGroupLastMinute = new HashMap<>(); + private final String[] threadGroups; private final ConcurrentMap queuedWritersMap; - private final ConcurrentMap commitsCountMap; + private final ConcurrentMap commitsCountPerThreadGroup; + private final ConcurrentMap commitsCountOtherThreads; + + private final Scheduler commitsTrackerScheduler = new Scheduler("CommitsTracker background tasks"); - CommitsTracker(int commitsCountMapMaxSize, boolean collectStackTraces) { + CommitsTracker(String[] threadGroups, int commitsCountMapMaxSize, boolean collectStackTraces) { + this.threadGroups = threadGroups; this.collectStackTraces = collectStackTraces; - this.commitsCountMap = new ConcurrentLinkedHashMap.Builder() + this.commitsCountPerThreadGroup = new ConcurrentHashMap<>(); + this.commitsCountOtherThreads = new ConcurrentLinkedHashMap.Builder() .maximumWeightedCapacity(commitsCountMapMaxSize).build(); this.queuedWritersMap = new ConcurrentHashMap<>(); + + commitsTrackerScheduler.scheduleWithFixedDelay(format("TarMK commits tracker stats resetter"), 1, MINUTES, + this::resetStatistics); } public void trackQueuedCommitOf(Thread t) { @@ -67,19 +81,56 @@ class CommitsTracker { } public void trackExecutedCommitOf(Thread t) { - commitsCountMap.compute(t.getName(), (w, v) -> v == null ? 1 : v + 1); + String group = findGroupFor(t); + + if (group.equals("other")) { + commitsCountOtherThreads.compute(t.getName(), (w, v) -> v == null ? 1 : v + 1); + } + + commitsCountPerThreadGroup.compute(group, (w, v) -> v == null ? 1 : v + 1); + } + + private String findGroupFor(Thread t) { + if (threadGroups == null) { + return "other"; + } + + for (String group : threadGroups) { + if (t.getName().matches(group)) { + return group; + } + } + + return "other"; + } + + private void resetStatistics() { + commitsCountPerThreadGroupLastMinute = new HashMap<>(commitsCountPerThreadGroup); + commitsCountPerThreadGroup.clear(); + commitsCountOtherThreads.clear(); } public void setCollectStackTraces(boolean flag) { this.collectStackTraces = flag; } + public void close() { + commitsTrackerScheduler.close(); + } + public Map getQueuedWritersMap() { return new HashMap<>(queuedWritersMap); } - public Map getCommitsCountMap() { - return new HashMap<>(commitsCountMap); + public Map getCommitsCountPerGroupLastMinute() { + return new HashMap<>(commitsCountPerThreadGroupLastMinute); } + public Map getCommitsCountOthers() { + return new HashMap<>(commitsCountOtherThreads); + } + + Map getCommitsCountPerGroup() { + return new HashMap<>(commitsCountPerThreadGroup); + } } diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreStats.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreStats.java index 1f9f4a8589..41201e0e0b 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreStats.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreStats.java @@ -61,11 +61,12 @@ public class SegmentNodeStoreStats implements SegmentNodeStoreStatsMBean, Segmen private volatile CommitsTracker commitsTracker; private boolean collectStackTraces = DEFAULT_COLLECT_STACK_TRACES; private int commitsCountMapMaxSize = DEFAULT_COMMITS_COUNT_MAP_SIZE; + private String[] threadGroups; public SegmentNodeStoreStats(StatisticsProvider statisticsProvider) { this.statisticsProvider = statisticsProvider; - this.commitsTracker = new CommitsTracker(commitsCountMapMaxSize, collectStackTraces); + this.commitsTracker = new CommitsTracker(threadGroups, commitsCountMapMaxSize, collectStackTraces); this.commitsCount = statisticsProvider.getMeter(COMMITS_COUNT, StatsOptions.DEFAULT); this.commitQueueSize = statisticsProvider.getCounterStats(COMMIT_QUEUE_SIZE, StatsOptions.DEFAULT); this.commitTime = statisticsProvider.getTimer(COMMIT_TIME, StatsOptions.DEFAULT); @@ -117,24 +118,35 @@ public class SegmentNodeStoreStats implements SegmentNodeStoreStatsMBean, Segmen } @Override - public TabularData getCommitsCountPerWriter() throws OpenDataException { - CompositeType commitsPerWriterRowType = new CompositeType("commitsPerWriter", "commitsPerWriter", - new String[] { "count", "writerName" }, new String[] { "count", "writerName" }, + public TabularData getCommitsCountPerGroupLastMinute() throws OpenDataException { + return createTabularDataFromCountMap(commitsTracker.getCommitsCountPerGroupLastMinute(), "commitsPerWriterGroup", + "writerGroup"); + } + + @Override + public TabularData getCommitsCountOtherThreads() throws OpenDataException { + return createTabularDataFromCountMap(commitsTracker.getCommitsCountOthers(), "commitsPerWriter", + "writerName"); + } + + private TabularData createTabularDataFromCountMap(Map commitsCountMap, String typeName, + String writerDescription) throws OpenDataException { + CompositeType commitsPerWriterRowType = new CompositeType(typeName, typeName, + new String[] { "count", writerDescription }, new String[] { "count", writerDescription }, new OpenType[] { SimpleType.LONG, SimpleType.STRING }); - TabularDataSupport tabularData = new TabularDataSupport(new TabularType("commitsPerWriter", - "Most active writers", commitsPerWriterRowType, new String[] { "writerName" })); + TabularDataSupport tabularData = new TabularDataSupport(new TabularType(typeName, "Most active writers", + commitsPerWriterRowType, new String[] { writerDescription })); - Map commitsCountMap = commitsTracker.getCommitsCountMap(); if (commitsCountMap.isEmpty()) { commitsCountMap.put("N/A", 0L); } - + commitsCountMap.entrySet().stream() .sorted(Comparator.> comparingLong(Entry::getValue).reversed()).map(e -> { Map m = new HashMap<>(); m.put("count", e.getValue()); - m.put("writerName", e.getKey()); + m.put(writerDescription, e.getKey()); return m; }).map(d -> mapToCompositeData(commitsPerWriterRowType, d)).forEach(tabularData::put); @@ -176,13 +188,28 @@ public class SegmentNodeStoreStats implements SegmentNodeStoreStatsMBean, Segmen return collectStackTraces; } + @Override public int getCommitsCountMapMaxSize() { return commitsCountMapMaxSize; } + @Override public void setCommitsCountMapMaxSize(int commitsCountMapMaxSize) { this.commitsCountMapMaxSize = commitsCountMapMaxSize; - commitsTracker = new CommitsTracker(commitsCountMapMaxSize, collectStackTraces); + commitsTracker.close(); + commitsTracker = new CommitsTracker(threadGroups, commitsCountMapMaxSize, collectStackTraces); + } + + @Override + public String[] getThreadGroups() { + return threadGroups; + } + + @Override + public void setThreadGroups(String[] threadGroups) { + this.threadGroups = threadGroups; + commitsTracker.close(); + commitsTracker = new CommitsTracker(threadGroups, commitsCountMapMaxSize, collectStackTraces); } private TimeSeries getTimeSeries(String name) { diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreStatsMBean.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreStatsMBean.java index 51ddb51b42..ca7f3f3321 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreStatsMBean.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreStatsMBean.java @@ -47,10 +47,18 @@ public interface SegmentNodeStoreStatsMBean { CompositeData getQueuingTimes(); /** - * @return tabular data of the form + * @return tabular data of the form collected + * in the last minute * @throws OpenDataException if data is not available */ - TabularData getCommitsCountPerWriter() throws OpenDataException; + TabularData getCommitsCountPerGroupLastMinute() throws OpenDataException; + + /** + * @return tabular data of the form for threads + * not included in groups + * @throws OpenDataException if data is not available + */ + TabularData getCommitsCountOtherThreads() throws OpenDataException; /** * @return tabular data of the form for each writer @@ -72,7 +80,8 @@ public interface SegmentNodeStoreStatsMBean { boolean isCollectStackTraces(); /** - * Modifies the maximum number of writing threads to be recorded. + * Modifies the maximum number of writing threads to be recorded for + * threads not included in groups.. * Changing the default value will reset the overall collection process. * * @param commitsCountMapSize the new size @@ -83,4 +92,16 @@ public interface SegmentNodeStoreStatsMBean { * @return maximum number of writing threads to be recorded */ int getCommitsCountMapMaxSize(); + + /** + * @return current groups used for grouping collected threads. + */ + String[] getThreadGroups(); + + /** + * Modifies the groups used for grouping collected threads. + * Changing the default value will reset the overall collection process. + * @param threadGroups + */ + void setThreadGroups(String[] threadGroups); } diff --git oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CommitsTrackerTest.java oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CommitsTrackerTest.java index 067042ea50..4dd800e730 100644 --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CommitsTrackerTest.java +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CommitsTrackerTest.java @@ -21,11 +21,13 @@ package org.apache.jackrabbit.oak.segment; import static java.util.concurrent.Executors.newFixedThreadPool; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser; import org.junit.Test; @@ -52,8 +54,8 @@ public class CommitsTrackerTest { } @Test - public void testSizeConstraints() throws InterruptedException { - CommitsTracker commitsTracker = new CommitsTracker(10, false); + public void testCommitsCountOthers() throws InterruptedException { + CommitsTracker commitsTracker = new CommitsTracker(new String[] {}, 10, false); ExecutorService executorService = newFixedThreadPool(30); final CountDownLatch addLatch = new CountDownLatch(25); @@ -78,11 +80,11 @@ public class CommitsTrackerTest { } addLatch.await(); - Map commitsCountMap = commitsTracker.getCommitsCountMap(); + Map commitsCountOthersMap = commitsTracker.getCommitsCountOthers(); Map queuedWritersMap = commitsTracker.getQueuedWritersMap(); - assertTrue(commitsCountMap.size() >= 10); - assertTrue(commitsCountMap.size() < 20); + assertTrue(commitsCountOthersMap.size() >= 10); + assertTrue(commitsCountOthersMap.size() < 20); assertEquals(5, queuedWritersMap.size()); CountDownLatch removeLatch = new CountDownLatch(5); @@ -97,4 +99,38 @@ public class CommitsTrackerTest { new ExecutorCloser(executorService).close(); } } + + @Test + public void testCommitsCountPerGroup() throws InterruptedException { + String[] groups = new String[] { "Thread-1.*", "Thread-2.*", "Thread-3.*" }; + CommitsTracker commitsTracker = new CommitsTracker(groups, 10, false); + ExecutorService executorService = newFixedThreadPool(30); + AtomicInteger counter = new AtomicInteger(10); + final CountDownLatch latch = new CountDownLatch(30); + + Runnable executedCommitTask = () -> { + Thread.currentThread().setName("Thread-" + counter.getAndIncrement()); + commitsTracker.trackExecutedCommitOf(Thread.currentThread()); + latch.countDown(); + }; + + try { + for (int i = 0; i < 30; i++) { + executorService.submit(executedCommitTask); + } + + latch.await(); + + Map commitsCountPerGroup = commitsTracker.getCommitsCountPerGroup(); + assertEquals(3, commitsCountPerGroup.size()); + + for (String group : groups) { + Long groupCount = commitsCountPerGroup.get(group); + assertNotNull(groupCount); + assertEquals(10, (long) groupCount); + } + } finally { + new ExecutorCloser(executorService).close(); + } + } }