Beam / BEAM-13234

Flake in StreamingWordCountIT.test_streaming_wordcount_it

Details

    Description

      https://ci-beam.apache.org/job/beam_PreCommit_Python_Commit/20516/consoleFull

      22:50:10 =================================== FAILURES ===================================
      22:50:10 _______________ StreamingWordCountIT.test_streaming_wordcount_it _______________
      22:50:10 [gw0] linux -- Python 3.7.10 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/build/gradleenv/-1734967052/bin/python3.7
      22:50:10 
      22:50:10 self = <apache_beam.examples.streaming_wordcount_it_test.StreamingWordCountIT testMethod=test_streaming_wordcount_it>
      22:50:10 
      22:50:10     @pytest.mark.it_postcommit
      22:50:10     def test_streaming_wordcount_it(self):
      22:50:10       # Build expected dataset.
      22:50:10       expected_msg = [('%d: 1' % num).encode('utf-8')
      22:50:10                       for num in range(DEFAULT_INPUT_NUMBERS)]
      22:50:10     
      22:50:10       # Set extra options to the pipeline for test purpose
      22:50:10       state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
      22:50:10       pubsub_msg_verifier = PubSubMessageMatcher(
      22:50:10           self.project, self.output_sub.name, expected_msg, timeout=400)
      22:50:10       extra_opts = {
      22:50:10           'input_subscription': self.input_sub.name,
      22:50:10           'output_topic': self.output_topic.name,
      22:50:10           'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION,
      22:50:10           'on_success_matcher': all_of(state_verifier, pubsub_msg_verifier)
      22:50:10       }
      22:50:10     
      22:50:10       # Generate input data and inject to PubSub.
      22:50:10       self._inject_numbers(self.input_topic, DEFAULT_INPUT_NUMBERS)
      22:50:10     
      22:50:10       # Get pipeline options from command argument: --test-pipeline-options,
      22:50:10       # and start pipeline job by calling pipeline main function.
      22:50:10       streaming_wordcount.run(
      22:50:10           self.test_pipeline.get_full_options_as_args(**extra_opts),
      22:50:10 >         save_main_session=False)
      22:50:10 
      22:50:10 apache_beam/examples/streaming_wordcount_it_test.py:104: 
      22:50:10 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      22:50:10 apache_beam/examples/streaming_wordcount.py:103: in run
      22:50:10     output | beam.io.WriteToPubSub(known_args.output_topic)
      22:50:10 apache_beam/pipeline.py:596: in __exit__
      22:50:10     self.result = self.run()
      22:50:10 apache_beam/pipeline.py:573: in run
      22:50:10     return self.runner.run_pipeline(self, self._options)
      22:50:10 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      22:50:10 
      22:50:10 self = <apache_beam.runners.dataflow.test_dataflow_runner.TestDataflowRunner object at 0x7f1ac01efc90>
      22:50:10 pipeline = <apache_beam.pipeline.Pipeline object at 0x7f1afd515190>
      22:50:10 options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x7f1ac0298490>
      22:50:10 
      22:50:10     def run_pipeline(self, pipeline, options):
      22:50:10       """Execute test pipeline and verify test matcher"""
      22:50:10       test_options = options.view_as(TestOptions)
      22:50:10       on_success_matcher = test_options.on_success_matcher
      22:50:10       wait_duration = test_options.wait_until_finish_duration
      22:50:10       is_streaming = options.view_as(StandardOptions).streaming
      22:50:10     
      22:50:10       # [BEAM-1889] Do not send this to remote workers also, there is no need to
      22:50:10       # send this option to remote executors.
      22:50:10       test_options.on_success_matcher = None
      22:50:10     
      22:50:10       self.result = super().run_pipeline(pipeline, options)
      22:50:10       if self.result.has_job:
      22:50:10         # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs
      22:50:10         # in some cases.
      22:50:10         print('Worker logs: %s' % self.build_console_url(options))
      22:50:10     
      22:50:10       try:
      22:50:10         self.wait_until_in_state(PipelineState.RUNNING)
      22:50:10     
      22:50:10         if is_streaming and not wait_duration:
      22:50:10           _LOGGER.warning('Waiting indefinitely for streaming job.')
      22:50:10         self.result.wait_until_finish(duration=wait_duration)
      22:50:10     
      22:50:10         if on_success_matcher:
      22:50:10           from hamcrest import assert_that as hc_assert_that
      22:50:10 >         hc_assert_that(self.result, pickler.loads(on_success_matcher))
      22:50:10 E         AssertionError: 
      22:50:10 E         Expected: (Test pipeline expected terminated in state: RUNNING and Expected 500 messages.)
      22:50:10 E              but: Expected 500 messages. Got 528 messages. Diffs (item, count):
      22:50:10 E           Expected but not in actual: dict_items([(b'18: 1', 1), (b'23: 1', 1), (b'152: 1', 1), (b'162: 1', 1), (b'168: 1', 1), (b'184: 1', 1), (b'206: 1', 1), (b'208: 1', 1), (b'215: 1', 1), (b'247: 1', 1), (b'255: 1', 1), (b'265: 1', 1), (b'276: 1', 1), (b'278: 1', 1), (b'294: 1', 1), (b'350: 1', 1), (b'356: 1', 1), (b'395: 1', 1), (b'428: 1', 1), (b'450: 1', 1), (b'474: 1', 1)])
      22:50:10 E           Unexpected: dict_items([(b'384: 1', 1), (b'237: 1', 1), (b'166: 1', 1), (b'262: 1', 1), (b'5: 1', 1), (b'13: 1', 1), (b'437: 1', 1), (b'263: 1', 1), (b'423: 1', 1), (b'317: 1', 1), (b'447: 1', 1), (b'125: 1', 1), (b'270: 1', 1), (b'116: 1', 1), (b'102: 1', 1), (b'326: 1', 1), (b'21: 1', 1), (b'244: 1', 1), (b'400: 1', 1), (b'117: 1', 1), (b'393: 1', 1), (b'225: 1', 1), (b'187: 1', 1), (b'210: 1', 1), (b'258: 1', 1), (b'226: 1', 1), (b'127: 1', 1), (b'84: 1', 1), (b'182: 1', 1), (b'373: 1', 1), (b'104: 1', 1), (b'382: 1', 1), (b'295: 1', 1), (b'325: 1', 1), (b'113: 1', 1), (b'470: 1', 1), (b'14: 1', 1), (b'353: 1', 1), (b'333: 1', 1), (b'413: 1', 1), (b'445: 1', 1), (b'115: 1', 1), (b'109: 1', 1), (b'386: 1', 1), (b'274: 1', 1), (b'303: 1', 1), (b'77: 1', 1), (b'455: 1', 1), (b'223: 1', 1)])
      22:50:10 
      22:50:10 apache_beam/runners/dataflow/test_dataflow_runner.py:68: AssertionError
      22:50:10 ------------------------------ Captured log call -------------------------------
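The counts in the failure message are internally consistent. Start with 500 expected messages, remove the 21 listed under "Expected but not in actual", and add the 49 listed under "Unexpected": the result is the 528 messages received. Every "Unexpected" entry is one of the expected `N: 1` messages, so if the matcher compares messages as multisets, those 49 were received twice rather than being malformed output. The sketch below rebuilds a received multiset that matches the reported diff and recomputes the diff with `collections.Counter` subtraction, which prints in the same `dict_items([(item, count), ...])` shape as the log. It assumes `DEFAULT_INPUT_NUMBERS` is 500, inferred from "Expected 500 messages", and that the comparison is multiset-based. It illustrates the arithmetic only and does not reproduce `PubSubMessageMatcher`'s code. The helper `msg` and the lists `MISSING` and `DUPLICATED` are hypothetical names.

```python
from collections import Counter

# Assumption: DEFAULT_INPUT_NUMBERS is 500, inferred from "Expected 500 messages".
DEFAULT_INPUT_NUMBERS = 500


def msg(num):
    """Hypothetical helper: format one expected word-count output message."""
    return ('%d: 1' % num).encode('utf-8')


# Expected output, built the same way as in test_streaming_wordcount_it.
expected_msg = [msg(num) for num in range(DEFAULT_INPUT_NUMBERS)]

# Numbers copied from the "Expected but not in actual" and "Unexpected" lists.
MISSING = [18, 23, 152, 162, 168, 184, 206, 208, 215, 247, 255, 265, 276,
           278, 294, 350, 356, 395, 428, 450, 474]
DUPLICATED = [384, 237, 166, 262, 5, 13, 437, 263, 423, 317, 447, 125, 270,
              116, 102, 326, 21, 244, 400, 117, 393, 225, 187, 210, 258, 226,
              127, 84, 182, 373, 104, 382, 295, 325, 113, 470, 14, 353, 333,
              413, 445, 115, 109, 386, 274, 303, 77, 455, 223]

# Reconstructed received messages: every expected message except the missing
# ones, plus a second copy of each duplicated one.
missing = {msg(n) for n in MISSING}
actual_msg = [m for m in expected_msg if m not in missing]
actual_msg += [msg(n) for n in DUPLICATED]

# Multiset difference in both directions; Counter subtraction keeps only
# elements whose resulting count is positive.
c_expected = Counter(expected_msg)
c_actual = Counter(actual_msg)
not_in_actual = c_expected - c_actual
unexpected = c_actual - c_expected

print(len(actual_msg))                      # 528
print(len(not_in_actual), len(unexpected))  # 21 49
```

The reconstruction only shows that the reported numbers reconcile. It does not show when or why the 21 messages went missing or why the 49 were delivered more than once.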
      

            People

              Ryan Thompson (Ryan.Thompson)
              Valentyn Tymofieiev (tvalentyn)
              Votes: 0
              Watchers: 2