  Flink / FLINK-10809

Using DataStreamUtils.reinterpretAsKeyedStream produces corrupted keyed state after restore


    Details

      Description

      I've tried using DataStreamUtils.reinterpretAsKeyedStream on the results of a windowed aggregation:

      		DataStream<Tuple2<Integer, List<Event>>> eventStream4 = eventStream2.keyBy(Event::getKey)
      			.window(SlidingEventTimeWindows.of(Time.milliseconds(150 * 3), Time.milliseconds(150)))
      			.apply(new WindowFunction<Event, Tuple2<Integer, List<Event>>, Integer, TimeWindow>() {
      				private static final long serialVersionUID = 3166250579972849440L;
      
      				@Override
      				public void apply(
      					Integer key, TimeWindow window, Iterable<Event> input,
      					Collector<Tuple2<Integer, List<Event>>> out) throws Exception {
      
      					out.collect(Tuple2.of(key, StreamSupport.stream(input.spliterator(), false).collect(Collectors.toList())));
      				}
      			});
      
      		DataStreamUtils.reinterpretAsKeyedStream(eventStream4, events-> events.f0)
      			.flatMap(createSlidingWindowCheckMapper(pt))
      			.addSink(new PrintSinkFunction<>());
      

      Then, in the mapper returned by createSlidingWindowCheckMapper, I verify that each event belongs to 3 consecutive windows; to do so, I keep the contents of the last window in ValueState. Without failures this check passes, but after a restore it reports a few missed windows right at the beginning.
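Why exactly 3 windows: the window size is 150 * 3 ms and the slide is 150 ms, so every timestamp falls into size / slide = 3 overlapping windows whose start times are one slide apart. The sketch below is plain Java with no Flink dependency; it assumes a zero window offset and mirrors, to the best of my understanding, how SlidingEventTimeWindows derives window starts. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowAssignment {

    /** Start timestamps of all windows [start, start + size) that contain the given timestamp. */
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Start of the latest window containing the timestamp (assumes zero offset).
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long slide = 150;
        long size = 150 * 3;
        for (long ts : new long[] {0, 149, 150, 1000}) {
            System.out.println(ts + " -> " + windowStarts(ts, size, slide));
        }
    }
}
```

For example, timestamp 1000 lands in the windows starting at 900, 750 and 600, so the check expects to see each event in three successive window results for its key.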

      public class SlidingWindowCheckMapper extends RichFlatMapFunction<Tuple2<Integer, List<Event>>, String> {
      
      	private static final long serialVersionUID = -744070793650644485L;
      
      	/** This value state tracks previously seen events with the number of windows they appeared in. */
      	private transient ValueState<List<Tuple2<Event, Integer>>> previousWindow;
      
      	private final int slideFactor;
      
      	SlidingWindowCheckMapper(int slideFactor) {
      		this.slideFactor = slideFactor;
      	}
      
      	@Override
      	public void open(Configuration parameters) throws Exception {
      		ValueStateDescriptor<List<Tuple2<Event, Integer>>> previousWindowDescriptor =
      			new ValueStateDescriptor<>("previousWindow",
      				new ListTypeInfo<>(new TupleTypeInfo<>(TypeInformation.of(Event.class), BasicTypeInfo.INT_TYPE_INFO)));
      
      		previousWindow = getRuntimeContext().getState(previousWindowDescriptor);
      	}
      
      	@Override
      	public void flatMap(Tuple2<Integer, List<Event>> value, Collector<String> out) throws Exception {
      		List<Tuple2<Event, Integer>> previousWindowValues = Optional.ofNullable(previousWindow.value()).orElseGet(
      			Collections::emptyList);
      
      		List<Event> newValues = value.f1;
      		newValues.stream().reduce(new BinaryOperator<Event>() {
      			@Override
      			public Event apply(Event event, Event event2) {
      				if (event2.getSequenceNumber() - 1 != event.getSequenceNumber()) {
      					out.collect("Alert: events in window out of order!");
      				}
      
      				return event2;
      			}
      		});
      
      		List<Tuple2<Event, Integer>> newWindow = new ArrayList<>();
      		for (Tuple2<Event, Integer> windowValue : previousWindowValues) {
      			if (!newValues.contains(windowValue.f0)) {
      				out.collect(String.format("Alert: event %s did not belong to %d consecutive windows. Event seen so far %d times. Current window: %s",
      					windowValue.f0,
      					slideFactor,
      					windowValue.f1,
      					value.f1));
      			} else {
      				newValues.remove(windowValue.f0);
      				if (windowValue.f1 + 1 != slideFactor) {
      					newWindow.add(Tuple2.of(windowValue.f0, windowValue.f1 + 1));
      				}
      			}
      		}
      
      		newValues.forEach(e -> newWindow.add(Tuple2.of(e, 1)));
      
      		previousWindow.update(newWindow);
      	}
      }
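The bookkeeping in flatMap can be exercised without a cluster. The following is a minimal, hypothetical model rather than Flink code: a HashMap keyed by the stream key stands in for keyed ValueState, events are reduced to integer ids (assumed unique per key), and exactly one event is assumed to arrive per slide. With every window delivered it raises no alerts; dropping one window, which resembles the symptom reported after restore, raises alerts for the events that end up in fewer than 3 windows.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SlidingWindowCheckModel {

    /** Hypothetical stand-in for keyed ValueState: key -> (event id -> windows seen so far). */
    private final Map<Integer, Map<Integer, Integer>> previousWindow = new HashMap<>();
    private final int slideFactor;

    SlidingWindowCheckModel(int slideFactor) {
        this.slideFactor = slideFactor;
    }

    /** Same bookkeeping as SlidingWindowCheckMapper#flatMap, applied to event ids. */
    List<String> onWindow(int key, List<Integer> windowContents) {
        List<String> alerts = new ArrayList<>();
        List<Integer> newValues = new ArrayList<>(windowContents);
        Map<Integer, Integer> previous = previousWindow.getOrDefault(key, new LinkedHashMap<>());
        Map<Integer, Integer> next = new LinkedHashMap<>();
        for (Map.Entry<Integer, Integer> seen : previous.entrySet()) {
            int event = seen.getKey();
            int count = seen.getValue();
            if (!newValues.contains(event)) {
                // The event vanished before appearing in slideFactor windows.
                alerts.add("event " + event + " seen only " + count + " times");
            } else {
                newValues.remove(Integer.valueOf(event)); // remove by value, not by index
                if (count + 1 != slideFactor) {
                    next.put(event, count + 1);
                }
            }
        }
        // Events appearing for the first time start with a count of 1.
        for (int event : newValues) {
            next.put(event, 1);
        }
        previousWindow.put(key, next);
        return alerts;
    }

    /** Contents of window w when event id i arrives in slide i (assumption for this model). */
    static List<Integer> windowContents(int w, int slideFactor) {
        List<Integer> ids = new ArrayList<>();
        for (int id = Math.max(0, w - slideFactor + 1); id <= w; id++) {
            ids.add(id);
        }
        return ids;
    }

    /** Feeds the given window numbers, in order, for a single key and counts alerts. */
    static int countAlerts(int[] windowsDelivered) {
        SlidingWindowCheckModel check = new SlidingWindowCheckModel(3);
        int alerts = 0;
        for (int w : windowsDelivered) {
            alerts += check.onWindow(42, windowContents(w, 3)).size();
        }
        return alerts;
    }

    public static void main(String[] args) {
        System.out.println(countAlerts(new int[] {0, 1, 2, 3, 4, 5})); // consecutive windows
        System.out.println(countAlerts(new int[] {0, 1, 2, 4, 5}));    // window 3 never arrives
    }
}
```

With all windows delivered the model yields 0 alerts; skipping window 3 yields 2 alerts, for events 1 and 2, each of which ends up seen in only two windows.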
      

              People

              • Assignee: Stefan Richter (srichter)
                Reporter: Dawid Wysakowicz (dawidwys)
              • Votes: 0
                Watchers: 6
