Details
Type: Sub-task
Status: Open
Priority: Major
Resolution: Unresolved
Description
We can reproduce it with the following example:
@Test
public void testUnionMapType() {
    // automatically load hive module in hive-compatible mode
    HiveModule hiveModule = new HiveModule(hiveCatalog.getHiveVersion());
    CoreModule coreModule = CoreModule.INSTANCE;
    for (String loaded : tableEnv.listModules()) {
        tableEnv.unloadModule(loaded);
    }
    tableEnv.loadModule("hive", hiveModule);
    tableEnv.loadModule("core", coreModule);
    tableEnv.executeSql(
            "CREATE TABLE test_map_table (params string) PARTITIONED BY (`p_date` string)");
    tableEnv.executeSql(
            "select map(\"\",\"\") as params from test_map_table "
                    + "union select map(\"\",\"\") as params from test_map_table");
}
Because UNION semantics require de-duplication, Flink introduces an Aggregate on top of the union, with the MAP column as the grouping key.
The following exception is then thrown:
Unsupported type(MAP) to generate hash code, the type(MAP) is not supported as a GROUP_BY/PARTITION_BY/JOIN_EQUAL/UNION field
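The reasoning above can be illustrated with a small, self-contained Java model. This is only a sketch and not Flink code: the class `UnionDedupSketch` and its methods are hypothetical names, and `checkHashable` is an assumed stand-in for the planner's refusal to generate a hash code for MAP fields. It shows that plain concatenation never needs to hash a field, while de-duplication groups by every column and therefore does.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Illustrative model only; hypothetical names, not part of Flink. */
public class UnionDedupSketch {

    /** UNION ALL in this model: plain concatenation, no field is hashed. */
    public static List<List<Object>> unionAll(
            List<List<Object>> left, List<List<Object>> right) {
        List<List<Object>> out = new ArrayList<>(left);
        out.addAll(right);
        return out;
    }

    /** UNION in this model: concatenate, then group by all columns to drop duplicates. */
    public static List<List<Object>> union(
            List<List<Object>> left, List<List<Object>> right) {
        Set<List<Object>> groups = new LinkedHashSet<>();
        for (List<Object> row : unionAll(left, right)) {
            // Every column becomes a grouping key, so every field must be hashable.
            for (Object field : row) {
                checkHashable(field);
            }
            groups.add(row);
        }
        return new ArrayList<>(groups);
    }

    /** Assumed stand-in for the check that rejects MAP as a grouping key. */
    public static void checkHashable(Object field) {
        if (field instanceof Map) {
            throw new UnsupportedOperationException(
                    "Unsupported type(MAP) to generate hash code");
        }
    }

    public static void main(String[] args) {
        List<List<Object>> strings = List.of(List.<Object>of("a"));
        List<List<Object>> maps = List.of(List.<Object>of(Map.of("", "")));

        System.out.println(union(strings, strings).size());  // 1: duplicate row removed
        System.out.println(unionAll(maps, maps).size());     // 2: no hashing involved
        try {
            union(maps, maps);
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the failure appears only on the de-duplicating path, which corresponds to the Aggregate that the issue describes being added for UNION.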
We can see the Aggregate operator in the execution plan:
optimize subquery_rewrite cost 33 ms.
optimize result:
LogicalSink(table=[*anonymous_collect$1*], fields=[params])
+- LogicalProject(inputs=[0])
   +- LogicalAggregate(group=[{0}])
      +- LogicalProject(inputs=[0])
         +- LogicalUnion(all=[true])
            :- LogicalProject(exprs=[[map(_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE", _UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]])
            :  +- LogicalTableScan(table=[[test-catalog, default, test_map_table]])
            +- LogicalProject(exprs=[[map(_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE", _UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]])
               +- LogicalTableScan(table=[[test-catalog, default, test_map_table]])