  1. Flink
  2. FLINK-12297

Make ClosureCleaner recursive

Details

    Description

      Right now we do not invoke the closure cleaner on output tags. As a result, code such as:

      	@Test
      	public void testFlatSelectSerialization() throws Exception {
      		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      		DataStreamSource<Integer> elements = env.fromElements(1, 2, 3);
      		OutputTag<Integer> outputTag = new OutputTag<Integer>("AAA") {};
      		CEP.pattern(elements, Pattern.begin("A")).flatSelect(
      			outputTag,
      			new PatternFlatTimeoutFunction<Integer, Integer>() {
      				@Override
      				public void timeout(
      					Map<String, List<Integer>> pattern,
      					long timeoutTimestamp,
      					Collector<Integer> out) throws Exception {
      
      				}
      			},
      			new PatternFlatSelectFunction<Integer, Object>() {
      				@Override
      				public void flatSelect(Map<String, List<Integer>> pattern, Collector<Object> out) throws Exception {
      
      				}
      			}
      		);
      
      		env.execute();
      	}
      

      will fail with the exception {{The implementation of the PatternFlatSelectAdapter is not serializable.}}
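
      A likely explanation for the failure above is that the anonymous OutputTag subclass, created inside an instance method, keeps an implicit reference to the enclosing test class, which is not serializable. The sketch below reproduces that effect in plain Java and shows what a recursive cleaning pass could look like. It is not Flink's ClosureCleaner: the names OuterReferenceDemo, Tag, Adapter, isSerializable and cleanRecursively are hypothetical stand-ins, and the cleaning logic is a simplified assumption of what "recursive" means here (follow object fields and null out compiler-generated this$N references).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Deliberately NOT Serializable, like a typical test class.
class OuterReferenceDemo {

    /** Hypothetical stand-in for an output tag. */
    static class Tag implements Serializable {
        private static final long serialVersionUID = 1L;
        final String id;
        Tag(String id) { this.id = id; }
    }

    /** Hypothetical stand-in for a function wrapper that stores a tag in a field. */
    static class Adapter implements Serializable {
        private static final long serialVersionUID = 1L;
        final Tag tag;
        Adapter(Tag tag) { this.tag = tag; }
    }

    Adapter buildAdapter() {
        // Anonymous subclass created in an instance method: it captures
        // the enclosing OuterReferenceDemo instance in a synthetic field.
        Tag tag = new Tag("AAA") {};
        return new Adapter(tag);
    }

    /** Returns true if Java serialization of the object graph succeeds. */
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    /** Nulls synthetic outer references in obj and, recursively, in its fields. */
    static void cleanRecursively(Object obj) {
        clean(obj, Collections.newSetFromMap(new IdentityHashMap<>()));
    }

    private static void clean(Object obj, Set<Object> visited) {
        // Skip nulls, already visited objects (cycles) and JDK classes.
        if (obj == null || !visited.add(obj) || obj.getClass().getName().startsWith("java.")) {
            return;
        }
        for (Class<?> c = obj.getClass(); c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                if (Modifier.isStatic(f.getModifiers()) || f.getType().isPrimitive()) {
                    continue;
                }
                try {
                    f.setAccessible(true);
                    if (f.getName().startsWith("this$")) {
                        f.set(obj, null);           // drop the enclosing-instance reference
                    } else {
                        clean(f.get(obj), visited); // descend into nested objects
                    }
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException("Could not clean field " + f, e);
                }
            }
        }
    }

    public static void main(String[] args) {
        Adapter adapter = new OuterReferenceDemo().buildAdapter();
        System.out.println("serializable before cleaning: " + isSerializable(adapter));
        cleanRecursively(adapter);
        System.out.println("serializable after cleaning:  " + isSerializable(adapter));
    }
}
```

      Cleaning only the top-level Adapter would not help in this sketch, because the outer reference lives one level down, inside the tag field. That is the motivation for descending into fields. Nulling this$N is only safe when the inner class never uses its enclosing instance, which is assumed here.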

            People

              aitozi WenJun Min
              dwysakowicz Dawid Wysakowicz
              Votes: 0
              Watchers: 7

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 1h 20m