Flink / FLINK-27223

State access doesn't work as expected when cache size is set to 0

Details

    Description

      Consider the following job, which sets python.state.cache-size to 0:

      import json
      import logging
      import sys
      
      from pyflink.common import Types, Configuration
      from pyflink.datastream import StreamExecutionEnvironment
      from pyflink.util.java_utils import get_j_env_configuration
      
      if __name__ == '__main__':
          logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
      
          env = StreamExecutionEnvironment.get_execution_environment()
          config = Configuration(
              j_configuration=get_j_env_configuration(env._j_stream_execution_environment))
          config.set_integer("python.state.cache-size", 0)
          env.set_parallelism(1)
      
          # define the source
          ds = env.from_collection(
              collection=[
                  (1, '{"name": "Flink", "tel": 123, "addr": {"country": "Germany", "city": "Berlin"}}'),
                  (2, '{"name": "hello", "tel": 135, "addr": {"country": "China", "city": "Shanghai"}}'),
                  (3, '{"name": "world", "tel": 124, "addr": {"country": "USA", "city": "NewYork"}}'),
                  (4, '{"name": "PyFlink", "tel": 32, "addr": {"country": "China", "city": "Hangzhou"}}')
              ],
              type_info=Types.ROW_NAMED(["id", "info"], [Types.INT(), Types.STRING()])
          )
      
          # key by
          ds = ds.map(lambda data: (json.loads(data.info)['addr']['country'],
                                    json.loads(data.info)['tel'])) \
                 .key_by(lambda data: data[0]).sum(1)
          ds.print()
          env.execute()
      

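      The expected result is as follows (the key 'China' occurs twice, so its second record should report the running sum 135 + 32 = 167):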

      ('Germany', 123)
      ('China', 135)
      ('USA', 124)
      ('China', 167)
      

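      However, the actual result is as follows (the second 'China' record reports only its own value, as if the previously stored sum were not there):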

      ('Germany', 123)
      ('China', 135)
      ('USA', 124)
      ('China', 32)
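
The gap between the two outputs can be reproduced outside Flink. The following is a minimal pure-Python sketch, not PyFlink's implementation: `keyed_running_sum` and its `state_readable` flag are hypothetical names introduced here for illustration. It models only the observed symptom (the earlier sum for a key not being seen on the next access) and makes no claim about the root cause inside the state cache.

```python
import json


def keyed_running_sum(records, state_readable=True):
    """Emit (key, running_total) per record, mimicking key_by(...).sum(1).

    state_readable=False is a hypothetical switch that simulates the symptom
    in this report: the value stored for a key is not seen on later reads.
    """
    state = {}  # stands in for per-key state
    results = []
    for _id, info in records:
        parsed = json.loads(info)
        key = parsed["addr"]["country"]
        value = parsed["tel"]
        # Read the previous total for this key, or pretend it is missing.
        previous = state.get(key, 0) if state_readable else 0
        state[key] = previous + value
        results.append((key, state[key]))
    return results


records = [
    (1, '{"name": "Flink", "tel": 123, "addr": {"country": "Germany", "city": "Berlin"}}'),
    (2, '{"name": "hello", "tel": 135, "addr": {"country": "China", "city": "Shanghai"}}'),
    (3, '{"name": "world", "tel": 124, "addr": {"country": "USA", "city": "NewYork"}}'),
    (4, '{"name": "PyFlink", "tel": 32, "addr": {"country": "China", "city": "Hangzhou"}}'),
]

expected = keyed_running_sum(records)                       # state is read back
observed = keyed_running_sum(records, state_readable=False)  # state appears empty
```

With state read back, the last element is ('China', 167); with the simulated missing read it is ('China', 32), matching the actual result reported above.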
      

            People

              dianfu Dian Fu