Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Fixed
-
1.15.0
-
None
Description
I have seen the following exception in a PyFlink job running on Flink 1.15. It occurs only occasionally and may indicate a bug in the state cache of MapState:
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 131: Traceback (most recent call last): File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute response = task() File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda> lambda: self.create_worker().do_instruction(request), request) File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction getattr(request, request_type), request.instruction_id) File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle bundle_processor.process_bundle(instruction_id)) File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1000, in process_bundle element.data) File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded self.output(decoded_value) File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 158, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 170, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/table/operations.py", line 417, in finish_bundle return self.group_agg_function.finish_bundle() File "pyflink/fn_execution/table/aggregate_fast.pyx", line 597, in 
pyflink.fn_execution.table.aggregate_fast.GroupTableAggFunction.finish_bundle File "pyflink/fn_execution/table/aggregate_fast.pyx", line 652, in pyflink.fn_execution.table.aggregate_fast.GroupTableAggFunction.finish_bundle File "pyflink/fn_execution/table/aggregate_fast.pyx", line 389, in pyflink.fn_execution.table.aggregate_fast.SimpleTableAggsHandleFunction.emit_value File "/tmp/pyflink/17360444-8c0b-46a5-90a4-689c376ea4ed/0e2967b5-181c-4663-bd7a-267d47509cf5/whms_dws_stock_python_sps_1_output.py", line 29, in emit_value File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/table/state_data_view.py", line 147, in get return self._map_state.get(key) File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 915, in get return self.get_internal_state().get(key) File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 773, in get self._state_key, map_key, self._map_key_encoder, self._map_value_decoder) File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 418, in blocking_get cached_map_state.put(map_key, (exists, value)) File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 319, in put super(CachedMapState, self).put(key, exists_and_value) File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 68, in put self._on_evict(name, value) File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 305, in on_evict self._cached_keys.remove(key) KeyError: 'SPAREPARTS_M11F010L4L1_01'