Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java (revision 1597026) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java (working copy) @@ -86,7 +86,7 @@ * value. And since small values are never stored as medium ones, we can * extend the size range to cover that many longer values. */ - static final int MEDIUM_LIMIT = (1 << (16 - 2)) + SMALL_LIMIT; + public static final int MEDIUM_LIMIT = (1 << (16 - 2)) + SMALL_LIMIT; public static int REF_COUNT_OFFSET = 5; Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBlob.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBlob.java (revision 1597026) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBlob.java (working copy) @@ -21,6 +21,8 @@ import static org.apache.jackrabbit.oak.plugins.segment.Segment.SMALL_LIMIT; import static org.apache.jackrabbit.oak.plugins.segment.SegmentWriter.BLOCK_SIZE; +import java.io.BufferedInputStream; +import java.io.IOException; import java.io.InputStream; import javax.annotation.CheckForNull; @@ -30,7 +32,7 @@ import org.apache.jackrabbit.oak.plugins.memory.AbstractBlob; import org.apache.jackrabbit.oak.spi.blob.BlobStore; -class SegmentBlob extends Record implements Blob { +public class SegmentBlob extends Record implements Blob { SegmentBlob(RecordId id) { super(id); @@ -138,6 +140,32 @@ } } + public SegmentBlob clone(SegmentWriter writer) throws IOException { + Segment segment = getSegment(); + int offset = getOffset(); + byte head = segment.readByte(offset); + if ((head & 0x80) == 0x00) { + // 0xxx xxxx: small value + return writer.writeStream(new BufferedInputStream(getNewStream())); + } else if ((head & 0xc0) == 0x80) { + 
// 10xx xxxx: medium value + return writer.writeStream(new BufferedInputStream(getNewStream())); + } else if ((head & 0xe0) == 0xc0) { + // 110x xxxx: long value + long length = (segment.readLong(offset) & 0x1fffffffffffffffL) + MEDIUM_LIMIT; + int listSize = (int) ((length + BLOCK_SIZE - 1) / BLOCK_SIZE); + ListRecord list = new ListRecord( + segment.readRecordId(offset + 8), listSize); + return writer.writeLargeBlob(length, list.getEntries()); + } else if ((head & 0xf0) == 0xe0) { + // 1110 xxxx: external value + return writer.writeExternalBlob(getBlobId()); + } else { + throw new IllegalStateException(String.format( + "Unexpected value record type: %02x", head & 0xff)); + } + } + //------------------------------------------------------------< Object >-- @Override Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java (revision 1597026) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java (working copy) @@ -717,6 +717,20 @@ return writeStream(blob.getNewStream()); } + SegmentBlob writeExternalBlob(String blobId) throws IOException { + RecordId id = writeValueRecord(blobId); + return new SegmentBlob(id); + } + + SegmentBlob writeLargeBlob(long length, List list) { + RecordId id = writeValueRecord(length, writeList(list)); + return new SegmentBlob(id); + } + + public void dropCache(){ + records.clear(); + } + /** * Writes a stream value record. The given stream is consumed * and closed by this method. 
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (revision 1597026) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (working copy) @@ -35,6 +35,7 @@ import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileLock; +import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.LinkedList; @@ -49,14 +50,22 @@ import java.util.regex.Pattern; import org.apache.jackrabbit.oak.api.Blob; +import org.apache.jackrabbit.oak.api.PropertyState; +import org.apache.jackrabbit.oak.api.Type; import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob; +import org.apache.jackrabbit.oak.plugins.memory.BinaryPropertyState; +import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState; +import org.apache.jackrabbit.oak.plugins.memory.MultiBinaryPropertyState; +import org.apache.jackrabbit.oak.plugins.memory.PropertyStates; import org.apache.jackrabbit.oak.plugins.segment.RecordId; import org.apache.jackrabbit.oak.plugins.segment.Segment; +import org.apache.jackrabbit.oak.plugins.segment.SegmentBlob; import org.apache.jackrabbit.oak.plugins.segment.SegmentId; import org.apache.jackrabbit.oak.plugins.segment.SegmentTracker; import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState; import org.apache.jackrabbit.oak.plugins.segment.SegmentStore; import org.apache.jackrabbit.oak.spi.blob.BlobStore; +import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; import org.apache.jackrabbit.oak.spi.state.NodeState; import org.slf4j.Logger; @@ -134,6 +143,15 @@ */ private final CountDownLatch timeToClose = new CountDownLatch(1); + /** + * Number of levels deep into the repository the compaction is supposed to + * go + * + * 
Default value is 5, enough to reach the lucene binaries + * + */ + private int compactLevels = 5; + public FileStore(BlobStore blobStore, File directory, int maxFileSizeMB, boolean memoryMapping) throws IOException { this(blobStore, directory, EMPTY_NODE, maxFileSizeMB, 0, memoryMapping); @@ -327,6 +345,45 @@ return dataFiles; } + void compact(SegmentNodeState state, String name, int levels, + NodeBuilder dest) throws IOException { + // log.debug("... compacting {}", name); + for (PropertyState ps : state.getProperties()) { + if (Type.BINARY.tag() != ps.getType().tag()) { + ps = PropertyStates.createProperty(ps.getName(), + ps.getValue(ps.getType()), ps.getType()); + } else { + List newBlobList = new ArrayList(); + for (Blob b : ps.getValue(Type.BINARIES)) { + if (b instanceof SegmentBlob) { + SegmentBlob sb = (SegmentBlob) b; + b = sb.clone(tracker.getWriter()); + } + newBlobList.add(b); + } + if (ps.isArray()) { + ps = MultiBinaryPropertyState.binaryPropertyFromBlob( + ps.getName(), newBlobList); + } else { + ps = BinaryPropertyState.binaryProperty(ps.getName(), + newBlobList.get(0)); + } + } + dest.setProperty(ps); + } + + for (ChildNodeEntry entry : state.getChildNodeEntries()) { + SegmentNodeState child = (SegmentNodeState) entry.getNodeState(); + String n = entry.getName(); + if (levels > 0) { + compact(child, name + entry.getName() + "/", levels - 1, + dest.child(entry.getName())); + } else { + dest.setChildNode(n, child); + } + } + } + public void flush() throws IOException { synchronized (persistedHead) { RecordId before = persistedHead.get(); @@ -350,6 +407,20 @@ if (cleanup) { long start = System.nanoTime(); + log.debug("TarMK compaction"); + tracker.getWriter().dropCache(); + SegmentNodeState state = new SegmentNodeState(after); + NodeBuilder mem = EmptyNodeState.EMPTY_NODE.builder(); + compact(state, "/", compactLevels, mem); + setHead(state, + tracker.getWriter().writeNode( + mem.getNodeState())); + before = null; + after = null; + state = null; + mem 
= null; + System.gc(); + Set ids = newHashSet(); for (SegmentId id : tracker.getReferencedSegmentIds()) { ids.add(new UUID( @@ -591,4 +662,12 @@ System.gc(); cleanupNeeded.set(true); } + + public void setCompactLevels(int compactLevels) { + this.compactLevels = compactLevels; + } + + public int getCompactLevels() { + return compactLevels; + } } \ No newline at end of file Index: oak-run/README.md =================================================================== --- oak-run/README.md (revision 1597026) +++ oak-run/README.md (working copy) @@ -30,6 +30,14 @@ $ java -jar oak-run-*.jar debug /path/to/oak/repository [id...] +Compact +------- + +The 'compact' mode runs the segment compaction operation on the provided repository. +To start this mode, use: + + $ java -jar oak-run-*.jar compact /path/to/oak/repository + Upgrade ------- Index: oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java =================================================================== --- oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java (revision 1597026) +++ oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java (working copy) @@ -19,6 +19,7 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -102,6 +103,9 @@ case DEBUG: debug(args); break; + case COMPACT: + compact(args); + break; case SERVER: server(URI, args); break; @@ -155,6 +159,50 @@ } } + private static void compact(String[] args) throws IOException { + if (args.length == 0 || args.length > 2) { + System.err.println("usage: compact [compactLevels]"); + System.exit(1); + } else { + int levels = -1; + if (args.length == 2) { + try { + levels = Integer.valueOf(args[1]); + } catch (NumberFormatException ex) { + System.err + .println("Bad number format for 'compactLevels', expecting a positive int."); + System.exit(1); + } + } + + File directory = new File(args[0]); + FileStore 
store = new FileStore(directory, 256, false); + if (levels > 0) { + store.setCompactLevels(levels); + } + System.out.println("Compacting " + directory + " on " + + store.getCompactLevels() + " levels."); + + System.out.println(" before " + Arrays.toString(directory.list())); + try { + store.gc(); + store.flush(); + store.close(); + System.gc(); + + store = new FileStore(directory, 256, false); + store.gc(); + store.flush(); + + System.out + .println(" after " + Arrays.toString(directory.list())); + } finally { + store.close(); + } + + } + } + private static void debug(String[] args) throws IOException { if (args.length == 0) { System.err.println("usage: debug [id...]"); @@ -481,6 +529,7 @@ BACKUP("backup"), BENCHMARK("benchmark"), DEBUG("debug"), + COMPACT("compact"), SERVER("server"), UPGRADE("upgrade");