Spark / SPARK-27798

ConvertToLocalRelation should tolerate expression reusing output object


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.6.3, 2.0.2, 2.1.3, 2.2.3, 2.4.3
    • Fix Version/s: 2.3.4, 2.4.4, 3.0.0
    • Component/s: SQL

    Description

      Steps to reproduce:

      1. Create a local Dataset with a binary Avro field and at least two distinct rows.
      2. Use the from_avro function to deserialize the binary into another column.
      3. Observe that every row incorrectly shows the same deserialized value.

      Here's a concrete example (using Spark 2.4.3). It converts a list of TestPayload objects into Avro binary using the defined schema, then deserializes the binary with from_avro using that same schema:

      import org.apache.avro.Schema
      import org.apache.avro.generic.{GenericDatumWriter, GenericRecord, GenericRecordBuilder}
      import org.apache.avro.io.EncoderFactory
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.avro.from_avro
      import org.apache.spark.sql.functions.col
      
      import java.io.ByteArrayOutputStream
      
      object TestApp extends App {
        // Payload container
        case class TestEvent(payload: Array[Byte])
        // Deserialized Payload
        case class TestPayload(message: String)
        // Schema for Payload
        val simpleSchema =
          """
            |{
            |"type": "record",
            |"name" : "Payload",
            |"fields" : [ {"name" : "message", "type" : [ "string", "null" ] } ]
            |}
          """.stripMargin
        // Convert TestPayload into avro binary
        def generateSimpleSchemaBinary(record: TestPayload, avsc: String): Array[Byte] = {
          val schema = new Schema.Parser().parse(avsc)
          val out = new ByteArrayOutputStream()
          val writer = new GenericDatumWriter[GenericRecord](schema)
          val encoder = EncoderFactory.get().binaryEncoder(out, null)
          val rootRecord = new GenericRecordBuilder(schema).set("message", record.message).build()
          writer.write(rootRecord, encoder)
          encoder.flush()
          out.toByteArray
        }
      
        val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
        import spark.implicits._
        List(
          TestPayload("one"),
          TestPayload("two"),
          TestPayload("three"),
          TestPayload("four")
        ).map(payload => TestEvent(generateSimpleSchemaBinary(payload, simpleSchema)))
          .toDS()
          .withColumn("deserializedPayload", from_avro(col("payload"), simpleSchema))
          .show(truncate = false)
      }
      

      And here is what this program outputs:

      +----------------------+-------------------+
      |payload               |deserializedPayload|
      +----------------------+-------------------+
      |[00 06 6F 6E 65]      |[four]             |
      |[00 06 74 77 6F]      |[four]             |
      |[00 0A 74 68 72 65 65]|[four]             |
      |[00 08 66 6F 75 72]   |[four]             |
      +----------------------+-------------------+

      Here we can see that the Avro binary is generated correctly, but every deserialized value is a copy of the last row's. I have not yet verified whether this also happens in cluster mode.


      I dug a bit further into the code, and it seems that the reuse of the result object in AvroDataToCatalyst overwrites the decoded values of previous rows. I set a breakpoint in LocalRelation, and the elements of its data sequence all appear to point to the same object in memory, so a mutation through one reference changes all of them.
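      The aliasing mechanism described above can be reproduced without Spark. The sketch below is a plain-Scala illustration under stated assumptions: MutableRow, decodeInto, collectAliased and collectCopied are hypothetical names standing in for the reused result object in AvroDataToCatalyst and for the code that collects rows into a LocalRelation; none of them are Spark APIs.

```scala
// Hypothetical stand-in for a mutable output row (not a Spark class).
final class MutableRow(var message: String) {
  // Return an independent snapshot of the current contents.
  def copy(): MutableRow = new MutableRow(message)
  override def toString: String = s"[$message]"
}

object ReuseDemo {
  // A single output object shared across calls, mimicking an expression
  // that overwrites the same result object on every evaluation.
  private val result = new MutableRow(null)

  // Hypothetical "decoder": writes into the shared object and returns it.
  def decodeInto(input: String): MutableRow = {
    result.message = input
    result
  }

  // Buggy consumer: stores the returned references as-is, so every
  // element ends up pointing at the same object.
  def collectAliased(inputs: Seq[String]): Seq[MutableRow] =
    inputs.map(decodeInto).toList

  // Tolerant consumer: copies each output before the next call overwrites it.
  def collectCopied(inputs: Seq[String]): Seq[MutableRow] =
    inputs.map(s => decodeInto(s).copy()).toList

  def main(args: Array[String]): Unit = {
    val inputs = Seq("one", "two", "three", "four")
    println(collectAliased(inputs).mkString(", ")) // [four], [four], [four], [four]
    println(collectCopied(inputs).mkString(", "))  // [one], [two], [three], [four]
  }
}
```

      Copying each output before storing it, as collectCopied does, is one way a consumer could tolerate an expression that reuses its output object, which is what the issue title asks of ConvertToLocalRelation; this sketch does not claim to reproduce the exact change made in Spark.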


          People

            Assignee: L. C. Hsieh (viirya)
            Reporter: Yosuke Mori (ykmori15)
            Votes: 1
            Watchers: 5
