Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-9118

apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses is flaky

Details

    Description

      Sample errors:

      https://builds.apache.org/job/beam_PreCommit_Python_Phrase/1373

      4:30:12  self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_pardo_unfusable_side_inputs>
      14:30:12  
      14:30:12      def test_pardo_unfusable_side_inputs(self):
      14:30:12        def cross_product(elem, sides):
      14:30:12          for side in sides:
      14:30:12            yield elem, side
      14:30:12        with self.create_pipeline() as p:
      14:30:12          pcoll = p | beam.Create(['a', 'b'])
      14:30:12          assert_that(
      14:30:12              pcoll | beam.FlatMap(cross_product, beam.pvalue.AsList(pcoll)),
      14:30:12              equal_to([('a', 'a'), ('a', 'b'), ('b', 'a'), ('b', 'b')]))
      14:30:12      
      14:30:12        with self.create_pipeline() as p:
      14:30:12          pcoll = p | beam.Create(['a', 'b'])
      14:30:12          derived = ((pcoll,) | beam.Flatten()
      14:30:12                     | beam.Map(lambda x: (x, x))
      14:30:12                     | beam.GroupByKey()
      14:30:12                     | 'Unkey' >> beam.Map(lambda kv: kv[0]))
      14:30:12          assert_that(
      14:30:12              pcoll | beam.FlatMap(cross_product, beam.pvalue.AsList(derived)),
      14:30:12  >           equal_to([('a', 'a'), ('a', 'b'), ('b', 'a'), ('b', 'b')]))
      14:30:12  
      14:30:12  apache_beam/runners/portability/fn_api_runner_test.py:258: 
      14:30:12  _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      14:30:12  apache_beam/pipeline.py:481: in __exit__
      14:30:12      self.run().wait_until_finish()
      14:30:12  apache_beam/runners/portability/portable_runner.py:445: in wait_until_finish
      14:30:12      for state_response in self._state_stream:
      14:30:12  target/.tox-py36-gcp-pytest/py36-gcp-pytest/lib/python3.6/site-packages/grpc/_channel.py:416: in __next__
      14:30:12      return self._next()
      14:30:12  target/.tox-py36-gcp-pytest/py36-gcp-pytest/lib/python3.6/site-packages/grpc/_channel.py:694: in _next
      14:30:12      _common.wait(self._state.condition.wait, _response_ready)
      14:30:12  target/.tox-py36-gcp-pytest/py36-gcp-pytest/lib/python3.6/site-packages/grpc/_common.py:140: in wait
      14:30:12      _wait_once(wait_fn, MAXIMUM_WAIT_TIMEOUT, spin_cb)
      14:30:12  target/.tox-py36-gcp-pytest/py36-gcp-pytest/lib/python3.6/site-packages/grpc/_common.py:105: in _wait_once
      14:30:12      wait_fn(timeout=timeout)
      14:30:12  /usr/lib/python3.6/threading.py:299: in wait
      14:30:12      gotit = waiter.acquire(True, timeout)
      14:30:12  _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      14:30:12  
      14:30:12  signum = 14, frame = <frame object at 0x31daf68>
      14:30:12  
      14:30:12      def handler(signum, frame):
      14:30:12        msg = 'Timed out after %s seconds.' % self.TIMEOUT_SECS
      14:30:12        print('=' * 20, msg, '=' * 20)
      14:30:12        traceback.print_stack(frame)
      14:30:12        threads_by_id = {th.ident: th for th in threading.enumerate()}
      14:30:12        for thread_id, stack in sys._current_frames().items():
      14:30:12          th = threads_by_id.get(thread_id)
      14:30:12          print()
      14:30:12          print('# Thread:', th or thread_id)
      14:30:12          traceback.print_stack(stack)
      14:30:12  >     raise BaseException(msg)
      14:30:12  E     BaseException: Timed out after 60 seconds.
      14:30:12  
      14:30:12  apache_beam/runners/portability/portable_runner_test.py:77: BaseException
      

      https://builds.apache.org/job/beam_PreCommit_Python_Phrase/1366/

      09:06:01  self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocessesAndMultiWorkers testMethod=test_assert_that>
      09:06:01  
      09:06:01      def test_assert_that(self):
      09:06:01        # TODO: figure out a way for fn_api_runner to parse and raise the
      09:06:01        # underlying exception.
      09:06:01        with self.assertRaisesRegex(Exception, 'Failed assert'):
      09:06:01          with self.create_pipeline() as p:
      09:06:01  >         assert_that(p | beam.Create(['a', 'b']), equal_to(['a']))
      09:06:01  E         AssertionError: "Failed assert" does not match "Pipeline timed out waiting for job service subprocess."
      
      

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              tvalentyn Valentyn Tymofieiev
              Votes:
              0 Vote for this issue
              Watchers:
              6 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 0.5h
                  0.5h

                  Slack

                    Issue deployment