  1. Apache Hudi
  2. HUDI-5058

Flink reports that the primary key column cannot be nullable when reading a Hudi table


Details

    Description

      Flink reports that the primary key column cannot be nullable when reading a Hudi table. The table is created with Spark SQL and the data is written by Spark. Flink then reports an error when reading it.

      Spark SQL write, using Hudi 0.11.1:

       

       

      create table test_hudi_cc16 (
        id bigint,
        name string,
        name2 string,
        ts bigint
      ) using hudi
      tblproperties (
        type = 'cow',
        primaryKey = 'id'
      );

      insert into test_hudi_cc16
      values
        (1, 'cc2', 'cc32', 12);
       

       

      Flink read, using Hudi 0.12.1:

       

      CREATE CATALOG myhudi WITH(
          'type' = 'hudi',
          'default-database' = 'test_hudi1',
          'catalog.path' = '/user/hdpu/warehouse',
          'mode' = 'hms',
          'hive.conf.dir' = 'hdfs:///user/hdpu/streamx/conf_data/hive_conf'
      );
      
      select *
      from myhudi.test_hudi6.test_hudi_cc16; 

       

      Error:

       

      org.apache.flink.table.api.ValidationException: Invalid primary key 'PK_id'. Column 'id' is nullable.
          at org.apache.flink.table.catalog.DefaultSchemaResolver.validatePrimaryKey(DefaultSchemaResolver.java:352)
          at org.apache.flink.table.catalog.DefaultSchemaResolver.resolvePrimaryKey(DefaultSchemaResolver.java:312)
          at org.apache.flink.table.catalog.DefaultSchemaResolver.resolve(DefaultSchemaResolver.java:88)
          at org.apache.flink.table.api.Schema.resolve(Schema.java:123)
          at org.apache.flink.table.catalog.CatalogManager.resolveCatalogTable(CatalogManager.java:877)
          at org.apache.flink.table.catalog.CatalogManager.resolveCatalogBaseTable(CatalogManager.java:863)
          at org.apache.flink.table.catalog.CatalogManager.getPermanentTable(CatalogManager.java:426)
          at org.apache.flink.table.catalog.CatalogManager.getTable(CatalogManager.java:395)
          at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1061)
          at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:649)
          at org.grg_banking.flink.sqlexecute.FlinkUtils.exeucteSqlFile2(FlinkUtils.java:260)
          at org.apache.flink.catalog.test.TestCatalog.test1(TestCatalog.java:49)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:498)
          at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
          at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
          at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
          at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
          at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
          at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
          at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
          at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
          at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
          at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
          at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
          at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
          at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
          at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
          at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
          at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
          at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
          at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
          at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
      Process finished with exit code -1
       

       

       

      This may be a version compatibility problem: Flink with Hudi 0.12.1 does not appear to support reading a table written by Spark with Hudi 0.11.1. However, I think reads should be backward compatible.
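
      The trace shows that the failure comes from primary key validation during schema resolution, before any data is read. The sketch below is a simplified Python model of that rule, not Flink's actual code: `Column`, `ValidationError`, `validate_primary_key`, and `check` are hypothetical names, and the assumption that the catalog exposes `id` without a NOT NULL flag is inferred only from the error message.

```python
from dataclasses import dataclass
from typing import List


class ValidationError(Exception):
    """Hypothetical stand-in for Flink's ValidationException."""


@dataclass
class Column:
    name: str
    data_type: str
    nullable: bool = True  # columns are nullable unless declared NOT NULL


def validate_primary_key(constraint: str, pk_columns: List[str],
                         columns: List[Column]) -> None:
    """Reject a primary key that references a missing or nullable column."""
    by_name = {c.name: c for c in columns}
    for pk in pk_columns:
        col = by_name.get(pk)
        if col is None:
            raise ValidationError(
                f"Invalid primary key '{constraint}'. Column '{pk}' does not exist.")
        if col.nullable:
            raise ValidationError(
                f"Invalid primary key '{constraint}'. Column '{pk}' is nullable.")


def check(columns: List[Column]) -> str:
    """Return 'ok' or the validation message for primary key PK_id on `id`."""
    try:
        validate_primary_key("PK_id", ["id"], columns)
        return "ok"
    except ValidationError as e:
        return str(e)


# Assumed view of the Spark-written table: no column carries NOT NULL.
spark_written = [
    Column("id", "BIGINT"),
    Column("name", "STRING"),
    Column("name2", "STRING"),
    Column("ts", "BIGINT"),
]

# Same schema, but with the key column marked NOT NULL.
with_not_null_key = [Column("id", "BIGINT", nullable=False)] + spark_written[1:]

print(check(spark_written))
print(check(with_not_null_key))
```

      Under this model the first schema is rejected with the same message as in the trace, and the second passes. That suggests the problem lies in how the key column's nullability is exposed to Flink rather than in the data files themselves, although this has not been verified against the Hudi catalog code.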

       

          People

            waywtdcc waywtdcc