Details
-
Bug
-
Status: Closed
-
Critical
-
Resolution: Cannot Reproduce
-
1.9.0, 1.12.0
-
None
Description
banmoy and I met this instable test below:
https://api.travis-ci.org/v3/job/565270958/log.txt
https://api.travis-ci.com/v3/job/221237628/log.txt
The root cause is task Source: Custom Source -> Map -> Sink: Unnamed (1/1) failed due to expected artificial test failure and then free task resource including closing the registry. However, the async checkpoint thread in SourceStreamTask would then failed and send decline checkpoint message to JM.
The key logs is like:
03:36:46,639 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Map -> Sink: Unnamed (1/1) (f45ff068d2c80da22c2a958739ec0c87) switched from RUNNING to FAILED. java.lang.Exception: Artificial Test Failure at org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper.map(FailingIdentityMapper.java:79) at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) at org.apache.flink.streaming.connectors.kafka.testutils.IntegerSource.run(IntegerSource.java:75) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:172) 03:36:46,637 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 12 by task f45ff068d2c80da22c2a958739ec0c87 of job d5b629623731c66f1bac89dec3e87b89 at 03cbfd77-0727-4366-83c4-9aa4923fc817 @ localhost (dataPort=-1). 03:36:46,640 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 12 of job d5b629623731c66f1bac89dec3e87b89. org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 12 for operator Source: Custom Source -> Map -> Sink: Unnamed (1/1). Failure reason: Checkpoint was declined. at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:431) at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1248) at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1182) at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:853) at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:758) at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:667) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:147) at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1138) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException: Cannot register Closeable, registry is already closed. Closing argument. at org.apache.flink.util.AbstractCloseableRegistry.registerCloseable(AbstractCloseableRegistry.java:85) at org.apache.flink.runtime.state.AsyncSnapshotCallable$AsyncSnapshotTask.<init>(AsyncSnapshotCallable.java:122) at org.apache.flink.runtime.state.AsyncSnapshotCallable$AsyncSnapshotTask.<init>(AsyncSnapshotCallable.java:110) at org.apache.flink.runtime.state.AsyncSnapshotCallable.toAsyncSnapshotFutureTask(AsyncSnapshotCallable.java:104) at org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.getKeyedStateStreamFuture(StateSnapshotContextSynchronousImpl.java:127) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:401) ... 12 more 03:36:46,642 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Exactly once test (d5b629623731c66f1bac89dec3e87b89) switched from state RUNNING to FAILING. java.lang.Exception: Artificial Test Failure at org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper.map(FailingIdentityMapper.java:79) at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) at org.apache.flink.streaming.connectors.kafka.testutils.IntegerSource.run(IntegerSource.java:75) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:172) 03:36:46,643 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Try to restart or fail the job Exactly once test (d5b629623731c66f1bac89dec3e87b89) if no longer possible. 03:36:46,643 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Exactly once test (d5b629623731c66f1bac89dec3e87b89) switched from state FAILING to RESTARTING. 03:36:46,643 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Restarting the job Exactly once test (d5b629623731c66f1bac89dec3e87b89). 03:36:46,643 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Discarding the results produced by task execution f45ff068d2c80da22c2a958739ec0c87. 03:36:46,644 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Exactly once test (d5b629623731c66f1bac89dec3e87b89) switched from state RESTARTING to FAILING. org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold. at org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$null$1(ExecutionGraph.java:586) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive(Actor.scala:517) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
The failure of Source: Custom Source -> Map -> Sink: Unnamed would fail the job for the 1st time. However, due to receive declined checkpoint CheckpointFailureManager would also fail the job again for the 2nd time. Unfortunately, some tests within KafkaProducerExactlyOnceITCase only allow one restart attempt by FixedDelayRestartStrategy, that's why the IT case failed at last.
Attachments
Issue Links
- duplicates
-
FLINK-13593 Prevent failing the wrong execution attempt in CheckpointFailureManager
- Resolved
- is caused by
-
FLINK-13593 Prevent failing the wrong execution attempt in CheckpointFailureManager
- Resolved
- is related to
-
FLINK-13497 Checkpoints can complete after CheckpointFailureManager fails job
- Closed