Beam / BEAM-3377

assert_that not working for streaming

Details

    • Type: Bug
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Affects Version/s: 2.2.0
    • Fix Version/s: None
    • Component/s: sdk-py-core
    • Labels: None

    Description

      assert_that does not work with AfterWatermark triggers in streaming pipelines.

      An easy way to reproduce: modify test_gbk_execution [1] as follows:

      def test_this(self):
        test_stream = (TestStream()
                       .add_elements(['a', 'b', 'c'])
                       .advance_watermark_to(20))

        def fnc(x):
          # Log every element that reaches this step.
          print('fired_elem: %s' % (x,))
          return x

        options = PipelineOptions()
        options.view_as(StandardOptions).streaming = True
        p = TestPipeline(options=options)
        records = (p
                   | test_stream
                   | beam.WindowInto(
                       FixedWindows(15),
                       trigger=trigger.AfterWatermark(early=trigger.AfterCount(2)),
                       accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
                   | beam.Map(lambda x: ('k', x))
                   | beam.GroupByKey()
                   # Assumed placement: fnc is defined above but never applied
                   # in the snippet as posted.
                   | beam.Map(fnc))
        assert_that(records, equal_to([
            ('k', ['a', 'b', 'c'])]))
        p.run()

      This test passes, but it fails if the .advance_watermark_to(20) call is removed. However, both cases fire the same elements:
      fired_elem: ('k', ['a', 'b', 'c'])
      fired_elem: ('k', ['a', 'b', 'c'])

      In the passing case, they correspond to the sorted_actual inside the assert_that. In the failing case:
      sorted_actual: [('k', ['a', 'b', 'c']), ('k', ['a', 'b', 'c'])]
      sorted_actual: []
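
      To illustrate why an empty sorted_actual alone is enough to fail the assertion, here is a minimal pure-Python sketch. make_equal_to and passes are hypothetical helpers that only approximate an order-insensitive matcher; they are not Beam's equal_to or assert_that, and the sketch does not explain why the matcher receives an empty list in the first place.

```python
def make_equal_to(expected):
    """Hypothetical stand-in for an order-insensitive matcher (not Beam's equal_to)."""
    def matcher(actual):
        sorted_expected = sorted(expected)
        sorted_actual = sorted(actual)
        if sorted_expected != sorted_actual:
            raise AssertionError(
                'Failed assert: %r != %r' % (sorted_expected, sorted_actual))
    return matcher


def passes(matcher, actual):
    """Return True if the matcher accepts `actual`, False if it raises."""
    try:
        matcher(actual)
        return True
    except AssertionError:
        return False


# Expected value taken from the test in this report.
expected = [('k', ['a', 'b', 'c'])]
matcher = make_equal_to(expected)

# A single matching element satisfies the matcher.
single_ok = passes(matcher, [('k', ['a', 'b', 'c'])])

# The empty list seen in the failing case can never match a non-empty expectation.
empty_ok = passes(matcher, [])
```

      Under these assumptions, single_ok is True and empty_ok is False, so any invocation of the matcher on an empty collection fails regardless of what was fired elsewhere.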

      [1] https://github.com/mariapython/incubator-beam/blob/direct-timers-show/sdks/python/apache_beam/testing/test_stream_test.py#L120

          People

            Assignee: Unassigned
            Reporter: mariagh (María GH)
            Votes: 0
            Watchers: 5

              Time Tracking

                Original Estimate: Not Specified
                Remaining Estimate: 0h
                Time Spent: 6h 20m