Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-21434

When a UDAF returns a ROW type with more than 14 fields, the job crashes

    XMLWordPrintableJSON

Details

    Description

       Code (a simple UDAF that returns a Row containing 15 fields):

      from pyflink.common import Row
      from pyflink.table.udf import AggregateFunction, udaf
      from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironment
      
      class Test(AggregateFunction):
        """Minimal aggregate function whose result is a Row of 15 FLOAT fields.

        The accumulator is never modified, and get_value always returns the
        same constant row regardless of the input.
        """

        def create_accumulator(self):
          """Return the initial accumulator: a two-field Row of zeros."""
          initial = Row(0, 0)
          return initial

        def get_value(self, accumulator):
          """Return a Row holding fifteen 1.23 values; the accumulator is ignored."""
          constants = [1.23] * 15
          return Row(*constants)

        def accumulate(self, accumulator, a, b):
          """Intentionally do nothing with the incoming values."""

        def get_result_type(self):
          """Describe the result as ROW<f1 FLOAT, ..., f15 FLOAT>."""
          result_fields = [
              DataTypes.FIELD("f{}".format(position), DataTypes.FLOAT())
              for position in range(1, 16)
          ]
          return DataTypes.ROW(result_fields)

        def get_accumulator_type(self):
          """Describe the accumulator as ROW<f1 BIGINT, f2 BIGINT>."""
          first = DataTypes.FIELD("f1", DataTypes.BIGINT())
          second = DataTypes.FIELD("f2", DataTypes.BIGINT())
          return DataTypes.ROW([first, second])
      
      
      def udaf_test():
        """Run the Test aggregate over one row and insert the result into a print sink.

        Sets up a streaming table environment with the blink planner, registers
        a print sink whose `agg` column is a 15-field ROW, groups a single
        input row by name, and blocks until the insert job finishes.
        """
        settings_builder = EnvironmentSettings.new_instance()
        streaming_builder = settings_builder.in_streaming_mode()
        env_settings = streaming_builder.use_blink_planner().build()
        table_env = StreamTableEnvironment.create(environment_settings=env_settings)
        test = udaf(Test())

        # Sink schema: the `agg` column mirrors the 15-field ROW the UDAF returns.
        print_sink_ddl = """
            CREATE TABLE print_sink (
                `name` STRING,
                `agg` ROW<f1 FLOAT, f2 FLOAT, f3 FLOAT, f4 FLOAT,
                          f5 FLOAT, f6 FLOAT, f7 FLOAT, f8 FLOAT,
                          f9 FLOAT, f10 FLOAT, f11 FLOAT, f12 FLOAT,
                          f13 FLOAT, f14 FLOAT, f15 FLOAT>
            ) WITH (
                'connector' = 'print'
            )
        """
        table_env.execute_sql(print_sink_ddl)

        column_names = ['value', 'count', 'name']
        source = table_env.from_elements([(1, 2, "Lee")], column_names)
        grouped = source.group_by(source.name)
        result_table = grouped.select(source.name, test(source.value, source.count))

        insert_job = result_table.execute_insert("print_sink")
        insert_job.wait()
      
      
      # Run the reproduction only when this file is executed as a script.
      if __name__ == "__main__":
        udaf_test()
      
      

      Exception:

      Caused by: java.io.EOFException
      	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:397)
      	at java.base/java.io.DataInputStream.readFloat(DataInputStream.java:451)
      	at org.apache.flink.api.common.typeutils.base.FloatSerializer.deserialize(FloatSerializer.java:72)
      	at org.apache.flink.api.common.typeutils.base.FloatSerializer.deserialize(FloatSerializer.java:30)
      	at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:106)
      	at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49)
      	at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:106)
      

      Attachments

        Issue Links

          Activity

            People

              zhongwei Wei Zhong
              awayne awayne
              Votes:
              0 Vote for this issue
              Watchers:
              6 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: