Flink / FLINK-5571

add open and close methods for UserDefinedFunction in TableAPI & SQL

    Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Table API & SQL
    • Labels:
      None

      Description

      Currently, a User Defined Function (UDF) in the Table API & SQL works on zero, one, or multiple values via a custom evaluation method. Many UDFs need more advanced features, e.g. reporting metrics, reading parameters from the job configuration, or fetching extra data from a distributed cache file. Adding open and close methods to the UserDefinedFunction class can solve this problem. The code could look like:

      trait UserDefinedFunction {
      
        def open(context: UDFContext): Unit = {}
      
        def close(): Unit = {}
      }
      

      UDFContext provides access to metric reporters, job parameters, the distributed cache, etc. The code could look like:

      class UDFContext(context: RuntimeContext) {
      
        def getMetricGroup: MetricGroup = ???
      
        def getDistributedCacheFile(name: String): File = ???
      
        def getJobParameter(key: String, default: String): String = ???
      }
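
      For illustration, a user function using these hooks could look like the following sketch. This is a hypothetical example assuming the proposed open/close signatures and UDFContext methods above; SplitFunc and the "word_separator" key are made up for this sketch:

      class SplitFunc extends TableFunction[String] {

        // read the separator from the global job parameters once, before any eval() call
        private var separator: String = _

        override def open(context: UDFContext): Unit = {
          separator = context.getJobParameter("word_separator", " ")
        }

        def eval(line: String): Unit = {
          line.split(separator).foreach(collect)
        }

        override def close(): Unit = {
          // release any resources acquired in open()
        }
      }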
      

        Issue Links

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user godfreyhe opened a pull request:

          https://github.com/apache/flink/pull/3176

          FLINK-5571 [table] add open and close methods for UserDefinedFunction

          Currently, a User Defined Function (UDF) in the Table API & SQL works on zero, one, or multiple values via a custom evaluation method. Many UDFs need more advanced features, e.g. reporting metrics, reading parameters from the job configuration, or fetching extra data from a distributed cache file. Adding open and close methods to the UserDefinedFunction class can solve this problem.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/godfreyhe/flink udf-open-close

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3176.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3176


          commit 4ac2be7b3ab4dc0e8a18e43c79a031d7a16ee1ea
          Author: godfreyhe <godfreyhe@163.com>
          Date: 2017-01-20T06:42:12Z

          add open and close methods for UserDefinedFunction


          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3176#discussion_r97043055

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/UserDefinedTableFunctionITCase.scala —
          @@ -0,0 +1,106 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.api.scala.batch.sql
          +
          +import org.apache.flink.api.scala.util.CollectionDataSets
          +import org.apache.flink.api.scala.{ExecutionEnvironment, _}
          +import org.apache.flink.table.api.TableEnvironment
          +import org.apache.flink.table.api.scala._
          +import org.apache.flink.table.api.scala.batch.utils.UDFTestUtils
          +import org.apache.flink.table.utils.{RichTableFunc0, RichTableFunc1}
          +import org.apache.flink.test.util.TestBaseUtils
          +import org.apache.flink.types.Row
          +import org.junit.Test
          +
          +import scala.collection.JavaConverters._
          +
          +class UserDefinedTableFunctionITCase {
          +
          +  @Test
          +  def testOpenClose(): Unit = {
          +    val env = ExecutionEnvironment.getExecutionEnvironment
          +    val tEnv = TableEnvironment.getTableEnvironment(env)
          +    tEnv.registerFunction("RichTableFunc0", new RichTableFunc0)
          +
          +    val sqlQuery = "SELECT a, s FROM t1, LATERAL TABLE(RichTableFunc0(c)) as T(s)"
          +
          +    val ds = CollectionDataSets.get3TupleDataSet(env)
          +    tEnv.registerDataSet("t1", ds, 'a, 'b, 'c)
          +
          +    val result = tEnv.sql(sqlQuery)
          +
          +    val expected =
          +      "1,Hi\n2,Hello\n3,Hello world\n4,Hello world, how are you?\n5,I am fine.\n6,Luke Skywalker"
          +    val results = result.toDataSet[Row].collect()
          +    TestBaseUtils.compareResultAsText(results.asJava, expected)
          +  }
          +
          +  @Test
          +  def testSingleUDTFWithParameter(): Unit = {
          +    val env = ExecutionEnvironment.getExecutionEnvironment
          +    val tEnv = TableEnvironment.getTableEnvironment(env)
          +    tEnv.registerFunction("RichTableFunc1", new RichTableFunc1)
          +    UDFTestUtils.setJobParameters(env, Map("word_separator" -> " "))
          +
          +    val sqlQuery = "SELECT a, s FROM t1, LATERAL TABLE(RichTableFunc1(c)) as T(s)"
          +
          +    val ds = CollectionDataSets.getSmall3TupleDataSet(env)
          +    tEnv.registerDataSet("t1", ds, 'a, 'b, 'c)
          +
          +    val result = tEnv.sql(sqlQuery)
          +
          +    val expected = "3,Hello\n3,world"
          +    val results = result.toDataSet[Row].collect()
          +    TestBaseUtils.compareResultAsText(results.asJava, expected)
          +  }
          +
          +  @Test
          +  def testMultiUDTFs(): Unit = {
          — End diff –

          Actually, this can't test multiple RichTableFunctions in one generated function. RichTableFunc0 will join with t1 first, and then RichTableFunc1 will join with the result of the previous join.

          I would suggest testing a UDTF together with a UDF, such as `RichTableFunc0(RichScalarFunc0(c))`, to test multiple RichUserDefinedFunctions in one generated FlatMapFunction.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3176#discussion_r97037393

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UDFContext.scala —
          @@ -0,0 +1,52 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.functions
          +
          +import java.io.File
          +
          +import org.apache.flink.annotation.PublicEvolving
          +import org.apache.flink.api.common.functions.RuntimeContext
          +import org.apache.flink.metrics.MetricGroup
          +
          +class UDFContext(context: RuntimeContext) {
          +
          + /**
          + * Returns the metric group for this parallel subtask.
          + */
          + @PublicEvolving
          + def getMetricGroup: MetricGroup = context.getMetricGroup
          +
          + /**
          + * Get the local temporary file copies of distributed cache files
          + */
          + def getDistributedCacheFile(name: String): File = context.getDistributedCache.getFile(name)
          +
          + /**
          + * Get the global job parameter
          + * which is set by ExecutionEnvironment.getConfig.setGlobalJobParameters()
          + */
          + @PublicEvolving
          + def getJobParameter(key: String, default: String): String = {
          + context.getExecutionConfig.getGlobalJobParameters match {
          — End diff –

          I think a simple if/else is better than a Scala pattern match here.
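
          For illustration, the if/else variant being suggested could look roughly like this sketch (it assumes the `getJobParameter` signature from the quoted diff; the null check and map lookup are illustrative, not the final implementation):

          def getJobParameter(key: String, default: String): String = {
            // fall back to the default when no global job parameters were set,
            // or when the key is absent
            val params = context.getExecutionConfig.getGlobalJobParameters
            if (params != null && params.toMap.containsKey(key)) {
              params.toMap.get(key)
            } else {
              default
            }
          }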

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3176#discussion_r97033852

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
          @@ -220,56 +247,105 @@ class CodeGenerator(
          // manual casting here
          val samHeader =
          // FlatMapFunction

          -      if (clazz == classOf[FlatMapFunction[_,_]]) {
          +      if (clazz == classOf[FlatMapFunction[_, _]]) {
          +        val baseClass = if (generatedRichFunctions) {
          +          classOf[RichFlatMapFunction[_, _]]
          +        } else {
          +          classOf[FlatMapFunction[_, _]]
          +        }
                   val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
          -        (s"void flatMap(Object _in1, org.apache.flink.util.Collector $collectorTerm)",
          +        (baseClass,
          +          s"void flatMap(Object _in1, org.apache.flink.util.Collector $collectorTerm)",
                     List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"))
                 }

                 // MapFunction
          -      else if (clazz == classOf[MapFunction[_,_]]) {
          +      else if (clazz == classOf[MapFunction[_, _]]) {
          +        val baseClass = if (generatedRichFunctions) {
          +          classOf[RichMapFunction[_, _]]
          +        } else {
          +          classOf[MapFunction[_, _]]
          +        }
                   val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
          -        ("Object map(Object _in1)",
          +        (baseClass,
          +          "Object map(Object _in1)",
                     List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"))
                 }

                 // FlatJoinFunction
          -      else if (clazz == classOf[FlatJoinFunction[_,_,_]]) {
          +      else if (clazz == classOf[FlatJoinFunction[_, _, _]]) {
          +        val baseClass = if (generatedRichFunctions) {
          +          classOf[RichFlatJoinFunction[_, _, _]]
          +        } else {
          +          classOf[FlatJoinFunction[_, _, _]]
          +        }
                   val inputTypeTerm1 = boxedTypeTermForTypeInfo(input1)
                   val inputTypeTerm2 = boxedTypeTermForTypeInfo(input2.getOrElse(
          -          throw new CodeGenException("Input 2 for FlatJoinFunction should not be null")))
          -        (s"void join(Object _in1, Object _in2, org.apache.flink.util.Collector $collectorTerm)",
          +          throw new CodeGenException("Input 2 for FlatJoinFunction should not be null")))
          +        (baseClass,
          +          s"void join(Object _in1, Object _in2, org.apache.flink.util.Collector $collectorTerm)",
                     List(s"$inputTypeTerm1 $input1Term = ($inputTypeTerm1) _in1;",
          -            s"$inputTypeTerm2 $input2Term = ($inputTypeTerm2) _in2;"))
          +            s"$inputTypeTerm2 $input2Term = ($inputTypeTerm2) _in2;"))
                 }
                 else {
                   // TODO more functions
                   throw new CodeGenException("Unsupported Function.")
                 }
          -    val funcCode = j"""
          -      public class $funcName
          -          implements ${clazz.getCanonicalName} {
          +    val funcCode = if (generatedRichFunctions) {
          — End diff –

          There is a lot of duplicate code between the RichFunction codegen and the non-RichFunction codegen. The only difference between them is the open/close method code, so I think it would be better to *insert* the open/close code when it is a RichFunction.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3176#discussion_r97038745

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala —
          @@ -24,4 +24,19 @@ package org.apache.flink.table.functions

           * User-defined functions must have a default constructor and must be instantiable during runtime.
           */
          trait UserDefinedFunction {
          — End diff –

          The `UserDefinedFunction` should be declared as an `abstract class` now. Because a trait is compiled to a Java interface (without default implementations), Java users would have to implement `open()` and `close()` themselves if `UserDefinedFunction` stays a trait.
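
          In other words, the suggestion is roughly the following sketch (assuming the proposed open/close signatures from the issue description):

          abstract class UserDefinedFunction {

            // concrete no-op defaults, so Java subclasses need not override them
            def open(context: UDFContext): Unit = {}

            def close(): Unit = {}
          }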

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3176#discussion_r97036095

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
          @@ -220,56 +247,105 @@ class CodeGenerator(
          // manual casting here
          val samHeader =
          // FlatMapFunction

          -      if (clazz == classOf[FlatMapFunction[_,_]]) {
          +      if (clazz == classOf[FlatMapFunction[_, _]]) {
          — End diff –

          I would like to determine the baseClass from the `clazz` parameter, not from the `generatedRichFunctions` boolean flag. Users call `generateFunction(desc, classOf[FlatMapFunction[Any, Any]], body, returnType)` but get back a RichFlatMapFunction, which is weird and uncontrollable.

          githubbot ASF GitHub Bot added a comment -

          Github user godfreyhe commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3176#discussion_r99532146

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
          @@ -220,56 +247,105 @@ class CodeGenerator(
          // manual casting here
          val samHeader =
          // FlatMapFunction

          -      if (clazz == classOf[FlatMapFunction[_,_]]) {
          +      if (clazz == classOf[FlatMapFunction[_, _]]) {
          — End diff –

          We cannot choose between `FlatMapFunction` and `RichFlatMapFunction` in the `translateToPlan` method unless we know whether the current RexNode contains `UserDefinedFunction`s. However, the CodeGenerator is a kind of RexVisitor and only discovers `UserDefinedFunction`s when `generateExpression` is called.
          Besides, `RichFlatMapFunction` implements `FlatMapFunction`, so it is reasonable that users call generateFunction(desc, classOf[FlatMapFunction[Any, Any]], body, returnType) and get back a RichFlatMapFunction.

          githubbot ASF GitHub Bot added a comment -

          Github user godfreyhe commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3176#discussion_r99544629

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala —
          @@ -24,4 +24,19 @@ package org.apache.flink.table.functions

           * User-defined functions must have a default constructor and must be instantiable during runtime.
           */
          trait UserDefinedFunction {
          — End diff –

          I tested this successfully in Java without implementing the `open` and `close` methods.

          githubbot ASF GitHub Bot added a comment -

          Github user godfreyhe commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3176#discussion_r99548048

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/UserDefinedTableFunctionITCase.scala —
          @@ -0,0 +1,106 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.api.scala.batch.sql
          +
          +import org.apache.flink.api.scala.util.CollectionDataSets
          +import org.apache.flink.api.scala.{ExecutionEnvironment, _}
          +import org.apache.flink.table.api.TableEnvironment
          +import org.apache.flink.table.api.scala._
          +import org.apache.flink.table.api.scala.batch.utils.UDFTestUtils
          +import org.apache.flink.table.utils.{RichTableFunc0, RichTableFunc1}
          +import org.apache.flink.test.util.TestBaseUtils
          +import org.apache.flink.types.Row
          +import org.junit.Test
          +
          +import scala.collection.JavaConverters._
          +
          +class UserDefinedTableFunctionITCase {
          +
          +  @Test
          +  def testOpenClose(): Unit = {
          +    val env = ExecutionEnvironment.getExecutionEnvironment
          +    val tEnv = TableEnvironment.getTableEnvironment(env)
          +    tEnv.registerFunction("RichTableFunc0", new RichTableFunc0)
          +
          +    val sqlQuery = "SELECT a, s FROM t1, LATERAL TABLE(RichTableFunc0(c)) as T(s)"
          +
          +    val ds = CollectionDataSets.get3TupleDataSet(env)
          +    tEnv.registerDataSet("t1", ds, 'a, 'b, 'c)
          +
          +    val result = tEnv.sql(sqlQuery)
          +
          +    val expected =
          +      "1,Hi\n2,Hello\n3,Hello world\n4,Hello world, how are you?\n5,I am fine.\n6,Luke Skywalker"
          +    val results = result.toDataSet[Row].collect()
          +    TestBaseUtils.compareResultAsText(results.asJava, expected)
          +  }
          +
          +  @Test
          +  def testSingleUDTFWithParameter(): Unit = {
          +    val env = ExecutionEnvironment.getExecutionEnvironment
          +    val tEnv = TableEnvironment.getTableEnvironment(env)
          +    tEnv.registerFunction("RichTableFunc1", new RichTableFunc1)
          +    UDFTestUtils.setJobParameters(env, Map("word_separator" -> " "))
          +
          +    val sqlQuery = "SELECT a, s FROM t1, LATERAL TABLE(RichTableFunc1(c)) as T(s)"
          +
          +    val ds = CollectionDataSets.getSmall3TupleDataSet(env)
          +    tEnv.registerDataSet("t1", ds, 'a, 'b, 'c)
          +
          +    val result = tEnv.sql(sqlQuery)
          +
          +    val expected = "3,Hello\n3,world"
          +    val results = result.toDataSet[Row].collect()
          +    TestBaseUtils.compareResultAsText(results.asJava, expected)
          +  }
          +
          +  @Test
          +  def testMultiUDTFs(): Unit = {
          — End diff –

          Yes, each `RichTableFunction` will generate an independent FlatMap function, and I think this test is also meaningful. I will add cases to test UDTF together with UDF later.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3176#discussion_r99566482

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
          @@ -220,56 +247,105 @@ class CodeGenerator(
            // manual casting here
            val samHeader =
              // FlatMapFunction
          -   if (clazz == classOf[FlatMapFunction[_,_]]) {
          +   if (clazz == classOf[FlatMapFunction[_, _]]) {

          — End diff –

          Makes sense to me. I missed the case of `ScalarFunction` before.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3176#discussion_r99568239

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala —
          @@ -24,4 +24,19 @@ package org.apache.flink.table.functions
           * User-defined functions must have a default constructor and must be instantiable during runtime.
           */
          trait UserDefinedFunction {

          — End diff –

          Did you create a Java class which extends `ScalarFunction` or `TableFunction`? I tried this in my environment, but the compilation failed.

          ```java
          public class MyScalarFunction extends ScalarFunction {
              public int eval(int a) {
                  return a + 1;
              }
          }
          ```

          ```
          Error:(25, 8) java: org.apache.flink.table.examples.java.MyScalarFunction is not abstract and
          does not override the abstract method close() in org.apache.flink.table.functions.UserDefinedFunction
          ```

          githubbot ASF GitHub Bot added a comment -

          Github user godfreyhe commented on the issue:

          https://github.com/apache/flink/pull/3176

          Hi @wuchong, thanks for the review.

          I think `UserDefinedScalarFunctionTest` is not enough, because the `open` and `close` methods and the job parameters strongly depend on the runtime, so it's better to test them in integration tests.

          I will add a test to verify Java ScalarFunction later.

          githubbot ASF GitHub Bot added a comment -

          Github user godfreyhe commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3176#discussion_r100469073

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala —
          @@ -24,4 +24,19 @@ package org.apache.flink.table.functions
           * User-defined functions must have a default constructor and must be instantiable during runtime.
           */
          trait UserDefinedFunction {

          — End diff –

          Yes, it works! Please refer to: http://alvinalexander.com/scala/how-to-wrap-scala-traits-used-accessed-java-classes-methods

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3176#discussion_r100470762

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala —
          @@ -24,4 +24,19 @@ package org.apache.flink.table.functions
           * User-defined functions must have a default constructor and must be instantiable during runtime.
           */
          trait UserDefinedFunction {

          — End diff –

          Copy from the blog you link:
          >To summarize: If you’re writing a Scala API that will be used by Java clients, don’t expose traits that have implemented behavior in your public API. If you have traits like that, wrap them in a class for your Java consumers.

          The wrapper class mentioned in the blog is a concrete class defined in Scala, not an abstract class (like `ScalarFunction`). If we extend such a class in Java, the same error will be thrown. I think defining `UserDefinedFunction` as an abstract class can easily fix this problem with no side effects.
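          The abstract-class variant under discussion can be sketched in plain Java. The class and method names below are illustrative stand-ins for the proposal, not Flink's actual API: the point is that a Java subclass inherits the empty `open`/`close` bodies and compiles without overriding anything.

          ```java
          // Illustrative sketch only: stand-in for the proposed abstract UserDefinedFunction.
          abstract class UserDefinedFunctionSketch {
              // Empty default bodies, so Java subclasses compile without overriding them.
              public void open(Object context) {}
              public void close() {}
          }

          // A Java "UDF" in the style of the failing MyScalarFunction example above.
          class MyScalarFunctionSketch extends UserDefinedFunctionSketch {
              public int eval(int a) {
                  return a + 1;
              }
          }

          class AbstractClassDemo {
              public static void main(String[] args) {
                  MyScalarFunctionSketch f = new MyScalarFunctionSketch();
                  f.open(null);                    // inherited default body, no override required
                  System.out.println(f.eval(41));  // prints 42
                  f.close();
              }
          }
          ```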

          githubbot ASF GitHub Bot added a comment -

          Github user godfreyhe commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3176#discussion_r100478286

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala —
          @@ -24,4 +24,19 @@ package org.apache.flink.table.functions
           * User-defined functions must have a default constructor and must be instantiable during runtime.
           */
          trait UserDefinedFunction {

          — End diff –

          The Scala wrapper class can be abstract class.

          MathTrait.scala
          ```Scala
          trait MathTrait {
            def sum(x: Int, y: Int) = x + y
          }
          ```

          MathTraitWrapper.scala
          ```Scala
          abstract class MathTraitWrapper extends MathTrait {
          }
          ```

          JavaMath.java
          ```Java
          public class JavaMath extends MathTraitWrapper {
              public static void main(String[] args) {
                  new JavaMath();
              }

              public JavaMath() {
                  System.out.println(sum(2, 2));
              }
          }
          ```

          This code works as expected, printing the number 4 when it is run.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3176#discussion_r100483661

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala —
          @@ -24,4 +24,19 @@ package org.apache.flink.table.functions
           * User-defined functions must have a default constructor and must be instantiable during runtime.
           */
          trait UserDefinedFunction {

          — End diff –

          I tried your code and my code again, and it works fine now. I don't know why it didn't work before...

          Thank you for the explanation.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3176#discussion_r101247688

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
          @@ -220,56 +247,105 @@ class CodeGenerator(
            // manual casting here
            val samHeader =
              // FlatMapFunction
          -   if (clazz == classOf[FlatMapFunction[_,_]]) {
          +   if (clazz == classOf[FlatMapFunction[_, _]]) {

          — End diff –

          As I mentioned above, why not keep it simple and always generate RichFunctions? The `clazz` parameter is still correct, as `RichFlatMapFunction` implements `FlatMapFunction`.
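          The substitutability argument can be illustrated with self-contained stand-ins (all names below are hypothetical, not Flink's classes): code typed against the plain interface accepts the rich subclass unchanged, which is why always generating the rich variant is safe.

          ```java
          import java.util.ArrayList;
          import java.util.List;

          // Stand-in for a plain functional interface like FlatMapFunction.
          interface FlatMapFn<IN, OUT> {
              void flatMap(IN value, List<OUT> out);
          }

          // Stand-in for the "rich" variant: same contract plus lifecycle hooks.
          abstract class RichFlatMapFn<IN, OUT> implements FlatMapFn<IN, OUT> {
              public void open() {}
              public void close() {}
          }

          class RichIsStillPlainDemo {
              // Accepts the plain interface; a rich instance is a valid argument.
              static <IN, OUT> List<OUT> run(FlatMapFn<IN, OUT> fn, IN value) {
                  List<OUT> out = new ArrayList<>();
                  fn.flatMap(value, out);
                  return out;
              }

              public static void main(String[] args) {
                  RichFlatMapFn<String, String> splitter = new RichFlatMapFn<String, String>() {
                      public void flatMap(String value, List<String> out) {
                          for (String word : value.split(" ")) out.add(word);
                      }
                  };
                  splitter.open();
                  System.out.println(run(splitter, "Hello world")); // prints [Hello, world]
                  splitter.close();
              }
          }
          ```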

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3176#discussion_r101246957

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
          @@ -122,6 +123,18 @@ class CodeGenerator(
          // we use a LinkedHashSet to keep the insertion order
          private val reusableInitStatements = mutable.LinkedHashSet[String]()

          + // generate RichFunction(e.g. RichFlatMapFunction) if true
          + // generate Function(e.g. FlatMapFunction) if false
          + private var generatedRichFunction = false
          — End diff –

          This increases the complexity of the code generator. Why don't we generate RichFunctions by default? Every RichFunction is a valid Function anyway.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3176#discussion_r101249809

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UDFContext.scala —
          @@ -0,0 +1,53 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.functions
          +
          +import java.io.File
          +
          +import org.apache.flink.annotation.PublicEvolving
          +import org.apache.flink.api.common.functions.RuntimeContext
          +import org.apache.flink.metrics.MetricGroup
          +
          +class UDFContext(context: RuntimeContext) {
          +
          + /**
          + * Returns the metric group for this parallel subtask.
          + */
          + @PublicEvolving
          — End diff –

          The Table API has no annotations yet as the API can still change. Can you remove all annotations here?

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3176#discussion_r101248702

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UDFContext.scala —
          @@ -0,0 +1,53 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.functions
          +
          +import java.io.File
          +
          +import org.apache.flink.annotation.PublicEvolving
          +import org.apache.flink.api.common.functions.RuntimeContext
          +import org.apache.flink.metrics.MetricGroup
          +
          +class UDFContext(context: RuntimeContext) {
          — End diff –

          Please also add a Javadoc here.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3176#discussion_r101265531

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala —
          @@ -24,4 +24,19 @@ package org.apache.flink.table.functions
           * User-defined functions must have a default constructor and must be instantiable during runtime.
           */
          trait UserDefinedFunction {

          — End diff –

          Did you run your code with Java 7 or Java 8? Java 8 introduced default implementations for interfaces, but Java 7 could cause problems. I think it won't hurt if we convert it into an `abstract class`.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3176#discussion_r101265088

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UDFContext.scala —
          @@ -0,0 +1,53 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.functions
          +
          +import java.io.File
          +
          +import org.apache.flink.annotation.PublicEvolving
          +import org.apache.flink.api.common.functions.RuntimeContext
          +import org.apache.flink.metrics.MetricGroup
          +
          +class UDFContext(context: RuntimeContext) {
          +
          + /**
          + * Returns the metric group for this parallel subtask.
          + */
          + @PublicEvolving
          + def getMetricGroup: MetricGroup = context.getMetricGroup
          +
          + /**
          + * Get the local temporary file copies of distributed cache files
          — End diff –

          Please also add Javadoc for parameters and return values. This is a user-facing API and should be well documented. Please also add some website documentation.
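          The delegation pattern being documented here can be sketched as follows; `FakeRuntimeContext` and `UdfContextSketch` are hypothetical stand-ins for `RuntimeContext` and `UDFContext`, reduced to job parameters only.

          ```java
          import java.util.HashMap;
          import java.util.Map;

          // Hypothetical stand-in for RuntimeContext, holding only job parameters.
          class FakeRuntimeContext {
              private final Map<String, String> params = new HashMap<>();
              FakeRuntimeContext set(String key, String value) { params.put(key, value); return this; }
              String getParameter(String key) { return params.get(key); }
          }

          // Thin wrapper in the spirit of UDFContext: exposes only what UDFs should
          // see, delegating each call to the underlying runtime context.
          class UdfContextSketch {
              private final FakeRuntimeContext context;
              UdfContextSketch(FakeRuntimeContext context) { this.context = context; }

              /** Returns the job parameter for {@code key}, or {@code defaultValue} if unset. */
              String getJobParameter(String key, String defaultValue) {
                  String value = context.getParameter(key);
                  return value != null ? value : defaultValue;
              }
          }

          class UdfContextDemo {
              public static void main(String[] args) {
                  FakeRuntimeContext rc = new FakeRuntimeContext().set("word_separator", "#");
                  UdfContextSketch ctx = new UdfContextSketch(rc);
                  System.out.println(ctx.getJobParameter("word_separator", ","));  // prints #
                  System.out.println(ctx.getJobParameter("missing", "fallback"));  // prints fallback
              }
          }
          ```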

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3176#discussion_r101269209

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala —
          @@ -43,10 +44,16 @@ class FlatMapRunner[IN, OUT](
          val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
          LOG.debug("Instantiating FlatMapFunction.")
          function = clazz.newInstance()
          + FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
          — End diff –

          Don't we have to add this to the `CorrelateFlatMapRunner` as well?
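          The lifecycle ordering implied in this review (instantiate the generated function, inject the runtime context, then call `open()`) can be sketched generically; every name below is an illustrative stand-in, not Flink's actual classes.

          ```java
          // Stand-in for a generated rich function managed by a runner.
          class GeneratedFunctionSketch {
              String runtimeContext;
              boolean opened;

              // Analogous to FunctionUtils.setFunctionRuntimeContext in the real runner.
              void setRuntimeContext(String ctx) { this.runtimeContext = ctx; }

              void open() {
                  // open() may read the context, so it must be injected beforehand.
                  if (runtimeContext == null) throw new IllegalStateException("context not set");
                  opened = true;
              }

              void close() { opened = false; }
          }

          class RunnerLifecycleDemo {
              public static void main(String[] args) {
                  GeneratedFunctionSketch fn = new GeneratedFunctionSketch(); // newInstance() in the runner
                  fn.setRuntimeContext("runtime-context");                    // before open()
                  fn.open();
                  System.out.println(fn.opened); // prints true
                  fn.close();
              }
          }
          ```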

          githubbot ASF GitHub Bot added a comment -

          Github user godfreyhe commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3176#discussion_r101428511

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala —
          @@ -24,4 +24,19 @@ package org.apache.flink.table.functions

          * User-defined functions must have a default constructor and must be instantiable during runtime.
          */
          trait UserDefinedFunction {
          — End diff –

          It works in Java 7
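The Java 7 question above can be illustrated with a hedged sketch from the Java side. `UDFContext` and `UserDefinedFunction` are minimal stand-ins for the proposed API from the issue description (the real classes live in `org.apache.flink.table.functions`), and `HashCodeFunction` with its `hashcode.factor` parameter is a hypothetical example, not part of Flink.

```java
import java.util.HashMap;
import java.util.Map;

public class UdfSketch {

    // Minimal stand-in for the proposed UDFContext.
    static class UDFContext {
        private final Map<String, String> jobParameters;
        UDFContext(Map<String, String> jobParameters) { this.jobParameters = jobParameters; }
        String getJobParameter(String key, String defaultValue) {
            String v = jobParameters.get(key);
            return v != null ? v : defaultValue;
        }
    }

    // The Scala trait's empty method bodies surface in Java as ordinary
    // overridable methods, so a Java 7 UDF only overrides what it needs.
    static abstract class UserDefinedFunction {
        public void open(UDFContext context) {}
        public void close() {}
    }

    // Hypothetical scalar function that reads its factor from the job config.
    static class HashCodeFunction extends UserDefinedFunction {
        private int factor;

        @Override public void open(UDFContext context) {
            factor = Integer.parseInt(context.getJobParameter("hashcode.factor", "12"));
        }

        public int eval(String s) { return s.hashCode() * factor; }
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<String, String>();
        params.put("hashcode.factor", "2");
        HashCodeFunction udf = new HashCodeFunction();
        udf.open(new UDFContext(params));
        System.out.println(udf.eval("flink") == "flink".hashCode() * 2); // true
        udf.close();
    }
}
```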

          githubbot ASF GitHub Bot added a comment -

          Github user godfreyhe commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3176#discussion_r101437890

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UDFContext.scala —
          @@ -0,0 +1,53 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.functions
          +
          +import java.io.File
          +
          +import org.apache.flink.annotation.PublicEvolving
          +import org.apache.flink.api.common.functions.RuntimeContext
          +import org.apache.flink.metrics.MetricGroup
          +
          +class UDFContext(context: RuntimeContext) {
          +
          + /**
          + * Returns the metric group for this parallel subtask.
          + */
          + @PublicEvolving
          — End diff –

          OK

          githubbot ASF GitHub Bot added a comment -

          Github user godfreyhe commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3176#discussion_r101437939

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
          @@ -122,6 +123,18 @@ class CodeGenerator(
          // we use a LinkedHashSet to keep the insertion order
          private val reusableInitStatements = mutable.LinkedHashSet[String]()

          + // generate RichFunction(e.g. RichFlatMapFunction) if true
          + // generate Function(e.g. FlatMapFunction) if false
          + private var generatedRichFunction = false
          — End diff –

          OK
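What the `generatedRichFunction` flag toggles can be sketched with simplified string templating (this is not Flink's actual `CodeGenerator`): the generated source either extends the `Rich*` base, gaining `open`/`close` and a `RuntimeContext`, or implements the plain interface.

```java
// Simplified sketch of the flag's effect on the generated class signature.
public class CodeGenSketch {

    static String classSignature(String name, boolean richFunction) {
        return richFunction
            ? "public class " + name + " extends RichFlatMapFunction"
            : "public class " + name + " implements FlatMapFunction";
    }

    public static void main(String[] args) {
        System.out.println(classSignature("GeneratedFlatMap0", true));
        System.out.println(classSignature("GeneratedFlatMap0", false));
    }
}
```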

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3176#discussion_r101447660

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala —
          @@ -24,4 +24,19 @@ package org.apache.flink.table.functions

          * User-defined functions must have a default constructor and must be instantiable during runtime.
          */
          trait UserDefinedFunction {
          — End diff –

          I tried in Java 7, it works.

          githubbot ASF GitHub Bot added a comment -

          Github user godfreyhe commented on the issue:

          https://github.com/apache/flink/pull/3176

          Thanks for the suggestions @twalthr, I have updated the PR.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/3176

          Thanks for the update @godfreyhe. I will have a final pass through the code and merge it.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3176

          twalthr Timo Walther added a comment -

          Fixed in 1.3.0: b820fd3ca038e411bc7f43e1c35637bf62981fe5


  People

  • Assignee: godfreyhe godfrey he
  • Reporter: godfreyhe godfrey he
  • Votes: 0
  • Watchers: 5