Details
Type: Improvement
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 1.12.2
None
None
Description
Original post on the user mailing list: link
After resuming a Flink application from a snapshot with a new subscription, I repeatedly got the following error:
org.apache.flink.util.SerializedThrowable: INVALID_ARGUMENT: You have passed a subscription that does not belong to the given ack ID (resource=projects/xxxxx/subscriptions/xxxx).
	at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:244) ~[?:?]
	at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:225) ~[?:?]
	at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:142) ~[?:?]
	at com.google.pubsub.v1.SubscriberGrpc$SubscriberBlockingStub.acknowledge(SubscriberGrpc.java:1628) ~[?:?]
	at org.apache.flink.streaming.connectors.gcp.pubsub.BlockingGrpcPubSubSubscriber.acknowledge(BlockingGrpcPubSubSubscriber.java:99) ~[?:?]
	at org.apache.flink.streaming.connectors.gcp.pubsub.common.AcknowledgeOnCheckpoint.notifyCheckpointComplete(AcknowledgeOnCheckpoint.java:84) ~[?:?]
	at org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.notifyCheckpointComplete(PubSubSource.java:208) ~[?:?]
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:99) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:319) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1083) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$11(StreamTask.java:1048) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$13(StreamTask.java:1071) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at java.lang.Thread.run(Thread.java:834) ~[?:?]
As far as I can tell, the ack IDs become invalid as soon as we switch to another subscription.
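To make the failure mode concrete, here is a minimal, hypothetical Java model of the constraint the error message describes: an ack ID is accepted only by the subscription that delivered the message. `AckIdScopeDemo`, `deliver` and `acknowledge` are illustrative names, not the Pub/Sub client API, and the model assumes nothing else about how the service validates ack IDs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the Pub/Sub service (not the real client API).
// It models only one rule: an ack ID is valid solely for the subscription that issued it.
public class AckIdScopeDemo {

    // ack ID -> subscription that delivered the corresponding message
    private final Map<String, String> issuedBy = new HashMap<>();
    private int nextId = 0;

    /** Delivers a message on the given subscription and returns its ack ID. */
    public String deliver(String subscription) {
        String ackId = "ack-" + nextId++;
        issuedBy.put(ackId, subscription);
        return ackId;
    }

    /** Accepts the ack only if the ack ID was issued by this subscription. */
    public void acknowledge(String subscription, String ackId) {
        if (!subscription.equals(issuedBy.get(ackId))) {
            throw new IllegalArgumentException(
                    "INVALID_ARGUMENT: " + subscription + " does not own " + ackId);
        }
    }

    public static void main(String[] args) {
        AckIdScopeDemo service = new AckIdScopeDemo();
        String ackId = service.deliver("projects/p/subscriptions/old");

        service.acknowledge("projects/p/subscriptions/old", ackId); // accepted

        try {
            // A job resumed with a new subscription acks an ID issued by the old one.
            service.acknowledge("projects/p/subscriptions/new", ackId);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```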
I also noticed something interesting. A checkpoint/savepoint is processed as follows:
1. Write a checkpoint/savepoint that contains the ackIds of messages that have not been acknowledged yet.
2. If the checkpoint/savepoint succeeds, acknowledge those messages (src).
3. Remove those ackIds from the state (src).
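The three steps above can be sketched as a small state machine. This is a simplified, hypothetical model written for illustration, not the connector's `AcknowledgeOnCheckpoint` code; the class name, `onMessage`, `snapshotState`, `pendingCount` and the `Consumer` that stands in for the acknowledge call are all assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified model of "acknowledge on checkpoint" (illustrative names only).
public class AckOnCheckpointSketch {

    /** Ack IDs captured by one checkpoint. */
    public static final class PendingAcks {
        public final long checkpointId;
        public final List<String> ackIds;

        PendingAcks(long checkpointId, List<String> ackIds) {
            this.checkpointId = checkpointId;
            this.ackIds = ackIds;
        }
    }

    private final Consumer<List<String>> acknowledger; // stands in for the Pub/Sub ack call
    private List<String> sinceLastCheckpoint = new ArrayList<>();
    private final List<PendingAcks> pending = new ArrayList<>();

    public AckOnCheckpointSketch(Consumer<List<String>> acknowledger) {
        this.acknowledger = acknowledger;
    }

    /** Records the ack ID of a message that has been emitted downstream. */
    public void onMessage(String ackId) {
        sinceLastCheckpoint.add(ackId);
    }

    /** Step 1: the returned state holds every ack ID not yet acknowledged. */
    public List<PendingAcks> snapshotState(long checkpointId) {
        pending.add(new PendingAcks(checkpointId, sinceLastCheckpoint));
        sinceLastCheckpoint = new ArrayList<>();
        return new ArrayList<>(pending); // copy that would be persisted in the snapshot
    }

    /** Step 2 acks everything up to this checkpoint; step 3 then drops it from state. */
    public void notifyCheckpointComplete(long checkpointId) {
        List<String> toAck = new ArrayList<>();
        for (PendingAcks p : pending) {
            if (p.checkpointId <= checkpointId) {
                toAck.addAll(p.ackIds);
            }
        }
        if (!toAck.isEmpty()) {
            acknowledger.accept(toAck);                       // step 2
        }
        pending.removeIf(p -> p.checkpointId <= checkpointId); // step 3
    }

    public int pendingCount() {
        return pending.size();
    }
}
```

Note that a list already returned by `snapshotState` keeps its entries after `notifyCheckpointComplete` clears the in-memory state.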
If we resume a job from a snapshot, the ackIds that were already acknowledged (and removed in step 3) still exist in the savepoint (created in step 1), so the source acknowledges them again when the next checkpoint completes.
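The replay described above can be reproduced with an even smaller, hypothetical model: the savepoint copies the pending ack IDs in step 1, so a job resumed from it acknowledges them a second time, now through the new subscription. `SavepointReplayDemo`, `Source` and `runScenario` are illustrative names only, and the model simplifies by acking everything pending on each completed checkpoint.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical replay of the savepoint scenario (illustrative names, not connector code).
public class SavepointReplayDemo {

    static final class Source {
        final String subscription;
        final List<String> ackLog;                  // every ack call, as "subscription:ackId"
        final List<String> pending = new ArrayList<>();

        Source(String subscription, List<String> restoredAckIds, List<String> ackLog) {
            this.subscription = subscription;
            this.ackLog = ackLog;
            pending.addAll(restoredAckIds);         // state restored from a snapshot
        }

        void receive(String ackId) {
            pending.add(ackId);
        }

        List<String> snapshot() {                   // step 1: copy the pending ack IDs
            return new ArrayList<>(pending);
        }

        void checkpointComplete() {                 // steps 2 and 3: ack, then clear
            for (String id : pending) {
                ackLog.add(subscription + ":" + id);
            }
            pending.clear();
        }
    }

    static List<String> runScenario() {
        List<String> ackLog = new ArrayList<>();

        Source original = new Source("old-sub", List.of(), ackLog);
        original.receive("ack-1");
        List<String> savepoint = original.snapshot(); // savepoint still lists ack-1
        original.checkpointComplete();                // ack-1 acked and dropped

        Source resumed = new Source("new-sub", savepoint, ackLog);
        resumed.checkpointComplete();                 // ack-1 acked again via new-sub
        return ackLog;
    }

    public static void main(String[] args) {
        System.out.println(runScenario()); // prints [old-sub:ack-1, new-sub:ack-1]
    }
}
```

In the scenario reported here, the second entry (`new-sub:ack-1`) corresponds to the acknowledge call that fails with `INVALID_ARGUMENT`, because that ack ID was issued for the old subscription.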
In my opinion, these ackIds stored in the savepoint are the root cause of our being unable to change subscriptions.