Details
-
Bug
-
Status: Resolved
-
Normal
-
Resolution: Fixed
-
4.0, 4.0.0, 4.1-alpha1, 4.1
-
None
-
Degradation - Other Exception
-
Normal
-
Normal
-
DTest
-
All
-
None
-
Description
Fixing CASSANDRA-16797 has exposed an issue with the way FWD_FRM is serialized.
During the code cleanup in the internode messaging refactor, the serialization for FWD_FRM (the endpoint to respond to for forwarded messages) was implemented using the same format as CompactEndpointSerializationHelper, which prefixes the address bytes with their length. However, pre-4.0 nodes expect the FWD_FRM parameter value to carry no length prefix and simply convert the parameter value to an InetAddress.
In a mixed-version cluster this causes the pre-4.0 nodes to fail when deserializing the mutation:
java.lang.RuntimeException: java.net.UnknownHostException: addr is of illegal length
    at org.apache.cassandra.net.MessageDeliveryTask.run(MessageDeliveryTask.java:72) ~[dtest-3.0.25.jar:na]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at org.apache.cassandra.concurrent.AbstractLocalAwareExecutorService$FutureTask.run(AbstractLocalAwareExecutorService.java:162) ~[dtest-3.0.25.jar:na]
    at org.apache.cassandra.concurrent.AbstractLocalAwareExecutorService$LocalSessionFutureTask.run(AbstractLocalAwareExecutorService.java:134) ~[dtest-3.0.25.jar:na]
    at org.apache.cassandra.concurrent.SEPWorker.run(SEPWorker.java:109) ~[dtest-3.0.25.jar:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: java.net.UnknownHostException: addr is of illegal length
    at java.base/java.net.InetAddress.getByAddress(InetAddress.java:1208) ~[na:na]
    at java.base/java.net.InetAddress.getByAddress(InetAddress.java:1571) ~[na:na]
    at org.apache.cassandra.db.MutationVerbHandler.doVerb(MutationVerbHandler.java:57) ~[dtest-3.0.25.jar:na]
    at org.apache.cassandra.net.MessageDeliveryTask.run(MessageDeliveryTask.java:67) ~[dtest-3.0.25.jar:na]
    ... 5 common frames omitted
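The failure mode can be reproduced outside Cassandra. The following is a minimal sketch, not code from the patch: the class and method names are hypothetical, and the 5-byte value only imitates a length-prefixed IPv4 address. It shows that InetAddress.getByAddress rejects any array that is not exactly 4 or 16 bytes long, which is what a pre-4.0 node hits when it treats the prefixed value as raw address bytes.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical demo class; it does not exist in Cassandra.
class FwdFrmRawAddressDemo {
    // Imitates a pre-4.0 reader: the whole parameter value is treated as raw address bytes.
    static boolean parsesAsRawAddress(byte[] value) {
        try {
            InetAddress.getByAddress(value);
            return true;
        } catch (UnknownHostException e) {
            // Thrown when the array is neither 4 (IPv4) nor 16 (IPv6) bytes long.
            return false;
        }
    }

    public static void main(String[] args) {
        byte[] raw = {10, 0, 0, 1};         // raw IPv4 bytes, as pre-4.0 nodes expect
        byte[] prefixed = {4, 10, 0, 0, 1}; // same address with a one-byte length prefix
        System.out.println("raw parses: " + parsesAsRawAddress(raw));
        System.out.println("prefixed parses: " + parsesAsRawAddress(prefixed));
    }
}
```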
Unfortunately, I can't see a clean fix: org.apache.cassandra.io.IVersionedAsymmetricSerializer#deserialize, which is used to deserialize the FWD_FRM address, does not take a maximum length to deserialize, and it's impossible to tell definitively whether it's an IPv4 or IPv6 address from the first four bytes.
The patch I'm submitting special-cases the deserialization of pre-4.0 FWD_FRM parameters in the Message deserializer. That seems preferable to extending the deserialization interface or creating a new DataInputBuffer limited by the parameter value length.
Once that was fixed, the INSERT statements were still failing, which I tracked down to the 4.0 optimization of serializing the forwarded message once if the message id is the same:
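As an illustration of the special case, the sketch below reads exactly the number of bytes announced by the parameter length and converts them to an address. It is written under assumptions: the class and method names are hypothetical, plain java.io streams stand in for Cassandra's input types, and the actual patch may be structured differently.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;

// Hypothetical helper class; names are not taken from the Cassandra code base.
class LegacyRespondToReader {
    // Reads a pre-4.0 FWD_FRM value: `length` raw address bytes with no prefix.
    static InetAddress readLegacyRespondTo(DataInputStream in, int length) throws IOException {
        if (length != 4 && length != 16)
            throw new IOException("Unexpected FWD_FRM length: " + length);
        byte[] raw = new byte[length];
        in.readFully(raw); // consume exactly the parameter value, nothing more
        return InetAddress.getByAddress(raw);
    }

    // Convenience wrapper for experiments: returns the textual address and checks
    // that the byte following the parameter value is left unread.
    static String readFromBytes(byte[] wire, int length) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire))) {
            String address = readLegacyRespondTo(in, length).getHostAddress();
            return address + " remaining=" + in.available();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Four address bytes followed by one unrelated trailing byte.
        System.out.println(readFromBytes(new byte[] {10, 0, 0, 1, 42}, 4));
    }
}
```

Because the length comes from the surrounding parameter framing, the reader never has to guess the address family from the leading bytes.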
https://github.com/apache/cassandra/blob/cassandra-4.0/src/java/org/apache/cassandra/db/MutationVerbHandler.java#L76
In the test case I wrote, only one message was being forwarded, and it had a different id from the original forwarded message. The useSameMessageID method only compared message ids among the forwarded messages, not against the original.
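The difference between the two checks can be sketched as follows. This is an assumption about the shape of the logic, not the code in MutationVerbHandler: both method names and the use of plain long ids are hypothetical.

```java
// Hypothetical illustration; the real useSameMessageID operates on Cassandra's forwarding info.
class ForwardingIdCheck {
    // Imitates the problematic check: ids are compared only with each other.
    static boolean sameIdsAmongForwarded(long... forwardedIds) {
        for (long id : forwardedIds)
            if (id != forwardedIds[0])
                return false;
        return true; // trivially true when a single message is forwarded
    }

    // One plausible corrected check: every forwarded id must also match the original id.
    static boolean sameIdsIncludingOriginal(long originalId, long... forwardedIds) {
        for (long id : forwardedIds)
            if (id != originalId)
                return false;
        return true;
    }

    public static void main(String[] args) {
        long originalId = 7L;
        // A single forwarded message whose id differs from the original.
        System.out.println(sameIdsAmongForwarded(9L));
        System.out.println(sameIdsIncludingOriginal(originalId, 9L));
    }
}
```

With a single forwarded message the first check cannot fail, which matches the behaviour seen in the test case.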
Code Details:
When MutationVerbHandler.forwardToLocalNodes is constructing the forwarding message, it just stores the byte array representing the IPv4 or IPv6 address in the parameter array.
private static void forwardToLocalNodes(Mutation mutation, MessagingService.Verb verb, byte[] forwardBytes, InetAddress from) throws IOException
{
    try (DataInputStream in = new DataInputStream(new FastByteArrayInputStream(forwardBytes)))
    {
        int size = in.readInt();

        // tell the recipients who to send their ack to
        MessageOut<Mutation> message = new MessageOut<>(verb, mutation, Mutation.serializer).withParameter(Mutation.FORWARD_FROM, from.getAddress());
When the message is serialized in 3.0 MessageOut.serialize, that raw byte array is written preceded by its length as a four-byte int:
public void serialize(DataOutputPlus out, int version) throws IOException
{
    CompactEndpointSerializationHelper.serialize(from, out);

    out.writeInt(MessagingService.Verb.convertForMessagingServiceVersion(verb, version).getId());
    out.writeInt(parameters.size());
    for (Map.Entry<String, byte[]> entry : parameters.entrySet())
    {
        out.writeUTF(entry.getKey());
        out.writeInt(entry.getValue().length);
        out.write(entry.getValue());
    }
    ....
}
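To make the 3.0 framing concrete, the sketch below writes a single parameter the same way the loop above does, using java.io.DataOutputStream in place of DataOutputPlus. The class and method names are hypothetical. For an IPv4 FWD_FRM value the result is 17 bytes: 2 bytes of UTF length, 7 bytes for the key, 4 bytes for the int length, and 4 raw address bytes.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the per-parameter part of the 3.0 message header.
class LegacyParameterFraming {
    static byte[] writeParameter(String key, byte[] value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(key);          // 2-byte length followed by the key bytes
            out.writeInt(value.length); // 4-byte length of the value
            out.write(value);           // the value itself: raw address bytes, no extra prefix
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] framed = writeParameter("FWD_FRM", new byte[] {10, 0, 0, 1});
        System.out.println("framed length: " + framed.length);
    }
}
```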
4.0 does the same; however, in 4.0 the parameter is serialized using the ParamType enum:
for (int i = 0; i < count; i++)
{
    ParamType type = version >= VERSION_40
                   ? ParamType.lookUpById(Ints.checkedCast(in.readUnsignedVInt()))
                   : ParamType.lookUpByAlias(in.readUTF());

    int length = version >= VERSION_40
               ? Ints.checkedCast(in.readUnsignedVInt())
               : in.readInt();

    if (null != type)
        params.put(type, type.serializer.deserialize(in, version));
    else
        in.skipBytesFully(length); // forward compatibility with minor version changes
}

public enum ParamType
{
    FORWARD_TO (0, "FWD_TO", ForwardingInfo.serializer),
    RESPOND_TO (1, "FWD_FRM", inetAddressAndPortSerializer),
    ...
}
The InetAddressAndPortSerializer is based on the 3.0 CompactEndpointSerializationHelper encoding used in the message header. However, that format includes a single byte holding the length of the address, whereas pre-4.0 nodes expect the parameter value to contain only the raw address bytes.
if (version >= MessagingService.VERSION_40)
{
    out.writeByte(buf.length + 2);
    out.write(buf);
    out.writeShort(endpoint.port);
}
else
{
    out.writeByte(buf.length); //// Surprise! Bonus byte!
    out.write(buf);
}
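The two branches above can be compared by size with a small sketch. It mirrors the snippet using java.io streams; the class name, the method name, and the boolean flag that replaces the version comparison are all hypothetical. For an IPv4 address the pre-4.0 branch produces 5 bytes, one more than the 4 raw bytes a pre-4.0 node expects in the FWD_FRM value.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical re-creation of the two encoding branches for experimentation.
class EndpointEncodingSizes {
    static byte[] encode(byte[] address, int port, boolean is40OrLater) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            if (is40OrLater) {
                out.writeByte(address.length + 2); // length covers address plus 2-byte port
                out.write(address);
                out.writeShort(port);
            } else {
                out.writeByte(address.length);     // the extra length byte
                out.write(address);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] ipv4 = {10, 0, 0, 1};
        System.out.println("pre-4.0 branch: " + encode(ipv4, 7000, false).length + " bytes");
        System.out.println("4.0 branch: " + encode(ipv4, 7000, true).length + " bytes");
    }
}
```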
Issue Links
- supercedes
-
CASSANDRA-16799 Add an upgrade test with multi-dc writes
- Resolved