FLINK-24587

Let PubSub source support changing subscriptions

Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.12.2
    • None
    • None

    Description

      Original post on user mailing list: link

      After resuming a Flink application from a snapshot with a new subscription, I repeatedly got the following error.

       

      org.apache.flink.util.SerializedThrowable: INVALID_ARGUMENT: You have passed a subscription that does not belong to the given ack ID (resource=projects/xxxxx/subscriptions/xxxx).
              at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:244) ~[?:?]
              at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:225) ~[?:?]
              at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:142) ~[?:?]
              at com.google.pubsub.v1.SubscriberGrpc$SubscriberBlockingStub.acknowledge(SubscriberGrpc.java:1628) ~[?:?]
              at org.apache.flink.streaming.connectors.gcp.pubsub.BlockingGrpcPubSubSubscriber.acknowledge(BlockingGrpcPubSubSubscriber.java:99) ~[?:?]
              at org.apache.flink.streaming.connectors.gcp.pubsub.common.AcknowledgeOnCheckpoint.notifyCheckpointComplete(AcknowledgeOnCheckpoint.java:84) ~[?:?]
              at org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.notifyCheckpointComplete(PubSubSource.java:208) ~[?:?]
              at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
              at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:99) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
              at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:319) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
              at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1083) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
              at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$11(StreamTask.java:1048) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
              at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$13(StreamTask.java:1071) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
              at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
              at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
              at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
              at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
              at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
              at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
              at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
              at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
              at java.lang.Thread.run(Thread.java:834) ~[?:?]
      

       

      As I see it, the ackIds become invalid as soon as we switch to another subscription.

      I also noticed something interesting. The process of taking a checkpoint/savepoint is as follows:

      1. Write a checkpoint/savepoint that contains the ackIds of the messages that have not been acknowledged yet.
      2. If the checkpoint/savepoint succeeds, acknowledge those messages (src).
      3. Remove those ackIds from state (src).

      If we resume a job from a snapshot, the ackIds that were acknowledged in step 2 and removed from state in step 3 still exist in the savepoint (written in step 1), so the job acknowledges them again when the next checkpoint completes.
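
      To make this sequence concrete, here is a minimal, self-contained sketch of the behaviour as I understand it. It is a toy model, not the connector's actual code: FakeSubscription, Source, takeSavepoint and resumeFails are hypothetical names, the "subscription:message" ackId format is invented for illustration, and restored ackIds are simply queued for the next checkpoint, which is a simplification of the real state handling.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Toy model of ack-on-checkpoint; all names here are hypothetical, not connector API. */
public class AckOnCheckpointSketch {

    /** Stand-in for a subscription that rejects ackIds issued by another subscription. */
    static class FakeSubscription {
        final String name;
        final Set<String> outstanding = new HashSet<>();

        FakeSubscription(String name) { this.name = name; }

        /** Delivers a message and returns an ackId bound to this subscription (invented format). */
        String deliver(String messageId) {
            String ackId = name + ":" + messageId;
            outstanding.add(ackId);
            return ackId;
        }

        /** Acking a foreign ackId fails; re-acking an own ackId is a no-op in this model. */
        void acknowledge(List<String> ackIds) {
            for (String ackId : ackIds) {
                if (!ackId.startsWith(name + ":")) {
                    throw new IllegalArgumentException(
                            "INVALID_ARGUMENT: ack ID does not belong to subscription " + name);
                }
            }
            outstanding.removeAll(ackIds);
        }
    }

    /** Stand-in for a source that acknowledges messages only after a checkpoint completes. */
    static class Source {
        private final FakeSubscription subscription;
        private final List<String> pendingAckIds = new ArrayList<>();
        private final Map<Long, List<String>> ackIdsPerCheckpoint = new TreeMap<>();

        Source(FakeSubscription subscription, List<String> restoredAckIds) {
            this.subscription = subscription;
            // Assumption: restored ackIds are acked together with the next checkpoint.
            this.pendingAckIds.addAll(restoredAckIds);
        }

        void receive(String messageId) {
            pendingAckIds.add(subscription.deliver(messageId));
        }

        /** Step 1: the snapshot contains every ackId that has not been acknowledged yet. */
        List<String> snapshotState(long checkpointId) {
            ackIdsPerCheckpoint.put(checkpointId, new ArrayList<>(pendingAckIds));
            pendingAckIds.clear();
            List<String> state = new ArrayList<>();
            for (List<String> ids : ackIdsPerCheckpoint.values()) {
                state.addAll(ids);
            }
            return state;
        }

        /** Steps 2 and 3: acknowledge, then drop the ackIds from in-memory state. */
        void notifyCheckpointComplete(long checkpointId) {
            List<String> ids = ackIdsPerCheckpoint.get(checkpointId);
            if (ids == null) {
                return;
            }
            subscription.acknowledge(ids);              // step 2
            ackIdsPerCheckpoint.remove(checkpointId);   // step 3
        }
    }

    /** Takes a savepoint on "sub-a"; the returned state still lists ackIds that get acked right after. */
    public static List<String> takeSavepoint() {
        Source source = new Source(new FakeSubscription("sub-a"), Collections.<String>emptyList());
        source.receive("m1");
        source.receive("m2");
        List<String> savepoint = source.snapshotState(1L);
        source.notifyCheckpointComplete(1L);
        return savepoint;
    }

    /** Restores the savepoint against the given subscription and reports whether the next ack fails. */
    public static boolean resumeFails(List<String> savepoint, String subscriptionName) {
        Source restored = new Source(new FakeSubscription(subscriptionName), savepoint);
        restored.snapshotState(2L);
        try {
            restored.notifyCheckpointComplete(2L);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> savepoint = takeSavepoint();
        System.out.println(savepoint);                        // [sub-a:m1, sub-a:m2]
        System.out.println(resumeFails(savepoint, "sub-b"));  // true
    }
}
```

      In this model, resuming on the same subscription goes unnoticed because the repeated ack is harmless, while resuming on a different subscription fails on the first completed checkpoint, which matches the error above.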

      In my opinion, the ackIds stored in the savepoint are the root cause that prevents us from changing subscriptions.

          People

            Assignee: Unassigned
            Reporter: Shiao-An Yuan (sayuan)