Details
- Type: Bug
- Status: Resolved
- Priority: P2
- Resolution: Fixed
Description
One of the integration tests is failing in PostCommits:
Error Message

AssertionError:
Expected: (Test pipeline expected terminated in state: RUNNING and Expected 4 messages.)
     but: Expected 4 messages. Got 4 messages. Diffs (item, count):
  Expected but not in actual: dict_items([(PubsubMessage(b'data001-seen', {'processed': 'IT'}), 1), (PubsubMessage(b'data003\xab\xac-seen', {'processed': 'IT'}), 1)])
  Unexpected: dict_items([(PubsubMessage(b'data001-seen', {'processed': 'IT', 'timestamp': 'PubSubMessageMatcher error: expected attribute not found.'}), 1), (PubsubMessage(b'data003\xab\xac-seen', {'processed': 'IT', 'timestamp': 'PubSubMessageMatcher error: expected attribute not found.'}), 1)])
  Stripped attributes: ['id', 'timestamp']

Stacktrace

self = <apache_beam.io.gcp.pubsub_integration_test.PubSubIntegrationTest testMethod=test_streaming_with_attributes>

    @pytest.mark.it_postcommit
    def test_streaming_with_attributes(self):
>     self._test_streaming(with_attributes=True)

apache_beam/io/gcp/pubsub_integration_test.py:213:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/io/gcp/pubsub_integration_test.py:205: in _test_streaming
    timestamp_attribute=self.TIMESTAMP_ATTRIBUTE)
apache_beam/io/gcp/pubsub_it_pipeline.py:93: in run_pipeline
    result = p.run()
apache_beam/pipeline.py:573: in run
    return self.runner.run_pipeline(self, self._options)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <apache_beam.runners.dataflow.test_dataflow_runner.TestDataflowRunner object at 0x7fa1d1561950>
pipeline = <apache_beam.pipeline.Pipeline object at 0x7fa1d1561750>
options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x7fa1d15614d0>

    def run_pipeline(self, pipeline, options):
      """Execute test pipeline and verify test matcher"""
      test_options = options.view_as(TestOptions)
      on_success_matcher = test_options.on_success_matcher
      wait_duration = test_options.wait_until_finish_duration
      is_streaming = options.view_as(StandardOptions).streaming

      # [BEAM-1889] Do not send this to remote workers also, there is no need to
      # send this option to remote executors.
      test_options.on_success_matcher = None

      self.result = super().run_pipeline(pipeline, options)
      if self.result.has_job:
        # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs
        # in some cases.
        print('Worker logs: %s' % self.build_console_url(options))

      try:
        self.wait_until_in_state(PipelineState.RUNNING)

        if is_streaming and not wait_duration:
          _LOGGER.warning('Waiting indefinitely for streaming job.')
        self.result.wait_until_finish(duration=wait_duration)

        if on_success_matcher:
          from hamcrest import assert_that as hc_assert_that
>         hc_assert_that(self.result, pickler.loads(on_success_matcher))
E         AssertionError:
E         Expected: (Test pipeline expected terminated in state: RUNNING and Expected 4 messages.)
E              but: Expected 4 messages. Got 4 messages. Diffs (item, count):
E           Expected but not in actual: dict_items([(PubsubMessage(b'data001-seen', {'processed': 'IT'}), 1), (PubsubMessage(b'data003\xab\xac-seen', {'processed': 'IT'}), 1)])
E           Unexpected: dict_items([(PubsubMessage(b'data001-seen', {'processed': 'IT', 'timestamp': 'PubSubMessageMatcher error: expected attribute not found.'}), 1), (PubsubMessage(b'data003\xab\xac-seen', {'processed': 'IT', 'timestamp': 'PubSubMessageMatcher error: expected attribute not found.'}), 1)])
E           Stripped attributes: ['id', 'timestamp']

apache_beam/runners/dataflow/test_dataflow_runner.py:68: AssertionError
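The diff in the failure output suggests how the mismatch arises: all 4 messages arrived, but 2 of them did not carry the 'timestamp' attribute, so the stripping step left an error marker in its place and those messages no longer equal the expected ones. The following is a minimal sketch of that behavior, not Beam's PubSubMessageMatcher implementation. The helpers `strip_attributes` and `diff_messages` and the tuple representation of a message are hypothetical stand-ins; only the marker string and the attribute names are taken from the output above.

```python
from collections import Counter

# Marker text copied from the failure output in this issue.
MISSING = 'PubSubMessageMatcher error: expected attribute not found.'


def strip_attributes(attributes, to_strip):
    """Return a copy of `attributes` with the names in `to_strip` removed.

    Hypothetical helper: an attribute that should be stripped but is
    absent is replaced by a marker string, as the failure output suggests.
    """
    result = dict(attributes)
    for name in to_strip:
        if name in result:
            del result[name]
        else:
            result[name] = MISSING
    return result


def diff_messages(expected, actual):
    """Multiset difference of (data, attributes) pairs, in both directions."""
    def key(msg):
        data, attrs = msg
        # Dicts are unhashable, so use a sorted tuple of items as the key.
        return (data, tuple(sorted(attrs.items())))

    exp = Counter(key(m) for m in expected)
    act = Counter(key(m) for m in actual)
    return exp - act, act - exp


expected = [(b'data001-seen', {'processed': 'IT'})]
# A received message that carries 'id' but lacks 'timestamp'.
received = [(b'data001-seen', {'processed': 'IT', 'id': '1'})]
actual = [(d, strip_attributes(a, ['id', 'timestamp'])) for d, a in received]

missing, unexpected = diff_messages(expected, actual)
print('Expected but not in actual:', dict(missing))
print('Unexpected:', dict(unexpected))
```

Under these assumptions the message count matches while the contents differ, which is the same shape as "Expected 4 messages. Got 4 messages." followed by a non-empty diff.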
Issue Links
- relates to BEAM-13480 apache_beam.io.gcp.pubsub_integration_test.PubSubIntegrationTest.test_streaming_data_only is flaky (Open)