Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Table API & SQL
    • Labels:
      None

      Issue Links

        Activity

        githubbot ASF GitHub Bot added a comment -

        GitHub user sunjincheng121 opened a pull request:

        https://github.com/apache/flink/pull/3330

        FLINK-5795[TableAPI&SQL] Improve UDTF to support constructor with parameter.

        Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
        If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
        In addition to going through the list, please provide a meaningful description of your changes.

        • [×] General
        • The pull request references the related JIRA issue ("FLINK-5795 Improve "UDTF" to support constructor with parameter.")
        • The pull request addresses only one issue
        • Each commit in the PR has a meaningful commit message (including the JIRA id)
        • [ ] Documentation
        • Documentation has been added for new functionality
        • Old documentation affected by the pull request has been updated
        • JavaDoc for public methods has been added
        • [×] Tests & Build
        • Functionality added by the pull request is covered by tests
        • `mvn clean verify` has been executed successfully locally or a Travis build has passed

        You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/sunjincheng121/flink FLINK-5795-PR

        Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/flink/pull/3330.patch

        To close this pull request, make a commit to your master/trunk branch
        with (at least) the following in the commit message:

        This closes #3330


        commit da788d627527f7a0547c23d50ba916f625b66aeb
        Author: 金竹 <jincheng.sunjc@alibaba-inc.com>
        Date: 2017-02-14T06:43:41Z

        FLINK-5795[TableAPI&SQL] Improve UDTF to support constructor with parameter.


        githubbot ASF GitHub Bot added a comment -

        Github user wuchong commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101445690

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala —
        @@ -324,4 +325,19 @@ object UserDefinedFunctionUtils {
        candidate == classOf[Time] && (expected == classOf[Int] || expected == classOf[JInt]) ||
        candidate == classOf[Timestamp] && (expected == classOf[Long] || expected == classOf[JLong])

        + @throws[Exception]
        + def serialize(function: UserDefinedFunction): String = {
        + val byteArrayOutPut = new ByteArrayOutputStream
        — End diff –

        You can get the serialized byte array of an object with `InstantiationUtil.serializeObject`.

        githubbot ASF GitHub Bot added a comment -

        Github user wuchong commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101446163

        — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala —
        @@ -0,0 +1,75 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +package org.apache.flink.table.runtime.dataset
        +
        +import org.apache.flink.api.scala._
        +import org.apache.flink.types.Row
        +import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
        +import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
        +import org.apache.flink.table.api.scala._
        +import org.apache.flink.table.api.TableEnvironment
        +import org.apache.flink.table.utils._
        +import org.apache.flink.test.util.TestBaseUtils
        +import org.junit.Test
        +import org.junit.runner.RunWith
        +import org.junit.runners.Parameterized
        +
        +import scala.collection.JavaConverters._
        +import scala.collection.mutable
        +
        +@RunWith(classOf[Parameterized])
        +class DataSetUserDefinedFunctionITCase (
        — End diff –

        This class can go into `org.apache.flink.table.runtime.dataset.DataSetCorrelateITCase`

        githubbot ASF GitHub Bot added a comment -

        Github user wuchong commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101445424

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
        @@ -1463,21 +1465,23 @@ class CodeGenerator(
        */
        def addReusableFunction(function: UserDefinedFunction): String = {
        val classQualifier = function.getClass.getCanonicalName

        - val fieldTerm = s"function_${classQualifier.replace('.', '$')}"
        + val functionSerializedData = serialize(function)
        + val fieldTerm =
        + s"""
        + |function_${classQualifier.replace('.', '$')}_${DigestUtils.md5Hex(functionSerializedData)}
        — End diff –

        Why do we need md5Hex here?

        githubbot ASF GitHub Bot added a comment -

        Github user wuchong commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101445544

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala —
        @@ -324,4 +325,19 @@ object UserDefinedFunctionUtils {
        candidate == classOf[Time] && (expected == classOf[Int] || expected == classOf[JInt]) ||
        candidate == classOf[Timestamp] && (expected == classOf[Long] || expected == classOf[JLong])

        + @throws[Exception]
        + def serialize(function: UserDefinedFunction): String = {
        + val byteArrayOutPut = new ByteArrayOutputStream
        + val objectOutPut = new ObjectOutputStream(byteArrayOutPut)
        + objectOutPut.writeObject(function)
        + objectOutPut.close()
        + Base64.encodeBase64URLSafeString(byteArrayOutPut.toByteArray)
        — End diff –

        Is `Base64.encodeBase64String` sufficient for this?

        githubbot ASF GitHub Bot added a comment -

        Github user wuchong commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101445785

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala —
        @@ -324,4 +325,19 @@ object UserDefinedFunctionUtils {
        candidate == classOf[Time] && (expected == classOf[Int] || expected == classOf[JInt]) ||
        candidate == classOf[Timestamp] && (expected == classOf[Long] || expected == classOf[JLong])

        + @throws[Exception]
        + def serialize(function: UserDefinedFunction): String = {
        + val byteArrayOutPut = new ByteArrayOutputStream
        + val objectOutPut = new ObjectOutputStream(byteArrayOutPut)
        + objectOutPut.writeObject(function)
        + objectOutPut.close()
        + Base64.encodeBase64URLSafeString(byteArrayOutPut.toByteArray)
        + }
        +
        + @throws[Exception]
        + def deserialize(data: String): UserDefinedFunction = {
        + val byteData = Base64.decodeBase64(data)
        + new ObjectInputStream(
        — End diff –

        You can get the object back from the byte array with `InstantiationUtil.deserializeObject`.
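
        As a rough illustration of the serialize/deserialize round trip discussed in this thread, here is a minimal sketch that uses only JDK classes (`java.io` object streams and `java.util.Base64`) instead of Flink's `InstantiationUtil` or commons-codec. `SplitFunction` and `SerializationSketch` are hypothetical names and are not part of the pull request.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.Base64

// Hypothetical stand-in for a user-defined function that takes a constructor parameter.
class SplitFunction(val separator: String) extends Serializable

object SerializationSketch {

  // Java-serialize the object and encode the bytes as a single-line, URL-safe Base64 string.
  def serialize(obj: Serializable): String = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    Base64.getUrlEncoder.withoutPadding.encodeToString(bytes.toByteArray)
  }

  // Decode the string and read the object back; the caller names the expected type.
  def deserialize[T](data: String): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(Base64.getUrlDecoder.decode(data)))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

        With this sketch, `SerializationSketch.deserialize[SplitFunction](SerializationSketch.serialize(new SplitFunction("#")))` yields a new instance that still carries the separator passed to the constructor, which is the property the generated code relies on.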

        githubbot ASF GitHub Bot added a comment -

        Github user wuchong commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101446217

        — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala —
        @@ -0,0 +1,80 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +package org.apache.flink.table.runtime.datastream
        +
        +import org.apache.flink.api.scala._
        +import org.apache.flink.types.Row
        +import org.apache.flink.table.api.scala.stream.utils.StreamITCase
        +import org.apache.flink.table.api.scala._
        +import org.apache.flink.table.utils.TableFunc3
        +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
        +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
        +import org.apache.flink.table.api.TableEnvironment
        +import org.junit.Assert._
        +import org.junit.Test
        +
        +import scala.collection.mutable
        +
        +class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestBase {
        — End diff –

        This class can go into `org.apache.flink.table.runtime.datastream.DataStreamCorrelateITCase`

        githubbot ASF GitHub Bot added a comment -

        Github user sunjincheng121 commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101446744

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
        @@ -1463,21 +1465,23 @@ class CodeGenerator(
        */
        def addReusableFunction(function: UserDefinedFunction): String = {
        val classQualifier = function.getClass.getCanonicalName

        - val fieldTerm = s"function_${classQualifier.replace('.', '$')}"
        + val functionSerializedData = serialize(function)
        + val fieldTerm =
        + s"""
        + |function_${classQualifier.replace('.', '$')}_${DigestUtils.md5Hex(functionSerializedData)}
        — End diff –

        1. The MD5 digest of the serialized function prevents duplicate variable names.
        2. The digest is hex-encoded so that the name contains no special characters, such as the symbol "=".
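
        A minimal sketch of that naming scheme, assuming the JDK's `MessageDigest` as a stand-in for `DigestUtils.md5Hex`; `FieldTermSketch` and the sample class name are hypothetical. It shows that two instances of the same class with different serialized state get different field names, and that the names contain only characters that are legal in a Java identifier.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

object FieldTermSketch {

  // Hex-encoded MD5 digest of a string: 32 characters drawn from 0-9 and a-f.
  def md5Hex(data: String): String =
    MessageDigest.getInstance("MD5")
      .digest(data.getBytes(StandardCharsets.UTF_8))
      .map(b => f"${b & 0xff}%02x")
      .mkString

  // Field name built from the class name plus a digest of the serialized instance.
  def fieldTerm(classQualifier: String, serializedData: String): String =
    s"function_${classQualifier.replace('.', '$')}_${md5Hex(serializedData)}"
}
```

        For example, `FieldTermSketch.fieldTerm("org.example.MyFunc", "payloadA")` and `FieldTermSketch.fieldTerm("org.example.MyFunc", "payloadB")` share the `function_org$example$MyFunc_` prefix but end in different digests.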

        githubbot ASF GitHub Bot added a comment -

        Github user sunjincheng121 commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101447054

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala —
        @@ -324,4 +325,19 @@ object UserDefinedFunctionUtils {
        candidate == classOf[Time] && (expected == classOf[Int] || expected == classOf[JInt]) ||
        candidate == classOf[Timestamp] && (expected == classOf[Long] || expected == classOf[JLong])

        + @throws[Exception]
        + def serialize(function: UserDefinedFunction): String = {
        + val byteArrayOutPut = new ByteArrayOutputStream
        + val objectOutPut = new ObjectOutputStream(byteArrayOutPut)
        + objectOutPut.writeObject(function)
        + objectOutPut.close()
        + Base64.encodeBase64URLSafeString(byteArrayOutPut.toByteArray)
        — End diff –

        No, we cannot use that method, because
        `encodeBase64String` `Encodes binary data using the base64 algorithm into 76 character blocks separated by CRLF.` So if the encoded data is longer than 76 characters, we'll get `CRLF`s, which we do not want.
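
        The difference between chunked and single-line encoding can be sketched with the JDK's `java.util.Base64`, used here as a stand-in for commons-codec (whether the commons-codec method chunks its output may depend on the library version, so check the Javadoc of the version in use). `Base64Sketch` and the sample payload are hypothetical.

```scala
import java.util.Base64

object Base64Sketch {
  // 100 bytes of 0xFB: long enough to exceed one 76-character line,
  // and chosen so that the standard alphabet produces '+' and '/'.
  val payload: Array[Byte] = Array.fill[Byte](100)(0xFB.toByte)

  // MIME flavour: output is split into lines of at most 76 characters, separated by CRLF.
  val chunked: String = Base64.getMimeEncoder.encodeToString(payload)

  // URL-safe flavour without padding: one line using only A-Z, a-z, 0-9, '-' and '_'.
  val urlSafe: String = Base64.getUrlEncoder.withoutPadding.encodeToString(payload)
}
```

        Only the URL-safe, unpadded form is free of line breaks, '+', '/' and '=', which matters when the encoded string ends up embedded in generated source code.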

        githubbot ASF GitHub Bot added a comment -

        Github user sunjincheng121 commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101447400

        — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala —
        @@ -0,0 +1,75 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +package org.apache.flink.table.runtime.dataset
        +
        +import org.apache.flink.api.scala._
        +import org.apache.flink.types.Row
        +import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
        +import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
        +import org.apache.flink.table.api.scala._
        +import org.apache.flink.table.api.TableEnvironment
        +import org.apache.flink.table.utils._
        +import org.apache.flink.test.util.TestBaseUtils
        +import org.junit.Test
        +import org.junit.runner.RunWith
        +import org.junit.runners.Parameterized
        +
        +import scala.collection.JavaConverters._
        +import scala.collection.mutable
        +
        +@RunWith(classOf[Parameterized])
        +class DataSetUserDefinedFunctionITCase (
        — End diff –

        At first I also wanted to put it into `DataSetCorrelateITCase`, but this class is meant for `UserDefinedFunction` (UDF / UDTF / UDAF) tests, so I think a separate class is better. What do you think?

        githubbot ASF GitHub Bot added a comment -

        Github user wuchong commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101448753

        — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala —
        @@ -0,0 +1,75 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +package org.apache.flink.table.runtime.dataset
        +
        +import org.apache.flink.api.scala._
        +import org.apache.flink.types.Row
        +import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
        +import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
        +import org.apache.flink.table.api.scala._
        +import org.apache.flink.table.api.TableEnvironment
        +import org.apache.flink.table.utils._
        +import org.apache.flink.test.util.TestBaseUtils
        +import org.junit.Test
        +import org.junit.runner.RunWith
        +import org.junit.runners.Parameterized
        +
        +import scala.collection.JavaConverters._
        +import scala.collection.mutable
        +
        +@RunWith(classOf[Parameterized])
        +class DataSetUserDefinedFunctionITCase (
        — End diff –

        I'm not sure about that. Most of the time, scalar functions can be tested by unit tests (see `ScalarFunctionsTest`), so not all of them need IT cases.

        githubbot ASF GitHub Bot added a comment -

        Github user sunjincheng121 commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101450383

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala —
        @@ -324,4 +325,19 @@ object UserDefinedFunctionUtils {
        candidate == classOf[Time] && (expected == classOf[Int] || expected == classOf[JInt]) ||
        candidate == classOf[Timestamp] && (expected == classOf[Long] || expected == classOf[JLong])

        + @throws[Exception]
        + def serialize(function: UserDefinedFunction): String = {
        + val byteArrayOutPut = new ByteArrayOutputStream
        — End diff –

        Cool, good catch. It's the same code, but using `InstantiationUtil` is a better way.

        githubbot ASF GitHub Bot added a comment -

        Github user wuchong commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101450405

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
        @@ -1463,21 +1465,23 @@ class CodeGenerator(
        */
        def addReusableFunction(function: UserDefinedFunction): String = {
        val classQualifier = function.getClass.getCanonicalName

        - val fieldTerm = s"function_${classQualifier.replace('.', '$')}"
        + val functionSerializedData = serialize(function)
        + val fieldTerm =
        + s"""
        + |function_${classQualifier.replace('.', '$')}_${DigestUtils.md5Hex(functionSerializedData)}
        — End diff –

        It seems that if we register two identical functions (not the same reference), the field terms will be the same, which will cause a compile error here. For example:

        ```
        val func1 = new MyFunction
        val func2 = new MyFunction
        env.register("func1", func1)
        env.register("func2", func2)
        ```

        `func1` and `func2` are two different objects, but their serialized bytes are equal.

        BTW, I suggest including this case in your new IT cases.
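
To illustrate the concern above, here is a minimal sketch, not the PR's code. It assumes plain Java serialization and an MD5 digest over the raw bytes (the PR hashes a serialized String with `DigestUtils.md5Hex`). `MyFunction` and `SerializedDigest` are hypothetical names introduced only for this example.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.security.MessageDigest

// Hypothetical stand-in for a stateless user-defined function.
@SerialVersionUID(1L)
class MyFunction extends Serializable

object SerializedDigest {
  // Serialize an object with plain Java serialization (assumption: the
  // real helper in the PR may encode the bytes differently).
  def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray
  }

  // Hex-encoded MD5 digest of the given bytes.
  def md5Hex(data: Array[Byte]): String =
    MessageDigest.getInstance("MD5").digest(data).map(b => f"${b & 0xff}%02x").mkString

  // Digest that would be appended to the generated field term.
  def digestOf(obj: AnyRef): String = md5Hex(serialize(obj))
}
```

Two separately constructed `MyFunction` instances are distinct references, yet `SerializedDigest.digestOf` returns the same value for both, so a field term derived from the class name and this digest would be identical for them.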

        githubbot ASF GitHub Bot added a comment -

        Github user sunjincheng121 commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101455043

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
        @@ -1463,21 +1465,23 @@ class CodeGenerator(
        */
        def addReusableFunction(function: UserDefinedFunction): String = {
        val classQualifier = function.getClass.getCanonicalName

        - val fieldTerm = s"function_${classQualifier.replace('.', '$')}"
        + val functionSerializedData = serialize(function)
        + val fieldTerm =
        + s"""
        + |function_${classQualifier.replace('.', '$')}_${DigestUtils.md5Hex(functionSerializedData)}
        — End diff –

        1. The two functions you wrote above are stateless, so there is no problem when creating an instance.
        2. In fact, we do not have to worry about the variable name: we build a tree, and each UDTF node goes through codegen independently.
        So we can keep the name `fieldTerm = function_${classQualifier.replace('.', '$')}`.

        githubbot ASF GitHub Bot added a comment -

        Github user wuchong commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101456205

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
        @@ -1463,21 +1465,23 @@ class CodeGenerator(
        */
        def addReusableFunction(function: UserDefinedFunction): String = {
        val classQualifier = function.getClass.getCanonicalName

        - val fieldTerm = s"function_${classQualifier.replace('.', '$')}"
        + val functionSerializedData = serialize(function)
        + val fieldTerm =
        + s"""
        + |function_${classQualifier.replace('.', '$')}_${DigestUtils.md5Hex(functionSerializedData)}
        — End diff –

        Yes, each UDTF node is independent in codegen, so this rarely happens with UDTFs. But it often happens when multiple ScalarFunctions are involved (assuming ScalarFunction is implemented the same way).

        githubbot ASF GitHub Bot added a comment -

        Github user sunjincheng121 commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101458924

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
        @@ -1463,21 +1465,23 @@ class CodeGenerator(
        */
        def addReusableFunction(function: UserDefinedFunction): String = {
        val classQualifier = function.getClass.getCanonicalName

        - val fieldTerm = s"function_${classQualifier.replace('.', '$')}"
        + val functionSerializedData = serialize(function)
        + val fieldTerm =
        + s"""
        + |function_${classQualifier.replace('.', '$')}_${DigestUtils.md5Hex(functionSerializedData)}
        — End diff –

        Yes, thanks @wuchong! UDF and UDTF are a little different, so let's discuss them separately. In FLINK-5794 I will consider UDF together with your suggestion.

        githubbot ASF GitHub Bot added a comment -

        Github user wuchong commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101460153

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
        @@ -1463,21 +1465,23 @@ class CodeGenerator(
        */
        def addReusableFunction(function: UserDefinedFunction): String = {
        val classQualifier = function.getClass.getCanonicalName

        - val fieldTerm = s"function_${classQualifier.replace('.', '$')}"
        + val functionSerializedData = serialize(function)
        + val fieldTerm =
        + s"""
        + |function_${classQualifier.replace('.', '$')}_${DigestUtils.md5Hex(functionSerializedData)}
        — End diff –

        I think it makes sense to keep the same implementation for UDF and UDTF. What about code-generating a byte[] field that holds the serialized function (as proposed by Fabian)?

        >I see two options:
        >1. make the UDF a member of wrapping function. It might be a bit tricky to pass the reference into the code-gen'd function.
        >2. add a final byte[] field into the code-gen'd function that holds the serialized UDF object and deserialize during initialization. This will blow up the code-gen'd string but might work well.
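
Option 2 in the quote above could look roughly like the following sketch. It is an illustration under stated assumptions, not the PR's implementation: the function is encoded as a Base64 String rather than a raw byte[] literal, and `Splitter` and `FunctionCodec` are hypothetical names. It also shows that a function whose only constructor takes a parameter survives the round trip.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.Base64
import java.util.regex.Pattern

// Hypothetical function with a constructor parameter and no default constructor.
class Splitter(val separator: String) extends Serializable {
  def eval(s: String): Seq[String] = s.split(Pattern.quote(separator)).toSeq
}

object FunctionCodec {
  // Serialize the function and encode it as text, so that it could be
  // embedded as a constant in generated source code.
  def serialize(function: AnyRef): String = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(function) finally out.close()
    Base64.getEncoder.encodeToString(bytes.toByteArray)
  }

  // Decode and deserialize; generated code would do this once during initialization.
  def deserialize[T](data: String): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(Base64.getDecoder.decode(data)))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

The trade-off named in the quote still applies: the encoded string grows with the size of the function's state, which enlarges the generated code.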

        githubbot ASF GitHub Bot added a comment -

        Github user sunjincheng121 commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101461418

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
        @@ -1463,21 +1465,23 @@ class CodeGenerator(
        */
        def addReusableFunction(function: UserDefinedFunction): String = {
        val classQualifier = function.getClass.getCanonicalName

        - val fieldTerm = s"function_${classQualifier.replace('.', '$')}"
        + val functionSerializedData = serialize(function)
        + val fieldTerm =
        + s"""
        + |function_${classQualifier.replace('.', '$')}_${DigestUtils.md5Hex(functionSerializedData)}
        — End diff –

        Cool, we can discuss it outside this PR.

        githubbot ASF GitHub Bot added a comment -

        Github user sunjincheng121 commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101726090

        — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala —
        @@ -0,0 +1,75 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +package org.apache.flink.table.runtime.dataset
        +
        +import org.apache.flink.api.scala._
        +import org.apache.flink.types.Row
        +import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
        +import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
        +import org.apache.flink.table.api.scala._
        +import org.apache.flink.table.api.TableEnvironment
        +import org.apache.flink.table.utils._
        +import org.apache.flink.test.util.TestBaseUtils
        +import org.junit.Test
        +import org.junit.runner.RunWith
        +import org.junit.runners.Parameterized
        +
        +import scala.collection.JavaConverters._
        +import scala.collection.mutable
        +
        +@RunWith(classOf[Parameterized])
        +class DataSetUserDefinedFunctionITCase (
        — End diff –

        @wuchong, maybe you are right, but I'm not sure. I think @fhueske can give us good advice. What do you think, @fhueske?

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101760789

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
        @@ -1463,21 +1465,23 @@ class CodeGenerator(
        */
        def addReusableFunction(function: UserDefinedFunction): String = {
        — End diff –

        By serializing the UDF, we don't need the default constructor anymore, right? So we should update the JavaDocs of the method.

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101765235

        — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala —
        @@ -0,0 +1,75 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +package org.apache.flink.table.runtime.dataset
        +
        +import org.apache.flink.api.scala._
        +import org.apache.flink.types.Row
        +import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
        +import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
        +import org.apache.flink.table.api.scala._
        +import org.apache.flink.table.api.TableEnvironment
        +import org.apache.flink.table.utils._
        +import org.apache.flink.test.util.TestBaseUtils
        +import org.junit.Test
        +import org.junit.runner.RunWith
        +import org.junit.runners.Parameterized
        +
        +import scala.collection.JavaConverters._
        +import scala.collection.mutable
        +
        +@RunWith(classOf[Parameterized])
        +class DataSetUserDefinedFunctionITCase (
        — End diff –

        The problem with many `TableProgramsClusterTestBase` ITCases classes is that each class adds significant testing overhead. For each class, a Flink testing cluster is started which is shared across all tests in the file. So having a single ITCase with many tests is less of a problem than having many classes with one test method each.

        I think it makes sense to move this test to `DataSetCorrelateITCase`. Alternatively, we can extend `TableProgramsCollectionTestBase` which does not start a cluster but runs the program on Java collections. That should be fine for this test.

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101765703

        — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala —
        @@ -0,0 +1,80 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +package org.apache.flink.table.runtime.datastream
        +
        +import org.apache.flink.api.scala._
        +import org.apache.flink.types.Row
        +import org.apache.flink.table.api.scala.stream.utils.StreamITCase
        +import org.apache.flink.table.api.scala._
        +import org.apache.flink.table.utils.TableFunc3
        +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
        +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
        +import org.apache.flink.table.api.TableEnvironment
        +import org.junit.Assert._
        +import org.junit.Test
        +
        +import scala.collection.mutable
        +
        +class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestBase {
        — End diff –

        Streaming integration tests do always start a new Flink cluster (there is no collection mode).
        So I think moving this to `DataStreamCorrelateITCase` as @wuchong suggested is a good idea.

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101775264

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
        @@ -1463,21 +1465,23 @@ class CodeGenerator(
        */
        def addReusableFunction(function: UserDefinedFunction): String = {
        val classQualifier = function.getClass.getCanonicalName

        - val fieldTerm = s"function_${classQualifier.replace('.', '$')}"
        + val functionSerializedData = serialize(function)
        + val fieldTerm =
        + s"""
        + |function_${classQualifier.replace('.', '$')}_${DigestUtils.md5Hex(functionSerializedData)}
        — End diff –

        I think encoding the serialized object as String is a good idea. I like the implementation.

        Regarding the name collisions. I did a test with two identical scalar functions and everything worked well because `reusableMemberStatements` is a hash set that deduplicates the terms.

        Btw. what else do we need to do for scalar UDFs? They are injected with the same method and hence should be serialized as well. Isn't the only missing thing to add more tests? If yes, we could do that in this PR.

        What do you think @sunjincheng121 and @wuchong?
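
The deduplication described in this comment can be sketched as follows. This is a simplified, hypothetical stand-in for the code generator's bookkeeping, assuming member statements are kept in a set of Strings; `MemberRegistry` and the generated statement text are illustrative and not taken from `CodeGenerator`.

```scala
import scala.collection.mutable

// Hypothetical, simplified registry of reusable member statements.
class MemberRegistry {
  // A set keeps each distinct statement once, in insertion order.
  private val reusableMemberStatements = mutable.LinkedHashSet[String]()

  // Derive the field term from the class name and register its declaration.
  def addReusableFunction(className: String): String = {
    val fieldTerm = s"function_${className.replace('.', '$')}"
    reusableMemberStatements += s"private $className $fieldTerm = null;"
    fieldTerm
  }

  // Number of distinct member statements registered so far.
  def size: Int = reusableMemberStatements.size

  // All member statements, one per line.
  def memberCode: String = reusableMemberStatements.mkString("\n")
}
```

Registering the same class name twice yields the same field term and a single member statement, so no duplicate field declaration reaches the generated class.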

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101759328

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala —
        @@ -22,7 +22,9 @@ package org.apache.flink.table.functions.utils
        import java.lang.{Long => JLong, Integer => JInt}
        import java.lang.reflect.{Method, Modifier}
        import java.sql.{Date, Time, Timestamp}
        +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

        — End diff –

        Some of these imports are unused.

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101762354

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
        @@ -1463,21 +1465,23 @@ class CodeGenerator(
        */
        def addReusableFunction(function: UserDefinedFunction): String = {
        val classQualifier = function.getClass.getCanonicalName

        - val fieldTerm = s"function_${classQualifier.replace('.', '$')}"
        + val functionSerializedData = serialize(function)
        + val fieldTerm =
        + s"""
        + |function_${classQualifier.replace('.', '$')}_${DigestUtils.md5Hex(functionSerializedData)}
        + """.stripMargin

        val fieldFunction =
        s"""
        transient $classQualifier $fieldTerm = null;
        """.stripMargin
        reusableMemberStatements.add(fieldFunction)
        - val constructorTerm = s"constructor_${classQualifier.replace('.', '$')}"
        val constructorAccessibility =
        s"""
        - java.lang.reflect.Constructor $constructorTerm =
        - $classQualifier.class.getDeclaredConstructor();
        - $constructorTerm.setAccessible(true);
        - $fieldTerm = ($classQualifier) $constructorTerm.newInstance();
        + $fieldTerm = ($classQualifier)
        + org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
        — End diff –

        Replace the hardcoded class name with `${UserDefinedFunctionUtils.getClass.getName.stripSuffix("$")}`.

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101763661

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
        @@ -1463,21 +1465,23 @@ class CodeGenerator(
        */
        def addReusableFunction(function: UserDefinedFunction): String = {
        val classQualifier = function.getClass.getCanonicalName

        - val fieldTerm = s"function_${classQualifier.replace('.', '$')}"
        + val functionSerializedData = serialize(function)
        + val fieldTerm =
        + s"""
        + |function_${classQualifier.replace('.', '$')}_${DigestUtils.md5Hex(functionSerializedData)}
        + """.stripMargin

        val fieldFunction =
        s"""
        transient $classQualifier $fieldTerm = null;
        """.stripMargin
        reusableMemberStatements.add(fieldFunction)
        - val constructorTerm = s"constructor_${classQualifier.replace('.', '$')}"
        val constructorAccessibility =
        — End diff –

        Rename the variable to something like `functionDeserialization`.

        githubbot ASF GitHub Bot added a comment -

        Github user wuchong commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101885634

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
        @@ -1463,21 +1465,23 @@ class CodeGenerator(
        */
        def addReusableFunction(function: UserDefinedFunction): String = {
        val classQualifier = function.getClass.getCanonicalName

        - val fieldTerm = s"function_${classQualifier.replace('.', '$')}"
        + val functionSerializedData = serialize(function)
        + val fieldTerm =
        + s"""
        + |function_${classQualifier.replace('.', '$')}_${DigestUtils.md5Hex(functionSerializedData)}
        — End diff –

        I find that the md5Hex string in the fieldTerm is never used. What about using `CodeGenUtils.newName` to generate the function field name (as shown below)? It is commonly used in `CodeGenerator`, it avoids naming collisions, and the generated name is more readable. What do you think @sunjincheng121 @fhueske?

        ```
        CodeGenUtils.newName(s"function_${classQualifier.replace('.', '$')}")
        ```

        Regarding another PR for scalar UDFs, I think you are right. We can do that in this PR.

        githubbot ASF GitHub Bot added a comment -

        Github user sunjincheng121 commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101906986

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
        @@ -1463,21 +1465,23 @@ class CodeGenerator(
        */
        def addReusableFunction(function: UserDefinedFunction): String = {
        val classQualifier = function.getClass.getCanonicalName

        - val fieldTerm = s"function_${classQualifier.replace('.', '$')}"
        + val functionSerializedData = serialize(function)
        + val fieldTerm =
        + s"""
        + |function_${classQualifier.replace('.', '$')}_${DigestUtils.md5Hex(functionSerializedData)}
        — End diff –

        Thanks @fhueske
        Regarding the name collisions, the current implementation works for both UDTFs and UDFs.
        However, the path from the Table API to `SqlFunction` still needs some work, because the constructor parameters of UDFs currently do not take effect. I only ran a simple test and did not check the cause carefully (I would like to consider it in FLINK-5795). Feel free to let me know if you also want to merge FLINK-5794 into this PR.

        Thanks, @wuchong
        1. The md5Hex suffix covers the case of functions with constructor parameters. Without it, instances of the same class with different parameters would overwrite each other and produce incorrect results.
        2. `CodeGenUtils.newName` does not work well here, because it uses `AtomicInteger.getAndIncrement` to generate the numeric suffix of the name. When we use multiple UDFs with the same state, it leads to the creation of multiple UDF objects, although a single shared object is sufficient, e.g.
        `
        tEnv.registerFunction("func0", new Func13)
        tEnv.registerFunction("func1", new Func13)
        tEnv.registerFunction("func2", new Func13)
        `
        In this case `reusableMemberStatements` would contain three different elements that cannot be deduplicated.

        What do you think @fhueske @wuchong ?
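
        The difference between the two naming schemes can be sketched as follows. This is an illustration under assumptions, not the code of the PR: `FieldNaming`, `Scale`, `counterName`, and `digestName` are hypothetical names, the counter suffix format is made up, and the digest is computed with `java.security.MessageDigest` instead of `DigestUtils`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.atomic.AtomicInteger;

final class FieldNaming {

    /** Hypothetical function class whose behavior depends on a constructor parameter. */
    static final class Scale implements Serializable {
        private static final long serialVersionUID = 1L;
        final int factor;

        Scale(int factor) {
            this.factor = factor;
        }
    }

    private static final AtomicInteger COUNTER = new AtomicInteger();

    /** Counter-based name: different on every call, even for instances with equal state. */
    static String counterName(Object function) {
        return "function_" + qualifier(function) + "$" + COUNTER.getAndIncrement();
    }

    /** Digest-based name: determined only by the class and the serialized state. */
    static String digestName(Serializable function) {
        return "function_" + qualifier(function) + "_" + md5Hex(serialize(function));
    }

    private static String qualifier(Object function) {
        return function.getClass().getCanonicalName().replace('.', '$');
    }

    private static byte[] serialize(Serializable function) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(function);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    private static String md5Hex(byte[] data) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(data);
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

        With digest-based names, two `Scale(2)` instances map to the same field name and collapse to one entry in a hash set of member statements, while `Scale(2)` and `Scale(3)` get different names. With counter-based names, every call yields a new name, so nothing is deduplicated.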

        githubbot ASF GitHub Bot added a comment -

        Github user wuchong commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101907504

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
        @@ -1463,21 +1465,23 @@ class CodeGenerator(
        */
        def addReusableFunction(function: UserDefinedFunction): String = {
        val classQualifier = function.getClass.getCanonicalName

        - val fieldTerm = s"function_${classQualifier.replace('.', '$')}"
        + val functionSerializedData = serialize(function)
        + val fieldTerm =
        + s"""
        + |function_${classQualifier.replace('.', '$')}_${DigestUtils.md5Hex(functionSerializedData)}
        — End diff –

        >But from tableAPI to SqlFunction the creation process has some work to do because the current UDF parameters do not take effect.

        Do you mean "from the Java Table API to SqlFunction"? If so, the Java Table API (i.e. parsing an expression from a string) instantiates a new instance of ScalarFunction when it looks up the function call (the line is https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala#L84). TableFunction reuses the object created by the user. You can follow the TableFunction approach to fix this.

        BTW, scalar UDFs in the Scala Table API work fine, right?
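
        To illustrate the difference described above, here is a simplified sketch with hypothetical classes (`CatalogSketch`, `AddConstant`, `ClassCatalog`, `InstanceCatalog`); it does not reproduce `FunctionCatalog`. A catalog that keeps only the class and re-instantiates it on lookup loses the constructor parameter, whereas a catalog that keeps the user's object preserves it.

```java
import java.util.HashMap;
import java.util.Map;

final class CatalogSketch {

    /** Hypothetical scalar function with an optional constructor parameter. */
    static class AddConstant {
        final int constant;

        AddConstant() {
            this(0);
        }

        AddConstant(int constant) {
            this.constant = constant;
        }

        int eval(int x) {
            return x + constant;
        }
    }

    /** Keeps only the class and creates a fresh instance on lookup, so constructor state is lost. */
    static final class ClassCatalog {
        private final Map<String, Class<? extends AddConstant>> functions = new HashMap<>();

        void register(String name, AddConstant function) {
            functions.put(name, function.getClass());
        }

        AddConstant lookup(String name) {
            try {
                return functions.get(name).getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    /** Keeps the object created by the user and returns it unchanged on lookup. */
    static final class InstanceCatalog {
        private final Map<String, AddConstant> functions = new HashMap<>();

        void register(String name, AddConstant function) {
            functions.put(name, function);
        }

        AddConstant lookup(String name) {
            return functions.get(name);
        }
    }
}
```

        Registering `new AddConstant(5)` in both catalogs and evaluating `lookup("f").eval(1)` gives 1 for `ClassCatalog` (the parameter is dropped) and 6 for `InstanceCatalog`.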

        githubbot ASF GitHub Bot added a comment -

        Github user sunjincheng121 commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101942319

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
        @@ -1463,21 +1465,23 @@ class CodeGenerator(
        */
        def addReusableFunction(function: UserDefinedFunction): String = {
        val classQualifier = function.getClass.getCanonicalName

        - val fieldTerm = s"function_${classQualifier.replace('.', '$')}"
        + val functionSerializedData = serialize(function)
        + val fieldTerm =
        + s"""
        + |function_${classQualifier.replace('.', '$')}_${DigestUtils.md5Hex(functionSerializedData)}
        — End diff –

        Hi @wuchong
        No, scalar UDFs in the Scala Table API do not work well.
        The reason is that when we create `ScalarSqlFunction`, we use `scalarFunction.getClass.getCanonicalName` as the SQL identifier, which produces the wrong result.

        Hi @fhueske
        So far, we have discussed a lot about the UDF implementation in this PR, so I agree with merging FLINK-5794 into this PR.

        githubbot ASF GitHub Bot added a comment -

        Github user sunjincheng121 commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101943644

        — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala —
        @@ -0,0 +1,75 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +package org.apache.flink.table.runtime.dataset
        +
        +import org.apache.flink.api.scala._
        +import org.apache.flink.types.Row
        +import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
        +import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
        +import org.apache.flink.table.api.scala._
        +import org.apache.flink.table.api.TableEnvironment
        +import org.apache.flink.table.utils._
        +import org.apache.flink.test.util.TestBaseUtils
        +import org.junit.Test
        +import org.junit.runner.RunWith
        +import org.junit.runners.Parameterized
        +
        +import scala.collection.JavaConverters._
        +import scala.collection.mutable
        +
        +@RunWith(classOf[Parameterized])
        +class DataSetUserDefinedFunctionITCase (
        — End diff –

        We also need to add tests for UDFs, and adding them to `DataSetCorrelateITCase` is not appropriate. So I think extending `TableProgramsCollectionTestBase` is a good way to go.

        githubbot ASF GitHub Bot added a comment -

        Github user sunjincheng121 commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101944209

        — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala —
        @@ -0,0 +1,80 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +package org.apache.flink.table.runtime.datastream
        +
        +import org.apache.flink.api.scala._
        +import org.apache.flink.types.Row
        +import org.apache.flink.table.api.scala.stream.utils.StreamITCase
        +import org.apache.flink.table.api.scala._
        +import org.apache.flink.table.utils.TableFunc3
        +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
        +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
        +import org.apache.flink.table.api.TableEnvironment
        +import org.junit.Assert._
        +import org.junit.Test
        +
        +import scala.collection.mutable
        +
        +class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestBase {
        — End diff –

        Where should we put the UDF tests? If we put the UDTF tests into `DataStreamCorrelateITCase`, do we need to create a separate ITCase? @fhueske @wuchong

        githubbot ASF GitHub Bot added a comment -

        Github user wuchong commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101946735

        — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala —
        @@ -0,0 +1,80 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +package org.apache.flink.table.runtime.datastream
        +
        +import org.apache.flink.api.scala._
        +import org.apache.flink.types.Row
        +import org.apache.flink.table.api.scala.stream.utils.StreamITCase
        +import org.apache.flink.table.api.scala._
        +import org.apache.flink.table.utils.TableFunc3
        +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
        +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
        +import org.apache.flink.table.api.TableEnvironment
        +import org.junit.Assert._
        +import org.junit.Test
        +
        +import scala.collection.mutable
        +
        +class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestBase {
        — End diff –

        IMO, the scalar UDF tests can be put into `UserDefinedScalarFunctionTest`. Scalar function tests do not need to set up a cluster environment, so a unit test is enough.

        githubbot ASF GitHub Bot added a comment -

        Github user sunjincheng121 commented on the issue:

        https://github.com/apache/flink/pull/3330

        @fhueske @wuchong I have added the UDF implementation.

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101972557

        — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala —
        @@ -0,0 +1,75 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +package org.apache.flink.table.runtime.dataset
        +
        +import org.apache.flink.api.scala._
        +import org.apache.flink.types.Row
        +import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
        +import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
        +import org.apache.flink.table.api.scala._
        +import org.apache.flink.table.api.TableEnvironment
        +import org.apache.flink.table.utils._
        +import org.apache.flink.test.util.TestBaseUtils
        +import org.junit.Test
        +import org.junit.runner.RunWith
        +import org.junit.runners.Parameterized
        +
        +import scala.collection.JavaConverters._
        +import scala.collection.mutable
        +
        +@RunWith(classOf[Parameterized])
        +class DataSetUserDefinedFunctionITCase (
        — End diff –

        I would really like to avoid having too many ITCase classes. Using the CollectionTestBase only works for batch.

        Maybe we can rename the CorrelateITCases to UserDefinedFunctionsITCases.

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101974098

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
        @@ -1463,21 +1465,23 @@ class CodeGenerator(
        */
        def addReusableFunction(function: UserDefinedFunction): String = {
        val classQualifier = function.getClass.getCanonicalName

        - val fieldTerm = s"function_${classQualifier.replace('.', '$')}"
        + val functionSerializedData = serialize(function)
        + val fieldTerm =
        + s"""
        + |function_${classQualifier.replace('.', '$')}_${DigestUtils.md5Hex(functionSerializedData)}
        — End diff –

        I agree with @sunjincheng121.
        The naming collisions with `md5Hex` are actually intentional: they deduplicate identical functions. Duplicates can occur because the same function is used more than once, or because the same function has been registered twice and both registrations are used. With `CodeGen.newName`, the serialized code and the initialization would be generated once for every use of the same function.
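
        The deduplication described above can be sketched as follows. This is a minimal illustration and not the actual `CodeGenerator` code: `ReusableFunctions`, `Prefixer`, `reusableMembers` and `memberCount` are hypothetical names, `java.security.MessageDigest` stands in for `DigestUtils.md5Hex`, and `getName` is used in place of `getCanonicalName` because it never returns null.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.security.MessageDigest
import scala.collection.mutable

object ReusableFunctions {
  // Hypothetical stand-in for a user-defined function with a constructor parameter.
  case class Prefixer(prefix: String) extends Serializable

  // Java-serialize the instance so that constructor arguments become part of the bytes.
  def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray
  }

  // Hex-encoded MD5 digest (assumed equivalent in spirit to DigestUtils.md5Hex).
  def md5Hex(data: Array[Byte]): String =
    MessageDigest.getInstance("MD5").digest(data).map(b => f"${b & 0xff}%02x").mkString

  // Field name derived from the class name plus the hash of the serialized instance.
  def fieldTerm(function: AnyRef): String = {
    val qualifier = function.getClass.getName.replace('.', '$')
    val hash = md5Hex(serialize(function))
    s"function_${qualifier}_${hash}"
  }

  // One entry per distinct field term: identical instances collide on purpose.
  private val reusableMembers = mutable.LinkedHashMap[String, Array[Byte]]()

  def addReusableFunction(function: AnyRef): String = {
    val term = fieldTerm(function)
    reusableMembers.getOrElseUpdate(term, serialize(function))
    term
  }

  def memberCount: Int = reusableMembers.size
}
```

        Registering `Prefixer("x")` twice yields the same field term and a single stored member, while `Prefixer("y")` gets a term of its own. A fresh name per call would instead store one member per use.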

        githubbot ASF GitHub Bot added a comment -

        Github user sunjincheng121 commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101975970

        — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala —
        @@ -0,0 +1,75 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +package org.apache.flink.table.runtime.dataset
        +
        +import org.apache.flink.api.scala._
        +import org.apache.flink.types.Row
        +import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
        +import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
        +import org.apache.flink.table.api.scala._
        +import org.apache.flink.table.api.TableEnvironment
        +import org.apache.flink.table.utils._
        +import org.apache.flink.test.util.TestBaseUtils
        +import org.junit.Test
        +import org.junit.runner.RunWith
        +import org.junit.runners.Parameterized
        +
        +import scala.collection.JavaConverters._
        +import scala.collection.mutable
        +
        +@RunWith(classOf[Parameterized])
        +class DataSetUserDefinedFunctionITCase (
        — End diff –

        Yes, renaming the CorrelateITCases to UserDefinedFunctionsITCases makes sense to me, because we can then add UDF/UDTF/UDAF tests in one ITCase.

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101974535

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala —
        @@ -94,7 +96,10 @@ abstract class TableFunction[T] extends UserDefinedFunction

        { TableFunctionCall(getClass.getSimpleName, this, params, resultType) }
        - override def toString: String = getClass.getCanonicalName
        + override def toString: String = {
        — End diff –

        Same as for `ScalarFunction`.

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r101974425

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ScalarFunction.scala —
        @@ -56,8 +58,10 @@ abstract class ScalarFunction extends UserDefinedFunction

        { ScalarFunctionCall(this, params) }
        - override def toString: String = getClass.getCanonicalName
        -
        + override def toString: String = {
        + val md5 = DigestUtils.md5Hex(serialize(this))
        — End diff –

        I think this is dangerous.
        The function might be overridden by a UDF.
        Rather add a new `final` method.
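
        A minimal sketch of that suggestion, under stated assumptions: `SketchFunction` and `Multiply` are hypothetical classes, not Flink's `ScalarFunction`, and the hash is computed with `java.security.MessageDigest` rather than `DigestUtils`. The point is only that a `final` method keeps the identifier stable even when a UDF overrides `toString`.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.security.MessageDigest

object FunctionIdentifiers {
  // Hypothetical base class standing in for a user-defined function type.
  abstract class SketchFunction extends Serializable {
    // Subclasses are free to override this, so it is unsafe as an identifier.
    override def toString: String = getClass.getName

    // final: a UDF cannot override it, so generated code can rely on it.
    final def functionIdentifier: String = {
      val bytes = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(bytes)
      try out.writeObject(this) finally out.close()
      val md5 = MessageDigest.getInstance("MD5")
        .digest(bytes.toByteArray)
        .map(b => f"${b & 0xff}%02x")
        .mkString
      val className = getClass.getName.replace('.', '$')
      s"${className}_${md5}"
    }
  }

  // A UDF with a constructor parameter that also overrides toString.
  class Multiply(val factor: Int) extends SketchFunction {
    override def toString: String = "multiply"
    def eval(x: Int): Int = x * factor
  }
}
```

        Here `new Multiply(2)` and `new Multiply(3)` print the same `toString` but produce different identifiers, and two `Multiply(2)` instances share one identifier.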

        githubbot ASF GitHub Bot added a comment -

        Github user sunjincheng121 commented on the issue:

        https://github.com/apache/flink/pull/3330

        Hi @fhueske,
        Thanks for your review. I have added a `functionIdentifier` method.

        githubbot ASF GitHub Bot added a comment -

        Github user wuchong commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r102032869

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ScalarFunction.scala —
        @@ -58,6 +60,11 @@ abstract class ScalarFunction extends UserDefinedFunction {

        override def toString: String = getClass.getCanonicalName

        + final def functionIdentifier: String = {
        — End diff –

        Can `functionIdentifier` be a default implementation in `UserDefinedFunction`? I think it would also work for AggregateFunction in the future.

        githubbot ASF GitHub Bot added a comment -

        Github user wuchong commented on the issue:

        https://github.com/apache/flink/pull/3330

        Thanks @sunjincheng121 for the update. Could you rebase the code on master?

        After that, I can help to merge this.

        githubbot ASF GitHub Bot added a comment -

        Github user sunjincheng121 commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r102035404

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ScalarFunction.scala —
        @@ -58,6 +60,11 @@ abstract class ScalarFunction extends UserDefinedFunction {

        override def toString: String = getClass.getCanonicalName

        + final def functionIdentifier: String = {
        — End diff –

        To keep Java's interface and Scala's trait consistent, I prefer not to put any implementation in `UserDefinedFunction`.

        githubbot ASF GitHub Bot added a comment -

        Github user wuchong commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3330#discussion_r102037328

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ScalarFunction.scala —
        @@ -58,6 +60,11 @@ abstract class ScalarFunction extends UserDefinedFunction {

        override def toString: String = getClass.getCanonicalName

        + final def functionIdentifier: String = {
        — End diff –

        `UserDefinedFunction` was declared as an abstract class in the newest master when we added the default `open` and `close` implementations. I think `functionIdentifier` can be handled the same way and will not hurt anything.

        githubbot ASF GitHub Bot added a comment -

        Github user sunjincheng121 commented on the issue:

        https://github.com/apache/flink/pull/3330

        Thanks @wuchong, I'll rebase the code.

        githubbot ASF GitHub Bot added a comment -

        Github user sunjincheng121 commented on the issue:

        https://github.com/apache/flink/pull/3330

        Hi @wuchong , thanks for the review. I have rebased the code on the master.

        githubbot ASF GitHub Bot added a comment -

        Github user wuchong commented on the issue:

        https://github.com/apache/flink/pull/3330

        Looks good to me. Waiting for another committer's +1.

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on the issue:

        https://github.com/apache/flink/pull/3330

        Thanks for the update @sunjincheng121!
        +1 to merge.
        @wuchong do you want to merge this PR?

        githubbot ASF GitHub Bot added a comment -

        Github user wuchong commented on the issue:

        https://github.com/apache/flink/pull/3330

        Yes @fhueske, I would like to help merge it. Is there anything I need to pay attention to?

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on the issue:

        https://github.com/apache/flink/pull/3330

        @wuchong I just sent you a mail with the steps I take to merge a PR.

        githubbot ASF GitHub Bot added a comment -

        Github user wuchong commented on the issue:

        https://github.com/apache/flink/pull/3330

        Thank you for the steps.

        merging...

        githubbot ASF GitHub Bot added a comment -

        Github user twalthr commented on the issue:

        https://github.com/apache/flink/pull/3330

        One last comment (maybe as a follow-up issue): we should also update the documentation for this change.

        githubbot ASF GitHub Bot added a comment -

        Github user sunjincheng121 commented on the issue:

        https://github.com/apache/flink/pull/3330

        Hi @twalthr, thanks for the reminder. I'll update the documentation in FLINK-5794.

        githubbot ASF GitHub Bot added a comment -

        Github user asfgit closed the pull request at:

        https://github.com/apache/flink/pull/3330

        jark Jark Wu added a comment -

        Fixed in 1.3.0: 45e01cf2321dda58f572d8b9dbe64947c6725ad1


          People

          • Assignee:
            sunjincheng121 sunjincheng
            Reporter:
            sunjincheng121 sunjincheng
          • Votes:
            0 Vote for this issue
            Watchers:
            4 Start watching this issue
