FLINK-22498

Casting the DECIMAL primary key of a source table to STRING and inserting it into a Kudu table with a STRING primary key throws the exception: UpsertStreamTableSink requires that Table has a full primary keys if it is updated

Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Not A Problem
    • Affects Version/s: 1.12.1
    • Fix Version/s: None
    • Component/s: Table SQL / Planner
    • Labels: None
    • Environment: flink 1.12.1, jdk 1.8, hive 2.1.1, kudu 1.10.0, kafka 2.0.0

    Description

      1. Source table:

      CREATE TABLE ddl_source (
      appl_seq DECIMAL(16,2),
      name STRING,
      PRIMARY KEY(appl_seq) NOT ENFORCED
      ) WITH (
      'connector' = 'kafka',
      'topic' = 'ogg-json-03',
      'properties.bootstrap.servers' = 'xxxx:9092',
      'value.format' = 'canal-json'
      )

      2. Sink table (created with Impala):

      create table rt_dwd.test_bug(
          pk       string,
          name     string,
          primary key (pk)
      ) partition by hash (pk) partitions 5 stored as kudu
      TBLPROPERTIES ('kudu.master_addresses' = 'xxxx:7051');

      3. Execute the SQL (using the Blink planner):

      insert into kuducatalog.default_database.`rt_dwd.test_bug`
      select CAST(appl_seq AS STRING), name  from ddl_source

       

      This throws an exception:

      Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
          at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:93)
          at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
          at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
          at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
          at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
          at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
          at scala.collection.Iterator$class.foreach(Iterator.scala:891)
          at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
          at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
          at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
          at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
          at scala.collection.AbstractTraversable.map(Traversable.scala:104)
          at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65)
          at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:167)
          at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
          at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
          at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:767)
          at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)

       

      Case A: if the source table is defined as follows, the exception is not thrown:

      CREATE TABLE ddl_source (
      appl_seq STRING,
      name STRING,
      PRIMARY KEY(appl_seq) NOT ENFORCED
      ) WITH (
      'connector' = 'kafka',
      'topic' = 'ogg-json-03',
      'properties.bootstrap.servers' = 'xxxx:9092',
      'value.format' = 'canal-json'
      )

       

      Case B: alternatively, if the Kudu table is created with a DECIMAL primary key and the SQL below is used, the exception is not thrown:

      DDL:

      create table rt_dwd.test_bug(
          pk       decimal(16,2),
          name     string,
          primary key (pk)
      ) partition by hash (pk) partitions 5 stored as kudu
      TBLPROPERTIES ('kudu.master_addresses' = 'xxxx:7051');

      DML:

      insert into kuducatalog.default_database.`rt_dwd.test_bug`
      select  appl_seq, name  from ddl_source

       

      From debugging the source code, the problem may be related to the SQL parsing engine.
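
One possible reading of the failing statement and case B, offered as an assumption and not confirmed anywhere in this issue: the planner may only treat the query result as keyed when the primary key column reaches the sink unchanged, so wrapping it in CAST loses the key. The toy model below is not Flink code. `Projection` and `derive_key_positions` are hypothetical names used only to illustrate that idea.

```python
# Toy model only: this is NOT Flink's planner code. All names are hypothetical.
from dataclasses import dataclass
from typing import List, Optional, Set


@dataclass(frozen=True)
class Projection:
    """One item of a SELECT list: a source column, optionally wrapped in CAST."""
    source_column: str
    cast_to: Optional[str] = None  # None means the column is selected as-is


def derive_key_positions(source_key: Set[str],
                         select_list: List[Projection]) -> Optional[List[int]]:
    """Return the output positions carrying the source key, or None if it is lost.

    Assumption (not taken from the Flink source): a key column survives a
    projection only when it is selected without a CAST.
    """
    positions = []
    for key_col in sorted(source_key):
        hits = [i for i, p in enumerate(select_list)
                if p.source_column == key_col and p.cast_to is None]
        if not hits:
            return None  # a key column was dropped or rewritten
        positions.append(hits[0])
    return positions


# Failing statement: select CAST(appl_seq AS STRING), name from ddl_source
failing = derive_key_positions(
    {"appl_seq"}, [Projection("appl_seq", "STRING"), Projection("name")])

# Case B: select appl_seq, name from ddl_source
case_b = derive_key_positions(
    {"appl_seq"}, [Projection("appl_seq"), Projection("name")])

print(failing)  # None: no key reaches the sink in this model
print(case_b)   # [0]: the key is the first output column
```

Under this model, the failing statement yields no key while case B keeps it, which matches the reported behavior. It does not explain on its own why case A succeeds.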

      Attachments

        1. bug.rar, 3 kB, Carl

      People

        Assignee: Unassigned
        Reporter: yanchenyun Carl
        Votes: 0
        Watchers: 2

      Time Tracking

        Original Estimate: 240h
        Remaining Estimate: 240h
        Time Spent: Not Specified