FLINK-13527

Instable KafkaProducerExactlyOnceITCase due to CheckpointFailureManager

Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Cannot Reproduce
    • Affects Version/s: 1.9.0, 1.12.0
    • Fix Version/s: None
    • Component/s: Tests

    Description

      banmoy and I ran into this unstable test in the builds below:

      https://api.travis-ci.org/v3/job/565270958/log.txt
      https://api.travis-ci.com/v3/job/221237628/log.txt

      The root cause is that the task Source: Custom Source -> Map -> Sink: Unnamed (1/1) failed with the expected artificial test failure and then released its resources, which includes closing the closeable registry. The async checkpoint thread in SourceStreamTask then failed as well, because the registry was already closed, and sent a decline-checkpoint message to the JM.
      The key log lines are:

      03:36:46,639 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> Map -> Sink: Unnamed (1/1) (f45ff068d2c80da22c2a958739ec0c87) switched from RUNNING to FAILED.
      java.lang.Exception: Artificial Test Failure
      	at org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper.map(FailingIdentityMapper.java:79)
      	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
      	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
      	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
      	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
      	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
      	at org.apache.flink.streaming.connectors.kafka.testutils.IntegerSource.run(IntegerSource.java:75)
      	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
      	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
      	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:172)
      03:36:46,637 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Decline checkpoint 12 by task f45ff068d2c80da22c2a958739ec0c87 of job d5b629623731c66f1bac89dec3e87b89 at 03cbfd77-0727-4366-83c4-9aa4923fc817 @ localhost (dataPort=-1).
      03:36:46,640 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Discarding checkpoint 12 of job d5b629623731c66f1bac89dec3e87b89.
      org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 12 for operator Source: Custom Source -> Map -> Sink: Unnamed (1/1). Failure reason: Checkpoint was declined.
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:431)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1248)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1182)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:853)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:758)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:667)
      	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:147)
      	at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1138)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      Caused by: java.io.IOException: Cannot register Closeable, registry is already closed. Closing argument.
      	at org.apache.flink.util.AbstractCloseableRegistry.registerCloseable(AbstractCloseableRegistry.java:85)
      	at org.apache.flink.runtime.state.AsyncSnapshotCallable$AsyncSnapshotTask.<init>(AsyncSnapshotCallable.java:122)
      	at org.apache.flink.runtime.state.AsyncSnapshotCallable$AsyncSnapshotTask.<init>(AsyncSnapshotCallable.java:110)
      	at org.apache.flink.runtime.state.AsyncSnapshotCallable.toAsyncSnapshotFutureTask(AsyncSnapshotCallable.java:104)
      	at org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.getKeyedStateStreamFuture(StateSnapshotContextSynchronousImpl.java:127)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:401)
      	... 12 more
      03:36:46,642 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Exactly once test (d5b629623731c66f1bac89dec3e87b89) switched from state RUNNING to FAILING.
      java.lang.Exception: Artificial Test Failure
      	at org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper.map(FailingIdentityMapper.java:79)
      	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
      	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
      	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
      	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
      	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
      	at org.apache.flink.streaming.connectors.kafka.testutils.IntegerSource.run(IntegerSource.java:75)
      	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
      	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
      	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:172)
      03:36:46,643 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Try to restart or fail the job Exactly once test (d5b629623731c66f1bac89dec3e87b89) if no longer possible.
      03:36:46,643 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Exactly once test (d5b629623731c66f1bac89dec3e87b89) switched from state FAILING to RESTARTING.
      03:36:46,643 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Restarting the job Exactly once test (d5b629623731c66f1bac89dec3e87b89).
      03:36:46,643 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Discarding the results produced by task execution f45ff068d2c80da22c2a958739ec0c87.
      03:36:46,644 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Exactly once test (d5b629623731c66f1bac89dec3e87b89) switched from state RESTARTING to FAILING.
      org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
      	at org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$null$1(ExecutionGraph.java:586)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
      	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
      	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
      	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
      	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
      	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
      	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
      	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
      	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
      	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
      	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
      	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
      	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
      	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
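
      The "Cannot register Closeable, registry is already closed" cause in the log above can be illustrated with a minimal, self-contained sketch. The class and method names below (ClosedRegistrySketch, Registry, snapshotDeclinedAfterCleanup) are hypothetical stand-ins, not Flink's actual classes; the sketch only assumes that a registry rejects new registrations once it has been closed.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

final class ClosedRegistrySketch {

    /** Hypothetical, simplified registry: rejects registrations after close(). */
    static final class Registry {
        private final List<Closeable> resources = new ArrayList<>();
        private boolean closed;

        /** Registers a resource, or closes it and throws if the registry is already closed. */
        synchronized void register(Closeable resource) throws IOException {
            if (closed) {
                resource.close();
                throw new IOException(
                        "Cannot register Closeable, registry is already closed. Closing argument.");
            }
            resources.add(resource);
        }

        /** Marks the registry closed and closes everything registered so far. */
        synchronized void close() {
            closed = true;
            for (Closeable c : resources) {
                try {
                    c.close();
                } catch (IOException ignored) {
                    // best-effort cleanup in this sketch
                }
            }
            resources.clear();
        }
    }

    /** Returns true if a snapshot that starts after task cleanup is rejected. */
    static boolean snapshotDeclinedAfterCleanup() {
        Registry registry = new Registry();
        registry.close(); // the task failed and released its resources
        try {
            registry.register(() -> { }); // the checkpoint tries to register its snapshot task
            return false;
        } catch (IOException e) {
            return true; // in the report, this surfaces as a declined checkpoint
        }
    }

    public static void main(String[] args) {
        System.out.println("declined = " + snapshotDeclinedAfterCleanup());
    }
}
```

      The point of the sketch is the ordering: the decline is a consequence of the earlier task failure, not an independent checkpoint problem.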
      

      The failure of Source: Custom Source -> Map -> Sink: Unnamed fails the job for the first time. However, once the declined checkpoint is received, CheckpointFailureManager fails the job a second time. Unfortunately, some tests in KafkaProducerExactlyOnceITCase allow only one restart attempt via FixedDelayRestartStrategy, which is why the IT case ultimately failed.
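
      The restart accounting described above can be replayed with a small model. This is a sketch under stated assumptions: RestartBudgetSketch, FixedAttempts, and replay are hypothetical names, and the model only counts restart attempts; it does not reproduce Flink's FixedDelayRestartStrategy or its delay handling.

```java
import java.util.Arrays;
import java.util.List;

final class RestartBudgetSketch {

    /** Hypothetical model of a restart strategy with a fixed number of attempts. */
    static final class FixedAttempts {
        private final int maxAttempts;
        private int used;

        FixedAttempts(int maxAttempts) {
            this.maxAttempts = maxAttempts;
        }

        boolean canRestart() {
            return used < maxAttempts;
        }

        void restart() {
            used++;
        }
    }

    /** Replays failure causes in order and returns the resulting job state. */
    static String replay(int maxAttempts, List<String> failureCauses) {
        FixedAttempts strategy = new FixedAttempts(maxAttempts);
        for (String cause : failureCauses) {
            if (!strategy.canRestart()) {
                return "FAILED (" + cause + ")"; // restart budget exhausted
            }
            strategy.restart();
        }
        return "RUNNING";
    }

    public static void main(String[] args) {
        // The two failures from the report: the intended one and the follow-up decline.
        List<String> causes = Arrays.asList(
                "Artificial Test Failure",
                "Exceeded checkpoint tolerable failure threshold");
        System.out.println(replay(1, causes)); // budget of one restart, as in the affected tests
        System.out.println(replay(2, causes)); // a budget of two would absorb both failures
    }
}
```

      With a budget of one restart, the second failure leaves no attempt available, which matches the job ending in a failed state in the log.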

            People

              Assignee: Roman Khachatryan (roman)
              Reporter: Yun Tang (yunta)