Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/DefaultCompactionStrategyMBean.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/DefaultCompactionStrategyMBean.java (revision ) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/DefaultCompactionStrategyMBean.java (revision ) @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.plugins.segment.compaction; + +import org.apache.jackrabbit.oak.plugins.segment.CompactionMap; +import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CleanupType; + +public class DefaultCompactionStrategyMBean implements CompactionStrategyMBean { + + private final CompactionStrategy strategy; + + public DefaultCompactionStrategyMBean(CompactionStrategy strategy) { + this.strategy = strategy; + } + + @Override + public boolean isCloneBinaries() { + return strategy.cloneBinaries(); + } + + @Override + public void setCloneBinaries(boolean cloneBinaries) { + strategy.setCloneBinaries(cloneBinaries); + } + + @Override + public boolean isPausedCompaction() { + return strategy.isPaused(); + } + + @Override + public void setPausedCompaction(boolean pausedCompaction) { + strategy.setPaused(pausedCompaction); + } + + @Override + public String getCleanupStrategy() { + return strategy.getCleanupType(); + } + + @Override + public void setCleanupStrategy(String cleanup) { + strategy.setCleanupType(CleanupType.valueOf(cleanup)); + } + + @Override + public long getOlderThan() { + return strategy.getOlderThan(); + } + + @Override + public void setOlderThan(long olderThan) { + strategy.setOlderThan(olderThan); + } + + @Override + public byte getMemoryThreshold() { + return strategy.getMemoryThreshold(); + } + + @Override + public void setMemoryThreshold(byte memory) { + strategy.setMemoryThreshold(memory); + } + + @Override + public String getCompactionMapStats() { + CompactionMap cm = strategy.getCompactionMap(); + if (cm != null) { + return cm.getCompactionStats(); + } + return ""; + } +} Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategyMBean.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategyMBean.java (revision ) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategyMBean.java (revision ) @@ -0,0 
+1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.plugins.segment.compaction; + +public interface CompactionStrategyMBean { + + String TYPE = "CompactionStrategy"; + + boolean isCloneBinaries(); + + void setCloneBinaries(boolean cloneBinaries); + + boolean isPausedCompaction(); + + void setPausedCompaction(boolean pausedCompaction); + + String getCleanupStrategy(); + + void setCleanupStrategy(String cleanup); + + long getOlderThan(); + + void setOlderThan(long olderThan); + + byte getMemoryThreshold(); + + void setMemoryThreshold(byte memory); + + String getCompactionMapStats(); +} Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java (date 1418330678000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java (revision ) @@ -30,6 +30,7 @@ import javax.annotation.Nonnull; import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector; +import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,8 +72,7 @@ * identifiers and identifiers of the corresponding records * after compaction. */ - private final AtomicReference compactionMap = - new AtomicReference(new CompactionMap(1)); + private final AtomicReference compactionMap; private final long cacheSize; @@ -99,6 +99,8 @@ this.store = store; this.writer = new SegmentWriter(store, this); this.cacheSize = cacheSizeMB * MB; + this.compactionMap = new AtomicReference( + new CompactionMap(1, this)); } public SegmentTracker(SegmentStore store) { @@ -114,10 +116,17 @@ } Segment getSegment(SegmentId id) { + try { - Segment segment = store.readSegment(id); - setSegment(id, segment); - return segment; + Segment segment = store.readSegment(id); + setSegment(id, segment); + return segment; + } catch (SegmentNotFoundException snfe) { + long delta = System.currentTimeMillis() - id.getCreationTime(); + log.error("Segment not found: {}. 
Creation date delta is {} ms.", + id, delta); + throw snfe; - } + } + } void setSegment(SegmentId id, Segment segment) { // done before synchronization to allow concurrent segment access @@ -154,11 +163,12 @@ } public void setCompactionMap(CompactionMap compaction) { + compaction.merge(compactionMap.get()); compactionMap.set(compaction); } @Nonnull - CompactionMap getCompactionMap() { + public CompactionMap getCompactionMap() { return compactionMap.get(); } @@ -229,6 +239,12 @@ long msb = (random.nextLong() & MSB_MASK) | VERSION; long lsb = (random.nextLong() & LSB_MASK) | type; return getSegmentId(msb, lsb); + } + + public synchronized void clearSegmentIdTables(CompactionStrategy strategy) { + for (int i = 0; i < tables.length; i++) { + tables[i].clearSegmentIdTables(strategy); + } } } Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMapTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMapTest.java (date 1418330678000) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMapTest.java (revision ) @@ -18,8 +18,8 @@ import static com.google.common.collect.Maps.newHashMap; import static junit.framework.Assert.assertTrue; -import static org.apache.jackrabbit.oak.plugins.segment.Segment.RECORD_ALIGN_BITS; import static org.apache.jackrabbit.oak.plugins.segment.Segment.MAX_SEGMENT_SIZE; +import static org.apache.jackrabbit.oak.plugins.segment.Segment.RECORD_ALIGN_BITS; import static org.junit.Assert.assertFalse; import java.util.Map; @@ -39,8 +39,8 @@ System.gc(); System.out.println((runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024)); - CompactionMap map = new CompactionMap(100000); SegmentTracker factory = new MemoryStore().getTracker(); + CompactionMap map = new CompactionMap(100000, factory); for (int i = 0; i < 1000000; i++) { if (i % 1000 == 0) { System.gc(); @@ -65,7 +65,7 @@ Random r = new Random(seed); SegmentTracker factory = new MemoryStore().getTracker(); - CompactionMap map = new CompactionMap(r.nextInt(maxSegments / 2)); + CompactionMap map = new CompactionMap(r.nextInt(maxSegments / 2), factory); Map entries = newHashMap(); int segments = r.nextInt(maxSegments); Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java (date 1418330678000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java (revision ) @@ -19,6 +19,7 @@ import static com.google.common.collect.Maps.newHashMap; import static com.google.common.collect.Maps.newTreeMap; import static com.google.common.collect.Sets.newTreeSet; +import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount; import static org.apache.jackrabbit.oak.plugins.segment.Segment.RECORD_ALIGN_BITS; import java.util.Map; @@ -67,18 +68,27 @@ public class CompactionMap { private final int compressInterval; - private final Map recent = newHashMap(); + private final SegmentTracker tracker; + private Map recent = newHashMap(); + private long[] msbs = new long[0]; private long[] lsbs = new long[0]; - private int[] entryIndex = new int[0]; - private short[] 
beforeOffsets = new short[0]; - private SegmentId[] afterSegmentIds = new SegmentId[0]; + + private int[] entryIndex = new int[0]; private short[] afterOffsets = new short[0]; - CompactionMap(int compressInterval) { + private int[] afterSegmentIds = new int[0]; + private long[] amsbs = new long[0]; + private long[] alsbs = new long[0]; + + private long prevWeight; + private CompactionMap prev; + + CompactionMap(int compressInterval, SegmentTracker tracker) { this.compressInterval = compressInterval; + this.tracker = tracker; } /** @@ -90,28 +100,74 @@ * @return whether {@code before} was compacted to {@code after} */ boolean wasCompactedTo(RecordId before, RecordId after) { - return after.equals(get(before)); + return recursiveWasCompactedTo(before, after); } /** + * Given a record and a map I need to cycle down the #prev line to identify the compacted version. + * + * @param before before record identifier + * @param after after record identifier + * @return whether {@code before} was compacted to {@code after} + */ + private boolean recursiveWasCompactedTo(RecordId before, + RecordId after) { + RecordId potentialAfter = recursiveGet(this, before); + if (potentialAfter == null) { + return false; + } + if (after.equals(potentialAfter)) { + return true; + } + return recursiveWasCompactedTo(potentialAfter, after); + } + + private RecordId recursiveGet(CompactionMap map, RecordId before) { + RecordId after = map.get(before); + if (after != null) { + return after; + } + if (map.prev != null) { + return recursiveGet(map.prev, before); + } + return null; + } + + /** * Checks whether content in the segment with the given identifier was * compacted to new segments. * * @param id segment identifier * @return whether the identified segment was compacted */ - boolean wasCompacted(SegmentId id) { + public boolean wasCompacted(UUID id) { long msb = id.getMostSignificantBits(); long lsb = id.getLeastSignificantBits(); - return findEntry(msb, lsb) != -1; + return wasCompacted(this, msb, lsb); } + private static boolean wasCompacted(CompactionMap map, long msb, long lsb) { + int find = map.findEntry(msb, lsb); + if (find != -1) { + return true; + } + if (map.prev != null) { + return wasCompacted(map.prev, msb, lsb); + } + return false; + } + public RecordId get(RecordId before) { RecordId after = recent.get(before); if (after != null) { return after; } + //empty map + if (msbs.length == 0) { + return null; + } + SegmentId segmentId = before.getSegmentId(); long msb = segmentId.getMostSignificantBits(); long lsb = segmentId.getLeastSignificantBits(); @@ -122,12 +178,10 @@ int index = entryIndex[entry]; int limit = entryIndex[entry + 1]; for (int i = index; i < limit; i++) { - int o = (beforeOffsets[i] & 0xffff) << RECORD_ALIGN_BITS; + int o = decode(beforeOffsets[i]); if (o == offset) { // found it! 
- return new RecordId( - afterSegmentIds[i], - (afterOffsets[i] & 0xffff) << RECORD_ALIGN_BITS); + return new RecordId(asSegmentId(i), decode(afterOffsets[i])); } else if (o > offset) { return null; } @@ -137,12 +191,32 @@ return null; } + private static int decode(short offset) { + return (offset & 0xffff) << RECORD_ALIGN_BITS; + } + + private static short encode(int offset) { + return (short) (offset >> RECORD_ALIGN_BITS); + } + + private SegmentId asSegmentId(int index) { + int idx = afterSegmentIds[index]; + return new SegmentId(tracker, amsbs[idx], alsbs[idx]); + } + + private static UUID asUUID(SegmentId id) { + return new UUID(id.getMostSignificantBits(), + id.getLeastSignificantBits()); + } + /** * Adds a new entry to the compaction map. Overwriting a previously * added entry is not supported. */ void put(RecordId before, RecordId after) { - assert get(before) == null; + if (get(before) != null) { + throw new IllegalArgumentException(); + } recent.put(before, after); if (recent.size() >= compressInterval) { compress(); @@ -150,6 +224,10 @@ } void compress() { + if (recent.isEmpty()) { + // noop + return; + } Set uuids = newTreeSet(); Map> mapping = newTreeMap(); @@ -180,9 +258,11 @@ int newEntries = beforeOffsets.length + recent.size(); short[] newBeforeOffsets = new short[newEntries]; - SegmentId[] newAfterSegmentIds = new SegmentId[newEntries]; short[] newAfterOffsets = new short[newEntries]; + int[] newAfterSegmentIds = new int[newEntries]; + Map newAfterSegments = newHashMap(); + int newIndex = 0; int newEntry = 0; int oldEntry = 0; @@ -190,9 +270,10 @@ newmsbs[newEntry] = uuid.getMostSignificantBits(); newlsbs[newEntry] = uuid.getLeastSignificantBits(); - Map map = mapping.get(uuid); - if (map == null) { - map = newTreeMap(); + // offset -> record + Map newsegment = mapping.get(uuid); + if (newsegment == null) { + newsegment = newTreeMap(); } if (oldEntry < msbs.length @@ -201,21 +282,29 @@ int index = entryIndex[oldEntry]; int limit = entryIndex[oldEntry + 1]; for (int i = index; i < limit; i++) { - map.put((beforeOffsets[i] & 0xffff) << RECORD_ALIGN_BITS, - new RecordId( - afterSegmentIds[i], - (afterOffsets[i] & 0xffff) << RECORD_ALIGN_BITS)); + newsegment.put(decode(beforeOffsets[i]), new RecordId( + asSegmentId(i), decode(afterOffsets[i]))); } oldEntry++; } newEntryIndex[newEntry++] = newIndex; - for (Entry entry : map.entrySet()) { + for (Entry entry : newsegment.entrySet()) { int key = entry.getKey(); RecordId id = entry.getValue(); - newBeforeOffsets[newIndex] = (short) (key >> RECORD_ALIGN_BITS); - newAfterSegmentIds[newIndex] = id.getSegmentId(); - newAfterOffsets[newIndex] = (short) (id.getOffset() >> RECORD_ALIGN_BITS); + newBeforeOffsets[newIndex] = encode(key); + newAfterOffsets[newIndex] = encode(id.getOffset()); + + UUID aUUID = asUUID(id.getSegmentId()); + int aSIdx = -1; + if (newAfterSegments.containsKey(aUUID)) { + aSIdx = newAfterSegments.get(aUUID); + } else { + aSIdx = newAfterSegments.size(); + newAfterSegments.put(aUUID, aSIdx); + } + newAfterSegmentIds[newIndex] = aSIdx; + newIndex++; } } @@ -227,12 +316,21 @@ this.entryIndex = newEntryIndex; this.beforeOffsets = newBeforeOffsets; - this.afterSegmentIds = newAfterSegmentIds; this.afterOffsets = newAfterOffsets; - recent.clear(); + this.afterSegmentIds = newAfterSegmentIds; + this.amsbs = new long[newAfterSegments.size()]; + this.alsbs = new long[newAfterSegments.size()]; + for (Entry entry : newAfterSegments.entrySet()) { + this.amsbs[entry.getValue()] = entry.getKey() + .getMostSignificantBits(); + 
this.alsbs[entry.getValue()] = entry.getKey() + .getLeastSignificantBits(); - } + } + recent = newHashMap(); + } + /** * Finds the given segment identifier (UUID) within the list of * identifiers of compacted segments tracked by this instance. @@ -292,6 +390,79 @@ // not found return -1; + } + + /** + * TODO: merge the 2 maps (assume that 'prev' is bigger than the current map + * as it contains the entire history, but don't change any values as it + * might still be in use by other threads) + */ + void merge(CompactionMap prev) { + this.prev = prev; + this.prevWeight = prev.getEstimatedWeight(); + } + + public String getCompactionStats() { + StringBuilder sb = new StringBuilder(); + CompactionMap cm = this; + while (cm != null) { + sb.append("["); + sb.append(getCompactionStats(cm)); + sb.append("], "); + cm = cm.prev; + } + return sb.toString(); + } + + private static String getCompactionStats(CompactionMap cm) { + StringBuilder sb = new StringBuilder(); + sb.append("Estimated Weight: "); + sb.append(humanReadableByteCount(getEstimatedWeight(cm))); + sb.append(", Records: "); + sb.append(cm.afterOffsets.length); + sb.append(", Segments: "); + sb.append(cm.amsbs.length); + return sb.toString(); + } + + public long getEstimatedWeight() { + long total = 0; + CompactionMap cm = this; + while (cm != null) { + total += getEstimatedWeight(cm); + cm = cm.prev; + } + return total; + } + + public long getLastMergeWeight() { + return this.prevWeight; + } + + private static long getEstimatedWeight(CompactionMap cm) { + // estimation of the object including empty 'recent' map + long total = 168; + + // msbs + total += 24 + cm.msbs.length * 8; + // lsbs + total += 24 + cm.lsbs.length * 8; + // beforeOffsets + total += 24 + cm.beforeOffsets.length * 2; + + // entryIndex + total += 24 + cm.entryIndex.length * 4; + // afterOffsets + total += 24 + cm.afterOffsets.length * 2; + + // afterSegmentIds + total += 24 + cm.afterSegmentIds.length * 4; + // amsbs + total += 24 + cm.amsbs.length * 8; + // alsbs + total += 24 + cm.alsbs.length * 8; + + return total; } } Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/HeavyWriteIT.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/HeavyWriteIT.java (revision ) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/HeavyWriteIT.java (revision ) @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.jackrabbit.oak.plugins.segment; + +import static org.apache.commons.io.FileUtils.deleteDirectory; +import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CleanupType.CLEAN_OLD; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.jackrabbit.oak.api.Blob; +import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy; +import org.apache.jackrabbit.oak.plugins.segment.file.FileStore; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.EmptyHook; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeStore; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class HeavyWriteIT { + + private File directory; + + @Before + public void setUp() throws IOException { + directory = File.createTempFile( + "FileStoreTest", "dir", new File("target")); + directory.delete(); + directory.mkdir(); + } + + @After + public void cleanDir() throws IOException { + deleteDirectory(directory); + } + + @Test + public void heavyWrite() throws IOException, CommitFailedException, InterruptedException { + final FileStore store = new FileStore(directory, 128, false); + store.setCompactionStrategy(new CompactionStrategy(false, false, + CLEAN_OLD, 30000, (byte) 0)); + final SegmentNodeStore nodeStore = new SegmentNodeStore(store); + + int writes = 100; + final AtomicBoolean run = new AtomicBoolean(true); + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + for (int k = 1; run.get(); k++) { + store.gc(); + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + } + }); + thread.start(); + + try { + for (int k = 1; k<=writes; k++) { + NodeBuilder root = nodeStore.getRoot().builder(); + NodeBuilder test = root.setChildNode("test"); + createNodes(nodeStore, test, 10, 2); + nodeStore.merge(root, EmptyHook.INSTANCE, CommitInfo.EMPTY); + + root = nodeStore.getRoot().builder(); + root.getChildNode("test").remove(); + nodeStore.merge(root, EmptyHook.INSTANCE, CommitInfo.EMPTY); + } + + } finally { + run.set(false); + thread.join(); + store.close(); + } + } + + private static void createNodes(NodeStore nodeStore, NodeBuilder builder, int count, int depth) throws IOException { + if (depth > 0) { + for (int k = 0; k < count; k++) { + NodeBuilder child = builder.setChildNode("node" + k); + createProperties(nodeStore, child, count); + createNodes(nodeStore, child, count, depth - 1); + } + } + } + + private static void createProperties(NodeStore nodeStore, NodeBuilder builder, int count) throws IOException { + for (int k = 0; k < count; k++) { + builder.setProperty("property-" + k, createBlob(nodeStore, 100000)); + } + } + + private static Blob createBlob(NodeStore nodeStore, int size) throws IOException { + byte[] data = new byte[size]; + new Random().nextBytes(data); + return nodeStore.createBlob(new ByteArrayInputStream(data)); + } + +} Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- 
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (date 1418330678000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (revision ) @@ -29,6 +29,7 @@ import java.io.InputStream; import java.util.Random; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicReference; @@ -92,6 +93,17 @@ void setMaximumBackoff(long max) { this.maximumBackoff = max; + } + + boolean locked(Callable c) throws Exception { + if (commitSemaphore.tryAcquire()) { + try { + return c.call(); + } finally { + commitSemaphore.release(); + } + } + return false; } /** Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java (date 1418330678000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java (revision ) @@ -603,7 +603,7 @@ return thisLevel.iterator().next(); } - public MapRecord writeMap(MapRecord base, Map changes) { + MapRecord writeMap(MapRecord base, Map changes) { if (base != null && base.isDiff()) { Segment segment = base.getSegment(); RecordId key = segment.readRecordId(base.getOffset(8)); @@ -1030,7 +1030,7 @@ && store.containsSegment(((SegmentPropertyState) property).getRecordId().getSegmentId())) { ids.add(((SegmentPropertyState) property).getRecordId()); } else if (!(before instanceof SegmentNodeState) - || store.containsSegment(((SegmentNodeState) before).getRecordId().getSegmentId())) { + || store.containsSegment(before.getRecordId().getSegmentId())) { ids.add(writeProperty(property)); } else { // reuse previously stored property, if possible @@ -1059,6 +1059,10 @@ } return new SegmentNodeState(recordId); } + } + + public SegmentTracker getTracker() { + return tracker; } } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (date 1418330678000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (revision ) @@ -25,10 +25,10 @@ import static com.google.common.collect.Sets.newHashSet; import static java.lang.String.format; import static java.util.Collections.singletonMap; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount; import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE; +import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CleanupType.CLEAN_NONE; +import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.MEMORY_THRESHOLD_DEFAULT; import java.io.File; import java.io.IOException; @@ -42,25 +42,28 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; import 
com.google.common.base.Stopwatch; - +import com.google.common.collect.Maps; import org.apache.jackrabbit.oak.api.Blob; import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob; +import org.apache.jackrabbit.oak.plugins.segment.CompactionMap; import org.apache.jackrabbit.oak.plugins.segment.Compactor; import org.apache.jackrabbit.oak.plugins.segment.RecordId; import org.apache.jackrabbit.oak.plugins.segment.Segment; import org.apache.jackrabbit.oak.plugins.segment.SegmentId; +import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState; import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore; import org.apache.jackrabbit.oak.plugins.segment.SegmentNotFoundException; -import org.apache.jackrabbit.oak.plugins.segment.SegmentTracker; -import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState; import org.apache.jackrabbit.oak.plugins.segment.SegmentStore; +import org.apache.jackrabbit.oak.plugins.segment.SegmentTracker; import org.apache.jackrabbit.oak.plugins.segment.SegmentWriter; +import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy; import org.apache.jackrabbit.oak.spi.blob.BlobStore; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; import org.apache.jackrabbit.oak.spi.state.NodeState; @@ -81,7 +84,7 @@ private static final String JOURNAL_FILE_NAME = "journal.log"; - private static final boolean MEMORY_MAPPING_DEFAULT = + static final boolean MEMORY_MAPPING_DEFAULT = "64".equals(System.getProperty("sun.arch.data.model", "32")); private final SegmentTracker tracker; @@ -129,17 +132,15 @@ */ private final BackgroundThread compactionThread; + private CompactionStrategy compactionStrategy = new CompactionStrategy( + true, false, CLEAN_NONE, 0, MEMORY_THRESHOLD_DEFAULT); + /** * Flag to request revision cleanup during the next flush. */ private final AtomicBoolean cleanupNeeded = new AtomicBoolean(false); /** - * Flag to set the compaction on pause. - */ - private volatile boolean pauseCompaction = true; - - /** * List of old tar file generations that are waiting to be removed. They can * not be removed immediately, because they first need to be closed, and the * JVM needs to release the memory mapped file references. 
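The next hunk replaces the compaction background Runnable with maybeCompact(boolean cleanup), which first gates compaction on memory: it multiplies the last merged compaction map's estimated weight by the strategy's memoryThreshold and skips compaction (optionally still requesting cleanup) when that product reaches the figure derived from the JVM Runtime. A minimal, self-contained sketch of that gating arithmetic follows; the class and method names are illustrative only and are not part of this patch.

public final class CompactionMemoryGate {

    private CompactionMemoryGate() {
    }

    /**
     * Sketch of the check performed at the start of FileStore.maybeCompact()
     * in the hunk below (illustrative names, not patch code).
     *
     * @param lastMergeWeight estimated weight in bytes of the last merged
     *                        compaction map (CompactionMap#getLastMergeWeight())
     * @param memoryThreshold the CompactionStrategy multiplier
     * @return true if compaction may run, false if it should be skipped
     */
    public static boolean mayCompact(long lastMergeWeight, byte memoryThreshold) {
        Runtime runtime = Runtime.getRuntime();
        // same figure the hunk compares against
        long avail = runtime.totalMemory() - runtime.freeMemory();
        long needed = lastMergeWeight * memoryThreshold;
        // the hunk logs and skips compaction when needed >= avail
        return needed < avail;
    }
}

With no previous merge (weight 0), or with a threshold of 0 as several tests in this patch use, needed stays 0 and the gate never blocks compaction; the default threshold in CompactionStrategy is 5.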
@@ -255,37 +256,66 @@ new Runnable() { @Override public void run() { + maybeCompact(true); + } + }); + + log.info("TarMK opened: {} (mmap={})", directory, memoryMapping); + } + + public boolean maybeCompact(boolean cleanup) { - log.info("TarMK compaction started"); + log.info("TarMK compaction started"); + + Runtime runtime = Runtime.getRuntime(); + long avail = runtime.totalMemory() - runtime.freeMemory(); + long delta = 0; + if (compactionStrategy.getCompactionMap() != null) { + delta = compactionStrategy.getCompactionMap().getLastMergeWeight(); + } + long needed = delta * compactionStrategy.getMemoryThreshold(); + if (needed >= avail) { + log.info( + "Not enough available memory {}, needed {}, last merge delta {}, so skipping compaction for now", + humanReadableByteCount(avail), + humanReadableByteCount(needed), + humanReadableByteCount(delta)); + if (cleanup) { + cleanupNeeded.set(true); + } + return false; + } + - Stopwatch watch = Stopwatch.createStarted(); + Stopwatch watch = Stopwatch.createStarted(); + compactionStrategy.setCompactionStart(System.currentTimeMillis()); + boolean compacted = false; + - CompactionGainEstimate estimate = estimateCompactionGain(); - long gain = estimate.estimateCompactionGain(); - if (gain >= 10) { - log.info( - "Estimated compaction in {}, gain is {}% ({}/{}) or ({}/{}), so running compaction", + CompactionGainEstimate estimate = estimateCompactionGain(); + long gain = estimate.estimateCompactionGain(); + if (gain >= 10) { + log.info( + "Estimated compaction in {}, gain is {}% ({}/{}) or ({}/{}), so running compaction", - watch, gain, - estimate.getReachableSize(), + watch, gain, estimate.getReachableSize(), - estimate.getTotalSize(), - humanReadableByteCount(estimate.getReachableSize()), - humanReadableByteCount(estimate.getTotalSize())); + estimate.getTotalSize(), + humanReadableByteCount(estimate.getReachableSize()), + humanReadableByteCount(estimate.getTotalSize())); - if (!pauseCompaction) { + if (!compactionStrategy.isPaused()) { - compact(); + compact(); + compacted = true; - } else { - log.info("TarMK compaction paused"); - } - } else { - log.info( + } else { + log.info("TarMK compaction paused"); + } + } else { + log.info( - "Estimated compaction in {}ms, gain is {}% ({}/{}) or ({}/{}), so skipping compaction for now", - watch, gain, - estimate.getReachableSize(), + "Estimated compaction in {}, gain is {}% ({}/{}) or ({}/{}), so skipping compaction for now", + watch, gain, estimate.getReachableSize(), - estimate.getTotalSize(), - humanReadableByteCount(estimate.getReachableSize()), - humanReadableByteCount(estimate.getTotalSize())); - } + estimate.getTotalSize(), + humanReadableByteCount(estimate.getReachableSize()), + humanReadableByteCount(estimate.getTotalSize())); + } + if (cleanup) { - cleanupNeeded.set(true); - } + cleanupNeeded.set(true); + } - }); - - log.info("TarMK opened: {} (mmap={})", directory, memoryMapping); + return compacted; } static Map> collectFiles(File directory) @@ -451,10 +481,10 @@ } writer.cleanup(ids); - List list = - newArrayListWithCapacity(readers.size()); + CompactionMap cm = tracker.getCompactionMap(); + List list = newArrayListWithCapacity(readers.size()); for (TarReader reader : readers) { - TarReader cleaned = reader.cleanup(ids); + TarReader cleaned = reader.cleanup(ids, cm); if (cleaned == reader) { list.add(reader); } else { @@ -475,12 +505,11 @@ } public void compact() { - long start = System.nanoTime(); - log.info("TarMK compaction running"); + log.info("TarMK compaction running, strategy={}", 
compactionStrategy); + long start = System.currentTimeMillis(); SegmentWriter writer = new SegmentWriter(this, tracker); - Compactor compactor = new Compactor(writer); - + final Compactor compactor = new Compactor(writer, compactionStrategy.cloneBinaries()); SegmentNodeState before = getHead(); long existing = before.getChildNode(SegmentNodeStore.CHECKPOINTS) .getChildNodeCount(Long.MAX_VALUE); @@ -491,27 +520,23 @@ } SegmentNodeState after = compactor.compact(EMPTY_NODE, before); - writer.flush(); - while (!setHead(before, after)) { + + Callable setHead = new SetHead(before, after, compactor); + try { + while(!compactionStrategy.compacted(setHead)) { - // Some other concurrent changes have been made. - // Rebase (and compact) those changes on top of the - // compacted state before retrying to set the head. - SegmentNodeState head = getHead(); + // Some other concurrent changes have been made. + // Rebase (and compact) those changes on top of the + // compacted state before retrying to set the head. + SegmentNodeState head = getHead(); - after = compactor.compact(before, head); - before = head; - writer.flush(); + after = compactor.compact(after, head); + setHead = new SetHead(head, after, compactor); - } + } - tracker.setCompactionMap(compactor.getCompactionMap()); - - // Drop the SegmentWriter caches and flush any existing state - // in an attempt to prevent new references to old pre-compacted - // content. TODO: There should be a cleaner way to do this. - tracker.getWriter().dropCache(); - tracker.getWriter().flush(); - - log.info("TarMK compaction completed in {}ms", MILLISECONDS - .convert(System.nanoTime() - start, NANOSECONDS)); + log.info("TarMK compaction completed in {}ms", + System.currentTimeMillis() - start); + } catch (Exception e) { + log.error("Error while running TarMK compaction", e); - } + } + } public synchronized Iterable getSegmentIds() { List ids = newArrayList(); @@ -704,8 +729,42 @@ compactionThread.trigger(); } - public FileStore setPauseCompaction(boolean pauseCompaction) { - this.pauseCompaction = pauseCompaction; + public FileStore setCompactionStrategy(CompactionStrategy strategy) { + this.compactionStrategy = strategy; return this; + } + + private class SetHead implements Callable { + private final SegmentNodeState before; + private final SegmentNodeState after; + private final Compactor compactor; + + public SetHead(SegmentNodeState before, SegmentNodeState after, Compactor compactor) { + this.before = before; + this.after = after; + this.compactor = compactor; + } + + @Override + public Boolean call() throws Exception { + // When used in conjunction with the SegmentNodeStore, this method + // needs to be called inside the commitSemaphore as doing otherwise + // might result in mixed segments. See OAK-2192. + if (setHead(before, after)) { + CompactionMap cm = compactor.getCompactionMap(); + tracker.setCompactionMap(cm); + compactionStrategy.setCompactionMap(cm); + + // Drop the SegmentWriter caches and flush any existing state + // in an attempt to prevent new references to old pre-compacted + // content. TODO: There should be a cleaner way to do this. 
+ tracker.getWriter().dropCache(); + tracker.getWriter().flush(); + tracker.clearSegmentIdTables(compactionStrategy); + return true; + } else { + return false; + } + } } } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentIdTable.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentIdTable.java (date 1418330678000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentIdTable.java (revision ) @@ -25,6 +25,8 @@ import java.util.Collection; import java.util.Map; +import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy; + /** * Hash table of weak references to segment identifiers. */ @@ -137,4 +139,24 @@ return ((int) lsb) & (references.size() - 1); } + synchronized void clearSegmentIdTables(CompactionStrategy strategy) { + int size = references.size(); + boolean dirty = false; + for (int i = 0; i < size; i++) { + WeakReference reference = references.get(i); + if (reference != null) { + SegmentId id = reference.get(); + if (id != null) { + if (strategy.canRemove(id)) { + reference.clear(); + references.set(i, null); + dirty = true; + } + } + } + } + if (dirty) { + refresh(); + } + } } Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupTest.java (revision ) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupTest.java (revision ) @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.jackrabbit.oak.plugins.segment; + +import static com.google.common.collect.Lists.newArrayList; +import static org.apache.commons.io.FileUtils.byteCountToDisplaySize; +import static org.apache.commons.io.FileUtils.deleteDirectory; +import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CleanupType.CLEAN_NONE; +import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CleanupType.CLEAN_OLD; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import com.google.common.io.ByteStreams; +import org.apache.jackrabbit.oak.api.Blob; +import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.api.PropertyState; +import org.apache.jackrabbit.oak.api.Type; +import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy; +import org.apache.jackrabbit.oak.plugins.segment.file.FileStore; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.EmptyHook; +import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeState; +import org.apache.jackrabbit.oak.spi.state.NodeStore; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +public class CompactionAndCleanupTest { + + private File directory; + + @Before + public void setUp() throws IOException { + directory = File.createTempFile( + "FileStoreTest", "dir", new File("target")); + directory.delete(); + directory.mkdir(); + } + + @Test + @Ignore("OAK-2045") + public void compactionAndWeakReferenceMagic() throws Exception { + final int MB = 1024 * 1024; + final int blobSize = 5 * MB; + + final int dataNodes = 10000; + + // really long time span, no binary cloning + CompactionStrategy custom = new CompactionStrategy(false, + false, CLEAN_OLD, TimeUnit.HOURS.toMillis(1), (byte) 0); + + FileStore fileStore = new FileStore(directory, 1); + SegmentNodeStore nodeStore = new SegmentNodeStore(fileStore); + fileStore.setCompactionStrategy(custom); + + // 1a. Create a bunch of data + NodeBuilder extra = nodeStore.getRoot().builder(); + NodeBuilder content = extra.child("content"); + for (int i = 0; i < dataNodes; i++) { + NodeBuilder c = content.child("c" + i); + for (int j = 0; j < 1000; j++) { + c.setProperty("p" + i, "v" + i); + } + } + nodeStore.merge(extra, EmptyHook.INSTANCE, CommitInfo.EMPTY); + // ---- + + final long dataSize = fileStore.size(); + System.out.printf("File store dataSize %s%n", + byteCountToDisplaySize(dataSize)); + + // 1. 
Create a property with 5 MB blob + NodeBuilder builder = nodeStore.getRoot().builder(); + builder.setProperty("a1", createBlob(nodeStore, blobSize)); + builder.setProperty("b", "foo"); + + // Keep a reference to this nodeState to simulate long + // running session + NodeState ns1 = nodeStore.merge(builder, EmptyHook.INSTANCE, + CommitInfo.EMPTY); + + System.out.printf("File store pre removal %s expecting %s %n", + byteCountToDisplaySize(fileStore.size()), + byteCountToDisplaySize(blobSize + dataSize)); + assertEquals(mb(blobSize + dataSize), mb(fileStore.size())); + + // 2. Now remove the property + builder = nodeStore.getRoot().builder(); + builder.removeProperty("a1"); + nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + + // Size remains same + System.out.printf("File store pre compaction %s expecting %s%n", + byteCountToDisplaySize(fileStore.size()), + byteCountToDisplaySize(blobSize + dataSize)); + assertEquals(mb(blobSize + dataSize), mb(fileStore.size())); + + // 3. Compact + assertTrue(fileStore.maybeCompact(false)); + // fileStore.cleanup(); + + // Size still remains same + System.out.printf("File store post compaction %s expecting %s%n", + byteCountToDisplaySize(fileStore.size()), + byteCountToDisplaySize(blobSize + dataSize)); + assertEquals("File store post compaction size", + mb(blobSize + dataSize), mb(fileStore.size())); + + // 4. Add some more property to flush the current TarWriter + builder = nodeStore.getRoot().builder(); + builder.setProperty("a2", createBlob(nodeStore, blobSize)); + nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + + // Size is double + System.out.printf("File store pre cleanup %s expecting %s%n", + byteCountToDisplaySize(fileStore.size()), + byteCountToDisplaySize(2 * blobSize + dataSize)); + assertEquals(mb(2 * blobSize + dataSize), mb(fileStore.size())); + + // 5. 
Cleanup + assertTrue(fileStore.maybeCompact(false)); + fileStore.cleanup(); + + System.out.printf( + "File store post cleanup %s expecting between [%s,%s]%n", + byteCountToDisplaySize(fileStore.size()), + byteCountToDisplaySize(blobSize + dataSize), + byteCountToDisplaySize(blobSize + 2 * dataSize)); + + // 0 data size: fileStore.size() == blobSize + // >0 data size: fileStore.size() in [blobSize + dataSize, blobSize + + // 2xdataSize] + assertTrue(mb(fileStore.size()) >= mb(blobSize + dataSize) + && mb(fileStore.size()) <= mb(blobSize + 2 * dataSize)); + + // refresh the ts ref, to simulate a long wait time + custom.setOlderThan(0); + assertFalse(fileStore.maybeCompact(false)); + + // no data loss happened + byte[] blob = ByteStreams.toByteArray(nodeStore.getRoot() + .getProperty("a2").getValue(Type.BINARY).getNewStream()); + assertEquals(blobSize, blob.length); + } + + @After + public void cleanDir() throws IOException { + deleteDirectory(directory); + } + + private static Blob createBlob(NodeStore nodeStore, int size) throws IOException { + byte[] data = new byte[size]; + new Random().nextBytes(data); + return nodeStore.createBlob(new ByteArrayInputStream(data)); + } + + private static long mb(long size){ + return size / (1024 * 1024); + } + + @Test + public void testMixedSegments() throws Exception { + FileStore store = new FileStore(directory, 2, false); + final SegmentNodeStore nodeStore = new SegmentNodeStore(store); + store.setCompactionStrategy(new CompactionStrategy(true, false, CLEAN_NONE, 0, (byte) 5) { + @Override + public boolean compacted(Callable setHead) throws Exception { + return nodeStore.locked(setHead); + } + }); + + NodeBuilder root = nodeStore.getRoot().builder(); + createNodes(root.setChildNode("test"), 10, 3); + nodeStore.merge(root, EmptyHook.INSTANCE, CommitInfo.EMPTY); + + final Set beforeSegments = new HashSet(); + collectSegments(store.getHead(), beforeSegments); + + final AtomicReference run = new AtomicReference(true); + final List failedCommits = newArrayList(); + Thread t = new Thread(new Runnable() { + @Override + public void run() { + for (int k = 0; run.get(); k++) { + try { + NodeBuilder root = nodeStore.getRoot().builder(); + root.setChildNode("b" + k); + nodeStore.merge(root, EmptyHook.INSTANCE, CommitInfo.EMPTY); + Thread.sleep(5); + } catch (CommitFailedException e) { + failedCommits.add(k); + } catch (InterruptedException e) { + Thread.interrupted(); + break; + } + } + } + }); + t.start(); + + store.compact(); + run.set(false); + t.join(); + + assertTrue(failedCommits.isEmpty()); + + Set afterSegments = new HashSet(); + collectSegments(store.getHead(), afterSegments); + try { + for (UUID u : beforeSegments) { + assertFalse("Mixed segments found: " + u, afterSegments.contains(u)); + } + } finally { + store.close(); + } + } + + private static void collectSegments(SegmentNodeState s, Set segmentIds) { + SegmentId sid = s.getRecordId().getSegmentId(); + UUID id = new UUID(sid.getMostSignificantBits(), + sid.getLeastSignificantBits()); + segmentIds.add(id); + for (ChildNodeEntry cne : s.getChildNodeEntries()) { + collectSegments((SegmentNodeState) cne.getNodeState(), segmentIds); + } + for (PropertyState propertyState : s.getProperties()) { + sid = ((SegmentPropertyState) propertyState).getRecordId().getSegmentId(); + id = new UUID(sid.getMostSignificantBits(), + sid.getLeastSignificantBits()); + segmentIds.add(id); + } + } + + private static void createNodes(NodeBuilder builder, int count, int depth) { + if (depth > 0) { + for (int k = 0; k < count; 
k++) { + NodeBuilder child = builder.setChildNode("node" + k); + createProperties(child, count); + createNodes(child, count, depth - 1); + } + } + } + + private static void createProperties(NodeBuilder builder, int count) { + for (int k = 0; k < count; k++) { + builder.setProperty("property-" + UUID.randomUUID().toString(), "value-" + UUID.randomUUID().toString()); + } + } +} Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java (date 1418330678000) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java (revision ) @@ -110,6 +110,10 @@ SegmentNodeState head = builder.getNodeState(); assertTrue(store.setHead(base, head)); assertEquals("bar", store.getHead().getString("foo")); + + Compactor compactor = new Compactor(writer); + SegmentNodeState compacted = + compactor.compact(EmptyNodeState.EMPTY_NODE, head); store.close(); // First simulate the case where during compaction a reference to the @@ -117,9 +121,6 @@ store = new FileStore(directory, 1, false); head = store.getHead(); assertTrue(store.size() > largeBinarySize); - Compactor compactor = new Compactor(writer); - SegmentNodeState compacted = - compactor.compact(EmptyNodeState.EMPTY_NODE, head); builder = head.builder(); builder.setChildNode("old", head); // reference to pre-compacted state builder.getNodeState(); Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java (date 1418330678000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java (revision ) @@ -18,6 +18,12 @@ import static com.google.common.base.Preconditions.checkState; import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toBoolean; +import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toLong; +import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CLEANUP_DEFAULT; +import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CLONE_BINARIES_DEFAULT; +import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.MEMORY_THRESHOLD_DEFAULT; +import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.PAUSE_DEFAULT; +import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.TIMESTAMP_DEFAULT; import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean; import java.io.Closeable; @@ -25,6 +31,7 @@ import java.io.IOException; import java.util.Dictionary; import java.util.Hashtable; +import java.util.concurrent.Callable; import org.apache.commons.io.FilenameUtils; import org.apache.felix.scr.annotations.Activate; @@ -32,6 +39,7 @@ import org.apache.felix.scr.annotations.ConfigurationPolicy; import org.apache.felix.scr.annotations.Deactivate; import org.apache.felix.scr.annotations.Property; +import org.apache.felix.scr.annotations.PropertyOption; import org.apache.felix.scr.annotations.Reference; import 
org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.ReferencePolicy; @@ -41,6 +49,10 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobGCMBean; import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector; import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector; +import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategyMBean; +import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy; +import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CleanupType; +import org.apache.jackrabbit.oak.plugins.segment.compaction.DefaultCompactionStrategyMBean; import org.apache.jackrabbit.oak.plugins.segment.file.FileStore; import org.apache.jackrabbit.oak.spi.blob.BlobStore; import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore; @@ -77,11 +89,26 @@ @Property(description="Cache size (MB)", intValue=256) public static final String CACHE = "cache"; - @Property(description = "TarMK compaction paused flag", boolValue = true) + @Property(description = "TarMK compaction clone binaries flag", boolValue = CLONE_BINARIES_DEFAULT) + public static final String COMPACTION_CLONE_BINARIES = "compaction.cloneBinaries"; + + @Property(options = { @PropertyOption(name = "all", value = "all"), + @PropertyOption(name = "none", value = "none"), + @PropertyOption(name = "timestamp", value = "timestamp") }, value = "timestamp") + public static final String COMPACTION_CLEANUP = "compaction.cleanup"; + + @Property(description = "TarMK compaction strategy timestamp older (ms)", longValue = TIMESTAMP_DEFAULT) + public static final String COMPACTION_CLEANUP_TIMESTAMP = "compaction.cleanup.timestamp"; + + @Property(description = "TarMK compaction available memory multiplier needed to run compaction", byteValue = MEMORY_THRESHOLD_DEFAULT) + public static final String COMPACTION_MEMORY_THRESHOLD = "compaction.memoryThreshold"; + + @Property(description = "TarMK compaction paused flag", boolValue = PAUSE_DEFAULT) public static final String PAUSE_COMPACTION = "pauseCompaction"; @Property(description = "Flag indicating that this component will not register as a NodeStore but just as a NodeStoreProvider", boolValue = false) public static final String STANDBY = "standby"; + /** * Boolean value indicating a blobStore is to be used */ @@ -107,7 +134,9 @@ private ServiceRegistration providerRegistration; private Registration revisionGCRegistration; private Registration blobGCRegistration; + private Registration compactionStrategyRegistration; private WhiteboardExecutor executor; + private boolean customBlobStore; @Override protected synchronized SegmentNodeStore getNodeStore() { @@ -118,20 +147,28 @@ @Activate private void activate(ComponentContext context) throws IOException { this.context = context; + this.customBlobStore = Boolean.parseBoolean(lookup(context, CUSTOM_BLOB_STORE)); - if(blobStore == null && - Boolean.parseBoolean(lookup(context, CUSTOM_BLOB_STORE))){ + if (blobStore == null && customBlobStore) { log.info("BlobStore use enabled. 
SegmentNodeStore would be initialized when BlobStore would be available"); - }else{ - registerNodeStore(); + } else if (registerNodeStore()) { + Dictionary props = new Hashtable(); + props.put(Constants.SERVICE_PID, SegmentNodeStore.class.getName()); + + boolean standby = toBoolean(lookup(context, STANDBY), false); + providerRegistration = context.getBundleContext().registerService( + SegmentStoreProvider.class.getName(), this, props); + if (!standby) { + storeRegistration = context.getBundleContext().registerService( + NodeStore.class.getName(), this, props); - } - } + } + } + } - public synchronized void registerNodeStore() - throws IOException { + public synchronized boolean registerNodeStore() throws IOException { - if(context == null){ + if (context == null) { log.info("Component still not activated. Ignoring the initialization call"); - return; + return false; } Dictionary properties = context.getProperties(); @@ -155,26 +192,49 @@ size = System.getProperty(SIZE, "256"); } - boolean pauseCompaction = toBoolean(lookup(context, PAUSE_COMPACTION), true); - store = new FileStore( - blobStore, - new File(directory), - Integer.parseInt(size), "64".equals(mode)) - .setPauseCompaction(pauseCompaction); + boolean pauseCompaction = toBoolean(lookup(context, PAUSE_COMPACTION), + PAUSE_DEFAULT); + boolean cloneBinaries = toBoolean( + lookup(context, COMPACTION_CLONE_BINARIES), + CLONE_BINARIES_DEFAULT); + long cleanupTs = toLong(lookup(context, COMPACTION_CLEANUP_TIMESTAMP), + TIMESTAMP_DEFAULT); + String cleanup = lookup(context, COMPACTION_CLEANUP); + if (cleanup == null) { + cleanup = CLEANUP_DEFAULT.toString(); + } + String memoryThresholdS = lookup(context, COMPACTION_MEMORY_THRESHOLD); + byte memoryThreshold = MEMORY_THRESHOLD_DEFAULT; + if (memoryThresholdS != null) { + memoryThreshold = Byte.valueOf(memoryThresholdS); + } + CompactionStrategy compactionStrategy = new CompactionStrategy( + pauseCompaction, cloneBinaries, CleanupType.valueOf(cleanup), cleanupTs, + memoryThreshold) { + @Override + public boolean compacted(Callable setHead) throws Exception { + // Need to guard against concurrent commits to avoid + // mixed segments. See OAK-2192. 
+ return delegate.locked(setHead); + } + }; + + boolean memoryMapping = "64".equals(mode); + int cacheSize = Integer.parseInt(size); + if (customBlobStore) { + log.info("Initializing SegmentNodeStore with BlobStore [{}]", blobStore); + store = new FileStore(blobStore, new File(directory), cacheSize, + memoryMapping).setCompactionStrategy(compactionStrategy); + } else { + store = new FileStore(new File(directory), cacheSize, memoryMapping) + .setCompactionStrategy(compactionStrategy); + } + delegate = new SegmentNodeStore(store); observerTracker = new ObserverTracker(delegate); observerTracker.start(context.getBundleContext()); - Dictionary props = new Hashtable(); - props.put(Constants.SERVICE_PID, SegmentNodeStore.class.getName()); - - boolean standby = toBoolean(lookup(context, STANDBY), false); - providerRegistration = context.getBundleContext().registerService(SegmentStoreProvider.class.getName(), this, props); - if (!standby) { - storeRegistration = context.getBundleContext().registerService(NodeStore.class.getName(), this, props); - } - OsgiWhiteboard whiteboard = new OsgiWhiteboard(context.getBundleContext()); executor = new WhiteboardExecutor(); executor.start(whiteboard); @@ -188,13 +248,13 @@ revisionGCRegistration = registerMBean(whiteboard, RevisionGCMBean.class, revisionGC, RevisionGCMBean.TYPE, "Segment node store revision garbage collection"); - if (blobStore instanceof GarbageCollectableBlobStore) { + if (store.getBlobStore() instanceof GarbageCollectableBlobStore) { BlobGarbageCollector gc = new BlobGarbageCollector() { @Override public void collectGarbage() throws Exception { MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector( new SegmentBlobReferenceRetriever(store.getTracker()), - (GarbageCollectableBlobStore) blobStore, + (GarbageCollectableBlobStore) store.getBlobStore(), executor); gc.collectGarbage(); } @@ -204,7 +264,14 @@ BlobGCMBean.TYPE, "Segment node store blob garbage collection"); } + compactionStrategyRegistration = registerMBean(whiteboard, + CompactionStrategyMBean.class, + new DefaultCompactionStrategyMBean(compactionStrategy), + CompactionStrategyMBean.TYPE, + "Segment node store compaction strategy settings"); + log.info("SegmentNodeStore initialized"); + return true; } private static String lookup(ComponentContext context, String property) { @@ -229,7 +296,6 @@ } protected void bindBlobStore(BlobStore blobStore) throws IOException { - log.info("Initializing SegmentNodeStore with BlobStore [{}]", blobStore); this.blobStore = blobStore; registerNodeStore(); } @@ -255,6 +321,10 @@ if (blobGCRegistration != null) { blobGCRegistration.unregister(); blobGCRegistration = null; + } + if (compactionStrategyRegistration != null) { + compactionStrategyRegistration.unregister(); + compactionStrategyRegistration = null; } if (executor != null) { executor.stop(); Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategy.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategy.java (revision ) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategy.java (revision ) @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.segment.compaction; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static java.lang.System.currentTimeMillis; + +import java.util.concurrent.Callable; + +import javax.annotation.CheckForNull; +import javax.annotation.Nonnull; + +import org.apache.jackrabbit.oak.plugins.segment.CompactionMap; +import org.apache.jackrabbit.oak.plugins.segment.SegmentId; + +public class CompactionStrategy { + + public enum CleanupType { + + /** + * {@code CLEAN_ALL} must be used in conjunction with {@code cloneBinaries} + * otherwise segments can go away ({@code SegmentNotFoundException}) + *

+ * Pros: best compaction results + * <p>
+ * Cons: larger repo size during compaction (2x). High chances that a currently + * running diff (e.g. observation) fails with {@code SegmentNotFoundException}. + */ + CLEAN_ALL, + + CLEAN_NONE, + + /** + * {@code CLEAN_OLD} with {@code cloneBinaries} + * <p>
+ * Pros: better compaction results + * <p>
+ * Cons: larger repo size {@code during} compaction (2x). {@code SegmentNotFoundException} + * with insufficiently large values for {@code olderThan}. + * <p>
+ * {@code CLEAN_OLD} without {@code cloneBinaries} + * <p>
+ * Pros: weakest compaction results, smaller size during compaction (1x + size of + * data-segments). + * <p>
+ * Cons: {@code SegmentNotFoundException} with insufficiently large values for + * {@code olderThan}. + */ + CLEAN_OLD + } + + public static final boolean PAUSE_DEFAULT = true; + + public static final boolean CLONE_BINARIES_DEFAULT = false; + + public static final CleanupType CLEANUP_DEFAULT = CleanupType.CLEAN_OLD; + + public static final long TIMESTAMP_DEFAULT = 1000 * 60 * 5; + + public static final byte MEMORY_THRESHOLD_DEFAULT = 5; + + private boolean paused; + + private boolean cloneBinaries; + + @Nonnull + private CleanupType cleanupType; + + /** + * anything that has a lifetime bigger than this will be removed. a value of + * 0 (or very small) acts like a CLEANUP.NONE, a value of -1 (or negative) + * acts like a CLEANUP.ALL + * + */ + private long olderThan; + + private byte memoryThreshold = MEMORY_THRESHOLD_DEFAULT; + + private CompactionMap compactionMap; + + private long compactionStart = currentTimeMillis(); + + public CompactionStrategy(boolean paused, + boolean cloneBinaries, @Nonnull CleanupType cleanupType, long olderThan, byte memoryThreshold) { + checkArgument(olderThan >= 0); + this.paused = paused; + this.cloneBinaries = cloneBinaries; + this.cleanupType = checkNotNull(cleanupType); + this.olderThan = olderThan; + this.memoryThreshold = memoryThreshold; + } + + public boolean canRemove(SegmentId id) { + switch (cleanupType) { + case CLEAN_ALL: + return true; + case CLEAN_NONE: + return false; + case CLEAN_OLD: + return compactionStart - id.getCreationTime() > olderThan; + } + return false; + } + + public boolean cloneBinaries() { + return cloneBinaries; + } + + public boolean isPaused() { + return paused; + } + + public void setPaused(boolean paused) { + this.paused = paused; + } + + public void setCloneBinaries(boolean cloneBinaries) { + this.cloneBinaries = cloneBinaries; + } + + public void setCleanupType(@Nonnull CleanupType cleanupType) { + this.cleanupType = checkNotNull(cleanupType); + } + + public void setOlderThan(long olderThan) { + checkArgument(olderThan >= 0); + this.olderThan = olderThan; + } + + public void setCompactionMap(@Nonnull CompactionMap compactionMap) { + this.compactionMap = checkNotNull(compactionMap); + } + + String getCleanupType() { + return cleanupType.toString(); + } + + long getOlderThan() { + return olderThan; + } + + @CheckForNull + public CompactionMap getCompactionMap() { + return this.compactionMap; + } + + @Override + public String toString() { + return "DefaultCompactionStrategy [pauseCompaction=" + paused + + ", cloneBinaries=" + cloneBinaries + ", cleanup=" + cleanupType + + ", olderThan=" + olderThan + ']'; + } + + public void setCompactionStart(long ms) { + this.compactionStart = ms; + } + + public byte getMemoryThreshold() { + return memoryThreshold; + } + + public void setMemoryThreshold(byte memoryThreshold) { + this.memoryThreshold = memoryThreshold; + } + + public boolean compacted(@Nonnull Callable setHead) throws Exception { + return checkNotNull(setHead).call(); + } +} Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionEstimatorTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionEstimatorTest.java (revision ) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionEstimatorTest.java (revision ) @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.plugins.segment.file; + +import static org.apache.commons.io.FileUtils.deleteDirectory; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.util.Random; + +import org.apache.jackrabbit.oak.api.Blob; +import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.EmptyHook; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeStore; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class CompactionEstimatorTest { + + private File directory; + + @Before + public void setUp() throws IOException { + directory = File.createTempFile( + "FileStoreTest", "dir", new File("target")); + directory.delete(); + directory.mkdir(); + } + + @After + public void cleanDir() throws IOException { + deleteDirectory(directory); + } + + @Test + public void testGainEstimator() throws Exception { + final int MB = 1024 * 1024; + final int blobSize = 2 * MB; + + FileStore fileStore = new FileStore(directory, 2, false); + SegmentNodeStore nodeStore = new SegmentNodeStore(fileStore); + + // 1. Create some blob properties + NodeBuilder builder = nodeStore.getRoot().builder(); + + NodeBuilder c1 = builder.child("c1"); + c1.setProperty("a", createBlob(nodeStore, blobSize)); + c1.setProperty("b", "foo"); + + NodeBuilder c2 = builder.child("c2"); + c2.setProperty("a", createBlob(nodeStore, blobSize)); + c2.setProperty("b", "foo"); + + NodeBuilder c3 = builder.child("c3"); + c3.setProperty("a", createBlob(nodeStore, blobSize)); + c3.setProperty("b", "foo"); + nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + + // 2. 
Now remove the property + builder = nodeStore.getRoot().builder(); + builder.child("c1").remove(); + builder.child("c2").remove(); + nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + + fileStore.flush(); + try { + // should be at 66% + assertTrue(fileStore.estimateCompactionGain() + .estimateCompactionGain() > 60); + } finally { + fileStore.close(); + } + } + + private static Blob createBlob(NodeStore nodeStore, int size) throws IOException { + byte[] data = new byte[size]; + new Random().nextBytes(data); + return nodeStore.createBlob(new ByteArrayInputStream(data)); + } + +} Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java (date 1418330678000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java (revision ) @@ -52,11 +52,6 @@ private static final Logger log = LoggerFactory.getLogger(Compactor.class); /** - * over 64K in size, not will be included in the compaction map - */ - private static final long threshold = 65536; - - /** * Locks down the RecordId persistence structure */ static long[] recordAsKey(RecordId r) { @@ -66,7 +61,7 @@ private final SegmentWriter writer; - private CompactionMap map = new CompactionMap(100000); + private final CompactionMap map; /** * Map from {@link #getBlobKey(Blob) blob keys} to matching compacted @@ -87,16 +82,23 @@ public Compactor(SegmentWriter writer, boolean cloneBinaries) { this.writer = writer; + this.map = new CompactionMap(100000, writer.getTracker()); this.cloneBinaries = cloneBinaries; } - public SegmentNodeState compact(NodeState before, NodeState after) { + protected SegmentNodeBuilder process(NodeState before, NodeState after) { SegmentNodeBuilder builder = new SegmentNodeBuilder( writer.writeNode(before), writer); after.compareAgainstBaseState(before, new CompactDiff(builder)); - return builder.getNodeState(); + return builder; } + public SegmentNodeState compact(NodeState before, NodeState after) { + SegmentNodeState compacted = process(before, after).getNodeState(); + writer.flush(); + return compacted; + } + public CompactionMap getCompactionMap() { map.compress(); return map; @@ -113,7 +115,6 @@ return super.propertyAdded(compact(after)); } - @Override public boolean propertyChanged( PropertyState before, PropertyState after) { @@ -139,7 +140,7 @@ if (success) { SegmentNodeState state = writer.writeNode(child.getNodeState()); builder.setChildNode(name, state); - if (id != null && includeInMap(state)) { + if (id != null) { map.put(id, state.getRecordId()); } } @@ -147,23 +148,6 @@ return success; } - private boolean includeInMap(SegmentNodeState state) { - if (state.getChildNodeCount(2) > 1) { - return true; - } - long count = 0; - for (PropertyState ps : state.getProperties()) { - for (int i = 0; i < ps.count(); i++) { - long size = ps.size(i); - count += size; - if (size >= threshold || count >= threshold) { - return true; - } - } - } - return false; - } - @Override public boolean childNodeChanged( String name, NodeState before, NodeState after) { @@ -178,14 +162,16 @@ } NodeBuilder child = builder.getChildNode(name); - boolean success = after.compareAgainstBaseState( - before, new CompactDiff(child)); + boolean success = after.compareAgainstBaseState(before, + new CompactDiff(child)); - if (success && id != null && 
child.getChildNodeCount(2) > 1) { - RecordId compactedId = - writer.writeNode(child.getNodeState()).getRecordId(); + if (success) { + RecordId compactedId = writer.writeNode(child.getNodeState()) + .getRecordId(); + if (id != null) { - map.put(id, compactedId); - } + map.put(id, compactedId); + } + } return success; } @@ -221,16 +207,18 @@ SegmentBlob sb = (SegmentBlob) blob; try { - // if the blob is inlined or external, just clone it - if (sb.isExternal() || sb.length() < Segment.MEDIUM_LIMIT) { - return sb.clone(writer, cloneBinaries); - } - // else check if we've already cloned this specific record RecordId id = sb.getRecordId(); RecordId compactedId = map.get(id); if (compactedId != null) { return new SegmentBlob(compactedId); + } + + // if the blob is inlined or external, just clone it + if (sb.isExternal() || sb.length() < Segment.MEDIUM_LIMIT) { + SegmentBlob clone = sb.clone(writer, cloneBinaries); + map.put(id, clone.getRecordId()); + return clone; } // alternatively look if the exact same binary has been cloned Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java (date 1418330678000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java (revision ) @@ -45,6 +45,7 @@ import java.util.zip.CRC32; import org.apache.commons.io.FileUtils; +import org.apache.jackrabbit.oak.plugins.segment.CompactionMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -146,7 +147,7 @@ if (reader != null) { return reader; } else { - throw new IOException("Failed to open recoved tar file " + file); + throw new IOException("Failed to open recovered tar file " + file); } } @@ -584,12 +585,15 @@ return -1; } - synchronized TarReader cleanup(Set referencedIds) throws IOException { + synchronized TarReader cleanup(Set referencedIds, CompactionMap cm) throws IOException { + if (referencedIds.isEmpty()) { + return null; + } + Map> graph = null; if (this.graph != null) { graph = parseGraph(); } - TarEntry[] sorted = new TarEntry[index.remaining() / 24]; int position = index.position(); for (int i = 0; position < index.limit(); i++) { @@ -611,17 +615,28 @@ // this segment is not referenced anywhere sorted[i] = null; } else { + if (isDataSegmentId(entry.lsb())) { - size += getEntrySize(entry.size()); - count += 1; + size += getEntrySize(entry.size()); + count += 1; - if (isDataSegmentId(entry.lsb())) { // this is a referenced data segment, so follow the graph if (graph != null) { List refids = graph.get( new UUID(entry.msb(), entry.lsb())); if (refids != null) { - referencedIds.addAll(refids); + for (UUID r : refids) { + if (isDataSegmentId(r.getLeastSignificantBits())) { + referencedIds.add(r); + } else { + if (cm != null && cm.wasCompacted(id)) { + // skip bulk compacted segment + // references + } else { + referencedIds.add(r); - } + } + } + } + } } else { // a pre-compiled graph is not available, so read the // references directly from this segment @@ -632,10 +647,26 @@ int refcount = segment.get(pos + REF_COUNT_OFFSET) & 0xff; int refend = pos + 16 * (refcount + 1); for (int refpos = pos + 16; refpos < refend; refpos += 16) { - referencedIds.add(new UUID( - segment.getLong(refpos), - segment.getLong(refpos + 8))); + UUID r = new UUID(segment.getLong(refpos), + segment.getLong(refpos + 
8)); + if (isDataSegmentId(r.getLeastSignificantBits())) { + referencedIds.add(r); + } else { + if (cm != null && cm.wasCompacted(id)) { + // skip bulk compacted segment references + } else { + referencedIds.add(r); - } + } + } + } + } + } else { + // bulk segments compaction check + if (cm != null && cm.wasCompacted(id)) { + sorted[i] = null; + } else { + size += getEntrySize(entry.size()); + count += 1; } } } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java (date 1418330678000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java (revision ) @@ -61,7 +61,7 @@ * The number of bytes (or bits of address space) to use for the * alignment boundary of segment records. */ - static final int RECORD_ALIGN_BITS = 2; // align at the four-byte boundary + public static final int RECORD_ALIGN_BITS = 2; // align at the four-byte boundary /** * Maximum segment size. Record identifiers are stored as three-byte @@ -185,8 +185,10 @@ if (!id.isDataSegmentId()) { type = "bulk"; } + long delta = System.currentTimeMillis() - id.getCreationTime(); throw new IllegalStateException("RefId '" + index - + "' doesn't exist in " + type + " segment " + id); + + "' doesn't exist in " + type + " segment " + id + + ". Creation date delta is " + delta + " ms."); } SegmentId refid = refids[index]; if (refid == null) { Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentId.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentId.java (date 1418330678000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentId.java (revision ) @@ -44,17 +44,21 @@ private final long lsb; + private final long creationTime; + private volatile Segment segment; - public SegmentId(SegmentTracker tracker, long msb, long lsb, Segment segment) { + private SegmentId(SegmentTracker tracker, long msb, long lsb, + Segment segment, long creationTime) { this.tracker = tracker; this.msb = msb; this.lsb = lsb; this.segment = segment; + this.creationTime = creationTime; } public SegmentId(SegmentTracker tracker, long msb, long lsb) { - this(tracker, msb, lsb, null); + this(tracker, msb, lsb, null, System.currentTimeMillis()); } /** @@ -72,7 +76,7 @@ * @return {@code true} for a bulk segment, {@code false} otherwise */ public boolean isBulkSegmentId() { - return (lsb >>> 60) == 0xBL; + return (lsb >>> 60) == 0xBL; } public boolean equals(long msb, long lsb) { @@ -110,18 +114,22 @@ return tracker; } + public long getCreationTime() { + return creationTime; + } + - //--------------------------------------------------------< Comparable >-- + // --------------------------------------------------------< Comparable >-- @Override public int compareTo(SegmentId that) { int d = Long.valueOf(this.msb).compareTo(Long.valueOf(that.msb)); if (d == 0) { - d = Long.valueOf(this.lsb).compareTo(Long.valueOf(that.lsb)); + d = Long.valueOf(this.lsb).compareTo(Long.valueOf(that.lsb)); } return d; } - //------------------------------------------------------------< Object >-- + // ------------------------------------------------------------< Object >-- 
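Aside (not part of the patch): the creation time recorded on each SegmentId above is what CompactionStrategy.canRemove compares against when the cleanup type is CLEAN_OLD. A minimal, self-contained sketch of that age test, using a hypothetical helper class and the patch's 5-minute TIMESTAMP_DEFAULT:

import java.util.concurrent.TimeUnit;

// Hypothetical illustration of the CLEAN_OLD age check: a segment is eligible
// for cleanup only if it was created more than 'olderThan' milliseconds before
// compaction started.
public class CleanOldCheck {

    static boolean canRemove(long compactionStartMs, long creationMs, long olderThanMs) {
        return compactionStartMs - creationMs > olderThanMs;
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        long olderThan = TimeUnit.MINUTES.toMillis(5); // TIMESTAMP_DEFAULT in the patch
        // created 10 minutes before compaction started -> may be removed
        System.out.println(canRemove(start, start - TimeUnit.MINUTES.toMillis(10), olderThan)); // true
        // created 1 minute before compaction started -> kept
        System.out.println(canRemove(start, start - TimeUnit.MINUTES.toMillis(1), olderThan));  // false
    }
}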
@Override public String toString() { @@ -130,7 +138,13 @@ @Override public boolean equals(Object object) { - return this == object; + if (this == object) { + return true; + } else if (object instanceof SegmentId) { + SegmentId that = (SegmentId) object; + return msb == that.msb && lsb == that.lsb; + } + return false; } @Override
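Aside (not part of the patch): taken together, these changes make the compaction settings tunable at runtime through the CompactionStrategyMBean registered in SegmentNodeStoreService. A sketch of adjusting them over plain JMX; the ObjectName pattern below is an assumption for illustration, while the attribute names simply follow from the MBean's getters and setters:

import java.lang.management.ManagementFactory;

import javax.management.Attribute;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Hypothetical JMX client: locate the compaction strategy bean and tune it.
public class CompactionStrategyJmxSketch {

    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Assumed name pattern; the actual ObjectName depends on how the
        // whiteboard registers the bean (its TYPE constant is "CompactionStrategy").
        ObjectName pattern = new ObjectName("org.apache.jackrabbit.oak:type=CompactionStrategy,*");
        for (ObjectName name : server.queryNames(pattern, null)) {
            server.setAttribute(name, new Attribute("PausedCompaction", Boolean.FALSE));
            server.setAttribute(name, new Attribute("CloneBinaries", Boolean.TRUE));
            System.out.println(server.getAttribute(name, "CompactionMapStats"));
        }
    }
}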