The flink-table component consists of
- Scala-embedded Table API
- String-based Table API (for Java)
and compiles to two execution backends:
- DataStream API
- DataSet API
There are many different translation paths involved until a query is executed:
- Table API String -> Table API logical plan
- Table API Scala-expressions -> Table API logical plan
- Table API logical plan -> Calcite RelNode plans
- SQL -> Calcite RelNode plans (done exclusively via Calcite)
- Calcite RelNodes -> DataSet RelNodes
- DataSet RelNodes -> DataSet program
- Calcite RelNodes -> DataStream RelNodes
- DataStream RelNodes -> DataStream program
- Calcite RexNode expressions -> generated code
which need to be thoroughly tested.
Initially, many tests were done as end-to-end integration tests with high overhead.
However, due to the combinations of APIs and execution backends, this approach causes many redundant tests and long build times.
Therefore, I propose the following testing scheme:
1. Table API String -> Table API logical plan
The String-based Table API is tested by comparing the resulting logical plan (Table.logicalPlan) to the logical plan of an equivalent Table program that uses the Scala-embedded syntax. The logical plan is the Table API internal representation which is later converted into a Calcite RelNode plan.
All existing integration tests that check the "Java" Table API should be ported to unit tests. Some of them will turn out to be duplicates, because the Java Table API is currently tested for both batch and streaming, which is no longer necessary.
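The comparison described above can be illustrated with a minimal, self-contained sketch. It does not use flink-table classes: `Project`, `selectEmbedded`, and `selectFromString` are hypothetical stand-ins for a logical plan node, the embedded API, and the String-based API. The point is only that both front ends must yield equal plans, so no execution is needed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

public class StringApiPlanSketch {

    /** Hypothetical logical plan node: a projection of named fields from a table. */
    static final class Project {
        final String table;
        final List<String> fields;

        Project(String table, List<String> fields) {
            this.table = table;
            this.fields = fields;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Project)) return false;
            Project p = (Project) o;
            return table.equals(p.table) && fields.equals(p.fields);
        }

        @Override
        public int hashCode() {
            return Objects.hash(table, fields);
        }

        @Override
        public String toString() {
            return "Project(" + table + ", " + fields + ")";
        }
    }

    /** Stand-in for the embedded API: fields are passed as separate arguments. */
    static Project selectEmbedded(String table, String... fields) {
        return new Project(table, Arrays.asList(fields));
    }

    /** Stand-in for the String-based API: fields are parsed from "a, b, c". */
    static Project selectFromString(String table, String fieldList) {
        List<String> fields = Arrays.stream(fieldList.split(","))
            .map(String::trim)
            .collect(Collectors.toList());
        return new Project(table, fields);
    }

    public static void main(String[] args) {
        // The embedded variant acts as the reference plan.
        Project expected = selectEmbedded("MyTable", "a", "b");
        Project actual = selectFromString("MyTable", "a, b");
        if (!expected.equals(actual)) {
            throw new AssertionError("plans differ: " + expected + " vs " + actual);
        }
        System.out.println("plans match: " + actual);
    }
}
```

Because the check stops at the logical plan, one such test covers the String-based syntax for both batch and streaming.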
2. Table API Scala-expressions -> Table API logical plan -> Calcite RelNodes -> DataSet RelNodes / DataStream RelNodes
These tests cover the translation and optimization of Table API queries and verify the Calcite optimized plan. We need distinct tests for DataSet and DataStream environments since features and translation rules vary. These tests will also identify whether added or modified rules or cost functions result in different plans. They should be the main tests for the Table API and be very extensive.
These tests should be implemented by extending the TableTestBase, which is a base class for unit tests and hence very lightweight.
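As a rough illustration of plan verification without execution, the following sketch optimizes a toy plan and compares its textual form against an expected plan. Everything here is hypothetical and unrelated to the real TableTestBase or Calcite classes: `Node`, `explain`, and the single merge rule in `optimize` exist only to show why a changed rule makes such a test fail.

```java
public class PlanVerificationSketch {

    /** Hypothetical plan node with a name, a detail string, and at most one input. */
    static final class Node {
        final String name;
        final String detail;
        final Node input; // null for leaf nodes

        Node(String name, String detail, Node input) {
            this.name = name;
            this.detail = detail;
            this.input = input;
        }

        /** Renders the plan top-down, one node per line, indented by depth. */
        String explain() {
            StringBuilder sb = new StringBuilder();
            int depth = 0;
            for (Node n = this; n != null; n = n.input) {
                for (int i = 0; i < depth; i++) {
                    sb.append("  ");
                }
                sb.append(n.name).append("(").append(n.detail).append(")\n");
                depth++;
            }
            return sb.toString();
        }
    }

    /** Toy rule: a Project directly on top of a Filter is merged into one Calc node. */
    static Node optimize(Node n) {
        if (n == null) {
            return null;
        }
        Node in = optimize(n.input);
        if (n.name.equals("Project") && in != null && in.name.equals("Filter")) {
            return new Node("Calc", "select: " + n.detail + ", where: " + in.detail, in.input);
        }
        return new Node(n.name, n.detail, in);
    }

    public static void main(String[] args) {
        Node query = new Node("Project", "a",
            new Node("Filter", "b > 10",
                new Node("Scan", "MyTable", null)));

        // The expected optimized plan is spelled out explicitly in the test.
        String expected =
            "Calc(select: a, where: b > 10)\n" +
            "  Scan(MyTable)\n";

        String actual = optimize(query).explain();
        if (!expected.equals(actual)) {
            throw new AssertionError("unexpected plan:\n" + actual);
        }
        System.out.print(actual);
    }
}
```

If the merge rule were removed or altered, the expected string would no longer match, which is the kind of regression these tests are meant to catch.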
3. SQL -> Calcite RelNodes -> DataSet RelNodes / DataStream RelNodes
These are the same tests as described in 2. (Table API Scala-expressions -> DataSet / DataStream RelNodes), but for SQL queries.
4. DataSet RelNode -> DataSet program
Unfortunately, the DataSet API lacks a good mechanism to test generated programs, i.e., to obtain a traversable plan of all operators with access to all user-defined functions. Until such a testing utility is available, I propose to test the translation to DataSet programs with end-to-end integration tests. However, I think we can run most tests on a Collection ExecutionEnvironment, which does not start a Flink cluster but runs all code on Java collections. This makes these tests much more lightweight than cluster-based ITCases. The goal of these tests should be to cover all translation paths from DataSetRel to DataSet program, i.e., all DataSetRel nodes and their translation logic. These tests should be implemented by extending the TableProgramsCollectionTestBase (see
Moreover, we should have a small number of cluster-based ITCases in place that check the execution path with the actual operators, serializers, and comparators. We should limit these tests to a minimum to keep build time low. They should be implemented by extending the TableProgramsClusterTestBase (FLINK-5268) and all be located in the same class to avoid repeated instantiation of the Flink MiniCluster.
5. DataStream RelNode -> DataStream program
Basically the same applies here as for the DataSet programs. I'm not aware of a good way to test generated DataStream programs without executing them. A testing utility would be great for all libraries that are built on top of the API. Until then, I propose to use end-to-end integration tests. Unfortunately, the DataStream API does not feature a collection execution mode, so all tests need to run on a MiniCluster. Therefore, we should again keep these tests to a minimum. These tests should be implemented by extending the StreamingMultipleProgramsTestBase and be located in a few classes to avoid repeated instantiation of the Flink MiniCluster.
6. (Scala expressions | String-parsed expressions | SQL expressions) -> RexNode expressions -> Generated Code
In order to avoid extensive optimization tests for each supported expression or built-in function, we have the ExpressionTestBase, which compiles expressions into generated code and tests the correctness of the results. All supported expressions and built-in functions should be tested by extending the ExpressionTestBase instead of running a full integration test.
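The idea of testing expressions in isolation might look like the sketch below. It is a simplified model, not the ExpressionTestBase itself: the `Expr` interface and its helpers are hypothetical, and "compilation" here means turning an expression tree into a Java function, which stands in for real code generation. Each expression is compiled once and evaluated against a fixed test row.

```java
import java.util.Locale;
import java.util.function.Function;

public class ExpressionTestSketch {

    /** Hypothetical expression node; compile() yields an executable function over a row. */
    interface Expr {
        Function<Object[], Object> compile();
    }

    static Expr field(int index) {
        return () -> row -> row[index];
    }

    static Expr literal(Object value) {
        return () -> row -> value;
    }

    /** Integer addition; the children are compiled once, not per row. */
    static Expr plus(Expr left, Expr right) {
        return () -> {
            Function<Object[], Object> l = left.compile();
            Function<Object[], Object> r = right.compile();
            return row -> ((Integer) l.apply(row)) + ((Integer) r.apply(row));
        };
    }

    /** Upper-cases a String-valued child expression. */
    static Expr upper(Expr child) {
        return () -> {
            Function<Object[], Object> c = child.compile();
            return row -> ((String) c.apply(row)).toUpperCase(Locale.ROOT);
        };
    }

    /** Compiles the expression, runs it on the row, and returns the result as a string. */
    static String evaluate(Expr expr, Object[] row) {
        return String.valueOf(expr.compile().apply(row));
    }

    static void check(Expr expr, Object[] row, String expected) {
        String actual = evaluate(expr, row);
        if (!expected.equals(actual)) {
            throw new AssertionError("expected " + expected + " but got " + actual);
        }
    }

    public static void main(String[] args) {
        // One fixed test row shared by all expression checks.
        Object[] testRow = {42, "flink"};
        check(plus(field(0), literal(1)), testRow, "43");
        check(upper(field(1)), testRow, "FLINK");
        System.out.println("all expression checks passed");
    }
}
```

Each additional expression or built-in function then costs one more `check` line rather than a full integration test.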
I will add a few JIRAs to migrate existing tests to the new testing scheme.
|1.||Split TableProgramsTestBase into TableProgramsCollectionTestBase and TableProgramsClusterTestBase||Resolved||Unassigned|
|2.||Convert string-based Table API tests into unit tests||Closed||Unassigned|
|3.||Refactor the batch Scala-expression Table API tests||Open|
|4.||Refactor the batch SQL tests||Closed|
|5.||Refactor the stream Scala-expression Table API tests||Open|
|6.||Refactor stream SQL tests||Closed|
|7.||Convert TableEnvironmentITCases to unit tests||Open|