Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-31272

Duplicate operators appear in the StreamGraph for Python DataStream API jobs

    XMLWordPrintableJSON

Details

    • Hide
      Duplicate operators may be produced for Python DataStream API jobs in versions 1.15.0 ~ 1.15.3 and 1.16.0 ~ 1.16.1. This issue has been fixed in 1.15.4, 1.16.2 and 1.17.0. Jobs that are not affected by this issue have no backward compatibility concerns. However, affected jobs may not be able to restore from savepoints generated by versions 1.15.0 ~ 1.15.3 and 1.16.0 ~ 1.16.1.
      Show
      Duplicate operators may be produced for Python DataStream API jobs in versions 1.15.0 ~ 1.15.3 and 1.16.0 ~ 1.16.1. This issue has been fixed in 1.15.4, 1.16.2 and 1.17.0. Jobs that are not affected by this issue have no backward compatibility concerns. However, affected jobs may not be able to restore from savepoints generated by versions 1.15.0 ~ 1.15.3 and 1.16.0 ~ 1.16.1.

    Description

      For the following job:

      import argparse
      import json
      import sys
      import time
      from typing import Iterable, cast
      
      from pyflink.common import Types, Time, Encoder
      from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction, EmbeddedRocksDBStateBackend, \
          PredefinedOptions, FileSystemCheckpointStorage, CheckpointingMode, ExternalizedCheckpointCleanup
      from pyflink.datastream.connectors.file_system import FileSink, RollingPolicy, OutputFileConfig
      from pyflink.datastream.state import ReducingState, ReducingStateDescriptor
      from pyflink.datastream.window import TimeWindow, Trigger, TriggerResult, T, TumblingProcessingTimeWindows, \
          ProcessingTimeTrigger
      
      
      class CountWithProcessTimeoutTrigger(ProcessingTimeTrigger):
          """Window trigger that fires-and-purges either when an element-count threshold
          is reached or from a processing-time callback.

          NOTE(review): ``__init__`` does not call ``super().__init__()`` — presumably
          the PyFlink base trigger needs no initialization; verify against the base class.
          """

          def __init__(self, window_size: int):
              # Number of elements after which the window is fired early.
              self._window_size = window_size
              # Per-key, per-window reducing state summing 1 per element into a count.
              self._count_state_descriptor = ReducingStateDescriptor(
                  "count", lambda a, b: a + b, Types.LONG())

          @staticmethod
          def of(window_size: int) -> 'CountWithProcessTimeoutTrigger':
              """Factory method mirroring the Java-style ``Trigger.of(...)`` convention."""
              return CountWithProcessTimeoutTrigger(window_size)

          def on_element(self,
                         element: T,
                         timestamp: int,
                         window: TimeWindow,
                         ctx: 'Trigger.TriggerContext') -> TriggerResult:
              """Count the arriving element; fire-and-purge once the count reaches the
              configured threshold, or when the element timestamp passes the window end."""
              count_state = cast(ReducingState, ctx.get_partitioned_state(self._count_state_descriptor))
              count_state.add(1)
              # print("element arrive:", element, "count_state:", count_state.get(), window.max_timestamp(),
              #       ctx.get_current_watermark())

              if count_state.get() >= self._window_size:  # must FIRE_AND_PURGE here
                  print("fire element count", element, count_state.get(), window.max_timestamp(),
                        ctx.get_current_watermark())
                  # Reset the count so the next pane starts from zero.
                  count_state.clear()
                  return TriggerResult.FIRE_AND_PURGE
              if timestamp >= window.end:
                  count_state.clear()
                  return TriggerResult.FIRE_AND_PURGE
              else:
                  return TriggerResult.CONTINUE

          def on_processing_time(self,
                                 timestamp: int,
                                 window: TimeWindow,
                                 ctx: Trigger.TriggerContext) -> TriggerResult:
              """Processing-time timer callback.

              NOTE(review): the branches look inverted relative to the standard
              ProcessingTimeTrigger, which fires when the timer reaches the window
              end; here timestamps at/after ``window.end`` CONTINUE while earlier
              ones FIRE_AND_PURGE — confirm this is intentional.
              """
              if timestamp >= window.end:
                  return TriggerResult.CONTINUE
              else:
                  print("fire with process_time:", timestamp)
                  count_state = cast(ReducingState, ctx.get_partitioned_state(self._count_state_descriptor))
                  count_state.clear()
                  return TriggerResult.FIRE_AND_PURGE

          def on_event_time(self,
                            timestamp: int,
                            window: TimeWindow,
                            ctx: 'Trigger.TriggerContext') -> TriggerResult:
              """Event time is unused by this processing-time trigger; never fire."""
              return TriggerResult.CONTINUE

          def clear(self,
                    window: TimeWindow,
                    ctx: 'Trigger.TriggerContext') -> None:
              """Drop the per-window count state when the window is disposed."""
              count_state = ctx.get_partitioned_state(self._count_state_descriptor)
              count_state.clear()
      
      
      def to_dict_map(v):
          """Parse a JSON string into a dict, pausing briefly to simulate slow processing."""
          # Artificial one-second delay kept from the original reproduction job.
          time.sleep(1)
          return json.loads(v)
      
      
      def get_group_key(value, keys):
          """Build a composite grouping key from *value* for the given *keys*.

          For each key, the first element of the keyed list (stringified) is used;
          a missing key or an empty list contributes the literal string 'null'.
          The parts are joined with '_' in the order of *keys*.
          """
          parts = []
          for k in keys:
              entries = value.get(k)
              parts.append(str(entries[0]) if entries else 'null')
          return '_'.join(parts)
      
      
      class CountWindowProcessFunction(ProcessWindowFunction[dict, dict, str, TimeWindow]):
          """Window function that delegates all collected elements to a user-supplied hook."""

          def __init__(self, uf):
              # Object expected to expose process_after_group_by_function(elements).
              self._user_function = uf

          def process(self,
                      key: str,
                      context: ProcessWindowFunction.Context[TimeWindow],
                      elements: Iterable[dict]) -> Iterable[dict]:
              """Forward the window's elements to the user function and return its result."""
              return self._user_function.process_after_group_by_function(elements)
      
      
      if __name__ == '__main__':
          # Reproduction job for the duplicate-operator StreamGraph bug.
          parser = argparse.ArgumentParser()
          parser.add_argument(
              '--output',
              dest='output',
              required=False,
              help='Output file to write results to.')

          argv = sys.argv[1:]
          known_args, _ = parser.parse_known_args(argv)
          # NOTE(review): output_path is parsed but never used below; the sink
          # writes to the hard-coded base_path instead.
          output_path = known_args.output

          env = StreamExecutionEnvironment.get_execution_environment()
          # write all the data to one file
          env.set_parallelism(1)

          # process time
          env.get_config().set_auto_watermark_interval(0)
          # RocksDB state backend with incremental checkpoints enabled (True).
          state_backend = EmbeddedRocksDBStateBackend(True)
          state_backend.set_predefined_options(PredefinedOptions.FLASH_SSD_OPTIMIZED)
          env.set_state_backend(state_backend)
          config = env.get_checkpoint_config()
          # config.set_checkpoint_storage(FileSystemCheckpointStorage("hdfs://ha-nn-uri/tmp/checkpoint/"))
          config.set_checkpoint_storage(FileSystemCheckpointStorage("file:///Users/10030122/Downloads/pyflink_checkpoint/"))
          config.set_checkpointing_mode(CheckpointingMode.AT_LEAST_ONCE)
          config.set_checkpoint_interval(5000)
          config.set_externalized_checkpoint_cleanup(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)

          # define the source
          data_stream1 = env.from_collection(['{"user_id": ["0"], "goods_id": [0,0]}',
                                              '{"user_id": ["1"], "goods_id": [1,0]}',
                                              '{"user_id": ["2"], "goods_id": [2,0]}',
                                              '{"user_id": ["1"], "goods_id": [3,0]}',
                                              '{"user_id": ["2"], "goods_id": [4,0]}',
                                              '{"user_id": ["1"], "goods_id": [5,0]}',
                                              '{"user_id": ["2"], "goods_id": [6,0]}',
                                              '{"user_id": ["1"], "goods_id": [7,0]}',
                                              '{"user_id": ["2"], "goods_id": [8,0]}',
                                              '{"user_id": ["1"], "goods_id": [9,0]}',
                                              '{"user_id": ["2"], "goods_id": [10,0]}',
                                              '{"user_id": ["1"], "goods_id": [11,0]}',
                                              '{"user_id": ["2"], "goods_id": [12,0]}',
                                              '{"user_id": ["1"], "goods_id": [13,0]}',
                                              '{"user_id": ["2"], "goods_id": [14,0]}',
                                              '{"user_id": ["1"], "goods_id": [15,0]}',
                                              '{"user_id": ["2"], "goods_id": [16,0]}',
                                              '{"user_id": ["1"], "goods_id": [17,0]}',
                                              '{"user_id": ["2"], "goods_id": [18,0]}',
                                              '{"user_id": ["1"], "goods_id": [19,0]}',
                                              '{"user_id": ["2"], "goods_id": [20,0]}',
                                              '{"user_id": ["1"], "goods_id": [21,0]}',
                                              '{"user_id": ["2"], "goods_id": [22,0]}',
                                              '{"user_id": ["1"], "goods_id": [23,0]}',
                                              '{"user_id": ["2"], "goods_id": [24,0]}',
                                              '{"user_id": ["1"], "goods_id": [25,0]}',
                                              '{"user_id": ["2"], "goods_id": [26,0]}',
                                              '{"user_id": ["1"], "goods_id": [27,0]}',
                                              '{"user_id": ["2"], "goods_id": [28,0]}',
                                              '{"user_id": ["1"], "goods_id": [29,0]}',
                                              '{"user_id": ["2"], "goods_id": [30,0]}'])

          # Second source with identical data, only to exercise union() below.
          data_stream2 = env.from_collection(['{"user_id": ["0"], "goods_id": [0,0]}',
                                              '{"user_id": ["1"], "goods_id": [1,0]}',
                                              '{"user_id": ["2"], "goods_id": [2,0]}',
                                              '{"user_id": ["1"], "goods_id": [3,0]}',
                                              '{"user_id": ["2"], "goods_id": [4,0]}',
                                              '{"user_id": ["1"], "goods_id": [5,0]}',
                                              '{"user_id": ["2"], "goods_id": [6,0]}',
                                              '{"user_id": ["1"], "goods_id": [7,0]}',
                                              '{"user_id": ["2"], "goods_id": [8,0]}',
                                              '{"user_id": ["1"], "goods_id": [9,0]}',
                                              '{"user_id": ["2"], "goods_id": [10,0]}',
                                              '{"user_id": ["1"], "goods_id": [11,0]}',
                                              '{"user_id": ["2"], "goods_id": [12,0]}',
                                              '{"user_id": ["1"], "goods_id": [13,0]}',
                                              '{"user_id": ["2"], "goods_id": [14,0]}',
                                              '{"user_id": ["1"], "goods_id": [15,0]}',
                                              '{"user_id": ["2"], "goods_id": [16,0]}',
                                              '{"user_id": ["1"], "goods_id": [17,0]}',
                                              '{"user_id": ["2"], "goods_id": [18,0]}',
                                              '{"user_id": ["1"], "goods_id": [19,0]}',
                                              '{"user_id": ["2"], "goods_id": [20,0]}',
                                              '{"user_id": ["1"], "goods_id": [21,0]}',
                                              '{"user_id": ["2"], "goods_id": [22,0]}',
                                              '{"user_id": ["1"], "goods_id": [23,0]}',
                                              '{"user_id": ["2"], "goods_id": [24,0]}',
                                              '{"user_id": ["1"], "goods_id": [25,0]}',
                                              '{"user_id": ["2"], "goods_id": [26,0]}',
                                              '{"user_id": ["1"], "goods_id": [27,0]}',
                                              '{"user_id": ["2"], "goods_id": [28,0]}',
                                              '{"user_id": ["1"], "goods_id": [29,0]}',
                                              '{"user_id": ["2"], "goods_id": [30,0]}'])

          # group_keys = ['user_id', 'goods_id']
          group_keys = ['user_id']

          # NOTE(review): sink_to_file_flag is never read.
          sink_to_file_flag = True

          data_stream = data_stream1.union(data_stream2)

          # user_function = __import__("UserFunction")

          # NOTE(review): CountWithProcessTimeoutTrigger is defined above but never
          # attached via .trigger(...); the window runs with the default trigger.
          # NOTE(review): CountWindowProcessFunction.process calls
          # process_after_group_by_function on its argument, which a plain
          # `lambda v: v` does not provide — this would raise at runtime; confirm.
          ds = data_stream.map(lambda v: to_dict_map(v)) \
              .filter(lambda v: v) \
              .map(lambda v: v) \
              .key_by(lambda v: get_group_key(v, group_keys)) \
              .window(TumblingProcessingTimeWindows.of(Time.seconds(12))) \
              .process(CountWindowProcessFunction(lambda v: v), Types.STRING())

          ds = ds.map(lambda v: v, Types.PRIMITIVE_ARRAY(Types.BYTE()))

          base_path = "/tmp/1.txt"
          encoder = Encoder.simple_string_encoder()
          file_sink_builder = FileSink.for_row_format(base_path, encoder)
          # Roll part files on every checkpoint; parts are named "pre...suf".
          file_sink = file_sink_builder \
              .with_bucket_check_interval(1000) \
              .with_rolling_policy(RollingPolicy.on_checkpoint_rolling_policy()) \
              .with_output_file_config(
              OutputFileConfig.builder().with_part_prefix("pre").with_part_suffix("suf").build()) \
              .build()
          ds.sink_to(file_sink)

          # submit for execution
          env.execute()
      

      The stream graph is as following:

      {
        "nodes" : [ {
          "id" : 1,
          "type" : "Source: Collection Source",
          "pact" : "Data Source",
          "contents" : "Source: Collection Source",
          "parallelism" : 1
        }, {
          "id" : 2,
          "type" : "Source: Collection Source",
          "pact" : "Data Source",
          "contents" : "Source: Collection Source",
          "parallelism" : 1
        }, {
          "id" : 9,
          "type" : "TumblingProcessingTimeWindows",
          "pact" : "Operator",
          "contents" : "Window(TumblingProcessingTimeWindows(12000, 0), ProcessingTimeTrigger, CountWindowProcessFunction)",
          "parallelism" : 1,
          "predecessors" : [ {
            "id" : 15,
            "ship_strategy" : "HASH",
            "side" : "second"
          } ]
        }, {
          "id" : 10,
          "type" : "Map",
          "pact" : "Operator",
          "contents" : "Map",
          "parallelism" : 1,
          "predecessors" : [ {
            "id" : 9,
            "ship_strategy" : "FORWARD",
            "side" : "second"
          } ]
        }, {
          "id" : 15,
          "type" : "Map, Filter, Map, _stream_key_by_map_operator",
          "pact" : "Operator",
          "contents" : "Map, Filter, Map, _stream_key_by_map_operator",
          "parallelism" : 1,
          "predecessors" : [ {
            "id" : 1,
            "ship_strategy" : "FORWARD",
            "side" : "second"
          }, {
            "id" : 2,
            "ship_strategy" : "FORWARD",
            "side" : "second"
          } ]
        }, {
          "id" : 16,
          "type" : "TumblingProcessingTimeWindows, Map",
          "pact" : "Operator",
          "contents" : "Window(TumblingProcessingTimeWindows(12000, 0), ProcessingTimeTrigger, CountWindowProcessFunction)",
          "parallelism" : 1,
          "predecessors" : [ {
            "id" : 15,
            "ship_strategy" : "HASH",
            "side" : "second"
          } ]
        }, {
          "id" : 18,
          "type" : "Sink: Writer",
          "pact" : "Operator",
          "contents" : "Sink: Writer",
          "parallelism" : 1,
          "predecessors" : [ {
            "id" : 10,
            "ship_strategy" : "FORWARD",
            "side" : "second"
          } ]
        }, {
          "id" : 20,
          "type" : "Sink: Committer",
          "pact" : "Operator",
          "contents" : "Sink: Committer",
          "parallelism" : 1,
          "predecessors" : [ {
            "id" : 18,
            "ship_strategy" : "FORWARD",
            "side" : "second"
          } ]
        } ]
      }
      

      The plan is incorrect: the TumblingProcessingTimeWindows operator appears twice (nodes 9 and 16, both consuming node 15 via HASH), even though the job defines only a single window operation.

      Attachments

        Issue Links

          Activity

            People

              dianfu Dian Fu
              dianfu Dian Fu
              Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: