Flink / FLINK-21001

Flink job is blocked when using TableEnvironment with a TableFunction and joins




      The code is as follows:

      package com.oppo.recdata.datapipe;
      import com.oppo.recdata.datapipe.flink.transform.ExplodeDataTypeEnum;
      import com.oppo.recdata.datapipe.flink.transform.ExplodeModify;
      import com.oppo.recdata.datapipe.flink.transform.TableExplode;
      import com.oppo.recdata.datapipe.flink.transform.function.CollectMapAggregateFunction;
      import org.apache.flink.table.api.*;
      import static org.apache.flink.table.api.Expressions.row;
      /**
       * @author wujianzhen@oppo.com
       */
      public class BatchTable {
          public static void main(String[] args) {
              EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
              TableEnvironment tableEnv = TableEnvironment.create(settings);
              ExplodeModify modify = new ExplodeModify(ExplodeDataTypeEnum.string, null, "&");
              tableEnv.createTemporarySystemFunction("explode", new TableExplode(modify));
              tableEnv.createFunction("collect_map", CollectMapAggregateFunction.class);
              // Single sample row; fromValues takes the row type (a ROW of FIELDs) followed by the values.
              Table table = tableEnv.fromValues(
                      DataTypes.ROW(
                              DataTypes.FIELD("buuid", DataTypes.STRING()),
                              DataTypes.FIELD("docType", DataTypes.INT()),
                              DataTypes.FIELD("viewTime", DataTypes.INT()),
                              DataTypes.FIELD("subCategory", DataTypes.STRING())
                      ),
                      row("John", "1", "36", "NBA&football")
              );
              tableEnv.createTemporaryView("feeds_expose_click_profile", table);
              Table add_profile = tableEnv.sqlQuery("select buuid, cast(docType as varchar) as docType, viewTime, subCategory from feeds_expose_click_profile where buuid is not null and docType is not null and viewTime > 0");
              tableEnv.createTemporaryView("add_profile", add_profile);
              Table cate2Click = tableEnv.sqlQuery("select buuid, docType, viewTime, cate2 from add_profile, LATERAL TABLE(explode(subCategory)) as t(cate2) where subCategory is not null");
              tableEnv.createTemporaryView("cate2_click", cate2Click);
              Table cate2_detail = tableEnv.sqlQuery("select cate2, sum(viewTime) as viewTimeSum, buuid, docType from cate2_click GROUP BY buuid, cate2, docType");
              tableEnv.createTemporaryView("user_cate2_detail", cate2_detail);
              Table user_global_cate2 = tableEnv.sqlQuery("select 'gcate2_24h_click_sumtime' as fieldName, sum(viewTime) as fieldValue,  buuid as keyName, docType from cate2_click group by buuid, docType");
              tableEnv.createTemporaryView("user_global_cate2", user_global_cate2);
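              Table global_user_cate2 = tableEnv.sqlQuery("select cate2 as fieldName, sum(viewTime) as fieldValue, 'guser_cate2_24h_click_sumtime' as keyName, docType from cate2_click group by cate2, docType ");
              // Registered so that the join in cate2_cs_detail can resolve global_user_cate2.
              tableEnv.createTemporaryView("global_user_cate2", global_user_cate2);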
              Table global_user_global_cate2 = tableEnv.sqlQuery("select 'guser_gcate2_24h_click_sumtime' as fieldName, sum(viewTime) as fieldValue, 'global_feature' as keyName, docType from cate2_click group by docType");
              tableEnv.createTemporaryView("global_user_global_cate2", global_user_global_cate2);
              Table cate2_cs_detail = tableEnv.sqlQuery("select a.cate2 as fieldName, (a.viewTimeSum + 0.2) / (b.fieldValue * c.fieldValue / d.fieldValue + 0.2) as fieldValue, a.buuid as keyName, a.docType from user_cate2_detail a join user_global_cate2 b on a.buuid = b.keyName and a.docType = b.docType join global_user_cate2 c on a.cate2 = c.fieldName and a.docType = c.docType join global_user_global_cate2 d on a.docType = d.docType where a.viewTimeSum > 0 and b.fieldValue > 0 and c.fieldValue > 0 and d.fieldValue > 0");
              tableEnv.createTemporaryView("cate2_cs_detail", cate2_cs_detail);
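              Table cate2Cs = tableEnv.sqlQuery("select 'cate2_24h_click_sumtimeds' as fieldName, collect_map(fieldName, ROUND(fieldValue, 5)) as fieldValue, concat(docType, '#', keyName) as keyName from cate2_cs_detail  where fieldValue < 0 or fieldValue >= 0 group by keyName, docType");
              // Execution trigger: the original sink/execute call is not preserved in the report;
              // printing the result is an assumption.
              cate2Cs.execute().print();
          }
      }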
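      The report does not say what a successful run should return. As a reference point, the pipeline can be traced by hand on the single sample row; the plain-Java sketch below does that without Flink. ExpectedOutputTrace is a hypothetical helper. The sketch assumes that TableExplode splits subCategory on "&" (as the "&" argument passed to ExplodeModify suggests) and that collect_map collects fieldName -> ROUND(fieldValue, 5) into a map. Both UDFs are the reporter's own classes, and their source is not part of the report.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hand trace (no Flink) of the report's pipeline on its single sample row.
// Hypothetical helper; ASSUMES explode splits on "&" and collect_map builds
// a map of fieldName -> ROUND(fieldValue, 5). Both UDFs are the reporter's own.
public class ExpectedOutputTrace {

    // One row of the cate2_click view: (buuid, docType, viewTime, cate2).
    static final class Click {
        final String buuid;
        final String docType;
        final int viewTime;
        final String cate2;

        Click(String buuid, String docType, int viewTime, String cate2) {
            this.buuid = buuid;
            this.docType = docType;
            this.viewTime = viewTime;
            this.cate2 = cate2;
        }
    }

    // Returns keyName ("docType#buuid") -> (cate2 -> rounded score),
    // i.e. the collect_map column of cate2Cs.
    static Map<String, Map<String, Double>> trace() {
        // Sample row from fromValues, with docType and viewTime as numbers.
        String buuid = "John";
        String docType = "1";
        int viewTime = 36;
        String subCategory = "NBA&football";

        // add_profile passes the row through; cate2_click explodes subCategory (assumed delimiter "&").
        List<Click> clicks = new ArrayList<>();
        for (String cate2 : subCategory.split("&")) {
            clicks.add(new Click(buuid, docType, viewTime, cate2));
        }

        // The four GROUP BY views, keyed by their grouping columns joined with "|".
        Map<String, Integer> a = new HashMap<>(); // user_cate2_detail: buuid, cate2, docType
        Map<String, Integer> b = new HashMap<>(); // user_global_cate2: buuid, docType
        Map<String, Integer> c = new HashMap<>(); // global_user_cate2: cate2, docType
        Map<String, Integer> d = new HashMap<>(); // global_user_global_cate2: docType
        for (Click k : clicks) {
            a.merge(k.buuid + "|" + k.cate2 + "|" + k.docType, k.viewTime, Integer::sum);
            b.merge(k.buuid + "|" + k.docType, k.viewTime, Integer::sum);
            c.merge(k.cate2 + "|" + k.docType, k.viewTime, Integer::sum);
            d.merge(k.docType, k.viewTime, Integer::sum);
        }

        // cate2_cs_detail joins a, b, c, d; cate2Cs then groups by keyName and docType.
        Map<String, Map<String, Double>> result = new TreeMap<>();
        for (Map.Entry<String, Integer> e : a.entrySet()) {
            String[] key = e.getKey().split("\\|"); // buuid, cate2, docType
            int va = e.getValue();
            int vb = b.get(key[0] + "|" + key[2]);
            int vc = c.get(key[1] + "|" + key[2]);
            int vd = d.get(key[2]);
            if (va <= 0 || vb <= 0 || vc <= 0 || vd <= 0) {
                continue; // WHERE clause of cate2_cs_detail
            }
            // vb * vc / vd uses integer division here; with this data it is exact (72 * 36 / 72 = 36).
            double score = (va + 0.2) / (vb * vc / vd + 0.2);
            double rounded = Math.round(score * 1e5) / 1e5; // stands in for ROUND(x, 5)
            result.computeIfAbsent(key[2] + "#" + key[0], x -> new TreeMap<>()).put(key[1], rounded);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(trace()); // prints {1#John={NBA=1.0, football=1.0}}
    }
}
```

      Under those assumptions, each category scores (36 + 0.2) / (72 * 36 / 72 + 0.2) = 1.0, so a run that completes would produce a single row with keyName 1#John and a two-entry map.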

      The client log is as follows (excerpt; the full log is attached as client_log.txt):

      "C:\Program Files\Java\jdk1.8.0_73\bin\java.exe" "-javaagent:D:\Program Files\JetBrains\IntelliJ IDEA 2018.2.5\lib\idea_rt.jar=64196:D:\Program Files\JetBrains\IntelliJ IDEA 2018.2.5\bin" -Dfile.encoding=UTF-8 -classpath C:\Users\80242151\AppData\Local\Temp\classpath403316789.jar com.oppo.recdata.datapipe.BatchTable
      SLF4J: Class path contains multiple SLF4J bindings.
      SLF4J: Found binding in [jar:file:/D:/lib/repository/org/slf4j/slf4j-log4j12/1.7.16/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
      SLF4J: Found binding in [jar:file:/D:/lib/repository/org/slf4j/slf4j-log4j12/1.7.5/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
      SLF4J: Found binding in [jar:file:/D:/lib/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.8.2/log4j-slf4j-impl-2.8.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
      SLF4J: Found binding in [jar:file:/D:/lib/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
      SLF4J: Found binding in [jar:file:/D:/lib/repository/org/slf4j/slf4j-log4j12/1.7.15/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
      SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
      SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
      2021-01-17 15:05:25,639 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.apache.flink.types.Row is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
      2021-01-17 15:05:25,645 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.apache.flink.types.Row is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
      2021-01-17 15:05:25,652 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.apache.flink.types.Row is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
      2021-01-17 15:05:25,653 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.apache.flink.types.Row is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
      2021-01-17 15:05:25,656 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.apache.flink.types.Row is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
      2021-01-17 15:05:25,656 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.apache.flink.types.Row is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.


        Attachments: client_log.txt (945 kB)



            Assignee: Unassigned
            Reporter: Janze Wu

