  Flink / FLINK-14012

Failed to start a job consuming secure Kafka after a job is cancelled


Details

    • Type: Bug
    • Status: Closed
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 1.9.0
    • Fix Version/s: None
    • Component/s: Connectors / Kafka
    • Labels: None
    • Environment:
      • Kubernetes 1.13.2
      • Flink 1.9.0
      • Kafka client library 2.2.0

    Description

      Hello, this is Daebeom Lee.

      Background

      I installed Flink 1.9.0 on our Kubernetes cluster.

      We use a Flink session cluster: we build a fat JAR, upload it through the web UI, and run several jobs from it.

      At first, the jobs start without problems.

      However, after we cancel some of the jobs, the next job we start fails.

      This is the error:

      java.lang.NoClassDefFoundError: org/apache/kafka/common/security/scram/internals/ScramSaslClient
          at org.apache.kafka.common.security.scram.internals.ScramSaslClient$ScramSaslClientFactory.createSaslClient(ScramSaslClient.java:235)
          at javax.security.sasl.Sasl.createSaslClient(Sasl.java:384)
          at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslClient$0(SaslClientAuthenticator.java:180)
          at java.security.AccessController.doPrivileged(Native Method)
          at javax.security.auth.Subject.doAs(Subject.java:422)
          at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:176)
          at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.<init>(SaslClientAuthenticator.java:168)
          at org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:254)
          at org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:202)
          at org.apache.kafka.common.network.KafkaChannel.<init>(KafkaChannel.java:140)
          at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:210)
          at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:334)
          at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:325)
          at org.apache.kafka.common.network.Selector.connect(Selector.java:257)
          at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:920)
          at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287)
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:474)
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:255)
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
          at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:292)
          at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1803)
          at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1771)
          at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.getAllPartitionsForTopics(KafkaPartitionDiscoverer.java:77)
          at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:131)
          at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:508)
          at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
          at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
          at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
          at java.lang.Thread.run(Thread.java:748)
      

      Our workaround

      • I think this is a Flink JVM classloader issue.
      • The job's classloader is unloaded when the job is cancelled, and in our setup the Kafka client library is bundled inside the fat JAR.
      • So I placed the Kafka client library in /opt/flink/lib:
        • /opt/flink/lib/kafka-clients-2.2.0.jar
      • After that, the issue was solved.
      • But some points still seem strange to me.
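      To check which copy of the Kafka classes a job actually uses once kafka-clients-2.2.0.jar is in /opt/flink/lib, one option is to log where a class was loaded from. Because Flink resolves user classes child-first by default, a copy still bundled in the fat JAR can take precedence over the one in /opt/flink/lib. The sketch below is a hypothetical helper (the names WhichLoader and describe are illustrative, not part of Flink or Kafka); it assumes it is called from inside a task, where Flink typically sets the thread's context classloader to the job's user-code classloader.

```java
import java.net.URL;
import java.security.CodeSource;

// Hypothetical diagnostic helper; not part of Flink or Kafka.
public class WhichLoader {

    /** Returns "<class> <- <loader> (<location>)", or notes bootstrap / not found. */
    static String describe(String className) {
        try {
            // Inside a Flink task this is typically the user-code classloader (assumption).
            ClassLoader cl = Thread.currentThread().getContextClassLoader();
            // initialize=false: look the class up without running its static initializers.
            Class<?> clazz = Class.forName(className, false, cl);
            ClassLoader loader = clazz.getClassLoader();
            if (loader == null) {
                return className + " <- bootstrap classloader";
            }
            CodeSource src = clazz.getProtectionDomain().getCodeSource();
            URL location = (src == null) ? null : src.getLocation();
            return className + " <- " + loader.getClass().getName() + " (" + location + ")";
        } catch (ClassNotFoundException e) {
            return className + " <- not found";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe("java.lang.String"));
        System.out.println(describe("org.apache.kafka.common.security.scram.internals.ScramSaslClient"));
    }
}
```

      Called, for example, from a rich function's open() method, a reported location other than /opt/flink/lib/kafka-clients-2.2.0.jar would suggest that the copy bundled in the fat JAR is still the one in use.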

       

      Suggestion

      • I'd like to know the exact reason this error occurs after upgrading to 1.9.0.
      • Does anybody know a better solution in this case?
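      One hypothesis consistent with the stack trace, though not confirmed in this issue: the failing call is javax.security.sasl.Sasl.createSaslClient dispatching to ScramSaslClientFactory, and Sasl finds factories through the JVM-wide java.security.Security provider list. Kafka's SCRAM client support appears to register such a provider once per JVM. In a session cluster the TaskManager JVM outlives individual jobs, so a provider registered from the first job's fat JAR could stay registered after that job's classloader is released, and a later job that makes it lazily load ScramSaslClient would then get NoClassDefFoundError. If so, the workaround would help as long as the provider classes are loaded from /opt/flink/lib, whose classloader is not released between jobs. The sketch below only demonstrates the JVM-global part, using a hypothetical DemoProvider and two simulated "jobs"; it does not reproduce the Kafka failure itself.

```java
import java.security.Provider;
import java.security.Security;

// Illustration only: shows that java.security providers are shared JVM-wide.
public class GlobalProviderDemo {

    /** Hypothetical provider standing in for one a job registers from its own JAR. */
    static final class DemoProvider extends Provider {
        DemoProvider() {
            // Provider(String, double, String) is deprecated since Java 9 but works on Java 8+.
            super("DemoSaslProvider", 1.0, "Illustrative provider registered by job 1");
        }
    }

    /** Simulates "job 1": registers the provider once, as a library might in a static initializer. */
    static void runJobOne() {
        if (Security.getProvider("DemoSaslProvider") == null) {
            Security.addProvider(new DemoProvider());
        }
    }

    /** Simulates "job 2": never registers anything, yet finds job 1's provider instance. */
    static ClassLoader providerLoaderSeenByJobTwo() {
        Provider p = Security.getProvider("DemoSaslProvider");
        return (p == null) ? null : p.getClass().getClassLoader();
    }

    public static void main(String[] args) {
        runJobOne();
        // Job 2 sees job 1's provider, including the classloader that defined its class.
        System.out.println("Visible to job 2: " + (Security.getProvider("DemoSaslProvider") != null));
        System.out.println("Defined by: " + providerLoaderSeenByJobTwo());
    }
}
```

      In a real session cluster the classloader reported for such a provider would belong to whichever job registered it first, which is why state held in the global list can outlive that job.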

       

      Thank you in advance.

       


          People

            Assignee: Unassigned
            Reporter: Daebeom Lee
            Votes: 0
            Watchers: 4
