FLINK-19244

CSV format can't deserialize null ROW field

    Description

      CREATE TABLE csv_table (
      f0 ROW<f0c0 VARCHAR, f0c1 VARCHAR>,
      f1 ROW<f1c0 INT, f1c1 VARCHAR>
      )

      If f0 is null and f1 is (f1c0=123, f1c1=456), the serialized data is: ,123;456
      When this data is deserialized, the JsonNode for f0 is an empty array ([]), and the conversion throws an exception: Row length mismatch. 2 fields expected but was 0.
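
      To make the failure mode concrete, here is a minimal, self-contained sketch in plain Scala. It is a simplified model of the round trip, not Flink's actual code: the object NullRowRoundTrip and all of its methods are hypothetical names, and the delimiters (',' between top-level fields, ';' inside a nested row) are taken from the serialized output above. The last method shows one possible null-tolerant behavior, assuming an empty nested field should be read back as a null row.

```scala
// Simplified model of the CSV round trip; NOT Flink's implementation.
// All names below are hypothetical and exist only for illustration.
object NullRowRoundTrip {
  type Row = Seq[String]

  // A null nested row is written as an empty field, as in ",123;456".
  def serializeRow(row: Row): String =
    if (row == null) "" else row.mkString(";")

  // Top-level fields are joined with ','.
  def serialize(rows: Seq[Row]): String =
    rows.map(serializeRow).mkString(",")

  // Splits a nested field into its elements; an empty field yields zero elements.
  def splitNested(field: String): Seq[String] =
    if (field.isEmpty) Seq.empty else field.split(";", -1).toSeq

  // Strict variant: an empty nested field fails the arity check,
  // which mirrors the "Row length mismatch" error in this report.
  def deserializeStrict(field: String, arity: Int): Row = {
    val parts = splitNested(field)
    if (parts.length != arity)
      throw new RuntimeException(
        s"Row length mismatch. $arity fields expected but was ${parts.length}.")
    parts
  }

  // Null-tolerant variant (an assumption about the desired behavior):
  // an empty nested field is mapped back to a null row.
  def deserializeNullable(field: String, arity: Int): Row =
    if (field.isEmpty) null else deserializeStrict(field, arity)
}
```

      In this model, NullRowRoundTrip.serialize(Seq(null, Seq("123", "456"))) produces ,123;456. Passing the empty first field to deserializeStrict with arity 2 throws the length-mismatch error, while deserializeNullable returns null for it. One caveat of this approach: for a nested row with a single string field, an empty field cannot be told apart from a row holding one empty string.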

      In my real scenario, I run two streaming jobs:
      First, read from json_table and sink to csv_table, which has the schema above.
      Then, read from csv_table and process the rows further.

      If the JSON record is {"f0": null, "f1": {"f1c0": 123, "f1c1": 456}}, the second job fails with the exception above.

      If this is a bug, I would like to help fix it and add unit tests.

       

      Here is the test code:

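      // Imports assume the Flink 1.11 package layout.
      import org.apache.flink.formats.csv.{CsvRowDataDeserializationSchema, CsvRowDataSerializationSchema}
      import org.apache.flink.table.api.DataTypes.{FIELD, INT, ROW, STRING}
      import org.apache.flink.table.data.{GenericRowData, RowData}
      import org.apache.flink.table.data.binary.BinaryStringData
      import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo
      import org.apache.flink.table.types.logical.RowType
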
      val subDataType0 = ROW(
        FIELD("f0c0", STRING()),
        FIELD("f0c1", STRING())
      )
      val subDataType1 = ROW(
        FIELD("f1c0", INT()),
        FIELD("f1c1", INT())
      )
      val datatype = ROW(
        FIELD("f0", subDataType0),
        FIELD("f1", subDataType1))
      val rowType = datatype.getLogicalType.asInstanceOf[RowType]
      
      val serSchema = new CsvRowDataSerializationSchema.Builder(rowType).build()
      val deserSchema = new CsvRowDataDeserializationSchema.Builder(rowType, new RowDataTypeInfo(rowType)).build()
      // Round trip: serialize the row to CSV, print it, then deserialize it again.
      def foo(r: RowData): Unit = {
        val serData = new String(serSchema.serialize(r))
        print(s"${serData}")
        val deserRow = deserSchema.deserialize(serData.getBytes)
        println(s"${deserRow}")
      }
      
      val normalRowData = GenericRowData.of(
        GenericRowData.of(BinaryStringData.fromString("hello"), BinaryStringData.fromString("world")),
        GenericRowData.of(Integer.valueOf(123), Integer.valueOf(456))
      )
      // correct.
      foo(normalRowData)
      
      val nullRowData = GenericRowData.of(
        null,
        GenericRowData.of(Integer.valueOf(123), Integer.valueOf(456))
      )
      /*
      Exception in thread "main" java.io.IOException: Failed to deserialize CSV row ',123;456
      ...
      Caused by: java.lang.RuntimeException: Row length mismatch. 2 fields expected but was 0.
       */
      foo(nullRowData)
      

            People

              yingz Ying Z