GitHub user tillrohrmann opened a pull request:
FLINK-6078 Remove CuratorFramework#close calls from ZooKeeper based HA services
This PR is based on #3622.
The main goal of this PR is to prevent the ZooKeeper based leader election and retrieval services from closing the underlying `CuratorFramework` instance when an election/retrieval service is closed. This makes it possible to share a single `CuratorFramework` instance among multiple election/retrieval services. This is a strict requirement for the Flip-6 work, where all election/retrieval services are created by a `HighAvailabilityServices` implementation which shares the `CuratorFramework` among the created services. The respective changes can be found in the `ZooKeeperLeader[Election, Retrieval]Service` classes.
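The ownership rule described above can be illustrated with a minimal, dependency-free sketch. It does not use Curator or Flink classes: `Client`, `LeaderService` and `HaServices` are hypothetical stand-ins for `CuratorFramework`, an election/retrieval service and a `HighAvailabilityServices` implementation, chosen only to show who is allowed to close the shared client.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Sketch of the ownership rule: services borrow the client, only the owner closes it. */
class SharedClientSketch {

    /** Hypothetical stand-in for CuratorFramework; it only tracks whether it was closed. */
    static final class Client implements AutoCloseable {
        private final AtomicBoolean closed = new AtomicBoolean(false);

        boolean isClosed() {
            return closed.get();
        }

        @Override
        public void close() {
            closed.set(true);
        }
    }

    /** Hypothetical stand-in for a leader election or retrieval service. */
    static final class LeaderService {
        private final Client client; // borrowed, not owned
        private boolean running = true;

        LeaderService(Client client) {
            this.client = client;
        }

        /** True while this service is running and the shared client is still open. */
        boolean canUseClient() {
            return running && !client.isClosed();
        }

        /** Stops only this service; the shared client is deliberately left open. */
        void stop() {
            running = false;
        }
    }

    /** Hypothetical stand-in for a HighAvailabilityServices implementation that owns the client. */
    static final class HaServices implements AutoCloseable {
        private final Client client = new Client();

        LeaderService createLeaderService() {
            return new LeaderService(client);
        }

        Client client() {
            return client;
        }

        /** The owner is the only place where the shared client is closed. */
        @Override
        public void close() {
            client.close();
        }
    }

    public static void main(String[] args) {
        HaServices ha = new HaServices();
        LeaderService election = ha.createLeaderService();
        LeaderService retrieval = ha.createLeaderService();

        election.stop();
        System.out.println("retrieval still usable: " + retrieval.canUseClient());

        ha.close();
        System.out.println("client closed by owner: " + ha.client().isClosed());
    }
}
```

Stopping one service leaves the other fully usable, which is the property the shared `CuratorFramework` setup relies on.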
The existing code now also uses an instance of `HighAvailabilityServices` to create the election/retrieval services and to manage the `CuratorFramework` instances. The respective changes are contained in `JobManager.scala:2036`, `TaskManager.scala:1643`, `MesosApplicationMasterRunner.java:299` and `YarnApplicationMasterRunner.java:343`.
In order to create `Leader[Retrieval, Election]Services` for the `JobManager`, we need to provide a `JobID` to the `HighAvailabilityServices`. Since no such `JobID` is defined a priori for a `JobManager`, we have introduced `HighAvailabilityServices.DEFAULT_JOB_ID`, which is to be used with the old distributed components.
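As a rough illustration of why a default ID is needed, the following sketch keys a leader location by job ID. Everything here is an assumption made for illustration: `java.util.UUID` stands in for `JobID`, the all-zero value of `DEFAULT_JOB_ID` and the `/leader/<id>` path layout are hypothetical and not taken from this PR.

```java
import java.util.UUID;

/** Sketch: a per-job lookup needs some ID even when the caller has no job. */
class DefaultJobIdSketch {

    /** Assumed placeholder value; the real constant lives in HighAvailabilityServices. */
    static final UUID DEFAULT_JOB_ID = new UUID(0L, 0L);

    /** Hypothetical path layout: one leader node per job ID. */
    static String leaderPath(UUID jobId) {
        return "/leader/" + jobId;
    }

    public static void main(String[] args) {
        // A component without its own job falls back to the default ID.
        System.out.println(leaderPath(DEFAULT_JOB_ID));
        // A per-job component passes its real ID and gets a separate location.
        System.out.println(leaderPath(UUID.randomUUID()));
    }
}
```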
We also changed the `FlinkMiniCluster` to use the `EmbeddedHaServices`, or the `ZooKeeperHaServices` in case of HA. The former service has HA-like capabilities which allow it to dynamically elect new leaders and notify retrievers about these changes. This makes it possible to write better integration tests. The downside is that we can no longer connect via a `RemoteExecutionEnvironment` to a `FlinkMiniCluster`, because there is no way to obtain the current leader session id remotely. In order to execute Flink jobs on the `FlinkMiniCluster`, we have extended the `TestEnvironment` and the `TestStreamEnvironment` to be used in combination with the changed `FlinkMiniCluster`.
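The behaviour described above (in-process election that notifies retrievers) can be sketched as follows. This is not the `EmbeddedHaServices` implementation; `EmbeddedLeaderSketch`, `addRetriever` and `electLeader` are hypothetical names. The sketch also hints at why a remote client is locked out: the leader session id exists only inside the process that holds this object.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.function.BiConsumer;

/** Sketch of an in-memory leader service that pushes leader changes to retrievers. */
class EmbeddedLeaderSketch {
    private final List<BiConsumer<String, UUID>> retrievers = new ArrayList<>();
    private String leaderAddress; // null until the first leader is elected
    private UUID leaderSessionId;

    /** Registers a retriever and immediately reports the current leader, if any. */
    synchronized void addRetriever(BiConsumer<String, UUID> retriever) {
        retrievers.add(retriever);
        if (leaderAddress != null) {
            retriever.accept(leaderAddress, leaderSessionId);
        }
    }

    /** Elects a new leader under a fresh session id and notifies all retrievers. */
    synchronized UUID electLeader(String address) {
        leaderAddress = address;
        leaderSessionId = UUID.randomUUID();
        for (BiConsumer<String, UUID> retriever : retrievers) {
            retriever.accept(leaderAddress, leaderSessionId);
        }
        return leaderSessionId;
    }

    public static void main(String[] args) {
        EmbeddedLeaderSketch ha = new EmbeddedLeaderSketch();
        ha.addRetriever((address, sessionId) ->
                System.out.println("leader " + address + " with session " + sessionId));
        ha.electLeader("jobmanager-1");
        ha.electLeader("jobmanager-2"); // simulated leader change
    }
}
```

A test can trigger `electLeader` at will to simulate a leader change, which is what makes this style of service convenient for integration tests.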
Most of the remaining changes adapt test cases to use the `EmbeddedHaServices` or to work with the changed `FlinkMiniCluster` implementation.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tillrohrmann/flink refactorZooKeeperServices
Alternatively you can review and apply these changes as the patch at:
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3781