Flink / FLINK-31099

Chained WindowOperator throws NPE in PyFlink ThreadMode

Details

    Description

      Test case

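      from abc import ABC
      
      from pyflink.common import Configuration, Time, Types, WatermarkStrategy
      from pyflink.common.watermark_strategy import TimestampAssigner
      from pyflink.datastream import StreamExecutionEnvironment
      from pyflink.datastream.window import TumblingEventTimeWindows
      
      config = Configuration()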
      config.set_string("python.execution-mode", "thread")
      env = StreamExecutionEnvironment.get_execution_environment(config)
      
      class MyTimestampAssigner(TimestampAssigner, ABC):
          def extract_timestamp(self, value: tuple, record_timestamp: int) -> int:
              return value[0]
      
      ds = env.from_collection(
          [(1676461680000, "a1", "b1", 1), (1676461680000, "a1", "b1", 1),
           (1676461680000, "a2", "b2", 1), (1676461680000, "a1", "b2", 1),
           (1676461740000, "a1", "b1", 1), (1676461740000, "a2", "b2", 1)]
      ).assign_timestamps_and_watermarks(
          WatermarkStrategy.for_monotonous_timestamps().with_timestamp_assigner(MyTimestampAssigner())
      )
      ds.key_by(
          lambda x: (x[0], x[1], x[2])
      ).window(
          TumblingEventTimeWindows.of(Time.minutes(1))
      ).reduce(
          lambda x, y: (x[0], x[1], x[2], x[3] + y[3]),
          output_type=Types.TUPLE([Types.LONG(), Types.STRING(), Types.STRING(), Types.INT()])
      # ).filter(
      #     lambda x: x[1] == "a1"
      ).map(
          lambda x: (x[0], x[1], x[3]),
          output_type=Types.TUPLE([Types.LONG(), Types.STRING(), Types.INT()])
      ).print()
      env.execute()
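
      For reference, the rows this pipeline would be expected to emit once it runs without the NPE can be worked out in plain Python. The sketch below does not use PyFlink: `expected_output` is a hypothetical helper that only mimics the key_by / tumbling window / reduce / map chain from the test case, ignoring watermarks and lateness. The order in which Flink emits the rows may differ.

```python
from collections import OrderedDict

WINDOW_MS = 60_000  # Time.minutes(1) expressed in milliseconds

# Same input records as the test case above.
RECORDS = [
    (1676461680000, "a1", "b1", 1), (1676461680000, "a1", "b1", 1),
    (1676461680000, "a2", "b2", 1), (1676461680000, "a1", "b2", 1),
    (1676461740000, "a1", "b1", 1), (1676461740000, "a2", "b2", 1),
]


def expected_output(records, window_ms=WINDOW_MS):
    """Hypothetical helper (not a PyFlink API): emulate the pipeline in plain Python."""
    acc = OrderedDict()
    for rec in records:
        key = (rec[0], rec[1], rec[2])             # key_by(lambda x: (x[0], x[1], x[2]))
        window_start = rec[0] - rec[0] % window_ms  # start of the tumbling window
        slot = (window_start, key)
        if slot in acc:
            x = acc[slot]
            acc[slot] = (x[0], x[1], x[2], x[3] + rec[3])  # the reduce function
        else:
            acc[slot] = rec
    # The final map keeps fields 0, 1 and 3 of each reduced record.
    return [(x[0], x[1], x[3]) for x in acc.values()]


if __name__ == "__main__":
    for row in expected_output(RECORDS):
        print(row)
```

      Both timestamps fall exactly on minute boundaries, so each distinct key ends up in a single window and the two duplicate ("a1", "b1") records are the only ones that get merged.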
      

            People

              hxbks2ks Huang Xingbo