Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: None
    • Labels:
      None

      Description

      This is a sub-task of FLINK-5826. We would like to support ScalarFunction first, which should make the review a little easier.

        Issue Links

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user clarkyzl opened a pull request:

          https://github.com/apache/flink/pull/3389

          FLINK-5881 [table] ScalarFunction(UDF) should support variable types and variable arguments

          Type: New Feature
          Priority: Major
          Components: table, udf, ScalarFunction
          Problem Definition: FLINK-5881 [table] ScalarFunction(UDF) should support variable types and variable arguments
          Design:
          1. Modified the getSignature() method in UserDefinedFunctionUtils so that a trailing variable-argument parameter is recognized. Signatures of the form "(TypeA a, TypeB b, TypeC... c)" or, with the annotation, "(a: TypeA, b: TypeB, c: TypeC*)" now match.
          2. Modified the SqlOperandTypeChecker so that the allowed number of SQL operands is a flexible range. This lets such calls pass Calcite's SQL node validation.
          3. Modified the checkAndExtractEvalMethods() method to throw a human-readable ValidationException if the user declares variable arguments in Scala but forgets to add the "@varargs" annotation. Please see the discussion in FLINK-5826.
          Impact Analysis:
          It is a minor modification that adds a new feature. The impact on existing UDFs is minimal.
          Test:
          Added both Scala and Java tests for all APIs.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/clarkyzl/flink flink-5881

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3389.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3389


          commit 60b68fdd66f8021f6f090e7372987d43362d5ef3
          Author: Zhuoluo Yang <zhuoluo.yzl@alibaba-inc.com>
          Date: 2017-02-22T10:53:34Z

          FLINK-5881 [table] ScalarFunction(UDF) should support variable types and variable arguments


          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r102470461

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala —
          @@ -87,10 +87,16 @@ object UserDefinedFunctionUtils {
          // go over all signatures and find one matching actual signature
          .find { curSig =>
          // match parameters of signature to actual parameters

          - actualSignature.length == curSig.length &&
          + (actualSignature.length == curSig.length &&
          curSig.zipWithIndex.forall { case (clazz, i) =>
          parameterTypeEquals(actualSignature(i), clazz)
          - }
          + }) ||
          + // matching the style which last argument is variable, eg. "Type..." "Type*"
          + (actualSignature.length >= curSig.length &&
          + curSig.zipWithIndex.forall { case (clazz, i) =>
          + parameterTypeEquals(actualSignature(i), clazz) ||
          + (i == curSig.length - 1 && clazz.isArray)

              • End diff –

          Even if the last parameter type is an array type, the method is not necessarily a varargs method. For example, `public void eval(int[] arrays)` is an ordinary method, not a varargs method. We should add a test to check that this feature does not affect ordinary array methods.

          The best way to check whether a method is varargs is to use `Method.isVarArgs()`.
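
          The distinction raised here can be sketched in plain Java reflection. This is only an illustration: the classes `ArrayEval` and `VarArgsEval` and both helper methods are hypothetical and are not part of the pull request.

```java
import java.lang.reflect.Method;

public class VarArgsCheck {

    // Hypothetical UDF-like classes, defined only for this illustration.
    public static class ArrayEval {
        public int eval(int[] values) { return values.length; }
    }

    public static class VarArgsEval {
        public int eval(int... values) { return values.length; }
    }

    /** True if some public 'eval' method ends with an array-typed parameter. */
    public static boolean hasTrailingArrayEval(Class<?> clazz) {
        for (Method m : clazz.getMethods()) {
            Class<?>[] params = m.getParameterTypes();
            if (m.getName().equals("eval")
                    && params.length > 0
                    && params[params.length - 1].isArray()) {
                return true;
            }
        }
        return false;
    }

    /** True if some public 'eval' method was declared with a varargs parameter. */
    public static boolean hasVarArgsEval(Class<?> clazz) {
        for (Method m : clazz.getMethods()) {
            if (m.getName().equals("eval") && m.isVarArgs()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // The array check cannot tell the two classes apart; isVarArgs() can.
        System.out.println(hasTrailingArrayEval(ArrayEval.class) + " " + hasVarArgsEval(ArrayEval.class));
        System.out.println(hasTrailingArrayEval(VarArgsEval.class) + " " + hasVarArgsEval(VarArgsEval.class));
    }
}
```

          Both classes pass the `isArray` test on the last parameter, so only `isVarArgs()` separates `eval(int[])` from `eval(int...)`.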

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r102448156

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala —
          @@ -140,6 +147,25 @@ object UserDefinedFunctionUtils {
          s"one method named 'eval' which is public, not abstract and " +
          s"(in case of table functions) not static.")
          } else {
          + var trailingSeq = false
          + var trailingArray = false
          + methods.foreach(method => {
          + val signatures = method.getParameterTypes
          + if (signatures.nonEmpty) {
          + val trailingArg = signatures(signatures.length - 1)
          + if (trailingArg.getName.equals("scala.collection.Seq")) {
          + trailingSeq = true
          + } else if (trailingArg.isArray) {
          + trailingArray = true
          + }
          + }
          + })
          + if (trailingSeq && !trailingArray) {
          + // We found trailing "scala.collection.Seq", but no trailing "Type[]", "Type..."
          + throw new ValidationException("The 'eval' method do not support Scala type of " +
          + "variable args eg. scala.collection.Seq or Type*, please add a @varargs annotation " +
          — End diff –

          Would changing `@varargs` to `@scala.annotation.varargs` be clearer?

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r102475557

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala —
          @@ -48,10 +48,16 @@ class ScalarFunctionCallGen(
          .getOrElse(throw new CodeGenException("No matching signature found."))
          val resultClass = getResultTypeClass(scalarFunction, matchingSignature)

          + // zip for variable signatures
          + var paramToOperands = matchingSignature.zip(operands)
          + var i = paramToOperands.length
          + while (i < operands.length) {
          + paramToOperands = paramToOperands :+ (matchingSignature.head, operands(i))
          — End diff –

          Why take the head of `matchingSignature`? Isn't the vararg the last element of the signature?

          And I think we should use the component type of the vararg array type as the operand class.
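
          A minimal sketch of this suggestion, in plain Java rather than the Scala of the diff. The class and method names are hypothetical, and the caller is assumed to have already established that the signature belongs to a varargs method.

```java
import java.util.ArrayList;
import java.util.List;

public class OperandTypes {

    /**
     * Expands a varargs signature so that there is one expected class per operand.
     * Fixed parameters keep their own type; every extra operand gets the component
     * type of the trailing array parameter, not the type of the first parameter.
     */
    public static List<Class<?>> expectedOperandClasses(Class<?>[] signature, int operandCount) {
        if (signature.length == 0 || !signature[signature.length - 1].isArray()) {
            throw new IllegalArgumentException("signature has no trailing array parameter");
        }
        int fixed = signature.length - 1;
        if (operandCount < fixed) {
            throw new IllegalArgumentException("too few operands for the fixed parameters");
        }
        Class<?> component = signature[fixed].getComponentType();
        List<Class<?>> result = new ArrayList<>();
        for (int i = 0; i < operandCount; i++) {
            result.add(i < fixed ? signature[i] : component);
        }
        return result;
    }

    public static void main(String[] args) {
        // Models eval(String s, int... rest) called with three operands.
        Class<?>[] signature = {String.class, int[].class};
        System.out.println(expectedOperandClasses(signature, 3));
    }
}
```

          For the signature `(String, int[])` and three operands, the result pairs the operands with `String`, `int`, `int`.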

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r102453140

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala —
          @@ -136,8 +136,18 @@ object ScalarSqlFunction {
          }

          override def getOperandCountRange: SqlOperandCountRange = {

          - val signatureLengths = signatures.map(_.length)
          - SqlOperandCountRanges.between(signatureLengths.min, signatureLengths.max)
          + var min = 255
          + var max = -1
          + signatures.foreach(sig => {
          + var len = sig.length
          + if (len > 0 && sig(sig.length -1).isArray) {
          + max = 254 // according to JVM spec 4.3.3
              • End diff –

          Can varargs only have 254 parameters?

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r102448630

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala —
          @@ -181,6 +181,22 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
          }

          @Test
          + def testVariableArgs(): Unit = {
          + testAllApis(
          — End diff –

          Can you add a test to check whether a call with zero parameters passes?

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r102465907

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala —
          @@ -87,10 +87,16 @@ object UserDefinedFunctionUtils {
          // go over all signatures and find one matching actual signature
          .find { curSig =>
          // match parameters of signature to actual parameters

          - actualSignature.length == curSig.length &&
          + (actualSignature.length == curSig.length &&
          curSig.zipWithIndex.forall { case (clazz, i) =>
          parameterTypeEquals(actualSignature(i), clazz)
          - }
          + }) ||
          + // matching the style which last argument is variable, eg. "Type..." "Type*"
          + (actualSignature.length >= curSig.length &&
          + curSig.zipWithIndex.forall { case (clazz, i) =>
          + parameterTypeEquals(actualSignature(i), clazz) ||
          + (i == curSig.length - 1 && clazz.isArray)

              • End diff –

          We should make sure that the component type of the array class equals the type of each of the last `actualSignature.length - curSig.length + 1` actual parameters.
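
          One way to express this check, sketched in plain Java. The names are hypothetical, and strict `equals` on classes stands in for the more lenient `parameterTypeEquals` used in the diff, which is a simplifying assumption. The sketch also accepts an empty varargs tail, that is, one actual parameter fewer than the declared signature.

```java
import java.util.Arrays;

public class VarArgsMatch {

    /**
     * Returns true if the actual parameter types fit the declared signature.
     * For a varargs signature, the fixed prefix must match position by position
     * and every remaining actual type must equal the array's component type.
     */
    public static boolean matches(Class<?>[] signature, boolean isVarArgs, Class<?>[] actual) {
        // Exact match first, as in the original condition.
        if (Arrays.equals(signature, actual)) {
            return true;
        }
        if (!isVarArgs || signature.length == 0 || !signature[signature.length - 1].isArray()) {
            return false;
        }
        int fixed = signature.length - 1;
        if (actual.length < fixed) {
            return false;
        }
        for (int i = 0; i < fixed; i++) {
            if (!signature[i].equals(actual[i])) {
                return false;
            }
        }
        Class<?> component = signature[fixed].getComponentType();
        for (int i = fixed; i < actual.length; i++) {
            if (!component.equals(actual[i])) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Models eval(String s, int... rest).
        Class<?>[] signature = {String.class, int[].class};
        System.out.println(matches(signature, true, new Class<?>[] {String.class, int.class, int.class}));
        System.out.println(matches(signature, true, new Class<?>[] {String.class, int.class, String.class}));
    }
}
```

          Passing `isVarArgs` explicitly keeps an ordinary `eval(int[])` from matching a call with a single `int` operand.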

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r102472512

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala —
          @@ -140,6 +147,25 @@ object UserDefinedFunctionUtils {
          s"one method named 'eval' which is public, not abstract and " +
          s"(in case of table functions) not static.")
          } else {
          + var trailingSeq = false
          + var trailingArray = false
          + methods.foreach(method => {
          + val signatures = method.getParameterTypes
          + if (signatures.nonEmpty) {
          + val trailingArg = signatures(signatures.length - 1)
          + if (trailingArg.getName.equals("scala.collection.Seq")) {
          + trailingSeq = true
          + } else if (trailingArg.isArray) {
          + trailingArray = true
          + }
          + }
          + })
          + if (trailingSeq && !trailingArray) {
          — End diff –

          The `eval` method can be overloaded, for example:
          ```scala
          def eval(args: Array[Int]): Int = ???
          def eval(args: Int*): Int = ???
          ```
          In this case the check passes because a trailing array is found, but the exception should still be thrown for the `Int*` method.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r102464697

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala —
          @@ -87,10 +87,16 @@ object UserDefinedFunctionUtils {
          // go over all signatures and find one matching actual signature
          .find { curSig =>
          // match parameters of signature to actual parameters

          - actualSignature.length == curSig.length &&
          + (actualSignature.length == curSig.length &&
          curSig.zipWithIndex.forall { case (clazz, i) =>
          parameterTypeEquals(actualSignature(i), clazz)
          - }
          + }) ||
          + // matching the style which last argument is variable, eg. "Type..." "Type*"
          + (actualSignature.length >= curSig.length &&

              • End diff –

          Varargs can be empty, so `actualSignature.length` could be `curSig.length - 1`.

          githubbot ASF GitHub Bot added a comment -

          Github user clarkyzl commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r102483144

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala —
          @@ -136,8 +136,18 @@ object ScalarSqlFunction {
          }

          override def getOperandCountRange: SqlOperandCountRange = {

          - val signatureLengths = signatures.map(_.length)
          - SqlOperandCountRanges.between(signatureLengths.min, signatureLengths.max)
          + var min = 255
          + var max = -1
          + signatures.foreach(sig => {
          + var len = sig.length
          + if (len > 0 && sig(sig.length -1).isArray) {
          + max = 254 // according to JVM spec 4.3.3
              • End diff –

          Hi @wuchong. Yes, according to the JVM specification, a method may have up to 255 parameters if it is static. If the method is not static, the `this` reference counts as one of the parameters, so the limit is 254.

          FYI, http://docs.oracle.com/javase/specs/jvms/se7/html/jvms-4.html#jvms-4.3.3
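
          The range computation discussed in this thread can be sketched as follows. This is an illustration in plain Java, not the Calcite-based code of the pull request: the classes `FixedOnly` and `WithVarArgs` and the method `range` are hypothetical, and the sketch simply adopts 254 as the upper bound for instance methods as argued above. It uses `Method.isVarArgs()` rather than `isArray`, and lets a varargs signature lower the minimum by one because the varargs tail may be empty.

```java
import java.lang.reflect.Method;

public class OperandCountRange {

    // Upper bound adopted from the discussion: 255 parameter slots minus one for 'this'.
    static final int MAX_INSTANCE_ARGS = 254;

    // Hypothetical UDF-like classes, defined only for this illustration.
    public static class FixedOnly {
        public int eval(int a) { return a; }
        public int eval(int a, int b, int c) { return a + b + c; }
    }

    public static class WithVarArgs {
        public String eval(String s, int... rest) { return s + rest.length; }
    }

    /** Returns {min, max} operand counts over all public 'eval' methods of the class. */
    public static int[] range(Class<?> udfClass) {
        int min = Integer.MAX_VALUE;
        int max = -1;
        for (Method m : udfClass.getMethods()) {
            if (!m.getName().equals("eval")) {
                continue;
            }
            int count = m.getParameterCount();
            if (m.isVarArgs()) {
                // The varargs tail may be empty, so only the fixed parameters are required.
                min = Math.min(min, count - 1);
                max = Math.max(max, MAX_INSTANCE_ARGS);
            } else {
                min = Math.min(min, count);
                max = Math.max(max, count);
            }
        }
        if (max < 0) {
            throw new IllegalArgumentException("no public eval method found");
        }
        return new int[] {min, max};
    }

    public static void main(String[] args) {
        int[] fixed = range(FixedOnly.class);
        int[] varArgs = range(WithVarArgs.class);
        System.out.println(fixed[0] + ".." + fixed[1]);
        System.out.println(varArgs[0] + ".." + varArgs[1]);
    }
}
```

          Note that the cited section of the JVM specification counts `long` and `double` parameters as two units each, so 254 is an upper bound on the operand count rather than an exact limit for every signature.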

          githubbot ASF GitHub Bot added a comment -

          Github user clarkyzl commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r102627293

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala —
          @@ -87,10 +87,16 @@ object UserDefinedFunctionUtils {
          // go over all signatures and find one matching actual signature
          .find { curSig =>
          // match parameters of signature to actual parameters

          - actualSignature.length == curSig.length &&
          + (actualSignature.length == curSig.length &&
          curSig.zipWithIndex.forall { case (clazz, i) =>
          parameterTypeEquals(actualSignature(i), clazz)
          - }
          + }) ||
          + // matching the style which last argument is variable, eg. "Type..." "Type*"
          + (actualSignature.length >= curSig.length &&

              • End diff –

          Thanks @wuchong . I will do some tests and revisions to handle this.

          githubbot ASF GitHub Bot added a comment -

          Github user clarkyzl commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r102628665

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala —
          @@ -140,6 +147,25 @@ object UserDefinedFunctionUtils {
          s"one method named 'eval' which is public, not abstract and " +
          s"(in case of table functions) not static.")
          } else {
          + var trailingSeq = false
          + var trailingArray = false
          + methods.foreach(method => {
          + val signatures = method.getParameterTypes
          + if (signatures.nonEmpty) {
          + val trailingArg = signatures(signatures.length - 1)
          + if (trailingArg.getName.equals("scala.collection.Seq")) {
          + trailingSeq = true
          + } else if (trailingArg.isArray) {
          + trailingArray = true
          + }
          + }
          + })
          + if (trailingSeq && !trailingArray) {
          + // We found trailing "scala.collection.Seq", but no trailing "Type[]", "Type..."
          + throw new ValidationException("The 'eval' method do not support Scala type of " +
          + "variable args eg. scala.collection.Seq or Type*, please add a @varargs annotation " +
          — End diff –

          Sure. Thanks @wuchong

          githubbot ASF GitHub Bot added a comment -

          Github user clarkyzl commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r102626945

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala —
          @@ -48,10 +48,16 @@ class ScalarFunctionCallGen(
          .getOrElse(throw new CodeGenException("No matching signature found."))
          val resultClass = getResultTypeClass(scalarFunction, matchingSignature)

          + // zip for variable signatures
          + var paramToOperands = matchingSignature.zip(operands)
          + var i = paramToOperands.length
          + while (i < operands.length) {
          + paramToOperands = paramToOperands :+ (matchingSignature.head, operands(i))
          — End diff –

          Thanks, @wuchong. Yes, it's a mistake here, and the tests haven't covered this situation. Since the maximum number of arguments is 254, I don't think it is necessary to use the component type during code generation. I will try to add some tests to cover this situation.

          githubbot ASF GitHub Bot added a comment -

          Github user clarkyzl commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r102628111

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala —
          @@ -140,6 +147,25 @@ object UserDefinedFunctionUtils {
          s"one method named 'eval' which is public, not abstract and " +
          s"(in case of table functions) not static.")
          } else {
          + var trailingSeq = false
          + var trailingArray = false
          + methods.foreach(method => {
          + val signatures = method.getParameterTypes
          + if (signatures.nonEmpty) {
          + val trailingArg = signatures(signatures.length - 1)
          + if (trailingArg.getName.equals("scala.collection.Seq")) {
          + trailingSeq = true
          + } else if (trailingArg.isArray) {
          + trailingArray = true
          + }
          + }
          + })
          + if (trailingSeq && !trailingArray) {
          — End diff –

          If users add the `@scala.annotation.varargs` annotation, Scala generates two signatures for the method: `T eval(scala.collection.Seq<T> args)` and `T eval(T[] args)`. A better idea is to compare every argument of the signatures. We can then make sure that either there is only one method, `T eval(T[] args)`, or there are two methods, `T eval(scala.collection.Seq<T> args)` and `T eval(T[] args)`.
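
          A per-method version of this check could look like the sketch below. It is written in plain Java under several assumptions: `java.util.List` stands in for `scala.collection.Seq` so that the example needs no Scala library, the classes `Annotated` and `Overloaded` and the method `unpairedSeqMethods` are hypothetical, and the array twin that Scala generates for an annotated method is assumed to be flagged as varargs so that `Method.isVarArgs()` can tell it apart from a hand-written array overload. Element types are not compared, because the sequence's type argument is erased.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SeqTwinCheck {

    // Models an annotated Scala method: a Seq-like overload plus a varargs twin.
    public static class Annotated {
        public int eval(List<Integer> args) { return args.size(); }
        public int eval(int... args) { return args.length; }
    }

    // Models the overloaded case: a Seq-like method next to a plain array method.
    public static class Overloaded {
        public int eval(List<Integer> args) { return args.size(); }
        public int eval(int[] args) { return args.length; }
    }

    /** Returns the 'eval' methods ending in seqType that have no varargs twin. */
    public static List<Method> unpairedSeqMethods(Class<?> udfClass, Class<?> seqType) {
        List<Method> evals = new ArrayList<>();
        for (Method m : udfClass.getMethods()) {
            if (m.getName().equals("eval")) {
                evals.add(m);
            }
        }
        List<Method> unpaired = new ArrayList<>();
        for (Method m : evals) {
            Class<?>[] p = m.getParameterTypes();
            if (p.length == 0 || !p[p.length - 1].equals(seqType)) {
                continue;
            }
            boolean paired = false;
            for (Method other : evals) {
                Class<?>[] q = other.getParameterTypes();
                // A twin is a varargs method with the same leading parameters.
                if (other.isVarArgs()
                        && q.length == p.length
                        && Arrays.equals(Arrays.copyOf(p, p.length - 1),
                                         Arrays.copyOf(q, q.length - 1))) {
                    paired = true;
                }
            }
            if (!paired) {
                unpaired.add(m);
            }
        }
        return unpaired;
    }

    public static void main(String[] args) {
        System.out.println(unpairedSeqMethods(Annotated.class, List.class).size());
        System.out.println(unpairedSeqMethods(Overloaded.class, List.class).size());
    }
}
```

          Any method returned by `unpairedSeqMethods` would be a candidate for the ValidationException, including the overloaded case where an unrelated array method happens to exist.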

          githubbot ASF GitHub Bot added a comment -

          Github user clarkyzl commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r102628732

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala —
          @@ -181,6 +181,22 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
          }

          @Test
          + def testVariableArgs(): Unit = {
          + testAllApis(
          — End diff –

          Sure. Thanks @wuchong. As we discussed above, I will add more tests here.

          githubbot ASF GitHub Bot added a comment -

          Github user clarkyzl commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r102627751

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala —
          @@ -87,10 +87,16 @@ object UserDefinedFunctionUtils {
          // go over all signatures and find one matching actual signature
          .find { curSig =>
          // match parameters of signature to actual parameters

          - actualSignature.length == curSig.length &&
          + (actualSignature.length == curSig.length &&
          curSig.zipWithIndex.forall { case (clazz, i) =>
          parameterTypeEquals(actualSignature(i), clazz)
          - }
          + }) ||
          + // matching the style which last argument is variable, eg. "Type..." "Type*"
          + (actualSignature.length >= curSig.length &&
          + curSig.zipWithIndex.forall { case (clazz, i) =>
          + parameterTypeEquals(actualSignature(i), clazz) ||
          + (i == curSig.length - 1 && clazz.isArray)

              • End diff –

          Thanks @wuchong. They are good suggestions.

          githubbot ASF GitHub Bot added a comment -

          Github user clarkyzl commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r102632219

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala —
          @@ -48,10 +48,16 @@ class ScalarFunctionCallGen(
          .getOrElse(throw new CodeGenException("No matching signature found."))
          val resultClass = getResultTypeClass(scalarFunction, matchingSignature)

          + // zip for variable signatures
          + var paramToOperands = matchingSignature.zip(operands)
          + var i = paramToOperands.length
          + while (i < operands.length) {
          + paramToOperands = paramToOperands :+ (matchingSignature.head, operands(i))
          — End diff –

          I will try to use `getComponentType()`

          githubbot ASF GitHub Bot added a comment -

          Github user clarkyzl commented on the issue:

          https://github.com/apache/flink/pull/3389

          If the following scenario happens,
          ```java
          public int eval(String a, int... b) {
            return b.length;
          }

          public String eval(String c) {
            return c;
          }
          ```
          it will throw a ValidationException.
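
          One plausible reading of this scenario is that a call with a single String argument fits both overloads once a trailing array is treated as variable arguments. The following is a minimal sketch of that situation, not Flink's implementation: the class `VarargsAmbiguityDemo` and the helpers `matches` and `countMatches` are hypothetical, and the matching rule (exact class equality, zero or more trailing arguments) is a simplification.

          ```java
          import java.util.Arrays;
          import java.util.List;

          class VarargsAmbiguityDemo {
              // Hypothetical matcher: true if `actual` fits `declared`, either exactly
              // or by treating a trailing array parameter as variable arguments.
              static boolean matches(Class<?>[] declared, Class<?>[] actual) {
                  if (Arrays.equals(declared, actual)) {
                      return true; // exact match
                  }
                  int n = declared.length;
                  if (n == 0 || !declared[n - 1].isArray()) {
                      return false; // no trailing array, so there is no variable part
                  }
                  if (actual.length < n - 1) {
                      return false; // the fixed prefix is not fully supplied
                  }
                  for (int i = 0; i < n - 1; i++) {
                      if (declared[i] != actual[i]) {
                          return false;
                      }
                  }
                  // Every remaining argument must have the array's component type.
                  Class<?> component = declared[n - 1].getComponentType();
                  for (int i = n - 1; i < actual.length; i++) {
                      if (actual[i] != component) {
                          return false;
                      }
                  }
                  return true;
              }

              // Counts how many declared signatures accept the actual argument types.
              static long countMatches(List<Class<?>[]> signatures, Class<?>[] actual) {
                  return signatures.stream().filter(sig -> matches(sig, actual)).count();
              }

              public static void main(String[] args) {
                  List<Class<?>[]> evalSignatures = Arrays.asList(
                      new Class<?>[] {String.class, int[].class}, // eval(String a, int... b)
                      new Class<?>[] {String.class});             // eval(String c)
                  // One String argument fits both overloads.
                  System.out.println(countMatches(evalSignatures, new Class<?>[] {String.class}));
                  // A String followed by an int fits only the varargs overload.
                  System.out.println(countMatches(evalSignatures, new Class<?>[] {String.class, int.class}));
              }
          }
          ```

          Running `main` prints 2 and then 1. A count above 1 is the kind of condition a validator could turn into an exception.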

          githubbot ASF GitHub Bot added a comment -

          Github user clarkyzl commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r102865002

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala —
          @@ -78,20 +78,7 @@ object UserDefinedFunctionUtils {
          function: UserDefinedFunction,
          signature: Seq[TypeInformation[_]])
          : Option[Array[Class[_]]] = {

          - // We compare the raw Java classes not the TypeInformation.
          - // TypeInformation does not matter during runtime (e.g. within a MapFunction).
          - val actualSignature = typeInfoToClass(signature)
          - val signatures = getSignatures(function)
          -
          - signatures
          - // go over all signatures and find one matching actual signature
          - .find { curSig =>
          - // match parameters of signature to actual parameters
          - actualSignature.length == curSig.length &&
          - curSig.zipWithIndex.forall { case (clazz, i) =>
          - parameterTypeEquals(actualSignature(i), clazz)
          - }
          - }
              • End diff –

          I deleted them because the two methods were simply copies of each other: one was used for ScalarFunction, the other for TableFunction.

          githubbot ASF GitHub Bot added a comment -

          Github user clarkyzl commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r102864763

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala —
          @@ -140,6 +138,24 @@ object UserDefinedFunctionUtils {
          s"one method named 'eval' which is public, not abstract and " +
          s"(in case of table functions) not static.")
          } else {
          + var trailingSeq = false
          + var noVargs = true
          + methods.foreach(method => {
          + val signatures = method.getParameterTypes
          + if (signatures.nonEmpty) {
          + if (method.isVarArgs) {
          + noVargs = false
          + } else if (signatures.last.getName.equals("scala.collection.Seq")) {
          + trailingSeq = true
          + }
          + }
          + })
          + if (trailingSeq && noVargs) {
          + // We found trailing "scala.collection.Seq", but no trailing "Type[]", "Type..."
          + throw new ValidationException("The 'eval' method do not support Scala type of " +
          — End diff –

          This is correct, because if multiple methods are found (override), a different exception is thrown.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r103366163

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala —
          @@ -114,7 +114,9 @@ object ScalarSqlFunction {

          inferredTypes.zipWithIndex.foreach {
          case (inferredType, i) =>

          - operandTypes(i) = inferredType
          + if (operandTypes.length > 0) {
          + operandTypes(i) = inferredType
              • End diff –

          If this is a varargs method, the inferredType is an array type. The operand type should be the component type of the array type, not the array type itself.

          The `operandTypes.length > 0` condition is also still not safe. Say the method is `eval(String a, int... b)` and the call is `eval("hello")`: the length of `operandTypes` is 1, but the length of `inferredTypes` is 2, so an IndexOutOfBoundsException would be thrown as before.
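
          The two points above can be sketched as a loop over the operands rather than over the declared parameters. This is only an illustration under stated assumptions: `OperandTypeInference` and `inferOperandTypes` are hypothetical names, plain `Class<?>` objects stand in for Calcite's type objects, and any trailing array parameter is treated as variable arguments.

          ```java
          import java.util.Arrays;

          class OperandTypeInference {
              // Hypothetical helper: the expected type for each of `operandCount` operands.
              // Iterating over operands (not parameters) avoids indexing past the end
              // when fewer operands than declared parameters are supplied.
              static Class<?>[] inferOperandTypes(Class<?>[] paramTypes, int operandCount) {
                  if (paramTypes.length == 0) {
                      return new Class<?>[0]; // nothing to infer for a no-arg method
                  }
                  Class<?>[] operandTypes = new Class<?>[operandCount];
                  int last = paramTypes.length - 1;
                  for (int i = 0; i < operandCount; i++) {
                      if (i < last) {
                          operandTypes[i] = paramTypes[i];
                      } else if (paramTypes[last].isArray()) {
                          // Assumption: a trailing array is the varargs parameter,
                          // so each operand gets the component type, not the array type.
                          operandTypes[i] = paramTypes[last].getComponentType();
                      } else {
                          operandTypes[i] = paramTypes[last];
                      }
                  }
                  return operandTypes;
              }

              public static void main(String[] args) {
                  Class<?>[] params = {String.class, int[].class}; // eval(String a, int... b)
                  // eval("hello"): one operand, two declared parameters, no exception.
                  System.out.println(Arrays.toString(inferOperandTypes(params, 1)));
                  // eval("hello", 1, 2): the two trailing operands are typed as int.
                  System.out.println(Arrays.toString(inferOperandTypes(params, 3)));
              }
          }
          ```

          A real implementation would also need to tell a genuine varargs parameter apart from an ordinary array parameter, for example via `Method.isVarArgs()`.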

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r103361639

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala —
          @@ -140,6 +138,24 @@ object UserDefinedFunctionUtils {
          s"one method named 'eval' which is public, not abstract and " +
          s"(in case of table functions) not static.")
          } else {
          + var trailingSeq = false
          + var noVargs = true
          + methods.foreach(method => {
          + val signatures = method.getParameterTypes
          + if (signatures.nonEmpty) {
          + if (method.isVarArgs) {
          + noVargs = false
          + } else if (signatures.last.getName.equals("scala.collection.Seq")) {
          + trailingSeq = true
          + }
          + }
          + })
          + if (trailingSeq && noVargs) {
          + // We found trailing "scala.collection.Seq", but no trailing "Type[]", "Type..."
          + throw new ValidationException("The 'eval' method do not support Scala type of " +
          — End diff –

          If multiple eval methods are found (not ambiguous), one annotated with varargs and the other not, it seems that no exception is thrown to tell users that the non-varargs eval method does not work.

          ```
          @varargs
          def eval(args: String*): String = {...}

          // no varargs annotation
          def eval(args: Int*): Int = {...}
          ```

          githubbot ASF GitHub Bot added a comment -

          Github user clarkyzl commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r103369181

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala —
          @@ -140,6 +138,24 @@ object UserDefinedFunctionUtils {
          s"one method named 'eval' which is public, not abstract and " +
          s"(in case of table functions) not static.")
          } else {
          + var trailingSeq = false
          + var noVargs = true
          + methods.foreach(method => {
          + val signatures = method.getParameterTypes
          + if (signatures.nonEmpty) {
          + if (method.isVarArgs) {
          + noVargs = false
          + } else if (signatures.last.getName.equals("scala.collection.Seq")) {
          + trailingSeq = true
          + }
          + }
          + })
          + if (trailingSeq && noVargs) {
          + // We found trailing "scala.collection.Seq", but no trailing "Type[]", "Type..."
          + throw new ValidationException("The 'eval' method do not support Scala type of " +
          — End diff –

          I think one of the important things here is to check type by type.

          githubbot ASF GitHub Bot added a comment -

          Github user clarkyzl commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r103632469

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala —
          @@ -112,9 +112,16 @@ object ScalarSqlFunction {
          .getParameterTypes(foundSignature)
          .map(typeFactory.createTypeFromTypeInfo)

          - inferredTypes.zipWithIndex.foreach {
          - case (inferredType, i) =>
          - operandTypes(i) = inferredType
          + operandTypes.zipWithIndex.foreach {
          + case (_, i) =>
          + if (i < inferredTypes.length - 1) {
          + operandTypes(i) = inferredTypes(i)
          + } else if (null != inferredTypes.last.getComponentType) {
          + // last arguments is a collection, the array type
          + operandTypes(i) = inferredTypes.last.getComponentType
          + } else {
          + operandTypes(i) = inferredTypes.last
          + }
              • End diff –

          The logic here has also been changed.

          githubbot ASF GitHub Bot added a comment -

          Github user clarkyzl commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r103632353

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala —
          @@ -140,6 +138,26 @@ object UserDefinedFunctionUtils {
          s"one method named 'eval' which is public, not abstract and " +
          s"(in case of table functions) not static.")
          } else {
          + methods.foreach(method => {
          + val signatures = method.getParameterTypes
          + if (signatures.nonEmpty &&
          + signatures.last.getName.equals("scala.collection.Seq") &&
          + // If users specified an @varargs, Scala will generate two methods indeed.
          + // If there does not exists corresponding varargs method of the Seq method,
          + // we will throw an ValidationException.
          + (!methods.exists(m => {
          + val sigs = m.getParameterTypes
          + m.isVarArgs &&
          + sigs.length == signatures.length &&
          + sigs.zipWithIndex.forall { case (sig, i) =>
          + i == sigs.length - 1 || sig.equals(signatures(i))
          + }

          + }))) {
          + throw new ValidationException("The 'eval' method do not support Scala type of " +
          — End diff –

          I've updated the patch and refactored the logic.
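
          The refactored check pairs every method that ends in a `scala.collection.Seq` parameter with a varargs method whose other parameters are identical. Below is a rough Java rendering of that idea, offered as a sketch only: `ScalaVarargsCheck`, `FakeSeq`, `Annotated`, `Missing`, `evalMethods` and `hasUnpairedSeqMethod` are all hypothetical, and `FakeSeq` merely stands in for `scala.collection.Seq` so that the example runs without the Scala library.

          ```java
          import java.lang.reflect.Method;
          import java.util.Arrays;
          import java.util.List;
          import java.util.stream.Collectors;

          class ScalaVarargsCheck {
              interface FakeSeq {} // stand-in for scala.collection.Seq

              // Mimics what scalac emits with @varargs: a Seq method plus an array bridge.
              static class Annotated {
                  public int eval(String a, FakeSeq b) { return 0; }
                  public int eval(String a, int... b) { return b.length; }
              }

              // Mimics a missing @varargs: only the Seq method exists.
              static class Missing {
                  public int eval(String a, FakeSeq b) { return 0; }
              }

              static List<Method> evalMethods(Class<?> clazz) {
                  return Arrays.stream(clazz.getMethods())
                      .filter(m -> m.getName().equals("eval"))
                      .collect(Collectors.toList());
              }

              // True if some method ends in the Seq type but has no varargs counterpart
              // with the same parameter count and the same leading parameter types.
              static boolean hasUnpairedSeqMethod(List<Method> methods, String seqTypeName) {
                  for (Method method : methods) {
                      Class<?>[] sig = method.getParameterTypes();
                      if (sig.length == 0 || !sig[sig.length - 1].getName().equals(seqTypeName)) {
                          continue;
                      }
                      boolean paired = false;
                      for (Method m : methods) {
                          Class<?>[] other = m.getParameterTypes();
                          if (m.isVarArgs()
                                  && other.length == sig.length
                                  && Arrays.equals(
                                      Arrays.copyOf(other, other.length - 1),
                                      Arrays.copyOf(sig, sig.length - 1))) {
                              paired = true;
                          }
                      }
                      if (!paired) {
                          return true;
                      }
                  }
                  return false;
              }

              public static void main(String[] args) {
                  String seqName = FakeSeq.class.getName();
                  System.out.println(hasUnpairedSeqMethod(evalMethods(Annotated.class), seqName));
                  System.out.println(hasUnpairedSeqMethod(evalMethods(Missing.class), seqName));
              }
          }
          ```

          In the real check, an unpaired method would lead to the ValidationException shown in the diff.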

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/3389

          Thanks for the update @clarkyzl. I will also take a look at this soon.

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r104581688

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala —
          @@ -140,6 +138,26 @@ object UserDefinedFunctionUtils {
          s"one method named 'eval' which is public, not abstract and " +
          s"(in case of table functions) not static.")
          } else {
          + methods.foreach(method => {
          + val signatures = method.getParameterTypes
          + if (signatures.nonEmpty &&
          + signatures.last.getName.equals("scala.collection.Seq") &&
          + // If users specified an @varargs, Scala will generate two methods indeed.
          + // If there does not exists corresponding varargs method of the Seq method,
          — End diff –

          If there does not exist a corresponding varargs annotation on the eval method

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r104584170

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala —
          @@ -140,6 +138,26 @@ object UserDefinedFunctionUtils {
          s"one method named 'eval' which is public, not abstract and " +
          s"(in case of table functions) not static.")
          } else {
          + methods.foreach(method => {
          — End diff –

          Can we move this check to a method like `verifyScalaVarargsAnnotation`?

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r104595467

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala —
          @@ -136,8 +143,18 @@ object ScalarSqlFunction {
          }

          override def getOperandCountRange: SqlOperandCountRange = {

          - val signatureLengths = signatures.map(_.length)
          - SqlOperandCountRanges.between(signatureLengths.min, signatureLengths.max)
          + var min = 255
          + var max = -1
          + signatures.foreach(sig => {
          + var len = sig.length
          + if (len > 0 && sig(sig.length -1).isArray) {
              • End diff –

          Please add a space before `1`.
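
          For orientation, a varargs-aware operand count range in the spirit of this hunk might look like the sketch below. The diff is cut off before the varargs branch, so the treatment of a varargs signature here (the minimum drops to the number of fixed parameters, the maximum becomes 254) is an assumption based on the argument limit mentioned earlier in the thread. `OperandCountRange`, `range` and `MAX_ARGS` are hypothetical names, and the method returns a plain `{min, max}` pair rather than a Calcite `SqlOperandCountRange`.

          ```java
          class OperandCountRange {
              static final int MAX_ARGS = 254; // assumed upper bound on the number of arguments

              // Returns {min, max} operand counts over all eval signatures.
              static int[] range(Class<?>[][] signatures) {
                  int min = 255;
                  int max = -1;
                  for (Class<?>[] sig : signatures) {
                      int len = sig.length;
                      if (len > 0 && sig[len - 1].isArray()) {
                          // Assumption: a trailing array is varargs and may receive zero arguments.
                          min = Math.min(min, len - 1);
                          max = MAX_ARGS;
                      } else {
                          min = Math.min(min, len);
                          max = Math.max(max, len);
                      }
                  }
                  return new int[] {min, max};
              }

              public static void main(String[] args) {
                  Class<?>[][] signatures = {
                      {String.class, int[].class}, // eval(String a, int... b)
                      {String.class, String.class, String.class}
                  };
                  int[] r = range(signatures);
                  System.out.println(r[0] + ".." + r[1]);
              }
          }
          ```

          With no signatures at all the sketch returns the initial values 255 and -1, so a caller would need to handle that case separately.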

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r104597703

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala —
          @@ -44,14 +44,24 @@ class ScalarFunctionCallGen(
          operands: Seq[GeneratedExpression])
          : GeneratedExpression = {
          // determine function signature and result class

          - val matchingSignature = getSignature(scalarFunction, signature)
          + val matchingMethod = getEvalMethod(scalarFunction, signature)
          .getOrElse(throw new CodeGenException("No matching signature found."))
          + val matchingSignature = matchingMethod.getParameterTypes
          val resultClass = getResultTypeClass(scalarFunction, matchingSignature)

          + // zip for variable signatures
          + var paramToOperands = matchingSignature.zip(operands)
          + var i = paramToOperands.length
          + while (i < operands.length
          — End diff –

          I think this while loop can be replaced by calling `zipAll`

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on the issue:

          https://github.com/apache/flink/pull/3389

          Hi @twalthr, could you also take a look at this PR soon, since you are quite familiar with these language features? It would be great if you could offer some comments.

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r104600115

          --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala ---
          @@ -44,14 +44,24 @@ class ScalarFunctionCallGen(
          operands: Seq[GeneratedExpression])
          : GeneratedExpression = {
          // determine function signature and result class

          - val matchingSignature = getSignature(scalarFunction, signature)
          + val matchingMethod = getEvalMethod(scalarFunction, signature)
          .getOrElse(throw new CodeGenException("No matching signature found."))
          + val matchingSignature = matchingMethod.getParameterTypes
          val resultClass = getResultTypeClass(scalarFunction, matchingSignature)

          + // zip for variable signatures
          + var paramToOperands = matchingSignature.zip(operands)
          + var i = paramToOperands.length
          + while (i < operands.length
          --- End diff --

          Forget zipAll. I think you can write code like this instead of a while loop:
          ```
          if (operands.length > matchingSignature.length) {
            operands.drop(matchingSignature.length).foreach(op =>
              paramToOperands = paramToOperands :+ (matchingSignature.last.getComponentType, op))
          }
          ```

          githubbot ASF GitHub Bot added a comment -

          Github user clarkyzl commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r104605350

          --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
          @@ -140,6 +138,26 @@ object UserDefinedFunctionUtils {
          s"one method named 'eval' which is public, not abstract and " +
          s"(in case of table functions) not static.")
          } else {
          + methods.foreach(method => {
          + val signatures = method.getParameterTypes
          + if (signatures.nonEmpty &&
          + signatures.last.getName.equals("scala.collection.Seq") &&
          + // If users specified an @varargs, Scala will generate two methods indeed.
          + // If there does not exists corresponding varargs method of the Seq method,
          --- End diff --

          Typo.

          githubbot ASF GitHub Bot added a comment -

          Github user clarkyzl commented on the issue:

          https://github.com/apache/flink/pull/3389

          Thanks @wuchong, @twalthr and @KurtYoung. I've pushed a revised patch that addresses @KurtYoung's recent review comments.

          githubbot ASF GitHub Bot added a comment -

          Github user clarkyzl commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r104609713

          --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala ---
          @@ -44,14 +44,24 @@ class ScalarFunctionCallGen(
          operands: Seq[GeneratedExpression])
          : GeneratedExpression = {
          // determine function signature and result class

          - val matchingSignature = getSignature(scalarFunction, signature)
          + val matchingMethod = getEvalMethod(scalarFunction, signature)
          .getOrElse(throw new CodeGenException("No matching signature found."))
          + val matchingSignature = matchingMethod.getParameterTypes
          val resultClass = getResultTypeClass(scalarFunction, matchingSignature)

          + // zip for variable signatures
          + var paramToOperands = matchingSignature.zip(operands)
          + var i = paramToOperands.length
          + while (i < operands.length
          --- End diff --

          Sure. I think there will be something similar in https://github.com/apache/flink/pull/3407 (FLINK-5882)

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r104614583

          --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
          @@ -94,23 +94,28 @@ object UserDefinedFunctionUtils {
          val evalMethods = checkAndExtractEvalMethods(function)

          val filtered = evalMethods

          - // go over all eval methods and find one matching
          - .filter { cur =>
          - val signatures = cur.getParameterTypes
          - // match parameters of signature to actual parameters
          - (actualSignature.length >= signatures.length &&
          - actualSignature.zipWithIndex.forall { case (clazz, i) =>
          - (i < signatures.length && parameterTypeEquals(clazz, signatures(i))) ||
          - (i >= signatures.length - 1 && cur.isVarArgs &&
          + // go over all eval methods and filter out one and only one matching
          + .filter {
          + case cur if !cur.isVarArgs =>
          + val signatures = cur.getParameterTypes
          + // match parameters of signature to actual par(ameters
          + actualSignature.length == signatures.length &&
          + signatures.zipWithIndex.forall { case (clazz, i) =>
          + parameterTypeEquals(actualSignature(i), clazz)
          + }
          + case cur if cur.isVarArgs =>
          + val signatures = cur.getParameterTypes
          + actualSignature.zipWithIndex.forall { case (clazz, i) =>
          + (i < signatures.length - 1 &&
          --- End diff --

          We can also move the condition check on `i` into the case pattern to make the code clearer, like:
          ```
          case (clazz, i) if (i < signatures.length - 1) =>
          // ...
          case (clazz, i) if (i >= signatures.length - 1) =>
          // ...
          ```

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r104614715

          --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
          @@ -94,23 +94,28 @@ object UserDefinedFunctionUtils {
          val evalMethods = checkAndExtractEvalMethods(function)

          val filtered = evalMethods

          - // go over all eval methods and find one matching
          - .filter { cur =>
          - val signatures = cur.getParameterTypes
          - // match parameters of signature to actual parameters
          - (actualSignature.length >= signatures.length &&
          - actualSignature.zipWithIndex.forall { case (clazz, i) =>
          - (i < signatures.length && parameterTypeEquals(clazz, signatures(i))) ||
          - (i >= signatures.length - 1 && cur.isVarArgs &&
          + // go over all eval methods and filter out one and only one matching
          + .filter {
          + case cur if !cur.isVarArgs =>
          + val signatures = cur.getParameterTypes
          + // match parameters of signature to actual par(ameters
          + actualSignature.length == signatures.length &&
          + signatures.zipWithIndex.forall { case (clazz, i) =>
          + parameterTypeEquals(actualSignature(i), clazz)
          + }
          + case cur if cur.isVarArgs =>
          + val signatures = cur.getParameterTypes
          + actualSignature.zipWithIndex.forall { case (clazz, i) =>
          + (i < signatures.length - 1 &&
          + parameterTypeEquals(clazz, signatures(i))) ||
          + (i >= signatures.length - 1 && parameterTypeEquals(clazz, signatures.last.getComponentType))
          - }) ||
          - // match empty variable arguments
          - (actualSignature.length == signatures.length - 1 && cur.isVarArgs)
          + } || (actualSignature.isEmpty && signatures.length == 1)
          --- End diff --

          move this to a new line

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/3389

          Thanks @clarkyzl. I will review the PR tomorrow.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r105182198

          --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala ---
          @@ -221,3 +223,18 @@ class Func13(prefix: String) extends ScalarFunction {
          }
          }

          +object Func14 extends ScalarFunction {
          +
          + @varargs
          + def eval(a: Int*): Int = {
          + a.sum
          + }
          +}
          +
          +object Func15 extends ScalarFunction {
          --- End diff --

          This does not work:

          ```
          object Func15 extends ScalarFunction {

            @varargs
            def eval(a: String, b: Int*): String = {
              a + b.length
            }

            def eval(a: String): String = {
              a
            }
          }
          ```

          It leads to `Found multiple 'eval' methods which match the signature.`
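
          The ambiguity can be reproduced with a simplified model of the matching rule under review. This is only a sketch: `Sig` is a hypothetical stand-in for `java.lang.reflect.Method`, types are compared with plain equality instead of `parameterTypeEquals`, and the tie-break in `resolve` (prefer an exact non-varargs match) is one possible resolution, not necessarily the one that was merged.

          ```scala
          object EvalMatchSketch {
            // Hypothetical stand-in for a reflected eval method:
            // its parameter types plus the Java varargs flag.
            final case class Sig(params: Seq[Class[_]], isVarArgs: Boolean)

            // Lenient rule: fixed parameters are compared positionally, trailing operands
            // are compared with the vararg component type, and an empty vararg list is allowed.
            def matches(actual: Seq[Class[_]], sig: Sig): Boolean =
              if (!sig.isVarArgs) {
                actual == sig.params
              } else {
                val fixed = sig.params.init
                val elem = sig.params.last.getComponentType
                actual.length >= fixed.length &&
                  actual.zipWithIndex.forall {
                    case (clazz, i) if i < fixed.length => clazz == fixed(i)
                    case (clazz, _) => clazz == elem
                  }
              }

            // Assumed tie-break: if an exact non-varargs match exists, keep only that.
            def resolve(actual: Seq[Class[_]], sigs: Seq[Sig]): Seq[Sig] = {
              val all = sigs.filter(matches(actual, _))
              val exact = all.filterNot(_.isVarArgs)
              if (exact.nonEmpty) exact else all
            }

            // Models of the two eval methods of Func15.
            val evalVarargs: Sig = Sig(Seq(classOf[String], classOf[Array[Int]]), isVarArgs = true)
            val evalSingle: Sig = Sig(Seq(classOf[String]), isVarArgs = false)
          }
          ```

          With a single `String` operand, both modelled methods satisfy `matches`, which corresponds to the "multiple 'eval' methods" error; `resolve` narrows the candidates down to the non-varargs overload.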

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r105186593

          --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
          @@ -139,9 +143,35 @@ object UserDefinedFunctionUtils {
          s"Function class '${function.getClass.getCanonicalName}' does not implement at least " +
          s"one method named 'eval' which is public, not abstract and " +
          s"(in case of table functions) not static.")

          - } else {
          - methods
          }
          +
          + verifyScalaVarargsAnnotation(methods)
          + methods
          + }
          +
          + /**
          + * If users specified an @varargs, Scala will generate two methods indeed.
          + * If there does not exist corresponding varargs method of the Seq method,
          + * we will throw an ValidationException.
          + */
          + def verifyScalaVarargsAnnotation(methods: Array[Method]) = {
          + methods.foreach(method => {
          + val signatures = method.getParameterTypes
          + if (signatures.nonEmpty &&
          + signatures.last.getName.equals("scala.collection.Seq") &&
          --- End diff --

          I think this condition is too strict. It wouldn't allow `Seq` parameters in functions, and `Seq` is one of the most important classes in Scala. This does not work right now:

          ```
          object Func16 extends ScalarFunction {

            def eval(a: Seq[String]): String = {
              a.mkString(", ")
            }
          }
          ```

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3389#discussion_r105187427

          --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
          @@ -139,9 +143,35 @@ object UserDefinedFunctionUtils {
          s"Function class '${function.getClass.getCanonicalName}' does not implement at least " +
          s"one method named 'eval' which is public, not abstract and " +
          s"(in case of table functions) not static.")

          - } else {
          - methods
          }
          +
          + verifyScalaVarargsAnnotation(methods)
          + methods
          + }
          +
          + /**
          + * If users specified an @varargs, Scala will generate two methods indeed.
          + * If there does not exist corresponding varargs method of the Seq method,
          + * we will throw an ValidationException.
          + */
          + def verifyScalaVarargsAnnotation(methods: Array[Method]) = {
          + methods.foreach(method => {
          + val signatures = method.getParameterTypes
          + if (signatures.nonEmpty &&
          + signatures.last.getName.equals("scala.collection.Seq") &&
          --- End diff --

          Can you add test data to `UserDefinedScalarFunctionTest` with
          ```
          data: testData.setField(9, Seq("Hello", "World"))
          typeInfo: org.apache.flink.api.scala.createTypeInformation[Seq[String]]
          test: testAllApis(
            Func16('f9),
            "Func16(f9)",
            "Func16(f9)",
            "Hello, World"
          )
          ```

          githubbot ASF GitHub Bot added a comment -

          Github user clarkyzl commented on the issue:

          https://github.com/apache/flink/pull/3389

          Hi @twalthr. I've updated the patch according to your comments.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/3389

          Thanks for the update @clarkyzl. The code looks good. I will merge this...

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3389

          twalthr Timo Walther added a comment -

          Fixed in 1.3.0: 9b179beaea2b623ad3637e417f6d8014b696d038


            People

            • Assignee:
              clarkyzl Zhuoluo Yang
              Reporter:
              clarkyzl Zhuoluo Yang
            • Votes:
              0
              Watchers:
              3
