/**
 * Flushes the current segment to the underlying segment store: fills in the
 * segment header (root-record count, blob-reference count), appends the root
 * and blob-reference tables after the referenced-segment-id table, hands the
 * completed segment to the store, caches it in the tracker, and starts a
 * fresh buffer. This is done automatically (called from prepare) when there
 * is not enough space for a record. It can also be called explicitly.
 * <p>
 * Only the in-memory bookkeeping happens while holding the monitor; the
 * actual {@code store.writeSegment(...)} call is issued after the
 * synchronized block — presumably so the writer lock is not held across
 * store I/O (NOTE(review): confirm the store tolerates this ordering).
 */
public void flush() {
    // Id of the segment to be written in the file store. If the segment id
    // is not null, a segment will be written outside of the synchronized
    // block.
    SegmentId segmentId = null;

    // Buffer containing segment data, and offset and length to locate the
    // segment data into the buffer. These variables will be initialized in
    // the synchronized block.
    byte[] segmentBuffer = null;
    int segmentOffset = 0;
    int segmentLength = 0;

    synchronized (this) {
        // length == 0 means nothing has been written since the last flush,
        // so there is no segment to emit.
        if (length > 0) {
            int refcount = segment.getRefCount();

            // Record the root-record count in the two-byte header slot
            // (big-endian: high byte first).
            int rootcount = roots.size();
            buffer[Segment.ROOT_COUNT_OFFSET] = (byte) (rootcount >> 8);
            buffer[Segment.ROOT_COUNT_OFFSET + 1] = (byte) rootcount;

            // Record the blob-reference count in its two-byte header slot.
            int blobrefcount = blobrefs.size();
            buffer[Segment.BLOBREF_COUNT_OFFSET] = (byte) (blobrefcount >> 8);
            buffer[Segment.BLOBREF_COUNT_OFFSET + 1] = (byte) blobrefcount;

            // Total segment size: 16 bytes per referenced segment id,
            // 3 bytes per root entry, 2 bytes per blob reference, plus the
            // record data already accumulated — rounded up to a 16-byte
            // boundary.
            length = align(
                    refcount * 16 + rootcount * 3 + blobrefcount * 2 + length,
                    16);

            checkState(length <= buffer.length);

            int pos = refcount * 16;
            if (pos + length <= buffer.length) {
                // the whole segment fits to the space *after* the referenced
                // segment identifiers we've already written, so we can safely
                // copy those bits ahead even if concurrent code is still
                // reading from that part of the buffer
                System.arraycopy(buffer, 0, buffer, buffer.length - length, pos);
                pos += buffer.length - length;
            } else {
                // this might leave some empty space between the header and
                // the record data, but this case only occurs when the
                // segment is >252kB in size and the maximum overhead is <<4kB,
                // which is acceptable
                length = buffer.length;
            }

            // Append the root table: one byte of record type followed by a
            // two-byte record offset (offsets are stored right-shifted by
            // RECORD_ALIGN_BITS since records are aligned).
            for (Map.Entry entry : roots.entrySet()) {
                int offset = entry.getKey().getOffset();
                buffer[pos++] = (byte) entry.getValue().ordinal();
                buffer[pos++] = (byte) (offset >> (8 + Segment.RECORD_ALIGN_BITS));
                buffer[pos++] = (byte) (offset >> Segment.RECORD_ALIGN_BITS);
            }

            // Append the blob-reference table: two-byte aligned offsets only.
            for (RecordId blobref : blobrefs) {
                int offset = blobref.getOffset();
                buffer[pos++] = (byte) (offset >> (8 + Segment.RECORD_ALIGN_BITS));
                buffer[pos++] = (byte) (offset >> Segment.RECORD_ALIGN_BITS);
            }

            // Capture everything the out-of-lock write needs; after this
            // point the fields below are reset for the next segment.
            segmentId = segment.getSegmentId();
            segmentBuffer = buffer;
            segmentOffset = buffer.length - length;
            segmentLength = length;

            // Keep this segment in memory as it's likely to be accessed soon.
            // If the buffer has more than 4kB of slack, copy the data into a
            // right-sized buffer instead of pinning the whole (mostly empty)
            // write buffer via a wrapping view.
            ByteBuffer data;
            if (buffer.length - length > 4096) {
                data = ByteBuffer.allocate(length);
                data.put(buffer, buffer.length - length, length);
                data.rewind();
            } else {
                data = ByteBuffer.wrap(buffer, buffer.length - length, length);
            }
            tracker.setSegment(segmentId, new Segment(tracker, segmentId, data));

            // Reset the writer state for the next segment. Records are
            // written back-to-front, so position starts at the buffer end.
            buffer = createNewBuffer(version);
            roots.clear();
            blobrefs.clear();
            length = 0;
            position = buffer.length;
            segment = new Segment(tracker, buffer);
            segment.getSegmentId().setSegment(segment);
        }
    }

    // Perform the (potentially slow) store write without holding the
    // monitor; segmentId is non-null exactly when the block above prepared
    // a segment.
    if (segmentId != null) {
        log.debug("Writing data segment {} ({} bytes)", segmentId, segmentLength);
        store.writeSegment(segmentId, segmentBuffer, segmentOffset, segmentLength);
    }
}