FLINK-5452

Make table unit tests pass under cluster mode

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.2.0, 1.3.0
    • Component/s: Table API & SQL
    • Labels:
      None

      Description

      Currently, if we change the test execution mode to TestExecutionMode.CLUSTER in TableProgramsTestBase, some test cases fail. We need to figure out whether this is a problem with the test design or whether there are actual bugs.

          Activity

          twalthr Timo Walther added a comment -

          Thanks for looking into this. This has to be fixed for the 1.2 release.

          ykt836 Kurt Young added a comment -

          OK, I will take a look ASAP.

          githubbot ASF GitHub Bot added a comment -

          GitHub user KurtYoung opened a pull request:

          https://github.com/apache/flink/pull/3095

          FLINK-5452 [table] Fix SortITCase which will fail under cluster mode

          I tried these combinations from `TableProgramsTestBase`:

          Array(TestExecutionMode.CLUSTER, DEFAULT)
          Array(TestExecutionMode.CLUSTER, EFFICIENT)
          Array(TestExecutionMode.COLLECTION, DEFAULT)
          Array(TestExecutionMode.COLLECTION, EFFICIENT)

          All the test cases pass now.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/KurtYoung/flink flink-5452

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3095.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3095


          commit 65e3c33840534781fecf899000dfd81ae250f39a
          Author: kete.yangkt <kete.yangkt@alibaba-inc.com>
          Date: 2017-01-11T17:51:41Z

          FLINK-5452 [table] Fix SortITCase which will fail under cluster mode


          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3095

          Thanks for looking into this issue, @KurtYoung.
          I agree that this is an issue with the tests rather than with the actual code. However, I would fix the tests a bit differently.
          The goal of the code that you removed was to validate that each partition is correctly sorted and that the partitions themselves are correctly ordered, i.e., for a descending sort, the highest values should be in partition 0 and the lowest in partition n.

          In order to ensure parallel execution, we cannot execute the sort tests in a collection environment but need a cluster environment. Moreover, we should explicitly set a default parallelism on the ExecutionEnvironment to prevent the program from being executed with parallelism 1 (a parallelism of 3 should suffice). Once we do that, we must ensure that the started minicluster offers enough slots to run the program.

          I'll add a few more inline comments to the tests.

          Thanks, Fabian
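
The two-level check described in this comment (each partition sorted, and the partitions ordered relative to each other) can be sketched with plain Scala collections. This is only an illustration under stated assumptions: `PartitionOrderCheck` and `isGloballySortedDesc` are hypothetical names, the partitions are modeled as `Seq[Seq[Int]]` rather than the collected output of a Flink job, and empty partitions are skipped as in the original test code.

```scala
object PartitionOrderCheck {
  // Returns true if every partition is sorted in descending order and the
  // non-empty partitions are themselves ordered, i.e. the last element of a
  // partition is not smaller than the first element of the next one.
  def isGloballySortedDesc(partitions: Seq[Seq[Int]]): Boolean = {
    // Empty partitions carry no ordering information, so they are ignored.
    val nonEmpty = partitions.filterNot(_.isEmpty)

    // Check 1: each partition equals its own descending sort.
    val eachSorted = nonEmpty.forall(p => p == p.sorted(Ordering[Int].reverse))

    // Check 2: compare the boundary values of neighbouring partitions.
    val boundariesOk = nonEmpty.zip(nonEmpty.drop(1)).forall {
      case (left, right) => left.last >= right.head
    }

    eachSorted && boundariesOk
  }
}
```

With a parallelism of 1 there is only a single partition, so the second check never compares anything, which is why the comment asks for an explicit parallelism greater than 1.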

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3095#discussion_r95753596

          --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala ---
          @@ -55,14 +55,8 @@ class SortITCase(
               tEnv.registerDataSet("MyTable", ds)

               val expected = sortExpectedly(tupleDataSetStrings)
          -    val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
          -
          -    val result = results
          -      .filterNot(_.isEmpty)
          -      .sortBy(_.head)(Ordering.by(f=> f.toString))
          --- End diff ---

          The problem here is the string conversion, which results in lexicographical order ("21,....." sorts before "5,...").
          We should change this to `.sortBy(_.head)(Ordering.by(r => (r.getField(0), r.getField(1))))`
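
The difference between the two orderings can be reproduced without Flink. The following is a minimal sketch, assuming plain `(Int, Long)` tuples as a stand-in for `Row` objects; `LexicographicVsNumeric` and its sample values are hypothetical and not taken from the test data.

```scala
object LexicographicVsNumeric {
  // Hypothetical stand-in rows: (Int, Long) tuples instead of Flink Row objects.
  val rows: Seq[(Int, Long)] = Seq((5, 3L), (21, 6L), (1, 1L))

  // Sorting by the string form compares characters one by one,
  // so "(21,6)" is placed before "(5,3)" because '2' < '5'.
  val byString: Seq[(Int, Long)] = rows.sortBy(_.toString)

  // Sorting by the field values compares numbers, so 5 is placed before 21.
  val byFields: Seq[(Int, Long)] = rows.sortBy(r => (r._1, r._2))

  def main(args: Array[String]): Unit = {
    println(byString)
    println(byFields)
  }
}
```

The tuple key in `byFields` plays the same role as `(r.getField(0), r.getField(1))` in the suggestion above, except that tuple elements are already typed, whereas `getField` returns an untyped value that would need a cast before an `Ordering` can be found.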

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3095#discussion_r95753661

          --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala ---
          @@ -79,14 +73,8 @@ class SortITCase(
               tEnv.registerDataSet("MyTable", ds)

               val expected = sortExpectedly(tupleDataSetStrings, 2, 21)
          -    val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
          -
          -    val result = results.
          -      filterNot(_.isEmpty)
          -      .sortBy(_.head)(Ordering.by(f=> f.toString))
          --- End diff ---

          same here

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3095#discussion_r95754128

          --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala ---
          @@ -57,14 +57,8 @@ class SortITCase(
                 - x.productElement(0).asInstanceOf[Int] )

               val expected = sortExpectedly(tupleDataSetStrings)
          -    val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
          -
          -    val result = results
          -      .filterNot(_.isEmpty)
          -      .sortBy(_.head)(Ordering.by(f=> f.toString))
          --- End diff ---

          We should change this to `.sortBy(_.head)` and provide an implicit ordering for `Row`.
          When we changed `Row` to not extend `Product`, we should have added the implicit ordering for `Row` instead of sorting by String.

          The same applies to the other tests in this class.
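
One possible shape of such an implicit ordering is sketched below. It is not the code that was eventually merged: `SimpleRow` is a hypothetical stand-in for Flink's `Row` so that the example runs without Flink on the classpath, and the ordering assumes the first two fields are an `Int` and a `Long`.

```scala
// Hypothetical stand-in for Flink's Row: positional, untyped field access.
final case class SimpleRow(fields: Vector[Any]) {
  def getField(pos: Int): Any = fields(pos)
}

object SimpleRowOrdering {
  // Implicit ordering on the first two fields. The casts encode the assumption
  // that field 0 is an Int and field 1 is a Long.
  implicit val rowOrdering: Ordering[SimpleRow] =
    Ordering.by((r: SimpleRow) =>
      (r.getField(0).asInstanceOf[Int], r.getField(1).asInstanceOf[Long]))

  // Sample partitions, including an empty one as a parallel job may produce.
  val partitions: Seq[Seq[SimpleRow]] = Seq(
    Seq(SimpleRow(Vector(21, 6L)), SimpleRow(Vector(22, 6L))),
    Seq.empty,
    Seq(SimpleRow(Vector(5, 3L)), SimpleRow(Vector(6, 3L)))
  )

  // With the implicit in scope, sortBy(_.head) needs no explicit Ordering argument.
  val sorted: Seq[Seq[SimpleRow]] = partitions.filterNot(_.isEmpty).sortBy(_.head)
}
```

Keeping the ordering implicit means every test in the class can call `.sortBy(_.head)` unchanged, and the comparison logic lives in one place instead of being repeated per test.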

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3095

          It might make sense to hold off on this PR until #3099 is in.

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on the issue:

          https://github.com/apache/flink/pull/3095

          @fhueske Ah, I see. Thanks for the explanation. I will try to fix this in another way after #3099 is in.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3095

          Hi @KurtYoung, #3099 was merged.

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on the issue:

          https://github.com/apache/flink/pull/3095

          Thanks @fhueske, I will fix this today.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3095

          Thanks for fixing the tests @KurtYoung!
          +1 to merge

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3095

          fhueske Fabian Hueske added a comment -

          Fixed for 1.2.0 with 7f7692619b0fcf66a560fb8a20e9425f4fe02747
          Fixed for 1.3.0 with 0ea996a64ca0ff9589ffcd5b89967f51aee1ffe6


            People

            • Assignee:
              ykt836 Kurt Young
              Reporter:
              ykt836 Kurt Young