Flink / FLINK-22190

no guarantee on Flink exactly_once sink to Kafka


    Details

    • Type: Bug
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 1.12.2
    • Fix Version/s: None
    • Component/s: API / DataStream
    • Environment:

      flink: 1.12.2

      kafka: 2.7.0

      Description

      When I tried to test Flink's exactly_once sink to Kafka, it did not behave as expected. Here is the pipeline of the Flink applications: raw data (flink app0) -> kafka topic1 -> flink app1 -> kafka topic2 -> flink app2. Tasks in flink app1 may hit a `/ by zero` ArithmeticException at random (deliberately injected to force restarts). The code is below:

      raw data, flink app0:
      class SimpleSource1 extends SourceFunction[String] {
        var switch = true
        val students: Array[String] = Array("Tom", "Jerry", "Gory")

        override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
          var i = 0
          while (switch) {
            sourceContext.collect(s"${students(Random.nextInt(students.length))},$i")
            i += 1
            Thread.sleep(5000)
          }
        }

        override def cancel(): Unit = switch = false
      }
      val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
      val dataStream = streamEnv.addSource(new SimpleSource1)
      
      dataStream.addSink(new FlinkKafkaProducer[String]("xfy:9092", "single-partition-topic-2", new SimpleStringSchema()))
      
      streamEnv.execute("sink kafka")
       
      flink-app1:
      val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
      streamEnv.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE)

      val prop = new Properties()
      prop.setProperty("bootstrap.servers", "xfy:9092")
      prop.setProperty("group.id", "test")

      val dataStream = streamEnv.addSource(new FlinkKafkaConsumer[String](
        "single-partition-topic-2",
        new SimpleStringSchema,
        prop
      ))

      val resultStream = dataStream.map(x => {
        val data = x.split(",")
        (data(0), data(1), data(1).toInt / Random.nextInt(5)).toString()
      })

      resultStream.print().setParallelism(1)

      val propProducer = new Properties()
      propProducer.setProperty("bootstrap.servers", "xfy:9092")
      propProducer.setProperty("transaction.timeout.ms", s"${1000 * 60 * 5}")

      resultStream.addSink(new FlinkKafkaProducer[String](
        "single-partition-topic",
        new MyKafkaSerializationSchema("single-partition-topic"),
        propProducer,
        Semantic.EXACTLY_ONCE))

      streamEnv.execute("sink kafka")
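A note on the two timeouts in app1: with `Semantic.EXACTLY_ONCE`, `FlinkKafkaProducer` only commits a Kafka transaction when a checkpoint completes, so `transaction.timeout.ms` must comfortably exceed the checkpoint interval, and the broker rejects producers whose timeout exceeds its `transaction.max.timeout.ms` (Kafka default: 15 minutes). A minimal sketch checking the values used above (the 15-minute cap is an assumption of default broker config):

```scala
// Sketch: sanity-check the timeout relationship for an EXACTLY_ONCE sink.
// Values mirror the snippet above; brokerMaxTimeoutMs assumes Kafka's
// default transaction.max.timeout.ms of 15 minutes.
object TransactionTimeoutCheck {
  def main(args: Array[String]): Unit = {
    val checkpointIntervalMs = 1000L            // enableCheckpointing(1000, ...)
    val transactionTimeoutMs = 1000L * 60 * 5   // transaction.timeout.ms above
    val brokerMaxTimeoutMs   = 1000L * 60 * 15  // assumed broker default

    // Transactions stay open between checkpoints, so they must outlive the interval.
    require(transactionTimeoutMs > checkpointIntervalMs,
      "transaction.timeout.ms must exceed the checkpoint interval")
    // The broker refuses producers that ask for more than transaction.max.timeout.ms.
    require(transactionTimeoutMs <= brokerMaxTimeoutMs,
      "transaction.timeout.ms must not exceed the broker's transaction.max.timeout.ms")
    println("timeout configuration is consistent")
  }
}
```

With the reported settings (1 s checkpoints, 5 min transaction timeout) both conditions hold, so the timeouts are not the cause here.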
       
      flink-app2:
      val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

      val prop = new Properties()
      prop.setProperty("bootstrap.servers", "xfy:9092")
      prop.setProperty("group.id", "test")
      prop.setProperty("isolation_level", "read_committed")

      val dataStream = streamEnv.addSource(new FlinkKafkaConsumer[String](
        "single-partition-topic",
        new SimpleStringSchema,
        prop
      ))

      dataStream.print().setParallelism(1)

      streamEnv.execute("consumer kafka")

       

      flink app1 prints some duplicate numbers after restarts (expected, since `print()` is not transactional), and I expected flink app2 to see only the committed, deduplicated records, but in fact it also prints the duplicates.
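One thing worth checking in app2: the Kafka consumer configuration key is `isolation.level` (dot-separated). An unrecognized key such as `isolation_level` is silently ignored by the Kafka consumer, which then falls back to the default `read_uncommitted` and will happily deliver records from aborted or replayed transactions. A minimal sketch of the corrected properties (the hypothetical `buildProps` helper is only for illustration):

```scala
import java.util.Properties

// Sketch: consumer properties for reading only committed records.
// Kafka's key is "isolation.level" with a dot; "isolation_level" is
// not a known config and leaves the default "read_uncommitted" active.
object ReadCommittedProps {
  def buildProps(bootstrap: String): Properties = {
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", bootstrap)
    prop.setProperty("group.id", "test")
    prop.setProperty("isolation.level", "read_committed") // note the dot
    prop
  }

  def main(args: Array[String]): Unit = {
    val p = buildProps("xfy:9092")
    println(p.getProperty("isolation.level"))
  }
}
```

With `read_committed` in effect, the consumer skips records from transactions that were never committed, which is exactly the deduplication behavior the report expects from app2.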

             People

             • Assignee: Unassigned
             • Reporter: SpongebobZ Spongebob