Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java (date 1444142332000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java (date 1444166445000) @@ -225,6 +225,7 @@ this.refids = new SegmentId[SEGMENT_REFERENCE_LIMIT + 1]; this.refids[0] = id; this.version = SegmentVersion.fromByte(buffer[3]); + this.id.setSegment(this); } SegmentVersion getSegmentVersion() { Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java (date 1444142332000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java (date 1444166445000) @@ -168,7 +168,6 @@ this.version = version; this.buffer = createNewBuffer(version); this.segment = new Segment(tracker, buffer); - segment.getSegmentId().setSegment(segment); } /** @@ -176,75 +175,97 @@ * store. This is done automatically (called from prepare) when there is not * enough space for a record. It can also be called explicitly. */ - public synchronized void flush() { + public void flush() { + // Id of the segment to be written in the file store. If the segment id + // is not null, a segment will be written outside of the synchronized block. + SegmentId segmentId = null; + + // Buffer containing segment data, and offset and length to locate the + // segment data into the buffer. These variables will be initialized in + // the synchronized block. 
+ byte[] segmentBuffer = null; + int segmentOffset = 0; + int segmentLength = 0; + + synchronized (this) { - if (length > 0) { - int refcount = segment.getRefCount(); + if (length > 0) { + int refcount = segment.getRefCount(); - int rootcount = roots.size(); - buffer[Segment.ROOT_COUNT_OFFSET] = (byte) (rootcount >> 8); - buffer[Segment.ROOT_COUNT_OFFSET + 1] = (byte) rootcount; + int rootcount = roots.size(); + buffer[Segment.ROOT_COUNT_OFFSET] = (byte) (rootcount >> 8); + buffer[Segment.ROOT_COUNT_OFFSET + 1] = (byte) rootcount; - int blobrefcount = blobrefs.size(); - buffer[Segment.BLOBREF_COUNT_OFFSET] = (byte) (blobrefcount >> 8); - buffer[Segment.BLOBREF_COUNT_OFFSET + 1] = (byte) blobrefcount; + int blobrefcount = blobrefs.size(); + buffer[Segment.BLOBREF_COUNT_OFFSET] = (byte) (blobrefcount >> 8); + buffer[Segment.BLOBREF_COUNT_OFFSET + 1] = (byte) blobrefcount; - length = align( - refcount * 16 + rootcount * 3 + blobrefcount * 2 + length, - 16); + length = align( + refcount * 16 + rootcount * 3 + blobrefcount * 2 + length, + 16); - checkState(length <= buffer.length); + checkState(length <= buffer.length); - int pos = refcount * 16; - if (pos + length <= buffer.length) { - // the whole segment fits to the space *after* the referenced - // segment identifiers we've already written, so we can safely - // copy those bits ahead even if concurrent code is still - // reading from that part of the buffer + int pos = refcount * 16; + if (pos + length <= buffer.length) { + // the whole segment fits to the space *after* the referenced + // segment identifiers we've already written, so we can safely + // copy those bits ahead even if concurrent code is still + // reading from that part of the buffer - System.arraycopy(buffer, 0, buffer, buffer.length-length, pos); + System.arraycopy(buffer, 0, buffer, buffer.length - length, pos); - pos += buffer.length - length; - } else { - // this might leave some empty space between the header and - // the record data, but this 
case only occurs when the - // segment is >252kB in size and the maximum overhead is <<4kB, - // which is acceptable - length = buffer.length; - } + pos += buffer.length - length; + } else { + // this might leave some empty space between the header and + // the record data, but this case only occurs when the + // segment is >252kB in size and the maximum overhead is <<4kB, + // which is acceptable + length = buffer.length; + } - for (Map.Entry entry : roots.entrySet()) { - int offset = entry.getKey().getOffset(); - buffer[pos++] = (byte) entry.getValue().ordinal(); - buffer[pos++] = (byte) (offset >> (8 + Segment.RECORD_ALIGN_BITS)); - buffer[pos++] = (byte) (offset >> Segment.RECORD_ALIGN_BITS); - } + for (Map.Entry entry : roots.entrySet()) { + int offset = entry.getKey().getOffset(); + buffer[pos++] = (byte) entry.getValue().ordinal(); + buffer[pos++] = (byte) (offset >> (8 + Segment.RECORD_ALIGN_BITS)); + buffer[pos++] = (byte) (offset >> Segment.RECORD_ALIGN_BITS); + } - for (RecordId blobref : blobrefs) { - int offset = blobref.getOffset(); - buffer[pos++] = (byte) (offset >> (8 + Segment.RECORD_ALIGN_BITS)); - buffer[pos++] = (byte) (offset >> Segment.RECORD_ALIGN_BITS); - } + for (RecordId blobref : blobrefs) { + int offset = blobref.getOffset(); + buffer[pos++] = (byte) (offset >> (8 + Segment.RECORD_ALIGN_BITS)); + buffer[pos++] = (byte) (offset >> Segment.RECORD_ALIGN_BITS); + } - SegmentId id = segment.getSegmentId(); - log.debug("Writing data segment {} ({} bytes)", id, length); - store.writeSegment(id, buffer, buffer.length - length, length); + segmentId = segment.getSegmentId(); + segmentBuffer = buffer; + segmentOffset = buffer.length - length; + segmentLength = length; - // Keep this segment in memory as it's likely to be accessed soon - ByteBuffer data; - if (buffer.length - length > 4096) { - data = ByteBuffer.allocate(length); - data.put(buffer, buffer.length - length, length); - data.rewind(); - } else { - data = ByteBuffer.wrap(buffer, 
buffer.length - length, length); - } - tracker.setSegment(id, new Segment(tracker, id, data)); - - buffer = createNewBuffer(version); - roots.clear(); - blobrefs.clear(); - length = 0; - position = buffer.length; - segment = new Segment(tracker, buffer); + buffer = createNewBuffer(version); + roots.clear(); + blobrefs.clear(); + length = 0; + position = buffer.length; + segment = new Segment(tracker, buffer); - segment.getSegmentId().setSegment(segment); + } + } + + if (segmentId != null) { + log.debug("Writing data segment {} ({} bytes)", segmentId, segmentLength); + store.writeSegment(segmentId, segmentBuffer, segmentOffset, segmentLength); + + // Keep this segment in memory as it's likely to be accessed soon + ByteBuffer data; + if (segmentOffset > 4096) { + data = ByteBuffer.allocate(segmentLength); + data.put(segmentBuffer, segmentOffset, segmentLength); + data.rewind(); + } else { + data = ByteBuffer.wrap(segmentBuffer, segmentOffset, segmentLength); + } + + // It is important to put the segment into the cache only *after* it has been + // written to the store since as soon as it is in the cache it becomes eligible + // for eviction, which might lead to SNFEs when it is not yet in the store at that point. 
+ tracker.setSegment(segmentId, new Segment(tracker, segmentId, data)); } } Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupIT.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupIT.java (date 1444142332000) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupIT.java (date 1444166445000) @@ -99,7 +99,9 @@ // really long time span, no binary cloning - FileStore fileStore = new FileStore(directory, 1); + FileStore fileStore = FileStore.newFileStore(directory) + .withMaxFileSize(1) + .create(); final SegmentNodeStore nodeStore = new SegmentNodeStore(fileStore); CompactionStrategy custom = new CompactionStrategy(false, false, CLEAN_OLD, TimeUnit.HOURS.toMillis(1), (byte) 0) {