Details
-
Bug
-
Status: Open
-
P1
-
Resolution: Unresolved
-
2.30.0, 2.31.0, 2.32.0
-
None
-
None
Description
This occurs in Dataflow when trying to deploy a workflow from Pubsub -> SlidingWindows -> beam.ParDo(KeyValues()) -> beam.GroupByKey -> beam.CombinePerKey
During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 284, in _execute response = task() File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in <lambda> lambda: self.create_worker().do_instruction(request), request) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 602, in do_instruction getattr(request, request_type), request.instruction_id) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 633, in process_bundle instruction_id, request.process_bundle_descriptor_id) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 462, in get self.data_channel_factory) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 862, in __init__ self.ops = self.create_execution_tree(self.process_bundle_descriptor) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 919, in create_execution_tree descriptor.transforms, key=topological_height, reverse=True)]) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 918, in <listcomp> get_operation(transform_id))) for transform_id in sorted( File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 806, in wrapper result = cache[args] = func(*args) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 900, in get_operation pcoll_id in descriptor.transforms[transform_id].outputs.items() File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 900, in <dictcomp> pcoll_id in descriptor.transforms[transform_id].outputs.items() File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 898, in <listcomp> tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]] File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 806, in wrapper result = cache[args] = func(*args) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 900, in get_operation pcoll_id in descriptor.transforms[transform_id].outputs.items() File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 900, in <dictcomp> pcoll_id in descriptor.transforms[transform_id].outputs.items() File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 898, in <listcomp> tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]] File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 806, in wrapper result = cache[args] = func(*args) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 900, in get_operation pcoll_id in descriptor.transforms[transform_id].outputs.items() File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 900, in <dictcomp> pcoll_id in descriptor.transforms[transform_id].outputs.items() File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 898, in <listcomp> tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]] File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 806, in wrapper result = cache[args] = func(*args) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 900, in get_operation pcoll_id in descriptor.transforms[transform_id].outputs.items() File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 900, in <dictcomp> pcoll_id in descriptor.transforms[transform_id].outputs.items() File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 898, in <listcomp> tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]] File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 806, in wrapper result = cache[args] = func(*args) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 903, in get_operation transform_id, transform_consumers) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1192, in create_operation return creator(self, transform_id, transform_proto, payload, consumers) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1776, in create_combine_per_key_convert_to_accumulators factory, transform_id, transform_proto, payload, consumers, 'convert') File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1798, in _create_combine_phase_operation factory.context), [], {})) File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/urns.py", line 186, in from_runner_api proto_utils.parse_Bytes(fn_proto.payload, parameter_type), context) File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/urns.py", line 160, in <lambda> unused_context: pickler.loads(proto.value)) File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 287, in loads return dill.loads(s) File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 275, in loads return load(file, ignore, **kwds) File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 270, in load return Unpickler(file, ignore=ignore, **kwds).load() File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load obj = StockUnpickler.load(self) TypeError: code() takes at most 15 arguments (16 given)