Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-25979

Suspicious Classloading error during close of KafkaEnumerator

    XMLWordPrintableJSON

Details

    Description

      A user reported kafka logging a warning when the KafkaEnumerator was being closed.

      2022-02-04 15:16:30,801 WARN  org.apache.kafka.common.utils.Utils                          [] - Failed to close KafkaClient with type org.apache.kafka.clients.NetworkClient
      java.lang.NoClassDefFoundError: org/apache/kafka/common/network/Selector$CloseMode
              at org.apache.kafka.common.network.Selector.close(Selector.java:806) ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
              at org.apache.kafka.common.network.Selector.close(Selector.java:365) ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
              at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:639) ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
              at org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:834) [blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
              at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1219) [blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
              at java.lang.Thread.run(Thread.java:829) [?:?]
      Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.network.Selector$CloseMode
              at java.net.URLClassLoader.findClass(URLClassLoader.java:476) ~[?:?]
              at java.lang.ClassLoader.loadClass(ClassLoader.java:589) ~[?:?]
              at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
              at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
              at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
              at java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?]
              ... 6 more
      2022-02-04 15:16:30,802 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source coordinator for source Source: Kafka Source -> Sink: Print to Std. Out closed.
      
      KafkaSource<String> source = KafkaSource
              .<String>builder()
              .setBootstrapServers(brokers)
      .setGroupId(groupId)
      .setTopics(kafkaInputTopic)
              .setValueOnlyDeserializer(new SimpleStringSchema())
      //.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeSerializer.class))
      .setStartingOffsets(OffsetsInitializer.earliest())
              .setBounded(OffsetsInitializer.latest())
              .build();
      
      //withIdleness.duration()
      //env.fromSource(source, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");
      DataStream<String> ds = env.fromSource(source, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");
      

      This error is overall a bit weird; I don't think I've ever seen a class being unable to load one of it's inner classes. intuitively I would think this is caused by the classloader being closed prematurely.
      ds.print();

      
      

      Attachments

        Activity

          People

            Unassigned Unassigned
            chesnay Chesnay Schepler
            Votes:
            0 Vote for this issue
            Watchers:
            0 Start watching this issue

            Dates

              Created:
              Updated: