Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-9832

KeyError: 'No such coder: ' in fn_runner_test

Details

    • Type: Test
    • Status: Resolved
    • Priority: P1
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.22.0
    • Component/s: sdk-py-core, test-failures
    • Labels: None

    Description

      Failed test results can be found here: https://builds.apache.org/job/beam_PreCommit_Python_Commit/12525/

       

      Stack trace from the failing test:

      self = <apache_beam.runners.portability.fn_api_runner.fn_runner_test.FnApiRunnerTestWithMultiWorkers testMethod=test_read>
      
          def test_read(self):
            # Can't use NamedTemporaryFile as a context
            # due to https://bugs.python.org/issue14243
            temp_file = tempfile.NamedTemporaryFile(delete=False)
            try:
              temp_file.write(b'a\nb\nc')
              temp_file.close()
              with self.create_pipeline() as p:
                assert_that(
      >             p | beam.io.ReadFromText(temp_file.name), equal_to(['a', 'b', 'c']))
      
      apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:625: 
      _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      apache_beam/pipeline.py:529: in __exit__
          self.run().wait_until_finish()
      apache_beam/pipeline.py:502: in run
          self._options).run(False)
      apache_beam/pipeline.py:515: in run
          return self.runner.run_pipeline(self, self._options)
      apache_beam/runners/portability/fn_api_runner/fn_runner.py:173: in run_pipeline
          pipeline.to_runner_api(default_environment=self._default_environment))
      apache_beam/runners/portability/fn_api_runner/fn_runner.py:183: in run_via_runner_api
          return self.run_stages(stage_context, stages)
      apache_beam/runners/portability/fn_api_runner/fn_runner.py:331: in run_stages
          bundle_context_manager,
      apache_beam/runners/portability/fn_api_runner/fn_runner.py:508: in _run_stage
          bundle_manager)
      apache_beam/runners/portability/fn_api_runner/fn_runner.py:546: in _run_bundle
          data_input, data_output, input_timers, expected_timer_output)
      apache_beam/runners/portability/fn_api_runner/fn_runner.py:927: in process_bundle
          timer_inputs)):
      /usr/lib/python3.5/concurrent/futures/_base.py:556: in result_iterator
          yield future.result()
      /usr/lib/python3.5/concurrent/futures/_base.py:405: in result
          return self.__get_result()
      /usr/lib/python3.5/concurrent/futures/_base.py:357: in __get_result
          raise self._exception
      apache_beam/utils/thread_pool_executor.py:44: in run
          self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
      apache_beam/runners/portability/fn_api_runner/fn_runner.py:923: in execute
          dry_run)
      apache_beam/runners/portability/fn_api_runner/fn_runner.py:826: in process_bundle
          result_future = self._worker_handler.control_conn.push(process_bundle_req)
      apache_beam/runners/portability/fn_api_runner/worker_handlers.py:353: in push
          response = self.worker.do_instruction(request)
      apache_beam/runners/worker/sdk_worker.py:471: in do_instruction
          getattr(request, request_type), request.instruction_id)
      apache_beam/runners/worker/sdk_worker.py:500: in process_bundle
          instruction_id, request.process_bundle_descriptor_id)
      apache_beam/runners/worker/sdk_worker.py:374: in get
          self.data_channel_factory)
      apache_beam/runners/worker/bundle_processor.py:782: in __init__
          self.ops = self.create_execution_tree(self.process_bundle_descriptor)
      apache_beam/runners/worker/bundle_processor.py:837: in create_execution_tree
          descriptor.transforms, key=topological_height, reverse=True)
      apache_beam/runners/worker/bundle_processor.py:836: in <listcomp>
          (transform_id, get_operation(transform_id)) for transform_id in sorted(
      apache_beam/runners/worker/bundle_processor.py:726: in wrapper
          result = cache[args] = func(*args)
      apache_beam/runners/worker/bundle_processor.py:820: in get_operation
          pcoll_id in descriptor.transforms[transform_id].outputs.items()
      apache_beam/runners/worker/bundle_processor.py:820: in <dictcomp>
          pcoll_id in descriptor.transforms[transform_id].outputs.items()
      apache_beam/runners/worker/bundle_processor.py:818: in <listcomp>
          tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
      apache_beam/runners/worker/bundle_processor.py:726: in wrapper
          result = cache[args] = func(*args)
      apache_beam/runners/worker/bundle_processor.py:823: in get_operation
          transform_id, transform_consumers)
      apache_beam/runners/worker/bundle_processor.py:1108: in create_operation
          return creator(self, transform_id, transform_proto, payload, consumers)
      apache_beam/runners/worker/bundle_processor.py:1341: in create_pair_with_restriction
          return _create_sdf_operation(PairWithRestriction, *args)
      apache_beam/runners/worker/bundle_processor.py:1404: in _create_sdf_operation
          parameter)
      apache_beam/runners/worker/bundle_processor.py:1501: in _create_pardo_operation
          output_coders = factory.get_output_coders(transform_proto)
      apache_beam/runners/worker/bundle_processor.py:1154: in get_output_coders
          pcoll_id in transform_proto.outputs.items()
      apache_beam/runners/worker/bundle_processor.py:1154: in <dictcomp>
          pcoll_id in transform_proto.outputs.items()
      apache_beam/runners/worker/bundle_processor.py:1139: in get_windowed_coder
          coder = self.get_coder(self.descriptor.pcollections[pcoll_id].coder_id)
      _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      
      self = <apache_beam.runners.worker.bundle_processor.BeamTransformFactory object at 0x7f4248bf5160>
      coder_id = ''
      
          def get_coder(self, coder_id):
            # type: (str) -> coders.Coder
            if coder_id not in self.descriptor.coders:
      >       raise KeyError("No such coder: %s" % coder_id)
      E       KeyError: 'No such coder: '
      
      apache_beam/runners/worker/bundle_processor.py:1128: KeyError
      

      Attachments

        Issue Links

          Activity

            People

              Assignee: pabloem Pablo Estrada
              Reporter: ningk Ning
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 50m
                  50m