Details
Type: Bug
Status: Closed
Priority: Blocker
Resolution: Resolved
1.3.1
None
Description
When using the CoGroup API with checkpointing enabled, the job fails while performing a checkpoint, e.g.:
input1.coGroup(input2)
    .where(new KeySelector<String, String>() {
        @Override
        public String getKey(String value) throws Exception {
            return value;
        }
    })
    .equalTo(new KeySelector<String, String>() {
        @Override
        public String getKey(String value) throws Exception {
            return value;
        }
    })
    .window(SlothJoinWindow.create())
    .trigger(new SlothWindowTrigger(0))
    .apply(new CoGroupFunction<String, String, String>() {
        @Override
        public void coGroup(Iterable<String> first, Iterable<String> second, Collector<String> out) throws Exception {
            String outputStr = "first:" + first + " , second:" + second;
            System.out.println(outputStr);
            out.collect(outputStr);
        }
    })
    .keyBy(new KeySelector<String, String>() {
        @Override
        public String getKey(String value) throws Exception {
            return value;
        }
    })
    .print();