FLINK-30637

In linux-aarch64 environment, using "is" to match the window type of an over window returns incorrect matching results


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.13.6
    • Fix Version/s: 1.17.0, 1.16.1, 1.15.4
    • Component/s: API / Python
    • Environment: Linux version 5.10.0-60.18.0.50.oe2203.aarch64 (abuild@obs-worker-002) (gcc_old (GCC) 10.3.1, GNU ld (GNU Binutils) 2.37) #1 SMP Wed Mar 30 02:43:08 UTC 2022; pyflink version: 1.13.6

    Description

      In a linux-aarch64 environment, the check "window_type is OverWindow.ROW_UNBOUNDED_FOLLOWING" in the PandasBatchOverWindowAggregateFunctionOperation class of the pyflink source code returns the wrong result.

      For example, when window_type is 6, which represents the window type ROW_UNBOUNDED_FOLLOWING, "window_type is OverWindow.ROW_UNBOUNDED_FOLLOWING" still returns false, because the memory address of window_type has changed. This selects the wrong window type (such as a row sliding window), so the wrong input data is assembled for the Python UDF and wrong results are produced; a standalone illustration of the pitfall follows.
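
      To see why identity is the wrong test, here is a CPython illustration that is independent of pyflink; int("...") is used instead of plain literals so that compile-time constant folding cannot accidentally make the objects identical:

      a = int("256")
      b = int("256")
      print(a is b, a == b)   # True True: CPython caches small ints in [-5, 256]

      c = int("257")
      d = int("257")
      print(c is d, c == d)   # False True: equal values, but distinct objects

      # "is" compares object identity (memory address); "==" compares values.
      # Any code path that rebuilds an int object can break an identity check.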

       

      Specifically, the pyflink unit test case is 'test_over_window_aggregate_function' in 'pyflink\table\tests\test_pandas_udaf.py'. It behaves incorrectly when I execute it with pytest on a linux-aarch64 system. I reduced this test case to the following code, executed it in Flink standalone mode on an aarch64 system, and got the same wrong result:

       

      import unittest
      from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
      from pyflink.table.udf import udaf, AggregateFunction
      
      
      class MaxAdd(AggregateFunction, unittest.TestCase):
      
          def open(self, function_context):
              mg = function_context.get_metric_group()
              self.counter = mg.add_group("key", "value").counter("my_counter")
              self.counter_sum = 0
      
          def get_value(self, accumulator):
              # counter
              self.counter.inc(10)
              self.counter_sum += 10
              return accumulator[0]
      
          def create_accumulator(self):
              return []
      
          def accumulate(self, accumulator, *args):
              result = 0
              for arg in args:
                  result += arg.max()
              accumulator.append(result)
      
      
      @udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
      def mean_udaf(v):
          import logging
          logging.error("debug")
          logging.error(v)
          return v.mean()
      
      
      t_env = TableEnvironment.create(
          EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build())
      t_env.get_config().get_configuration().set_string("parallelism.default", "2")
      t_env.get_config().get_configuration().set_string(
          "python.fn-execution.bundle.size", "1")
      
      import datetime
      
      t = t_env.from_elements(
          [
              (1, 2, 3, datetime.datetime(2018, 3, 11, 3, 10, 0, 0)),
              (1, 3, 1, datetime.datetime(2018, 3, 11, 3, 10, 0, 0)),
              (1, 8, 5, datetime.datetime(2018, 3, 11, 4, 20, 0, 0))
          ],
          DataTypes.ROW(
              [DataTypes.FIELD("a", DataTypes.TINYINT()),
               DataTypes.FIELD("b", DataTypes.SMALLINT()),
               DataTypes.FIELD("c", DataTypes.INT()),
               DataTypes.FIELD("rowtime", DataTypes.TIMESTAMP(3))]))
      
      # sink
      t_env.execute_sql("""
              CREATE TABLE mySink (
                c INT,
                d FLOAT 
              ) WITH (
                'connector' = 'print'
              )
          """)
      
      t_env.create_temporary_system_function("mean_udaf", mean_udaf)
      t_env.register_function("max_add", udaf(MaxAdd(),
                                              result_type=DataTypes.INT(),
                                              func_type="pandas"))
      t_env.register_table("T", t)
      t_env.execute_sql("""
              insert into mySink
              select
               max_add(b, c)
               over (PARTITION BY a ORDER BY rowtime
               ROWS BETWEEN UNBOUNDED preceding AND 0 FOLLOWING),
               mean_udaf(b)
               over (PARTITION BY a ORDER BY rowtime
               ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING)
              from T
          """).wait()
      '''
      assert_equals(actual,
                    ["5,4.3333335",
                     "13,5.5",
                     "6,4.3333335"])
      '''

      The expected results are ["5,4.3333335", "13,5.5", "6,4.3333335"], but the actual results are List(5,2.0, 13,5.5, 4,2.5). For 'mean_udaf', whose over window ends with UNBOUNDED FOLLOWING, I added error logs and found that although window_type is 6 and OverWindow.ROW_UNBOUNDED_FOLLOWING also represents 6, the following condition from the pyflink source code returned false:

      # pyflink\fn_execution\operations.py (line 273)
      elif window_type is OverWindow.ROW_UNBOUNDED_FOLLOWING:
          # row unbounded following window
          window_start = window.lower_boundary
          for j in range(input_cnt):
              start = max(j + window_start, 0)
              series_slices = [s.iloc[start: input_cnt] for s in input_series]
              result.append(func(series_slices))
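
      For reference, applying this branch's slicing by hand to column b of the example's single partition reproduces the expected mean_udaf values. This is a plain pandas sketch, not pyflink code; lower_boundary is -1 for "1 PRECEDING":

      import pandas as pd

      b = pd.Series([2, 3, 8])   # column b of partition a=1, ordered by rowtime
      window_start = -1          # lower_boundary for "ROWS BETWEEN 1 PRECEDING ..."
      input_cnt = len(b)
      for j in range(input_cnt):
          start = max(j + window_start, 0)
          print(b.iloc[start:input_cnt].mean())
      # prints 4.333..., 4.333..., 5.5 - the expected mean_udaf values
      # (the sink's FLOAT column renders them as 4.3333335 and 5.5)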

      Because the identity check failed, it finally fell through to the row sliding window branch to assemble the input data of mean_udaf:

      # pyflink\fn_execution\operations.py (line 280)
      else:
          # row sliding window
          window_start = window.lower_boundary
          window_end = window.upper_boundary
          for j in range(input_cnt):
              start = max(j + window_start, 0)
              end = min(j + window_end + 1, input_cnt)
              series_slices = [s.iloc[start: end] for s in input_series]
              result.append(func(series_slices))

      Obviously, that is not right. The correct branch is chosen in an x86 environment.

      The reason is that window_type's memory address differs from that of OverWindow.ROW_UNBOUNDED_FOLLOWING in the linux-aarch64 environment, whereas the two are identical in the linux-x86 environment. Why the memory addresses differ is not known yet, but I observed that window_type comes from 'serialized_fn.windows':

      def __init__(self, spec):
          super(PandasBatchOverWindowAggregateFunctionOperation, self).__init__(spec)
          self.windows = [window for window in self.spec.serialized_fn.windows]
      

      Perhaps the grpc or protobuf dependencies, or the serialization operations, in the aarch64 environment affect the memory addresses of the int variables; I'll explore the underlying reason later. A hypothetical illustration follows.
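
      Purely as a hypothesis (not a claim about what protobuf actually does here): if a deserialization layer returned an int subclass, or any other int-like type, instead of the cached builtin int, identity would fail even for a value inside CPython's small-int cache, while equality would still hold:

      class DeserializedInt(int):
          # stand-in for an int-like value produced by a serialization layer
          pass

      ROW_UNBOUNDED_FOLLOWING = 6        # the enum value in question
      window_type = DeserializedInt(6)   # equal value, different object

      print(window_type is ROW_UNBOUNDED_FOLLOWING)  # False: distinct objects
      print(window_type == ROW_UNBOUNDED_FOLLOWING)  # True: values are equal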

       

      Solution and suggestion:

      Since the window selection only needs to compare the values of two integer variables (window_type and OverWindow.ROW_UNBOUNDED_FOLLOWING), I recommend replacing 'is' with '==' in the window type matching, as sketched below. That also prevents erroneous results when the Python small-integer cache does not apply, which likewise changes memory addresses. This modification has been verified to behave correctly in both x86 and aarch64 environments, for both the original unit test case and my reduced case.
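
      A minimal sketch of the suggested change, based on the operations.py excerpt above (the actual committed patch may differ):

      # pyflink\fn_execution\operations.py - suggested change
      # before: elif window_type is OverWindow.ROW_UNBOUNDED_FOLLOWING:
      # after: value comparison, correct regardless of object identity
      elif window_type == OverWindow.ROW_UNBOUNDED_FOLLOWING:
          # row unbounded following window
          window_start = window.lower_boundary
          for j in range(input_cnt):
              start = max(j + window_start, 0)
              series_slices = [s.iloc[start: input_cnt] for s in input_series]
              result.append(func(series_slices))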

            People

              Assignee: Xin Chen (xinchen147)
              Reporter: Xin Chen (xinchen147)
