Details
Type: Bug
Status: Resolved
Priority: P2
Resolution: Duplicate
Version: 2.13.0
Description
If a BagState contains Rows, it appears to be empty when it is read in a timer callback. GroupIntoBatches relies on this pattern, for example, so the following test fails because GroupIntoBatches emits nothing:
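import java.io.Serializable;
import java.util.Collections;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.junit.Rule;
import org.junit.Test;

public class BagStateTests implements Serializable {
  @Rule public final transient TestPipeline pipeline = TestPipeline.create();

  @Test
  public void canBatchRows() {
    Schema schema = Schema.builder().addInt32Field("a").build();
    PCollection<KV<Integer, Iterable<Row>>> pcoll =
        pipeline
            .apply(
                Create.of(Row.withSchema(schema).addValue(1).build())
                    .withType(TypeDescriptors.rows()))
            .setRowSchema(schema)
            .apply(WithKeys.<Integer, Row>of(0).withKeyType(TypeDescriptors.integers()))
            .apply(GroupIntoBatches.ofSize(10));

    // Expects one batch holding the single Row; in practice no batch is emitted.
    PAssert.that(pcoll)
        .containsInAnyOrder(
            KV.of(0, Collections.singletonList(Row.withSchema(schema).addValue(1).build())));
    pipeline.run().waitUntilFinish();
  }
}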
The test above uses GroupIntoBatches as an example, but the problem seems to affect any DoFn that uses a BagState.
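As a rough illustration of the pattern involved, here is a plain-Java model of a stateful DoFn: processElement appends to a per-key bag, and a timer callback reads the bag, emits its contents as one batch if it is non-empty, and clears it. BufferThenFlushModel and its methods are hypothetical stand-ins, not Beam's BagState or @OnTimer API, and the real GroupIntoBatches also flushes when a batch reaches the configured size. The point is the failure mode: if the bag read in the timer comes back empty, as reported here for Rows, nothing is emitted, which matches the empty output of the test.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model (NOT Beam's API) of buffer-then-flush:
// elements go into a per-key "bag", and a "timer" emits and clears it.
class BufferThenFlushModel<K, V> {
  // Stand-in for one BagState<V> per key.
  private final Map<K, List<V>> bags = new HashMap<>();
  // Batches emitted by the timer callback.
  private final List<List<V>> output = new ArrayList<>();

  // Stand-in for @ProcessElement: append the value to the key's bag.
  void processElement(K key, V value) {
    bags.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  // Stand-in for @OnTimer: read the bag, emit only if non-empty, then clear it.
  void onTimer(K key) {
    List<V> bag = bags.remove(key);
    if (bag != null && !bag.isEmpty()) {
      output.add(bag);
    }
  }

  List<List<V>> output() {
    return output;
  }

  public static void main(String[] args) {
    BufferThenFlushModel<Integer, String> model = new BufferThenFlushModel<>();
    model.processElement(0, "row-1");
    model.onTimer(0);
    System.out.println(model.output()); // prints [[row-1]]
  }
}
```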
The test is also not ideal, since it assumes that the Iterable output by GroupIntoBatches is a List. Regardless, it should not fail because the output collection contains no values.
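The List assumption matters because java.util.List.equals only returns true when the other object is also a List, so a List never reports itself equal to a non-List Iterable, even with the same elements. The sketch below shows this in plain Java; nonListIterable is a hypothetical helper standing in for whatever Iterable a runner might return, and toList copies the contents so they can be compared directly. Whether GroupIntoBatches actually emits a List depends on the runner and state implementation; in a Beam test, the same copy-then-compare idea could be applied inside PAssert.that(...).satisfies(...).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class IterableEquality {
  // Hypothetical helper: an Iterable over the given elements that is not a List.
  static <T> Iterable<T> nonListIterable(List<T> backing) {
    return backing::iterator;
  }

  // Copies any Iterable into a List so contents can be compared
  // regardless of the concrete Iterable type.
  static <T> List<T> toList(Iterable<T> it) {
    List<T> out = new ArrayList<>();
    for (T t : it) {
      out.add(t);
    }
    return out;
  }

  public static void main(String[] args) {
    List<Integer> expected = Collections.singletonList(1);
    Iterable<Integer> actual = nonListIterable(Collections.singletonList(1));
    // List.equals requires the other object to be a List: prints false.
    System.out.println(expected.equals(actual));
    // Comparing after copying into a List: prints true.
    System.out.println(expected.equals(toList(actual)));
  }
}
```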