BEAM-12959 (project: Beam)

Dataflow error in CombinePerKey operation

Details

    • Type: Bug
    • Status: Open
    • Priority: P1
    • Resolution: Unresolved
    • Affects Version/s: 2.30.0, 2.31.0, 2.32.0
    • Fix Version/s: None
    • Component/s: sdk-py-core
    • Labels: None

    Description

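      This error occurs on Dataflow when deploying a pipeline of the form Pub/Sub -> SlidingWindows -> beam.ParDo(KeyValues()) -> beam.GroupByKey -> beam.CombinePerKey. The SDK worker fails while building the CombinePerKey operation, when it unpickles the combine function: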

      During handling of the above exception, another exception occurred:
      
      Traceback (most recent call last):
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 284, in _execute
          response = task()
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in <lambda>
          lambda: self.create_worker().do_instruction(request), request)
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 602, in do_instruction
          getattr(request, request_type), request.instruction_id)
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 633, in process_bundle
          instruction_id, request.process_bundle_descriptor_id)
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 462, in get
          self.data_channel_factory)
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 862, in __init__
          self.ops = self.create_execution_tree(self.process_bundle_descriptor)
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 919, in create_execution_tree
          descriptor.transforms, key=topological_height, reverse=True)])
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 918, in <listcomp>
          get_operation(transform_id))) for transform_id in sorted(
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 806, in wrapper
          result = cache[args] = func(*args)
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 900, in get_operation
          pcoll_id in descriptor.transforms[transform_id].outputs.items()
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 900, in <dictcomp>
          pcoll_id in descriptor.transforms[transform_id].outputs.items()
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 898, in <listcomp>
          tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 806, in wrapper
          result = cache[args] = func(*args)
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 900, in get_operation
          pcoll_id in descriptor.transforms[transform_id].outputs.items()
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 900, in <dictcomp>
          pcoll_id in descriptor.transforms[transform_id].outputs.items()
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 898, in <listcomp>
          tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 806, in wrapper
          result = cache[args] = func(*args)
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 900, in get_operation
          pcoll_id in descriptor.transforms[transform_id].outputs.items()
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 900, in <dictcomp>
          pcoll_id in descriptor.transforms[transform_id].outputs.items()
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 898, in <listcomp>
          tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 806, in wrapper
          result = cache[args] = func(*args)
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 900, in get_operation
          pcoll_id in descriptor.transforms[transform_id].outputs.items()
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 900, in <dictcomp>
          pcoll_id in descriptor.transforms[transform_id].outputs.items()
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 898, in <listcomp>
          tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 806, in wrapper
          result = cache[args] = func(*args)
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 903, in get_operation
          transform_id, transform_consumers)
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1192, in create_operation
          return creator(self, transform_id, transform_proto, payload, consumers)
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1776, in create_combine_per_key_convert_to_accumulators
          factory, transform_id, transform_proto, payload, consumers, 'convert')
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1798, in _create_combine_phase_operation
          factory.context), [], {}))
        File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/urns.py", line 186, in from_runner_api
          proto_utils.parse_Bytes(fn_proto.payload, parameter_type), context)
        File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/urns.py", line 160, in <lambda>
          unused_context: pickler.loads(proto.value))
        File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 287, in loads
          return dill.loads(s)
        File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 275, in loads
          return load(file, ignore, **kwds)
        File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 270, in load
          return Unpickler(file, ignore=ignore, **kwds).load()
        File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
          obj = StockUnpickler.load(self)
      TypeError: code() takes at most 15 arguments (16 given)
      

          People

            Assignee: Unassigned
            Reporter: Eddie Wang (eddiewang)
            Votes: 0
            Watchers: 3