Details
-
Bug
-
Status: Open
-
Not a Priority
-
Resolution: Unresolved
-
1.11.0, 1.11.1, 1.11.2
-
None
-
TEST CODE LIKE THIS:
-- Reproduction script: Hive external table backed by Elasticsearch
-- through the es-hadoop storage handler.
CREATE EXTERNAL TABLE hive_to_es (
    key   STRING,
    value STRING
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES (
    'es.resource'          = 'hive_to_es/_doc',
    'es.index.auto.create' = 'TRUE',
    'es.nodes'             = '192.168.1.111:9200,192.168.1.112:9200,192.168.1.113:9200'
);

INSERT INTO hive_to_es (key, value) VALUES ('name', 'tom');
INSERT INTO hive_to_es (key, value) VALUES ('yes', 'aaa');

SELECT * FROM hive_to_es;
TEST CODE LIKE THIS: CREATE EXTERNAL TABLE hive_to_es ( key string, value string ) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES( 'es.resource' = 'hive_to_es/_doc', 'es.index.auto.create' = 'TRUE', 'es.nodes'='192.168.1.111:9200,192.168.1.112:9200,192.168.1.113:9200' ); insert into hive_to_es (key, value) values ('name','tom'); insert into hive_to_es (key, value) values ('yes','aaa'); select * from hive_to_es;
Description
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.connectors.hive.FlinkHiveException: Unable to instantiate the hadoop
input format
We worked around this by adding a patch like the following:
flink-connector-hive_2.12-1.11.2.jar
org/apache/flink/connectors/hive/HiveTableSink.java +134
ADD PATCH:
// code placeholder if (sd.getOutputFormat() == null && "org.apache.hadoop.hive.hbase.HBaseSerDe".equals(sd.getSerdeInfo().getSerializationLib())) { sd.setOutputFormat("org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat"); } if (sd.getOutputFormat() == null && "org.elasticsearch.hadoop.hive.EsSerDe".equals(sd.getSerdeInfo().getSerializationLib())) { sd.setOutputFormat("org.elasticsearch.hadoop.hive.EsHiveOutputFormat"); }
org/apache/flink/connectors/hive/read/HiveTableInputFormat.java + 305
ADD PATCH:
// code placeholder if (sd.getInputFormat() == null && "org.apache.hadoop.hive.hbase.HBaseSerDe".equals(sd.getSerdeInfo().getSerializationLib())) { sd.setInputFormat("org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat"); jobConf.set("hbase.table.name", partition.getTableProps().getProperty("hbase.table.name")); jobConf.set("hbase.columns.mapping", partition.getTableProps().getProperty("hbase.columns.mapping")); } if (sd.getInputFormat() == null && "org.elasticsearch.hadoop.hive.EsSerDe".equals(sd.getSerdeInfo().getSerializationLib())) { sd.setInputFormat("org.elasticsearch.hadoop.hive.EsHiveInputFormat"); jobConf.set("location", sd.getLocation()); for (Enumeration en = partition.getTableProps().keys(); en.hasMoreElements();) { String key = en.nextElement().toString(); if(key.startsWith("es.")){ jobConf.set(key, partition.getTableProps().getProperty(key)); } } }