Description
When a large message is diverted, a new copy of the original message is created and, if there is a backup, replicated to it.
LargeServerMessageImpl.copy(long) reuses a single byte array to copy the message body. The next block of data can be read into that array before the previous block has been replicated, which corrupts the replicated bytes.
Making a copy of the byte array before replication avoids the corruption.
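The failure mode described above can be illustrated with a minimal, single-threaded sketch. It is not the Artemis code: the class `ReusedBufferDemo`, the method `copyThroughQueue`, and the in-memory queue that stands in for asynchronous replication are all assumptions made for illustration. The queue is drained only after every read has finished, which models the worst case of the race deterministically.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;

public class ReusedBufferDemo {

   // Reads `source` in blocks of `blockSize` and queues each block for a deferred
   // consumer (hypothetical stand-in for replication to a backup).
   static byte[] copyThroughQueue(byte[] source, int blockSize, boolean copyEveryBlock) {
      ByteArrayInputStream in = new ByteArrayInputStream(source);
      Queue<byte[]> pending = new ArrayDeque<>();
      byte[] shared = new byte[blockSize];

      int bytesRead;
      while ((bytesRead = in.read(shared, 0, shared.length)) > 0) {
         // A full block is queued by reference unless copying is requested,
         // so the next read overwrites what the consumer has not yet seen.
         boolean reuse = bytesRead == shared.length && !copyEveryBlock;
         pending.add(reuse ? shared : Arrays.copyOf(shared, bytesRead));
      }

      // The deferred consumer runs only now, after all reads have completed.
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      for (byte[] block : pending) {
         out.write(block, 0, block.length);
      }
      return out.toByteArray();
   }

   public static void main(String[] args) {
      byte[] source = {1, 2, 3, 4, 5, 6, 7, 8};
      System.out.println("reused buffer intact: " + Arrays.equals(source, copyThroughQueue(source, 4, false)));
      System.out.println("copied buffer intact: " + Arrays.equals(source, copyThroughQueue(source, 4, true)));
   }
}
```

With the shared array, both queued entries alias the same storage, so the consumer sees the last block twice. Copying each block before queueing keeps the output identical to the input.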
Activity
Github user jdanekrh commented on the issue:
https://github.com/apache/activemq-artemis/pull/1351
This commit fixes integration-test ReplicationWithDivertTest#testSendLargeMessage for me. Previously, the test used to fail with `java.lang.NullPointerException` on `message.getBytes("test" + i).length` at org.apache.activemq.artemis.tests.integration.divert.ReplicationWithDivertTest.testSendLargeMessage(ReplicationWithDivertTest.java:237).
Github user gaohoward commented on the issue:
https://github.com/apache/activemq-artemis/pull/1351
@jdanekrh Glad to hear.
Github user clebertsuconic commented on the issue:
https://github.com/apache/activemq-artemis/pull/1351
@gaohoward nice catch!
Can we do it differently... change LargeServerMessageImpl to not reuse the same buffer instead?
This will introduce an extra copy for regular cases. I would rather keep the hot path unchanged and only incur the extra copy on the actual copy.
That is...
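```java
for (;;) {
   byte[] bufferBytes = new byte[100 * 1024];
   ByteBuffer buffer = ByteBuffer.wrap(bufferBytes);
   // The buffer is reused...
   // We need to make sure we clear the limits and the buffer before reusing it
   buffer.clear();
   int bytesRead = file.read(buffer);
   byte[] bufferToWrite;
   if (bytesRead <= 0) {
      break;
   } else if (bytesRead == bufferBytes.length) {
      bufferToWrite = bufferBytes;
   } else {
      // Partial read: hand over only the bytes that were actually read
      bufferToWrite = new byte[bytesRead];
      System.arraycopy(bufferBytes, 0, bufferToWrite, 0, bytesRead);
   }
   newMessage.addBytes(bufferToWrite);
   if (bytesRead < bufferBytes.length) {
      break;
   }
}
```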
Github user gaohoward commented on the issue:
https://github.com/apache/activemq-artemis/pull/1351
@clebertsuconic do you mean change the cited code piece to this:
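```java
for (;;) {
   byte[] bufferBytes = new byte[100 * 1024];
   ByteBuffer buffer = ByteBuffer.wrap(bufferBytes);
   // The buffer is reused...
   // We need to make sure we clear the limits and the buffer before reusing it
   buffer.clear();
   int bytesRead = file.read(buffer);
   byte[] bufferToWrite;
   if (bytesRead <= 0) {
      break;
   } else if (bytesRead == bufferBytes.length) {
      bufferToWrite = bufferBytes;
   } else {
      // Partial read: hand over only the bytes that were actually read
      bufferToWrite = new byte[bytesRead];
      System.arraycopy(bufferBytes, 0, bufferToWrite, 0, bytesRead);
   }
   newMessage.addBytes(bufferToWrite);
   if (bytesRead < bufferBytes.length) {
      break;
   }
}
```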
I can see this is less efficient, because without replication (a standalone broker with no backup) the copy is not necessary. The good thing about it is that it is simple, and with large messages the cost of copying does not seem important (large messages are slower than normal messages anyway).
If the above understanding is right, I've no objection to making the changes.
Github user gaohoward commented on the issue:
https://github.com/apache/activemq-artemis/pull/1351
@clebertsuconic
Or I can make a new byte[] for every read. That's probably simpler.
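A minimal sketch of the "new byte[] for every read" idea follows. It is an illustration under stated assumptions, not the Artemis implementation: the class `FreshBufferPerRead`, the method `readBlocks`, and the in-memory channel replacing the large-message file are all hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FreshBufferPerRead {

   // Splits `source` into blocks, allocating a new backing array for every read
   // so that no two returned blocks share storage.
   static List<byte[]> readBlocks(byte[] source, int blockSize) {
      List<byte[]> blocks = new ArrayList<>();
      try (ReadableByteChannel channel = Channels.newChannel(new ByteArrayInputStream(source))) {
         for (;;) {
            byte[] bufferBytes = new byte[blockSize];
            ByteBuffer buffer = ByteBuffer.wrap(bufferBytes);
            int bytesRead = channel.read(buffer);
            if (bytesRead <= 0) {
               break;
            }
            // Trim a short read so the block holds only the bytes actually read.
            blocks.add(bytesRead == bufferBytes.length ? bufferBytes : Arrays.copyOf(bufferBytes, bytesRead));
         }
      } catch (IOException e) {
         throw new UncheckedIOException(e);
      }
      return blocks;
   }

   public static void main(String[] args) {
      List<byte[]> blocks = readBlocks(new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, 4);
      for (byte[] block : blocks) {
         System.out.println(Arrays.toString(block));
      }
   }
}
```

The trade-off is the one raised in the thread: every block costs an allocation, even when nothing else holds a reference to the previous one.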
Github user clebertsuconic commented on the issue:
https://github.com/apache/activemq-artemis/pull/1351
@gaohoward my idea was to make the change in copy instead of in the storage manager.
Github user gaohoward commented on the issue:
https://github.com/apache/activemq-artemis/pull/1351
@clebertsuconic done.
Github user clebertsuconic commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/1351#discussion_r124018115
--- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java ---
@@ -282,7 +282,7 @@ public Message copy(final long newID) {
 byte[] bufferToWrite;
 if (bytesRead <= 0) {
    break;
-} else if (bytesRead == bufferBytes.length) {
+} else if (bytesRead == bufferBytes.length && !this.storageManager.isReplicated()) {
--- End diff --
hmmm.. nice one!
Github user gaohoward commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/1351#discussion_r124035898
--- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java ---
@@ -282,7 +282,7 @@ public Message copy(final long newID) {
 byte[] bufferToWrite;
 if (bytesRead <= 0) {
    break;
-} else if (bytesRead == bufferBytes.length) {
+} else if (bytesRead == bufferBytes.length && !this.storageManager.isReplicated()) {
--- End diff --
thanks.
Github user clebertsuconic commented on the issue:
https://github.com/apache/activemq-artemis/pull/1351
I'm adding the following comment here:
```java
// ARTEMIS-1220: We cannot reuse the same buffer if it's replicated
// otherwise there could be another thread still using the buffer on a
// replication.
```
I am amending your commit instead of making a separate one, for the sake of simplicity.
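The decision that the reviewed condition encodes can be sketched in isolation. This is an illustration only: the class `ConditionalReuse` and the method `bufferToWrite` are hypothetical, and the boolean parameter `replicated` stands in for the result of `storageManager.isReplicated()` from the diff.

```java
import java.util.Arrays;

public class ConditionalReuse {

   // Chooses the array to hand on for one read of `bytesRead` bytes
   // (assumes 0 < bytesRead <= bufferBytes.length).
   static byte[] bufferToWrite(byte[] bufferBytes, int bytesRead, boolean replicated) {
      if (bytesRead == bufferBytes.length && !replicated) {
         // Hot path: nothing else can still be reading the buffer, so reuse it.
         return bufferBytes;
      }
      // Replicated, or a short read: hand on a private copy of the bytes read.
      return Arrays.copyOf(bufferBytes, bytesRead);
   }

   public static void main(String[] args) {
      byte[] shared = {1, 2, 3, 4};
      System.out.println("standalone reuses buffer: " + (bufferToWrite(shared, 4, false) == shared));
      System.out.println("replicated reuses buffer: " + (bufferToWrite(shared, 4, true) == shared));
   }
}
```

This keeps the non-replicated path free of the extra copy while giving replication its own array for each block.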
Commit 045021f7df583f6109cb0749dc1601c9a85dbe75 in activemq-artemis's branch refs/heads/master from gaohoward
[ https://git-wip-us.apache.org/repos/asf?p=activemq-artemis.git;h=045021f ]
ARTEMIS-1220 Diverted LargeMessage file corrupted during replication
When a large message is being diverted, a new copy of the original
message is created and replicated (if there is a backup) to the backup.
In LargeServerMessageImpl.copy(long) it reuses a byte array to copy
message body. It is possible that one block of data is read into
the byte array before the previous read has been replicated,
causing the replicated bytes to be corrupted.
If we make a copy of the byte array before replication, the corruption
of data will be avoided.
Github user gaohoward commented on the issue:
https://github.com/apache/activemq-artemis/pull/1351
good point. Thanks!
On Tue, Jun 27, 2017 at 2:33 AM, asfgit <notifications@github.com> wrote:
> Closed #1351 <https://github.com/apache/activemq-artemis/pull/1351> via
> 1888e2c
> <https://github.com/apache/activemq-artemis/commit/1888e2ca4f30be5f02c886b36a3c431c04c261a9>
> .
Commit 144a7c8a42654d1723df41d39acbd8b223632a22 in activemq-artemis's branch refs/heads/1.x from gaohoward
[ https://git-wip-us.apache.org/repos/asf?p=activemq-artemis.git;h=144a7c8 ]
ARTEMIS-1220 Diverted LargeMessage file corrupted during replication
When a large message is being diverted, a new copy of the original
message is created and replicated (if there is a backup) to the backup.
In LargeServerMessageImpl.copy(long) it reuses a byte array to copy
message body. It is possible that one block of data is read into
the byte array before the previous read has been replicated,
causing the replicated bytes to be corrupted.
If we make a copy of the byte array before replication, the corruption
of data will be avoided.
(cherry picked from commit 045021f7df583f6109cb0749dc1601c9a85dbe75)
GitHub user gaohoward opened a pull request:
https://github.com/apache/activemq-artemis/pull/1351
ARTEMIS-1220 Diverted LargeMessage file corrupted during replication
When a large message is being diverted, a new copy of the original
message is created and replicated (if there is a backup) to the backup.
In LargeServerMessageImpl.copy(long) it reuses a byte array to copy
message body. It is possible that one block of data is read into
the byte array before the previous read has been replicated,
causing the replicated bytes to be corrupted.
If we make a copy of the byte array before replication, the corruption
of data will be avoided.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/gaohoward/activemq-artemis master_1220
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/activemq-artemis/pull/1351.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1351
commit d4c75a8a7b0b92328fff9d56951d187f6fdfcc3d
Author: Howard Gao <howard.gao@gmail.com>
Date: 2017-06-20T10:38:26Z
ARTEMIS-1220 Diverted LargeMessage file corrupted during replication
When a large message is being diverted, a new copy of the original
message is created and replicated (if there is a backup) to the backup.
In LargeServerMessageImpl.copy(long) it reuses a byte array to copy
message body. It is possible that one block of data is read into
the byte array before the previous read has been replicated,
causing the replicated bytes to be corrupted.
If we make a copy of the byte array before replication, the corruption
of data will be avoided.