Status: Resolved
Resolution: Fixed
hive-exec.version = 1.1.0 does not support the Hive dialect; try a higher version such as 1.2.0.
Dear all,
I am trying to execute a Hive DDL statement with the streaming Table API on flink-1.13.2. The code is:
String hiveDDL = ResourceUtil.readClassPathSource("hive-ddl.sql");
EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
String name = "hive";
String defaultDatabase = "stream";
String hiveConfDir = "conf";
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog("hive", hive);
tableEnv.executeSql("DROP TABLE IF EXISTS dimension_table");
// Set the Hive dialect
The Hive server runs on CDH 5.14.2, and the DDL is:
CREATE TABLE dimension_table (
product_id STRING,
product_name STRING,
unit_price DECIMAL(10, 4),
pv_count BIGINT,
like_count BIGINT,
comment_count BIGINT,
update_time TIMESTAMP(3),
update_user STRING
) PARTITIONED BY (
pt_year STRING,
pt_month STRING,
pt_day STRING
) TBLPROPERTIES (
-- using default partition-name order to load the latest partition every 12h (the most recommended and convenient way)
'streaming-source.enable' = 'true',
'streaming-source.partition.include' = 'latest',
'streaming-source.monitor-interval' = '12 h',
'streaming-source.partition-order' = 'partition-name', -- option with default value, can be ignored.
-- using partition file create-time order to load the latest partition every 12h
'streaming-source.enable' = 'true',
'streaming-source.partition.include' = 'latest',
'streaming-source.partition-order' = 'create-time',
'streaming-source.monitor-interval' = '12 h',
-- using partition-time order to load the latest partition every 12h
'streaming-source.enable' = 'true',
'streaming-source.partition.include' = 'latest',
'streaming-source.monitor-interval' = '12 h',
'streaming-source.partition-order' = 'partition-time',
'partition.time-extractor.kind' = 'default',
'partition.time-extractor.timestamp-pattern' = '$pt_year-$pt_month-$pt_day 00:00:00'
);
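To make the last property concrete, here is a minimal sketch of how a timestamp pattern such as '$pt_year-$pt_month-$pt_day 00:00:00' could be resolved against one partition's values. The class PartitionTimeSketch and its extract method are hypothetical names for illustration only; this is not Flink's extractor, just the substitution idea under the assumption that each $name placeholder is replaced by the partition column of the same name.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not Flink's implementation: shows how a timestamp
// pattern can be resolved against the values of a single partition.
class PartitionTimeSketch {

    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Replaces each "$column" placeholder with that partition column's value,
    // then parses the resulting string as a timestamp.
    static LocalDateTime extract(String pattern, Map<String, String> partition) {
        String resolved = pattern;
        for (Map.Entry<String, String> e : partition.entrySet()) {
            resolved = resolved.replace("$" + e.getKey(), e.getValue());
        }
        return LocalDateTime.parse(resolved, FORMAT);
    }

    public static void main(String[] args) {
        // Example partition: pt_year=2021/pt_month=11/pt_day=18
        Map<String, String> partition = new LinkedHashMap<>();
        partition.put("pt_year", "2021");
        partition.put("pt_month", "11");
        partition.put("pt_day", "18");

        LocalDateTime time =
                extract("$pt_year-$pt_month-$pt_day 00:00:00", partition);
        System.out.println(time); // prints 2021-11-18T00:00
    }
}
```

With 'streaming-source.partition-order' = 'partition-time', the partition whose extracted time is the latest would be the one loaded, which is why the pattern has to match the partition column names exactly.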
When I run it, a NullPointerException is thrown:
2021-11-18 15:33:00,387 INFO [org.apache.flink.table.catalog.hive.HiveCatalog] - Setting hive conf dir as conf
2021-11-18 15:33:00,481 WARN [org.apache.hadoop.util.NativeCodeLoader] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2021-11-18 15:33:01,345 INFO [org.apache.flink.table.catalog.hive.HiveCatalog] - Created HiveCatalog 'hive'
2021-11-18 15:33:01,371 INFO [hive.metastore] - Trying to connect to metastore with URI thrift://cdh-dev-node-119:9083
2021-11-18 15:33:01,441 INFO [hive.metastore] - Opened a connection to metastore, current connections: 1
2021-11-18 15:33:01,521 INFO [hive.metastore] - Connected to metastore.
2021-11-18 15:33:01,856 INFO [org.apache.flink.table.catalog.hive.HiveCatalog] - Connected to Hive metastore
2021-11-18 15:33:01,899 INFO [org.apache.flink.table.catalog.CatalogManager] - Set the current default catalog as [hive] and the current default database as [stream].
2021-11-18 15:33:03,290 INFO [org.apache.hadoop.hive.ql.session.SessionState] - Created local directory: /var/folders/4m/n1wgh7rd2yqfv301kq00l4q40000gn/T/681dd0aa-ba35-4a0e-b069-3ad48f030774_resources
2021-11-18 15:33:03,298 INFO [org.apache.hadoop.hive.ql.session.SessionState] - Created HDFS directory: /tmp/hive/chenxiaojun/681dd0aa-ba35-4a0e-b069-3ad48f030774
2021-11-18 15:33:03,305 INFO [org.apache.hadoop.hive.ql.session.SessionState] - Created local directory: /var/folders/4m/n1wgh7rd2yqfv301kq00l4q40000gn/T/chenxiaojun/681dd0aa-ba35-4a0e-b069-3ad48f030774
2021-11-18 15:33:03,311 INFO [org.apache.hadoop.hive.ql.session.SessionState] - Created HDFS directory: /tmp/hive/chenxiaojun/681dd0aa-ba35-4a0e-b069-3ad48f030774/_tmp_space.db
2021-11-18 15:33:03,314 INFO [org.apache.hadoop.hive.ql.session.SessionState] - No Tez session required at this point. hive.execution.engine=mr.
Exception in thread "main" java.lang.NullPointerException
at org.apache.flink.table.catalog.hive.client.HiveShimV100.registerTemporaryFunction(
at org.apache.flink.table.planner.delegation.hive.HiveParser.parse(
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(
at com.hacker.flinksql.hive.HiveSqlTest.main(
I found the failing code in flink-1.13.2, - line:422
A parameter of this method is null; the code is:
public void registerTemporaryFunction(String funcName, Class funcClass) {
catch (IllegalAccessException | InvocationTargetException e)
{ throw new FlinkHiveException("Failed to register temp function", e); }}
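The catch clause above handles only IllegalAccessException and InvocationTargetException, which suggests the method body makes a reflective call. One possible reading of the NullPointerException, offered as an assumption rather than a confirmed diagnosis, is that the reflective handle was never resolved because the older Hive library lacks the target method. The sketch below reproduces that failure mode in isolation; NullMethodSketch, lookup, and call are hypothetical names and do not come from Flink.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Hypothetical stand-in for a version shim; names are illustrative, not Flink's.
class NullMethodSketch {

    // Resolves a public method reflectively; returns null when the target
    // class does not have it (as an older library version might not).
    static Method lookup(Class<?> target, String name, Class<?>... params) {
        try {
            return target.getMethod(name, params);
        } catch (NoSuchMethodException e) {
            return null;
        }
    }

    // Invokes a static method through the handle. A null handle throws
    // NullPointerException, which the catch clause below does not cover.
    static String call(Method handle, Object arg) {
        try {
            return String.valueOf(handle.invoke(null, arg));
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw new IllegalStateException("Failed to invoke", e);
        }
    }

    public static void main(String[] args) {
        Method present = lookup(Integer.class, "toBinaryString", int.class);
        System.out.println(call(present, 5)); // prints 101

        Method missing = lookup(Integer.class, "noSuchMethod", int.class);
        try {
            call(missing, 5);
        } catch (NullPointerException e) {
            System.out.println("NPE escapes the catch clause");
        }
    }
}
```

If this reading is right, the exception would surface exactly as in the stack trace: as a bare NullPointerException rather than a wrapped FlinkHiveException.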
My Maven dependencies:
<!-- flink sql core -->
<!-- hive catalog -->
<!-- catalog hadoop dependency -->