Details
- Bug
- Status: Resolved
- Major
- Resolution: Fixed
- 1.13.6
Linux version 5.10.0-60.18.0.50.oe2203.aarch64
(abuild@obs-worker-002) (gcc_old (GCC) 10.3.1, GNU ld (GNU Binutils) 2.37) #1 SMP Wed Mar 30 02:43:08 UTC 2022
pyflink-version:1.13.6
Description
In a linux-aarch64 environment, the check “window_type is OverWindow.ROW_UNBOUNDED_FOLLOWING” in the PandasBatchOverWindowAggregateFunctionOperation class of the pyflink source code returns the wrong result.
For example, when window_type is 6, which represents the window type ‘ROW_UNBOUNDED_FOLLOWING’, “window_type is OverWindow.ROW_UNBOUNDED_FOLLOWING” returns false because the memory address of window_type has changed. As a result, the wrong window type is chosen (for example, the row sliding window), so the input data of the Python UDF is assembled incorrectly and the UDF produces wrong results.
Specifically, the pyflink unit test case is ‘test_over_window_aggregate_function’ in ‘pyflink\table\tests\test_pandas_udaf.py’. It fails when I run it with pytest on a linux-aarch64 system. I reduced this unit test case to the following code, executed it in Flink standalone mode on an aarch64 system, and got the same wrong result:
import unittest
from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
from pyflink.table.udf import udaf, AggregateFunction


class MaxAdd(AggregateFunction, unittest.TestCase):

    def open(self, function_context):
        mg = function_context.get_metric_group()
        self.counter = mg.add_group("key", "value").counter("my_counter")
        self.counter_sum = 0

    def get_value(self, accumulator):
        # counter
        self.counter.inc(10)
        self.counter_sum += 10
        return accumulator[0]

    def create_accumulator(self):
        return []

    def accumulate(self, accumulator, *args):
        result = 0
        for arg in args:
            result += arg.max()
        accumulator.append(result)


@udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
def mean_udaf(v):
    import logging
    logging.error("debug")
    logging.error(v)
    return v.mean()


t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build())
t_env.get_config().get_configuration().set_string("parallelism.default", "2")
t_env.get_config().get_configuration().set_string(
    "python.fn-execution.bundle.size", "1")

import datetime

t = t_env.from_elements(
    [
        (1, 2, 3, datetime.datetime(2018, 3, 11, 3, 10, 0, 0)),
        (1, 3, 1, datetime.datetime(2018, 3, 11, 3, 10, 0, 0)),
        (1, 8, 5, datetime.datetime(2018, 3, 11, 4, 20, 0, 0))
    ],
    DataTypes.ROW(
        [DataTypes.FIELD("a", DataTypes.TINYINT()),
         DataTypes.FIELD("b", DataTypes.SMALLINT()),
         DataTypes.FIELD("c", DataTypes.INT()),
         DataTypes.FIELD("rowtime", DataTypes.TIMESTAMP(3))]))

# sink
t_env.execute_sql("""
    CREATE TABLE mySink (
      c INT,
      d FLOAT
    ) WITH (
      'connector' = 'print'
    )
""")

t_env.create_temporary_system_function("mean_udaf", mean_udaf)
t_env.register_function("max_add", udaf(MaxAdd(),
                                        result_type=DataTypes.INT(),
                                        func_type="pandas"))
t_env.register_table("T", t)
t_env.execute_sql("""
    insert into mySink
    select
     max_add(b, c)
     over (PARTITION BY a ORDER BY rowtime
     ROWS BETWEEN UNBOUNDED preceding AND 0 FOLLOWING),
     mean_udaf(b)
     over (PARTITION BY a ORDER BY rowtime
     ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING)
    from T
""").wait()

'''
assert_equals(actual, ["5,4.3333335", "13,5.5", "6,4.3333335"])
'''
The expected results are ["5,4.3333335", "13,5.5", "6,4.3333335"], but the actual results are List(5,2.0, 13,5.5, 4,2.5). For ‘mean_udaf’ and the ‘UNBOUNDED FOLLOWING’ window in the code, I added error logging and found that even though window_type is 6 and 'OverWindow.ROW_UNBOUNDED_FOLLOWING' is also 6, the following condition in the pyflink source code evaluated to false:
// pyflink\fn_execution\operations.py (line 273)
elif window_type is OverWindow.ROW_UNBOUNDED_FOLLOWING:
    # row unbounded following window
    window_start = window.lower_boundary
    for j in range(input_cnt):
        start = max(j + window_start, 0)
        series_slices = [s.iloc[start: input_cnt] for s in input_series]
        result.append(func(series_slices))
Execution then fell through to the row sliding window branch to assemble the input data of mean_udaf:
// pyflink\fn_execution\operations.py (line 280)
else:
    # row sliding window
    window_start = window.lower_boundary
    window_end = window.upper_boundary
    for j in range(input_cnt):
        start = max(j + window_start, 0)
        end = min(j + window_end + 1, input_cnt)
        series_slices = [s.iloc[start: end] for s in input_series]
        result.append(func(series_slices))
Obviously, that's not right. In an x86 environment the correct branch is chosen.
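To make the effect of the wrong branch concrete, here is a minimal sketch that replays the two slicing loops above on column b of the test data, using plain Python lists instead of pandas Series. The helper names are hypothetical, and the boundary values (lower_boundary = -1 for "1 PRECEDING", upper_boundary = 0) are assumptions chosen because they are consistent with the reported output; they were not read from the serialized window.

```python
def unbounded_following_slices(values, lower_boundary):
    """Mirror of the 'row unbounded following' loop, on a plain list."""
    n = len(values)
    slices = []
    for j in range(n):
        start = max(j + lower_boundary, 0)
        slices.append(values[start:n])
    return slices


def sliding_slices(values, lower_boundary, upper_boundary):
    """Mirror of the 'row sliding' loop, on a plain list."""
    n = len(values)
    slices = []
    for j in range(n):
        start = max(j + lower_boundary, 0)
        end = min(j + upper_boundary + 1, n)
        slices.append(values[start:end])
    return slices


def mean(xs):
    return sum(xs) / len(xs)


b = [2, 3, 8]   # column b of the three input rows
LOWER = -1      # assumption: "1 PRECEDING" arrives as -1
UPPER = 0       # assumption: upper boundary left at 0

correct = [mean(w) for w in unbounded_following_slices(b, LOWER)]
wrong = [mean(w) for w in sliding_slices(b, LOWER, UPPER)]

print(correct)  # two windows of [2, 3, 8] (mean about 4.33) and one of [3, 8]
print(wrong)    # [2.0, 2.5, 5.5]
```

Under these assumptions the sliding branch yields the means 2.0, 2.5 and 5.5, which are the same values that appear in the actual results, while the unbounded following branch yields the expected 4.33, 4.33 and 5.5.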
The reason is that window_type's memory address differs from that of ‘OverWindow.ROW_UNBOUNDED_FOLLOWING’ in the linux-aarch64 environment, whereas the two are the same in the linux-x86 environment. Why the memory addresses differ is not yet known, but I observed that window_type comes from 'serialized_fn.windows':
def __init__(self, spec):
    super(PandasBatchOverWindowAggregateFunctionOperation, self).__init__(spec)
    self.windows = [window for window in self.spec.serialized_fn.windows]
Perhaps the grpc or protobuf dependencies, or the serialization operations, in the aarch64 environment affect the memory address of the int variables. I'll explore the underlying reasons later.
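Whatever the underlying cause turns out to be, the symptom can be reproduced on any platform. The sketch below is only an illustration: DistinctInt is a hypothetical stand-in for an integer that arrives as a separate object, and it does not claim to show what grpc or protobuf actually do on aarch64.

```python
class DistinctInt(int):
    """Hypothetical stand-in for an int that is a separate object
    from the constant it is compared against."""


ROW_UNBOUNDED_FOLLOWING = 6      # value quoted in this report

window_type = DistinctInt(6)     # same value, different object

same_value = window_type == ROW_UNBOUNDED_FOLLOWING
same_object = window_type is ROW_UNBOUNDED_FOLLOWING

print(same_value, same_object)   # True False
```

The value comparison succeeds while the identity comparison fails, which is the same pattern seen in the error log: both sides are 6, yet the ‘is’ check is false.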
Solution and suggestion:
Since the window selection needs to compare the values of two integer variables (window_type and OverWindow.ROW_UNBOUNDED_FOLLOWING), I recommend replacing ‘is’ with ‘==’ in the window type matching. That also prevents erroneous results caused by a failure of Python's small integer object pool, which may also affect the memory address. This modification has been verified to work correctly in both x86 and aarch64 environments, with both this unit test case and the reduced case above.
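A rough sketch of the suggested change, reduced to the branch selection only. The function names and the returned labels are hypothetical and are not taken from operations.py; only the value 6 comes from this report. SeparateInt is an illustrative stand-in for a window_type that is not the same object as the constant.

```python
ROW_UNBOUNDED_FOLLOWING = 6  # value quoted in this report


class SeparateInt(int):
    """Hypothetical stand-in for a window_type held in a separate object."""


def pick_branch_with_is(window_type):
    # Current behaviour: identity comparison.
    if window_type is ROW_UNBOUNDED_FOLLOWING:
        return "row_unbounded_following"
    return "row_sliding"


def pick_branch_with_eq(window_type):
    # Suggested behaviour: value comparison.
    if window_type == ROW_UNBOUNDED_FOLLOWING:
        return "row_unbounded_following"
    return "row_sliding"


for candidate in (ROW_UNBOUNDED_FOLLOWING, SeparateInt(6)):
    print(type(candidate).__name__,
          pick_branch_with_is(candidate),
          pick_branch_with_eq(candidate))
```

With ‘==’ both candidates select the unbounded following branch, while with ‘is’ the separate object falls through to the sliding branch. A check of this shape does not depend on the platform, so it could serve as a regression test alongside the existing unit test.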