GitHub user BrainLogic opened a pull request:
[FLINK-4905] Kafka test instability IllegalStateException: Client is not started
Root cause of the issue:
`notifyCheckpointComplete` can be invoked during cancellation or after `runFetchLoop` has failed, and it then calls `commitOffset` on an already closed `curatorClient`. The fix is to close `curatorClient` while holding the `CheckpointLock`.
The JIRA issue contains a diagram that describes the behaviour of `Kafka08Fetcher`.
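To illustrate the idea of closing the client under the same lock that guards offset commits, here is a minimal sketch. It is not the code from this pull request: `FakeClient` and `GuardedFetcher` are hypothetical stand-ins, and the lock object only plays the role of the checkpoint lock.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a client that fails once it has been closed. */
class FakeClient {
    private volatile boolean started = true;
    final AtomicInteger commits = new AtomicInteger();

    void commit(long offset) {
        if (!started) {
            throw new IllegalStateException("Client is not started");
        }
        commits.incrementAndGet();
    }

    void close() {
        started = false;
    }
}

/** Hypothetical fetcher: commit and close are serialized on one lock object. */
class GuardedFetcher {
    private final Object checkpointLock;
    private FakeClient client = new FakeClient(); // guarded by checkpointLock

    GuardedFetcher(Object checkpointLock) {
        this.checkpointLock = checkpointLock;
    }

    /** Returns true if the offset was committed, false if the fetcher was already closed. */
    boolean commitOffset(long offset) {
        synchronized (checkpointLock) {
            if (client == null) {
                return false; // closed earlier: skip instead of touching a dead client
            }
            client.commit(offset);
            return true;
        }
    }

    /** Closes the client under the lock, so no commit can run concurrently with it. */
    void close() {
        synchronized (checkpointLock) {
            if (client != null) {
                client.close();
                client = null;
            }
        }
    }
}
```

Because both methods synchronize on the same object, a commit either finishes before the close starts or observes the null reference afterwards; it never reaches a closed client.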
1. I don't like the approach where `checkPointLock` is leaked into `SourceContext`; this may lead to a deadlock.
2. Work with `ZookeeperOffsetHandler` can continue even after `Kafka08Fetcher.cancel` has been called, for as long as the `Handler` is not null.
3. `ZookeeperOffsetHandler` could hold a `ReadWriteLock` and take the `writeLock` only for the close operation, but I have doubts, because the Flink code base does not contain any `ReentrantLocks`. It is also possible to implement this logic without any locks, using a lock-free approach.
4. Also, in *jdk8* we have a powerful tool, `StampedLock`. In which version of Flink will we be able to use *jdk8* features?
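The `ReadWriteLock` alternative from point 3 could look roughly like the sketch below. It assumes a hypothetical `RwGuardedHandler` class (not the real `ZookeeperOffsetHandler`) in which commits share the read lock and only close takes the write lock.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical handler: many concurrent commits, one exclusive close. */
class RwGuardedHandler {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private boolean closed; // written under the write lock, read under the read lock
    final AtomicLong lastCommitted = new AtomicLong(-1L);

    /** Returns true if the offset was recorded, false if the handler is closed. */
    boolean commitOffset(long offset) {
        lock.readLock().lock();
        try {
            if (closed) {
                return false;
            }
            lastCommitted.set(offset); // stands in for the real offset write
            return true;
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Waits for in-flight commits to finish, then marks the handler closed. */
    void close() {
        lock.writeLock().lock();
        try {
            closed = true;
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

With this layout commits do not block each other, and close cannot proceed until every commit that already holds the read lock has returned.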
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/BrainLogic/flink
Alternatively you can review and apply these changes as the patch at:
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3035
Author: cube <email@example.com>
FLINK-4905 Kafka test instability IllegalStateException: Client is not started