FLINK-29627: Sink - Duplicate key exception when recovering more than one committable


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.16.0, 1.17.0, 1.15.2
    • Fix Version/s: 1.17.0, 1.15.3, 1.16.1
    • Component/s: Connectors / Common
    • Labels: None

    Description

      Recovering more than one committable causes an `IllegalStateException` and prevents the cluster from starting.

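      When we recover the `CheckpointCommittableManager`, we deserialize `SubtaskCommittableManager` instances from the recovery state and put them into a `Map<Integer, SubtaskCommittableManager<CommT>>` keyed by the subtask ID of each recovered manager. This fails when more than one committable has to be recovered, because several recovered managers can then share the same subtask ID: the map is built with `Collectors.toMap` without a merge function, so the second entry for an existing key throws `IllegalStateException` (see `Collectors.duplicateKeyException` in the stack trace below).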
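A minimal, self-contained sketch of the failure mode, assuming the recovered managers are collected with the two-argument `Collectors.toMap`, as the `uniqKeysMapAccumulator` frame in the stack trace suggests. `SubtaskManagerStub` is a hypothetical stand-in for `SubtaskCommittableManager`, not Flink's class, and `indexBySubtask` is a hypothetical helper; the real deserializer in `CommittableCollectorSerializer` may be structured differently.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class DuplicateKeyDemo {

    /** Hypothetical stand-in for SubtaskCommittableManager (not Flink's class). */
    public static final class SubtaskManagerStub {
        public final int subtaskId;
        public final List<String> committables;

        public SubtaskManagerStub(int subtaskId, List<String> committables) {
            this.subtaskId = subtaskId;
            this.committables = committables;
        }
    }

    /** Keys managers by subtask ID using the two-argument toMap, which rejects duplicate keys. */
    public static Map<Integer, SubtaskManagerStub> indexBySubtask(List<SubtaskManagerStub> recovered) {
        return recovered.stream()
                .collect(Collectors.toMap(m -> m.subtaskId, Function.identity()));
    }

    public static void main(String[] args) {
        // Two recovered managers that both belong to subtask 0.
        List<SubtaskManagerStub> recovered = List.of(
                new SubtaskManagerStub(0, List.of("c1")),
                new SubtaskManagerStub(0, List.of("c2")));
        try {
            indexBySubtask(recovered);
        } catch (IllegalStateException e) {
            // On JDK 9+ the message has the form "Duplicate key 0 (attempted merging values ...)".
            System.out.println(e.getMessage());
        }
    }
}
```

With distinct subtask IDs the same call succeeds; the exception appears only once two recovered managers share a key.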

      Instead, when a manager for the same subtask ID has already been deserialized, we should call `SubtaskCommittableManager::merge` to combine the two rather than inserting a second entry.
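One way to express that fix, sketched under assumptions: pass a merge function as the third argument of `Collectors.toMap`, so that a duplicate subtask ID combines the managers instead of throwing. `SubtaskManagerStub` and its `merge` method are hypothetical stand-ins that simply concatenate committables; Flink's actual `SubtaskCommittableManager::merge` may have a different signature and semantics.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class MergeOnRecoverDemo {

    /** Hypothetical stand-in for SubtaskCommittableManager with a merge operation. */
    public static final class SubtaskManagerStub {
        public final int subtaskId;
        public final List<String> committables;

        public SubtaskManagerStub(int subtaskId, List<String> committables) {
            this.subtaskId = subtaskId;
            this.committables = new ArrayList<>(committables);
        }

        /** Returns a new manager holding the committables of both managers, in order. */
        public SubtaskManagerStub merge(SubtaskManagerStub other) {
            if (other.subtaskId != subtaskId) {
                throw new IllegalArgumentException("Cannot merge managers of different subtasks");
            }
            List<String> combined = new ArrayList<>(committables);
            combined.addAll(other.committables);
            return new SubtaskManagerStub(subtaskId, combined);
        }
    }

    /** Keys managers by subtask ID, merging managers that share an ID instead of failing. */
    public static Map<Integer, SubtaskManagerStub> indexBySubtask(List<SubtaskManagerStub> recovered) {
        return recovered.stream()
                .collect(Collectors.toMap(
                        m -> m.subtaskId,
                        Function.identity(),
                        SubtaskManagerStub::merge)); // called as (existing, incoming) on a duplicate key
    }

    public static void main(String[] args) {
        List<SubtaskManagerStub> recovered = List.of(
                new SubtaskManagerStub(0, List.of("c1")),
                new SubtaskManagerStub(0, List.of("c2")),
                new SubtaskManagerStub(1, List.of("c3")));
        Map<Integer, SubtaskManagerStub> bySubtask = indexBySubtask(recovered);
        System.out.println(bySubtask.get(0).committables); // [c1, c2]
        System.out.println(bySubtask.get(1).committables); // [c3]
    }
}
```

The three-argument `toMap` invokes the merge function with the already-stored value first, so committables keep their recovery order. An explicit loop using `Map.merge` would behave the same way.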

      Stack Trace:

      28603 [flink-akka.actor.default-dispatcher-8] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: Global Committer (1/1) (485dc57aca56235b9d1ab803c8c966ad_47d89856a1cf553f16e7063d953b7d42_0_1) switched from INITIALIZING to FAILED on 2ed5c848-d360-48ae-9a92-730b022c8a39 @ kubernetes.docker.internal (dataPort=-1).
      java.lang.IllegalStateException: Duplicate key 0 (attempted merging values org.apache.flink.streaming.runtime.operators.sink.committables.SubtaskCommittableManager@631940ac and org.apache.flink.streaming.runtime.operators.sink.committables.SubtaskCommittableManager@7ff3bd7)
      	at java.util.stream.Collectors.duplicateKeyException(Collectors.java:133) ~[?:?]
      	at java.util.stream.Collectors.lambda$uniqKeysMapAccumulator$1(Collectors.java:180) ~[?:?]
      	at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) ~[?:?]
      	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655) ~[?:?]
      	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[?:?]
      	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) ~[?:?]
      	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) ~[?:?]
      	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
      	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) ~[?:?]
      	at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer$CheckpointSimpleVersionedSerializer.deserialize(CommittableCollectorSerializer.java:153) ~[classes/:?]
      	at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer$CheckpointSimpleVersionedSerializer.deserialize(CommittableCollectorSerializer.java:124) ~[classes/:?]
      	at org.apache.flink.core.io.SimpleVersionedSerialization.readVersionAndDeserializeList(SimpleVersionedSerialization.java:148) ~[classes/:?]
      	at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer.deserializeV2(CommittableCollectorSerializer.java:105) ~[classes/:?]
      	at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer.deserialize(CommittableCollectorSerializer.java:82) ~[classes/:?]
      	at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer.deserialize(CommittableCollectorSerializer.java:41) ~[classes/:?]
      	at org.apache.flink.core.io.SimpleVersionedSerialization.readVersionAndDeSerialize(SimpleVersionedSerialization.java:121) ~[classes/:?]
      	at org.apache.flink.streaming.api.connector.sink2.GlobalCommitterSerializer.deserializeV2(GlobalCommitterSerializer.java:128) ~[classes/:?]
      	at org.apache.flink.streaming.api.connector.sink2.GlobalCommitterSerializer.deserialize(GlobalCommitterSerializer.java:99) ~[classes/:?]
      	at org.apache.flink.streaming.api.connector.sink2.GlobalCommitterSerializer.deserialize(GlobalCommitterSerializer.java:42) ~[classes/:?]
      	at org.apache.flink.core.io.SimpleVersionedSerialization.readVersionAndDeSerialize(SimpleVersionedSerialization.java:227) ~[classes/:?]
      	at org.apache.flink.streaming.api.operators.util.SimpleVersionedListState$DeserializingIterator.next(SimpleVersionedListState.java:138) ~[classes/:?]
      	at java.lang.Iterable.forEach(Iterable.java:74) ~[?:?]
      	at org.apache.flink.streaming.api.connector.sink2.GlobalCommitterOperator.initializeState(GlobalCommitterOperator.java:133) ~[classes/:?]
      	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122) ~[classes/:?]
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286) ~[classes/:?]
      	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) ~[classes/:?]
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:727) ~[classes/:?]
      	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[classes/:?]
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:703) ~[classes/:?]
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:670) ~[classes/:?]
      	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) ~[classes/:?]
      	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904) ~[classes/:?]
      	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) ~[classes/:?]
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[classes/:?]
      	at java.lang.Thread.run(Thread.java:834) ~[?:?]
      


            People

              Assignee: Krzysztof Chmielewski (KristoffSC)
              Reporter: Krzysztof Chmielewski (KristoffSC)
              Votes: 0
              Watchers: 3
