Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferMonitor.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferMonitor.java (date 1537199542000) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferMonitor.java (date 1537199542000) @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.jackrabbit.oak.segment; + +import static com.google.common.collect.Sets.newConcurrentHashSet; +import static org.apache.jackrabbit.oak.stats.StatsOptions.METRICS_ONLY; + +import java.lang.ref.ReferenceQueue; +import java.lang.ref.WeakReference; +import java.nio.ByteBuffer; +import java.util.Set; + +import org.apache.jackrabbit.oak.stats.CounterStats; +import org.apache.jackrabbit.oak.stats.StatisticsProvider; +import org.jetbrains.annotations.NotNull; + +/** + * This class exposes {@link CounterStats} for allocations and de-allocations + * of {@link ByteBuffer} instances: + * + *

+ * Users of this class call {@link #trackAllocation(ByteBuffer)} to update above statistics. + */ +public class SegmentBufferMonitor { + + /** + * Number of allocated direct byte buffers + */ + public static final String DIRECT_BUFFER_COUNT = "oak.segment.direct-buffer-count"; + + /** + * Total capacity of the allocated direct byte buffers. + */ + public static final String DIRECT_BUFFER_CAPACITY = "oak.segment.direct-buffer-capacity"; + + /** + * Number of allocated heap byte buffers + */ + public static final String HEAP_BUFFER_COUNT = "oak.segment.heap-buffer-count"; + + /** + * Total capacity of the allocated heap byte buffers. + */ + public static final String HEAP_BUFFER_CAPACITY = "oak.segment.heap-buffer-capacity"; + + @NotNull + private final Set buffers = newConcurrentHashSet(); + + @NotNull + private final ReferenceQueue referenceQueue = new ReferenceQueue<>(); + + @NotNull + private final CounterStats directBufferCount; + + @NotNull + private final CounterStats directBufferCapacity; + + @NotNull + private final CounterStats heapBufferCount; + + @NotNull + private final CounterStats heapBufferCapacity; + + /** + * Create a new instance using the passed {@code statisticsProvider} to expose + * buffer allocations. + * @param statisticsProvider + */ + public SegmentBufferMonitor(@NotNull StatisticsProvider statisticsProvider) { + directBufferCount = statisticsProvider.getCounterStats(DIRECT_BUFFER_COUNT, METRICS_ONLY); + directBufferCapacity = statisticsProvider.getCounterStats(DIRECT_BUFFER_CAPACITY, METRICS_ONLY); + heapBufferCount = statisticsProvider.getCounterStats(HEAP_BUFFER_COUNT, METRICS_ONLY); + heapBufferCapacity = statisticsProvider.getCounterStats(HEAP_BUFFER_CAPACITY, METRICS_ONLY); + } + + private static class BufferReference extends WeakReference { + private final int capacity; + private final boolean isDirect; + + public BufferReference(@NotNull ByteBuffer buffer, + @NotNull ReferenceQueue queue) { + super(buffer, queue); + this.capacity = buffer.capacity(); + this.isDirect = buffer.isDirect(); + } + } + + /** + * Track the allocation of a {@code buffer} and update the exposed statistics. + * @param buffer + */ + public void trackAllocation(@NotNull ByteBuffer buffer) { + BufferReference reference = new BufferReference(buffer, referenceQueue); + buffers.add(reference); + allocated(reference); + trackDeallocations(); + } + + private void trackDeallocations() { + BufferReference reference = (BufferReference) referenceQueue.poll(); + while (reference != null) { + buffers.remove(reference); + deallocated(reference); + reference = (BufferReference) referenceQueue.poll(); + } + } + + private void allocated(@NotNull BufferReference reference) { + if (reference.isDirect) { + directBufferCount.inc(); + directBufferCapacity.inc(reference.capacity); + } else { + heapBufferCount.inc(); + heapBufferCapacity.inc(reference.capacity); + } + } + + private void deallocated(@NotNull BufferReference reference) { + if (reference.isDirect) { + directBufferCount.dec(); + directBufferCapacity.dec(reference.capacity); + } else { + heapBufferCount.dec(); + heapBufferCapacity.dec(reference.capacity); + } + } + +} Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java (date 1537433404000) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java (date 1537199542000) @@ -42,6 +42,7 @@ import org.apache.jackrabbit.oak.segment.SegmentIdFactory; import org.apache.jackrabbit.oak.segment.SegmentIdProvider; import org.apache.jackrabbit.oak.segment.SegmentNodeState; +import org.apache.jackrabbit.oak.segment.SegmentBufferMonitor; import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence; import org.apache.jackrabbit.oak.segment.SegmentNotFoundException; import org.apache.jackrabbit.oak.segment.SegmentReader; @@ -119,6 +120,9 @@ }; + @NotNull + private final SegmentBufferMonitor segmentBufferMonitor; + protected final IOMonitor ioMonitor; AbstractFileStore(final FileStoreBuilder builder) { @@ -134,6 +138,7 @@ this.segmentReader = new CachingSegmentReader(this::getWriter, blobStore, builder.getStringCacheSize(), builder.getTemplateCacheSize()); this.memoryMapping = builder.getMemoryMapping(); this.ioMonitor = builder.getIOMonitor(); + this.segmentBufferMonitor = new SegmentBufferMonitor(builder.getStatsProvider()); } static SegmentNotFoundException asSegmentNotFoundException(Exception e, SegmentId id) { @@ -268,6 +273,7 @@ if (buffer == null) { throw new SegmentNotFoundException(id); } + segmentBufferMonitor.trackAllocation(buffer); return new Segment(tracker, segmentReader, id, buffer); } Index: oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferMonitorTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferMonitorTest.java (date 1537199542000) +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferMonitorTest.java (date 1537199542000) @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.jackrabbit.oak.segment; + +import static com.google.common.collect.Maps.newHashMap; +import static org.apache.jackrabbit.oak.segment.SegmentBufferMonitor.DIRECT_BUFFER_CAPACITY; +import static org.apache.jackrabbit.oak.segment.SegmentBufferMonitor.DIRECT_BUFFER_COUNT; +import static org.apache.jackrabbit.oak.segment.SegmentBufferMonitor.HEAP_BUFFER_CAPACITY; +import static org.apache.jackrabbit.oak.segment.SegmentBufferMonitor.HEAP_BUFFER_COUNT; +import static org.apache.jackrabbit.oak.stats.SimpleStats.Type.COUNTER; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.jackrabbit.api.stats.RepositoryStatistics; +import org.apache.jackrabbit.oak.stats.CounterStats; +import org.apache.jackrabbit.oak.stats.HistogramStats; +import org.apache.jackrabbit.oak.stats.MeterStats; +import org.apache.jackrabbit.oak.stats.SimpleStats; +import org.apache.jackrabbit.oak.stats.StatisticsProvider; +import org.apache.jackrabbit.oak.stats.StatsOptions; +import org.apache.jackrabbit.oak.stats.TimerStats; +import org.junit.Test; + +public class SegmentBufferMonitorTest { + + private final Map stats = newHashMap(); + + private final SegmentBufferMonitor segmentBufferMonitor = new SegmentBufferMonitor(new StatisticsProvider() { + @Override + public RepositoryStatistics getStats() { + throw new IllegalStateException(); + } + + @Override + public MeterStats getMeter(String name, StatsOptions options) { + throw new IllegalStateException(); + } + + @Override + public CounterStats getCounterStats(String name, StatsOptions options) { + SimpleStats simpleStats = new SimpleStats(new AtomicLong(), COUNTER); + stats.put(name, simpleStats); + return simpleStats; + } + + @Override + public TimerStats getTimer(String name, StatsOptions options) { + throw new IllegalStateException(); + } + + @Override + public HistogramStats getHistogram(String name, StatsOptions options) { + throw new IllegalStateException(); + } + }); + + @Test + public void emptyStats() { + assertEquals(0, stats.get(DIRECT_BUFFER_COUNT).getCount()); + assertEquals(0, stats.get(DIRECT_BUFFER_CAPACITY).getCount()); + assertEquals(0, stats.get(HEAP_BUFFER_COUNT).getCount()); + assertEquals(0, stats.get(HEAP_BUFFER_CAPACITY).getCount()); + } + + @Test + public void heapBuffer() { + ByteBuffer buffer = ByteBuffer.allocate(42); + segmentBufferMonitor.trackAllocation(buffer); + + assertEquals(0, stats.get(DIRECT_BUFFER_COUNT).getCount()); + assertEquals(0, stats.get(DIRECT_BUFFER_CAPACITY).getCount()); + assertEquals(1, stats.get(HEAP_BUFFER_COUNT).getCount()); + assertEquals(42, stats.get(HEAP_BUFFER_CAPACITY).getCount()); + + buffer = null; + System.gc(); + + assertEquals(0, stats.get(DIRECT_BUFFER_COUNT).getCount()); + assertEquals(0, stats.get(DIRECT_BUFFER_CAPACITY).getCount()); + assertTrue(stats.get(HEAP_BUFFER_COUNT).getCount() <= 1); + assertTrue(stats.get(HEAP_BUFFER_CAPACITY).getCount() <= 42); + } + + @Test + public void directBuffer() { + ByteBuffer buffer = ByteBuffer.allocateDirect(42); + segmentBufferMonitor.trackAllocation(buffer); + + assertEquals(1, stats.get(DIRECT_BUFFER_COUNT).getCount()); + assertEquals(42, stats.get(DIRECT_BUFFER_CAPACITY).getCount()); + assertEquals(0, stats.get(HEAP_BUFFER_COUNT).getCount()); + assertEquals(0, stats.get(HEAP_BUFFER_CAPACITY).getCount()); + + buffer = null; + System.gc(); + + assertTrue(stats.get(DIRECT_BUFFER_COUNT).getCount() <= 1); + assertTrue(stats.get(DIRECT_BUFFER_CAPACITY).getCount() <= 42); + assertEquals(0, stats.get(HEAP_BUFFER_COUNT).getCount()); + assertEquals(0, stats.get(HEAP_BUFFER_CAPACITY).getCount()); + } +}