Flink / FLINK-22498

Casting the DECIMAL primary key of a source table to STRING and then inserting into a Kudu table with a STRING primary key throws the exception: UpsertStreamTableSink requires that Table has a full primary keys if it is updated


Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Not A Problem
    • Affects Version/s: 1.12.1
    • Fix Version/s: None
    • Component/s: Table SQL / Planner
    • Labels: None
    • Environment: flink 1.12.1, jdk 1.8, hive 2.1.1, kudu 1.10.0, kafka 2.0.0

    Description

      1. source table:

      CREATE TABLE ddl_source (
      appl_seq DECIMAL(16,2),
      name STRING,
      PRIMARY KEY(appl_seq) NOT ENFORCED
      ) WITH (
      'connector' = 'kafka',
      'topic' = 'ogg-json-03',
      'properties.bootstrap.servers' = 'xxxx:9092',
      'value.format' = 'canal-json'
      )

      2. sink table: create the table using Impala

      create table rt_dwd.test_bug (
          pk     string,
          name   string,
          primary key (pk)
      ) partition by hash (pk) partitions 5 stored as kudu
      TBLPROPERTIES ('kudu.master_addresses' = 'xxxx:7051');
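
      Through the KuduCatalog, Flink should then see this table with pk as a STRING primary key. A quick way to confirm the schema Flink derives (a minimal sketch, assuming the catalog is registered under the name kuducatalog as in the statements below):

      DESCRIBE kuducatalog.default_database.`rt_dwd.test_bug`;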

      3. execute SQL (using the Blink planner):

      insert into kuducatalog.default_database.`rt_dwd.test_bug`
      select CAST(appl_seq AS STRING), name  from ddl_source

       

      This throws the following exception:

      Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
          at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:93)
          at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
          at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
          at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
          at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
          at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
          at scala.collection.Iterator$class.foreach(Iterator.scala:891)
          at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
          at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
          at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
          at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
          at scala.collection.AbstractTraversable.map(Traversable.scala:104)
          at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65)
          at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:167)
          at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
          at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
          at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:767)
          at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
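
      To inspect the plan the planner rejects, the statement can be explained before executing it (a minimal sketch; it assumes EXPLAIN support for INSERT statements is available in this Flink version):

      EXPLAIN PLAN FOR
      INSERT INTO kuducatalog.default_database.`rt_dwd.test_bug`
      SELECT CAST(appl_seq AS STRING), name FROM ddl_source;

      This shows the projection over CAST(appl_seq AS STRING) that the sink's primary-key check is applied to.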

       

      case A: if we define the source table with a STRING primary key as follows, it will not throw the exception:

      CREATE TABLE ddl_source (
      appl_seq STRING,
      name STRING,
      PRIMARY KEY(appl_seq) NOT ENFORCED
      ) WITH (
      'connector' = 'kafka',
      'topic' = 'ogg-json-03',
      'properties.bootstrap.servers' = 'xxxx:9092',
      'value.format' = 'canal-json'
      )

       

      case B: alternatively, if we create the Kudu table with a DECIMAL primary key and use the following SQL, it will not throw the exception:

      DDL:

      create table rt_dwd.test_bug (
          pk     decimal(16,2),
          name   string,
          primary key (pk)
      ) partition by hash (pk) partitions 5 stored as kudu
      TBLPROPERTIES ('kudu.master_addresses' = 'xxxx:7051');

      DML:

      insert into kuducatalog.default_database.`rt_dwd.test_bug`
      select  appl_seq, name  from ddl_source

       

      When debugging the source code, the problem appears to be related to the planner's unique-key derivation: the CAST over the key column seems to prevent the planner from inferring a unique key for the query, so the UpsertStreamTableSink check fails.
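
      A possible workaround (an untested sketch; it assumes the Blink planner derives a unique key from a streaming GROUP BY, and uses the built-in LAST_VALUE aggregate): aggregate on the casted key so the query exposes it as a unique key before reaching the upsert sink:

      INSERT INTO kuducatalog.default_database.`rt_dwd.test_bug`
      SELECT pk, LAST_VALUE(name) AS name
      FROM (
          SELECT CAST(appl_seq AS STRING) AS pk, name
          FROM ddl_source
      ) t
      GROUP BY pk;

      Here LAST_VALUE keeps the most recent name per key; whether this matches the intended upsert semantics should be verified.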

      People

        Assignee: Unassigned
        Reporter: yanchenyun Carl
        Votes: 0
        Watchers: 2


      Time Tracking

        Original Estimate: 240h
        Remaining Estimate: 240h
        Time Spent: Not Specified
