Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-20617

Kafka Consumer Deserializer Exception on application mode

    XMLWordPrintableJSON

Details

    Description

      Kafka source may has some issues on application mode
       
      when i run it with application mode on  flink 1.11.2 it can't startup
      the detail Excetion is:
      org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
          at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:789)
          at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:643)
          at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
          at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
          at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
          at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550)
          at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
          at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
          at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
          at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
          at java.lang.Thread.run(Thread.java:745)
      Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
          at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:263)
          at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:688)
          ... 15 more
      The pom is:
      <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_2.11</artifactId>
      <version>${flink.version}</version>
      <exclusions>
      <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      </exclusion>
      <exclusion>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      </exclusion>
      </exclusions>
      </dependency>
      <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>1.0.1</version>
      </dependency>

      Attachments

        1. taskmanager.out
          1.60 MB
          Georger

        Activity

          People

            Unassigned Unassigned
            Georger Georger
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated: