FLINK-15262

Kafka connector doesn't read from the beginning immediately when 'connector.startup-mode' = 'earliest-offset'

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.10.0
    • Fix Version/s: 1.11.0, 1.10.2
    • Component/s: Connectors / Kafka
    • Labels:
      None

      Description

      I created a Kafka table in Flink to read from my Kafka topic (which already contains messages) starting from the earliest offset, but a `select * from test` query in Flink does not start reading until a new message arrives. If no new message arrives, the query just sits there and never produces a result.

      I expect the query to produce results for all existing messages immediately, without waiting for a new message to "trigger" data processing.

      The DDL I used, following the DDL documentation at https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#kafka-connector

      create table test(name String) with (
         'connector.type' = 'kafka',
         'connector.version' = 'universal',
         'connector.topic' = 'test',
         'connector.properties.zookeeper.connect' = 'localhost:2181',
         'connector.properties.bootstrap.servers' = 'localhost:9092',
         'connector.startup-mode' = 'earliest-offset',
         'format.type' = 'csv',
         'update-mode' = 'append'
      );
      

      Repro steps:
      1) Start a local Kafka cluster following https://kafka.apache.org/quickstart and create a topic named "test"
      2) Produce some records consisting of simple strings such as "john", "marry", etc. into the topic
      3) Start the Flink SQL CLI, add the Kafka connector dependency, and create a Flink table as

      create table test(name String) with (
         'connector.type' = 'kafka',
         'connector.version' = 'universal',
         'connector.topic' = 'test',
         'connector.properties.zookeeper.connect' = 'localhost:2181',
         'connector.properties.bootstrap.servers' = 'localhost:9092',
         'connector.startup-mode' = 'earliest-offset',
         'format.type' = 'csv'
      );
      

      4) Run "select * from test" in the SQL CLI

      Expected: upon running the query, we should immediately see the records already in Kafka, such as "john" and "marry".

      Actual: upon running the query, no records show up. We have to produce a new record such as "kitty" into the Kafka topic before the old records "john" and "marry" appear.
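
      The Kafka side of the repro steps (1 and 2) can be sketched roughly as below. This is only an illustration under assumptions not stated in the report: `KAFKA_HOME` is a hypothetical variable pointing at an unpacked Kafka distribution whose ZooKeeper and broker are already running as in the quickstart, and the exact flags (for example `--broker-list` versus `--bootstrap-server` for the console producer) vary between Kafka versions. The last command reads the topic back from the beginning outside Flink, which helps confirm that the old records are present and that the delay comes from the Flink side.

```shell
# Assumption: KAFKA_HOME points at a Kafka distribution with ZooKeeper and the
# broker already running on the quickstart defaults (localhost:2181 / localhost:9092).
KAFKA_HOME="${KAFKA_HOME:-}"
TOPIC="test"

# One record per line; with 'format.type' = 'csv' each line maps to the single column `name`.
RECORDS="$(printf '%s\n' john marry)"

if [ -n "$KAFKA_HOME" ]; then
  # Step 1: create the topic (kafka-topics.sh accepts --bootstrap-server in Kafka 2.2+).
  "$KAFKA_HOME/bin/kafka-topics.sh" --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 --partitions 1 \
    --topic "$TOPIC"

  # Step 2: produce the pre-existing records before any Flink query is started.
  # Newer Kafka versions prefer --bootstrap-server here instead of --broker-list.
  printf '%s\n' "$RECORDS" | "$KAFKA_HOME/bin/kafka-console-producer.sh" \
    --broker-list localhost:9092 --topic "$TOPIC"

  # Sanity check outside Flink: the records should be readable from offset 0.
  "$KAFKA_HOME/bin/kafka-console-consumer.sh" \
    --bootstrap-server localhost:9092 --topic "$TOPIC" \
    --from-beginning --timeout-ms 5000
else
  echo "KAFKA_HOME is not set; skipping the Kafka commands" >&2
fi
```

      If the console consumer prints "john" and "marry" while the Flink query in step 4 stays empty, the records are present in the topic and the issue is on the reading side.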

            People

            • Assignee: Jiangjie Qin (becket_qin)
            • Reporter: Bowen Li (phoenixjiangnan)