diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ChunkedBlobStream.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ChunkedBlobStream.java index aac011f8a1..fd785311b9 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ChunkedBlobStream.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ChunkedBlobStream.java @@ -142,14 +142,15 @@ public class ChunkedBlobStream implements ChunkedInput { byte mask = createMask(data.length); Hasher hasher = Hashing.murmur3_32().newHasher(); - long hash = hasher.putByte(mask).putBytes(data).hash().padToLong(); + long hash = hasher.putByte(mask).putLong(length).putBytes(data).hash().padToLong(); byte[] blobIdBytes = blobId.getBytes(); ByteBuf out = allocator.buffer(); - out.writeInt(1 + 1 + 4 + blobIdBytes.length + 8 + data.length); + out.writeInt(1 + 1 + 8 + 4 + blobIdBytes.length + 8 + data.length); out.writeByte(Messages.HEADER_BLOB); out.writeByte(mask); + out.writeLong(length); out.writeInt(blobIdBytes.length); out.writeBytes(blobIdBytes); out.writeLong(hash); diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoder.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoder.java index 0ed45f2c4f..877fdabe19 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoder.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoder.java @@ -26,6 +26,7 @@ import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.nio.file.Files; import java.util.List; import java.util.UUID; @@ -123,7 +124,8 @@ public class ResponseDecoder extends ByteToMessageDecoder { private static void decodeGetBlobResponse(int length, ByteBuf in, List out) throws IOException { byte mask = in.readByte(); - + long blobLength = in.readLong(); + int blobIdLength = in.readInt(); byte[] blobIdBytes = new byte[blobIdLength]; in.readBytes(blobIdBytes); @@ -134,7 +136,7 @@ public class ResponseDecoder extends ByteToMessageDecoder { if ((mask & (1 << 0)) != 0) { if (tempFile.exists()) { log.debug("Detected previous incomplete transfer for {}. Cleaning up...", blobId); - tempFile.delete(); + Files.delete(tempFile.toPath()); } } @@ -144,7 +146,7 @@ public class ResponseDecoder extends ByteToMessageDecoder { byte[] chunkData = new byte[in.readableBytes()]; in.readBytes(chunkData); - if (hash(mask, chunkData) != hash) { + if (hash(mask, blobLength, chunkData) != hash) { log.debug("Invalid checksum, discarding current chunk from {}", blobId); return; } else { @@ -158,8 +160,12 @@ public class ResponseDecoder extends ByteToMessageDecoder { if ((mask & (1 << 1)) != 0) { log.debug("Received entire blob {}", blobId); - FileInputStream fis = new DeleteOnCloseFileInputStream(tempFile); - out.add(new GetBlobResponse(null, blobId, fis, fis.getChannel().size())); + if (blobLength == tempFile.length()) { + FileInputStream fis = new DeleteOnCloseFileInputStream(tempFile); + out.add(new GetBlobResponse(null, blobId, fis, fis.getChannel().size())); + } else { + log.debug("Size mismatch for blob {}", blobId); + } } } @@ -194,8 +200,8 @@ public class ResponseDecoder extends ByteToMessageDecoder { return Hashing.murmur3_32().newHasher().putBytes(data).hash().padToLong(); } - private static long hash(byte mask, byte[] data) { - return Hashing.murmur3_32().newHasher().putByte(mask).putBytes(data).hash().padToLong(); + private static long hash(byte mask, long blobLength, byte[] data) { + return Hashing.murmur3_32().newHasher().putByte(mask).putLong(blobLength).putBytes(data).hash().padToLong(); } } diff --git oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/StandbyTestUtils.java oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/StandbyTestUtils.java index 417a2c0f86..763640f162 100644 --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/StandbyTestUtils.java +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/StandbyTestUtils.java @@ -58,8 +58,8 @@ public class StandbyTestUtils { return Hashing.murmur3_32().newHasher().putBytes(data).hash().padToLong(); } - public static long hash(byte mask, byte[] data) { - return Hashing.murmur3_32().newHasher().putByte(mask).putBytes(data).hash().padToLong(); + public static long hash(byte mask, long blobLength, byte[] data) { + return Hashing.murmur3_32().newHasher().putByte(mask).putLong(blobLength).putBytes(data).hash().padToLong(); } public static byte createMask(int currentChunk, int totalChunks) { @@ -75,16 +75,17 @@ public class StandbyTestUtils { return mask; } - public static ByteBuf createBlobChunkBuffer(byte header,String blobId, byte[] data, byte mask) { + public static ByteBuf createBlobChunkBuffer(byte header, long blobLength, String blobId, byte[] data, byte mask) { byte[] blobIdBytes = blobId.getBytes(Charsets.UTF_8); ByteBuf buf = Unpooled.buffer(); - buf.writeInt(1 + 1 + 4 + blobIdBytes.length + 8 + data.length); + buf.writeInt(1 + 1 + 8 + 4 + blobIdBytes.length + 8 + data.length); buf.writeByte(header); buf.writeByte(mask); + buf.writeLong(blobLength); buf.writeInt(blobIdBytes.length); buf.writeBytes(blobIdBytes); - buf.writeLong(hash(mask, data)); + buf.writeLong(hash(mask, blobLength, data)); buf.writeBytes(data); return buf; diff --git oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponseEncoderTest.java oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponseEncoderTest.java index 1618b78b6b..ba38fcb34d 100644 --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponseEncoderTest.java +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponseEncoderTest.java @@ -40,7 +40,7 @@ public class GetBlobResponseEncoderTest { EmbeddedChannel channel = new EmbeddedChannel(new ChunkedWriteHandler(), new GetBlobResponseEncoder(3)); channel.writeOutbound(new GetBlobResponse("clientId", blobId,new ByteArrayInputStream(blobData), blobData.length)); ByteBuf buffer = (ByteBuf) channel.readOutbound(); - ByteBuf expected = createBlobChunkBuffer(Messages.HEADER_BLOB, blobId, blobData, mask); + ByteBuf expected = createBlobChunkBuffer(Messages.HEADER_BLOB, 3L, blobId, blobData, mask); assertEquals(expected, buffer); } @@ -58,13 +58,13 @@ public class GetBlobResponseEncoderTest { ByteBuf firstBuffer = (ByteBuf) channel.readOutbound(); byte firstMask = createMask(1, 2); - ByteBuf firstExpected = createBlobChunkBuffer(Messages.HEADER_BLOB, blobId, firstChunkData, firstMask); + ByteBuf firstExpected = createBlobChunkBuffer(Messages.HEADER_BLOB, 4L, blobId, firstChunkData, firstMask); assertEquals(firstExpected, firstBuffer); ByteBuf secondBuffer = (ByteBuf) channel.readOutbound(); byte secondMask = createMask(2, 2); - ByteBuf secondExpected = createBlobChunkBuffer(Messages.HEADER_BLOB, blobId, secondChunkbData, secondMask); + ByteBuf secondExpected = createBlobChunkBuffer(Messages.HEADER_BLOB, 4L, blobId, secondChunkbData, secondMask); assertEquals(secondExpected, secondBuffer); } diff --git oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoderTest.java oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoderTest.java index 28299f8184..524e8f9f74 100644 --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoderTest.java +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoderTest.java @@ -58,7 +58,7 @@ public class ResponseDecoderTest { String blobId = "blobId"; byte mask = createMask(1, 1); - ByteBuf buf = createBlobChunkBuffer(Messages.HEADER_BLOB, blobId, blobData, mask); + ByteBuf buf = createBlobChunkBuffer(Messages.HEADER_BLOB, 3L, blobId, blobData, mask); EmbeddedChannel channel = new EmbeddedChannel(new ResponseDecoder()); channel.writeInbound(buf); @@ -80,10 +80,10 @@ public class ResponseDecoderTest { String blobId = "blobId"; byte firstMask = createMask(1, 2); - ByteBuf firstBuf = createBlobChunkBuffer(Messages.HEADER_BLOB, blobId, firstChunkData, firstMask); + ByteBuf firstBuf = createBlobChunkBuffer(Messages.HEADER_BLOB, 4L, blobId, firstChunkData, firstMask); byte secondMask = createMask(2, 2); - ByteBuf secondBuf = createBlobChunkBuffer(Messages.HEADER_BLOB, blobId, secondChunkbData, secondMask); + ByteBuf secondBuf = createBlobChunkBuffer(Messages.HEADER_BLOB, 4L, blobId, secondChunkbData, secondMask); EmbeddedChannel channel = new EmbeddedChannel(new ResponseDecoder()); channel.writeInbound(firstBuf); @@ -105,12 +105,13 @@ public class ResponseDecoderTest { byte mask = createMask(1, 1); ByteBuf buf = Unpooled.buffer(); - buf.writeInt(1 + 1 + 4 + blobIdBytes.length + 8 + blobData.length); + buf.writeInt(1 + 1 + 8 + 4 + blobIdBytes.length + 8 + blobData.length); buf.writeByte(Messages.HEADER_BLOB); buf.writeByte(mask); + buf.writeLong(3L); buf.writeInt(blobIdBytes.length); buf.writeBytes(blobIdBytes); - buf.writeLong(hash(mask, blobData) + 1); + buf.writeLong(hash(mask, 3L, blobData) + 1); buf.writeBytes(blobData); EmbeddedChannel channel = new EmbeddedChannel(new ResponseDecoder());