diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ChunkedBlobStream.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ChunkedBlobStream.java
new file mode 100644
index 0000000000..d76c067525
--- /dev/null
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ChunkedBlobStream.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.codec;
+
+import java.io.InputStream;
+import java.io.PushbackInputStream;
+import java.nio.charset.StandardCharsets;
+
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.stream.ChunkedInput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link ChunkedInput} that streams a blob as a sequence of framed chunks.
+ * Each chunk carries the blob message header, a position mask (first/last
+ * chunk flags), the blob id and a hash of the payload, matching the frame
+ * layout previously produced by {@code GetBlobResponseEncoder}.
+ */
+public class ChunkedBlobStream implements ChunkedInput<ByteBuf> {
+    private static final Logger log = LoggerFactory.getLogger(ChunkedBlobStream.class);
+
+    private final String clientId;
+    private final String blobId;
+    private final long length;
+    private final PushbackInputStream in;
+    private final int chunkSize;
+
+    private long offset;
+    private boolean closed;
+
+    /**
+     * @param clientId
+     *            identifier for client requesting the blob
+     * @param blobId
+     *            blob identifier
+     * @param length
+     *            blob length
+     * @param in
+     *            blob stream
+     * @param chunkSize
+     *            the number of bytes to fetch on each
+     *            {@link #readChunk(ChannelHandlerContext)} call
+     */
+    public ChunkedBlobStream(String clientId, String blobId, long length, InputStream in, int chunkSize) {
+        this.clientId = clientId;
+        this.blobId = blobId;
+        this.length = length;
+
+        if (in == null) {
+            throw new NullPointerException("in");
+        }
+        if (chunkSize <= 0) {
+            throw new IllegalArgumentException("chunkSize: " + chunkSize + " (expected: a positive integer)");
+        }
+
+        if (in instanceof PushbackInputStream) {
+            this.in = (PushbackInputStream) in;
+        } else {
+            this.in = new PushbackInputStream(in);
+        }
+
+        this.chunkSize = chunkSize;
+    }
+
+    /**
+     * Returns the number of transferred bytes.
+     */
+    public long transferredBytes() {
+        return offset;
+    }
+
+    @Override
+    public boolean isEndOfInput() throws Exception {
+        if (closed) {
+            return true;
+        }
+
+        int b = in.read();
+        if (b < 0) {
+            return true;
+        } else {
+            in.unread(b);
+            return false;
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        closed = true;
+        in.close();
+    }
+
+    @Override
+    public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
+        if (isEndOfInput()) {
+            return null;
+        }
+
+        ByteBuf buffer = ctx.alloc().buffer();
+
+        int written;
+        try {
+            written = buffer.writeBytes(in, chunkSize);
+        } catch (Exception e) {
+            // Release the raw buffer on read failure; otherwise it would leak.
+            buffer.release();
+            throw e;
+        }
+
+        // decorateRawBuffer takes ownership of (and releases) the raw buffer.
+        ByteBuf decorated = decorateRawBuffer(ctx.alloc(), buffer);
+
+        offset += written;
+        log.info("Sending chunk {}/{} of size {} to client {}", roundDiv(offset, chunkSize),
+                roundDiv(length, chunkSize), written, clientId);
+
+        return decorated;
+    }
+
+    private ByteBuf decorateRawBuffer(ByteBufAllocator allocator, ByteBuf buffer) {
+        byte[] data = new byte[buffer.readableBytes()];
+        buffer.readBytes(data);
+        buffer.release();
+
+        byte mask = createMask(data.length);
+        Hasher hasher = Hashing.murmur3_32().newHasher();
+        long hash = hasher.putByte(mask).putBytes(data).hash().padToLong();
+
+        // Encode the blob id explicitly as UTF-8 so the wire format does not
+        // depend on the server's platform default charset.
+        byte[] blobIdBytes = blobId.getBytes(StandardCharsets.UTF_8);
+
+        ByteBuf out = allocator.buffer();
+        out.writeInt(1 + 1 + 4 + blobIdBytes.length + 8 + data.length);
+        out.writeByte(Messages.HEADER_BLOB);
+        out.writeByte(mask);
+        out.writeInt(blobIdBytes.length);
+        out.writeBytes(blobIdBytes);
+        out.writeLong(hash);
+        out.writeBytes(data);
+
+        return out;
+    }
+
+    private static int roundDiv(long x, int y) {
+        return (int) Math.ceil((double) x / (double) y);
+    }
+
+    private byte createMask(int bytesRead) {
+        byte mask = 0;
+        if (offset == 0) {
+            mask = (byte) (mask | (1 << 0));
+        }
+
+        if (offset + bytesRead == length) {
+            mask = (byte) (mask | (1 << 1));
+        }
+
+        return mask;
+    }
+}
diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponseEncoder.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponseEncoder.java index ed11a51dbb..32c0764271 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponseEncoder.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponseEncoder.java @@ -17,14 +17,8 @@ package org.apache.jackrabbit.oak.segment.standby.codec; -import static java.lang.Math.min; - import java.io.InputStream; -import java.nio.charset.Charset; -import com.google.common.hash.Hasher; -import com.google.common.hash.Hashing; -import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; @@ -46,68 +40,14 @@ public class GetBlobResponseEncoder extends ChannelOutboundHandlerAdapter { GetBlobResponse response = (GetBlobResponse) msg; log.debug("Sending blob {} to client {}", response.getBlobId(), response.getClientId()); + String clientId = response.getClientId(); String blobId = response.getBlobId(); long length = response.getLength(); InputStream in = response.getInputStream(); - byte[] blobIdBytes = blobId.getBytes(Charset.forName("UTF-8")); - - try { - byte[] buffer = new byte[blobChunkSize]; - int l = 0; - int totalChunks = (int) (length / (long) blobChunkSize); - if (length % blobChunkSize != 0) { - totalChunks++; - } - int currentChunk = 0; - while (currentChunk < totalChunks) { - l = in.read(buffer, 0, (int) min(buffer.length, length)); - byte[] data = new byte[l]; - System.arraycopy(buffer, 0, data, 0, l); - - currentChunk++; - ByteBuf out = createChunk(ctx, blobIdBytes, data, currentChunk, totalChunks); - log.info("Sending chunk {}/{} of size {} to client {}", currentChunk, totalChunks, data.length, - response.getClientId()); - ctx.writeAndFlush(out); 
- } - } finally { - in.close(); - } + ctx.writeAndFlush(new ChunkedBlobStream(clientId, blobId, length, in, blobChunkSize), promise); } else { ctx.write(msg, promise); } } - - private ByteBuf createChunk(ChannelHandlerContext ctx, byte[] blobIdBytes, byte[] data, int currentChunk, - int totalChunks) { - byte mask = createMask(currentChunk, totalChunks); - - Hasher hasher = Hashing.murmur3_32().newHasher(); - long hash = hasher.putByte(mask).putBytes(data).hash().padToLong(); - - ByteBuf out = ctx.alloc().buffer(); - out.writeInt(1 + 1 + 4 + blobIdBytes.length + 8 + data.length); - out.writeByte(Messages.HEADER_BLOB); - out.writeByte(mask); - out.writeInt(blobIdBytes.length); - out.writeBytes(blobIdBytes); - out.writeLong(hash); - out.writeBytes(data); - - return out; - } - - private byte createMask(int currentChunk, int totalChunks) { - byte mask = 0; - if (currentChunk == 1) { - mask = (byte) (mask | (1 << 0)); - } - - if (currentChunk == totalChunks) { - mask = (byte) (mask | (1 << 1)); - } - - return mask; - } } diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java index 97670fbd06..f4dc0684ec 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java @@ -39,6 +39,7 @@ import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.SelfSignedCertificate; +import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.util.CharsetUtil; import org.apache.jackrabbit.core.data.util.NamedThreadFactory; import org.apache.jackrabbit.oak.segment.file.FileStore; @@ -162,9 +163,16 @@ class StandbyServer implements AutoCloseable { p.addLast(new 
StateHandler(builder.stateConsumer)); p.addLast(new RequestObserverHandler(builder.observer)); - // Encoders + // Snappy Encoder p.addLast(new SnappyFramedEncoder()); + + // Use chunking transparently + + p.addLast(new ChunkedWriteHandler()); + + // Other Encoders + p.addLast(new GetHeadResponseEncoder()); p.addLast(new GetSegmentResponseEncoder()); p.addLast(new GetBlobResponseEncoder(builder.blobChunkSize)); diff --git oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponseEncoderTest.java oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponseEncoderTest.java index c66e26908b..1618b78b6b 100644 --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponseEncoderTest.java +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponseEncoderTest.java @@ -25,6 +25,7 @@ import java.io.ByteArrayInputStream; import io.netty.buffer.ByteBuf; import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.stream.ChunkedWriteHandler; import org.junit.Test; public class GetBlobResponseEncoderTest { @@ -36,7 +37,7 @@ public class GetBlobResponseEncoderTest { String blobId = "blobId"; byte mask = createMask(1, 1); - EmbeddedChannel channel = new EmbeddedChannel(new GetBlobResponseEncoder(3)); + EmbeddedChannel channel = new EmbeddedChannel(new ChunkedWriteHandler(), new GetBlobResponseEncoder(3)); channel.writeOutbound(new GetBlobResponse("clientId", blobId,new ByteArrayInputStream(blobData), blobData.length)); ByteBuf buffer = (ByteBuf) channel.readOutbound(); ByteBuf expected = createBlobChunkBuffer(Messages.HEADER_BLOB, blobId, blobData, mask); @@ -52,7 +53,7 @@ public class GetBlobResponseEncoderTest { String blobId = "blobId"; - EmbeddedChannel channel = new EmbeddedChannel(new GetBlobResponseEncoder(2)); + EmbeddedChannel channel = new EmbeddedChannel(new ChunkedWriteHandler(), new GetBlobResponseEncoder(2)); 
channel.writeOutbound(new GetBlobResponse("clientId", blobId,new ByteArrayInputStream(blobData), blobData.length)); ByteBuf firstBuffer = (ByteBuf) channel.readOutbound();