FLINK-30679

Cannot load the data of a Hive dim table when project push-down is introduced


    Description

The dim-table project push-down optimization was introduced in:
https://issues.apache.org/jira/browse/FLINK-29138

Two related issues with Hive dim tables:
https://issues.apache.org/jira/browse/FLINK-29992
https://issues.apache.org/jira/browse/FLINK-30679

Cannot load the data of the Hive dim table when project push-down is introduced.
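For context, FLINK-29138 is the change that makes the dim-table (lookup) path honor the planner's projection push-down. A minimal sketch of the 1.14-era ability interface involved is below; only SupportsProjectionPushDown and its two methods are the real Flink API, the class name and fields are hypothetical. The indices handed to applyProjection refer to positions in the table's full schema, which is the contract the traces below appear to trip over.

import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;

/**
 * Hypothetical sketch of a projection-aware source; only the interface and
 * its two methods are the actual Flink 1.14 API, the rest is illustrative.
 */
public class ProjectionAwareSourceSketch implements SupportsProjectionPushDown {

    // Indices of the selected columns, expressed against the FULL table schema.
    private int[] projectedTopLevelFields = new int[0];

    @Override
    public boolean supportsNestedProjection() {
        // Keep the sketch to top-level columns only.
        return false;
    }

    @Override
    public void applyProjection(int[][] projectedFields) {
        // For the repro query further below, the planner would pass something
        // like {{0}, {2}, {3}} for (name, score, dt) out of (name, age, score, dt).
        this.projectedTopLevelFields = new int[projectedFields.length];
        for (int i = 0; i < projectedFields.length; i++) {
            this.projectedTopLevelFields[i] = projectedFields[i][0];
        }
    }
}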

       

hive-exec version: 2.3.4

flink version: 1.14.6

flink-hive-connector: the latest code from the release-1.14 branch

       

Vectorized read:

       

      java.lang.ArrayIndexOutOfBoundsException: 3
          at org.apache.flink.connectors.hive.read.HiveTableInputFormat.useOrcVectorizedRead(HiveTableInputFormat.java:276) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
          at org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:129) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
          at org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
          at org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
          at org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
          at LookupFunction$26.flatMap(Unknown Source) ~[?:?] 

       

       

MapReduce read:

       

      java.lang.ArrayIndexOutOfBoundsException: 3
          at org.apache.flink.connectors.hive.read.HiveMapredSplitReader.lambda$new$0(HiveMapredSplitReader.java:139) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
          at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250) ~[?:1.8.0_301]
          at java.util.Spliterators$IntArraySpliterator.forEachRemaining(Spliterators.java:1032) ~[?:1.8.0_301]
          at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) ~[?:1.8.0_301]
          at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_301]
          at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_301]
          at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546) ~[?:1.8.0_301]
          at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) ~[?:1.8.0_301]
          at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438) ~[?:1.8.0_301]
          at org.apache.flink.connectors.hive.read.HiveMapredSplitReader.<init>(HiveMapredSplitReader.java:141) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
          at org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:157) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
          at org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86) ~[flink-connector-hive-1.14.1-SNAPSHOT.jar:1.14-SNAPSHOT]
          at org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:132) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
          at org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:105) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
          at LookupFunction$26.flatMap(Unknown Source) ~[?:?]
          at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:81) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6]
          at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:34) ~[flink-table-runtime_2.11-1.14.6.jar:1.14.6] 
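Both traces above fail with the same index (3) while resolving selected fields, once in the ORC vectorized path and once in the mapred path's stream pipeline. A plausible reading (an assumption, not verified against the connector code) is that the field indices are computed against the full table schema while the arrays being indexed are sized for the projected row type. A self-contained sketch of that failure mode, with made-up names:

import java.util.Arrays;

public class ProjectionIndexMismatch {

    public static void main(String[] args) {
        // Full schema of the repro table: name, age, score, dt (partition column).
        // Produced row type after projection push-down: only the columns the join needs.
        String[] projectedSchema = {"name", "score", "dt"};

        // Selected-field indices still expressed against the FULL schema: name=0, score=2, dt=3.
        int[] selectedFields = {0, 2, 3};

        try {
            // Mixing the two coordinate systems overruns the projected array,
            // mirroring the stream pipeline in the mapred trace above.
            String[] resolved = Arrays.stream(selectedFields)
                    .mapToObj(i -> projectedSchema[i])
                    .toArray(String[]::new);
            System.out.println(Arrays.toString(resolved));
        } catch (ArrayIndexOutOfBoundsException e) {
            // On JDK 8 the message is just the offending index, i.e. "3".
            System.out.println("ArrayIndexOutOfBoundsException: " + e.getMessage());
        }
    }
}

With the repro schema below (name, age, score, dt, of which the join only needs name, score and dt), an index of 3 applied to a three-element projected array would produce exactly the ArrayIndexOutOfBoundsException: 3 seen in both traces.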

       

       

The SQL:

       

      CREATE TABLE kafkaTableSource (
          name string,
          age int,
          sex string,
          address string,
          ptime AS PROCTIME()
      ) WITH (
          'connector' = 'kafka',
          'topic' = 'hehuiyuan1',
          'scan.startup.mode' = 'latest-offset',
          'properties.bootstrap.servers' = 'localhost:9092',
          'properties.client.id' = 'test-consumer-group',
          'properties.group.id' = 'test-consumer-group',
          'format' = 'csv'
      );
      
      CREATE TABLE printsink (
          name string,
          age int,
          sex string,
          address string,
          score bigint,
          dt string
      ) WITH (
          'connector' = 'print'
      );
      
      CREATE CATALOG myhive
      WITH (
              'type' = 'hive',
              'default-database' = 'hhy',
              'hive-version' = '2.0.0',
              'hadoop-conf-dir'='/Users/hehuiyuan/soft/hadoop/hadoop-2.7.3/etc/hadoop'
      );
      
      USE CATALOG myhive;
      USE hhy;
      
      set table.sql-dialect=hive;
      CREATE TABLE IF NOT EXISTS tmp_flink_test_text (
          name STRING,
          age INT,
          score BIGINT
      ) PARTITIONED BY (dt STRING) STORED AS TEXTFILE TBLPROPERTIES (
          'streaming-source.enable' = 'false',
          'streaming-source.partition.include' = 'all',
          'lookup.join.cache.ttl' = '5 min'
      );
      set table.sql-dialect=default;
      
      USE CATALOG default_catalog;
      INSERT INTO default_catalog.default_database.printsink
      SELECT s.name, s.age, s.sex, s.address, r.score, r.dt
      FROM default_catalog.default_database.kafkaTableSource  as s
      JOIN myhive.hhy.tmp_flink_test_text FOR SYSTEM_TIME AS OF s.ptime  AS r
      ON r.name = s.name;
       

       

       

Assignee: hehuiyuan
Reporter: hehuiyuan