Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Done
-
1.13.2
-
None
-
flink:flink-1.13.2
hive: 1.1.0-cdh5.14.0
Description
When I use Flink streaming to write into Hive, I set the following table property:
TBLPROPERTIES ('hive.output.file.extension'='.parquet')
When I check HDFS, I can't find any files with the ".parquet" suffix, but it works as expected when I use Hive SQL.
Am I using it correctly, or is there some other reason?
This is my demo:
// Demo: write a DataStream-backed temporary view into a Hive table via a HiveCatalog.
// `env`, `hiveTable` and `DataStream` are defined outside this snippet.

// Blink planner in streaming mode.
val tableEnvSettings: EnvironmentSettings = EnvironmentSettings
  .newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()
val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, tableEnvSettings)

// Register the Hive catalog and make it (and its database) the current one.
val catalog = new HiveCatalog("myHive", "xx", "/usr/local/xx/conf")
tEnv.registerCatalog("myHive", catalog)
tEnv.useCatalog("myHive")
tEnv.useDatabase("xx")

// Switch to the Hive dialect before running the DDL.
tEnv.getConfig.setSqlDialect(SqlDialect.HIVE)

// The `...` lines are elisions kept from the original report.
// `$$` is the s-interpolator escape for a literal `$`, so the pattern value is `$dt`.
val createTSql: String =
  s"""
     |create table if not exists $hiveTable (
     | ...
     |)
     |...
     |TBLPROPERTIES (
     | 'sink.parallelism'='1',
     | 'partition.time-extractor.timestamp-pattern'='$$dt',
     | 'sink.shuffle-by-partition.enable'='true',
     | 'sink.partition-commit.policy.kind'='metastore,success-file',
     | 'hive.output.file.extension'='.parquet'
     |)
     |""".stripMargin
tEnv.executeSql(createTSql)

// Back to the default dialect for the view registration and the INSERT.
tEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)

// One name shared by create / insert / drop so the three cannot drift apart.
// The original dropped "t_userAction", which is not the view created here.
val sourceView = "t_xx"
tEnv.createTemporaryView(sourceView, DataStream)

val insertSql: String =
  s"""
     | insert into $hiveTable
     | select *
     | from $sourceView
     |""".stripMargin
tEnv.executeSql(insertSql)

tEnv.dropTemporaryView(sourceView)