  1. Beam
  2. BEAM-9119

apache_beam.runners.portability.fn_api_runner_test.FnApiRunnerTest[...].test_large_elements is flaky

    Details

    • Type: Improvement
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: sdk-py-core
    • Labels:

      Description

      Saw 3 failures today, all manifesting as:

      IndexError: index out of range in "apache_beam/coders/slow_stream.py", line 169, in read_byte_py3.

      https://builds.apache.org/job/beam_PreCommit_Python_Phrase/1369
      https://builds.apache.org/job/beam_PreCommit_Python_Phrase/1365
      https://builds.apache.org/job/beam_PreCommit_Python_Phrase/1370

      Sample logs:

      12:10:27  =================================== FAILURES ===================================
      12:10:27  ____________ FnApiRunnerTestWithDisabledCaching.test_large_elements ____________
      12:10:27  [gw0] linux -- Python 3.6.8 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py36/build/srcs/sdks/python/target/.tox-py36-gcp-pytest/py36-gcp-pytest/bin/python
      12:10:27  
      12:10:27  self = <apache_beam.runners.portability.fn_api_runner_test.FnApiRunnerTestWithDisabledCaching testMethod=test_large_elements>
      12:10:27  
      12:10:27      def test_large_elements(self):
      12:10:27        with self.create_pipeline() as p:
      12:10:27          big = (p
      12:10:27                 | beam.Create(['a', 'a', 'b'])
      12:10:27                 | beam.Map(lambda x: (
      12:10:27                     x, x * data_plane._DEFAULT_SIZE_FLUSH_THRESHOLD)))
      12:10:27      
      12:10:27          side_input_res = (
      12:10:27              big
      12:10:27              | beam.Map(lambda x, side: (x[0], side.count(x[0])),
      12:10:27                         beam.pvalue.AsList(big | beam.Map(lambda x: x[0]))))
      12:10:27          assert_that(side_input_res,
      12:10:27                      equal_to([('a', 2), ('a', 2), ('b', 1)]), label='side')
      12:10:27      
      12:10:27          gbk_res = (
      12:10:27              big
      12:10:27              | beam.GroupByKey()
      12:10:27              | beam.Map(lambda x: x[0]))
      12:10:27  >       assert_that(gbk_res, equal_to(['a', 'b']), label='gbk')
      12:10:27  
      12:10:27  apache_beam/runners/portability/fn_api_runner_test.py:617: 
      12:10:27  _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      12:10:27  apache_beam/pipeline.py:479: in __exit__
      12:10:27      self.run().wait_until_finish()
      12:10:27  apache_beam/pipeline.py:459: in run
      12:10:27      self._options).run(False)
      12:10:27  apache_beam/pipeline.py:472: in run
      12:10:27      return self.runner.run_pipeline(self, self._options)
      12:10:27  apache_beam/runners/portability/fn_api_runner.py:472: in run_pipeline
      12:10:27      default_environment=self._default_environment))
      12:10:27  apache_beam/runners/portability/fn_api_runner.py:480: in run_via_runner_api
      12:10:27      return self.run_stages(stage_context, stages)
      12:10:27  apache_beam/runners/portability/fn_api_runner.py:569: in run_stages
      12:10:27      stage_context.safe_coders)
      12:10:27  apache_beam/runners/portability/fn_api_runner.py:889: in _run_stage
      12:10:27      result, splits = bundle_manager.process_bundle(data_input, data_output)
      12:10:27  apache_beam/runners/portability/fn_api_runner.py:2076: in process_bundle
      12:10:27      part, expected_outputs), part_inputs):
      12:10:27  /usr/lib/python3.6/concurrent/futures/_base.py:586: in result_iterator
      12:10:27      yield fs.pop().result()
      12:10:27  /usr/lib/python3.6/concurrent/futures/_base.py:432: in result
      12:10:27      return self.__get_result()
      12:10:27  /usr/lib/python3.6/concurrent/futures/_base.py:384: in __get_result
      12:10:27      raise self._exception
      12:10:27  apache_beam/utils/thread_pool_executor.py:44: in run
      12:10:27      self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
      12:10:27  apache_beam/runners/portability/fn_api_runner.py:2076: in <lambda>
      12:10:27      part, expected_outputs), part_inputs):
      12:10:27  apache_beam/runners/portability/fn_api_runner.py:2020: in process_bundle
      12:10:27      expected_outputs[output.transform_id]).append(output.data)
      12:10:27  apache_beam/runners/portability/fn_api_runner.py:285: in append
      12:10:27      windowed_key_value = coder_impl.decode_from_stream(input_stream, True)
      12:10:27  apache_beam/coders/coder_impl.py:1153: in decode_from_stream
      12:10:27      value = self._value_coder.decode_from_stream(in_stream, nested)
      12:10:27  apache_beam/coders/coder_impl.py:749: in decode_from_stream
      12:10:27      for i, c in enumerate(self._coder_impls)])
      12:10:27  apache_beam/coders/coder_impl.py:749: in <listcomp>
      12:10:27      for i, c in enumerate(self._coder_impls)])
      12:10:27  apache_beam/coders/coder_impl.py:493: in decode_from_stream
      12:10:27      return in_stream.read_all(nested)
      12:10:27  apache_beam/coders/slow_stream.py:158: in read_all
      12:10:27      return self.read(self.read_var_int64() if nested else self.size())
      12:10:27  apache_beam/coders/slow_stream.py:175: in read_var_int64
      12:10:27      byte = self.read_byte()
      12:10:27  _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      12:10:27  
      12:10:27  self = <apache_beam.coders.slow_stream.InputStream object at 0x7f2390d80c88>
      12:10:27  
      12:10:27      def read_byte_py3(self):
      12:10:27        # type: () -> int
      12:10:27        self.pos += 1
      12:10:27  >     return self.data[self.pos - 1]
      12:10:27  E     IndexError: index out of range
      12:10:27  
      12:10:27  apache_beam/coders/slow_stream.py:169: IndexError
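
The last frames of the trace show a length-prefixed ("nested") read: read_all calls read_var_int64, which calls read_byte, which indexes past the end of the buffer. The sketch below is a minimal stand-in for that pattern, not Beam's actual slow_stream code. The names TinyInputStream and encode_nested are hypothetical and exist only for illustration. It shows one way this IndexError can arise, namely a decode attempted on an exhausted or truncated buffer. Whether that is what happens in the flaky runs is not established by the logs above.

```python
class TinyInputStream:
    """Hypothetical byte-at-a-time reader; a stand-in, not Beam's InputStream."""

    def __init__(self, data: bytes):
        self.data = data
        self.pos = 0

    def read_byte(self) -> int:
        # Same shape as the failing frame: advance the cursor, then index.
        self.pos += 1
        return self.data[self.pos - 1]  # IndexError once pos passes the end

    def read_var_int64(self) -> int:
        # Base-128 varint: 7 payload bits per byte, high bit means "more follows".
        shift = 0
        result = 0
        while True:
            byte = self.read_byte()
            result |= (byte & 0x7F) << shift
            shift += 7
            if not byte & 0x80:
                return result

    def read_all_nested(self) -> bytes:
        # Nested read: a varint length prefix followed by that many bytes.
        length = self.read_var_int64()
        start = self.pos
        self.pos += length
        return self.data[start:self.pos]


def encode_nested(payload: bytes) -> bytes:
    """Hypothetical helper: prefix payload with its length as a varint."""
    n = len(payload)
    prefix = bytearray()
    while True:
        bits = n & 0x7F
        n >>= 7
        if n:
            prefix.append(bits | 0x80)
        else:
            prefix.append(bits)
            return bytes(prefix) + payload
```

Decoding one well-formed element from TinyInputStream(encode_nested(b"a" * 300)) succeeds. A second read_all_nested() on the same, now exhausted, stream raises IndexError from read_byte, as does reading a varint from a buffer that ends on a continuation byte such as b"\x80".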
      

              People

              • Assignee: Unassigned
              • Reporter: Valentyn Tymofieiev (tvalentyn)
              • Votes: 0
              • Watchers: 6

                  Time Tracking

                  • Original Estimate: Not Specified
                  • Remaining Estimate: 0h
                  • Time Spent: 1h