Flink / FLINK-20947

Idle source doesn't work when pushing watermark into the source

    Details

      Description

      I use Flink Table SQL to create a streaming job that reads from a Kafka source and writes the data into a partitioned Hive table.

      The following SQL creates the Kafka table:

      tableEnv.executeSql(
          "CREATE TABLE stream_tmp.kafka_tmp (`messageId` STRING, `message_type` STRING,`payload` STRING,`timestamp` BIGINT, " +
                  "  procTime AS PROCTIME()," +
                  "  eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp` / 1000,'yyyy-MM-dd HH:mm:ss'))," +
                  "  WATERMARK FOR eventTime AS eventTime - INTERVAL '15' SECOND )" +
                  "  WITH ('connector' = 'kafka'," +
                  " 'topic' = 'XXX-topic'," +
                  " 'properties.bootstrap.servers'='kafka-server:9092'," +
                  " 'properties.group.id' = 'XXX-group_id'," +
                  " 'scan.startup.mode' = 'latest-offset'," +
                  " 'format' = 'json'," +
                  " 'json.fail-on-missing-field' = 'false'," +
                  " 'json.ignore-parse-errors' = 'true' )"
                  );

        

      The following SQL creates the Hive table:

      tableEnv.executeSql(
           "CREATE TABLE hive_tmp.kafka_hive_tmp (`message_id` STRING,`message_type` STRING,`payload` STRING,`event_ts` BIGINT ) " +
                   " PARTITIONED BY (ts_date STRING,ts_hour STRING, ts_minute STRING)" +
                   " STORED AS PARQUET TBLPROPERTIES (" +
                   " 'sink.partition-commit.trigger' = 'partition-time'," +
                   " 'sink.partition-commit.delay' = '1 min'," +
                   " 'sink.partition-commit.policy.kind'='metastore,success-file'," +
                   " 'sink.partition-commit.success-file.name'='_SUCCESS'," +
                   " 'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00')");
      

       

      The Kafka topic used above has multiple partitions. Some of the partitions receive data, while others receive none.

      I noticed that the config option "table.exec.source.idle-timeout" is meant to handle this situation for "idle" source partitions, but even with this option set, the Hive partition commit is still not triggered (that is, the "_SUCCESS" file is never created for the partition).
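To make the dependency on the watermark concrete, here is a minimal sketch of how a 'partition-time' commit decision could be modelled. It is not Flink's implementation. It assumes the rule from the Flink documentation (commit once the watermark passes the extracted partition time plus the configured delay) and assumes UTC when converting the partition time to milliseconds. The class name, the method names, and the sample partition values are hypothetical.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified, hypothetical model of a 'partition-time' commit trigger.
class PartitionCommitSketch {
    // Format that the resolved timestamp pattern is expected to match.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Substitute each $key in the pattern with its partition value, then parse.
    static LocalDateTime partitionTime(String pattern, Map<String, String> partitionValues) {
        String resolved = pattern;
        for (Map.Entry<String, String> e : partitionValues.entrySet()) {
            resolved = resolved.replace("$" + e.getKey(), e.getValue());
        }
        return LocalDateTime.parse(resolved, FORMAT);
    }

    // Committable only when the watermark has passed partition time + delay.
    // Assumption: partition time is interpreted as UTC.
    static boolean canCommit(long watermarkMillis, LocalDateTime partitionTime, Duration delay) {
        long partitionMillis = partitionTime.toInstant(ZoneOffset.UTC).toEpochMilli();
        return watermarkMillis > partitionMillis + delay.toMillis();
    }

    public static void main(String[] args) {
        // Made-up partition values for illustration only.
        Map<String, String> values = new LinkedHashMap<>();
        values.put("ts_date", "2021-01-13");
        values.put("ts_hour", "10");
        values.put("ts_minute", "05");

        LocalDateTime partTime =
                partitionTime("$ts_date $ts_hour:$ts_minute:00", values);

        // A watermark that never advances stays at Long.MIN_VALUE,
        // so the commit condition can never become true.
        System.out.println(canCommit(Long.MIN_VALUE, partTime, Duration.ofMinutes(1)));
    }
}
```

Under this model, a partition whose data has been fully written still waits for the watermark, which is why a stalled watermark means no "_SUCCESS" file.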

      After analyzing the issue, I found the root cause: the watermark for the "idle" partition does not advance, so the Hive partition cannot be committed.
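The effect of an idle partition on the overall watermark can be illustrated with a small model. This is a sketch, not the code path in Flink. It relies only on the general rule that the combined watermark is the minimum over all non-idle inputs. The class name, the method name, and the NO_WATERMARK constant are hypothetical.

```java
// Simplified, hypothetical model of combining per-partition watermarks.
class IdleWatermarkSketch {
    // Stand-in for "no watermark emitted yet".
    static final long NO_WATERMARK = Long.MIN_VALUE;

    // Combined watermark = minimum over partitions that are not marked idle.
    // If every partition is idle, this model reports NO_WATERMARK.
    static long combinedWatermark(long[] partitionWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < partitionWatermarks.length; i++) {
            if (idle[i]) {
                continue; // idle partitions do not hold the watermark back
            }
            anyActive = true;
            min = Math.min(min, partitionWatermarks[i]);
        }
        return anyActive ? min : NO_WATERMARK;
    }

    public static void main(String[] args) {
        // Partition 0 receives data, partition 1 has never seen a record.
        long[] watermarks = {1_000L, NO_WATERMARK};

        // Idleness not detected: the empty partition pins the minimum.
        System.out.println(combinedWatermark(watermarks, new boolean[] {false, false}));

        // Idleness detected: only the active partition counts.
        System.out.println(combinedWatermark(watermarks, new boolean[] {false, true}));
    }
}
```

In this model the first call corresponds to the behavior reported here: the empty partition is never treated as idle, so the combined watermark stays at its initial value.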

       

       

            People

            • Assignee: Shengkai Fang (fsk119)
            • Reporter: Weijia Xu (weijiaxu)
