FLINK-4905

Kafka test instability IllegalStateException: Client is not started

    Details

      Description

      The following travis build (https://s3.amazonaws.com/archive.travis-ci.org/jobs/170365439/log.txt) failed because of this error

      08:17:11,239 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Status of job 33ebdc0e7c91be186d80658ce3d17069 (Read some records to commit offsets to Kafka) changed to FAILING.
      java.lang.RuntimeException: Error while confirming checkpoint
      	at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1040)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.IllegalStateException: Client is not started
      	at org.apache.flink.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:173)
      	at org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:113)
      	at org.apache.curator.utils.EnsurePath$InitialHelper$1.call(EnsurePath.java:148)
      	at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107)
      	at org.apache.curator.utils.EnsurePath$InitialHelper.ensure(EnsurePath.java:141)
      	at org.apache.curator.utils.EnsurePath.ensure(EnsurePath.java:99)
      	at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:133)
      	at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.prepareAndCommitOffsets(ZookeeperOffsetHandler.java:93)
      	at org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.commitInternalOffsetsToKafka(Kafka08Fetcher.java:341)
      	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:421)
      	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:229)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:571)
      	at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1035)
      	... 5 more
      08:17:11,241 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to cancel task Source: Custom Source -> Map -> Map -> Sink: Unnamed (1/3)
      
      Attachments

      1. Kafka08Fetcher.png (38 kB, attached by Andrew Efimov)

        Issue Links

          Activity

          tzulitai Tzu-Li (Gordon) Tai added a comment - - edited

          Had a quick look at this. The only place we're closing the curator client ourselves is in the finally clause of the runFetchLoop in the 0.8 Fetcher.
          I suspect the cause is that notifyCheckpointComplete was called before the fetch loop entered the finally clause and started committing offsets to ZK (the prepareAndCommitOffsets() call), but the curator client was closed mid-way once the fetch loop entered the finally clause. A good fix would perhaps be to add a lock that synchronizes close() with prepareAndCommitOffsets() / getCommittedOffsets in the ZK offset handler, to make sure close() isn't called mid-way.

          From the test logs, testStartFromKafkaCommitOffsets seems unrelated to this instability.
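          A minimal sketch of the synchronization suggested above (purely illustrative, with hypothetical class and field names rather than the actual Flink code) could look like this:

          import java.util.Map;

          import org.apache.curator.framework.CuratorFramework;

          // Hypothetical sketch: a single lock guards the curator client so that close()
          // can never run in the middle of an offset commit, and commits issued after
          // close() become no-ops instead of hitting a stopped client.
          class SynchronizedOffsetHandlerSketch {

              private final Object clientLock = new Object();
              private final CuratorFramework curatorClient; // assumed to be created and started elsewhere
              private boolean closed;

              SynchronizedOffsetHandlerSketch(CuratorFramework curatorClient) {
                  this.curatorClient = curatorClient;
              }

              void prepareAndCommitOffsets(Map<String, Long> offsetsPerZkPath) throws Exception {
                  synchronized (clientLock) {
                      if (closed) {
                          return; // the client is already gone; nothing to commit
                      }
                      for (Map.Entry<String, Long> entry : offsetsPerZkPath.entrySet()) {
                          // placeholder for the actual per-partition ZooKeeper write
                          curatorClient.setData().forPath(entry.getKey(), entry.getValue().toString().getBytes());
                      }
                  }
              }

              void close() {
                  synchronized (clientLock) {
                      if (!closed) {
                          closed = true;
                          curatorClient.close();
                      }
                  }
              }
          }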

          rmetzger Robert Metzger added a comment -

          I'm not 100% sure if this can happen, because close() and notifyCheckpointComplete are already synchronized.
          The lock for close is here: https://github.com/apache/flink/blob/770f2f83a81b2810aff171b2f56390ef686f725a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L279
          The lock for the notify is here: https://github.com/apache/flink/blob/770f2f83a81b2810aff171b2f56390ef686f725a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L565
          Andrew Efimov Andrew Efimov added a comment - - edited

          I have started investigating the issue; here is what I have found so far:
          1. InterruptedException in Kafka08Fetcher.runFetchLoop

          08:18:41,705 ERROR org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher  - Exception while shutting down consumer threads
          java.lang.InterruptedException
          	at java.lang.Object.wait(Native Method)
          	at java.lang.Thread.join(Thread.java:1289)
          	at org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.runFetchLoop(Kafka08Fetcher.java:293)
          ...
          

          Root cause: this occurs when t.join is called on an interrupted thread.
          In the finally block of Kafka08Fetcher.runFetchLoop, t.join is called as part of the thread cancellation procedure, but Kafka08Fetcher.runFetchLoop executes in the same thread as the StreamTask,
          which can be interrupted earlier.
          This was fixed by Stephan Ewen (10/26/16):

          // clear the interruption flag
          // this allows the joining on consumer threads (on best effort) to happen in
          // case the initial interrupt already
          Thread.interrupted();
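          For context, a minimal sketch of how clearing the interruption flag lets the best-effort join on the consumer threads proceed (hypothetical method and parameter names, not the actual Flink code):

          import java.util.List;

          // Hypothetical sketch: if the task thread was already interrupted, t.join() would
          // throw InterruptedException immediately. Clearing the flag first lets the join
          // proceed on a best-effort basis; a later interrupt is restored and stops the wait.
          final class ConsumerThreadShutdownSketch {

              static void joinConsumerThreads(List<Thread> consumerThreads, long perThreadTimeoutMillis) {
                  // clear a pending interrupt on the current (task) thread, if any
                  Thread.interrupted();

                  for (Thread t : consumerThreads) {
                      try {
                          t.join(perThreadTimeoutMillis); // bounded, best-effort wait
                      } catch (InterruptedException e) {
                          // restore the interrupt status and stop waiting for the remaining threads
                          Thread.currentThread().interrupt();
                          break;
                      }
                  }
              }
          }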
          

          There are several problems in the interaction with ZookeeperOffsetHandler:
          2. I agree with Tzu-Li (Gordon) Tai: access to the ZookeeperOffsetHandler object in Kafka08Fetcher is not guaranteed to be thread safe.
          There are synchronization points in StreamTask, but consider what happens in case of a failure in runFetchLoop:
          the StreamTask lock is free because run() is executing, so while the finally block of runFetchLoop is running, another thread can call notifyCheckpointComplete and acquire the lock,
          even though the close method has already been called on the ZookeeperOffsetHandler.

          3. The ZookeeperOffsetHandler is handed to the PeriodicOffsetCommitter thread, which is shut down without a thread join, so it is also possible that prepareAndCommitOffsets is called even though the close method has already been called on the ZookeeperOffsetHandler.

          Andrew Efimov Andrew Efimov added a comment -

          I would suggest the following solution:

          • Not only set null in the finally block of Kafka08Fetcher (this.zookeeperOffsetHandler = null;), but also set a volatile closed flag on the ZookeeperOffsetHandler. Threads will check the flag before calling ZookeeperOffsetHandler.setOffsetInZooKeeper or ZookeeperOffsetHandler.getOffsetFromZooKeeper.
          • Additionally, create an atomic thread counter for the ZookeeperOffsetHandler and perform close only if the counter is 0, with a timeout of course,
            or use the CheckpointLock that is available in the context.

          Team, what do you think?
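          A minimal sketch of the flag-plus-counter idea above (hypothetical names; the actual offset write is abstracted behind a Runnable):

          import java.util.concurrent.atomic.AtomicInteger;

          // Hypothetical sketch: a volatile closed flag plus an in-use counter. Callers
          // register before touching the handler; close() waits (up to a timeout) until
          // no caller is inside before the underlying client would be shut down.
          class RefCountedOffsetHandlerSketch {

              private volatile boolean closed;
              private final AtomicInteger inUse = new AtomicInteger();

              /** Runs the commit action unless the handler is already closed; returns false if skipped. */
              boolean tryRun(Runnable commitAction) {
                  if (closed) {
                      return false;
                  }
                  inUse.incrementAndGet();
                  try {
                      if (closed) { // re-check after registering; close() may have won the race
                          return false;
                      }
                      commitAction.run(); // e.g. the actual write of offsets to ZooKeeper
                      return true;
                  } finally {
                      inUse.decrementAndGet();
                  }
              }

              /** Marks the handler closed and waits (up to the timeout) for in-flight callers to finish. */
              void close(long timeoutMillis) throws InterruptedException {
                  closed = true;
                  final long deadline = System.currentTimeMillis() + timeoutMillis;
                  while (inUse.get() > 0 && System.currentTimeMillis() < deadline) {
                      Thread.sleep(10);
                  }
                  // ... the curator client would be closed here ...
              }
          }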

          tzulitai Tzu-Li (Gordon) Tai added a comment - - edited

          Hi Andrew Efimov,

          Thanks for looking at this. I haven't looked back at this issue in too much detail, but from what I recall, your first suggestion seems reasonable. However, even with a closed flag, ZookeeperOffsetHandler#close() and ZookeeperOffsetHandler#prepareAndCommitOffsets() still need to be synchronized, otherwise the curator client can still be closed by another thread before prepareAndCommitOffsets reaches the client API call.

          Have you found a way to reproduce this issue? It would be great if we added a test with the fix to ensure this exception doesn't happen again.

          Andrew Efimov Andrew Efimov added a comment - - edited

          Hi Tzu-Li (Gordon) Tai,
          Thanks for the reply.
          It is very difficult to reproduce this state of the concurrency model, but we should make the model more transparent and predictable.
          I have created a diagram that describes the behaviour (using the free IDEA plugin PlantUML):
          see the attached file Kafka08Fetcher.png.
          There are two threads that can use the same Kafka08Fetcher, and for each we should guarantee that the curator client is still working at the moment it is used.

          I would suggest two approaches; please take a look at the drafts:
          1. If each StreamTask has its own Kafka08Fetcher, then we can create a flag that indicates whether the commit handler is in use or not
          (or we can create an AtomicInteger to increment and decrement the count of threads using the handler)
          https://github.com/BrainLogic/flink/commit/ba327dc5c991d12366ab16ea6d8e707238b2e79c
          2. Using the checkpoint lock from sourceContext
          https://github.com/BrainLogic/flink/commit/abe16442c5dcf294b3c2bd28142765fe28515f8c

          Let's discuss, and then I'm going to write the tests.
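          A minimal sketch of approach 2 (the lock object is assumed to be the one returned by sourceContext.getCheckpointLock(); the class and the stub interface are hypothetical):

          import java.util.Map;

          // Hypothetical sketch: the commit path and the shutdown path synchronize on the
          // same checkpoint lock, so the handler can never be closed mid-commit.
          class CheckpointLockedCommitterSketch {

              /** Stand-in for the real ZookeeperOffsetHandler. */
              interface OffsetHandler {
                  void prepareAndCommitOffsets(Map<String, Long> offsets) throws Exception;
                  void close() throws Exception;
              }

              private final Object checkpointLock; // obtained via sourceContext.getCheckpointLock()
              private OffsetHandler handler;

              CheckpointLockedCommitterSketch(Object checkpointLock, OffsetHandler handler) {
                  this.checkpointLock = checkpointLock;
                  this.handler = handler;
              }

              /** Called from the notifyCheckpointComplete path. */
              void commitOffsets(Map<String, Long> offsets) throws Exception {
                  synchronized (checkpointLock) {
                      if (handler != null) {
                          handler.prepareAndCommitOffsets(offsets);
                      }
                  }
              }

              /** Called from the finally clause of the fetch loop. */
              void shutdown() throws Exception {
                  synchronized (checkpointLock) {
                      if (handler != null) {
                          handler.close();
                          handler = null;
                      }
                  }
              }
          }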

          tzulitai Tzu-Li (Gordon) Tai added a comment - - edited

          Hi Andrew Efimov,

          Sorry for the very late response, I've been quite busy recently.
          Thanks a lot for looking into this in so much detail!

          For the two approaches you mentioned, the second one seems more reasonable to me at first look. I would also recommend that we change the ZookeeperOffsetHandler's own logic to be fail-proof against these concurrent accesses.
          From what I understand of your approaches, we're still relying on the fetcher's logic to call the ZookeeperOffsetHandler safely. I think it would be a good design if the ZookeeperOffsetHandler were also self-contained and didn't need to rely on the callers' behaviour at all.

          I'm not really sure if my comment makes sense, since I haven't looked too much into this yet. I think it would be great if you could go ahead and open the PR, and we can look at it further from there. I'll be happy to shepherd your contribution!

          StephanEwen Stephan Ewen added a comment -

          I saw the issue happening again: https://api.travis-ci.org/jobs/183927280/log.txt?deansi=true

          Andrew Efimov are you still working on a patch for this?

          Andrew Efimov Andrew Efimov added a comment -

          Hi team,
          Thanks for the reply. Sorry, I have had a heavy workload this week.
          Tzu-Li (Gordon) Tai, I will be grateful for your mentoring!
          Stephan Ewen, I believe the fix will be done early next week (19 Dec).

          githubbot ASF GitHub Bot added a comment -

          GitHub user BrainLogic opened a pull request:

          https://github.com/apache/flink/pull/3035

          [FLINK-4905] Kafka test instability IllegalStateException: Client is not started

          Root cause of the issue:
          `notifyCheckpointComplete` can occur during cancellation or after a `runFetchLoop` failure and call `commitOffset` on a closed `curatorClient`, so the `CheckpointLock` is used to close the `curatorClient`.
          There is a diagram in the JIRA that describes the behaviour of `Kafka08Fetcher`.

          Notes:
          1. I don't like the approach where the `checkPointLock` is leaked into the `SourceContext`; this may lead to a deadlock.
          2. Work with the `ZookeeperOffsetHandler` can continue even after `Kafka08Fetcher.cancel` is called, as long as the handler is not null.
          3. `ZookeeperOffsetHandler` could have a `ReadWriteLock` and use the `writeLock` only for the close operation, but I have doubts, since the Flink code base does not contain any `ReentrantLocks`. It is also possible to implement such logic without any locks by using a lock-free approach.
          4. Also, in JDK 8 we have the powerful `StampedLock`. In which version of Flink will we be able to use JDK 8 features?

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/BrainLogic/flink FLINK-4905_PR

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3035.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3035


          commit fd5524eb15ec772f95c4168818264c63e45f5784
          Author: cube <aefimov.cube@gmail.com>
          Date: 2016-12-20T23:41:12Z

          FLINK-4905 Kafka test instability IllegalStateException: Client is not started


          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3035

          Thank you for working on this @BrainLogic!

          First, regarding the approach proposed here:
          The approach should be able to fix the `IllegalStateException` we're encountering.
          However, I would also want to try to make `ZookeeperOffsetHandler` thread-safe and insensitive to the order in which `commitOffsets` and `close` are called from the outside. Otherwise, from the context of `Kafka08Fetcher` alone, the reason for the new synchronization isn't that obvious, and it might result in harder-to-maintain code in the long run. What do you think?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3035

          Regarding some of your notes:

          • The exposure of the checkpoint lock through `SourceContext` is meant for sources to atomically update their state (e.g. Kafka offsets) with record emitting, with regard to Flink's checkpointing mechanics for exactly-once.
          • I don't think there has been any solid discussion on migrating to Java 8 yet.
          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3035

          Dropping Java 7 has not happened and there is not yet consensus in the community, so it probably will not happen in the next weeks.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3035

          Hi @BrainLogic, we're still seeing this exception in tests sometimes, and it'll be great to have this fixed soon.

          Please let us know how you'd like to proceed with the contribution; otherwise, if you don't have time, I can also pick it up. Thank you!

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3035

          I think we can take a very simple approach here. Many other parts of the code follow the approach of tolerating exceptions thrown during cancellation, or during asynchronous calls on closed operators.

          • The client may be closed during shutting down of the Kafka08Fetcher, before even the surrounding KafkaConsumerBase knows that it is closing
          • The KafkaConsumerBase tries to commit offsets, sees the exception, and re-throws it, since it assumes it is still running.
          • We can make the Kafka08Fetcher catch exceptions when committing offsets, and only re-throwing them if the fetcher is still running. That should do the trick.

          BTW: There are plans to make the streaming API sources appear to be single threaded, to avoid that sources have to plan for such situations.
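          A rough sketch of the approach outlined above (simplified, with hypothetical names; not the exact change that later went into master):

          import java.util.Map;

          // Hypothetical sketch: commit failures are re-thrown only while the fetcher is
          // still running; during shutdown the ZooKeeper client may already be closed,
          // so a failed commit is tolerated instead of overriding the real shutdown reason.
          class TolerantOffsetCommitSketch {

              /** Stand-in for the real ZookeeperOffsetHandler. */
              interface OffsetHandler {
                  void prepareAndCommitOffsets(Map<String, Long> offsets) throws Exception;
              }

              private volatile boolean running = true;
              private volatile OffsetHandler zkHandler;

              TolerantOffsetCommitSketch(OffsetHandler zkHandler) {
                  this.zkHandler = zkHandler;
              }

              void commitOffsets(Map<String, Long> offsets) throws Exception {
                  final OffsetHandler handler = this.zkHandler;
                  if (handler != null) {
                      try {
                          handler.prepareAndCommitOffsets(offsets);
                      } catch (Exception e) {
                          if (running) {
                              throw e; // genuine failure while the fetcher is active
                          }
                          // otherwise the fetcher is shutting down and the client may already
                          // be closed; the failed commit is harmless at this point
                      }
                  }
              }

              void cancel() {
                  running = false;
                  this.zkHandler = null; // the actual client shutdown happens elsewhere
              }
          }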

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3035

          I would like to pick this fix up. The exception has still occurred a few times for me in the past, and I prefer the solution outlined above, because it adds less locking on cancellation/shutdown, meaning there are fewer implications for deadlocks or long stalls on cancellation/shutdown.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3035

          A bit of background on how the error happens:

          • The test throws a `SuccessException`
          • While the fetcher is in the finally clause and shutting down the CuratorClient, the containing `Task` has not yet seen the exception.
          • When the commitOffsets() call fails, this overrides the `SuccessException` as the reason why the streaming program terminated.
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3035

          Thank you for picking this up @StephanEwen! I've taken a look at your approach in the local branch, +1 to the approach.

          githubbot ASF GitHub Bot added a comment -

          Github user BrainLogic commented on the issue:

          https://github.com/apache/flink/pull/3035

          Thanks for the help and explanation.
          But with this approach, an unlikely bug can still occur:
          `zkHandler.prepareAndCommitOffsets(offsets);` throws an important exception while running is true,
          then running becomes `false` and the exception is swallowed in this rare case.
          Maybe we should at least log the exception anyway?

          I agree that it is not rational to spend time on a time-consuming refactoring of the 0.8 Kafka connector.
          The current version of Kafka is 0.10, and users will hardly use the 0.8 version widely.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3035

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3035

          @BrainLogic I'll incorporate your comment into a followup commit...

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3035

          Quick question: I am not sure if this scenario can happen like that:

          > But with this approach, an unlikely bug can still occur:
          zkHandler.prepareAndCommitOffsets(offsets); throws an important exception while running is true,
          then running becomes false and the exception is swallowed in this rare case.
          Maybe we should at least log the exception anyway?

          When the exception is thrown in `zkHandler.prepareAndCommitOffsets(offsets)` and `running = true`, then the exception does not get swallowed. The `running` flag can become false only after the exception was thrown. If the flag becomes false for another reason, then there is either another root cause for the failure, or the operator is shutting down anyway. In both cases, it is probably okay to not report the exception.

          Please let me know if you think I have overlooked something here.

          githubbot ASF GitHub Bot added a comment -

          Github user BrainLogic commented on the issue:

          https://github.com/apache/flink/pull/3035

          Distributed systems and multithreaded environments make us think in terms of a logical clock, like a Lamport clock. Step by step:
          Thread1 - the fetcher is running, `running = true`
          Thread2 performs `zkHandler.prepareAndCommitOffsets(offsets)`
          Thread2 - `running = true` and `zkHandler.prepareAndCommitOffsets(offsets)` throws an exception
          Thread1 stops the fetcher and changes the flag to `running = false` in the normal way, without any exception
          Thread2 reads the flag `running = false` and returns, although the reason for the commit failure is different from "Client was closed and running = false"
          Thread1 - the fetcher is stopped successfully
          There is no exception or any information in the log regarding the exception from `zkHandler.prepareAndCommitOffsets(offsets)`

          Again, this is a rare case where we can lose the root cause of strange behavior: the offset will not be committed although there are no exceptions. Am I wrong?

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3035

          In this case, the exception would not be logged, true.
          It is a very rare corner case that should not affect correctness, and it is not really distinguishable from the case where an exception is thrown after the source went into finishing mode.

          My feeling is to not log here.

          • The advantage is that we don't pollute the log with a meaningless exception in the common case (many users would be led onto a false track).
          • Given that the corner case you described does not affect any expected behavior (the committing action might as well not have happened at all if it had been started a few msecs later), it is okay to "swallow" the exception.
          • If it does in fact affect offset committing in more cases, it will surely also occur at another committing attempt during execution.
          githubbot ASF GitHub Bot added a comment -

          Github user BrainLogic commented on the issue:

          https://github.com/apache/flink/pull/3035

          I got your idea, and together with the argument I mentioned above (users will not use the 0.8 Kafka connector extensively), I agree with this proposal.
          Let me finish this JIRA; I can create a fix based on your proposal and try to implement a test tonight.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3035

          I pushed a fix for that to master in e7cda75b8594417559d6aac6229b5893f5459f0f

          Andrew Efimov Andrew Efimov added a comment -

          Fixed by Stephan Ewen


            People

            • Assignee: Andrew Efimov
            • Reporter: Robert Metzger
            • Votes: 0
            • Watchers: 7