Details
-
Improvement
-
Status: Closed
-
Major
-
Resolution: Fixed
-
1.9.0
Description
The java version of the ExampleIntegrationTest is incorrect since it assumes elements to arrive in the sink in order, but this isn't guaranteed since there are 2 sink subtasks mutating a shared collection.
The scala example was modified correctly; it checks that elements are contained without verifying the order.
public class ExampleIntegrationTest { ... // configure your test environment env.setParallelism(2); ... // create a stream of custom elements and apply transformations env.fromElements(1L, 21L, 22L) .map(new IncrementMapFunction()) .addSink(new CollectSink()); // execute env.execute(); // verify your results assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values); } // create a testing sink private static class CollectSink implements SinkFunction<Long> { // must be static public static final List<Long> values = new ArrayList<>(); @Override public synchronized void invoke(Long value) throws Exception { values.add(value); } } }
Attachments
Issue Links
- is caused by
-
FLINK-12508 Expand Documentation on Testing DataStream API programs
- Resolved
- links to