diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/backup/FileStoreBackup.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/backup/FileStoreBackup.java
index 8b16bba..9b6369d 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/backup/FileStoreBackup.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/backup/FileStoreBackup.java
@@ -69,11 +69,21 @@ public class FileStoreBackup {
         try {
             int gen = 0;
             gen = current.getRecordId().getSegment().getGcGeneration();
-            SegmentBufferWriter bufferWriter = new SegmentBufferWriter(backup,
-                    backup.getTracker(), backup.getReader(), "b", gen);
-            SegmentWriter writer = new SegmentWriter(backup,
-                    backup.getReader(), backup.getBlobStore(),
-                    new WriterCacheManager.Default(), bufferWriter);
+            SegmentBufferWriter bufferWriter = new SegmentBufferWriter(
+                    backup,
+                    backup.getTracker(),
+                    backup.getReader(),
+                    "b",
+                    gen
+            );
+            SegmentWriter writer = new SegmentWriter(
+                    backup,
+                    backup.getReader(),
+                    backup.getBlobStore(),
+                    new WriterCacheManager.Default(),
+                    bufferWriter,
+                    backup.getBinaryReferenceConsumer()
+            );
             Compactor compactor = new Compactor(backup.getReader(), writer,
                     backup.getBlobStore(), Suppliers.ofInstance(false),
                     gcOptions);
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/backup/FileStoreRestore.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/backup/FileStoreRestore.java
index f104fdc..e6cac74 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/backup/FileStoreRestore.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/backup/FileStoreRestore.java
@@ -61,11 +61,21 @@ public class FileStoreRestore {
         try {
             SegmentNodeState head = restore.getHead();
             int gen = head.getRecordId().getSegment().getGcGeneration();
-            SegmentBufferWriter bufferWriter = new SegmentBufferWriter(store,
-                    store.getTracker(), store.getReader(), "r", gen);
-            SegmentWriter writer = new SegmentWriter(store, store.getReader(),
-                    store.getBlobStore(), new WriterCacheManager.Default(),
-                    bufferWriter);
+            SegmentBufferWriter bufferWriter = new SegmentBufferWriter(
+                    store,
+                    store.getTracker(),
+                    store.getReader(),
+                    "r",
+                    gen
+            );
+            SegmentWriter writer = new SegmentWriter(
+                    store,
+                    store.getReader(),
+                    store.getBlobStore(),
+                    new WriterCacheManager.Default(),
+                    bufferWriter,
+                    store.getBinaryReferenceConsumer()
+            );
             SegmentGCOptions gcOptions = defaultGCOptions().setOffline();
             Compactor compactor = new Compactor(store.getReader(), writer,
                     store.getBlobStore(), Suppliers.ofInstance(false),
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/BinaryReferenceConsumer.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/BinaryReferenceConsumer.java
new file mode 100644
index 0000000..bd18c14
--- /dev/null
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/BinaryReferenceConsumer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment;
+
+/**
+ * A consumer for references to external binaries. An implementor of this
+ * interface is called every time an external binary reference is written to
+ * the store.
+ */
+public interface BinaryReferenceConsumer {
+
+    /**
+     * Consume the reference to an external binary.
+     *
+     * @param generation      The generation of the record referencing the
+     *                        binary.
+     * @param binaryReference The opaque string representation of the binary
+     *                        reference.
+     */
+    void consume(int generation, String binaryReference);
+
+}
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/BinaryReferenceConsumers.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/BinaryReferenceConsumers.java
new file mode 100644
index 0000000..460e0d4
--- /dev/null
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/BinaryReferenceConsumers.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment;
+
+/**
+ * Utility methods to work with {@link BinaryReferenceConsumer} instances.
+ */
+public class BinaryReferenceConsumers {
+
+    /**
+     * Creates a new instance of {@link BinaryReferenceConsumer} that ignores
+     * every binary reference it consumes.
+     *
+     * @return A new instance of {@link BinaryReferenceConsumer}.
+     */
+    public static BinaryReferenceConsumer newDiscardBinaryReferenceConsumer() {
+        return new BinaryReferenceConsumer() {
+
+            @Override
+            public void consume(int generation, String binaryReference) {
+                // Discard the binary reference
+            }
+
+        };
+    }
+
+}
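Annotation, not part of the patch: the new interface is a single-method callback, so a store can plug in arbitrary behaviour. A minimal sketch of a custom implementation (the class name is invented for illustration) that groups incoming references by the generation of the record that wrote them, much like the FileStore wiring further down feeds them to the TarWriter:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical example only: collect binary references in memory, grouped by generation.
class CollectingBinaryReferenceConsumer implements BinaryReferenceConsumer {

    private final Map<Integer, Set<String>> referencesByGeneration = new HashMap<Integer, Set<String>>();

    @Override
    public synchronized void consume(int generation, String binaryReference) {
        Set<String> references = referencesByGeneration.get(generation);
        if (references == null) {
            references = new HashSet<String>();
            referencesByGeneration.put(generation, references);
        }
        references.add(binaryReference);
    }

    synchronized Map<Integer, Set<String>> getReferences() {
        return referencesByGeneration;
    }

}

The callback can presumably be invoked from several writer threads (the FileStore implementation below takes a write lock), hence the defensive synchronization; newDiscardBinaryReferenceConsumer() above is the trivial case of the same contract.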
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordWriters.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordWriters.java
index d21db1a..6a06100 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordWriters.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordWriters.java
@@ -399,7 +399,6 @@ final class RecordWriters {
             // This allows the code to take apart small from a large blob IDs.
             writer.writeByte((byte) 0xF0);
             writer.writeRecordId(stringRecord);
-            writer.addBlobRef(id);
             return id;
         }
     }
@@ -426,7 +425,6 @@ final class RecordWriters {
             int length = blobId.length;
             writer.writeShort((short) (length | 0xE000));
             writer.writeBytes(blobId, 0, length);
-            writer.addBlobRef(id);
             return id;
         }
     }
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Segment.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Segment.java
index 4658901..5016a62 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Segment.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Segment.java
@@ -26,7 +26,6 @@ import static com.google.common.collect.Lists.newArrayListWithCapacity;
 import static com.google.common.collect.Maps.newConcurrentMap;
 import static java.lang.Boolean.getBoolean;
 import static org.apache.jackrabbit.oak.commons.IOUtils.closeQuietly;
-import static org.apache.jackrabbit.oak.segment.SegmentBlob.readBlobId;
 import static org.apache.jackrabbit.oak.segment.SegmentId.isDataSegmentId;
 import static org.apache.jackrabbit.oak.segment.SegmentVersion.LATEST_VERSION;
 import static org.apache.jackrabbit.oak.segment.SegmentVersion.isValid;
@@ -49,7 +48,6 @@ import org.apache.commons.io.HexDump;
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
-import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector;
 import org.apache.jackrabbit.oak.plugins.memory.PropertyStates;
 
 /**
@@ -120,8 +118,6 @@ public class Segment {
 
     static final int ROOT_COUNT_OFFSET = 6;
 
-    static final int BLOBREF_COUNT_OFFSET = 8;
-
     public static final int GC_GENERATION_OFFSET = 10;
 
     @Nonnull
@@ -391,20 +387,6 @@ public class Segment {
         return data.remaining();
     }
 
-    public void collectBlobReferences(ReferenceCollector collector) {
-        int refcount = getRefCount();
-        int rootcount =
-                data.getShort(data.position() + ROOT_COUNT_OFFSET) & 0xffff;
-        int blobrefcount =
-                data.getShort(data.position() + BLOBREF_COUNT_OFFSET) & 0xffff;
-        int blobrefpos = data.position() + refcount * 16 + rootcount * 3;
-
-        for (int i = 0; i < blobrefcount; i++) {
-            int offset = (data.getShort(blobrefpos + i * 2) & 0xffff) << RECORD_ALIGN_BITS;
-            collector.addReference(readBlobId(this, offset), null);
-        }
-    }
-
     byte readByte(int offset) {
         return data.get(pos(offset, 1));
     }
@@ -593,14 +575,6 @@ public class Segment {
                         RecordType.values()[data.get(pos + rootid * 3) & 0xff],
                         data.getShort(pos + rootid * 3 + 1) & 0xffff);
             }
-            int blobrefcount = data.getShort(BLOBREF_COUNT_OFFSET) & 0xffff;
-            pos += rootcount * 3;
-            for (int blobrefid = 0; blobrefid < blobrefcount; blobrefid++) {
-                int offset = data.getShort(pos + blobrefid * 2) & 0xffff;
-                writer.format(
-                        "blobref %d: %s at %04x%n", blobrefid,
-                        readBlobId(this, offset << RECORD_ALIGN_BITS), offset);
-            }
         }
         writer.println("--------------------------------------------------------------------------");
         int pos = data.limit() - ((length + 15) & ~15);
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriter.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriter.java
index ab9ef7b..c39cae3 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriter.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriter.java
@@ -23,7 +23,6 @@ import static com.google.common.base.Charsets.UTF_8;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.collect.Lists.newArrayList;
 import static com.google.common.collect.Maps.newLinkedHashMap;
 import static com.google.common.collect.Sets.newHashSet;
 import static java.lang.System.arraycopy;
@@ -41,7 +40,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -78,11 +76,6 @@ public class SegmentBufferWriter implements WriteOperationHandler {
      */
    private final Map<RecordId, RecordType> roots = newLinkedHashMap();
 
-    /**
-     * Identifiers of the external blob references stored in this segment.
-     */
-    private final List<RecordId> blobrefs = newArrayList();
-
     @Nonnull
     private final SegmentStore store;
 
@@ -173,7 +166,6 @@ public class SegmentBufferWriter implements WriteOperationHandler {
         length = 0;
         position = buffer.length;
         roots.clear();
-        blobrefs.clear();
 
         String metaInfo =
                 "{\"wid\":\"" + wid + '"' +
@@ -294,10 +286,6 @@ public class SegmentBufferWriter implements WriteOperationHandler {
         position += length;
     }
 
-    public void addBlobRef(RecordId blobId) {
-        blobrefs.add(blobId);
-    }
-
     /**
      * Adds a segment header to the buffer and writes a segment to the segment
     * store. This is done automatically (called from prepare) when there is not
@@ -312,13 +300,7 @@ public class SegmentBufferWriter implements WriteOperationHandler {
         buffer[Segment.ROOT_COUNT_OFFSET] = (byte) (rootcount >> 8);
         buffer[Segment.ROOT_COUNT_OFFSET + 1] = (byte) rootcount;
 
-        int blobrefcount = blobrefs.size();
-        buffer[Segment.BLOBREF_COUNT_OFFSET] = (byte) (blobrefcount >> 8);
-        buffer[Segment.BLOBREF_COUNT_OFFSET + 1] = (byte) blobrefcount;
-
-        length = align(
-                refcount * 16 + rootcount * 3 + blobrefcount * 2 + length,
-                16);
+        length = align(refcount * 16 + rootcount * 3 + length, 16);
 
         checkState(length <= buffer.length);
 
@@ -345,12 +327,6 @@ public class SegmentBufferWriter implements WriteOperationHandler {
             buffer[pos++] = (byte) (offset >> Segment.RECORD_ALIGN_BITS);
         }
 
-        for (RecordId blobref : blobrefs) {
-            int offset = blobref.getOffset();
-            buffer[pos++] = (byte) (offset >> (8 + Segment.RECORD_ALIGN_BITS));
-            buffer[pos++] = (byte) (offset >> Segment.RECORD_ALIGN_BITS);
-        }
-
         SegmentId segmentId = segment.getSegmentId();
         LOG.debug("Writing data segment {} ({} bytes)", segmentId, length);
         store.writeSegment(segmentId, buffer, buffer.length - length, length);
@@ -386,16 +362,14 @@
         // that *all* identifiers stored in this record point to previously
         // unreferenced segments.
         int refCount = segment.getRefCount() + idCount;
-        int blobRefCount = blobrefs.size() + 1;
         int rootCount = roots.size() + 1;
-        int headerSize = refCount * 16 + rootCount * 3 + blobRefCount * 2;
+        int headerSize = refCount * 16 + rootCount * 3;
         int segmentSize = align(headerSize + recordSize + length, 16);
 
         // If the size estimate looks too big, recompute it with a more
         // accurate refCount value. We skip doing this when possible to
         // avoid the somewhat expensive list and set traversals.
-        if (segmentSize > buffer.length - 1
-                || refCount > Segment.SEGMENT_REFERENCE_LIMIT) {
+        if (segmentSize > buffer.length - 1 || refCount > Segment.SEGMENT_REFERENCE_LIMIT) {
             refCount -= idCount;
 
            Set<SegmentId> segmentIds = newHashSet();
@@ -423,12 +397,11 @@
             refCount += segmentIds.size();
         }
 
-            headerSize = refCount * 16 + rootCount * 3 + blobRefCount * 2;
+            headerSize = refCount * 16 + rootCount * 3;
             segmentSize = align(headerSize + recordSize + length, 16);
         }
 
         if (segmentSize > buffer.length - 1
-                || blobRefCount > 0xffff
                 || rootCount > 0xffff
                 || refCount > Segment.SEGMENT_REFERENCE_LIMIT) {
             flush();
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java
index 922b1ec..33e036d 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java
@@ -189,10 +189,22 @@ public class SegmentBufferWriterPool implements WriteOperationHandler {
         try {
             SegmentBufferWriter writer = writers.remove(key);
             if (writer == null) {
-                writer = new SegmentBufferWriter(store, tracker, reader, getWriterId(wid), gcGeneration.get());
+                writer = new SegmentBufferWriter(
+                        store,
+                        tracker,
+                        reader,
+                        getWriterId(wid),
+                        gcGeneration.get()
+                );
             } else if (writer.getGeneration() != gcGeneration.get()) {
                 disposed.add(writer);
-                writer = new SegmentBufferWriter(store, tracker, reader, getWriterId(wid), gcGeneration.get());
+                writer = new SegmentBufferWriter(
+                        store,
+                        tracker,
+                        reader,
+                        getWriterId(wid),
+                        gcGeneration.get()
+                );
             }
             borrowed.add(writer);
             return writer;
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java
index 55e892a..1da2b92 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java
@@ -99,6 +99,9 @@ public class SegmentWriter {
     @Nonnull
     private final WriteOperationHandler writeOperationHandler;
 
+    @Nonnull
+    private final BinaryReferenceConsumer binaryReferenceConsumer;
+
     /**
      * Create a new instance of a {@code SegmentWriter}. Note the thread safety properties
     * pointed out in the class comment.
@@ -113,12 +116,15 @@
                          @Nonnull SegmentReader reader,
                          @Nullable BlobStore blobStore,
                          @Nonnull WriterCacheManager cacheManager,
-                         @Nonnull WriteOperationHandler writeOperationHandler) {
+                         @Nonnull WriteOperationHandler writeOperationHandler,
+                         @Nonnull BinaryReferenceConsumer binaryReferenceConsumer
+    ) {
         this.store = checkNotNull(store);
         this.reader = checkNotNull(reader);
         this.blobStore = blobStore;
         this.cacheManager = checkNotNull(cacheManager);
         this.writeOperationHandler = checkNotNull(writeOperationHandler);
+        this.binaryReferenceConsumer = checkNotNull(binaryReferenceConsumer);
     }
 
     public void flush() throws IOException {
@@ -776,11 +782,18 @@
      */
     private RecordId writeBlobId(String blobId) throws IOException {
         byte[] data = blobId.getBytes(UTF_8);
+
+        RecordId recordId;
+
         if (data.length < Segment.BLOB_ID_SMALL_LIMIT) {
-            return RecordWriters.newBlobIdWriter(data).write(writer);
+            recordId = RecordWriters.newBlobIdWriter(data).write(writer);
         } else {
-            return RecordWriters.newBlobIdWriter(writeString(blobId)).write(writer);
+            recordId = RecordWriters.newBlobIdWriter(writeString(blobId)).write(writer);
         }
+
+        binaryReferenceConsumer.consume(writer.getGeneration(), blobId);
+
+        return recordId;
     }
 
     private RecordId writeBlock(@Nonnull byte[] bytes, int offset, int length)
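Annotation, not part of the patch: every SegmentWriter caller now has to supply a consumer. Call sites that do not care about binary references (for example the in-memory and HTTP stores further down) can hand in the discarding consumer; a sketch of such a call, reusing only names that appear in the surrounding code:

// Sketch only: a SegmentWriter that drops all binary reference notifications.
SegmentWriter writer = new SegmentWriter(
        store,
        store.getReader(),
        store.getBlobStore(),
        new WriterCacheManager.Default(),
        bufferWriter,
        BinaryReferenceConsumers.newDiscardBinaryReferenceConsumer()
);

writeBlobId() still reports every blob ID together with the writer's generation; the notification is simply ignored.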
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriterBuilder.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriterBuilder.java
index a5eda74..14d0319 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriterBuilder.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriterBuilder.java
@@ -143,8 +143,14 @@
      */
     @Nonnull
     public SegmentWriter build(@Nonnull FileStore store) {
-        return new SegmentWriter(checkNotNull(store), store.getReader(),
-                store.getBlobStore(), cacheManager, createWriter(store, pooled));
+        return new SegmentWriter(
+                checkNotNull(store),
+                store.getReader(),
+                store.getBlobStore(),
+                cacheManager,
+                createWriter(store, pooled),
+                store.getBinaryReferenceConsumer()
+        );
     }
 
     /**
@@ -152,8 +158,14 @@
      */
     @Nonnull
     public SegmentWriter build(@Nonnull MemoryStore store) {
-        return new SegmentWriter(checkNotNull(store), store.getReader(),
-                store.getBlobStore(), cacheManager, createWriter(store, pooled));
+        return new SegmentWriter(
+                checkNotNull(store),
+                store.getReader(),
+                store.getBlobStore(),
+                cacheManager,
+                createWriter(store, pooled),
+                store.getBinaryReferenceConsumer()
+        );
     }
 
     /**
@@ -161,40 +173,76 @@
      */
     @Nonnull
     public SegmentWriter build(@Nonnull HttpStore store) {
-        return new SegmentWriter(checkNotNull(store), store.getReader(),
-                store.getBlobStore(), cacheManager, createWriter(store, pooled));
+        return new SegmentWriter(
+                checkNotNull(store),
+                store.getReader(),
+                store.getBlobStore(),
+                cacheManager,
+                createWriter(store, pooled),
+                store.getBinaryReferenceConsumer()
+        );
     }
 
     @Nonnull
     private WriteOperationHandler createWriter(@Nonnull FileStore store, boolean pooled) {
         if (pooled) {
-            return new SegmentBufferWriterPool(store,
-                    store.getTracker(), store.getReader(), name, generation);
+            return new SegmentBufferWriterPool(
+                    store,
+                    store.getTracker(),
+                    store.getReader(),
+                    name,
+                    generation
+            );
         } else {
-            return new SegmentBufferWriter(store,
-                    store.getTracker(), store.getReader(), name, generation.get());
+            return new SegmentBufferWriter(
+                    store,
+                    store.getTracker(),
+                    store.getReader(),
+                    name,
+                    generation.get()
+            );
         }
     }
 
     @Nonnull
     private WriteOperationHandler createWriter(@Nonnull MemoryStore store, boolean pooled) {
         if (pooled) {
-            return new SegmentBufferWriterPool(store,
-                    store.getTracker(), store.getReader(), name, generation);
+            return new SegmentBufferWriterPool(
+                    store,
+                    store.getTracker(),
+                    store.getReader(),
+                    name,
+                    generation
+            );
         } else {
-            return new SegmentBufferWriter(store,
-                    store.getTracker(), store.getReader(), name, generation.get());
+            return new SegmentBufferWriter(
+                    store,
+                    store.getTracker(),
+                    store.getReader(),
+                    name,
+                    generation.get()
+            );
         }
     }
 
     @Nonnull
     private WriteOperationHandler createWriter(@Nonnull HttpStore store, boolean pooled) {
         if (pooled) {
-            return new SegmentBufferWriterPool(store,
-                    store.getTracker(), store.getReader(), name, generation);
+            return new SegmentBufferWriterPool(
+                    store,
+                    store.getTracker(),
+                    store.getReader(),
+                    name,
+                    generation
+            );
         } else {
-            return new SegmentBufferWriter(store,
-                    store.getTracker(), store.getReader(), name, generation.get());
+            return new SegmentBufferWriter(
+                    store,
+                    store.getTracker(),
+                    store.getReader(),
+                    name,
+                    generation.get()
+            );
         }
     }
 
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
index 9e42114..f194078 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
@@ -76,6 +76,7 @@ import com.google.common.base.Stopwatch;
 import com.google.common.base.Supplier;
 import org.apache.jackrabbit.oak.cache.CacheStats;
 import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector;
+import org.apache.jackrabbit.oak.segment.BinaryReferenceConsumer;
 import org.apache.jackrabbit.oak.segment.CachingSegmentReader;
 import org.apache.jackrabbit.oak.segment.Compactor;
 import org.apache.jackrabbit.oak.segment.RecordId;
@@ -132,6 +133,9 @@ public class FileStore implements SegmentStore, Closeable {
     @Nonnull
     private final CachingSegmentReader segmentReader;
 
+    @Nonnull
+    private final BinaryReferenceConsumer binaryReferenceConsumer;
+
     private final File directory;
 
     private final BlobStore blobStore;
@@ -267,6 +271,22 @@ public class FileStore implements SegmentStore, Closeable {
                 return getGcGeneration();
             }
         };
+
+        binaryReferenceConsumer = new BinaryReferenceConsumer() {
+
+            @Override
+            public void consume(int generation, String binaryReference) {
+                fileStoreLock.writeLock().lock();
+
+                try {
+                    tarWriter.addBinaryReference(generation, binaryReference);
+                } finally {
+                    fileStoreLock.writeLock().unlock();
+                }
+            }
+
+        };
+
         this.segmentWriter = segmentWriterBuilder("sys")
                 .withGeneration(getGeneration)
                 .withWriterPool()
@@ -810,7 +830,7 @@ public class FileStore implements SegmentStore, Closeable {
         int minGeneration = getGcGeneration() - gcOptions.getRetainedGenerations() + 1;
 
         for (TarReader tarReader : tarReaders) {
-            tarReader.collectBlobReferences(this, collector, minGeneration);
+            tarReader.collectBlobReferences(collector, minGeneration);
         }
     }
 
@@ -893,8 +913,7 @@ public class FileStore implements SegmentStore, Closeable {
         }
 
         final int newGeneration = getGcGeneration() + 1;
-        SegmentBufferWriter bufferWriter = new SegmentBufferWriter(
-                this, tracker, segmentReader, "c", newGeneration);
+        SegmentBufferWriter bufferWriter = new SegmentBufferWriter(this, tracker, segmentReader, "c", newGeneration);
        Supplier<Boolean> cancel = newCancelCompactionCondition();
         SegmentNodeState after = compact(bufferWriter, before, cancel);
         if (after == null) {
@@ -983,7 +1002,7 @@
            Supplier<Boolean> cancel)
     throws IOException {
         if (gcOptions.isOffline()) {
-            SegmentWriter writer = new SegmentWriter(this, segmentReader, blobStore, new Default(), bufferWriter);
+            SegmentWriter writer = new SegmentWriter(this, segmentReader, blobStore, new Default(), bufferWriter, binaryReferenceConsumer);
             return new Compactor(segmentReader, writer, blobStore, cancel, gcOptions)
                     .compact(EMPTY_NODE, head, EMPTY_NODE);
         } else {
@@ -1057,6 +1076,11 @@
     }
 
     @Nonnull
+    public BinaryReferenceConsumer getBinaryReferenceConsumer() {
+        return binaryReferenceConsumer;
+    }
+
+    @Nonnull
     public TarRevisions getRevisions() {
         return revisions;
     }
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarReader.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarReader.java
index c8ff4ea..a9d2c0e 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarReader.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarReader.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.collect.Lists.newArrayList;
 import static com.google.common.collect.Lists.newArrayListWithCapacity;
 import static com.google.common.collect.Maps.newHashMap;
+import static com.google.common.collect.Maps.newHashMapWithExpectedSize;
 import static com.google.common.collect.Maps.newLinkedHashMap;
 import static com.google.common.collect.Maps.newTreeMap;
 import static com.google.common.collect.Sets.newHashSet;
@@ -32,6 +33,7 @@ import static java.util.Collections.singletonList;
 import static org.apache.jackrabbit.oak.segment.Segment.REF_COUNT_OFFSET;
 import static org.apache.jackrabbit.oak.segment.Segment.getGcGeneration;
 import static org.apache.jackrabbit.oak.segment.SegmentId.isDataSegmentId;
+import static org.apache.jackrabbit.oak.segment.file.TarWriter.BINARY_REFERENCES_MAGIC;
 import static org.apache.jackrabbit.oak.segment.file.TarWriter.GRAPH_MAGIC;
 
 import java.io.Closeable;
@@ -44,6 +46,7 @@ import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.UUID;
@@ -53,12 +56,11 @@ import java.util.zip.CRC32;
 
 import javax.annotation.Nonnull;
 
+import com.google.common.base.Charsets;
 import com.google.common.base.Predicate;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector;
 import org.apache.jackrabbit.oak.segment.SegmentGraph.SegmentGraphVisitor;
-import org.apache.jackrabbit.oak.segment.SegmentId;
-import org.apache.jackrabbit.oak.segment.SegmentStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -734,17 +736,23 @@ class TarReader implements Closeable {
     /**
      * Collect the references of those blobs that are reachable from any segment with a
     * generation at or above {@code minGeneration}.
-     * @param store
     * @param collector
     * @param minGeneration
     */
-    void collectBlobReferences(SegmentStore store, ReferenceCollector collector, int minGeneration) {
-        for (TarEntry entry : getEntries()) {
-            if (entry.generation() >= minGeneration) {
-                // FIXME OAK-4201: Add an index of binary references in a tar file
-                // Fetch the blob references from the tar index instead reading them from the segment
-                SegmentId id = store.newSegmentId(entry.msb(), entry.lsb());
-                id.getSegment().collectBlobReferences(collector);
+    void collectBlobReferences(ReferenceCollector collector, int minGeneration) {
+        Map<Integer, Set<String>> references = getBinaryReferences();
+
+        if (references == null) {
+            return;
+        }
+
+        for (Entry<Integer, Set<String>> entry : references.entrySet()) {
+            if (entry.getKey() < minGeneration) {
+                continue;
+            }
+
+            for (String reference : entry.getValue()) {
+                collector.addReference(reference, null);
             }
         }
     }
@@ -930,6 +938,106 @@
         return hasGraph;
     }
 
+    private int getIndexEntrySize() {
+        return getEntrySize(index.remaining() + 16);
+    }
+
+    private int getGraphEntrySize() {
+        ByteBuffer buffer;
+
+        try {
+            buffer = loadGraph();
+        } catch (IOException e) {
+            return 0;
+        }
+
+        if (buffer == null) {
+            return 0;
+        }
+
+        return getEntrySize(buffer.getInt(buffer.limit() - 8));
+    }
+
+    Map<Integer, Set<String>> getBinaryReferences() {
+        ByteBuffer buffer;
+
+        try {
+            buffer = loadBinaryReferences();
+        } catch (IOException e) {
+            return null;
+        }
+
+        if (buffer == null) {
+            return null;
+        }
+
+        return parseBinaryReferences(buffer);
+    }
+
+    private ByteBuffer loadBinaryReferences() throws IOException {
+        int end = access.length() - 2 * BLOCK_SIZE - getIndexEntrySize() - getGraphEntrySize();
+
+        ByteBuffer meta = access.read(end - 16, 16);
+
+        int crc32 = meta.getInt();
+        int count = meta.getInt();
+        int size = meta.getInt();
+        int magic = meta.getInt();
+
+        if (magic != BINARY_REFERENCES_MAGIC) {
+            log.warn("Invalid binary references magic number");
+            return null;
+        }
+
+        if (count < 0 || size < count * 22 + 16) {
+            log.warn("Invalid binary references size or count");
+            return null;
+        }
+
+        ByteBuffer buffer = access.read(end - size, size);
+
+        byte[] data = new byte[size - 16];
+        buffer.mark();
+        buffer.get(data);
+        buffer.reset();
+
+        CRC32 checksum = new CRC32();
+        checksum.update(data);
+
+        if ((int) (checksum.getValue()) != crc32) {
+            log.warn("Invalid binary references checksum");
+            return null;
+        }
+
+        return buffer;
+    }
+
+    private Map<Integer, Set<String>> parseBinaryReferences(ByteBuffer buffer) {
+        int nGenerations = buffer.getInt(buffer.limit() - 12);
+
+        Map<Integer, Set<String>> binaryReferences = newHashMapWithExpectedSize(nGenerations);
+
+        for (int i = 0; i < nGenerations; i++) {
+            int generation = buffer.getInt();
+            int nReferences = buffer.getInt();
+
+            Set<String> references = newHashSetWithExpectedSize(nReferences);
+
+            for (int j = 0; j < nReferences; j++) {
+                int length = buffer.getInt();
+
+                byte[] data = new byte[length];
+                buffer.get(data);
+
+                references.add(new String(data, Charsets.UTF_8));
+            }
+
+            binaryReferences.put(generation, references);
+        }
+
+        return binaryReferences;
+    }
+
     /**
      * Loads the optional pre-compiled graph entry from the given tar file.
     *
@@ -938,7 +1046,7 @@
     * @return the graph or {@code null} if one was not found
     * @throws IOException if the tar file could not be read
     */
     private ByteBuffer loadGraph() throws IOException {
         // read the graph metadata just before the tar index entry
-        int pos = access.length() - 2 * BLOCK_SIZE - getEntrySize(index.remaining() + 16);
+        int pos = access.length() - 2 * BLOCK_SIZE - getIndexEntrySize();
         ByteBuffer meta = access.read(pos - 16, 16);
         int crc32 = meta.getInt();
         int count = meta.getInt();
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarWriter.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarWriter.java
index bfe464f..c7dda88 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarWriter.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarWriter.java
@@ -22,8 +22,6 @@ import static com.google.common.base.Charsets.UTF_8;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkPositionIndexes;
 import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.collect.Lists.newArrayList;
-import static com.google.common.collect.Lists.reverse;
 import static com.google.common.collect.Maps.newHashMap;
 import static com.google.common.collect.Maps.newLinkedHashMap;
 import static com.google.common.collect.Maps.newTreeMap;
@@ -42,11 +40,13 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.UUID;
 import java.util.zip.CRC32;
 
+import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -96,6 +96,11 @@ class TarWriter implements Closeable {
 
     static final int GRAPH_MAGIC = ('\n' << 24) + ('0' << 16) + ('G' << 8) + '\n';
 
+    /**
+     * Magic sequence at the end of the binary references block.
+     */
+    static final int BINARY_REFERENCES_MAGIC = ('\n' << 24) + ('0' << 16) + ('B' << 8) + '\n';
+
     /** The tar file block size. */
     static final int BLOCK_SIZE = 512;
 
@@ -122,7 +127,7 @@ class TarWriter implements Closeable {
 
     /**
      * File handle. Initialized lazily in
-     * {@link #writeEntry(long, long, byte[], int, int)} to avoid creating
+     * {@link #writeEntry(long, long, byte[], int, int, int)} to avoid creating
     * an extra empty file when just reading from the repository.
     * Should only be accessed from synchronized code.
     */
@@ -154,6 +159,11 @@ class TarWriter implements Closeable {
      */
    private final SortedMap<UUID, List<UUID>> graph = newTreeMap();
 
+    /**
+     * Binary references contained in this TAR file, grouped by generation.
+     */
+    private final Map<Integer, Set<String>> binaryReferences = newHashMap();
+
     TarWriter(File file) {
         this(file, FileStoreMonitor.DEFAULT);
     }
@@ -272,6 +282,17 @@ class TarWriter implements Closeable {
         return currentLength;
     }
 
+    void addBinaryReference(int generation, String reference) {
+        Set<String> references = binaryReferences.get(generation);
+
+        if (references == null) {
+            references = newHashSet();
+            binaryReferences.put(generation, references);
+        }
+
+        references.add(reference);
+    }
+
     /**
      * Flushes the entries that have so far been written to the disk.
     * This method is not synchronized to allow concurrent reads
@@ -328,6 +349,7 @@ class TarWriter implements Closeable {
         long initialPosition, currentPosition;
         synchronized (file) {
             initialPosition = access.getFilePointer();
+            writeBinaryReferences();
             writeGraph();
             writeIndex();
             access.write(ZERO_BYTES);
@@ -340,6 +362,84 @@ class TarWriter implements Closeable {
         monitor.written(currentPosition - initialPosition);
     }
 
+    private void writeBinaryReferences() throws IOException {
+        int binaryReferenceSize = 0;
+
+        // The following information is stored in the footer as meta-
+        // information about the entry.
+
+        // 4 bytes to store a magic number identifying this entry as containing
+        // references to binary values.
+        binaryReferenceSize += 4;
+
+        // 4 bytes to store the CRC32 checksum of the data in this entry.
+        binaryReferenceSize += 4;
+
+        // 4 bytes to store the length of this entry, without including the
+        // optional padding.
+        binaryReferenceSize += 4;
+
+        // 4 bytes to store the number of generations in the binary
+        // references map.
+        binaryReferenceSize += 4;
+
+        // The following information is stored as part of the main content of
+        // this entry, after the optional padding.
+
+        for (Set<String> references : binaryReferences.values()) {
+            // 4 bytes per generation to store the generation number itself.
+            binaryReferenceSize += 4;
+
+            // 4 bytes per generation to store the number of binary references
+            // associated with the generation.
+            binaryReferenceSize += 4;
+
+            for (String reference : references) {
+                // 4 bytes for each reference to store the length of the reference.
+                binaryReferenceSize += 4;
+
+                // A variable amount of bytes, depending on the reference itself.
+                binaryReferenceSize += reference.getBytes(Charsets.UTF_8).length;
+            }
+        }
+
+        ByteBuffer buffer = ByteBuffer.allocate(binaryReferenceSize);
+
+        for (Entry<Integer, Set<String>> entry : binaryReferences.entrySet()) {
+            int generation = entry.getKey();
+            Set<String> references = entry.getValue();
+
+            buffer.putInt(generation);
+            buffer.putInt(references.size());
+
+            for (String reference : references) {
+                byte[] bytes = reference.getBytes(Charsets.UTF_8);
+
+                buffer.putInt(bytes.length);
+                buffer.put(bytes);
+            }
+        }
+
+        CRC32 checksum = new CRC32();
+        checksum.update(buffer.array(), 0, buffer.position());
+        buffer.putInt((int) checksum.getValue());
+        buffer.putInt(binaryReferences.size());
+        buffer.putInt(binaryReferenceSize);
+        buffer.putInt(BINARY_REFERENCES_MAGIC);
+
+        int paddingSize = getPaddingSize(binaryReferenceSize);
+
+        byte[] header = newEntryHeader(file.getName() + ".brf", binaryReferenceSize + paddingSize);
+
+        access.write(header);
+
+        if (paddingSize > 0) {
+            access.write(ZERO_BYTES, 0, paddingSize);
+        }
+
+        access.write(buffer.array());
+    }
+
     private void writeGraph() throws IOException {
        List<UUID> uuids = Lists.newArrayListWithCapacity(
                 index.size() + references.size());
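Annotation, not part of the patch: a worked example of the size bookkeeping above. For the data set used in TarFileTest further down (three generations holding 4, 3 and 2 references, each reference two bytes of UTF-8):

// fixed meta-data: magic + size + generation count + CRC32        4 * 4 = 16 bytes
// per generation:  generation number + reference count            3 * 8 = 24 bytes
// per reference:   length prefix + UTF-8 bytes               9 * (4 + 2) = 54 bytes
//
// binaryReferenceSize = 16 + 24 + 54 = 94 bytes

Note that the padding is written before the content, so the 16-byte meta-data block always ends exactly at the end of the ".brf" entry, which is where TarReader.loadBinaryReferences() reads it back from (access.read(end - 16, 16)).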
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/http/HttpStore.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/http/HttpStore.java
index bc2c02b..f890964 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/http/HttpStore.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/http/HttpStore.java
@@ -34,6 +34,8 @@ import javax.annotation.Nonnull;
 
 import com.google.common.base.Supplier;
 import com.google.common.io.ByteStreams;
+import org.apache.jackrabbit.oak.segment.BinaryReferenceConsumer;
+import org.apache.jackrabbit.oak.segment.BinaryReferenceConsumers;
 import org.apache.jackrabbit.oak.segment.CachingSegmentReader;
 import org.apache.jackrabbit.oak.segment.Revisions;
 import org.apache.jackrabbit.oak.segment.Segment;
@@ -110,6 +112,11 @@ public class HttpStore implements SegmentStore {
         return revisions;
     }
 
+    @Nonnull
+    public BinaryReferenceConsumer getBinaryReferenceConsumer() {
+        return BinaryReferenceConsumers.newDiscardBinaryReferenceConsumer();
+    }
+
     @Override
     @Nonnull
     public SegmentId newSegmentId(long msb, long lsb) {
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStore.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStore.java
index ee07ba1..b85a129 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStore.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStore.java
@@ -29,6 +29,8 @@ import javax.annotation.Nonnull;
 
 import com.google.common.base.Supplier;
 import com.google.common.collect.Maps;
+import org.apache.jackrabbit.oak.segment.BinaryReferenceConsumer;
+import org.apache.jackrabbit.oak.segment.BinaryReferenceConsumers;
 import org.apache.jackrabbit.oak.segment.CachingSegmentReader;
 import org.apache.jackrabbit.oak.segment.Revisions;
 import org.apache.jackrabbit.oak.segment.Segment;
@@ -105,6 +107,11 @@ public class MemoryStore implements SegmentStore {
         return revisions;
     }
 
+    @Nonnull
+    public BinaryReferenceConsumer getBinaryReferenceConsumer() {
+        return BinaryReferenceConsumers.newDiscardBinaryReferenceConsumer();
+    }
+
     @Override
     public boolean containsSegment(SegmentId id) {
         return id.sameStore(this) || segments.containsKey(id);
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ExternalBlobIT.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ExternalBlobIT.java
index af88829..9ab492b 100644
--- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ExternalBlobIT.java
+++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ExternalBlobIT.java
@@ -18,15 +18,12 @@
  */
 package org.apache.jackrabbit.oak.segment;
 
-import static org.apache.jackrabbit.oak.commons.FixturesHelper.Fixture.SEGMENT_MK;
-import static org.apache.jackrabbit.oak.commons.FixturesHelper.getFixtures;
 import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.defaultGCOptions;
 import static org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeTrue;
 
 import java.io.ByteArrayInputStream;
 import java.io.File;
@@ -56,7 +53,6 @@ import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.junit.After;
-import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
@@ -71,12 +67,8 @@ public class ExternalBlobIT {
     @Rule
     public TemporaryFolder folder = new TemporaryFolder(new File("target"));
 
-    @BeforeClass
-    public static void assumptions() {
-        assumeTrue(getFixtures().contains(SEGMENT_MK));
-    }
-
-    @Test @Ignore("would need a FileBlobStore for this")
+    @Test
+    @Ignore("would need a FileBlobStore for this")
     public void testFileBlob() throws Exception {
         nodeStore = getNodeStore(new TestBlobStore());
         testCreateAndRead(getFileBlob());
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordTest.java
index 572cc85..72519b2 100644
--- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordTest.java
+++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordTest.java
@@ -434,8 +434,13 @@ public class RecordTest {
     @Test
     public void testCancel() throws IOException {
         NodeBuilder builder = EMPTY_NODE.builder();
-        SegmentBufferWriter bufferWriter = new SegmentBufferWriter(store, store.getTracker(),
-                store.getReader(), "test", 0);
+        SegmentBufferWriter bufferWriter = new SegmentBufferWriter(
+                store,
+                store.getTracker(),
+                store.getReader(),
+                "test",
+                0
+        );
         NodeState state = writer.writeNode(builder.getNodeState(), bufferWriter, Suppliers.ofInstance(true));
         assertNull(state);
     }
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java
index 3d1f45a..9f10ec1 100644
--- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java
+++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java
@@ -51,7 +51,12 @@ public class SegmentBufferWriterPoolTest {
     private final RecordId rootId = store.getRevisions().getHead();
 
     private final SegmentBufferWriterPool pool = new SegmentBufferWriterPool(
-            store, store.getTracker(), store.getReader(), "", Suppliers.ofInstance(0));
+            store,
+            store.getTracker(),
+            store.getReader(),
+            "",
+            Suppliers.ofInstance(0)
+    );
 
     private final ExecutorService[] executors = new ExecutorService[] {
             newSingleThreadExecutor(), newSingleThreadExecutor(), newSingleThreadExecutor()};
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/TarFileTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/TarFileTest.java
index 565a012..c5a2354 100644
--- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/TarFileTest.java
+++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/TarFileTest.java
@@ -19,15 +19,17 @@ package org.apache.jackrabbit.oak.segment.file;
 
 import static com.google.common.base.Charsets.UTF_8;
-import static junit.framework.Assert.assertEquals;
+import static com.google.common.collect.Sets.newHashSet;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 
-import org.apache.jackrabbit.oak.segment.file.TarReader;
-import org.apache.jackrabbit.oak.segment.file.TarWriter;
 
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -60,7 +62,7 @@ public class TarFileTest {
             writer.close();
         }
 
-        assertEquals(4096, file.length());
+        assertEquals(5120, file.length());
 
         TarReader reader = TarReader.open(file, false);
         try {
@@ -77,4 +79,35 @@ public class TarFileTest {
         }
     }
 
+    @Test
+    public void testWriteAndReadBinaryReferences() throws Exception {
+        try (TarWriter writer = new TarWriter(file)) {
+            writer.writeEntry(0x00, 0x00, new byte[] {0x01, 0x02, 0x3}, 0, 3, 0);
+
+            writer.addBinaryReference(1, "r0");
+            writer.addBinaryReference(1, "r1");
+            writer.addBinaryReference(1, "r2");
+            writer.addBinaryReference(1, "r3");
+
+            writer.addBinaryReference(2, "r4");
+            writer.addBinaryReference(2, "r5");
+            writer.addBinaryReference(2, "r6");
+
+            writer.addBinaryReference(3, "r7");
+            writer.addBinaryReference(3, "r8");
+        }
+
+        try (TarReader reader = TarReader.open(file, false)) {
+            Map<Integer, Set<String>> brf = reader.getBinaryReferences();
+
+            assertNotNull(brf);
+
+            assertEquals(newHashSet(1, 2, 3), brf.keySet());
+
+            assertEquals(newHashSet("r0", "r1", "r2", "r3"), brf.get(1));
+            assertEquals(newHashSet("r4", "r5", "r6"), brf.get(2));
+            assertEquals(newHashSet("r7", "r8"), brf.get(3));
+        }
+    }
+
 }
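Annotation, not part of the patch: the expected length in the pre-existing test changes from 4096 to 5120 presumably because close() now always emits the binary references entry, even when no references were added — one extra 512-byte tar header block plus one 512-byte (padded) content block, i.e. 4096 + 1024 = 5120.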