Flink / FLINK-20273

Fix Table API Kafka connector sink partitioner documentation error


Details

    Description

      The documentation states that the Kafka sink uses the fixed partitioner by default. However, in my local test the sink uses the sticky partitioner to choose the partition of a record when no key is set.

      You can reproduce this by adding the following test to KafkaTableITCase:

      @Test
      public void testKafkaSourceSinkWithDefaultPartitioner() throws Exception {
      		if (isLegacyConnector) {
      			return;
      		}
      		// we always use a different topic name for each parameterized test,
      		// in order to make sure the topic can be created.
      		final String topic = "default_partitioner_topic_" + format;
      		createTestTopic(topic, 3, 1);
      
      		// ---------- Produce records into Kafka -------------------
      		String groupId = standardProps.getProperty("group.id");
      		String bootstraps = standardProps.getProperty("bootstrap.servers");
      
      		// the source table exposes the Kafka partition of each record through the
      		// read-only `partition` metadata column; the sink table declares no key fields,
      		// so the partition of every written record is chosen by the default partitioning
      		final String createSourceTable = String.format(
      				"CREATE TABLE kafkaSource (\n"
      						+ "  `user_id` BIGINT,\n"
      						+ "  `name` STRING,\n"
      						+ "  `partition` INT METADATA\n"
      						+ ") WITH (\n"
      						+ "  'connector' = 'kafka',\n"
      						+ "  'topic' = '%s',\n"
      						+ "  'properties.bootstrap.servers' = '%s',\n"
      						+ "  'properties.group.id' = '%s',\n"
      						+ "  'scan.startup.mode' = 'earliest-offset',\n"
      						+ "  'format' = '%s'\n"
      						+ ")",
      				topic,
      				bootstraps,
      				groupId,
      				format);
      		final String createSinkTable = String.format(
      				"CREATE TABLE kafkaSink (\n"
      						+ "  `user_id` BIGINT,\n"
      						+ "  `name` STRING\n"
      						+ ") WITH (\n"
      						+ "  'connector' = 'kafka',\n"
      						+ "  'topic' = '%s',\n"
      						+ "  'properties.bootstrap.servers' = '%s',\n"
      						+ "  'properties.group.id' = '%s',\n"
      						+ "  'scan.startup.mode' = 'earliest-offset',\n"
      						+ "  'format' = '%s'\n"
      						+ ")",
      				topic,
      				bootstraps,
      				groupId,
      				format);
      
      		tEnv.executeSql(createSourceTable);
      		tEnv.executeSql(createSinkTable);
      
      		String initialValues = "INSERT INTO kafkaSink\n"
      									+ "VALUES\n"
      									+ " (1, 'name 1'),\n"
      									+ " (2, 'name 2'),\n"
      									+ " (3, 'name 3')";
      		tEnv.executeSql(initialValues).await();
      
      		initialValues = "INSERT INTO kafkaSink\n"
      				+ "VALUES\n"
      				+ " (4, 'name 4'),\n"
      				+ " (5, 'name 5'),\n"
      				+ " (6, 'name 6')";
      		tEnv.executeSql(initialValues).await();
      
      		initialValues = "INSERT INTO kafkaSink\n"
      				+ "VALUES\n"
      				+ " (7, 'name 7'),\n"
      				+ " (8, 'name 8'),\n"
      				+ " (9, 'name 9')";
      		tEnv.executeSql(initialValues).await();
      
      
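      		// ---------- Consume stream from Kafka -------------------
      
      		final List<Row> result = collectRows(tEnv.sqlQuery("SELECT * FROM kafkaSource"), 9);
      		// print each row; the last field is the partition the record was read from
      		result.forEach(System.out::println);
      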
      		// ------------- cleanup -------------------
      
      		deleteTestTopic(topic);
      	}
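A minimal sketch of the same sink declared with the partitioner set explicitly, assuming the connector's 'sink.partitioner' option with the value 'fixed'. The class name FixedPartitionerSinkDdl, the table name kafkaFixedSink, and the placeholder values in main are hypothetical; only the DDL string matters. Running the same inserts against this table instead of kafkaSink should, if the option is honored, send every record from a single sink subtask to one partition, which gives a baseline to compare with the default behavior.

```java
/**
 * Sketch: the sink DDL from the test, with the partitioner pinned explicitly.
 * Hypothetical class and table names; 'sink.partitioner' = 'fixed' requests
 * Flink's fixed partitioner instead of relying on the unset default.
 */
final class FixedPartitionerSinkDdl {

    /** Builds a CREATE TABLE statement for a Kafka sink with an explicit partitioner. */
    static String createSinkTable(String topic, String bootstraps, String format) {
        return String.format(
                "CREATE TABLE kafkaFixedSink (\n"
                        + "  `user_id` BIGINT,\n"
                        + "  `name` STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = '%s',\n"
                        + "  'properties.bootstrap.servers' = '%s',\n"
                        + "  'sink.partitioner' = 'fixed',\n"
                        + "  'format' = '%s'\n"
                        + ")",
                topic, bootstraps, format);
    }

    public static void main(String[] args) {
        // placeholder values; in the test they come from the test harness
        System.out.println(createSinkTable("test_topic", "localhost:9092", "json"));
    }
}
```

In the test, the returned string would be passed to tEnv.executeSql(...) in the same way as createSinkTable.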
      

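      The test writes records to the Kafka topic without setting 'sink.partitioner', so the default partitioning applies, and then reads them back together with their partition id. If the fixed partitioner were used, each sink subtask would always write to the same partition, so with a single sink subtask all records would have the same partition id. I ran the test 3 times and got the following results: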

      // the first result
      <1,name 1,1>
      <2,name 2,1>
      <3,name 3,1>
      <7,name 7,1>
      <4,name 4,0>
      <5,name 5,0>
      <6,name 6,0>
      <8,name 8,0>
      <9,name 9,0>
      // the second result
      <1,name 1,1>
      <2,name 2,1>
      <3,name 3,1>
      <4,name 4,0>
      <5,name 5,0>
      <6,name 6,0>
      <7,name 7,0>
      <8,name 8,0>
      <9,name 9,0>
      // the third result
      <9,name 9,2>
      <1,name 1,0>
      <2,name 2,0>
      <3,name 3,0>
      <4,name 4,0>
      <5,name 5,0>
      <6,name 6,0>
      <7,name 7,1>
      <8,name 8,1>
      

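      The last column is the partition id; the test topic has 3 partitions. In every run the records are spread over more than one partition (in the first and third runs even within a single INSERT statement), so the default partitioner behaves like the sticky partitioner rather than the fixed partitioner.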
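The reasoning above can be checked mechanically. The sketch below is a standalone reimplementation, not Flink's class: it assumes the rule used by Flink's FlinkFixedPartitioner, where a sink subtask always writes to partitions[subtaskIndex % partitions.length], and tests whether a run's observed partition ids are all equal, which is what that rule predicts for a single sink subtask. The class and method names are hypothetical; the data is the first run listed above, reordered by user_id.

```java
import java.util.Arrays;

/**
 * Simplified model of the fixed partitioning rule plus a check of observed results.
 * Hypothetical names; this does not call Flink.
 */
final class FixedPartitionerCheck {

    /** Partition chosen for every record emitted by the given sink subtask. */
    static int fixedPartition(int[] partitions, int subtaskIndex) {
        return partitions[subtaskIndex % partitions.length];
    }

    /** True if all observed partition ids are equal, as the fixed rule predicts for one subtask. */
    static boolean consistentWithSingleFixedSubtask(int[] observedPartitions) {
        return Arrays.stream(observedPartitions).distinct().count() <= 1;
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2}; // the test topic has 3 partitions
        // with one sink subtask (index 0), every record would go to partition 0
        System.out.println(fixedPartition(partitions, 0)); // prints 0

        // partition ids of user_id 1..9 from the first run above
        int[] firstRun = {1, 1, 1, 0, 0, 0, 1, 0, 0};
        System.out.println(consistentWithSingleFixedSubtask(firstRun)); // prints false
    }
}
```

The same check returns false for the second and third runs as well, since each of them also contains more than one distinct partition id.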

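      By the way, the sink partitioning section of the documentation only holds when the key is null. If key fields are set, the round-robin strategy does not take effect, because the Kafka producer's default partitioner then chooses the partition by hashing the key.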
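A minimal sketch of why the partitioning strategy stops mattering once a key is set, assuming the Kafka producer's default partitioner handles the record. Kafka hashes the serialized key with murmur2, makes the hash non-negative, and takes it modulo the partition count; here Arrays.hashCode is only a stand-in for murmur2, so the concrete partition numbers will differ from Kafka's, but the property shown is the same: a given key always maps to the same partition. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/**
 * Simplified model of key-based partitioning. NOTE: Kafka uses murmur2 on the
 * serialized key; Arrays.hashCode is a stand-in, so partition numbers differ.
 */
final class KeyedPartitioningSketch {

    /** Non-negative hash of the key bytes modulo the partition count. */
    static int partitionForKey(byte[] keyBytes, int numPartitions) {
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        byte[] key = "user-1".getBytes(StandardCharsets.UTF_8);
        // the same key maps to the same partition on every send, so no round-robin spreading happens
        int first = partitionForKey(key, numPartitions);
        int second = partitionForKey(key, numPartitions);
        System.out.println(first == second); // prints true
    }
}
```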

          People

            fsk119 Shengkai Fang
            Votes: 0
            Watchers: 2
