Beam / BEAM-7793

BagState drops Rows when triggered by timer

Details

    Description

      If a BagState contains Rows, it appears to be empty when read in an @OnTimer callback. GroupIntoBatches, for example, buffers elements in a BagState and reads it back when a timer fires. As a result, the following test fails because GroupIntoBatches outputs nothing:

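      import java.io.Serializable;
      import java.util.Collections;
      import org.apache.beam.sdk.schemas.Schema;
      import org.apache.beam.sdk.testing.PAssert;
      import org.apache.beam.sdk.testing.TestPipeline;
      import org.apache.beam.sdk.transforms.Create;
      import org.apache.beam.sdk.transforms.GroupIntoBatches;
      import org.apache.beam.sdk.transforms.WithKeys;
      import org.apache.beam.sdk.values.KV;
      import org.apache.beam.sdk.values.PCollection;
      import org.apache.beam.sdk.values.Row;
      import org.apache.beam.sdk.values.TypeDescriptors;
      import org.junit.Rule;
      import org.junit.Test;

      public class BagStateTests implements Serializable {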
          @Rule
          public final transient TestPipeline pipeline = TestPipeline.create();
      
          @Test
          public void canBatchRows() {
              Schema schema = Schema.builder().addInt32Field("a").build();
      
              PCollection<KV<Integer, Iterable<Row>>> pcoll = pipeline
                      .apply(Create
                              .of(Row.withSchema(schema).addValue(1).build())
                              .withType(TypeDescriptors.rows()))
                      .setRowSchema(schema)
                      .apply(WithKeys.<Integer, Row>of(0).withKeyType(TypeDescriptors.integers()))
                      .apply(GroupIntoBatches.ofSize(10));
      
              PAssert.that(pcoll).containsInAnyOrder(
                      KV.of(0, Collections.singletonList(
                              Row.withSchema(schema).addValue(1).build()
                      )));
      
              pipeline.run().waitUntilFinish();
          }
      }
      

      The example above uses GroupIntoBatches, but the problem seems to affect any DoFn that keeps Rows in a BagState and reads them from a timer.
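To check whether the problem is specific to GroupIntoBatches, a minimal stateful DoFn can be used instead. This sketch is not part of the original report: the names BagStateTimerRepro, CountInTimerFn and pipelineSucceeds, and the state/timer ids, are hypothetical, and it assumes the Direct Runner is on the classpath as the default runner. The DoFn adds each Row to a BagState (with the coder given explicitly as RowCoder), sets an event-time timer for the end of the window, and in the timer emits how many Rows it reads back, so an empty bag shows up as a count of 0 rather than as missing output.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

// Hypothetical repro class, not part of the original report.
public class BagStateTimerRepro {
  static final Schema SCHEMA = Schema.builder().addInt32Field("a").build();

  /** Buffers Rows in a BagState; when the end-of-window timer fires, emits how many it reads back. */
  static class CountInTimerFn extends DoFn<KV<Integer, Row>, Integer> {
    @StateId("buffer")
    private final StateSpec<BagState<Row>> bufferSpec = StateSpecs.bag(RowCoder.of(SCHEMA));

    @TimerId("flush")
    private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

    @ProcessElement
    public void process(
        @Element KV<Integer, Row> element,
        BoundedWindow window,
        @StateId("buffer") BagState<Row> buffer,
        @TimerId("flush") Timer flush) {
      buffer.add(element.getValue());
      // Fire once the watermark passes the end of the window (the global window here).
      flush.set(window.maxTimestamp());
    }

    @OnTimer("flush")
    public void onFlush(OnTimerContext c, @StateId("buffer") BagState<Row> buffer) {
      int count = 0;
      for (Row ignored : buffer.read()) {
        count++;
      }
      c.output(count);
      buffer.clear();
    }
  }

  /** Runs the repro on the default runner; returns true if the pipeline and its PAssert succeed. */
  static boolean pipelineSucceeds() {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    PCollection<Integer> counts =
        p.apply(Create.of(Row.withSchema(SCHEMA).addValue(1).build()).withCoder(RowCoder.of(SCHEMA)))
            .apply(WithKeys.<Integer, Row>of(0).withKeyType(TypeDescriptors.integers()))
            .apply(ParDo.of(new CountInTimerFn()));
    // One Row went into the bag, so the timer should read back exactly one element.
    // With the reported bug the timer would see 0 and this assertion would fail.
    PAssert.that(counts).containsInAnyOrder(1);
    // A failed PAssert typically surfaces as an exception thrown from run().
    return p.run().waitUntilFinish() == PipelineResult.State.DONE;
  }
}
```

If the bug reproduces outside GroupIntoBatches, pipelineSucceeds() would fail with a PAssert error reporting a count of 0 instead of 1.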

      The GroupIntoBatches test is also not ideal, since it assumes the Iterable emitted by GroupIntoBatches is a List. Regardless, the test should not fail because the output collection is empty.
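One way to avoid the List assumption is to check each batch's contents element by element through PAssert's satisfies(...) hook instead of relying on KV equality. The helper below is a sketch; BatchAssertions, toList and singleBatch are hypothetical names. It copies the emitted Iterable into a fresh List and compares it with the expected elements as a multiset, so neither the concrete Iterable type nor the element order within a batch matters. The captured key and expected values must be Serializable when the checker runs inside a pipeline.

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;

// Hypothetical helper, not part of the original report.
public final class BatchAssertions {
  private BatchAssertions() {}

  /** Copies any Iterable into a fresh List, whatever its concrete type. */
  static <T> List<T> toList(Iterable<T> items) {
    List<T> out = new ArrayList<>();
    for (T item : items) {
      out.add(item);
    }
    return out;
  }

  /**
   * Checker for PAssert.satisfies: the output must hold exactly one batch with the given key,
   * whose elements equal {@code expected} as a multiset (order ignored).
   */
  static <K, V> SerializableFunction<Iterable<KV<K, Iterable<V>>>, Void> singleBatch(
      K key, List<V> expected) {
    return output -> {
      List<KV<K, Iterable<V>>> batches = toList(output);
      if (batches.size() != 1) {
        throw new AssertionError("expected exactly 1 batch, got " + batches.size());
      }
      KV<K, Iterable<V>> batch = batches.get(0);
      if (!key.equals(batch.getKey())) {
        throw new AssertionError("unexpected key " + batch.getKey());
      }
      // Multiset comparison: remove each expected element once from a copy of the batch.
      List<V> remaining = toList(batch.getValue());
      for (V want : expected) {
        if (!remaining.remove(want)) {
          throw new AssertionError("missing element " + want);
        }
      }
      if (!remaining.isEmpty()) {
        throw new AssertionError("unexpected extra elements " + remaining);
      }
      return null;
    };
  }
}
```

In the test above, the containsInAnyOrder(...) call could then become PAssert.that(pcoll).satisfies(BatchAssertions.singleBatch(0, Collections.singletonList(Row.withSchema(schema).addValue(1).build()))). With the reported bug this check still fails, but with "expected exactly 1 batch, got 0", which points at the empty output rather than at Iterable equality.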


            People

              Assignee: Unassigned
              Reporter: Mike Pedersen (mpedersen)
              Votes: 0
              Watchers: 2
