  Flink / FLINK-30562

CEP Operator misses patterns on SideOutputs and parallelism >1 since 1.15.x+


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.16.0, 1.15.3
    • None
    • None
    • Environment: Problem observed in:

      Production:
      Dockerised Flink cluster running in AWS Fargate, with an AWS Kinesis source and an AWS SQS sink

      Local:
      Completely local MiniCluster-based test with no external sources or sinks

    Description

      (Apologies for the speculative and somewhat vague ticket. I wanted to raise this while I investigate, in case anyone has suggestions that could help me narrow down the problem.)

      Our streaming Flink job has not worked correctly since we upgraded to Flink 1.15.3, and the problem is also present in Flink 1.16.0. The keyed CEP operators in our job no longer emit pattern matches reliably, but, critically, this only happens when parallelism is set to a value greater than 1.

      Our local build tests previously ran on in-JVM `MiniCluster` instances or on dockerised Flink clusters, all with a parallelism of 1. As a result they did not catch this problem, and it caused an outage when we upgraded the cluster version in production.

      Observing the job in the Flink console in production, I can see events arriving at the keyed CEP operators, but none of the operators emits any pattern matches. Furthermore, all the reported watermark values are zero. This may be a red herring, though, because watermark reporting seems to have changed since 1.14.x.

      I am currently trying to build a stripped-down version of our streaming job that demonstrates the problem, but this is quite tricky to set up. In the meantime I would appreciate any hints that could point me in the right direction.

      I have isolated the problem to the keyed CEP operator by removing our real sources and sinks from the failing test. I still see the erroneous behaviour with a job set up as follows:

      1. Events are read from a list using `env.fromCollection( ... )`
      2. CEP operator processes events
      3. Output is captured in another list for assertions
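      At parallelism > 1, one harness detail in step 3 is worth ruling out. Several parallel sink instances append to the same output list, so that list must be static and thread-safe. Flink's testing documentation uses a static `Collections.synchronizedList` in its `CollectSink` example for this reason. The failure also appears in production, so checking this could only rule out the test harness; it could not explain the bug. The plain-Java sketch below simulates parallel sink instances with threads and uses no Flink classes. `SharedSinkSketch`, `runSinkInstance` and `collectWithParallelism` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SharedSinkSketch {
    // Static and synchronized, in the spirit of the CollectSink in Flink's testing docs.
    static final List<String> COLLECTED = Collections.synchronizedList(new ArrayList<>());

    // Hypothetical stand-in for one parallel sink instance writing its matches.
    static void runSinkInstance(int subtask, int recordsPerSubtask) {
        for (int i = 0; i < recordsPerSubtask; i++) {
            COLLECTED.add("subtask-" + subtask + "-match-" + i);
        }
    }

    // Runs `parallelism` writer threads concurrently and returns how many records survived.
    static int collectWithParallelism(int parallelism, int recordsPerSubtask) {
        COLLECTED.clear();
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        for (int s = 0; s < parallelism; s++) {
            final int subtask = s;
            pool.execute(() -> runSinkInstance(subtask, recordsPerSubtask));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return COLLECTED.size();
    }

    public static void main(String[] args) {
        // With a synchronized list, no records are lost across the 4 writers.
        System.out.println(collectWithParallelism(4, 1000)); // prints 4000
    }
}
```

      If the collected count matches the expected count at parallelism 2 and above, the missing patterns are not caused by the list itself.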

      My best guess at the moment is that it has something to do with watermark emission. There seem to have been changes related to watermark alignment; perhaps these have caused a regression in the CEP library? To reiterate, this problem only occurs with a parallelism of 2 or more, and setting the parallelism to 1 immediately fixes the issue.
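      As background for the watermark hypothesis: Flink documents that an operator with several input channels sets its event time to the minimum of the watermarks on those channels. With parallelism > 1, the keyed CEP operator receives input from several upstream subtasks. A single channel whose watermark never advances would therefore hold back event time for the whole operator, and event-time CEP matches would stay buffered. The plain-Java model below illustrates only that minimum rule. It is not Flink code, `CombinedWatermarkSketch` and its methods are hypothetical, and it does not claim this is the cause of this bug.

```java
import java.util.Arrays;

public class CombinedWatermarkSketch {
    // Latest watermark seen on each input channel (one per upstream subtask).
    private final long[] channelWatermarks;

    CombinedWatermarkSketch(int inputChannels) {
        channelWatermarks = new long[inputChannels];
        // "No watermark yet" is modelled as Long.MIN_VALUE on every channel.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark from one channel and returns the operator's combined watermark.
    long onWatermark(int channel, long watermark) {
        // A channel's watermark never moves backwards.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        return combined();
    }

    // The operator's event-time clock is the minimum over all input channels.
    long combined() {
        long min = Long.MAX_VALUE;
        for (long wm : channelWatermarks) {
            min = Math.min(min, wm);
        }
        return min;
    }

    public static void main(String[] args) {
        CombinedWatermarkSketch parallelismOne = new CombinedWatermarkSketch(1);
        System.out.println(parallelismOne.onWatermark(0, 1_000L)); // prints 1000

        CombinedWatermarkSketch parallelismTwo = new CombinedWatermarkSketch(2);
        parallelismTwo.onWatermark(0, 1_000L);
        // Channel 1 has sent nothing, so the operator's event time has not advanced.
        System.out.println(parallelismTwo.combined() == Long.MIN_VALUE); // prints true
    }
}
```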

      Attachments

        1. flink-asf-30562-clean.zip (205 kB, uploaded by Thomas Wozniakowski)


          People

            Assignee: Unassigned
            Reporter: Thomas Wozniakowski (Jamalarm)
            Votes: 1
            Watchers: 4
