BEAM-13218

apache_beam.io.gcp.pubsub_integration_test.PubSubIntegrationTest.test_streaming_with_attributes is failing

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Fixed
    • None
    • 2.38.0
    • test-failures

    Description

      The integration test test_streaming_with_attributes is failing in the Python 3.7 and Python 3.8 PostCommit jobs:

      https://ci-beam.apache.org/job/beam_PostCommit_Python37/4496/testReport/junit/apache_beam.io.gcp.pubsub_integration_test/PubSubIntegrationTest/test_streaming_with_attributes/

      https://ci-beam.apache.org/job/beam_PostCommit_Python38/1893/testReport/junit/apache_beam.io.gcp.pubsub_integration_test/PubSubIntegrationTest/test_streaming_with_attributes/

      Error Message

      AssertionError:
      Expected: (Test pipeline expected terminated in state: RUNNING and Expected 4 messages.)
           but: Expected 4 messages. Got 4 messages. Diffs (item, count):
        Expected but not in actual: dict_items([(PubsubMessage(b'data001-seen', {'processed': 'IT'}), 1), (PubsubMessage(b'data003\xab\xac-seen', {'processed': 'IT'}), 1)])
        Unexpected: dict_items([(PubsubMessage(b'data001-seen', {'processed': 'IT', 'timestamp': 'PubSubMessageMatcher error: expected attribute not found.'}), 1), (PubsubMessage(b'data003\xab\xac-seen', {'processed': 'IT', 'timestamp': 'PubSubMessageMatcher error: expected attribute not found.'}), 1)])
        Stripped attributes: ['id', 'timestamp']

      Stacktrace

      self = <apache_beam.io.gcp.pubsub_integration_test.PubSubIntegrationTest testMethod=test_streaming_with_attributes>
      
          @pytest.mark.it_postcommit
          def test_streaming_with_attributes(self):
      >     self._test_streaming(with_attributes=True)
      
      apache_beam/io/gcp/pubsub_integration_test.py:213: 
      _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      apache_beam/io/gcp/pubsub_integration_test.py:205: in _test_streaming
          timestamp_attribute=self.TIMESTAMP_ATTRIBUTE)
      apache_beam/io/gcp/pubsub_it_pipeline.py:93: in run_pipeline
          result = p.run()
      apache_beam/pipeline.py:573: in run
          return self.runner.run_pipeline(self, self._options)
      _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      
      self = <apache_beam.runners.dataflow.test_dataflow_runner.TestDataflowRunner object at 0x7fa1d1561950>
      pipeline = <apache_beam.pipeline.Pipeline object at 0x7fa1d1561750>
      options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x7fa1d15614d0>
      
          def run_pipeline(self, pipeline, options):
            """Execute test pipeline and verify test matcher"""
            test_options = options.view_as(TestOptions)
            on_success_matcher = test_options.on_success_matcher
            wait_duration = test_options.wait_until_finish_duration
            is_streaming = options.view_as(StandardOptions).streaming
          
            # [BEAM-1889] Do not send this to remote workers also, there is no need to
            # send this option to remote executors.
            test_options.on_success_matcher = None
          
            self.result = super().run_pipeline(pipeline, options)
            if self.result.has_job:
              # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs
              # in some cases.
              print('Worker logs: %s' % self.build_console_url(options))
          
            try:
              self.wait_until_in_state(PipelineState.RUNNING)
          
              if is_streaming and not wait_duration:
                _LOGGER.warning('Waiting indefinitely for streaming job.')
              self.result.wait_until_finish(duration=wait_duration)
          
              if on_success_matcher:
                from hamcrest import assert_that as hc_assert_that
      >         hc_assert_that(self.result, pickler.loads(on_success_matcher))
      E         AssertionError: 
      E         Expected: (Test pipeline expected terminated in state: RUNNING and Expected 4 messages.)
      E              but: Expected 4 messages. Got 4 messages. Diffs (item, count):
      E           Expected but not in actual: dict_items([(PubsubMessage(b'data001-seen', {'processed': 'IT'}), 1), (PubsubMessage(b'data003\xab\xac-seen', {'processed': 'IT'}), 1)])
      E           Unexpected: dict_items([(PubsubMessage(b'data001-seen', {'processed': 'IT', 'timestamp': 'PubSubMessageMatcher error: expected attribute not found.'}), 1), (PubsubMessage(b'data003\xab\xac-seen', {'processed': 'IT', 'timestamp': 'PubSubMessageMatcher error: expected attribute not found.'}), 1)])
      E           Stripped attributes: ['id', 'timestamp']
      
      apache_beam/runners/dataflow/test_dataflow_runner.py:68: AssertionError 
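
      The diff in the assertion can be read as follows: the matcher removes the listed attributes ('id', 'timestamp') from each received message before comparing, and a message that lacks one of them ends up carrying a placeholder error string instead. The snippet below is a minimal standalone sketch of that behavior, inferred only from the error output above. It is not Beam's implementation; `strip_attributes`, `diff`, `as_item`, and `MISSING` are hypothetical names, and the 'id' value is made up for illustration.

```python
from collections import Counter

# Placeholder text copied from the failure output in this report.
MISSING = 'PubSubMessageMatcher error: expected attribute not found.'


def strip_attributes(attributes, to_strip):
  """Return a copy of `attributes` without the keys in `to_strip`.

  A key that is absent is replaced by a marker string, so the absence
  shows up in the comparison instead of being silently ignored.
  (Hypothetical helper, modeled on the observed output.)
  """
  stripped = dict(attributes)
  for name in to_strip:
    if name in stripped:
      del stripped[name]
    else:
      stripped[name] = MISSING
  return stripped


def as_item(data, attributes):
  """Hashable stand-in for a Pub/Sub message (hypothetical helper)."""
  return (data, tuple(sorted(attributes.items())))


def diff(expected, actual):
  """Multiset difference in both directions, keyed by item with counts."""
  want, got = Counter(expected), Counter(actual)
  return dict(want - got), dict(got - want)


expected = [as_item(b'data001-seen', {'processed': 'IT'})]

# A received message that carries 'id' but no 'timestamp' attribute.
received = {'processed': 'IT', 'id': 'some-id'}
actual = [
    as_item(b'data001-seen', strip_attributes(received, ['id', 'timestamp']))
]

missing, unexpected = diff(expected, actual)
print('Expected but not in actual:', missing)
print('Unexpected:', unexpected)
```

      Under this reading, the two "Unexpected" messages in the failure are the expected messages arriving without a 'timestamp' attribute, which is why the counts match (4 of 4) while the comparison still fails.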

            People

              Andy Ye (yeandy)
              Anand Inguva
              Votes: 0
              Watchers: 4

                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 10h 20m