Flink / FLINK-17170

Cannot stop streaming job with savepoint which uses kinesis consumer

Details

    Description

      I am encountering a very strange situation where I can't stop a streaming job with a savepoint.

      The job reads from Kinesis and sinks to S3. It is a very simple job: no mapping function, no watermarks, just source -> sink.

      The source uses the flink-kinesis-consumer, the sink uses StreamingFileSink.

      Everything works fine, except stopping the job with savepoints.

      The behaviour happens only when multiple task managers are involved, with sub-tasks of the job spread across multiple task manager instances. When a single task manager has all the sub-tasks, this issue has never occurred.

      Using the latest Flink version (1.10.0), deployed in HA mode (2 job managers) on EC2, with savepoints and checkpoints written to S3.

      When trying to stop, the savepoint is created correctly and appears on S3, but not all sub-tasks are stopped. Some of them finish, but some just remain hung. Sometimes, on the same task manager, some of the sub-tasks are finished and some aren't.

      The logs don't show any errors. For the sub-tasks that succeed, the standard messages appear, e.g. "Source: <....> switched from RUNNING to FINISHED".

      For the hung sub-tasks, the last message is "org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher - Shutting down the shard consumer threads of subtask 0 ..." and that's it.

       

      I tried using the CLI (flink stop <job_id>).

      Timeout Message:

      root@ec2-XX-XX-XX-XX:/opt/flink/current/bin# ./flink stop cf43cecd9339e8f02a12333e52966a25
      Suspending job "cf43cecd9339e8f02a12333e52966a25" with a savepoint.
      ------------------------------------------------------------
       The program finished with the following exception:

      org.apache.flink.util.FlinkException: Could not stop with a savepoint job "cf43cecd9339e8f02a12333e52966a25".
          at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:462)
          at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
          at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:454)
          at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:907)
          at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
          at java.security.AccessController.doPrivileged(Native Method)
          at javax.security.auth.Subject.doAs(Subject.java:422)
          at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
          at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
          at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
      Caused by: java.util.concurrent.TimeoutException
          at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
          at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
          at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:460)
          ... 9 more

       

      Using the monitoring API, when I query by the savepoint trigger id, I keep getting the same response over and over: the status is still "IN_PROGRESS".

       

      When performing a cancel instead of a stop, it works. But cancel is deprecated, so I am a bit concerned that it might also fail at some point; maybe I was just lucky.

       

      I attached a screenshot of what the UI shows when this happens.

       

      Attachments

        1. threaddump_tm1
          89 kB
          Vasii Cosmin Radu
        2. Screenshot 2020-04-15 at 18.16.26.png
          35 kB
          Vasii Cosmin Radu


          Activity

            yunta Yun Tang added a comment -

            cvasii Has the savepoint completed via the web UI? Did the sub-task checkpoints on those hung tasks finish? (You can get this information from the checkpoint details in the web UI.) The quickest way to find the root cause is to use jstack to capture what the task thread is doing while it is hung.


            cvasii Vasii Cosmin Radu added a comment -

            I was doing that, and the shard consumer threads are blocked. It looks like it is the checkpoint lock they are all waiting on.

             

            Here is a sample of one stack:

            shardConsumers-Source: source<my_name> -> Sink: sink-s3a://<my_path> (12/12)-thread-0 priority:5 - threadId:0x00007f565c854000 - nativeId:0x49e0 - nativeId (decimal):18912 - state:BLOCKED
            stackTrace:
            java.lang.Thread.State: BLOCKED (on object monitor)
            at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:774)
            - waiting to lock <0x00000006a4d18620> (a java.lang.Object)
            at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.access$000(KinesisDataFetcher.java:92)
            at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$AsyncKinesisRecordEmitter.emit(KinesisDataFetcher.java:273)
            at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:288)
            at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:285)
            at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:760)
            at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:371)
            at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:258)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
            

            cvasii Vasii Cosmin Radu added a comment -

            And regarding the savepoint, yes, it finished successfully on all sub-tasks.


            cvasii Vasii Cosmin Radu added a comment -

            It's quite hard to debug, because I have multiple threads with the same name blocked on the same lock. A sub-task consumes multiple shards, so a fetcher is created per subtask, but the method KinesisDataFetcher#createShardConsumersThreadPool just assigns the same identifier to all of its threads (by the way, any ThreadFactory implementation would be better there; a thing to improve).

             

            There is also one blocked thread that stands alone, not competing with the others for any lock.
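
            As an aside on the ThreadFactory point above, a minimal naming factory could look like the sketch below (illustrative only, not the connector's actual implementation; the class name and prefix handling are invented). It simply numbers the threads so they can be told apart in a thread dump:

            import java.util.concurrent.ThreadFactory;
            import java.util.concurrent.atomic.AtomicInteger;

            /** Sketch: give every shard consumer thread a distinct, numbered name. */
            class NamedShardConsumerThreadFactory implements ThreadFactory {
                private final String prefix;
                private final AtomicInteger counter = new AtomicInteger();

                NamedShardConsumerThreadFactory(String prefix) {
                    this.prefix = prefix;
                }

                @Override
                public Thread newThread(Runnable runnable) {
                    // e.g. "shardConsumers-<task name>-thread-0", "...-thread-1", ...
                    return new Thread(runnable, prefix + "-thread-" + counter.getAndIncrement());
                }
            }

            A pool built with Executors.newCachedThreadPool(new NamedShardConsumerThreadFactory("shardConsumers-" + taskName)) would then produce uniquely numbered thread names.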

            yunta Yun Tang added a comment -

            If the savepoint could finally complete, I suspect the task got stuck when it was notified that the checkpoint completed and tried to finish. Could you share the information of the source stream task, or the whole jstack output of the hung JVM? BTW, debug-level logs could also help to provide more information.

            klion26 Congxian Qiu added a comment -

            Hi cvasii, from the description it seems the savepoint completed successfully, and the "unfinished" tasks were blocked by something.

            Currently, the lifecycle of the task logic is "trigger savepoint" -> "savepoint complete" -> "notify checkpoint complete" -> "finish task".

            From the previous comments you have given, it seems the stack was waiting for some lock; could you please check what it is waiting for?

            Or could you please share the whole jstack output for the "unfinished" task?


            cvasii Vasii Cosmin Radu added a comment -

            Thanks guys for looking into this, klion26 yunta. I've attached a full thread dump from one of the task managers, in the file called "threaddump_tm1".

             

            I've seen that the shard consumers are waiting for the checkpoint lock, but the checkpoint lock is already taken by the thread that performs the stop.

             

            Some relevant parts of the thread dump:

             

             

            shardConsumers-Source: source -> Sink: sink (12/12)-thread-0 priority:5 - threadId:0x00007f7c7c298000 - nativeId:0x774c - nativeId (decimal):30540 - state:BLOCKED
            stackTrace:
            java.lang.Thread.State: BLOCKED (on object monitor)
            at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:774)
            - waiting to lock <0x00000007abe026c8> (a java.lang.Object)
            at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.access$000(KinesisDataFetcher.java:92)
            at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$AsyncKinesisRecordEmitter.emit(KinesisDataFetcher.java:273)
            at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:288)
            at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:285)
            at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:760)
            at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:371)
            at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:258)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
            

             

             

            but the lock 0x00000007abe026c8 is already taken here:

            Source: source -> Sink: sink (12/12) priority:5 - threadId:0x00007f7c2c0cf000 - nativeId:0x76fe - nativeId (decimal):30462 - state:TIMED_WAITING
            stackTrace:
            java.lang.Thread.State: TIMED_WAITING (parking)
            at sun.misc.Unsafe.park(Native Method)
            - parking to wait for  <0x00000007ac041158> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
            at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
            at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1475)
            at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.awaitTermination(KinesisDataFetcher.java:637)
            at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:365)
            at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:147)
            at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:136)
            at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.finishTask(SourceStreamTask.java:147)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:947)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$7(StreamTask.java:924)
            at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$838/1354138988.run(Unknown Source)
            at org.apache.flink.util.function.FunctionUtils.lambda$asCallable$5(FunctionUtils.java:125)
            at org.apache.flink.util.function.FunctionUtils$$Lambda$839/1349808364.call(Unknown Source)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
            - locked <0x00000007abe026c8> (a java.lang.Object)
            at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
            at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
            at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
            at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
            at java.lang.Thread.run(Thread.java:748)
            

             

            So, the FlinkKinesisConsumer#close() method is called, which delegates to FlinkKinesisConsumer#cancel(); then KinesisDataFetcher#shutdownFetcher() is called and, as far as I can tell, it executes successfully, because I don't have any errors in the logs. If there had been an exception, the message from FlinkKinesisConsumer line 367, "Error while closing Kinesis data fetcher", would have appeared in my logs. Then fetcher.awaitTermination() is called, and in KinesisDataFetcher#awaitTermination it waits, one minute at a time. But there isn't any Thread.setDefaultUncaughtExceptionHandler set, so if there's an exception in the shardConsumersExecutor, I can't really see it. Side question: can I set the UncaughtExceptionHandler somehow?

             

            For me it is quite obvious that the shardConsumersExecutor is not shut down, and based on my JDK knowledge, calling shutdownNow on an executor service does not guarantee that it will actually terminate. And since multiple threads are competing for the checkpoint lock, I guess the threads from the shardConsumersExecutor must be getting interrupted. How come it works when doing a cancel? Is there a forced interrupt being done?

             

            yunta Yun Tang added a comment -

            I could reproduce this with a small batch of code. There is a deadlock here: one thread is waiting to get the lock, while the other thread, which holds the lock, is waiting for the former thread to terminate.

            I still need some time to investigate whether there are more possible deadlocks there.
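
            For reference, the shape of this deadlock can be reproduced outside Flink with a few lines of plain Java. The standalone sketch below (the names checkpointLock and shardConsumers are just stand-ins; this is not the connector code) has the "task thread" take the monitor and then wait for the pool to terminate, while the worker is blocked trying to enter the same monitor and is not freed by shutdownNow's interrupt:

            import java.util.concurrent.ExecutorService;
            import java.util.concurrent.Executors;
            import java.util.concurrent.TimeUnit;

            public class StopWithSavepointDeadlockSketch {
                public static void main(String[] args) throws InterruptedException {
                    final Object checkpointLock = new Object();        // stand-in for the task's checkpoint lock
                    ExecutorService shardConsumers = Executors.newFixedThreadPool(1);

                    // "Shard consumer": repeatedly enters the lock, as emitRecordAndUpdateState would.
                    shardConsumers.execute(() -> {
                        while (true) {
                            synchronized (checkpointLock) {
                                // emit record and update shard state
                            }
                        }
                    });

                    Thread.sleep(500);                                 // let the worker start looping

                    // "Task thread" during stop-with-savepoint: notifyCheckpointComplete runs under the
                    // lock and ends up waiting for the shard consumer pool to terminate.
                    synchronized (checkpointLock) {
                        shardConsumers.shutdownNow();                  // interrupt does not unblock a thread
                                                                       // that is BLOCKED on monitor entry
                        while (!shardConsumers.awaitTermination(5, TimeUnit.SECONDS)) {
                            System.out.println("still waiting for shard consumers ...");  // prints forever
                        }
                    }
                }
            }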

            yunta Yun Tang added a comment -

            I think there might be three solutions:

            1. Use the new source operator API introduced in FLIP-27, which avoids the checkpoint lock by emitting records through the mailbox in the source thread. However, I think the new source operator API is not fully ready yet.
            2. Avoid the endless awaitTermination when shutting down the fetcher; I am not sure whether this could meet the Kinesis requirements, cc tzulitai.
            3. Check whether the fetcher is still running inside emitRecordAndUpdateState before trying to access the checkpointLock. If it is not running, just return instead of accessing the checkpointLock. I think this could resolve the problem.

            cvasii Vasii Cosmin Radu added a comment -

            How come it works when I perform a cancel instead of a stop action?

            yunta Yun Tang added a comment -

            cvasii Cancelling a job does not need to trigger a savepoint before shutdown. In other words, cancel does not trigger "notifyCheckpointComplete" on the task side once the checkpoint is completed on the checkpoint coordinator side. When we call "notifyCheckpointComplete", it grabs the lock, and the same lock is then held while awaiting termination of the Kinesis fetcher.

            ubyyj Youjun Yuan added a comment -

            Any update on this?

            I hit exactly the same issue here, with 1.10.1.

            BTW, calling the savepoint REST API can trigger a savepoint and also cancel the job.

            Something like this:

            curl -d '{"target-directory": "s3://dp-flink/savepoints/","cancel-job":true}' :jobmanagerTracking-URL/jobs/:jobid/savepoints

            qinjunjerry Jun Qin added a comment -

            I also saw the exact same behavior happen for one customer with just one TM and parallelism=1. By 'exact', I mean:

            • the source thread is holding the checkpoint lock and is waiting for the shardConsumers to finish, but the shardConsumers cannot finish because they are waiting to lock the checkpoint lock
            • the code line numbers in the stack trace are the same as in the one attached here.

             

            qinjunjerry Jun Qin added a comment -

            The source thread holds the checkpoint lock and waits here forever for the shard consumer threads to finish, while the shard consumers can only finish once they get the checkpoint lock:

            // From: KinesisDataFetcher.java
            public void awaitTermination() throws InterruptedException {
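               // Note: in the scenario described above, this is reached from FlinkKinesisConsumer#cancel()
               // while the task thread already holds the checkpoint lock; if the shard consumer threads
               // are blocked waiting for that same lock, the pool never terminates and this loop spins forever.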
               while (!shardConsumersExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
                  // Keep waiting.
               }
            }
            
            yunta Yun Tang added a comment -

            I think my previous suggestion of "check whether the fetcher is running within emitRecordAndUpdateState before trying to access the checkpointLock" might not solve the problem fundamentally if running has been set to false while the checkpointLock is already synchronized in the main thread. Is the endless awaiting of termination a must for the Kinesis fetcher, tzulitai?

            mapohl Matthias Pohl added a comment -

            We're investigating a bug with stop-with-savepoint where the savepoint is successfully created but the job does not finish due to some failure after savepoint creation. See FLINK-21030 for further details. FLINK-17170 might be related.

            kezhuw Kezhu Wang added a comment -

            I think it is a problem of FlinkKinesisConsumer.cancel: it should not wait for the fetcher to finish, it should only do the signalling.

            qinjunjerry is correct about the deadlock. In the stop-with-savepoint path, FlinkKinesisConsumer.cancel is called with the checkpointLock held.

            kezhuw Kezhu Wang added a comment -

            In the stop-with-savepoint path, FlinkKinesisConsumer.cancel is called with the checkpointLock held.

            This has been the case since 1.10, so this issue should exist in all versions from 1.10 onwards.

            flink-jira-bot Flink Jira Bot added a comment -

            This critical issue is unassigned and itself and all of its Sub-Tasks have not been updated for 7 days. So, it has been labeled "stale-critical". If this ticket is indeed critical, please either assign yourself or give an update. Afterwards, please remove the label. In 7 days the issue will be deprioritized.

            dannycranmer Danny Cranmer added a comment -

            I have taken a read through the issue/code and agree with the diagnosed deadlock. I agree that removing fetcher.awaitTermination() from FlinkKinesisConsumer::cancel will fix the deadlock. However, this could result in the job transitioning to FINISHED with open resources. The expectation would be that the resources will terminate, but this could still result in a temporary leak/dangling objects on the TM.

            I believe we can fix this by moving the fetcher.awaitTermination() to the close() method. I have taken a quick dive and it looks like this would be called outside of the checkpoint lock. However, I am not 100% sure this would be ok for all Source lifecycle possibilities. Does anyone have any reason to believe this is not a good idea?
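
            To make the proposed split concrete, here is a small self-contained sketch (a toy model only, not Flink code; the class and field names are invented): cancel() only signals the executor to shut down, and the blocking wait happens in close(), which is expected to run outside the checkpoint lock.

            import java.util.concurrent.ExecutorService;
            import java.util.concurrent.Executors;
            import java.util.concurrent.TimeUnit;

            /** Toy model of the suggested cancel()/close() split of responsibilities. */
            class ToyKinesisLikeSource implements AutoCloseable {
                private final ExecutorService shardConsumersExecutor = Executors.newCachedThreadPool();
                private volatile boolean running = true;

                /** May be called while the task thread holds the checkpoint lock: signal only, never block. */
                public void cancel() {
                    running = false;
                    shardConsumersExecutor.shutdownNow();   // interrupt shard consumers, but do not wait here
                }

                /** Called during task disposal, outside the checkpoint lock: safe to wait for termination. */
                @Override
                public void close() throws InterruptedException {
                    cancel();
                    while (!shardConsumersExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
                        // keep waiting, mirroring KinesisDataFetcher#awaitTermination
                    }
                }
            }

            Whether close() is really always invoked outside the checkpoint lock is exactly the open question raised above.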

            arvid Arvid Heise added a comment -

            Hi dannycranmer, why do we need to have this fetcher.awaitTermination() in cancel/close in the first place? Wouldn't it suffice to just rely on the await at the end of FlinkKinesisConsumer#run? As far as I can see, cancel is shutting down the fetcher, so it should return in a graceful manner from run. Going further, runFetcher is already invoking awaitTermination on its own, so we could clean this up even further.

            pnowojski Piotr Nowojski added a comment - - edited

            I also think sources shouldn't be doing blocking waits on such conditions inside cancel(). Basically what dannycranmer suggested makes sense.

            edit:

            Hi Danny Cranmer, why do we need to have this fetcher.awaitTermination() in cancel/close in the first place? Wouldn't it suffice to just rely on the await at the end of FlinkKinesisConsumer#run? As far as I can see, cancel is shutting down the fetcher, so it should return in a graceful manner from run. Going further, runFetcher is already invoking awaitTermination on its own, so we could clean this up even further.

            arvid FlinkKinesisConsumer#run can be interrupted, and that's expected behaviour. close should guarantee that all resources are cleaned up. If cleaning up the resources is not possible, it's better for close to hang until the task's cancellation times out and the whole Task Manager is killed.

            arvid Arvid Heise added a comment -

            Merged into 1.12 as 1de3d6345a4132dda4b31f66275fb732afe3ef30.

            arvid Arvid Heise added a comment -

            Merged into 1.13 as a102549f08759e177074dad286fef2f56176d005..282f9a3d5505a5aa58d7d9cca466939610d41ed3.

            arvid Arvid Heise added a comment -

            Merged into master as 442dc76bc76b9a7628202b33b21126ef3d48a90c..49d9092b31d0a95d727b790ee04a4ed8d72924e3.


            People

              arvid Arvid Heise
              cvasii Vasii Cosmin Radu
              Votes: 0
              Watchers: 13
