Details
-
Test
-
Status: Resolved
-
P1
-
Resolution: Fixed
-
None
-
None
Description
Failed test results can be found [here|https://builds.apache.org/job/beam_PreCommit_Python_Commit/12525/]
A stack trace:
self = <apache_beam.runners.portability.fn_api_runner.fn_runner_test.FnApiRunnerTestWithMultiWorkers testMethod=test_read> def test_read(self): # Can't use NamedTemporaryFile as a context # due to https://bugs.python.org/issue14243 temp_file = tempfile.NamedTemporaryFile(delete=False) try: temp_file.write(b'a\nb\nc') temp_file.close() with self.create_pipeline() as p: assert_that( > p | beam.io.ReadFromText(temp_file.name), equal_to(['a', 'b', 'c'])) apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:625: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ apache_beam/pipeline.py:529: in __exit__ self.run().wait_until_finish() apache_beam/pipeline.py:502: in run self._options).run(False) apache_beam/pipeline.py:515: in run return self.runner.run_pipeline(self, self._options) apache_beam/runners/portability/fn_api_runner/fn_runner.py:173: in run_pipeline pipeline.to_runner_api(default_environment=self._default_environment)) apache_beam/runners/portability/fn_api_runner/fn_runner.py:183: in run_via_runner_api return self.run_stages(stage_context, stages) apache_beam/runners/portability/fn_api_runner/fn_runner.py:331: in run_stages bundle_context_manager, apache_beam/runners/portability/fn_api_runner/fn_runner.py:508: in _run_stage bundle_manager) apache_beam/runners/portability/fn_api_runner/fn_runner.py:546: in _run_bundle data_input, data_output, input_timers, expected_timer_output) apache_beam/runners/portability/fn_api_runner/fn_runner.py:927: in process_bundle timer_inputs)): /usr/lib/python3.5/concurrent/futures/_base.py:556: in result_iterator yield future.result() /usr/lib/python3.5/concurrent/futures/_base.py:405: in result return self.__get_result() /usr/lib/python3.5/concurrent/futures/_base.py:357: in __get_result raise self._exception apache_beam/utils/thread_pool_executor.py:44: in run self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs)) apache_beam/runners/portability/fn_api_runner/fn_runner.py:923: in execute dry_run) apache_beam/runners/portability/fn_api_runner/fn_runner.py:826: in process_bundle result_future = self._worker_handler.control_conn.push(process_bundle_req) apache_beam/runners/portability/fn_api_runner/worker_handlers.py:353: in push response = self.worker.do_instruction(request) apache_beam/runners/worker/sdk_worker.py:471: in do_instruction getattr(request, request_type), request.instruction_id) apache_beam/runners/worker/sdk_worker.py:500: in process_bundle instruction_id, request.process_bundle_descriptor_id) apache_beam/runners/worker/sdk_worker.py:374: in get self.data_channel_factory) apache_beam/runners/worker/bundle_processor.py:782: in __init__ self.ops = self.create_execution_tree(self.process_bundle_descriptor) apache_beam/runners/worker/bundle_processor.py:837: in create_execution_tree descriptor.transforms, key=topological_height, reverse=True) apache_beam/runners/worker/bundle_processor.py:836: in <listcomp> (transform_id, get_operation(transform_id)) for transform_id in sorted( apache_beam/runners/worker/bundle_processor.py:726: in wrapper result = cache[args] = func(*args) apache_beam/runners/worker/bundle_processor.py:820: in get_operation pcoll_id in descriptor.transforms[transform_id].outputs.items() apache_beam/runners/worker/bundle_processor.py:820: in <dictcomp> pcoll_id in descriptor.transforms[transform_id].outputs.items() apache_beam/runners/worker/bundle_processor.py:818: in <listcomp> tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]] apache_beam/runners/worker/bundle_processor.py:726: in wrapper result = cache[args] = func(*args) apache_beam/runners/worker/bundle_processor.py:823: in get_operation transform_id, transform_consumers) apache_beam/runners/worker/bundle_processor.py:1108: in create_operation return creator(self, transform_id, transform_proto, payload, consumers) apache_beam/runners/worker/bundle_processor.py:1341: in create_pair_with_restriction return _create_sdf_operation(PairWithRestriction, *args) apache_beam/runners/worker/bundle_processor.py:1404: in _create_sdf_operation parameter) apache_beam/runners/worker/bundle_processor.py:1501: in _create_pardo_operation output_coders = factory.get_output_coders(transform_proto) apache_beam/runners/worker/bundle_processor.py:1154: in get_output_coders pcoll_id in transform_proto.outputs.items() apache_beam/runners/worker/bundle_processor.py:1154: in <dictcomp> pcoll_id in transform_proto.outputs.items() apache_beam/runners/worker/bundle_processor.py:1139: in get_windowed_coder coder = self.get_coder(self.descriptor.pcollections[pcoll_id].coder_id) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <apache_beam.runners.worker.bundle_processor.BeamTransformFactory object at 0x7f4248bf5160> coder_id = '' def get_coder(self, coder_id): # type: (str) -> coders.Coder if coder_id not in self.descriptor.coders: > raise KeyError("No such coder: %s" % coder_id) E KeyError: 'No such coder: ' apache_beam/runners/worker/bundle_processor.py:1128: KeyError
Attachments
Issue Links
- links to