Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java (revision 1c535ee94f1ae9f79fabd3bee61a60bbb740083e) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java (revision 1651c92aad7d1122b840fb2fd228f44935ad0c40) @@ -69,7 +69,7 @@ * * @return segment tracker */ - protected SegmentTracker getTracker() { + public SegmentTracker getTracker() { // FIXME undo return segmentId.getTracker(); } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBlob.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBlob.java (revision 1c535ee94f1ae9f79fabd3bee61a60bbb740083e) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBlob.java (revision 1651c92aad7d1122b840fb2fd228f44935ad0c40) @@ -18,7 +18,9 @@ import static com.google.common.base.Charsets.UTF_8; import static com.google.common.collect.Sets.newHashSet; +import static com.google.common.hash.Hashing.sha1; import static java.util.Collections.emptySet; +import static org.apache.jackrabbit.oak.commons.IOUtils.readFully; import static org.apache.jackrabbit.oak.plugins.segment.Segment.MEDIUM_LIMIT; import static org.apache.jackrabbit.oak.plugins.segment.Segment.SMALL_LIMIT; import static org.apache.jackrabbit.oak.plugins.segment.SegmentWriter.BLOCK_SIZE; @@ -48,7 +50,7 @@ } } - SegmentBlob(RecordId id) { + public SegmentBlob(RecordId id) { // FIXME review / undo super(id); } @@ -57,6 +59,17 @@ byte[] inline = new byte[length]; segment.readBytes(offset, inline, 0, length); return new SegmentStream(getRecordId(), inline); + } + + public String getBlobKey() throws IOException { // FIXME also use from compactor + InputStream stream = getNewStream(); + try { + byte[] buffer = new byte[BLOCK_SIZE]; + int n = readFully(stream, buffer, 0, buffer.length); + return length() + ":" + sha1().hashBytes(buffer, 0, n); + } finally { + stream.close(); + } } @Override @Nonnull Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (revision 1c535ee94f1ae9f79fabd3bee61a60bbb740083e) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (revision 1651c92aad7d1122b840fb2fd228f44935ad0c40) @@ -23,6 +23,7 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.jackrabbit.oak.api.Type.STRING; +import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE; import static org.apache.jackrabbit.oak.plugins.segment.Record.fastEquals; import java.io.Closeable; @@ -54,6 +55,7 @@ import org.apache.jackrabbit.oak.spi.commit.Observable; import org.apache.jackrabbit.oak.spi.commit.Observer; import org.apache.jackrabbit.oak.spi.state.ConflictAnnotatingRebaseDiff; +import org.apache.jackrabbit.oak.spi.state.CopyDiff; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; import org.apache.jackrabbit.oak.spi.state.NodeState; import org.apache.jackrabbit.oak.spi.state.NodeStore; @@ -231,7 +233,13 @@ if (!fastEquals(before, root)) { SegmentNodeState after = snb.getNodeState(); snb.reset(root); - after.compareAgainstBaseState( + NodeState afterCopy; + if (preDatesLastCompaction(after)) { + afterCopy = copyAfter(before, after); + } else { + afterCopy = after; + } + afterCopy.compareAgainstBaseState( before, new ConflictAnnotatingRebaseDiff(snb)); } @@ -402,6 +410,17 @@ return head.get().getChildNode(CHECKPOINTS); } + @Nonnull + private NodeState copyAfter(NodeState before, NodeState after) { + NodeBuilder builder = before.builder(); + after.compareAgainstBaseState(before, new CopyDiff(builder, store.getTracker().getWriter().writeNode(EMPTY_NODE))); + return builder.getNodeState(); + } + + private boolean preDatesLastCompaction(SegmentNodeState state) { + return true; // TODO implement preDatesLast + } + private class Commit { private final Random random = new Random(); @@ -446,7 +465,13 @@ // there were some external changes, so do the full rebase ConflictAnnotatingRebaseDiff diff = new ConflictAnnotatingRebaseDiff(builder.child(ROOT)); - after.compareAgainstBaseState(before, diff); + NodeState afterCopy; + if (preDatesLastCompaction(after)) { + afterCopy = copyAfter(before, after); + } else { + afterCopy = after; + } + afterCopy.compareAgainstBaseState(before, diff); // apply commit hooks on the rebased changes builder.setChildNode(ROOT, hook.processCommit( builder.getBaseState().getChildNode(ROOT), Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java (revision 1c535ee94f1ae9f79fabd3bee61a60bbb740083e) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java (revision 1651c92aad7d1122b840fb2fd228f44935ad0c40) @@ -86,7 +86,7 @@ private static final Logger log = LoggerFactory.getLogger(SegmentWriter.class); - static final int BLOCK_SIZE = 1 << 12; // 4kB + public static final int BLOCK_SIZE = 1 << 12; // 4kB FIXME review / undo static byte[] createNewBuffer(SegmentVersion v) { byte[] buffer = new byte[Segment.MAX_SEGMENT_SIZE]; Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/package-info.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/package-info.java (revision 1c535ee94f1ae9f79fabd3bee61a60bbb740083e) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/package-info.java (revision 1651c92aad7d1122b840fb2fd228f44935ad0c40) @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -@Version("5.0.0") +@Version("5.1.0") @Export(optional = "provide:=true") package org.apache.jackrabbit.oak.plugins.segment; Index: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/CopyDiff.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/CopyDiff.java (revision 1651c92aad7d1122b840fb2fd228f44935ad0c40) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/CopyDiff.java (revision 1651c92aad7d1122b840fb2fd228f44935ad0c40) @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.spi.state; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.collect.Lists.newArrayList; +import static com.google.common.collect.Maps.newHashMap; +import static org.apache.jackrabbit.oak.api.Type.BINARIES; +import static org.apache.jackrabbit.oak.api.Type.BINARY; +import static org.apache.jackrabbit.oak.plugins.memory.BinaryPropertyState.binaryProperty; +import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.compareAgainstEmptyState; +import static org.apache.jackrabbit.oak.plugins.memory.MultiBinaryPropertyState.binaryPropertyFromBlob; +import static org.apache.jackrabbit.oak.plugins.memory.PropertyStates.createProperty; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.jackrabbit.oak.api.Blob; +import org.apache.jackrabbit.oak.api.PropertyState; +import org.apache.jackrabbit.oak.api.Type; +import org.apache.jackrabbit.oak.plugins.segment.RecordId; +import org.apache.jackrabbit.oak.plugins.segment.Segment; +import org.apache.jackrabbit.oak.plugins.segment.SegmentBlob; +import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState; +import org.apache.jackrabbit.oak.plugins.segment.SegmentWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CopyDiff implements NodeStateDiff { + private static final Logger LOG = LoggerFactory.getLogger(CopyDiff.class); + + private final NodeBuilder builder; + private final SegmentWriter writer; + private final NodeState empty; + + public CopyDiff(NodeBuilder builder, NodeState empty) { + this(builder, toSegmentNodeState(empty).getTracker().getWriter(), empty); + } + + private static SegmentNodeState toSegmentNodeState(NodeState empty) { + // FIXME this should work for any node state + checkArgument(empty instanceof SegmentNodeState); + return (SegmentNodeState) empty; + } + + private CopyDiff(NodeBuilder builder, SegmentWriter writer, NodeState empty) { + this.builder = builder; + this.writer = writer; + this.empty = empty; + } + + // FIXME move to common location and share with dup in CompactorDiff + private PropertyState copy(PropertyState property) { + String name = property.getName(); + Type type = property.getType(); + if (type == BINARY) { + Blob blob = copy(property.getValue(BINARY)); + return binaryProperty(name, blob); + } else if (type == BINARIES) { + List blobs = new ArrayList(); + for (Blob blob : property.getValue(BINARIES)) { + blobs.add(copy(blob)); + } + return binaryPropertyFromBlob(name, blobs); + } else { + return createProperty(name, property.getValue(type), type); + } + } + + private final Map> binaries = newHashMap(); + + // FIXME move to common location and share with dup in CompactorDiff + private Blob copy(Blob blob) { + if (blob instanceof SegmentBlob) { + SegmentBlob sb = (SegmentBlob) blob; + try { + RecordId id = sb.getRecordId(); + + // if the blob is inlined or external, just clone it + if (sb.isExternal() || sb.length() < Segment.MEDIUM_LIMIT) { + return sb.clone(writer, false); + } + + // alternatively look if the exact same binary has been cloned + String key = ((SegmentBlob) blob).getBlobKey(); + List ids = binaries.get(key); + if (ids != null) { + for (RecordId duplicateId : ids) { + SegmentBlob dupBlob = new SegmentBlob(duplicateId); + if (dupBlob.equals(sb)) { + return dupBlob; + } + } + } + + // if not, clone the blob and keep track of the result + sb = sb.clone(writer, false); + if (ids == null) { + ids = newArrayList(); + binaries.put(key, ids); + } + ids.add(sb.getRecordId()); + + return sb; + } catch (IOException e) { + LOG.warn("Failed to copy a blob", e); + // fall through + } + } + + // no way to compact this blob, so we'll just keep it as-is + return blob; + } + + private NodeState copy(NodeState nodeState) { + NodeBuilder builder = empty.builder(); + compareAgainstEmptyState(nodeState, new ApplyDiff(builder)); + return builder.getNodeState(); + } + + @Override + public boolean propertyAdded(PropertyState after) { + builder.setProperty(copy(after)); + return true; + } + + @Override + public boolean propertyChanged(PropertyState before, PropertyState after) { + builder.setProperty(copy(after)); + return true; + } + + @Override + public boolean propertyDeleted(PropertyState before) { + builder.removeProperty(before.getName()); + return true; + } + + @Override + public boolean childNodeAdded(String name, NodeState after) { + builder.setChildNode(name, copy(after)); + return true; + } + + @Override + public boolean childNodeChanged(String name, NodeState before, NodeState after) { + return after.compareAgainstBaseState(before, new CopyDiff(builder.getChildNode(name), writer, empty)); + } + + @Override + public boolean childNodeDeleted(String name, NodeState before) { + builder.getChildNode(name).remove(); + return true; + } +} Index: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/package-info.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/package-info.java (revision 1c535ee94f1ae9f79fabd3bee61a60bbb740083e) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/package-info.java (revision 1651c92aad7d1122b840fb2fd228f44935ad0c40) @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -@Version("1.2.0") +@Version("1.3.0") @Export(optional = "provide:=true") package org.apache.jackrabbit.oak.spi.state; \ No newline at end of file Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupIT.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupIT.java (revision 1c535ee94f1ae9f79fabd3bee61a60bbb740083e) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupIT.java (revision 1651c92aad7d1122b840fb2fd228f44935ad0c40) @@ -67,7 +67,6 @@ import org.apache.jackrabbit.oak.spi.state.NodeStore; import org.junit.After; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -361,7 +360,6 @@ * Test asserting OAK-3348: Cross gc sessions might introduce references to pre-compacted segments */ @Test - @Ignore("OAK-3348") // FIXME OAK-3348 public void preCompactionReferences() throws IOException, CommitFailedException, InterruptedException { for (String ref : new String[] {"merge-before-compact", "merge-after-compact"}) { File repoDir = new File(directory, ref);