Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.3.0
    • Component/s: CEP
    • Labels:
      None

      Description

      Currently, the CEP library handles late events in a somewhat fuzzy way. Essentially, it:
      1) accepts all events (late and early),
      2) sorts them based on event time, and
      3) feeds them into the NFA whenever a watermark arrives.

      This does not respect event time, as late events are still processed.

      In addition, because the order in which elements are processed matters, this can lead to wrong results: events may be processed by the NFA out of order with respect to their timestamps.
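      To make the out-of-order problem concrete, here is a minimal plain-Java model of the pre-fix buffering described above. It is a hypothetical sketch, not Flink code: `OldBufferingModel`, its methods, and the use of bare `long` timestamps in place of `StreamRecord`s are all illustrative assumptions, and the model assumes a watermark releases every buffered event whose timestamp is at or below it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical model of the pre-fix behavior: every event is buffered,
// nothing is ever treated as late.
class OldBufferingModel {
    // Buffered event timestamps, kept ordered by event time.
    private final PriorityQueue<Long> buffer = new PriorityQueue<>();
    // Order in which timestamps are handed to the (simulated) NFA.
    final List<Long> fedToNfa = new ArrayList<>();

    void processElement(long timestamp) {
        // 1) accept every event, late or not; 2) the queue orders it by event time
        buffer.offer(timestamp);
    }

    void processWatermark(long watermark) {
        // 3) on a watermark, feed all buffered events up to it, in timestamp order
        while (!buffer.isEmpty() && buffer.peek() <= watermark) {
            fedToNfa.add(buffer.poll());
        }
    }
}
```

      With events 5 and 9, a watermark of 10, a late event 7, and then a watermark of 15, this model hands the NFA the sequence 5, 9, 7: the late event is still processed, and it arrives after an event with a larger timestamp.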

      This issue proposes to assume correctness of the watermark and to consider as late any event whose timestamp is smaller than that of the last seen watermark. In addition, late events are not silently dropped; instead, the user can specify a side output to send them to, as is done for the WindowOperator.
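      The proposed policy can be sketched in the same style. Again, this is a hypothetical model rather than the actual operator: `LateAwareBufferingModel` and its fields are illustrative names, and the `sideOutput` list merely stands in for a user-specified side output. The lateness check (an event with `timestamp >= lastWatermark` is on time) mirrors the condition used in pull request #3644.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical, simplified model of the proposed policy (not Flink's operator).
class LateAwareBufferingModel {
    // Buffered on-time event timestamps, ordered by event time.
    private final PriorityQueue<Long> buffer = new PriorityQueue<>();
    // No watermark seen yet, so no event can be late.
    private long lastWatermark = Long.MIN_VALUE;
    // Order in which timestamps are handed to the (simulated) NFA.
    final List<Long> fedToNfa = new ArrayList<>();
    // Stand-in for a user-specified late-data side output.
    final List<Long> sideOutput = new ArrayList<>();

    void processElement(long timestamp) {
        if (timestamp >= lastWatermark) {
            // On time: buffer until a watermark covers it.
            buffer.offer(timestamp);
        } else {
            // Late: never reaches the NFA, but is not silently dropped either.
            sideOutput.add(timestamp);
        }
    }

    void processWatermark(long watermark) {
        // Release buffered events up to the watermark, in timestamp order.
        while (!buffer.isEmpty() && buffer.peek() <= watermark) {
            fedToNfa.add(buffer.poll());
        }
        // Remember the watermark for subsequent lateness checks.
        lastWatermark = watermark;
    }
}
```

      Replaying events 5 and 9, a watermark of 10, a late event 7, an event 12, and a watermark of 15 feeds 5, 9, 12 to the NFA in timestamp order and routes 7 to the side output. Draining does not depend on `lastWatermark`, so updating the field after draining does not change the result.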

        Issue Links

          Activity

          Hide
          githubbot ASF GitHub Bot added a comment -

          GitHub user kl0u opened a pull request:

          https://github.com/apache/flink/pull/3644

          FLINK-6205 FLINK-6069 [cep] Correct watermark/late events in side-output

          With this, the CEP library assumes correctness of the watermark
          and considers as late any event whose timestamp is smaller than
          that of the last seen watermark. Late events are not silently
          dropped; instead, the user can specify a side output to send
          them to.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/kl0u/flink late-element-cep

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3644.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3644


          commit 827bd85916206e36ed23564e77e5ff119bfadead
          Author: kl0u <kkloudas@gmail.com>
          Date: 2017-03-23T18:01:15Z

          FLINK-6205 FLINK-6069 [cep] Correct watermark/late events in side output.

          With this, the CEP library assumes correctness of the watermark
          and considers as late any event whose timestamp is smaller than
          that of the last seen watermark. Late events are not silently
          dropped; instead, the user can specify a side output to send
          them to.


          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3644#discussion_r108741567

          — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java —
          @@ -196,19 +213,45 @@ public void processElement(StreamRecord<IN> element) throws Exception {
                       updateNFA(nfa);
                   } else {
          -            getInternalWatermarkCallbackService().registerKeyForWatermarkCallback(keySelector.getKey(element.getValue()));
          -            PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
          +            // In event-time processing we assume correctness of the watermark.
          +            // Events with timestamp smaller than the last seen watermark are considered late.
          +            // Late events are put in a dedicated side output, if the user has specified one.
          +
          +            if (element.getTimestamp() >= lastWatermark) {
          -            // event time processing
          -            // we have to buffer the elements until we receive the proper watermark
          -            if (getExecutionConfig().isObjectReuseEnabled()) {
          -                // copy the StreamRecord so that it cannot be changed
          -                priorityQueue.offer(new StreamRecord<IN>(inputSerializer.copy(element.getValue()), element.getTimestamp()));
          +                // we have an event with a valid timestamp, so
          +                // we buffer it until we receive the proper watermark.
          +
          +                getInternalWatermarkCallbackService().registerKeyForWatermarkCallback(keySelector.getKey(element.getValue()));
          +
          +                PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
          +                if (getExecutionConfig().isObjectReuseEnabled()) {
          +                    // copy the StreamRecord so that it cannot be changed
          +                    priorityQueue.offer(new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp()));
          +                } else {
          +                    priorityQueue.offer(element);
          +                }
          +                updatePriorityQueue(priorityQueue);
                       } else {
          -                priorityQueue.offer(element);
          +                sideOutputLateElement(element);
                       }
          -            updatePriorityQueue(priorityQueue);
          +        }
          +    }
          +
          +    private void updateLastSeenWatermark(Watermark watermark) {
          +        this.lastWatermark = watermark.getTimestamp();
          — End diff –

          Shouldn't `lastWatermark` be stored in the `StateBackend`? How will it behave after restoring from a checkpoint?

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3644#discussion_r108742662

          — Diff: docs/dev/libs/cep.md —
          @@ -711,6 +711,57 @@ DataStream[Either[TimeoutEvent, ComplexEvent]] result = patternStream.flatSelect
          </div>
          </div>

          +### Handling Lateness in Event Time
          +
          +In `CEP` the order in which elements are processed matters. To this end and when working in event time, an incoming
          — End diff –

          I don't get the first part of the sentence: "To this end and when working in event time".

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3644#discussion_r108866698

          — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java —
          @@ -196,19 +213,45 @@ public void processElement(StreamRecord<IN> element) throws Exception

          — End diff –

          Anyway, it would be nice to see a test case like: watermark(10), restoreFromCheckpoint, event(7).
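          The suggested scenario (watermark 10, restore from a checkpoint, then event 7) can be illustrated with a small plain-Java model. This is a hypothetical sketch, not Flink's test harness: `LatenessCheckModel` and its two restore methods are illustrative stand-ins for restoring with and without the last watermark in checkpointed state, and the model assumes that a field which is not checkpointed falls back to its initial value on restore.

```java
// Hypothetical model of the operator's lateness check across a restore.
class LatenessCheckModel {
    // No watermark seen yet: nothing counts as late.
    private long lastWatermark = Long.MIN_VALUE;

    void processWatermark(long watermark) {
        lastWatermark = watermark;
    }

    // Same condition as in the PR: strictly older than the last seen watermark.
    boolean isLate(long timestamp) {
        return timestamp < lastWatermark;
    }

    // Value that would go into a checkpoint if the watermark were stored there.
    long snapshotWatermark() {
        return lastWatermark;
    }

    // Restore when the last watermark is NOT part of the checkpoint:
    // the field falls back to its initial value.
    static LatenessCheckModel restoreWithoutWatermark() {
        return new LatenessCheckModel();
    }

    // Restore when the last watermark IS checkpointed (the reviewer's suggestion).
    static LatenessCheckModel restoreWithWatermark(long checkpointedWatermark) {
        LatenessCheckModel restored = new LatenessCheckModel();
        restored.lastWatermark = checkpointedWatermark;
        return restored;
    }
}
```

          Under this model, event 7 is late before the restore, is treated as on time after restoring without the watermark (until a new watermark arrives), and stays late only if the watermark was checkpointed.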

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3644#discussion_r108958127

          — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java —
          @@ -196,19 +213,45 @@ public void processElement(StreamRecord<IN> element) throws Exception

          — End diff –

          I believe that `Watermark` handling should be the responsibility of the sources. Checkpointing it here would require special logic, for example when scaling down. In addition, when scaling down, e.g. from 2 tasks to 1, if the 2 tasks had different last watermarks, we may still have issues with elements being processed out of order.
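          The rescaling concern can be made concrete with a hypothetical example in which two tasks whose last watermarks were 10 and 20 are merged into one. The helper below is an illustrative sketch, not Flink code: `RescaleWatermarkModel` and `sameDecision` are made-up names. It compares the lateness decision that a key's original task would have made with the decision of the merged task, for two candidate merged watermarks: the minimum (the way Flink combines watermarks from multiple input channels) and the maximum.

```java
// Hypothetical illustration of merging two tasks with different last watermarks.
class RescaleWatermarkModel {
    // Lateness check as in the PR: strictly older than the last seen watermark.
    static boolean isLate(long timestamp, long lastWatermark) {
        return timestamp < lastWatermark;
    }

    // True if the merged task classifies the event the same way as the task
    // that owned the event's key before rescaling.
    static boolean sameDecision(long timestamp, long originalWatermark, long mergedWatermark) {
        return isLate(timestamp, originalWatermark) == isLate(timestamp, mergedWatermark);
    }
}
```

          With the minimum (10), an event with timestamp 12 for a key previously handled by the second task is accepted, although that task had already advanced to 20 and may have fed later events for the key to its NFA; this is the out-of-order risk described above. With the maximum (20), an event with timestamp 15 for a key from the first task, which that task would have accepted, is treated as late instead.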

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3644

          kkl0u Kostas Kloudas added a comment -

          Merged with 48890285d4b1c285bebb971ae0dbfc310c6fcc0e


            People

            • Assignee:
              kkl0u Kostas Kloudas
              Reporter:
              kkl0u Kostas Kloudas
            • Votes:
              0
              Watchers:
              2
