Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-18352

org.apache.flink.core.execution.DefaultExecutorServiceLoader not thread safe

    XMLWordPrintableJSON

    Details

      Description

      The singleton nature of the  org.apache.flink.core.execution.DefaultExecutorServiceLoader class is not thread-safe due to the fact that java.util.ServiceLoader class is not thread-safe.

      https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/ServiceLoader.html#Concurrency

       

      This can result in ServiceLoader class entering into an inconsistent state for processes which attempt to self-heal. This then requires bouncing the process/container in the hopes the race condition does not re-occur.

      https://stackoverflow.com/questions/60391499/apache-flink-cannot-find-compatible-factory-for-specified-execution-target-lo

       

      Additionally the following stack traces have been seen when using a org.apache.flink.streaming.api.environment.RemoteStreamEnvironment instances.

      java.lang.ArrayIndexOutOfBoundsException: 2
          at sun.misc.CompoundEnumeration.nextElement(CompoundEnumeration.java:61)
          at java.util.ServiceLoader$LazyIterator.hasNextService(ServiceLoader.java:357)
          at java.util.ServiceLoader$LazyIterator.hasNext(ServiceLoader.java:393)
          at java.util.ServiceLoader$1.hasNext(ServiceLoader.java:474)
          at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:60)
          at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1724)
          at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1706)
      

       

      java.util.NoSuchElementException: null
          at sun.misc.CompoundEnumeration.nextElement(CompoundEnumeration.java:59)
          at java.util.ServiceLoader$LazyIterator.hasNextService(ServiceLoader.java:357)
          at java.util.ServiceLoader$LazyIterator.hasNext(ServiceLoader.java:393)
          at java.util.ServiceLoader$1.hasNext(ServiceLoader.java:474)
          at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:60)
          at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1724)
          at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1706)
      

      The workaround for using the **StreamExecutionEnvironment implementations is to write a custom implementation of DefaultExecutorServiceLoader which is thread-safe and pass that to the StreamExecutionEnvironment constructors.

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                kkl0u Kostas Kloudas
                Reporter:
                mklein0 Marcos Klein
              • Votes:
                1 Vote for this issue
                Watchers:
                5 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: