Beam / BEAM-9132

State request handler is removed prematurely when closing ActiveBundle

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.20.0
    • Component/s: java-fn-execution
    • Labels: None

    Description

      We have observed the following error in a state-intensive application:

      Error processing instruction 107. Original traceback is
      Traceback (most recent call last):
        File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
        File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
        File "apache_beam/runners/common.py", line 659, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
        File "apache_beam/runners/common.py", line 880, in apache_beam.runners.common._OutputProcessor.process_outputs
        File "apache_beam/runners/common.py", line 895, in apache_beam.runners.common._OutputProcessor.process_outputs
        File "redacted.py", line 56, in process
          recent_events_map = load_recent_events_map(recent_events_state)
        File "redacted.py", line 128, in _load_recent_events_map
          items_in_recent_events_bag = list(recent_events_state.read())
        File "apache_beam/runners/worker/bundle_processor.py", line 335, in __iter__
          for elem in self.first:
        File "apache_beam/runners/worker/bundle_processor.py", line 214, in __iter__
          self._state_key, self._coder_impl, is_cached=self._is_cached)
        File "apache_beam/runners/worker/sdk_worker.py", line 692, in blocking_get
          self._materialize_iter(state_key, coder))
        File "apache_beam/runners/worker/sdk_worker.py", line 723, in _materialize_iter
          self._underlying.get_raw(state_key, continuation_token)
        File "apache_beam/runners/worker/sdk_worker.py", line 603, in get_raw
          continuation_token=continuation_token)))
        File "apache_beam/runners/worker/sdk_worker.py", line 637, in _blocking_request
          raise RuntimeError(response.error)
      RuntimeError: Unknown process bundle instruction id '107'
      

      Note that the error is raised on the Runner side and only surfaces in the SDK Harness through the state response. It appears to originate from the ActiveBundle de-registering the state request handler too early, while processing may still be in progress in the SDK Harness.
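
      The suspected ordering problem can be illustrated with a toy model. This is only a sketch and not Beam's runner code: `StateHandlerRegistry`, `close_bundle`, `run`, and the `deregister_first` flag are hypothetical names introduced here, and the instruction id and error message are taken from the traceback above. It assumes that the SDK Harness may still issue state requests while the runner waits for the bundle to complete.

```python
class StateHandlerRegistry:
    """Toy stand-in for a runner-side map from instruction id to state handler."""

    def __init__(self):
        self._handlers = {}

    def register(self, instruction_id, handler):
        self._handlers[instruction_id] = handler

    def deregister(self, instruction_id):
        self._handlers.pop(instruction_id, None)

    def handle(self, instruction_id, state_key):
        handler = self._handlers.get(instruction_id)
        if handler is None:
            # Mirrors the message seen in the traceback.
            raise RuntimeError(
                "Unknown process bundle instruction id '%s'" % instruction_id)
        return handler(state_key)


def close_bundle(registry, instruction_id, await_sdk_completion, deregister_first):
    """Close a toy bundle.

    await_sdk_completion models waiting for the SDK harness to finish the
    bundle; the harness may still send state requests during that wait.
    """
    if deregister_first:
        # Suspected faulty order: the handler is gone before the SDK is done.
        registry.deregister(instruction_id)
        await_sdk_completion()
    else:
        # Safer order: wait for completion, then always clean up.
        try:
            await_sdk_completion()
        finally:
            registry.deregister(instruction_id)


def run(deregister_first):
    registry = StateHandlerRegistry()
    registry.register("107", lambda state_key: ["event"])
    results = []

    def sdk_still_processing():
        # A late state read issued while the bundle is being closed.
        results.append(registry.handle("107", "recent_events"))

    close_bundle(registry, "107", sdk_still_processing, deregister_first)
    return results
```

      In this model, `run(deregister_first=False)` serves the late state read, while `run(deregister_first=True)` raises a `RuntimeError` with the same message as in the report.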

          People

            Assignee: Maximilian Michels (mxm)
            Reporter: Maximilian Michels (mxm)

              Time Tracking

                Estimated: Not Specified
                Remaining: 0h
                Logged: 3h 20m