  1. Flink
  2. FLINK-10019

singleton row type objects returned from sub query cannot be chained with other operators

Details

    Description

      If a UDF explicitly returns a CompositeType from getResultType, some chained operators fail.
      For example, consider a simple UDF:

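      import org.apache.flink.api.common.typeinfo.TypeInformation
      import org.apache.flink.table.api.Types
      import org.apache.flink.table.functions.ScalarFunction
      import org.apache.flink.types.Row

      object Func extends ScalarFunction {
        // Pass-through: returns the input row unchanged.
        def eval(row: Row): Row = {
          row
        }
        // Both the parameter and the result are declared as ROW(INT), a CompositeType.
        override def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] =
          Array(Types.ROW(Types.INT))
        override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
          Types.ROW(Types.INT)
      }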
      

      This should work, since it is just a simple pass-through. However, the second query in the following test fails:

        @Test
        def testRowType(): Unit = {
          val data = List(
            Row.of(Row.of(12.asInstanceOf[Integer]), "1")
          )
          val env = StreamExecutionEnvironment.getExecutionEnvironment
          val stream = env.fromCollection(data)(Types.ROW(Types.ROW(Types.INT), Types.STRING))
      
          val tEnv = TableEnvironment.getTableEnvironment(env)
          val table = stream.toTable(tEnv, 'a, 'b)
          tEnv.registerFunction("func", Func)
          tEnv.registerTable("t", table)
      
          // This works perfectly
          val result1 = tEnv.sqlQuery("SELECT func(a) FROM t").toAppendStream[Row]
          result1.addSink(new StreamITCase.StringSink[Row])
      
          // This throws exception
          val result2 = tEnv.sqlQuery("SELECT func(a) as myRow FROM t").toAppendStream[Row]
          result2.addSink(new StreamITCase.StringSink[Row])
      
          env.execute()
        }
      

      Exception stack trace:

      java.lang.IndexOutOfBoundsException: index (1) must be less than size (1)
      
      	at com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:310)
      	at com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:293)
      	at com.google.common.collect.SingletonImmutableList.get(SingletonImmutableList.java:41)
      	at org.apache.calcite.sql.type.InferTypes$2.inferOperandTypes(InferTypes.java:83)
      	at org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1777)
      	at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:459)
      	at org.apache.calcite.sql.validate.SqlValidatorImpl.expandStar(SqlValidatorImpl.java:349)
      ...
      

      The cause is that Calcite's inferOperandTypes does not expect to infer a struct RelDataType.

          Activity

            rongr Rong Rong added a comment - - edited

            Dug a little deeper in Calcite and found that in:
            https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/InferTypes.java#L68

            It seems that if the return type is inferred as a record (that is, .isStruct() == true), "it must have the same number of fields as the number of operands." That is clearly not the case here: for the expression AS(func(a), "myRow"), type inference only passes over func(a), but not the alias "myRow".
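
The constraint described above can be illustrated with a small, self-contained Scala sketch. It does not use Calcite. The types `SketchType`, `IntType`, `StructType` and the object `InferSketch` are hypothetical stand-ins, and the sketch assumes that AS is treated as a call with two operands (the expression and the alias), which is consistent with the "index (1) must be less than size (1)" message in the stack trace.

```scala
// Hypothetical, simplified stand-ins for Calcite's RelDataType.
sealed trait SketchType
case object IntType extends SketchType
final case class StructType(fields: Vector[SketchType]) extends SketchType

object InferSketch {
  // Models the shape of the rule: each operand takes the return type,
  // or, for a struct return type, the type of the field at the same index.
  def inferOperandTypes(returnType: SketchType, operandCount: Int): Vector[SketchType] =
    Vector.tabulate(operandCount) { i =>
      returnType match {
        case StructType(fields) => fields(i) // assumes fields.size >= operandCount
        case other              => other
      }
    }

  def main(args: Array[String]): Unit = {
    // Scalar return type: both operands simply inherit it.
    println(inferOperandTypes(IntType, 2))

    // ROW(INT) has one field, but the call has two operands,
    // so the lookup for operand index 1 runs past the field list.
    val rowOfInt = StructType(Vector(IntType))
    try println(inferOperandTypes(rowOfInt, 2))
    catch {
      case e: IndexOutOfBoundsException => println(s"failed: ${e.getMessage}")
    }
  }
}
```

In this model, a single-field struct only works when the call has exactly one operand, which would explain why the query without an alias succeeds while the aliased one fails.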

            flink-jira-bot Flink Jira Bot added a comment -

            This issue is assigned but has not received an update in 7 days so it has been labeled "stale-assigned". If you are still working on the issue, please give an update and remove the label. If you are no longer working on the issue, please unassign so someone else may work on it. In 7 days the issue will be automatically unassigned.

            flink-jira-bot Flink Jira Bot added a comment -

            This issue was marked "stale-assigned" and has not received an update in 7 days. It is now automatically unassigned. If you are still working on it, you can assign it to yourself again. Please also give an update about the status of the work.

            godfreyhe godfrey he added a comment -

            Feel free to reopen this issue if anyone can reproduce this bug on the latest version.


            People

              Assignee: Unassigned
              Reporter: Rong Rong (rongr)

                Time Tracking

                  Original Estimate: Not Specified
                  Remaining Estimate: 0h
                  Time Spent: 20m