XMLWordPrintableJSON

Details

    Description

      Since PyFlink keeps its own state cache on the Python side, a read may still be served from that cache and return the data even after the state's TTL has expired.

      The script below reproduces this:

      from pyflink.common.time import Time
      from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfig, ListStateDescriptor, MapStateDescriptor
      from pyflink.common.typeinfo import Types
      from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, RuntimeContext, KeyedProcessFunction, \
          EmbeddedRocksDBStateBackend
      import time
      from datetime import datetime
      
      def test_keyed_process_function_with_state():
          """Run a keyed process function whose value/list/map states use a 1-second TTL.

          Every element is processed only after a 20-second sleep, so any state written by
          an earlier element of the same key is older than the TTL by the time it is read
          back. The job prints, per element, what each state returned before it was updated.
          """
          env = StreamExecutionEnvironment.get_execution_environment()
          env.get_config().set_auto_watermark_interval(2000)
          env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
          env.set_state_backend(EmbeddedRocksDBStateBackend())

          # Rows are (int, key string, timestamp string); the stream is keyed on field 1 below.
          rows = [
              (1, 'hi', '1603708211000'),
              (3, 'hi', '1603708226000'),
              (10, 'hi', '1603708226000'),
              (6, 'hello', '1603708293000'),
          ]
          row_type = Types.ROW([Types.INT(), Types.STRING(), Types.STRING()])
          data_stream = env.from_collection(rows, type_info=row_type)

          class MyProcessFunction(KeyedProcessFunction):
              """Reads, then updates, TTL-enabled value, list and map state for each element."""

              def __init__(self):
                  # State handles are created in open(), once a runtime context exists.
                  self.value_state = self.list_state = self.map_state = None

              def open(self, runtime_context: RuntimeContext):
                  # One shared TTL config: 1 second, refreshed on create/write,
                  # expired entries must never be returned.
                  builder = StateTtlConfig.new_builder(Time.seconds(1))
                  builder = builder.set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
                  builder = builder.never_return_expired()
                  ttl = builder.build()

                  value_desc = ValueStateDescriptor('value_state', Types.INT())
                  value_desc.enable_time_to_live(ttl)
                  self.value_state = runtime_context.get_state(value_desc)

                  list_desc = ListStateDescriptor('list_state', Types.INT())
                  list_desc.enable_time_to_live(ttl)
                  self.list_state = runtime_context.get_list_state(list_desc)

                  map_desc = MapStateDescriptor('map_state', Types.INT(), Types.STRING())
                  map_desc.enable_time_to_live(ttl)
                  self.map_state = runtime_context.get_map_state(map_desc)

              def process_element(self, value, ctx):
                  # Sleep well past the 1-second TTL before touching any state.
                  time.sleep(20)

                  # Each state is read first and only then written, so the output
                  # reflects what was stored by earlier elements of the same key.
                  previous_value = self.value_state.value()
                  self.value_state.update(value[0])

                  previous_list = list(self.list_state.get())
                  self.list_state.add(value[0])

                  entries = [str(k) + ': ' + str(v) for k, v in self.map_state.items()]
                  previous_map = '{' + ', '.join(entries) + '}'
                  self.map_state.put(value[0], value[1])

                  key = ctx.get_current_key()
                  template = "time: {}, current key: {}, current value state: {}, current list state: {}, " \
                             "current map state: {}, current value: {}"
                  yield template.format(
                      str(datetime.now().time()),
                      str(key),
                      str(previous_value),
                      str(previous_list),
                      previous_map,
                      str(value),
                  )

              def on_timer(self, timestamp, ctx):
                  # No timers are used by this reproduction.
                  pass

          keyed = data_stream.key_by(lambda x: x[1], key_type=Types.STRING())
          processed = keyed.process(MyProcessFunction(), output_type=Types.STRING())
          processed.print()
          env.execute('test time stamp assigner with keyed process function')
      
      # Run the reproduction only when executed as a script, not when imported.
      if "__main__" == __name__:
          test_keyed_process_function_with_state()
      

      Attachments

        Issue Links

          Activity

            People

              hxbks2ks Huang Xingbo
              yunta Yun Tang
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: