Flink / FLINK-28896 [Umbrella] Test Flink Release 1.16 / FLINK-28920

Release Testing: Verify Python DataStream Window

Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Done
    • Affects Version/s: 1.16.0
    • Fix Version/s: 1.16.0
    • Component/s: API / Python

    Description

      • Build Flink from source
        $ cd {flink-source-code}
        $ mvn clean install -DskipTests
        
      • Prepare a Python virtual environment
        $ cd flink-python/dev
        $ ./lint-python.sh -s basic
        $ source .conda/bin/activate
      
      • Install PyFlink from source. For more details, refer to the doc.
        $ cd {flink-source-code}/flink-python/apache-flink-libraries
        $ python setup.py sdist
        $ pip install dist/*.tar.gz
        $ cd ..
        $ pip install -r dev/dev-requirements.txt
        $ python setup.py sdist
        $ pip install dist/*.tar.gz
        

      Test

      • Write a Python DataStream window job in thread mode and save it as demo.py. For details on windows, refer to the doc.
      from typing import Tuple
      
      from pyflink.common import Configuration
      from pyflink.common.time import Time
      from pyflink.common.typeinfo import Types
      from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
      from pyflink.datastream import DataStream, StreamExecutionEnvironment
      from pyflink.datastream.functions import AggregateFunction
      from pyflink.datastream.window import EventTimeSessionWindows
      
      
      class SecondColumnTimestampAssigner(TimestampAssigner):
      
          def extract_timestamp(self, value, record_timestamp) -> int:
              return int(value[1])
      
      
      def main():
          config = Configuration()
          # thread mode
          config.set_string("python.execution-mode", "thread")
          # process mode
          # config.set_string("python.execution-mode", "process")
          env = StreamExecutionEnvironment.get_execution_environment(config)
      
          data_stream = env.from_collection([
              ('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 15)],
              type_info=Types.TUPLE([Types.STRING(), Types.INT()]))  # type: DataStream
          watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
              .with_timestamp_assigner(SecondColumnTimestampAssigner())
      
          class MyAggregateFunction(AggregateFunction):
      
              def create_accumulator(self) -> Tuple[int, str]:
                  return 0, ''
      
              def add(self, value: Tuple[str, int], accumulator: Tuple[int, str]) -> Tuple[int, str]:
                  return value[1] + accumulator[0], value[0]
      
              def get_result(self, accumulator: Tuple[int, str]) -> Tuple[str, int]:
                  return accumulator[1], accumulator[0]
      
              def merge(self, acc_a: Tuple[int, str], acc_b: Tuple[int, str]):
                  return acc_a[0] + acc_b[0], acc_a[1]
      
          ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
              .key_by(lambda x: x[0], key_type=Types.STRING()) \
              .window(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
              .aggregate(MyAggregateFunction(),
                         accumulator_type=Types.TUPLE([Types.INT(), Types.STRING()]),
                         output_type=Types.TUPLE([Types.STRING(), Types.INT()]))
      
          ds.print()
          env.execute('test_window_aggregate_accumulator_type')
      
      
      if __name__ == '__main__':
          main()
      
      • Run the Python DataStream window job and check the result
        $ python demo.py
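
To know what to compare the job output against, the session-window sums can be worked out by hand. The sketch below is plain Python, not PyFlink; `expected_session_sums` is a hypothetical helper introduced only for illustration. It assumes that each element opens a window [ts, ts + gap) and that windows which overlap or touch are merged into one session. With this input no two windows merely touch, so that assumption does not affect the result.

```python
from collections import defaultdict

# Same input records and session gap (in milliseconds) as the job above.
RECORDS = [('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 15)]
GAP_MS = 2


def expected_session_sums(records, gap_ms):
    """Hypothetical helper: per key, group timestamps into sessions and sum them."""
    by_key = defaultdict(list)
    for key, ts in records:
        by_key[key].append(ts)

    results = []
    for key in sorted(by_key):
        sessions = []
        for ts in sorted(by_key[key]):
            # Assumption: an element joins the current session when its window
            # [ts, ts + gap) overlaps or touches the window of the previous element.
            if sessions and ts <= sessions[-1][-1] + gap_ms:
                sessions[-1].append(ts)
            else:
                sessions.append([ts])
        # The job's aggregate function sums the second tuple field per session.
        results.extend((key, sum(session)) for session in sessions)
    return results


print(expected_session_sums(RECORDS, GAP_MS))
```

The job should therefore emit the pairs (a, 3), (a, 6), (a, 15), (b, 3) and (b, 17). The order of the printed lines and any subtask prefix may differ from run to run, and the same values are expected in both thread mode and process mode.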
        

          People

            ana4 Luning Wang
            hxbks2ks Huang Xingbo