Flink / FLINK-29494

ChangeLogNormalize operator causes unexpected firing of past windows after state restoration


Details

    • Type: Bug
    • Status: Open
    • Priority: Critical
    • Resolution: Unresolved
    • Affects Version/s: 1.14.2
    • None
    • None
    • Flink version: 1.14.2
    • API: Flink SQL

    Description

      Issue Summary:

      While running a group window aggregation on a stream produced by the `upsert-kafka` connector, I am seeing unexpected behaviour: restoring a job from a checkpoint/savepoint causes past windows (with respect to the last watermark generated by the previous job run) to fire.

      Detailed Description: 

      My program is written in Flink SQL.

      Watermark strategy: bounded out-of-orderness (derived from the maximum observed rowtime), generated periodically (default interval of 200 ms)

      Rowtime field: `updated_at_ts`, a monotonically increasing field in the changelog stream produced by Debezium.
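The following is a minimal Python model of a periodic bounded-out-of-orderness generator. It tracks the maximum rowtime seen and, on each timer tick, emits that maximum minus a fixed delay. The class name and the `delay_ms` value are hypothetical, because the report does not state the configured bound. This is a sketch of the idea, not Flink's own implementation.

```python
from typing import Optional


class BoundedOutOfOrdernessWatermarks:
    """Hypothetical model of a periodic bounded-out-of-orderness generator.

    Timestamps are epoch milliseconds; delay_ms is an assumed bound.
    """

    def __init__(self, delay_ms: int) -> None:
        self.delay_ms = delay_ms
        self.max_ts: Optional[int] = None  # no rowtime observed yet

    def on_event(self, ts: int) -> None:
        # Keep the largest rowtime seen so far; older events never lower it.
        if self.max_ts is None or ts > self.max_ts:
            self.max_ts = ts

    def on_periodic_emit(self) -> Optional[int]:
        # Invoked on a timer (200 ms by default in Flink). Nothing is
        # emitted until at least one event has been observed.
        if self.max_ts is None:
            return None
        return self.max_ts - self.delay_ms


gen = BoundedOutOfOrdernessWatermarks(delay_ms=5_000)
gen.on_event(10_000)
gen.on_event(8_000)  # out-of-order event: the maximum stays 10_000
print(gen.on_periodic_emit())  # prints 5000
```

In this model the generator starts with no maximum, so a freshly started source has no watermark until it has seen an event and its timer has fired.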

      Runtime topology of the Flink job:

      Kafka Source (upsert mode) >> ChangeLogNormalize >> GroupWindowAggregate >> PostgresSink

      Job Logic Context:
      I am reading a CDC stream from Kafka, and the record schema looks roughly like this:

      (pk, loan_acc_no, status, created_at, updated_at, __op).

      I want to count the number of distinct loan_acc_no values per hourly window. I therefore defined the watermark on the updated_at field, and the tumbling window is also on updated_at.
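Assigning a row to an hourly tumbling window on updated_at amounts to rounding its rowtime down to a multiple of the window size. The sketch below assumes UTC timestamps (the report does not state a time zone) and a zero window offset. The helper names are hypothetical.

```python
from datetime import datetime, timezone
from typing import Tuple

HOUR_MS = 60 * 60 * 1000


def to_ms(s: str) -> int:
    # Parse "YYYY-MM-DD HH:MM:SS" as UTC (assumption) into epoch milliseconds.
    dt = datetime.strptime(s, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
    return int(dt.timestamp()) * 1000


def fmt(ms: int) -> str:
    # Inverse of to_ms, for readable output.
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")


def tumbling_window(ts_ms: int, size_ms: int = HOUR_MS) -> Tuple[int, int]:
    # Round down to the window start; the end is exclusive.
    start = ts_ms - ts_ms % size_ms
    return start, start + size_ms


for ts in ("2022-09-10 08:45:00", "2022-09-05 00:00:00"):
    start, end = tumbling_window(to_ms(ts))
    print(ts, "->", fmt(start), "to", fmt(end))
```

Under these assumptions, a row updated at 2022-09-10 08:45:00 falls into the window from 08:00 to 09:00. A row carrying updated_at 2022-09-05 00:00:00 falls into a window that ended days earlier.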

      Typical scenario that triggers the unexpected firing of past windows:

      Suppose that in the previous job run, the latest open window ended at 2022-09-10 08:59:59 (window end time) and the job had processed events up to 08:30.

      After restarting the job, suppose the first CDC event is (pk1, loan_1, "approved", 2022-09-02 00:00:00, 2022-09-10 08:45:00, "u"). Call this event E1. E1 is not late with respect to the last watermark generated by the source operator in the previous job run.

      A ChangeLogNormalize operator sits between the Kafka source and the window operator. When the Kafka source forwards E1 to ChangeLogNormalize, ChangeLogNormalize emits two records, of type -U and +U, and both are passed as input to the window operator.

      -U (pk1, loan_1, "pending", 2022-09-02 00:00:00, 2022-09-05 00:00:00, "u") => previous state of the record with key `pk1`
      +U (pk1, loan_1, "approved", 2022-09-02 00:00:00, 2022-09-10 08:45:00, "u") => same as E1
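ChangeLogNormalize keeps the last row per primary key in keyed state, which is why E1 turns into a -U/+U pair. Below is a simplified Python model of that behaviour. The class is hypothetical: it drops the `__op` column and covers only upserts and deletes, not every option of the real operator.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class Row:
    pk: str
    loan_acc_no: str
    status: str
    created_at: str
    updated_at: str


class ChangelogNormalizeModel:
    """Hypothetical model: last row per key, turning upserts into a changelog."""

    def __init__(self) -> None:
        # Keyed state; in Flink this state is checkpointed and restored with the job.
        self.state: Dict[str, Row] = {}

    def on_upsert(self, row: Row) -> List[Tuple[str, Row]]:
        prev = self.state.get(row.pk)
        self.state[row.pk] = row
        if prev is None:
            return [("+I", row)]
        # The retraction carries the OLD row, including its old updated_at.
        return [("-U", prev), ("+U", row)]

    def on_delete(self, pk: str) -> List[Tuple[str, Row]]:
        prev = self.state.pop(pk, None)
        return [] if prev is None else [("-D", prev)]


norm = ChangelogNormalizeModel()
# State restored from the checkpoint: the earlier version of pk1.
pending = Row("pk1", "loan_1", "pending", "2022-09-02 00:00:00", "2022-09-05 00:00:00")
norm.state["pk1"] = pending
e1 = Row("pk1", "loan_1", "approved", "2022-09-02 00:00:00", "2022-09-10 08:45:00")
for kind, row in norm.on_upsert(e1):
    print(kind, row.status, row.updated_at)
```

The -U record's rowtime comes from the restored state, not from the event that just arrived. This is why it can point far into the past.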

      These -U events cause the problem: their updated_at can be any timestamp in the past, and we are tumbling on this field. With periodic watermarks, no events are considered late during the first watermark interval after the restart (200 ms by default). These -U events therefore create window state, and the windows they create fire as soon as the first watermark arrives, because that watermark is already far past their window end.
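The scenario can be reproduced with a small Python model of an event-time tumbling window operator. As the report describes, the model starts without a watermark after the restore. It is a simplification under stated assumptions: it keeps a net +1/-1 count per window instead of COUNT(DISTINCT loan_acc_no), assumes UTC, and uses a hypothetical 5-second out-of-orderness delay. The class and helper names are illustrative only. Firing and lateness follow the usual event-time rule: a window fires once the watermark reaches the window's last timestamp.

```python
from datetime import datetime, timezone
from typing import Dict, List, Optional, Tuple

HOUR_MS = 3_600_000


def to_ms(s: str) -> int:
    # "YYYY-MM-DD HH:MM:SS" interpreted as UTC (assumption) -> epoch ms.
    dt = datetime.strptime(s, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
    return int(dt.timestamp()) * 1000


class TumblingWindowModel:
    """Hypothetical event-time tumbling window keeping a net count per window."""

    def __init__(self, size_ms: int = HOUR_MS, watermark: Optional[int] = None) -> None:
        self.size_ms = size_ms
        self.watermark = watermark  # None models "no watermark yet" after restore
        self.windows: Dict[int, int] = {}  # window start -> net count

    def _max_ts(self, start: int) -> int:
        # Last timestamp that still belongs to the window [start, start + size).
        return start + self.size_ms - 1

    def on_element(self, kind: str, ts: int) -> bool:
        start = ts - ts % self.size_ms
        if self.watermark is not None and self._max_ts(start) <= self.watermark:
            return False  # late: the window already fired, element is dropped
        delta = -1 if kind in ("-U", "-D") else 1
        self.windows[start] = self.windows.get(start, 0) + delta
        return True

    def on_watermark(self, wm: int) -> List[Tuple[int, int]]:
        # Watermarks only move forward; fire every window whose end has passed.
        self.watermark = wm if self.watermark is None else max(self.watermark, wm)
        fired = sorted((s, c) for s, c in self.windows.items()
                       if self._max_ts(s) <= self.watermark)
        for s, _ in fired:
            del self.windows[s]
        return fired


DELAY_MS = 5_000  # hypothetical out-of-orderness bound

# Restored job modeled without a watermark (earlier window state omitted).
restored = TumblingWindowModel()
restored.on_element("-U", to_ms("2022-09-05 00:00:00"))  # accepted, not late
restored.on_element("+U", to_ms("2022-09-10 08:45:00"))
fired = restored.on_watermark(to_ms("2022-09-10 08:45:00") - DELAY_MS)
print(fired)  # the 2022-09-05 00:00 window fires with a net count of -1

# Same input if the previous run's watermark (08:30) were still in effect.
kept = TumblingWindowModel(watermark=to_ms("2022-09-10 08:30:00"))
print(kept.on_element("-U", to_ms("2022-09-05 00:00:00")))  # False: dropped as late
```

In this model, only the watermark stops the -U from reopening an old window. Once the watermark is gone, the retraction is accepted and its window fires on the first watermark after the restart.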


          People

            Assignee: Unassigned
            Reporter: Rashmin Patel (rashminpatel405)
            Votes: 0
            Watchers: 4
