Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-6578

SharedBuffer creates self-loops when having elements with same value/timestamp.

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.3.0
    • Component/s: CEP
    • Labels:
      None

      Description

      This is a test that fails with the current implementation due to the fact that the looping state accepts the two middleEvent1 elements but the shared buffer cannot distinguish between them and gets trapped in an infinite loop leading to running out of memory.

      @Test
      	public void testEagerZeroOrMoreSameElement() {
      		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
      
      		Event startEvent = new Event(40, "c", 1.0);
      		Event middleEvent1 = new Event(41, "a", 2.0);
      		Event middleEvent2 = new Event(42, "a", 3.0);
      		Event middleEvent3 = new Event(43, "a", 4.0);
      		Event end1 = new Event(44, "b", 5.0);
      
      		inputEvents.add(new StreamRecord<>(startEvent, 1));
      		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
      		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
      		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
      		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
      		inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5));
      		inputEvents.add(new StreamRecord<>(middleEvent3, 6));
      		inputEvents.add(new StreamRecord<>(middleEvent3, 6));
      		inputEvents.add(new StreamRecord<>(end1, 7));
      
      		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
      			private static final long serialVersionUID = 5726188262756267490L;
      
      			@Override
      			public boolean filter(Event value) throws Exception {
      				return value.getName().equals("c");
      			}
      		}).followedBy("middle").where(new SimpleCondition<Event>() {
      			private static final long serialVersionUID = 5726188262756267490L;
      
      			@Override
      			public boolean filter(Event value) throws Exception {
      				return value.getName().equals("a");
      			}
      		}).oneOrMore().optional().followedBy("end1").where(new SimpleCondition<Event>() {
      			private static final long serialVersionUID = 5726188262756267490L;
      
      			@Override
      			public boolean filter(Event value) throws Exception {
      				return value.getName().equals("b");
      			}
      		});
      
      		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
      
      		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
      
      		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
      				Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, middleEvent3, middleEvent3, end1),
      				Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, middleEvent3, end1),
      				Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, end1),
      				Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, end1),
      				Lists.newArrayList(startEvent, middleEvent1, middleEvent1, end1),
      				Lists.newArrayList(startEvent, middleEvent1, end1),
      				Lists.newArrayList(startEvent, end1)
      		));
      	}
      

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                kkl0u Kostas Kloudas
                Reporter:
                kkl0u Kostas Kloudas
              • Votes:
                0 Vote for this issue
                Watchers:
                4 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: