diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java index 0ad4cb8..2130486 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java @@ -658,11 +658,17 @@ public class SegmentNodeStoreService extends ProxyNodeStore + "compaction.sizeDeltaEstimation. For turning off estimation, the new property compaction.disableEstimation should be used."); } + //TODO turn into proper OSGi props + long gcLogCycle = Long.getLong("oak.segment.compaction.gcLogCycle", -1); + long gcSleepPerCycle = Long.getLong("oak.segment.compaction.gcSleepPerCycle", -1); + + return new SegmentGCOptions(pauseCompaction, retryCount, forceTimeout) .setRetainedGenerations(retainedGenerations) .setGcSizeDeltaEstimation(sizeDeltaEstimation) .setMemoryThreshold(memoryThreshold) - .setEstimationDisabled(disableEstimation); + .setEstimationDisabled(disableEstimation) + .withGCNodeWriteMonitor(gcLogCycle, gcSleepPerCycle); } private void unregisterNodeStore() { diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java index 8507221..0fd2cd9 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java @@ -72,6 +72,7 @@ import org.apache.jackrabbit.oak.api.Type; import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean; import org.apache.jackrabbit.oak.plugins.memory.ModifiedNodeState; import org.apache.jackrabbit.oak.segment.WriteOperationHandler.WriteOperation; +import org.apache.jackrabbit.oak.segment.file.GCNodeWriteMonitor; import org.apache.jackrabbit.oak.spi.blob.BlobStore; import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry; import org.apache.jackrabbit.oak.spi.state.DefaultNodeStateDiff; @@ -118,6 +119,9 @@ public class SegmentWriter { new SynchronizedDescriptiveStatistics(NODE_WRITER_STATS_WINDOW); @Nonnull + private GCNodeWriteMonitor compactionMonitor = GCNodeWriteMonitor.EMPTY; + + @Nonnull private final SynchronizedDescriptiveStatistics nodeWriteTimeStats = new SynchronizedDescriptiveStatistics(NODE_WRITER_STATS_WINDOW); @@ -1033,6 +1037,7 @@ public class SegmentWriter { SegmentNodeState sns = (SegmentNodeState) state; nodeCache.put(sns.getStableId(), recordId, cost(sns)); nodeWriteStats.isCompactOp = true; + compactionMonitor.compacted(); } return recordId; } @@ -1285,4 +1290,8 @@ public class SegmentWriter { } } + public void setCompactionMonitor(GCNodeWriteMonitor compactionMonitor) { + this.compactionMonitor = compactionMonitor; + } + } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java index b980c35..1fe4ff6 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java @@ -21,6 +21,8 @@ package org.apache.jackrabbit.oak.segment.compaction; import static com.google.common.base.Preconditions.checkArgument; +import org.apache.jackrabbit.oak.segment.file.GCNodeWriteMonitor; + /** * This class holds configuration options for segment store revision gc. */ @@ -89,6 +91,12 @@ public class SegmentGCOptions { "oak.segment.compaction.gcSizeDeltaEstimation", SIZE_DELTA_ESTIMATION_DEFAULT); + /** + * Responsible for monitoring progress of the online compaction, and + * providing progress tracking. + */ + private GCNodeWriteMonitor gcNodeWriteMonitor = GCNodeWriteMonitor.EMPTY; + public SegmentGCOptions(boolean paused, int retryCount, int forceTimeout) { this.paused = paused; this.retryCount = retryCount; @@ -317,4 +325,23 @@ public class SegmentGCOptions { this.estimationDisabled = disabled; return this; } + + /** + * Enables the GcWriteMonitor with the given params. + * @param gcLogCycle + * sets the log cycle to this value, disabled if set to + * {@code -1} + * @param gcSleepPerCycle + * introduce a sleep (ms) for each log cycle. disabled if set to + * {@code -1} + * @return this instance + */ + public SegmentGCOptions withGCNodeWriteMonitor(long gcLogCycle, long gcSleepPerCycle) { + this.gcNodeWriteMonitor = new GCNodeWriteMonitor(gcLogCycle, gcSleepPerCycle); + return this; + } + + public GCNodeWriteMonitor getGCNodeWriteMonitor() { + return gcNodeWriteMonitor; + } } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGC.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGC.java index f18993d..f3a9aad 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGC.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGC.java @@ -170,4 +170,26 @@ public interface SegmentRevisionGC { * @param memoryThreshold */ void setMemoryThreshold(int memoryThreshold); + + /** + * @return {@code true} if there is an online compaction cycle running + */ + boolean isCompactionRunning(); + + /** + * @return number of compacted nodes in the current cycle + */ + long getCompactedNodes(); + + /** + * @return number of estimated nodes to be compacted in the current cycle. + * Can be {@code -1} if the estimation can't be performed + */ + long getEstimatedTotal(); + + /** + * @return percentage of progress for the current compaction cycle. Can be + * {@code -1} if the estimation can't be performed. + */ + int getEstimatedPercentage(); } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGCMBean.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGCMBean.java index 0a24682..69611a5 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGCMBean.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGCMBean.java @@ -171,4 +171,24 @@ public class SegmentRevisionGCMBean public void setMemoryThreshold(int memoryThreshold) { gcOptions.setMemoryThreshold(memoryThreshold); } + + @Override + public boolean isCompactionRunning() { + return gcOptions.getGCNodeWriteMonitor().isCompactionRunning(); + } + + @Override + public long getCompactedNodes() { + return gcOptions.getGCNodeWriteMonitor().getCompactedNodes(); + } + + @Override + public long getEstimatedTotal() { + return gcOptions.getGCNodeWriteMonitor().getEstimatedTotal(); + } + + @Override + public int getEstimatedPercentage() { + return gcOptions.getGCNodeWriteMonitor().getEstimatedPercentage(); + } } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java index 7145bd1..5ddea73 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java @@ -710,6 +710,9 @@ public class FileStore extends AbstractFileStore { private volatile boolean cancelled; + @Nonnull + private final GCNodeWriteMonitor compactionMonitor; + GarbageCollector( @Nonnull SegmentGCOptions gcOptions, @Nonnull GCListener gcListener, @@ -719,6 +722,7 @@ public class FileStore extends AbstractFileStore { this.gcListener = gcListener; this.gcJournal = gcJournal; this.cacheManager = cacheManager; + this.compactionMonitor = gcOptions.getGCNodeWriteMonitor(); } synchronized void run() throws IOException { @@ -777,6 +781,7 @@ public class FileStore extends AbstractFileStore { } gcMemoryBarrier.close(); } finally { + compactionMonitor.finished(); gcListener.updateStatus(IDLE.message()); } } @@ -830,6 +835,9 @@ public class FileStore extends AbstractFileStore { gcListener.info("TarMK GC #{}: compaction started, gc options={}", GC_COUNT, gcOptions); gcListener.updateStatus(COMPACTION.message()); + long initialSize = size(); + compactionMonitor.init(GC_COUNT.get(), gcJournal.read(), initialSize); + SegmentNodeState before = getHead(); Supplier cancel = new CancelCompactionSupplier(FileStore.this); SegmentWriter writer = segmentWriterBuilder("c") @@ -837,6 +845,7 @@ public class FileStore extends AbstractFileStore { .withGeneration(newGeneration) .withoutWriterPool() .build(FileStore.this); + writer.setCompactionMonitor(compactionMonitor); SegmentNodeState after = compact(before, writer, cancel); if (after == null) { @@ -1117,7 +1126,7 @@ public class FileStore extends AbstractFileStore { long finalSize = size(); long reclaimedSize = initialSize - afterCleanupSize; stats.reclaimed(reclaimedSize); - gcJournal.persist(reclaimedSize, finalSize, getGcGeneration()); + gcJournal.persist(reclaimedSize, finalSize, getGcGeneration(), compactionMonitor.getCompactedNodes()); gcListener.cleaned(reclaimedSize, finalSize); gcListener.info("TarMK GC #{}: cleanup completed in {} ({} ms). Post cleanup size is {} ({} bytes)" + " and space reclaimed {} ({} bytes).", diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCJournal.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCJournal.java index 1406673..6c5c2c3 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCJournal.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCJournal.java @@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory; /** * Persists the repository size and the reclaimed size following a cleanup * operation in the {@link #GC_JOURNAL gc journal} file with the format: - * 'repoSize, reclaimedSize, timestamp, gcGen'. + * 'repoSize, reclaimedSize, timestamp, gcGen, nodes compacted'. */ public class GCJournal { @@ -61,11 +61,18 @@ public class GCJournal { } /** - * Persists the repository size and the reclaimed size following a cleanup - * operation + * Persists the repository stats (current size, reclaimed size, gc + * generation, number of compacted nodes) following a cleanup operation for + * a successful compaction. NOOP if the gcGeneration is the same as the one + * persisted previously. + * + * @param reclaimedSize size reclaimed by cleanup + * @param repoSize current repo size + * @param gcGeneration gc generation + * @param nodes number of compacted nodes */ public synchronized void persist(long reclaimedSize, long repoSize, - int gcGeneration) { + int gcGeneration, long nodes) { GCJournalEntry current = read(); if (current.getGcGeneration() == gcGeneration) { // failed compaction, only update the journal if the generation @@ -73,7 +80,7 @@ public class GCJournal { return; } latest = new GCJournalEntry(repoSize, reclaimedSize, - System.currentTimeMillis(), gcGeneration); + System.currentTimeMillis(), gcGeneration, nodes); Path path = new File(directory, GC_JOURNAL).toPath(); try { try (BufferedWriter w = newBufferedWriter(path, UTF_8, WRITE, @@ -127,43 +134,43 @@ public class GCJournal { static class GCJournalEntry { - static GCJournalEntry EMPTY = new GCJournalEntry(-1, -1, -1, -1); + static GCJournalEntry EMPTY = new GCJournalEntry(-1, -1, -1, -1, -1); private final long repoSize; private final long reclaimedSize; private final long ts; private final int gcGeneration; + private final long nodes; public GCJournalEntry(long repoSize, long reclaimedSize, long ts, - int gcGeneration) { + int gcGeneration, long nodes) { this.repoSize = repoSize; this.reclaimedSize = reclaimedSize; this.ts = ts; this.gcGeneration = gcGeneration; + this.nodes = nodes; } @Override public String toString() { - return repoSize + "," + reclaimedSize + "," + ts + "," - + gcGeneration; + return repoSize + "," + reclaimedSize + "," + ts + "," + gcGeneration + "," + nodes; } static GCJournalEntry fromString(String in) { String[] items = in.split(","); - if (items.length == 3 || items.length == 4) { - long repoSize = safeParse(items[0]); - long reclaimedSize = safeParse(items[1]); - long ts = safeParse(items[2]); - int gcGen = -1; - if (items.length == 4) { - gcGen = (int) safeParse(items[3]); - } - return new GCJournalEntry(repoSize, reclaimedSize, ts, gcGen); - } - return GCJournalEntry.EMPTY; + long repoSize = safeParse(items, 0); + long reclaimedSize = safeParse(items, 1); + long ts = safeParse(items, 2); + int gcGen = (int) safeParse(items, 3); + long nodes = safeParse(items, 4); + return new GCJournalEntry(repoSize, reclaimedSize, ts, gcGen, nodes); } - private static long safeParse(String in) { + private static long safeParse(String[] items, int index) { + if (items.length < index - 1) { + return -1; + } + String in = items[index]; try { return Long.parseLong(in); } catch (NumberFormatException ex) { @@ -172,29 +179,48 @@ public class GCJournal { return -1; } + /** + * Returns the repository size + */ public long getRepoSize() { return repoSize; } + /** + * Returns the reclaimed size + */ public long getReclaimedSize() { return reclaimedSize; } + /** + * Returns the timestamp + */ public long getTs() { return ts; } + /** + * Returns the gc generation + */ public int getGcGeneration() { return gcGeneration; } + /** + * Returns the number of compacted nodes + */ + public long getNodes() { + return nodes; + } + @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + gcGeneration; - result = prime * result - + (int) (reclaimedSize ^ (reclaimedSize >>> 32)); + result = prime * result + (int) (nodes ^ (nodes >>> 32)); + result = prime * result + (int) (reclaimedSize ^ (reclaimedSize >>> 32)); result = prime * result + (int) (repoSize ^ (repoSize >>> 32)); result = prime * result + (int) (ts ^ (ts >>> 32)); return result; @@ -211,6 +237,8 @@ public class GCJournal { GCJournalEntry other = (GCJournalEntry) obj; if (gcGeneration != other.gcGeneration) return false; + if (nodes != other.nodes) + return false; if (reclaimedSize != other.reclaimedSize) return false; if (repoSize != other.repoSize) @@ -219,6 +247,5 @@ public class GCJournal { return false; return true; } - } } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCNodeWriteMonitor.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCNodeWriteMonitor.java new file mode 100644 index 0000000..78f44b9 --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCNodeWriteMonitor.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.segment.file; + +import java.util.concurrent.TimeUnit; + +import javax.annotation.Nonnull; + +import org.apache.jackrabbit.oak.segment.file.GCJournal.GCJournalEntry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Monitors the compaction cycle and keeps a compacted nodes counter, in order + * to provide a best effort progress log based on extrapolating the previous + * size and node count and current size to deduce current node count. + */ +public class GCNodeWriteMonitor { + + private static final Logger log = LoggerFactory.getLogger(GCNodeWriteMonitor.class); + + public static final GCNodeWriteMonitor EMPTY = new GCNodeWriteMonitor(-1, -1); + + /** + * Number of nodes the monitor will log a message, -1 to disable + */ + private final long logCycle; + + /** + * Ms to sleep at each cycle, -1 to disable + */ + private final long sleepPerCycle; + + /** + * Start timestamp of compaction (reset at each + * {@link #init(GCJournalEntry)} call). + */ + private long start = 0; + + /** + * Estimated nodes to compact per cycle (reset at each + * {@link #init(GCJournalEntry)} call). + */ + private long estimated = -1; + + /** + * Compacted nodes per cycle (reset at each {@link #init(GCJournalEntry)} + * call). + */ + private long nodes; + + private boolean running = false; + + private long gcCount; + + public GCNodeWriteMonitor(long logCycle, long sleepPerCycle) { + this.logCycle = logCycle; + this.sleepPerCycle = sleepPerCycle; + } + + public void compacted() { + nodes++; + if (logCycle > 0 && nodes % logCycle == 0) { + long ms = System.currentTimeMillis() - start; + log.info("TarMK GC #{}: compacted {} nodes out of estimated {} in {} ms.", gcCount, nodes, estimated, ms); + start = System.currentTimeMillis(); + if (sleepPerCycle > 0) { + try { + TimeUnit.MILLISECONDS.sleep(sleepPerCycle); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + } + + public void init(long gcCount, @Nonnull GCJournalEntry entry, long currentSize) { + long prevSize = entry.getRepoSize(); + long prevNodes = entry.getNodes(); + this.gcCount = gcCount; + if (prevNodes > 0) { + estimated = (long) (((double) currentSize / prevSize) * prevNodes); + log.info("TarMK GC #{}: estimated {} nodes for compaction.", this.gcCount, estimated); + } else { + log.info("TarMK GC #{}: unable to estimate number of nodes for compaction, missing gc log."); + } + nodes = 0; + start = System.currentTimeMillis(); + running = true; + } + + public void finished() { + running = false; + } + + /** + * Compacted nodes in current cycle + */ + public long getCompactedNodes() { + return nodes; + } + + /** + * Estimated nodes to compact in current cycle. Can be {@code -1} if the + * estimation could not be performed. + */ + public long getEstimatedTotal() { + return estimated; + } + + /** + * Estimated completion percentage. Can be {@code -1} if the estimation + * could not be performed. + */ + public int getEstimatedPercentage() { + if (estimated > 0) { + if (!running) { + return 100; + } else { + return Math.min((int) (100 * ((double) nodes / estimated)), 99); + } + } + return -1; + } + + public boolean isCompactionRunning() { + return running; + } +} diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/GcJournalTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/GcJournalTest.java index 1f212c3..d46a84d 100644 --- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/GcJournalTest.java +++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/GcJournalTest.java @@ -44,26 +44,30 @@ public class GcJournalTest { File directory = segmentFolder.newFolder(); GCJournal gc = new GCJournal(directory); - gc.persist(0, 100, 1); + gc.persist(0, 100, 1, 50); GCJournalEntry e0 = gc.read(); assertEquals(100, e0.getRepoSize()); assertEquals(0, e0.getReclaimedSize()); + assertEquals(50, e0.getNodes()); - gc.persist(0, 250, 2); + gc.persist(0, 250, 2, 75); GCJournalEntry e1 = gc.read(); assertEquals(250, e1.getRepoSize()); assertEquals(0, e1.getReclaimedSize()); + assertEquals(75, e1.getNodes()); - gc.persist(50, 200, 3); + gc.persist(50, 200, 3, 90); GCJournalEntry e2 = gc.read(); assertEquals(200, e2.getRepoSize()); assertEquals(50, e2.getReclaimedSize()); + assertEquals(90, e2.getNodes()); // same gen - gc.persist(75, 300, 3); + gc.persist(75, 300, 3, 125); GCJournalEntry e3 = gc.read(); assertEquals(200, e3.getRepoSize()); assertEquals(50, e3.getReclaimedSize()); + assertEquals(90, e3.getNodes()); Collection all = gc.readAll(); assertEquals(all.size(), 3);