FLINK-18352

org.apache.flink.core.execution.DefaultExecutorServiceLoader not thread safe


Details

    Description

      The singleton org.apache.flink.core.execution.DefaultExecutorServiceLoader is not thread-safe because it relies on java.util.ServiceLoader, which is itself not safe for use by multiple concurrent threads.

      https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/ServiceLoader.html#Concurrency
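One way to avoid sharing loader state, sketched here under assumptions rather than taken from Flink, is to create a fresh ServiceLoader for every lookup so that no lazy iterator is ever visible to more than one thread. The class PerCallServiceLookup and its loadAll method are hypothetical names; only java.util.ServiceLoader.load is an actual JDK call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Hypothetical helper, not part of Flink: avoids sharing a ServiceLoader
// (and its lazily advanced iterator) between threads.
final class PerCallServiceLookup {

    private PerCallServiceLookup() {}

    // Builds a new ServiceLoader on each call and drains it into a list,
    // so the loader instance never escapes the calling thread.
    static <T> List<T> loadAll(Class<T> serviceType) {
        List<T> providers = new ArrayList<>();
        for (T provider : ServiceLoader.load(serviceType)) {
            providers.add(provider);
        }
        return providers;
    }
}
```

The trade-off is that every call rescans the provider configuration files, which may be acceptable when lookups are rare (for example, once per job submission).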

       

      This can leave the ServiceLoader in an inconsistent state in processes that attempt to self-heal. Recovery then requires restarting the process or container in the hope that the race condition does not recur.

      https://stackoverflow.com/questions/60391499/apache-flink-cannot-find-compatible-factory-for-specified-execution-target-lo

       

      Additionally, the following stack traces have been seen when using org.apache.flink.streaming.api.environment.RemoteStreamEnvironment instances:

      java.lang.ArrayIndexOutOfBoundsException: 2
          at sun.misc.CompoundEnumeration.nextElement(CompoundEnumeration.java:61)
          at java.util.ServiceLoader$LazyIterator.hasNextService(ServiceLoader.java:357)
          at java.util.ServiceLoader$LazyIterator.hasNext(ServiceLoader.java:393)
          at java.util.ServiceLoader$1.hasNext(ServiceLoader.java:474)
          at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:60)
          at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1724)
          at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1706)
      

       

      java.util.NoSuchElementException: null
          at sun.misc.CompoundEnumeration.nextElement(CompoundEnumeration.java:59)
          at java.util.ServiceLoader$LazyIterator.hasNextService(ServiceLoader.java:357)
          at java.util.ServiceLoader$LazyIterator.hasNext(ServiceLoader.java:393)
          at java.util.ServiceLoader$1.hasNext(ServiceLoader.java:474)
          at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:60)
          at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1724)
          at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1706)
      

      The workaround when using the StreamExecutionEnvironment implementations is to write a thread-safe replacement for DefaultExecutorServiceLoader and pass it to the StreamExecutionEnvironment constructors.
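A minimal sketch of what such a thread-safe replacement might look like follows. It is deliberately independent of Flink: SynchronizedServiceLookup and findCompatible are hypothetical names, and the predicate stands in for whatever compatibility check the real executor factories perform. Adapting it to Flink's actual loader interface is left as an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;
import java.util.function.Predicate;

// Hypothetical, Flink-independent sketch: one shared ServiceLoader whose
// iteration is always performed while holding a private lock.
final class SynchronizedServiceLookup<T> {

    private final Object lock = new Object();
    private final ServiceLoader<T> loader;

    SynchronizedServiceLookup(Class<T> serviceType) {
        this.loader = ServiceLoader.load(serviceType);
    }

    // Returns every provider accepted by the predicate. The whole iteration
    // runs under the lock, so two threads never advance the loader's lazy
    // iterator at the same time.
    List<T> findCompatible(Predicate<? super T> isCompatible) {
        List<T> matches = new ArrayList<>();
        synchronized (lock) {
            for (T candidate : loader) {
                if (isCompatible.test(candidate)) {
                    matches.add(candidate);
                }
            }
        }
        return matches;
    }
}
```

A caller that expects exactly one compatible factory can check the size of the returned list and fail explicitly when it is zero or greater than one.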

          People

            kkl0u Kostas Kloudas
            mklein0 Marcos Klein