Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-6290

SharedBuffer is improperly released when multiple edges between entries

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.3.0
    • Component/s: CEP
    • Labels:
      None

      Description

      Below test right now fails:

      	@Test
      	public void testClearingSharedBufferWithMultipleEdgesBetweenEntries() {
      		SharedBuffer<String, Event> sharedBuffer = new SharedBuffer<>(Event.createTypeSerializer());
      		int numberEvents = 8;
      		Event[] events = new Event[numberEvents];
      		final long timestamp = 1L;
      
      		for (int i = 0; i < numberEvents; i++) {
      			events[i] = new Event(i + 1, "e" + (i + 1), i);
      		}
      
      		sharedBuffer.put("start", events[1], timestamp, DeweyNumber.fromString("1"));
      		sharedBuffer.put("branching", events[2], timestamp, "start", events[1], timestamp, DeweyNumber.fromString("1.0"));
      		sharedBuffer.put("branching", events[3], timestamp, "start", events[1], timestamp, DeweyNumber.fromString("1.1"));
      		sharedBuffer.put("branching", events[3], timestamp, "branching", events[2], timestamp, DeweyNumber.fromString("1.0.0"));
      		sharedBuffer.put("branching", events[4], timestamp, "branching", events[3], timestamp, DeweyNumber.fromString("1.0.0.0"));
      		sharedBuffer.put("branching", events[4], timestamp, "branching", events[3], timestamp, DeweyNumber.fromString("1.1.0"));
      
      		//simulate IGNORE (next event can point to events[2])
      		sharedBuffer.lock("branching", events[2], timestamp);
      
      		sharedBuffer.release("branching", events[4], timestamp);
      
      		//There should be still events[1] and events[2] in the buffer
      		assertFalse(sharedBuffer.isEmpty());
      	}
      

      The problem is with the SharedBuffer#internalRemove method:

      private void internalRemove(final SharedBufferEntry<K, V> entry) {
      		Stack<SharedBufferEntry<K, V>> entriesToRemove = new Stack<>();
      		entriesToRemove.add(entry);
      
      		while (!entriesToRemove.isEmpty()) {
      			SharedBufferEntry<K, V> currentEntry = entriesToRemove.pop();
      
      			if (currentEntry.getReferenceCounter() == 0) {
      				currentEntry.remove();
      
      				for (SharedBufferEdge<K, V> edge: currentEntry.getEdges()) {
      					if (edge.getTarget() != null) {
      						edge.getTarget().decreaseReferenceCounter();
      						entriesToRemove.push(edge.getTarget());
      					}
      				}
      			}
      		}
      	}
      

      When currentEntry has multiple edges to the same entry. The entry will be added twice to the entriesToRemove and it's edges will be removed twice and the second edge can potentially change the referenceCounter for both of those entries to 0. Resulting in removing this entry twice.

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                dawidwys Dawid Wysakowicz
                Reporter:
                dawidwys Dawid Wysakowicz
              • Votes:
                0 Vote for this issue
                Watchers:
                3 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: