Flink / FLINK-6578

SharedBuffer creates self-loops when having elements with same value/timestamp.

Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.3.0
    • Component/s: Library / CEP
    • Labels: None

    Description

      The following test fails with the current implementation. The looping state accepts the repeated middleEvent1 elements, but because they carry the same value and the same timestamp, the shared buffer cannot distinguish between them. It ends up with a self-loop, gets trapped in an infinite loop, and eventually runs out of memory.

      	@Test
      	public void testEagerZeroOrMoreSameElement() {
      		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
      
      		Event startEvent = new Event(40, "c", 1.0);
      		Event middleEvent1 = new Event(41, "a", 2.0);
      		Event middleEvent2 = new Event(42, "a", 3.0);
      		Event middleEvent3 = new Event(43, "a", 4.0);
      		Event end1 = new Event(44, "b", 5.0);
      
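      		inputEvents.add(new StreamRecord<>(startEvent, 1));
      		// The same event object is added three times with the same timestamp (3).
      		// These indistinguishable records are what trigger the self-loop.
      		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
      		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
      		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
      		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
      		// Matches none of the conditions ("d"); followedBy allows skipping it.
      		inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5));
      		// A second pair of identical records, again with equal timestamps (6).
      		inputEvents.add(new StreamRecord<>(middleEvent3, 6));
      		inputEvents.add(new StreamRecord<>(middleEvent3, 6));
      		inputEvents.add(new StreamRecord<>(end1, 7));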
      
      		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
      			private static final long serialVersionUID = 5726188262756267490L;
      
      			@Override
      			public boolean filter(Event value) throws Exception {
      				return value.getName().equals("c");
      			}
      		}).followedBy("middle").where(new SimpleCondition<Event>() {
      			private static final long serialVersionUID = 5726188262756267490L;
      
      			@Override
      			public boolean filter(Event value) throws Exception {
      				return value.getName().equals("a");
      			}
      		}).oneOrMore().optional().followedBy("end1").where(new SimpleCondition<Event>() {
      			private static final long serialVersionUID = 5726188262756267490L;
      
      			@Override
      			public boolean filter(Event value) throws Exception {
      				return value.getName().equals("b");
      			}
      		});
      
      		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
      
      		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
      
      		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
      				Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, middleEvent3, middleEvent3, end1),
      				Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, middleEvent3, end1),
      				Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, end1),
      				Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, end1),
      				Lists.newArrayList(startEvent, middleEvent1, middleEvent1, end1),
      				Lists.newArrayList(startEvent, middleEvent1, end1),
      				Lists.newArrayList(startEvent, end1)
      		));
      	}
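
      To illustrate the failure mode outside of the NFA, here is a minimal, self-contained sketch. It is not Flink's SharedBuffer code; the class SharedBufferSketch, its Key and Node types, and the useCounter flag are all hypothetical names made up for this example. It assumes a buffer that looks up nodes by (value, timestamp) only, so appending an identical element after itself resolves to the same node and records that node as its own predecessor. Any code that walks predecessors from such a node would never terminate. Adding a per-insertion counter to the key is shown as one possible way to tell the duplicates apart, not necessarily the fix that was applied.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical illustration only; this is NOT Flink's SharedBuffer.
class SharedBufferSketch {

    /** Identifies a buffered element. The counter stays 0 when disabled. */
    static final class Key {
        final String value;
        final long timestamp;
        final int counter;

        Key(String value, long timestamp, int counter) {
            this.value = value;
            this.timestamp = timestamp;
            this.counter = counter;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Key)) {
                return false;
            }
            Key other = (Key) o;
            return value.equals(other.value)
                    && timestamp == other.timestamp
                    && counter == other.counter;
        }

        @Override
        public int hashCode() {
            return Objects.hash(value, timestamp, counter);
        }
    }

    /** A buffered element with back-pointers to its predecessors. */
    static final class Node {
        final List<Node> predecessors = new ArrayList<>();
    }

    private final Map<Key, Node> nodes = new HashMap<>();
    private final boolean useCounter;
    private int nextCounter = 0;

    SharedBufferSketch(boolean useCounter) {
        this.useCounter = useCounter;
    }

    /** Stores an element that follows 'previous' and returns its node. */
    Node put(String value, long timestamp, Node previous) {
        int counter = useCounter ? nextCounter++ : 0;
        Key key = new Key(value, timestamp, counter);
        // Without the counter, an identical element maps to an existing node.
        Node node = nodes.computeIfAbsent(key, k -> new Node());
        if (previous != null) {
            node.predecessors.add(previous);
        }
        return node;
    }

    /** True if the node lists itself as a predecessor. */
    static boolean hasSelfLoop(Node node) {
        return node.predecessors.contains(node);
    }

    /** Appends the same (value, timestamp) twice and reports a self-loop. */
    static boolean duplicateCreatesSelfLoop(boolean useCounter) {
        SharedBufferSketch buffer = new SharedBufferSketch(useCounter);
        Node start = buffer.put("c", 1L, null);
        Node first = buffer.put("a", 3L, start);
        Node second = buffer.put("a", 3L, first);
        return hasSelfLoop(second);
    }

    public static void main(String[] args) {
        System.out.println(duplicateCreatesSelfLoop(false)); // true
        System.out.println(duplicateCreatesSelfLoop(true));  // false
    }
}
```

      With useCounter set to false, the second "a" at timestamp 3 resolves to the node of the first one and points back at itself. With the counter enabled, each insertion gets its own node and the chain stays acyclic.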
      

            People

              Assignee: Kostas Kloudas (kkl0u)
              Reporter: Kostas Kloudas (kkl0u)
              Votes: 0
              Watchers: 4
