Flink / FLINK-18575

Failed to send data to Kafka

Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 1.10.0
    • Fix Version/s: None
    • Component/s: Connectors / Kafka
    • Labels: None
    • Kafka had an error; it is not related to Flink.

    Description

      Flink version: 1.10.0
      Kafka version: 2.2

      code:

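      import java.util.Properties

      // The Scala DataStream API is assumed here; the report does not show its imports.
      import org.apache.flink.streaming.api.scala.DataStream
      import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
      import org.apache.kafka.clients.producer.ProducerConfig

      // SINK_BROKERS, SINK_ZK and ResultDtSerialization are defined elsewhere in the reporter's job.
      private def producerKafka(aggs_result: DataStream[String], topic: String, parallelism: Int) = {
        val kafkaPro = new Properties()
        kafkaPro.setProperty("bootstrap.servers", SINK_BROKERS)
        kafkaPro.setProperty("zookeeper.connect", SINK_ZK)
        kafkaPro.setProperty("request.timeout.ms", "10000")
        kafkaPro.setProperty("compression.type", "snappy")
        kafkaPro.setProperty("transaction.timeout.ms", (1000 * 60 * 5).toString)
        // With retries set, a leader switch on a Kafka partition makes the producer retry 5 times instead of restarting the Flink job:
        kafkaPro.setProperty(ProducerConfig.RETRIES_CONFIG, "5")
        val kafka = new FlinkKafkaProducer[String](topic, new ResultDtSerialization(topic), kafkaPro, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)
        aggs_result.addSink(kafka).setParallelism(parallelism)
      }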
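
      The producer settings above can be reduced to the keys a Kafka producer client actually reads. The following is only a sketch under assumptions: `ProducerProps` and `build` are hypothetical names, the broker list is a placeholder argument, and `zookeeper.connect` is left out because it is not a producer setting (the client does not use it). The other values are copied from the report, with `"retries"` being the string behind `ProducerConfig.RETRIES_CONFIG`.

```scala
import java.util.Properties

// Hypothetical helper, not part of the reporter's job or of Flink.
object ProducerProps {
  // Builds producer properties from the values shown in the report,
  // omitting "zookeeper.connect", which producer clients do not read.
  def build(brokers: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", brokers)
    props.setProperty("request.timeout.ms", "10000")
    props.setProperty("compression.type", "snappy")
    props.setProperty("transaction.timeout.ms", (1000 * 60 * 5).toString)
    // "retries" is the key that ProducerConfig.RETRIES_CONFIG resolves to.
    props.setProperty("retries", "5")
    props
  }
}
```

      The resulting `Properties` object could be passed to the `FlinkKafkaProducer` constructor in place of `kafkaPro`.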
      

      When I use this code to produce to Kafka, it reports an error:

      2020-07-13 10:25:47,624 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Error during disposal of stream operator.
      org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Pending record count must be zero at this point: 1
      	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
      	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:861)
      	at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
      	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:651)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:562)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:480)
      	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
      	at java.lang.Thread.run(Thread.java:748)
      Caused by: java.lang.IllegalStateException: Pending record count must be zero at this point: 1
      	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:969)
      	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:834)
      	... 8 more
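
      For readers unfamiliar with the message in the `Caused by` line: it comes from a consistency check made after the sink flushes, which expects every sent record to have had its send callback run. The following is a minimal sketch of that kind of bookkeeping, not Flink's implementation; `PendingRecordTracker` and its method names are hypothetical, and the real sink wires the counter to the Kafka client's callbacks.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for the sink's bookkeeping; not the real FlinkKafkaProducer.
final class PendingRecordTracker {
  private val pendingRecords = new AtomicLong(0)

  // Called when a record is handed to the Kafka client.
  def onSend(): Unit = pendingRecords.incrementAndGet()

  // Called from the send callback once the record is acknowledged or rejected.
  def onCompletion(): Unit = pendingRecords.decrementAndGet()

  def pending: Long = pendingRecords.get()

  // After a flush, no record may still be waiting for its callback.
  def checkFlushed(): Unit = {
    val n = pendingRecords.get()
    if (n != 0) {
      throw new IllegalStateException(s"Pending record count must be zero at this point: $n")
    }
  }
}

object PendingRecordDemo {
  def main(args: Array[String]): Unit = {
    val tracker = new PendingRecordTracker
    tracker.onSend()       // record 1 sent
    tracker.onSend()       // record 2 sent
    tracker.onCompletion() // only record 1 completed
    try tracker.checkFlushed()
    catch { case e: IllegalStateException => println(e.getMessage) }
  }
}
```

      In this sketch, one record never completes, so the check fails with the same message as in the log. In the report, the resolution attributes the underlying send failure to the Kafka side rather than to Flink.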
      

          People

            Assignee: Unassigned
            Reporter: mzz (mzz_q)