Beam / BEAM-10696

unable to explicitly use element coder in CombiningValueStateSpec constructor


    Details

    • Type: Bug
    • Status: Open
    • Priority: P2
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: sdk-py-core
    • Labels:

      Description

      The following example from https://beam.apache.org/blog/timely-processing/ does not work because of a coder error:

       COUNT_STATE = CombiningValueStateSpec('count',
                                             VarIntCoder(),
                                             combiners.SumCombineFn())

      Error message from worker: generic::unknown: Traceback (most recent call last):
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 256, in _execute
          response = task()
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 313, in <lambda>
          lambda: self.create_worker().do_instruction(request), request)
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 483, in do_instruction
          getattr(request, request_type), request.instruction_id)
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 518, in process_bundle
          bundle_processor.process_bundle(instruction_id))
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 983, in process_bundle
          op.finish()
        File "apache_beam/runners/worker/operations.py", line 697, in apache_beam.runners.worker.operations.DoOperation.finish
        File "apache_beam/runners/worker/operations.py", line 699, in apache_beam.runners.worker.operations.DoOperation.finish
        File "apache_beam/runners/worker/operations.py", line 702, in apache_beam.runners.worker.operations.DoOperation.finish
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 775, in commit
          state.commit()
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 492, in commit
          self._underlying_bag_state.commit()
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 553, in commit
          self._state_key, self._value_coder.get_impl(), self._added_elements)
        File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 1012, in extend
          coder.encode_to_stream(element, out, True)
        File "apache_beam/coders/coder_impl.py", line 777, in apache_beam.coders.coder_impl.VarIntCoderImpl.encode_to_stream
        File "apache_beam/coders/coder_impl.py", line 779, in apache_beam.coders.coder_impl.VarIntCoderImpl.encode_to_stream
      TypeError: an integer is required
      

      The element actually passed to the coder here was a list of ints, not a single int.

      The API documentation states:

      coder (Coder): Coder specifying how to encode the values to be combined.
        May be inferred.
      

      This appears to be inaccurate, since the coder did not receive the individual values being combined.
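The mismatch described above can be sketched in plain Python without Beam. This is only an illustration under stated assumptions: `encode_varint` and `BufferingSum` are hypothetical stand-ins, not Beam's `VarIntCoder` or any Beam combiner, and the sketch assumes (as the traceback suggests) that the state holds a list of buffered inputs and that this list is what reaches the coder at commit time.

```python
def encode_varint(value):
    """Hypothetical int-only encoder (not Beam's VarIntCoder)."""
    if not isinstance(value, int) or isinstance(value, bool):
        raise TypeError('an integer is required')
    if value < 0:
        raise ValueError('this sketch only handles non-negative ints')
    out = bytearray()
    while True:
        bits = value & 0x7F          # low 7 bits of the remaining value
        value >>= 7
        if value:
            out.append(bits | 0x80)  # more bytes follow
        else:
            out.append(bits)         # last byte
            return bytes(out)


class BufferingSum:
    """Hypothetical combiner whose accumulator is a list of pending inputs."""

    def create_accumulator(self):
        return []

    def add_input(self, accumulator, element):
        accumulator.append(element)
        return accumulator

    def extract_output(self, accumulator):
        return sum(accumulator)


combiner = BufferingSum()
acc = combiner.create_accumulator()
for v in (1, 2, 3):
    acc = combiner.add_input(acc, v)

# Encoding the accumulator (a list) with an int-only coder fails,
# which mirrors the TypeError in the report.
try:
    encode_varint(acc)
    failed = False
    message = ''
except TypeError as exc:
    failed = True
    message = str(exc)

# Encoding the combined output (a single int) works.
encoded_total = encode_varint(combiner.extract_output(acc))
```

If this assumption holds, a coder passed to the spec would need to handle whatever the combiner stores, not only the individual input values.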

            People

            • Assignee: Unassigned
            • Reporter: Heejong Lee (heejong)