Uploaded image for project: 'Apache Hudi'
  1. Apache Hudi
  2. HUDI-1079

Cannot upsert on schema with Array of Record with single field

    XMLWordPrintableJSON

Details

    Description

      I am trying to trigger upserts on a table that has an array field with records of just one field.
      Here is the code to reproduce:

        val spark = SparkSession.builder()
            .master("local[1]")
            .appName("SparkByExamples.com")
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .getOrCreate();
      
        // https://sparkbyexamples.com/spark/spark-dataframe-array-of-struct/
      
        val arrayStructData = Seq(
          Row("James",List(Row("Java","XX",120),Row("Scala","XA",300))),
          Row("Michael",List(Row("Java","XY",200),Row("Scala","XB",500))),
          Row("Robert",List(Row("Java","XZ",400),Row("Scala","XC",250))),
          Row("Washington",null)
        )
      
        val arrayStructSchema = new StructType()
            .add("name",StringType)
            .add("booksIntersted",ArrayType(
              new StructType()
                .add("bookName",StringType)
      //          .add("author",StringType)
      //          .add("pages",IntegerType)
            ))
      
          val df = spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData),arrayStructSchema)
      

      Running insert following by upsert will fail:

        df.write
            .format("hudi")
            .options(getQuickstartWriteConfigs)
            .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "name")
            .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "name")
            .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
            .option(HoodieWriteConfig.TABLE_NAME, tableName)
            .mode(Overwrite)
            .save(basePath)
      
        df.write
            .format("hudi")
            .options(getQuickstartWriteConfigs)
            .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "name")
            .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "name")
            .option(HoodieWriteConfig.TABLE_NAME, tableName)
            .mode(Append)
            .save(basePath)
      

      If I create the books record with all the fields (at least 2), it works as expected.

      The relevant part of the exception is this:

      Caused by: java.lang.ClassCastException: required binary bookName (UTF8) is not a groupCaused by: java.lang.ClassCastException: required binary bookName (UTF8) is not a group at org.apache.parquet.schema.Type.asGroupType(Type.java:207) at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:279) at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:232) at org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:78) at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:536) at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:486) at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:289) at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:141) at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:95) at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33) at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:138) at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:183) at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156) at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135) at org.apache.hudi.client.utils.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49) at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45) at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ... 4 more

      Another way to test is by changing the generated data in the tips to just the amount, by dropping the currency on the tips_history field, tests will start failing:
      https://github.com/apache/hudi/compare/release-0.5.3...aditanase:avro-arrays-upsert?expand=1

      I have narrowed this down to this block in the parquet-avro integration: https://github.com/apache/parquet-mr/blob/master/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java#L846-L875

      Which always returns false after trying to decide whether reader and writer schemas are compatible. Going through that code path makes me think it's related to the fields being optional, as the inferred schema seems to be (null, string) with default null instead of (string, null) with no default.

      At this point I'm lost, tried to figure something out based on this https://github.com/apache/hudi/pull/1406/files but I'm not sure where to start.

      Attachments

        Issue Links

          Activity

            People

              xushiyan Shiyan Xu
              tase Adrian Tanase
              Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - 2h
                  2h
                  Remaining:
                  Time Spent - 1.5h Remaining Estimate - 0.5h
                  0.5h
                  Logged:
                  Time Spent - 1.5h Remaining Estimate - 0.5h
                  1.5h