Flink / FLINK-22124

The job finishes without any exception if an error is thrown during state access

Details

    Description

      For the following job:

      import logging
      
      from pyflink.common import WatermarkStrategy, Row
      from pyflink.common.serialization import Encoder
      from pyflink.common.typeinfo import Types
      from pyflink.datastream import StreamExecutionEnvironment
      from pyflink.datastream.connectors import FileSink, OutputFileConfig, NumberSequenceSource
      from pyflink.datastream.execution_mode import RuntimeExecutionMode
      from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
      from pyflink.datastream.state import MapStateDescriptor
      
      
      env = StreamExecutionEnvironment.get_execution_environment()
      env.set_parallelism(2)
      env.set_runtime_mode(RuntimeExecutionMode.BATCH)
      
      seq_num_source = NumberSequenceSource(1, 1000)
      
      file_sink = FileSink \
          .for_row_format('/Users/dianfu/code/src/apache/playgrounds/examples/output/data_stream_batch_state',
                          Encoder.simple_string_encoder()) \
          .with_output_file_config(OutputFileConfig.builder().with_part_prefix('pre').with_part_suffix('suf').build()) \
          .build()
      
      ds = env.from_source(
          source=seq_num_source,
          watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
          source_name='file_source',
          type_info=Types.LONG())
      
      
      class MyKeyedProcessFunction(KeyedProcessFunction):
      
          def __init__(self):
              self.state = None
      
          def open(self, runtime_context: RuntimeContext):
              logging.info("open")
              state_desc = MapStateDescriptor('map', Types.LONG(), Types.LONG())
              self.state = runtime_context.get_map_state(state_desc)
      
          def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
              existing = self.state.get(value[0])
              if existing is None:
                  result = value[1]
                  self.state.put(value[0], result)
              elif existing <= 10:
                  result = value[1] + existing
                  self.state.put(value[0], result)
              else:
                  result = existing
              yield result
      
      
      ds.map(lambda a: Row(a % 4, 1), output_type=Types.ROW([Types.LONG(), Types.LONG()])) \
          .key_by(lambda a: a[0]) \
          .process(MyKeyedProcessFunction(), Types.LONG()) \
          .sink_to(file_sink)
      
      env.execute('data_stream_batch_state')
      

      `self.state.get(value[0])` raises a KeyError when `value[0]` does not exist in the state, yet the job finishes without any error message. This should be fixed: the error must appear in the log file so that users can figure out what went wrong.
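
The failure mode can be sketched outside of Flink. The snippet below is only an illustration, not the fix for this issue: `DictMapState` is a hypothetical dict-backed stand-in for the map state (it assumes, as reported above, that `get` raises KeyError for a missing key), `accumulate` mirrors the branching in `process_element` with a guarded lookup, and `logged` is a hypothetical helper that writes the traceback to the log before re-raising so the error is not lost.

```python
import logging


class DictMapState:
    """Hypothetical stand-in for the map state, backed by a plain dict.

    Assumption taken from the report: get() raises KeyError for a missing key.
    """

    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data[key]  # raises KeyError when the key is absent

    def put(self, key, value):
        self._data[key] = value

    def contains(self, key):
        return key in self._data


def accumulate(state, key, value):
    """Same branching as process_element above, but with a guarded lookup."""
    existing = state.get(key) if state.contains(key) else None
    if existing is None:
        result = value
        state.put(key, result)
    elif existing <= 10:
        result = value + existing
        state.put(key, result)
    else:
        result = existing
    return result


def logged(func):
    """Log any exception with its traceback, then re-raise it."""
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            logging.exception("error during state access")
            raise
    return wrapper


# Feed the same key repeatedly, as the job does for each of its four keys.
state = DictMapState()
results = [accumulate(state, 0, 1) for _ in range(13)]
```

With the guard in place the lookup no longer fails on the first element of each key. Wrapping the unguarded `DictMapState().get` in `logged` shows the other half: the KeyError still propagates, but the traceback is logged first.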

          People

            hxbks2ks Huang Xingbo
            dian.fu Dian Fu