Uploaded image for project: 'Apache Cassandra'
  1. Apache Cassandra
  2. CASSANDRA-16808

Pre-4.0 FWD_FRM message parameter serialization and message-id forwarding is incorrect

    XMLWordPrintableJSON

Details

    Description

      Fixing CASSANDRA-16797 has exposed an issue with the way FWD_FRM is serialized.

      In the code cleanup during the internode messaging refactor, the serialization for FWD_FRM (the endpoint to respond to for forwarded messages) was implemented using the same serialization format as CompactEndpointSerializationHelper which prefixes the address bytes with their length, however the FWD_FRM parameter value does not include a length and just converts the parameter value to an InetAddress.

      In a mixed version cluster this causes the pre-4.0 nodes to fail when deserializing the mutation

      java.lang.RuntimeException: java.net.UnknownHostException: addr is of illegal length
              at org.apache.cassandra.net.MessageDeliveryTask.run(MessageDeliveryTask.java:72) ~[dtest-3.0.25.jar:na]
              at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
              at org.apache.cassandra.concurrent.AbstractLocalAwareExecutorService$FutureTask.run(AbstractLocalAwareExecutorService.java:162) ~[dtest-3.0.25.jar:na]
              at org.apache.cassandra.concurrent.AbstractLocalAwareExecutorService$LocalSessionFutureTask.run(AbstractLocalAwareExecutorService.java:134) ~[dtest-3.0.25.jar:na]
              at org.apache.cassandra.concurrent.SEPWorker.run(SEPWorker.java:109) ~[dtest-3.0.25.jar:na]
              at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
      Caused by: java.net.UnknownHostException: addr is of illegal length
              at java.base/java.net.InetAddress.getByAddress(InetAddress.java:1208) ~[na:na]
              at java.base/java.net.InetAddress.getByAddress(InetAddress.java:1571) ~[na:na]
              at org.apache.cassandra.db.MutationVerbHandler.doVerb(MutationVerbHandler.java:57) ~[dtest-3.0.25.jar:na]
              at org.apache.cassandra.net.MessageDeliveryTask.run(MessageDeliveryTask.java:67) ~[dtest-3.0.25.jar:na]
              ... 5 common frames omitted
      

      Unfortunately there isn't a clean fix I can see as org.apache.cassandra.io.IVersionedAsymmetricSerializer#deserialize used to deserialize the FWD_FRM address does not take a maximum length to deserialize and it's impossible to tell definitely know if it's an IPv4 or IPv6 address from the first four bytes.

      The patch I'm submitting special-cases the deserializing pre-4.0 FWD_FRM parameters in the Message deserializer. That seems preferable to extending the deserialization interface or creating a new DataInputBuffer limited by the parameter value length.

      Once that was fixed, the INSERT statements were still failing which I tracked down to the 4.0 optimization of serializing the forwarded message once if the message id is the same
      https://github.com/apache/cassandra/blob/cassandra-4.0/src/java/org/apache/cassandra/db/MutationVerbHandler.java#L76

      In the test case I wrote, only one message was being forwarded and that had a different id to the original forwarded message. The useSameMessageID method only checked message Ids within the forwarded messages.

       

      Code Details:

      When MutationVerbHandler.forwardToLocalNodes is constructing the forwarding message it just stores the the byte array representing the IPv4 or IPv6 address in the parameter array.

      (link https://github.com/apache/cassandra/blob/44604b7316fcbfd7d0d7425e75cd7ebe267e3247/src/java/org/apache/cassandra/db/MutationVerbHandler.java#L90 )

          private static void forwardToLocalNodes(Mutation mutation, MessagingService.Verb verb, byte[] forwardBytes, InetAddress from) throws IOException
          {
              try (DataInputStream in = new DataInputStream(new FastByteArrayInputStream(forwardBytes)))
              {
                  int size = in.readInt();
      
                  // tell the recipients who to send their ack to
                  MessageOut<Mutation> message = new MessageOut<>(verb, mutation, Mutation.serializer).withParameter(Mutation.FORWARD_FROM, from.getAddress());
      

      When the message is serialized in 3.0 MessageOut.serialize, that raw entry of bytes is written with the length

      (link https://github.com/apache/cassandra/blob/44604b7316fcbfd7d0d7425e75cd7ebe267e3247/src/java/org/apache/cassandra/net/MessageOut.java#L119 )

          public void serialize(DataOutputPlus out, int version) throws IOException
          {
              CompactEndpointSerializationHelper.serialize(from, out);
      
              out.writeInt(MessagingService.Verb.convertForMessagingServiceVersion(verb, version).getId());
              out.writeInt(parameters.size());
              for (Map.Entry<String, byte[]> entry : parameters.entrySet())
              {
                  out.writeUTF(entry.getKey());
                  out.writeInt(entry.getValue().length);
                  out.write(entry.getValue());
              }
              ....
          }
      

      And we do the same on 4.0, however in 4.0 the parameter is serialized using the ParamType enum

      (link https://github.com/apache/cassandra/blob/fcd30b6e0db3622a8e78e9aa35221f630c77f6de/src/java/org/apache/cassandra/net/Message.java#L1154 )

                  for (int i = 0; i < count; i++)
                  {
                      ParamType type = version >= VERSION_40
                          ? ParamType.lookUpById(Ints.checkedCast(in.readUnsignedVInt()))
                          : ParamType.lookUpByAlias(in.readUTF());
      
                      int length = version >= VERSION_40
                          ? Ints.checkedCast(in.readUnsignedVInt())
                          : in.readInt();
      
                      if (null != type)
                          params.put(type, type.serializer.deserialize(in, version));
                      else
                          in.skipBytesFully(length); // forward compatibiliy with minor version changes
                  }
      

      (link https://github.com/apache/cassandra/blob/fcd30b6e0db3622a8e78e9aa35221f630c77f6de/src/java/org/apache/cassandra/net/ParamType.java#L45 )

      public enum ParamType
      {
          FORWARD_TO          (0, "FWD_TO",        ForwardingInfo.serializer),
          RESPOND_TO          (1, "FWD_FRM",       inetAddressAndPortSerializer),
          ...
      }
      

      The InetAddressAndPortSerializer has been based on the 3.0 CompactEndpointSerializationHelper encoding used in the message header,
      however that format includes a single byte with the length of the address when pre-4.0 nodes are just expecting the parameter value
      to contain the raw address bytes.

      (link https://github.com/apache/cassandra/blob/fcd30b6e0db3622a8e78e9aa35221f630c77f6de/src/java/org/apache/cassandra/locator/InetAddressAndPort.java#L308 )

                  if (version >= MessagingService.VERSION_40)
                  {
                      out.writeByte(buf.length + 2);
                      out.write(buf);
                      out.writeShort(endpoint.port);
                  }
                  else
                  {
                      out.writeByte(buf.length); //// Surprise!  Bonus byte!
                      out.write(buf);
                  }
      

      Attachments

        Issue Links

          Activity

            People

              jmeredithco Jon Meredith
              jmeredithco Jon Meredith
              Jon Meredith
              Benedict Elliott Smith, Brandon Williams, Caleb Rackliffe
              Votes:
              1 Vote for this issue
              Watchers:
              8 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 4h 50m
                  4h 50m