Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-27776

Throw exception when UDAF used in sliding window does not implement merge method in PyFlink

    XMLWordPrintableJSON

Details

    Description

      We use the pane state to optimize the result of calculating the window state, which requires udaf to implement the merge method. However, due to the lack of detection of whether the merge method of udaf is implemented, the user's output result did not meet his expectations and there is no exception. Below is an example of a UDAF that implements the merge method:

      class SumAggregateFunction(AggregateFunction):
      
          def get_value(self, accumulator):
              return accumulator[0]
      
          def create_accumulator(self):
              return [0]
      
          def accumulate(self, accumulator, *args):
              accumulator[0] = accumulator[0] + args[0]
      
          def retract(self, accumulator, *args):
              accumulator[0] = accumulator[0] - args[0]
      
          def merge(self, accumulator, accumulators):
              for other_acc in accumulators:
                  accumulator[0] = accumulator[0] + other_acc[0]
      
          def get_accumulator_type(self):
              return DataTypes.ARRAY(DataTypes.BIGINT())
      
          def get_result_type(self):
              return DataTypes.BIGINT()
      

      Attachments

        Issue Links

          Activity

            People

              hxbks2ks Huang Xingbo
              hxbks2ks Huang Xingbo
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: