Details
- Type: Bug
- Status: Resolved
- Priority: P1
- Resolution: Duplicate
Description
https://ci-beam.apache.org/job/beam_PreCommit_Python_Commit/20516/consoleFull
22:50:10 =================================== FAILURES ===================================
22:50:10 _______________ StreamingWordCountIT.test_streaming_wordcount_it _______________
22:50:10 [gw0] linux -- Python 3.7.10 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/build/gradleenv/-1734967052/bin/python3.7
22:50:10
22:50:10 self = <apache_beam.examples.streaming_wordcount_it_test.StreamingWordCountIT testMethod=test_streaming_wordcount_it>
22:50:10
22:50:10     @pytest.mark.it_postcommit
22:50:10     def test_streaming_wordcount_it(self):
22:50:10       # Build expected dataset.
22:50:10       expected_msg = [('%d: 1' % num).encode('utf-8')
22:50:10                       for num in range(DEFAULT_INPUT_NUMBERS)]
22:50:10
22:50:10       # Set extra options to the pipeline for test purpose
22:50:10       state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
22:50:10       pubsub_msg_verifier = PubSubMessageMatcher(
22:50:10           self.project, self.output_sub.name, expected_msg, timeout=400)
22:50:10       extra_opts = {
22:50:10           'input_subscription': self.input_sub.name,
22:50:10           'output_topic': self.output_topic.name,
22:50:10           'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION,
22:50:10           'on_success_matcher': all_of(state_verifier, pubsub_msg_verifier)
22:50:10       }
22:50:10
22:50:10       # Generate input data and inject to PubSub.
22:50:10       self._inject_numbers(self.input_topic, DEFAULT_INPUT_NUMBERS)
22:50:10
22:50:10       # Get pipeline options from command argument: --test-pipeline-options,
22:50:10       # and start pipeline job by calling pipeline main function.
22:50:10       streaming_wordcount.run(
22:50:10           self.test_pipeline.get_full_options_as_args(**extra_opts),
22:50:10 >         save_main_session=False)
22:50:10
22:50:10 apache_beam/examples/streaming_wordcount_it_test.py:104:
22:50:10 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
22:50:10 apache_beam/examples/streaming_wordcount.py:103: in run
22:50:10     output | beam.io.WriteToPubSub(known_args.output_topic)
22:50:10 apache_beam/pipeline.py:596: in __exit__
22:50:10     self.result = self.run()
22:50:10 apache_beam/pipeline.py:573: in run
22:50:10     return self.runner.run_pipeline(self, self._options)
22:50:10 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
22:50:10
22:50:10 self = <apache_beam.runners.dataflow.test_dataflow_runner.TestDataflowRunner object at 0x7f1ac01efc90>
22:50:10 pipeline = <apache_beam.pipeline.Pipeline object at 0x7f1afd515190>
22:50:10 options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x7f1ac0298490>
22:50:10
22:50:10     def run_pipeline(self, pipeline, options):
22:50:10       """Execute test pipeline and verify test matcher"""
22:50:10       test_options = options.view_as(TestOptions)
22:50:10       on_success_matcher = test_options.on_success_matcher
22:50:10       wait_duration = test_options.wait_until_finish_duration
22:50:10       is_streaming = options.view_as(StandardOptions).streaming
22:50:10
22:50:10       # [BEAM-1889] Do not send this to remote workers also, there is no need to
22:50:10       # send this option to remote executors.
22:50:10       test_options.on_success_matcher = None
22:50:10
22:50:10       self.result = super().run_pipeline(pipeline, options)
22:50:10       if self.result.has_job:
22:50:10         # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs
22:50:10         # in some cases.
22:50:10         print('Worker logs: %s' % self.build_console_url(options))
22:50:10
22:50:10       try:
22:50:10         self.wait_until_in_state(PipelineState.RUNNING)
22:50:10
22:50:10         if is_streaming and not wait_duration:
22:50:10           _LOGGER.warning('Waiting indefinitely for streaming job.')
22:50:10         self.result.wait_until_finish(duration=wait_duration)
22:50:10
22:50:10         if on_success_matcher:
22:50:10           from hamcrest import assert_that as hc_assert_that
22:50:10 >         hc_assert_that(self.result, pickler.loads(on_success_matcher))
22:50:10 E         AssertionError:
22:50:10 E         Expected: (Test pipeline expected terminated in state: RUNNING and Expected 500 messages.)
22:50:10 E              but: Expected 500 messages. Got 528 messages. Diffs (item, count):
22:50:10 E           Expected but not in actual: dict_items([(b'18: 1', 1), (b'23: 1', 1), (b'152: 1', 1), (b'162: 1', 1), (b'168: 1', 1), (b'184: 1', 1), (b'206: 1', 1), (b'208: 1', 1), (b'215: 1', 1), (b'247: 1', 1), (b'255: 1', 1), (b'265: 1', 1), (b'276: 1', 1), (b'278: 1', 1), (b'294: 1', 1), (b'350: 1', 1), (b'356: 1', 1), (b'395: 1', 1), (b'428: 1', 1), (b'450: 1', 1), (b'474: 1', 1)])
22:50:10 E           Unexpected: dict_items([(b'384: 1', 1), (b'237: 1', 1), (b'166: 1', 1), (b'262: 1', 1), (b'5: 1', 1), (b'13: 1', 1), (b'437: 1', 1), (b'263: 1', 1), (b'423: 1', 1), (b'317: 1', 1), (b'447: 1', 1), (b'125: 1', 1), (b'270: 1', 1), (b'116: 1', 1), (b'102: 1', 1), (b'326: 1', 1), (b'21: 1', 1), (b'244: 1', 1), (b'400: 1', 1), (b'117: 1', 1), (b'393: 1', 1), (b'225: 1', 1), (b'187: 1', 1), (b'210: 1', 1), (b'258: 1', 1), (b'226: 1', 1), (b'127: 1', 1), (b'84: 1', 1), (b'182: 1', 1), (b'373: 1', 1), (b'104: 1', 1), (b'382: 1', 1), (b'295: 1', 1), (b'325: 1', 1), (b'113: 1', 1), (b'470: 1', 1), (b'14: 1', 1), (b'353: 1', 1), (b'333: 1', 1), (b'413: 1', 1), (b'445: 1', 1), (b'115: 1', 1), (b'109: 1', 1), (b'386: 1', 1), (b'274: 1', 1), (b'303: 1', 1), (b'77: 1', 1), (b'455: 1', 1), (b'223: 1', 1)])
22:50:10
22:50:10 apache_beam/runners/dataflow/test_dataflow_runner.py:68: AssertionError
22:50:10 ------------------------------ Captured log call -------------------------------
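Note on reading the diff in the log: the two `dict_items` lists look like multiset differences between the expected payloads and the payloads actually pulled from the output subscription. The sketch below reproduces that kind of comparison on toy data, assuming a `collections.Counter`-based diff. The helper name `diff_messages` and the five-message sample are hypothetical and not taken from the Beam source.

```python
from collections import Counter


def diff_messages(expected, actual):
    """Return (missing, unexpected) as Counters of payload -> surplus count.

    Hypothetical helper for illustration only.
    """
    expected_counts = Counter(expected)
    actual_counts = Counter(actual)
    # Counter subtraction keeps only positive counts, so each side lists
    # just the payloads that the other side has fewer copies of.
    missing = expected_counts - actual_counts
    unexpected = actual_counts - expected_counts
    return missing, unexpected


# Same payload format as the test, but with 5 messages instead of 500.
expected = [('%d: 1' % num).encode('utf-8') for num in range(5)]
# Assumed received batch: b'3: 1' never arrived, b'1: 1' arrived twice.
actual = [b'0: 1', b'1: 1', b'1: 1', b'2: 1', b'4: 1']

missing, unexpected = diff_messages(expected, actual)
print('Expected but not in actual:', sorted(missing.items()))
print('Unexpected:', sorted(unexpected.items()))
```

Under that reading, the numbers in the log are self-consistent: 21 payloads are missing and 49 are surplus, and 500 - 21 + 49 = 528. Every surplus payload is one that the test expected exactly once, so the extras appear to be repeated copies of valid messages rather than foreign data.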
Issue Links
- duplicates: BEAM-12673 apache_beam.examples.streaming_wordcount_it_test.StreamingWordCountIT.test_streaming_wordcount_it flakey (Open)