  Flink / FLINK-20244

RMQSource does not ACK duplicated messages


Details

    Description

      Background

      The following has been observed when using RMQSource with exactly-once guarantees.

      When a job is restarted from a checkpoint, and that checkpoint references messages that were still unacked at the time of the restart, RMQ requeues those messages and resends them to the restarted job. However, they are never acked. Later, when the connection to RMQ is closed (because the job either finishes or is restarted), they are requeued again.

      Looking at the source code, RMQSource ACKs messages after a checkpoint is complete (MessageAcknowledgingSourceBase::notifyCheckpointComplete).
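      To make the acknowledgement timing concrete, here is a simplified, hypothetical model of the checkpoint-driven ACK pattern. It is not Flink's MessageAcknowledgingSourceBase code; the class and method names are illustrative, and acking is simulated by recording delivery tags in a list. Tags collected since the last snapshot are stashed under the checkpoint ID, and are only acknowledged once that checkpoint (or a later one, which subsumes it) is reported complete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical, simplified model of checkpoint-driven acknowledgement.
// Not Flink's implementation; all names are illustrative.
public class CheckpointAckModel {
    // Delivery tags received since the last snapshot.
    private final List<Long> sessionIds = new ArrayList<>();
    // Delivery tags per checkpoint ID, waiting for that checkpoint to complete.
    private final TreeMap<Long, List<Long>> pending = new TreeMap<>();
    // Stand-in for channel.basicAck(): records which tags were acknowledged.
    final List<Long> acked = new ArrayList<>();

    void onMessage(long deliveryTag) {
        sessionIds.add(deliveryTag);
    }

    void snapshotState(long checkpointId) {
        // Freeze the tags seen so far under this checkpoint ID.
        pending.put(checkpointId, new ArrayList<>(sessionIds));
        sessionIds.clear();
    }

    void notifyCheckpointComplete(long checkpointId) {
        // A completed checkpoint subsumes earlier ones: ack everything up to it.
        NavigableMap<Long, List<Long>> done = pending.headMap(checkpointId, true);
        for (List<Long> tags : done.values()) {
            acked.addAll(tags);
        }
        done.clear(); // removes the entries from the backing map
    }

    public static void main(String[] args) {
        CheckpointAckModel m = new CheckpointAckModel();
        m.onMessage(1);
        m.onMessage(2);
        m.snapshotState(1);
        m.onMessage(3);
        m.snapshotState(2);
        System.out.println(m.acked); // []
        m.notifyCheckpointComplete(2);
        System.out.println(m.acked); // [1, 2, 3]
    }
}
```

      In this model, a message whose tag never reaches sessionIds is never acknowledged, which is exactly the situation described below for skipped duplicates.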

      Also, looking at the source code of RMQSource::setMessageIdentifier() (on the master branch; the ACK semantics do not seem to have changed since 1.11.2), it is clear that if an RMQ message carries a correlation ID that has already been handled, the message is skipped and not processed further. It is also clear that skipped messages are not added to the sessionIds list of messages targeted for ACK to RMQ.

      I believe all successfully consumed RMQ messages should be ACKed, regardless of whether Flink ignores or processes them. RMQ needs to know that the consumer considers the message handled.

      The following code is from RMQSource::setMessageIdentifier(). Note the return before sessionIds.add():
      ...
          if (!addId(correlationId)) {
              // we have already processed this message
              return false;
          }
      }
      sessionIds.add(deliveryTag);
      ...

      Directly related to the above, I also noticed that RMQ connections are leaked on internal job restarts. From the Flink log (this stack trace is from 1.11.2):

      2020-11-18 10:08:25,118 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask [] - Error during disposal of stream operator.
      com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - Closed via management plugin, class-id=0, method-id=0)
      at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:228) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
      at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:303) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
      at com.rabbitmq.client.impl.ChannelN.basicCancel(ChannelN.java:1294) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
      at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicCancel(AutorecoveringChannel.java:482) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
      at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close(RMQSource.java:192) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]

      AlreadyClosedException is not handled by RMQSource::close(). As a result, an RMQ connection thread is left behind somewhere. I triggered three restarts of the job in a row and saw one new connection added to the pile of connections for each restart. I triggered each restart by killing the active connection to RMQ from the RMQ admin GUI (the management plugin; see the exception details above).

      I also tried to kill one of the leaked connections, but a new one was created instantly. The stack trace when doing this (1.11.2):

      2020-11-18 10:27:51,715 ERROR com.rabbitmq.client.impl.ForgivingExceptionHandler [] - An unexpected connection driver error occured
      java.lang.NoClassDefFoundError: com/rabbitmq/client/AMQP$Connection$CloseOk$Builder
      at com.rabbitmq.client.impl.AMQConnection.handleConnectionClose(AMQConnection.java:800) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
      at com.rabbitmq.client.impl.AMQConnection.processControlCommand(AMQConnection.java:753) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
      at com.rabbitmq.client.impl.AMQConnection$1.processAsync(AMQConnection.java:237) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
      at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:162) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
      at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:109) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
      at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:623) [blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
      at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47) [blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
      at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:581) [blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
      at java.lang.Thread.run(Unknown Source) [?:?]

      These problems are most probably present in more versions than 1.11.2 and 1.12.0.

      Proposed fix (not verified)

      The fix to ACK all messages should be simple: in RMQSource::setMessageIdentifier(), move sessionIds.add(deliveryTag) a few lines up, before the check on whether correlation IDs are used.
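      The difference between the current and the proposed ordering can be illustrated with a simplified, hypothetical model of setMessageIdentifier(). This is not the connector's code: the deduplication set stands in for addId(), and the ackDuplicates flag is an illustrative switch between the two orderings. In both cases a duplicate is still skipped downstream; only the proposed ordering schedules its delivery tag for acknowledgement.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the two orderings in RMQSource::setMessageIdentifier().
// Not connector code; names and the ackDuplicates switch are illustrative.
public class DedupAckModel {
    // Stand-in for the correlation IDs checked by addId().
    final Set<String> seenCorrelationIds = new HashSet<>();
    // Delivery tags to ACK after the next completed checkpoint.
    final List<Long> sessionIds = new ArrayList<>();
    // false = current ordering, true = proposed ordering.
    private final boolean ackDuplicates;

    DedupAckModel(boolean ackDuplicates) {
        this.ackDuplicates = ackDuplicates;
    }

    /** Returns true if the message should be emitted downstream. */
    boolean setMessageIdentifier(String correlationId, long deliveryTag) {
        if (ackDuplicates) {
            // Proposed: schedule the ACK before the duplicate check.
            sessionIds.add(deliveryTag);
        }
        if (!seenCorrelationIds.add(correlationId)) {
            // Already processed: skip the message.
            return false;
        }
        if (!ackDuplicates) {
            // Current: only first-time messages reach this line.
            sessionIds.add(deliveryTag);
        }
        return true;
    }

    public static void main(String[] args) {
        for (boolean fixed : new boolean[] {false, true}) {
            DedupAckModel m = new DedupAckModel(fixed);
            // Correlation ID restored from checkpoint state, not yet acked.
            m.seenCorrelationIds.add("msg-1");
            // The same message is redelivered after the restart.
            boolean emitted = m.setMessageIdentifier("msg-1", 7L);
            System.out.println("fixed=" + fixed + " emitted=" + emitted + " toAck=" + m.sessionIds);
        }
    }
}
```

      Running main prints "fixed=false emitted=false toAck=[]" followed by "fixed=true emitted=false toAck=[7]".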

      The fix for close() and the stale RMQ connections would be to catch and log (but otherwise ignore) any exceptions thrown by channel.basicCancel(), channel.close() and connection.close(), so that a failing channel.basicCancel() does not prevent channel.close() or connection.close() from being called. Alternatively, just call connection.close(), which should close the channel as well.
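      A minimal sketch of the "log and continue" idea, using hypothetical names: each shutdown step (standing in for channel.basicCancel(), channel.close() and connection.close()) runs even if an earlier one throws. The ThrowingStep interface and closeAll helper are illustrative and not part of the Flink or RabbitMQ client APIs; an IllegalStateException simulates the AlreadyClosedException from the log above so the example runs without a broker.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper: run every shutdown step even if earlier ones fail.
public class QuietClose {
    private static final Logger LOG = Logger.getLogger(QuietClose.class.getName());

    @FunctionalInterface
    interface ThrowingStep {
        void run() throws Exception;
    }

    /** Runs all steps in insertion order; logs and collects failures, never throws. */
    static List<Exception> closeAll(Map<String, ThrowingStep> steps) {
        List<Exception> failures = new ArrayList<>();
        for (Map.Entry<String, ThrowingStep> step : steps.entrySet()) {
            try {
                step.getValue().run();
            } catch (Exception e) {
                LOG.log(Level.WARNING, "Ignoring failure in " + step.getKey(), e);
                failures.add(e);
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        List<String> ran = new ArrayList<>();
        Map<String, ThrowingStep> steps = new LinkedHashMap<>(); // preserves order
        steps.put("channel.basicCancel", () -> {
            ran.add("basicCancel");
            // Simulates AlreadyClosedException on a connection killed by the broker.
            throw new IllegalStateException("connection is already closed");
        });
        steps.put("channel.close", () -> ran.add("channel.close"));
        steps.put("connection.close", () -> ran.add("connection.close"));
        List<Exception> failures = closeAll(steps);
        System.out.println(ran + " failures=" + failures.size());
    }
}
```

      Because connection.close() still runs after basicCancel() fails, the connection in this model is not left behind; whether that alone is enough in the real connector is untested.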

      Another minor improvement, while working on this, would be to use RMQ's support for acking multiple messages at once, e.g. channel.basicAck(deliveryTagMax, true);. Since RMQSource never NACKs a message and delivery tags are monotonically increasing, that should be doable, shouldn't it?
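      A sketch of the batched variant, under stated assumptions: all delivery tags in the batch come from the same channel (RabbitMQ scopes delivery tags per channel), and every unacknowledged delivery on that channel with a tag at or below the maximum may be acknowledged. The Acker interface mirrors the shape of the RabbitMQ Java client's Channel.basicAck(long deliveryTag, boolean multiple), but it is a hypothetical stand-in so the example runs without a broker.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: acknowledge a checkpoint's delivery tags with one multiple-ack.
public class BatchAck {
    /** Stand-in with the same shape as Channel.basicAck(long, boolean). */
    interface Acker {
        void basicAck(long deliveryTag, boolean multiple) throws IOException;
    }

    /** Acks every outstanding tag up to max(tags) in a single call; no-op for an empty batch. */
    static void ackBatch(Acker channel, List<Long> deliveryTags) throws IOException {
        if (deliveryTags.isEmpty()) {
            return;
        }
        long deliveryTagMax = Collections.max(deliveryTags);
        channel.basicAck(deliveryTagMax, true);
    }

    public static void main(String[] args) throws IOException {
        List<String> calls = new ArrayList<>();
        Acker recording = (tag, multiple) -> calls.add("basicAck(" + tag + ", " + multiple + ")");
        ackBatch(recording, List.of(3L, 5L, 4L));
        System.out.println(calls); // [basicAck(5, true)]
    }
}
```

      A side effect worth noting: with multiple=true the broker would also acknowledge deliveries with smaller tags that were never added to sessionIds, such as the skipped duplicates described above. That only holds if no delivery below the maximum is meant to stay unacknowledged.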


            People

              Assignee: Unassigned
              Reporter: Thomas Eckestad
              Votes: 2
              Watchers: 5
