diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java index 8d1386d..c7d2ddb 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java @@ -27,6 +27,7 @@ import static org.apache.jackrabbit.oak.segment.SegmentNotFoundExceptionListener.IGNORE_SNFE; import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.FORCE_TIMEOUT_DEFAULT; import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.GAIN_THRESHOLD_DEFAULT; +import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.MEMORY_THRESHOLD_DEFAULT; import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.PAUSE_DEFAULT; import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.RETAINED_GENERATIONS_DEFAULT; import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.RETRY_COUNT_DEFAULT; @@ -232,6 +233,15 @@ description = "Number of segment generations to retain." ) public static final String RETAINED_GENERATIONS = "compaction.retainedGenerations"; + + @Property( + intValue = MEMORY_THRESHOLD_DEFAULT, + label = "Compaction Memory Threshold", + description = "Set the available memory threshold beyond which revision gc will be canceled. " + + "Value represents a percentage so an input between 0 and 100 is expected. " + + "Setting this to 0 will disable the check." + ) + public static final String MEMORY_THRESHOLD = "compaction.memoryThreshold"; @Property( boolValue = false, @@ -624,10 +634,12 @@ byte gainThreshold = getGainThreshold(); long sizeDeltaEstimation = toLong(property(COMPACTION_SIZE_DELTA_ESTIMATION), SIZE_DELTA_ESTIMATION_DEFAULT); + int memoryThreshold = toInteger(property(MEMORY_THRESHOLD), MEMORY_THRESHOLD_DEFAULT); return new SegmentGCOptions(pauseCompaction, gainThreshold, retryCount, forceTimeout) .setRetainedGenerations(retainedGenerations) - .setGcSizeDeltaEstimation(sizeDeltaEstimation); + .setGcSizeDeltaEstimation(sizeDeltaEstimation) + .setMemoryThreshold(memoryThreshold); } private void unregisterNodeStore() { diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java index 375740a..1b4be7b 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java @@ -56,6 +56,11 @@ */ public static final long SIZE_DELTA_ESTIMATION_DEFAULT = -1; + /** + * Default value for {@link #getMemoryThreshold()} + */ + public static final int MEMORY_THRESHOLD_DEFAULT = 15; + private boolean paused = PAUSE_DEFAULT; private int gainThreshold = GAIN_THRESHOLD_DEFAULT; @@ -67,6 +72,8 @@ private int retainedGenerations = RETAINED_GENERATIONS_DEFAULT; private boolean offline = false; + + private int memoryThreshold = MEMORY_THRESHOLD_DEFAULT; private boolean ocBinDeduplication = Boolean .getBoolean("oak.segment.compaction.binaryDeduplication"); @@ -294,4 +301,18 @@ return this; } + /** + * Get the available memory threshold beyond which revision gc will be + * canceled. Setting this to 0 will disable the check. + * @return memoryThreshold + */ + public int getMemoryThreshold() { + return memoryThreshold; + } + + public SegmentGCOptions setMemoryThreshold(int memoryThreshold) { + this.memoryThreshold = memoryThreshold; + return this; + } + } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGC.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGC.java index d01a83a..a61d8d1 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGC.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGC.java @@ -152,4 +152,12 @@ @Nonnull String getStatus(); + /** + * Get the available memory threshold beyond which revision gc will be + * canceled. Setting this to 0 will disable the check. + * @return memoryThreshold + */ + int getMemoryThreshold(); + + void setMemoryThreshold(int memoryThreshold); } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGCMBean.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGCMBean.java index d9617f9..9c0cd61 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGCMBean.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGCMBean.java @@ -28,7 +28,6 @@ import org.apache.jackrabbit.oak.segment.file.FileStore; import org.apache.jackrabbit.oak.segment.file.FileStoreGCMonitor; -// FIXME OAK-4617: Align SegmentRevisionGC MBean with new generation based GC public class SegmentRevisionGCMBean extends AnnotatedStandardMBean implements SegmentRevisionGC { @@ -157,4 +156,14 @@ public String getStatus() { return fileStoreGCMonitor.getStatus(); } + + @Override + public int getMemoryThreshold() { + return gcOptions.getMemoryThreshold(); + } + + @Override + public void setMemoryThreshold(int memoryThreshold) { + gcOptions.setMemoryThreshold(memoryThreshold); + } } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java index 6da8aa6..cdb8a59 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java @@ -141,6 +141,12 @@ private final AtomicBoolean sufficientDiskSpace = new AtomicBoolean(true); /** + * This flag is raised whenever the available memory falls under a specified + * threshold. See {@link GCMemoryBarrier} + */ + private final AtomicBoolean sufficientMemory = new AtomicBoolean(true); + + /** * Flag signalling shutdown of the file store */ private volatile boolean shutdown; @@ -735,7 +741,8 @@ synchronized void run() throws IOException { gcListener.info("TarMK GC #{}: started", GC_COUNT.incrementAndGet()); - Stopwatch watch = Stopwatch.createStarted(); + GCMemoryBarrier gcMemoryBarrier = new GCMemoryBarrier( + sufficientMemory, gcListener, GC_COUNT.get(), gcOptions); int gainThreshold = gcOptions.getGainThreshold(); boolean sufficientEstimatedGain = true; @@ -746,10 +753,12 @@ gcListener.info("TarMK GC #{}: estimation skipped because compaction is paused", GC_COUNT); } else { gcListener.info("TarMK GC #{}: estimation started", GC_COUNT); + Stopwatch watch = Stopwatch.createStarted(); Supplier cancel = new CancelCompactionSupplier(FileStore.this); GCEstimation estimate = estimateCompactionGain(cancel); if (cancel.get()) { gcListener.info("TarMK GC #{}: estimation interrupted: {}. Skipping compaction.", GC_COUNT, cancel); + gcMemoryBarrier.close(); return; } @@ -783,6 +792,7 @@ gcListener.skipped("TarMK GC #{}: compaction paused", GC_COUNT); } } + gcMemoryBarrier.close(); } /** @@ -1235,6 +1245,10 @@ reason = "Not enough disk space"; return true; } + if (!store.sufficientMemory.get()) { + reason = "Not enough memory"; + return true; + } if (store.shutdown) { reason = "The FileStore is shutting down"; return true; diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCMemoryBarrier.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCMemoryBarrier.java new file mode 100644 index 0000000..3c7d8a6 --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCMemoryBarrier.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.segment.file; + +import static java.lang.management.ManagementFactory.getMemoryMXBean; +import static java.lang.management.ManagementFactory.getMemoryPoolMXBeans; +import static java.lang.management.MemoryType.HEAP; +import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount; + +import java.io.Closeable; +import java.io.IOException; +import java.lang.management.MemoryNotificationInfo; +import java.lang.management.MemoryPoolMXBean; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.annotation.Nonnull; +import javax.management.ListenerNotFoundException; +import javax.management.Notification; +import javax.management.NotificationEmitter; +import javax.management.NotificationListener; + +import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions; + +public class GCMemoryBarrier implements Closeable { + + // https://docs.oracle.com/javase/7/docs/api/java/lang/management/MemoryPoolMXBean.html#UsageThreshold + // http://www.javaspecialists.eu/archive/Issue092.html + + // TODO possibly add a min value to the percentage, ie. skip gc if available heap drops under 2GB + + @Nonnull + private final AtomicBoolean sufficientMemory; + + @Nonnull + private final GCListener gcListener; + + @Nonnull + private final SegmentGCOptions gcOptions; + + private final long gcCount; + + private final MemoryPoolMXBean pool; + private final NotificationEmitter emitter; + private final MemoryListener listener; + + public GCMemoryBarrier(@Nonnull AtomicBoolean sufficientMemory, + @Nonnull GCListener gcListener, long gcCount, + @Nonnull SegmentGCOptions gcOptions) { + this.sufficientMemory = sufficientMemory; + this.gcListener = gcListener; + this.gcOptions = gcOptions; + this.gcCount = gcCount; + + int percentage = gcOptions.getMemoryThreshold(); + if (percentage > 0) { + pool = getMemoryPool(); + } else { + pool = null; + } + if (pool != null) { + emitter = (NotificationEmitter) getMemoryMXBean(); + listener = new MemoryListener(); + emitter.addNotificationListener(listener, null, null); + long maxMemory = pool.getUsage().getMax(); + long required = (long) (maxMemory * percentage / 100); + gcListener + .info("TarMK GC #{}: setting up a listener to cancel compaction if available memory drops below {}%, {} ({} bytes).", + gcCount, percentage, + humanReadableByteCount(required), required); + + long warningThreshold = maxMemory - required; + long current = pool.getUsageThreshold(); + if (current > 0) { + warningThreshold = Math.min(warningThreshold, current); + } + pool.setUsageThreshold(warningThreshold); + + } else { + emitter = null; + listener = null; + } + checkMemory(); + } + + private MemoryPoolMXBean getMemoryPool() { + for (MemoryPoolMXBean pool : getMemoryPoolMXBeans()) { + if (HEAP == pool.getType() && pool.isUsageThresholdSupported()) { + return pool; + } + } + gcListener + .warn("TarMK GC #{}: Unable to setup monitoring of available memory.", + gcCount); + return null; + } + + private void checkMemory() { + if (pool == null) { + return; + } + int percentage = gcOptions.getMemoryThreshold(); + long maxMemory = pool.getUsage().getMax(); + long usedMemory = pool.getUsage().getUsed(); + long avail = maxMemory - usedMemory; + long required = (long) (maxMemory * percentage / 100); + + if (avail <= required) { + gcListener + .warn("TarMK GC #{}: canceling compaction because available memory level {} ({} bytes) is too low, expecting at least {} ({} bytes)", + gcCount, humanReadableByteCount(avail), avail, + humanReadableByteCount(required), required); + sufficientMemory.set(false); + } else { + gcListener + .info("TarMK GC #{}: available memory level {} ({} bytes) is good, expecting at least {} ({} bytes)", + gcCount, humanReadableByteCount(avail), avail, + humanReadableByteCount(required), required); + sufficientMemory.set(true); + } + } + + @Override + public void close() throws IOException { + if (emitter != null && listener != null) { + try { + emitter.removeNotificationListener(listener); + } catch (ListenerNotFoundException e) { + // + } + } + } + + private class MemoryListener implements NotificationListener { + public void handleNotification(Notification notification, + Object handback) { + String notifType = notification.getType(); + if (notifType + .equals(MemoryNotificationInfo.MEMORY_THRESHOLD_EXCEEDED)) { + checkMemory(); + } + } + } +}