Details
- Type: Bug
- Status: Closed
- Priority: Critical
- Resolution: Fixed
- Affects Version: 1.14.6
Description
The project push-down optimization for dimension (lookup) tables was introduced in:
https://issues.apache.org/jira/browse/FLINK-29138
Two related issues with Hive dimension tables:
https://issues.apache.org/jira/browse/FLINK-29992
https://issues.apache.org/jira/browse/FLINK-30679
The data of a Hive dimension table cannot be loaded once project push-down is applied.
hive-exec version: 2.3.4
flink version: 1.14.6
flink-hive-connector: the latest code on the release-1.14 branch
Vectorized read:
java.lang.ArrayIndexOutOfBoundsException: 3
at org.apache.flink.connectors.hive.read.HiveTableInputFormat.useOrcVectorizedRead(HiveTableInputFormat.java:276) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
at org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:129) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
at org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
at org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
at org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
at LookupFunction$26.flatMap(Unknown Source) ~[?:?]
MapReduce read:
java.lang.ArrayIndexOutOfBoundsException: 3
at org.apache.flink.connectors.hive.read.HiveMapredSplitReader.lambda$new$0(HiveMapredSplitReader.java:139) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250) ~[?:1.8.0_301]
at java.util.Spliterators$IntArraySpliterator.forEachRemaining(Spliterators.java:1032) ~[?:1.8.0_301]
at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) ~[?:1.8.0_301]
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_301]
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_301]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546) ~[?:1.8.0_301]
at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) ~[?:1.8.0_301]
at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438) ~[?:1.8.0_301]
at org.apache.flink.connectors.hive.read.HiveMapredSplitReader.<init>(HiveMapredSplitReader.java:141) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
at org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:157) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
at org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
at org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
at org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
at LookupFunction$26.flatMap(Unknown Source) ~[?:?]
at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:81) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:34) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
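Both traces fail while resolving the selected column indices in the lookup path. As a minimal, hypothetical Java sketch of that pattern (not the actual Flink code; the schema and column names are illustrative), the suspected mismatch is that the indices still refer to positions in the full table schema while the reader is handed the already-projected schema:

import java.util.Arrays;

public class ProjectionPushDownMismatch {

    public static void main(String[] args) {
        // Full Hive table schema (illustrative): name, age, score, dt -> 4 columns.
        String[] fullSchema = {"name", "age", "score", "dt"};

        // Columns the lookup join reads, expressed as indices into the FULL schema.
        int[] selectedFields = {0, 2, 3};

        // After projection push-down only the selected columns survive -> length 3.
        String[] projectedSchema = {"name", "score", "dt"};

        // Bug pattern: resolving full-schema indices against the projected array.
        // Index 3 is out of bounds for a 3-element array, matching
        // "ArrayIndexOutOfBoundsException: 3" in the traces above.
        try {
            String[] resolved = Arrays.stream(selectedFields)
                    .mapToObj(i -> projectedSchema[i])
                    .toArray(String[]::new);
            System.out.println(Arrays.toString(resolved));
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("Reproduced: " + e);
        }

        // Expected behaviour: resolve against the full schema, or first remap the
        // selected indices to the projected schema's positions (0, 1, 2).
        String[] resolvedCorrectly = Arrays.stream(selectedFields)
                .mapToObj(i -> fullSchema[i])
                .toArray(String[]::new);
        System.out.println("Expected columns: " + Arrays.toString(resolvedCorrectly));
    }
}

This is only an illustration of the suspected failure mode; the real fix belongs in how HiveTableInputFormat / HiveMapredSplitReader receive the projected fields.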
The SQL:
CREATE TABLE kafkaTableSource (
  name string,
  age int,
  sex string,
  address string,
  ptime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'hehuiyuan1',
  'scan.startup.mode' = 'latest-offset',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.client.id' = 'test-consumer-group',
  'properties.group.id' = 'test-consumer-group',
  'format' = 'csv'
);

CREATE TABLE printsink (
  name string,
  age int,
  sex string,
  address string,
  score bigint,
  dt string
) WITH (
  'connector' = 'print'
);

CREATE CATALOG myhive WITH (
  'type' = 'hive',
  'default-database' = 'hhy',
  'hive-version' = '2.0.0',
  'hadoop-conf-dir' = '/Users/hehuiyuan/soft/hadoop/hadoop-2.7.3/etc/hadoop'
);

USE CATALOG myhive;
USE hhy;

set table.sql-dialect=hive;
CREATE TABLE IF NOT EXISTS tmp_flink_test_text (
  name STRING,
  age INT,
  score BIGINT
) PARTITIONED BY (dt STRING) STORED AS TEXTFILE TBLPROPERTIES (
  'streaming-source.enable' = 'false',
  'streaming-source.partition.include' = 'all',
  'lookup.join.cache.ttl' = '5 min'
);
set table.sql-dialect=default;

USE CATALOG default_catalog;

INSERT INTO default_catalog.default_database.printsink
SELECT s.name, s.age, s.sex, s.address, r.score, r.dt
FROM default_catalog.default_database.kafkaTableSource as s
JOIN myhive.hhy.tmp_flink_test_text FOR SYSTEM_TIME AS OF s.ptime AS r
ON r.name = s.name;