Flink / FLINK-29927

AkkaUtils#getAddress may cause memory leak




      We found a slow memory leak in the JobManager (JM). When MetricFetcherImpl retrieves metrics, it always calls MetricQueryServiceRetriever#retrieveService first. That method acquires the address of a task manager, which uses AkkaUtils#getAddress internally. The getAddress method is implemented like this:

          public static Address getAddress(ActorSystem system) {
              return new RemoteAddressExtension().apply(system).getAddress();
          }

      and the RemoteAddressExtension#apply is like this:

        def apply(system: ActorSystem): T = {
          java.util.Objects.requireNonNull(system, "system must not be null!").registerExtension(this)
        }

      This means every call of AkkaUtils#getAddress registers a new extension with the ActorSystem, and that extension is never released until the ActorSystem exits.
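
      The growth pattern can be illustrated without Akka. The following is a minimal sketch, not Flink or Akka code: FakeSystem and FakeExtensionId are hypothetical stand-ins, and the registry is assumed to be a map keyed by the identity of the extension id. Under that assumption, a fresh id per call adds one entry per call, while a shared instance adds only one.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ExtensionLeakSketch {

    /** Hypothetical stand-in for an actor system's extension registry. */
    static final class FakeSystem {
        // Keys use default (identity) equals/hashCode, so each new id is a new entry.
        private final Map<Object, Object> extensions = new ConcurrentHashMap<>();

        Object registerExtension(Object id) {
            // Entries are never removed, mirroring "released only when the system exits".
            return extensions.computeIfAbsent(id, k -> new Object());
        }

        int extensionCount() {
            return extensions.size();
        }
    }

    /** Hypothetical stand-in for an extension id such as RemoteAddressExtension. */
    static final class FakeExtensionId {
        // One shared instance; reusing it keeps the registry at a single entry.
        static final FakeExtensionId INSTANCE = new FakeExtensionId();

        Object apply(FakeSystem system) {
            return system.registerExtension(this);
        }
    }

    /** Creates a new id on every call, as getAddress does in the snippet above. */
    static int leakyCalls(int n) {
        FakeSystem system = new FakeSystem();
        for (int i = 0; i < n; i++) {
            new FakeExtensionId().apply(system);
        }
        return system.extensionCount();
    }

    /** Reuses the shared id on every call. */
    static int sharedCalls(int n) {
        FakeSystem system = new FakeSystem();
        for (int i = 0; i < n; i++) {
            FakeExtensionId.INSTANCE.apply(system);
        }
        return system.extensionCount();
    }

    public static void main(String[] args) {
        System.out.println("new id per call: " + leakyCalls(1000));
        System.out.println("shared id:       " + sharedCalls(1000));
    }
}
```

      In this sketch the registry grows linearly with the number of calls in the first case and stays at one entry in the second, which suggests that reusing a single extension instance (or caching the resolved address) is one possible way to avoid the growth.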

      Most usages of the method are called only once during initialization, but as described above, MetricFetcherImpl also uses it. This can happen periodically while users have the WebUI open, or whenever users call the REST API directly to get metrics. This means the memory may keep leaking.

      The leak may have been introduced in FLINK-23662, when the Scala version of AkkaUtils was ported to Java, though I'm not sure whether the Scala version had the same issue.

      The leak seems very slow. We observed it on a job that had been running for more than one month with only 1 GB of memory for the JobManager. So I suppose it is not urgent, but it still needs to be fixed.





              chesnay Chesnay Schepler
              pltbkd Gen Luo