Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-9803

test_streaming_wordcount flaky

Details

    Description

      Regression
      apache_beam.runners.interactive.interactive_runner_test.InteractiveRunnerTest.test_streaming_wordcount (from py37-cython)
      Failing for the past 1 build (since #12462)
      Took 7.7 sec.
      Error Message
      AssertionError: DataFrame are different
      DataFrame shape mismatch
      [left]:  (10, 4)
      [right]: (6, 4)
      Stacktrace
      self = <apache_beam.runners.interactive.interactive_runner_test.InteractiveRunnerTest testMethod=test_streaming_wordcount>
      
          @unittest.skipIf(
              sys.version_info < (3, 5, 3),
              'The tests require at least Python 3.6 to work.')
          def test_streaming_wordcount(self):
            class WordExtractingDoFn(beam.DoFn):
              def process(self, element):
                text_line = element.strip()
                words = text_line.split()
                return words
          
            # Add the TestStream so that it can be cached.
            ib.options.capturable_sources.add(TestStream)
            ib.options.capture_duration = timedelta(seconds=5)
          
            p = beam.Pipeline(
                runner=interactive_runner.InteractiveRunner(),
                options=StandardOptions(streaming=True))
          
            data = (
                p
                | TestStream()
                    .advance_watermark_to(0)
                    .advance_processing_time(1)
                    .add_elements(['to', 'be', 'or', 'not', 'to', 'be'])
                    .advance_watermark_to(20)
                    .advance_processing_time(1)
                    .add_elements(['that', 'is', 'the', 'question'])
                | beam.WindowInto(beam.window.FixedWindows(10))) # yapf: disable
          
            counts = (
                data
                | 'split' >> beam.ParDo(WordExtractingDoFn())
                | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
                | 'group' >> beam.GroupByKey()
                | 'count' >> beam.Map(lambda wordones: (wordones[0], sum(wordones[1]))))
          
            # Watch the local scope for Interactive Beam so that referenced PCollections
            # will be cached.
            ib.watch(locals())
          
            # This is normally done in the interactive_utils when a transform is
            # applied but needs an IPython environment. So we manually run this here.
            ie.current_env().track_user_pipelines()
          
            # Create a fake limiter that cancels the BCJ once the main job receives the
            # expected amount of results.
            class FakeLimiter:
              def __init__(self, p, pcoll):
                self.p = p
                self.pcoll = pcoll
          
              def is_triggered(self):
                result = ie.current_env().pipeline_result(self.p)
                if result:
                  try:
                    results = result.get(self.pcoll)
                  except ValueError:
                    return False
                  return len(results) >= 10
                return False
          
            # This sets the limiters to stop reading when the test receives 10 elements
            # or after 5 seconds have elapsed (to eliminate the possibility of hanging).
            ie.current_env().options.capture_control.set_limiters_for_test(
                [FakeLimiter(p, data), DurationLimiter(timedelta(seconds=5))])
          
            # This tests that the data was correctly cached.
            pane_info = PaneInfo(True, True, PaneInfoTiming.UNKNOWN, 0, 0)
            expected_data_df = pd.DataFrame([
                ('to', 0, [IntervalWindow(0, 10)], pane_info),
                ('be', 0, [IntervalWindow(0, 10)], pane_info),
                ('or', 0, [IntervalWindow(0, 10)], pane_info),
                ('not', 0, [IntervalWindow(0, 10)], pane_info),
                ('to', 0, [IntervalWindow(0, 10)], pane_info),
                ('be', 0, [IntervalWindow(0, 10)], pane_info),
                ('that', 20000000, [IntervalWindow(20, 30)], pane_info),
                ('is', 20000000, [IntervalWindow(20, 30)], pane_info),
                ('the', 20000000, [IntervalWindow(20, 30)], pane_info),
                ('question', 20000000, [IntervalWindow(20, 30)], pane_info)
            ], columns=[0, 'event_time', 'windows', 'pane_info']) # yapf: disable
          
            data_df = ib.collect(data, include_window_info=True)
      >     pd.testing.assert_frame_equal(expected_data_df, data_df)
      E     AssertionError: DataFrame are different
      E     
      E     DataFrame shape mismatch
      E     [left]:  (10, 4)
      E     [right]: (6, 4)
      
      apache_beam/runners/interactive/interactive_runner_test.py:238: AssertionError
      

      Attachments

        Issue Links

          Activity

            People

              rohdesam Sam Rohde
              ningk Ning
              Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: