FLINK-24950

Using the Hive dialect to execute Hive DDL throws a NullPointerException


Details

    • hive-exec.version = 1.1.0 does not support the Hive dialect; try a higher version such as 1.2.0.

    Description

      Dear all,

      I am trying to execute a Hive DDL statement with the Stream Table API on flink-1.13.2. The code looks like this:

      ```java
      String hiveDDL = ResourceUtil.readClassPathSource("hive-ddl.sql");
      EnvironmentSettings settings = EnvironmentSettings.newInstance()
              .useBlinkPlanner()
              .inStreamingMode()
              .build();
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

      String name = "hive";
      String defaultDatabase = "stream";
      String hiveConfDir = "conf";

      HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
      tableEnv.registerCatalog("hive", hive);
      tableEnv.useCatalog("hive");
      tableEnv.useDatabase("stream");

      tableEnv.executeSql("DROP TABLE IF EXISTS dimension_table");
      // switch to the Hive dialect
      tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
      tableEnv.executeSql(hiveDDL);
      ```
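
      For reference, once the Hive DDL succeeds the dialect can be switched back so that later statements are parsed with Flink's default dialect. A rough sketch continuing the snippet above (not part of the failing program; the query is only illustrative):

      ```java
      // The Hive DDL above ran under SqlDialect.HIVE; switch the parser back to
      // Flink's default dialect for ordinary Flink SQL statements.
      tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

      // Illustrative follow-up query against the newly created table.
      tableEnv.executeSql("SELECT product_id, product_name FROM dimension_table").print();
      ```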

      The Hive server runs on CDH 5.14.2, and the DDL looks like this:

      ```sql
      CREATE TABLE dimension_table (
        product_id STRING,
        product_name STRING,
        unit_price DECIMAL(10, 4),
        pv_count BIGINT,
        like_count BIGINT,
        comment_count BIGINT,
        update_time TIMESTAMP(3),
        update_user STRING
      )
      PARTITIONED BY (
        pt_year STRING,
        pt_month STRING,
        pt_day STRING
      )
      TBLPROPERTIES (
        -- using default partition-name order to load the latest partition every 12h (the most recommended and convenient way)
        'streaming-source.enable' = 'true',
        'streaming-source.partition.include' = 'latest',
        'streaming-source.monitor-interval' = '12 h',
        'streaming-source.partition-order' = 'partition-name',  -- option with default value, can be ignored.

        -- using partition file create-time order to load the latest partition every 12h
        'streaming-source.enable' = 'true',
        'streaming-source.partition.include' = 'latest',
        'streaming-source.partition-order' = 'create-time',
        'streaming-source.monitor-interval' = '12 h',

        -- using partition-time order to load the latest partition every 12h
        'streaming-source.enable' = 'true',
        'streaming-source.partition.include' = 'latest',
        'streaming-source.monitor-interval' = '12 h',
        'streaming-source.partition-order' = 'partition-time',
        'partition.time-extractor.kind' = 'default',
        'partition.time-extractor.timestamp-pattern' = '$pt_year-$pt_month-$pt_day 00:00:00'
      )
      ```
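
      (The three option groups above repeat the same keys; they appear to be the three alternative configurations from the Flink Hive streaming-read documentation example, and normally only one of the groups would be kept in TBLPROPERTIES.)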

      When I run it, a NullPointerException is thrown:

      ```

      2021-11-18 15:33:00,387 INFO [org.apache.flink.table.catalog.hive.HiveCatalog] - Setting hive conf dir as conf
      2021-11-18 15:33:00,481 WARN [org.apache.hadoop.util.NativeCodeLoader] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
      2021-11-18 15:33:01,345 INFO [org.apache.flink.table.catalog.hive.HiveCatalog] - Created HiveCatalog 'hive'
      2021-11-18 15:33:01,371 INFO [hive.metastore] - Trying to connect to metastore with URI thrift://cdh-dev-node-119:9083
      2021-11-18 15:33:01,441 INFO [hive.metastore] - Opened a connection to metastore, current connections: 1
      2021-11-18 15:33:01,521 INFO [hive.metastore] - Connected to metastore.
      2021-11-18 15:33:01,856 INFO [org.apache.flink.table.catalog.hive.HiveCatalog] - Connected to Hive metastore
      2021-11-18 15:33:01,899 INFO [org.apache.flink.table.catalog.CatalogManager] - Set the current default catalog as [hive] and the current default database as [stream].
      2021-11-18 15:33:03,290 INFO [org.apache.hadoop.hive.ql.session.SessionState] - Created local directory: /var/folders/4m/n1wgh7rd2yqfv301kq00l4q40000gn/T/681dd0aa-ba35-4a0e-b069-3ad48f030774_resources
      2021-11-18 15:33:03,298 INFO [org.apache.hadoop.hive.ql.session.SessionState] - Created HDFS directory: /tmp/hive/chenxiaojun/681dd0aa-ba35-4a0e-b069-3ad48f030774
      2021-11-18 15:33:03,305 INFO [org.apache.hadoop.hive.ql.session.SessionState] - Created local directory: /var/folders/4m/n1wgh7rd2yqfv301kq00l4q40000gn/T/chenxiaojun/681dd0aa-ba35-4a0e-b069-3ad48f030774
      2021-11-18 15:33:03,311 INFO [org.apache.hadoop.hive.ql.session.SessionState] - Created HDFS directory: /tmp/hive/chenxiaojun/681dd0aa-ba35-4a0e-b069-3ad48f030774/_tmp_space.db
      2021-11-18 15:33:03,314 INFO [org.apache.hadoop.hive.ql.session.SessionState] - No Tez session required at this point. hive.execution.engine=mr.
      Exception in thread "main" java.lang.NullPointerException
          at org.apache.flink.table.catalog.hive.client.HiveShimV100.registerTemporaryFunction(HiveShimV100.java:422)
          at org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:217)
          at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
          at com.hacker.flinksql.hive.HiveSqlTest.main(HiveSqlTest.java:48)

      ```

      I found the code that throws the error in flink-1.13.2:

      org.apache.flink.table.catalog.hive.client.HiveShimV100.java, line 422

      A null reference is hit at this line; the code:

      ```java
      @Override
      public void registerTemporaryFunction(String funcName, Class funcClass) {
          try {
              registerTemporaryFunction.invoke(null, funcName, funcClass);
          } catch (IllegalAccessException | InvocationTargetException e) {
              throw new FlinkHiveException("Failed to register temp function", e);
          }
      }
      ```
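
      The stack trace points at the reflective invoke call itself rather than anything inside Hive, which is consistent with the reflected Method reference being null: Method.invoke on a null reference fails immediately with a plain NullPointerException. A minimal, self-contained sketch of that failure mode, using made-up names rather than Flink's internals:

      ```java
      import java.lang.reflect.Method;

      public class NullMethodInvokeDemo {
          // Stand-in for a reflective handle that was never resolved, e.g. because
          // the expected method does not exist in the Hive version on the classpath.
          private static Method registerTemporaryFunction = null;

          public static void main(String[] args) throws Exception {
              // Throws java.lang.NullPointerException right here, before any target
              // code runs: the NPE comes from the null Method reference, not from
              // the String/Class arguments passed to it.
              registerTemporaryFunction.invoke(null, "my_udf", Object.class);
          }
      }
      ```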

      My Maven dependencies:

      ```xml
      <properties>
        <hadoop.version>2.6.0-cdh5.14.2</hadoop.version>
        <hive.version>1.1.0-cdh5.14.2</hive.version>
      </properties>

      <!-- flink sql core -->
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
      </dependency>

      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
      </dependency>

      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
      </dependency>

      <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.5</version>
        <scope>provided</scope>
      </dependency>

      <!-- hive catalog -->
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
      </dependency>

      <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>${hive.version}</version>
        <scope>provided</scope>
      </dependency>

      <!-- catalog hadoop dependency -->
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0-cdh5.15.2</version>
        <scope>provided</scope>
      </dependency>

      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>2.6.0-cdh5.15.2</version>
        <scope>provided</scope>
      </dependency>
      ```
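
      Per the note at the top of this ticket, hive-exec 1.1.0 does not support the Hive dialect, so the <hive.version>1.1.0-cdh5.14.2</hive.version> property above looks like the root cause; switching it to a newer release such as 1.2.0 or later (a version supported by flink-connector-hive) should be enough to get past this NullPointerException.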

People

  Assignee: Unassigned
  Reporter: xiaojunchen