Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
1.13.0
Description
For the following job:
# Reproducer: a BATCH-mode DataStream job that keeps keyed MapState and writes
# its results to a FileSink.
import logging

from pyflink.common import WatermarkStrategy, Row
from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FileSink, OutputFileConfig, NumberSequenceSource
from pyflink.datastream.execution_mode import RuntimeExecutionMode
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import MapStateDescriptor

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(2)
env.set_runtime_mode(RuntimeExecutionMode.BATCH)

# Bounded source emitting the numbers 1..1000.
seq_num_source = NumberSequenceSource(1, 1000)

# Row-format file sink; part files are named with the 'pre' prefix and 'suf' suffix.
file_sink = (
    FileSink
    .for_row_format(
        '/Users/dianfu/code/src/apache/playgrounds/examples/output/data_stream_batch_state',
        Encoder.simple_string_encoder())
    .with_output_file_config(
        OutputFileConfig.builder().with_part_prefix('pre').with_part_suffix('suf').build())
    .build()
)

ds = env.from_source(
    source=seq_num_source,
    watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
    source_name='file_source',
    type_info=Types.LONG())


class MyKeyedProcessFunction(KeyedProcessFunction):
    """Accumulate the second field of each record in a MapState entry.

    The entry is keyed by the record's first field. Once the stored value
    exceeds 10 it is no longer updated and is emitted unchanged.
    """

    def __init__(self):
        # The state handle is only available once open() has been called.
        self.state = None

    def open(self, runtime_context: RuntimeContext):
        """Register the LONG -> LONG map state named 'map'."""
        logging.info("open")
        descriptor = MapStateDescriptor('map', Types.LONG(), Types.LONG())
        self.state = runtime_context.get_map_state(descriptor)

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        """Update the state entry for ``value[0]`` and emit the resulting value."""
        map_key = value[0]
        # Deliberately unguarded: this lookup is the call reported to raise
        # KeyError when map_key is not yet present in the state.
        previous = self.state.get(map_key)

        if previous is None:
            total = value[1]
        elif previous <= 10:
            total = value[1] + previous
        else:
            # Past the threshold: emit the stored value without touching state.
            yield previous
            return

        self.state.put(map_key, total)
        yield total


keyed_records = ds.map(
    lambda a: Row(a % 4, 1),
    output_type=Types.ROW([Types.LONG(), Types.LONG()]),
).key_by(lambda a: a[0])

keyed_records.process(MyKeyedProcessFunction(), Types.LONG()).sink_to(file_sink)

env.execute('data_stream_batch_state')
This job encounters a KeyError in `self.state.get(value[0])` when `value[0]` does not exist in the state, yet the job finishes without any error message. This issue should be addressed: we should make sure the error message appears in the log file to help users figure out what happened.