Flink / FLINK-6578

SharedBuffer creates self-loops when having elements with same value/timestamp.

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.3.0
    • Component/s: CEP
    • Labels:
      None

      Description

      The following test fails with the current implementation. The looping state accepts the repeated middleEvent1 elements, but the shared buffer cannot distinguish between them because they have the same value and timestamp. It therefore creates a self-loop and gets trapped in an infinite loop, which eventually exhausts memory.

      @Test
      	public void testEagerZeroOrMoreSameElement() {
      		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
      
      		Event startEvent = new Event(40, "c", 1.0);
      		Event middleEvent1 = new Event(41, "a", 2.0);
      		Event middleEvent2 = new Event(42, "a", 3.0);
      		Event middleEvent3 = new Event(43, "a", 4.0);
      		Event end1 = new Event(44, "b", 5.0);
      
      		inputEvents.add(new StreamRecord<>(startEvent, 1));
      		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
      		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
      		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
      		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
      		inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5));
      		inputEvents.add(new StreamRecord<>(middleEvent3, 6));
      		inputEvents.add(new StreamRecord<>(middleEvent3, 6));
      		inputEvents.add(new StreamRecord<>(end1, 7));
      
      		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
      			private static final long serialVersionUID = 5726188262756267490L;
      
      			@Override
      			public boolean filter(Event value) throws Exception {
      				return value.getName().equals("c");
      			}
      		}).followedBy("middle").where(new SimpleCondition<Event>() {
      			private static final long serialVersionUID = 5726188262756267490L;
      
      			@Override
      			public boolean filter(Event value) throws Exception {
      				return value.getName().equals("a");
      			}
      		}).oneOrMore().optional().followedBy("end1").where(new SimpleCondition<Event>() {
      			private static final long serialVersionUID = 5726188262756267490L;
      
      			@Override
      			public boolean filter(Event value) throws Exception {
      				return value.getName().equals("b");
      			}
      		});
      
      		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
      
      		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
      
      		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
      				Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, middleEvent3, middleEvent3, end1),
      				Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, middleEvent3, end1),
      				Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, end1),
      				Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, end1),
      				Lists.newArrayList(startEvent, middleEvent1, middleEvent1, end1),
      				Lists.newArrayList(startEvent, middleEvent1, end1),
      				Lists.newArrayList(startEvent, end1)
      		));
      	}
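
To illustrate the failure mode described above, here is a minimal toy model, not Flink's actual SharedBuffer. It assumes entries are identified by a key and that each entry stores the key of its predecessor. The class name `SelfLoopSketch` and the helpers `key` and `hasSelfLoop` are hypothetical. The extra per-element counter mirrors the `counter` parameter visible in the review diffs of this issue, but how the real fix uses it is not shown here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SelfLoopSketch {

    // Hypothetical entry key: (value, timestamp), optionally extended by a counter.
    static List<Object> key(String value, long timestamp, int counter, boolean useCounter) {
        List<Object> k = new ArrayList<>();
        k.add(value);
        k.add(timestamp);
        if (useCounter) {
            k.add(counter);
        }
        return k;
    }

    // Inserts a start event followed by three identical "a" events at timestamp 3,
    // each pointing to the previously inserted entry, then reports whether any
    // entry ended up with itself as predecessor.
    static boolean hasSelfLoop(boolean useCounter) {
        Map<List<Object>, List<Object>> predecessorOf = new HashMap<>();

        List<Object> previous = key("c", 1L, 0, useCounter);
        predecessorOf.put(previous, null); // the start event has no predecessor

        for (int i = 0; i < 3; i++) {
            List<Object> current = key("a", 3L, i, useCounter);
            predecessorOf.put(current, previous);
            previous = current;
        }

        for (Map.Entry<List<Object>, List<Object>> e : predecessorOf.entrySet()) {
            if (e.getKey().equals(e.getValue())) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("value+timestamp key, self-loop: " + hasSelfLoop(false));
        System.out.println("value+timestamp+counter key, self-loop: " + hasSelfLoop(true));
    }
}
```

In this model, the second identical element maps to the same key as the first, so its predecessor edge points back to itself. Any traversal that follows predecessor edges from that entry never terminates. Adding a distinguishing counter to the key keeps the three occurrences apart.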
      

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user kl0u opened a pull request:

          https://github.com/apache/flink/pull/3909

          FLINK-6578 Fix self-loop handling in SharedBuffer.

          This PR addresses: FLINK-6371, FLINK-6536, FLINK-6255, and FLINK-6578.

          R @dawidwys

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/kl0u/flink cep-uber-pr

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3909.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3909


          commit dd1784e20baeab97c6af36bd110cff95a2aa1731
          Author: kl0u <kkloudas@gmail.com>
          Date: 2017-05-05T11:55:07Z

          FLINK-6371 [cep] NFA return matched patterns as Map<String, List<T>>.

          commit bd7b5270a37a3f134e214aea0e17f1efd9395bbf
          Author: kl0u <kkloudas@gmail.com>
          Date: 2017-05-11T09:39:00Z

          FLINK-6536 [cep] Improve error message in SharedBuffer::put().

          commit 72a23bd98cee6a94d31c31b2ea8ff0ddb9757186
          Author: kl0u <kkloudas@gmail.com>
          Date: 2017-05-12T14:01:38Z

          FLINK-6255 [cep] Remove PatternStream.getSideOutput().

          commit 96299aa50307119f7bc0654c3ec2acb00f06c68f
          Author: kkloudas <kkloudas@gmail.com>
          Date: 2017-05-15T12:33:09Z

          FLINK-6578 [cep] Fix self-loop handling in SharedBuffer.

          commit 07f73391e5c80afda9800b4dc2cde41da1a37acf
          Author: kkloudas <kkloudas@gmail.com>
          Date: 2017-05-15T12:49:00Z

          [hotfix] [cep] Remove unused keySelector in operator.


          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3909#discussion_r116700008

          --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java ---
          @@ -83,26 +84,29 @@ public SharedBuffer(final TypeSerializer<V> valueSerializer) {
            * @param previousTimestamp Timestamp of the value for the previous relation
            * @param version Version of the previous relation
            */
          - public void put(
          + public int put(
          --- End diff --

          missing javadoc for return

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3909#discussion_r116699827

          --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
          @@ -593,15 +596,24 @@ private boolean checkFilterCondition(ComputationState<T> computationState, Itera
            return condition == null || condition.filter(event, computationState.getConditionContext());
            }

          + /**
          + * Extracts all the sequences of events from the start to the given computation state. An event
          --- End diff --

          Given the `Preconditions.checkState(paths.size() <= 1);` check, "all the sequences" here means at most one sequence.

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3909#discussion_r116700283

          --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java ---
          @@ -256,8 +271,8 @@ public void prune(long pruningTimestamp) {
            * @param value Value to lock
            * @param timestamp Timestamp of the value to lock
            */
          - public void lock(final K key, final V value, final long timestamp) {
          - SharedBufferEntry<K, V> entry = get(key, value, timestamp);
          + public void lock(final K key, final V value, final long timestamp, int counter) {
          --- End diff --

          missing javadoc for counter

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3909#discussion_r116700334

          --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java ---
          @@ -271,8 +286,8 @@ public void lock(final K key, final V value, final long timestamp) {
            * @param value Value to release
            * @param timestamp Timestamp of the value to release
            */
          - public void release(final K key, final V value, final long timestamp) {
          - SharedBufferEntry<K, V> entry = get(key, value, timestamp);
          + public void release(final K key, final V value, final long timestamp, int counter) {
          --- End diff --

          missing javadoc for counter

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3909#discussion_r116700094

          --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java ---
          @@ -114,16 +118,16 @@ public void put(
            * @param timestamp Timestamp of the current value (a value requires always a timestamp to make it uniquely referable))
            * @param version Version of the previous relation
            */
          - public void put(
          + public int put(
          --- End diff --

          missing javadoc for return

          kkl0u Kostas Kloudas added a comment -

          Merged at 8e4db423b79580de0cf66e905f8a66c12ea3748a in master.

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/3909

          Merged.

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u closed the pull request at:

          https://github.com/apache/flink/pull/3909

          yuzhihong@gmail.com Ted Yu added a comment -

          SharedBuffer.java isn't shown in the PR, so allow me to comment here.

              public int hashCode() {
                return (int) (31 * (timestamp ^ timestamp >>> 32) + 31 * value.hashCode()) + counter;
          

          Here the multiplier 31 is applied separately to both (timestamp ^ timestamp >>> 32) and value.hashCode(), so the two terms carry the same weight.
          The following is probably the right expression:

                return (int) (31 * (31 * (timestamp ^ timestamp >>> 32) + value.hashCode()) + counter);
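
The difference between the two expressions can be checked with a small standalone sketch. The class `HashCombineSketch` and the method names `flatHash` and `nestedHash` are hypothetical, and `valueHash` stands in for `value.hashCode()`. In Java a cast binds tighter than multiplication, so the nested form below parenthesizes the whole expression to make the `(int)` cast apply to the complete `long` result.

```java
public class HashCombineSketch {

    // Form quoted from SharedBuffer: both terms are multiplied by 31,
    // so swapping the folded timestamp and the value hash gives the same result.
    static int flatHash(long timestamp, int valueHash, int counter) {
        return (int) (31 * (timestamp ^ timestamp >>> 32) + 31 * valueHash) + counter;
    }

    // Nested form: the folded timestamp is multiplied by 31 twice, the value hash once.
    // The outer parentheses ensure the cast covers the whole long expression.
    static int nestedHash(long timestamp, int valueHash, int counter) {
        return (int) (31 * (31 * (timestamp ^ timestamp >>> 32) + valueHash) + counter);
    }

    public static void main(String[] args) {
        // Same two numbers in swapped roles.
        System.out.println("flat collides:   " + (flatHash(3L, 7, 0) == flatHash(7L, 3, 0)));
        System.out.println("nested collides: " + (nestedHash(3L, 7, 0) == nestedHash(7L, 3, 0)));
    }
}
```

With small inputs the flat form reduces to 31 * (a + b), which is symmetric in its two terms, while the nested form weights them differently and so tells the swapped pair apart.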
          

            People

            • Assignee:
              kkl0u Kostas Kloudas
              Reporter:
              kkl0u Kostas Kloudas