I am using Flink 1.3.1 and have found strange behavior when running the following logic:
- Read data from a file and store it in a DataSet<POJO>.
- Split the dataset in two by checking whether "field1" of each POJO is empty, so that the first dataset contains only elements with a non-empty "field1" and the second dataset contains the other elements.
- Each dataset is then grouped, the first by "field1" and the second by another field, and subsequently reduced.
- The two datasets are merged together by union.
- The final dataset is written as JSON.
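The steps above can be modelled with plain Java collections to pin down the result the pipeline is expected to produce. This is only a sketch and does not use the Flink DataSet API. The class `UnionModel`, the POJO fields `field2` and `value`, and the summing reduce function are hypothetical stand-ins, since the report names only "field1".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class UnionModel {

    // Hypothetical POJO: only "field1" is named in the report.
    public static final class Pojo {
        public final String field1;
        public final String field2;
        public final int value;

        public Pojo(String field1, String field2, int value) {
            this.field1 = field1;
            this.field2 = field2;
            this.value = value;
        }
    }

    // Group by the given key and reduce each group to a single element.
    // The reduce function (sum the values, keep the first element's fields) is an assumption.
    public static List<Pojo> groupAndReduce(List<Pojo> input, Function<Pojo, String> key) {
        Map<String, Pojo> reduced = new LinkedHashMap<>();
        for (Pojo p : input) {
            reduced.merge(key.apply(p), p,
                    (a, b) -> new Pojo(a.field1, a.field2, a.value + b.value));
        }
        return new ArrayList<>(reduced.values());
    }

    // Split on empty "field1", group and reduce each half, then union the results.
    public static List<Pojo> run(List<Pojo> input) {
        List<Pojo> withField1 = new ArrayList<>();
        List<Pojo> withoutField1 = new ArrayList<>();
        for (Pojo p : input) {
            if (p.field1.isEmpty()) {
                withoutField1.add(p);
            } else {
                withField1.add(p);
            }
        }
        List<Pojo> result = new ArrayList<>(groupAndReduce(withField1, p -> p.field1));
        result.addAll(groupAndReduce(withoutField1, p -> p.field2));
        return result;
    }

    // Count the output elements that carry a given "field1" value.
    public static long countWithField1(List<Pojo> data, String field1) {
        return data.stream().filter(p -> p.field1.equals(field1)).count();
    }

    public static void main(String[] args) {
        List<Pojo> input = Arrays.asList(
                new Pojo("a", "x", 1), new Pojo("a", "y", 2),
                new Pojo("b", "x", 3), new Pojo("", "x", 4));
        List<Pojo> output = run(input);
        System.out.println("elements with field1=a: " + countWithField1(output, "a"));
    }
}
```

In this model, every non-empty "field1" value appears at most once in the output of `run`, because the union only concatenates the two reduced halves.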
What I expected in the output was to find only one element for each specific value of "field1", because:
- Reducing the first dataset grouped by "field1" should generate only one element with a specific value of "field1".
- The second dataset should contain only elements with empty "field1".
- Taking the union of them should not duplicate any record.
This does not happen. When I read the generated JSON files I see some duplicate (non-empty) values of "field1".
Strangely, this does not happen when the union of the two datasets is not computed. In that case the first dataset produces only elements with distinct values of "field1", while the second dataset produces only records with an empty "field1".
The user has not enabled object reuse.
Later he reported that the problem disappears when he injects a rebalance() after the union. I had a look at the execution plans for both cases (attached to this issue) but could not identify a problem.
Hence I assume this might be an issue with the runtime code, but we need to look deeper into it. The user also provided an example program consisting of two classes, which are attached to the issue as well.