Flink / FLINK-2008

PersistentKafkaSource is sometimes emitting tuples multiple times

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.9
    • Fix Version/s: 0.10.0
    • Component/s: Connectors / Kafka
    • Labels: None

    Description

      The PersistentKafkaSource is expected to emit records exactly once.

      Two test cases in KafkaITCase fail sporadically because records are emitted more than once.
      Affected tests:
      testPersistentSourceWithOffsetUpdates() fails after the offsets have been changed manually in ZooKeeper (ZK):

      java.lang.RuntimeException: Expected v to be 3, but was 4 on element 0 array=[4, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 2]
      

      brokerFailureTest() also fails:

      05/13/2015 08:13:16	Custom source -> Stream Sink(1/1) switched to FAILED 
      java.lang.AssertionError: Received tuple with value 21 twice
      	at org.junit.Assert.fail(Assert.java:88)
      	at org.junit.Assert.assertTrue(Assert.java:41)
      	at org.junit.Assert.assertFalse(Assert.java:64)
      	at org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:877)
      	at org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:859)
      	at org.apache.flink.streaming.api.operators.StreamSink.callUserFunction(StreamSink.java:39)
      	at org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
      	at org.apache.flink.streaming.api.operators.ChainableStreamOperator.collect(ChainableStreamOperator.java:54)
      	at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:39)
      	at org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource.run(PersistentKafkaSource.java:173)
      	at org.apache.flink.streaming.api.operators.StreamSource.callUserFunction(StreamSource.java:40)
      	at org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
      	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:34)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:139)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
      	at java.lang.Thread.run(Thread.java:745)
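
For readers unfamiliar with this kind of assertion, the following is a minimal sketch of a duplicate check that would produce a message like "Received tuple with value 21 twice". It is not the actual KafkaITCase code: the class name DuplicateCheck, its methods, and the assumption that values are integers in the range [0, expectedCount) are hypothetical and chosen only for illustration.

```java
import java.util.BitSet;

/** Hypothetical helper for illustration; not taken from KafkaITCase. */
class DuplicateCheck {
    private final int expectedCount;
    private final BitSet seen;

    DuplicateCheck(int expectedCount) {
        this.expectedCount = expectedCount;
        this.seen = new BitSet(expectedCount);
    }

    /** Records one value and fails if it is out of range or was already seen. */
    void accept(int value) {
        if (value < 0 || value >= expectedCount) {
            throw new AssertionError("Unexpected value " + value);
        }
        if (seen.get(value)) {
            throw new AssertionError("Received tuple with value " + value + " twice");
        }
        seen.set(value);
    }

    /** True once every value in [0, expectedCount) has been seen exactly once. */
    boolean sawAll() {
        return seen.cardinality() == expectedCount;
    }

    public static void main(String[] args) {
        DuplicateCheck check = new DuplicateCheck(3);
        check.accept(0);
        check.accept(1);
        check.accept(2);
        System.out.println("complete: " + check.sawAll());
        try {
            check.accept(1); // simulates a record that the source emits a second time
        } catch (AssertionError e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A check of this shape catches a duplicate as soon as it arrives, which is consistent with the sink failing in the middle of the run rather than at the end.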
      

          People

            rmetzger Robert Metzger
            rmetzger Robert Metzger
