Details

    • Type: Sub-task
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: Connectors / Hive
    • Labels: None

    Description

      The problem can be reproduced with the following test:

      @Test
      public void testUnionMapType() {
          // Assumes (not shown here) that tableEnv uses the Hive dialect and that
          // hiveCatalog is its current catalog.
          // Load the Hive module ahead of the core module, mirroring what
          // Hive-compatible mode does automatically.
          HiveModule hiveModule = new HiveModule(hiveCatalog.getHiveVersion());
          CoreModule coreModule = CoreModule.INSTANCE;
          for (String loaded : tableEnv.listModules()) {
              tableEnv.unloadModule(loaded);
          }
          tableEnv.loadModule("hive", hiveModule);
          tableEnv.loadModule("core", coreModule);
          tableEnv.executeSql(
                  "CREATE TABLE test_map_table (params string) PARTITIONED BY (`p_date` string)");
          // UNION (not UNION ALL) requires de-duplication on the MAP-typed column params.
          tableEnv.executeSql(
                  "select map(\"\",\"\") as params from test_map_table"
                          + " union select map(\"\",\"\") as params from test_map_table");
      }

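      Because UNION semantics require duplicates to be removed, Flink rewrites the UNION into a UNION ALL followed by an Aggregate that groups on every output column, here the MAP-typed column params. Generating the hash code for that MAP group key fails with the following exception: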

      Unsupported type(MAP) to generate hash code, the type(MAP) is not supported as a GROUP_BY/PARTITION_BY/JOIN_EQUAL/UNION field 
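      The rewrite described above can be illustrated outside Flink. The following is a minimal plain-Java sketch (the class UnionAsDistinct is hypothetical and is not Flink's planner code): UNION ALL simply concatenates the inputs, while UNION additionally groups on the whole row through a hash-based set, which is only possible if every grouped value supports hashCode() and equals(). In the failing query the grouped value is a MAP, which is exactly what the code generator rejects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

public class UnionAsDistinct {
    // UNION ALL: concatenate both inputs and keep duplicates.
    static <T> List<T> unionAll(List<T> left, List<T> right) {
        List<T> out = new ArrayList<>(left);
        out.addAll(right);
        return out;
    }

    // UNION: UNION ALL followed by a hash-based "group by all columns".
    // LinkedHashSet de-duplicates via hashCode()/equals() and keeps first-seen order.
    static <T> List<T> union(List<T> left, List<T> right) {
        return new ArrayList<>(new LinkedHashSet<>(unionAll(left, right)));
    }

    public static void main(String[] args) {
        List<String> a = Arrays.asList("x", "y");
        List<String> b = Arrays.asList("y", "z");
        System.out.println(unionAll(a, b)); // [x, y, y, z]
        System.out.println(union(a, b));    // [x, y, z]
    }
}
```

      Only the de-duplicating variant needs to hash the row, which matches the LogicalUnion(all=[true]) plus LogicalAggregate shape in the plan.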

      The Aggregate introduced for de-duplication is visible in the optimized logical plan:

      optimize subquery_rewrite cost 33 ms.
      optimize result: 
      LogicalSink(table=[*anonymous_collect$1*], fields=[params])
      +- LogicalProject(inputs=[0])
         +- LogicalAggregate(group=[{0}])
            +- LogicalProject(inputs=[0])
               +- LogicalUnion(all=[true])
                  :- LogicalProject(exprs=[[map(_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE", _UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]])
                  :  +- LogicalTableScan(table=[[test-catalog, default, test_map_table]])
                  +- LogicalProject(exprs=[[map(_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE", _UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]])
                     +- LogicalTableScan(table=[[test-catalog, default, test_map_table]]) 
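      The LogicalAggregate(group=[{0}]) above groups on the MAP column, so supporting this query means hashing and comparing MAP values. A likely complication is that two maps with the same entries may be stored in a different entry order, so a grouping hash must not depend on that order. The sketch below is plain Java, not Flink's generated code; the class MapKeyHash is hypothetical, and it simply follows the java.util.Map.hashCode() contract (sum of key-hash XOR value-hash over all entries).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

public class MapKeyHash {
    // Order-independent hash: each entry contributes hash(key) ^ hash(value),
    // and the contributions are added, so visiting order does not matter.
    static int hashMap(Map<?, ?> map) {
        int h = 0;
        for (Map.Entry<?, ?> e : map.entrySet()) {
            h += Objects.hashCode(e.getKey()) ^ Objects.hashCode(e.getValue());
        }
        return h;
    }

    public static void main(String[] args) {
        Map<String, String> m1 = new LinkedHashMap<>();
        m1.put("a", "1");
        m1.put("b", "2");
        Map<String, String> m2 = new LinkedHashMap<>();
        m2.put("b", "2");
        m2.put("a", "1");
        // Same entries, different insertion order: equal hashes and equal maps,
        // so a hash-based GROUP BY would put both rows into the same group.
        System.out.println(hashMap(m1) == hashMap(m2));    // true
        System.out.println(m1.equals(m2));                 // true
        System.out.println(hashMap(m1) == m1.hashCode());  // true
    }
}
```

      Summing the per-entry hashes (rather than, say, folding them with a multiplier) is what makes the result independent of iteration order; equality must be order-insensitive as well for grouping to be correct.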

      People

        luoyuxia
        tartarus