Flink / FLINK-31043

KeyError exception is thrown in CachedMapState


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.15.0
    • Fix Version/s: 1.17.0, 1.15.4, 1.16.2
    • Component/s: API / Python
    • Labels: None

    Description

      We have seen the following exception in a PyFlink job running on Flink 1.15. It occurs only occasionally and may indicate a bug in the MapState state cache (CachedMapState):

      Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 131: Traceback (most recent call last):
        File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
          response = task()
        File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
          lambda: self.create_worker().do_instruction(request), request)
        File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction
          getattr(request, request_type), request.instruction_id)
        File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
          bundle_processor.process_bundle(instruction_id))
        File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1000, in process_bundle
          element.data)
        File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded
          self.output(decoded_value)
        File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
        File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
        File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
        File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 158, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
        File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 170, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
        File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/table/operations.py", line 417, in finish_bundle
          return self.group_agg_function.finish_bundle()
        File "pyflink/fn_execution/table/aggregate_fast.pyx", line 597, in pyflink.fn_execution.table.aggregate_fast.GroupTableAggFunction.finish_bundle
        File "pyflink/fn_execution/table/aggregate_fast.pyx", line 652, in pyflink.fn_execution.table.aggregate_fast.GroupTableAggFunction.finish_bundle
        File "pyflink/fn_execution/table/aggregate_fast.pyx", line 389, in pyflink.fn_execution.table.aggregate_fast.SimpleTableAggsHandleFunction.emit_value
        File "/tmp/pyflink/17360444-8c0b-46a5-90a4-689c376ea4ed/0e2967b5-181c-4663-bd7a-267d47509cf5/whms_dws_stock_python_sps_1_output.py", line 29, in emit_value
        File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/table/state_data_view.py", line 147, in get
          return self._map_state.get(key)
        File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 915, in get
          return self.get_internal_state().get(key)
        File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 773, in get
          self._state_key, map_key, self._map_key_encoder, self._map_value_decoder)
        File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 418, in blocking_get
          cached_map_state.put(map_key, (exists, value))
        File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 319, in put
          super(CachedMapState, self).put(key, exists_and_value)
        File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 68, in put
          self._on_evict(name, value)
        File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 305, in on_evict
          self._cached_keys.remove(key)
      KeyError: 'SPAREPARTS_M11F010L4L1_01'
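      The report does not state the root cause. The traceback does show the call chain: blocking_get -> CachedMapState.put -> LRU eviction -> on_evict -> self._cached_keys.remove(key). The following is a hypothetical, simplified model (not PyFlink's code; SimpleLRUCache, KeyTrackingMapCache, strict_evict and demo are invented names). It illustrates one way an eviction callback that calls set.remove can raise KeyError: the LRU cache holds an entry, here a cached lookup miss, that was never added to the side set of tracked keys. The tolerant variant uses set.discard instead. That choice is an illustrative assumption and is not presented as the actual Flink fix.

```python
from collections import OrderedDict


class SimpleLRUCache:
    """Hypothetical minimal LRU cache (not PyFlink's LRUCache).

    Mirrors the put -> evict -> on_evict chain seen in the traceback:
    when capacity is exceeded, the least recently used entry is dropped
    and passed to the on_evict callback.
    """

    def __init__(self, max_entries, on_evict=None):
        self._max_entries = max_entries
        self._on_evict = on_evict
        self._cache = OrderedDict()

    def put(self, key, value):
        self._cache[key] = value
        self._cache.move_to_end(key)  # mark as most recently used
        if len(self._cache) > self._max_entries:
            old_key, old_value = self._cache.popitem(last=False)  # least recently used
            if self._on_evict is not None:
                self._on_evict(old_key, old_value)


class KeyTrackingMapCache(SimpleLRUCache):
    """Hypothetical map-state cache storing (exists, value) pairs.

    A side set records keys cached with exists=True. strict_evict=True removes
    evicted keys with set.remove (raises KeyError if absent); strict_evict=False
    uses set.discard (silently ignores absent keys).
    """

    def __init__(self, max_entries, strict_evict=True):
        super().__init__(max_entries, self._handle_evict)
        self._cached_keys = set()
        self._strict_evict = strict_evict

    def put(self, key, exists_and_value):
        if exists_and_value[0]:
            self._cached_keys.add(key)
        # A cached miss (exists=False) goes into the LRU cache but not into the set.
        super().put(key, exists_and_value)

    def _handle_evict(self, key, exists_and_value):
        if self._strict_evict:
            # Assumes every cached key is in the set; fails for a cached miss.
            self._cached_keys.remove(key)
        else:
            self._cached_keys.discard(key)


def demo(strict_evict):
    cache = KeyTrackingMapCache(max_entries=1, strict_evict=strict_evict)
    cache.put("missing", (False, None))  # cached miss: not added to the key set
    cache.put("present", (True, 42))     # over capacity: evicts "missing"
    return sorted(cache._cached_keys)
```

      With this model, demo(True) raises KeyError: 'missing' from inside put. The evicted key is an older entry, not the one being inserted, just as the KeyError key in the traceback need not be the key passed to get. demo(False) returns ['present'].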
      


          People

            Xingbo Huang (hxb)
            Dian Fu (dianfu)
            Votes: 0
            Watchers: 1
