Spark / SPARK-27798

from_avro can modify variables in other rows in local mode


    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 2.4.3
    • Fix Version/s: 2.3.4, 2.4.4, 3.0.0
    • Component/s: SQL


      Steps to reproduce:

      1. Create a local Dataset (at least two distinct rows) with a binary Avro field.
      2. Use the from_avro function to deserialize the binary into another column.
      3. Verify that all of the rows incorrectly have the same value.

      Here's a concrete example (using Spark 2.4.3). All it does is convert a list of TestPayload objects into binary using the defined Avro schema, then try to deserialize that binary using from_avro with the same schema:

      import java.io.ByteArrayOutputStream

      import org.apache.avro.Schema
      import org.apache.avro.generic.{GenericDatumWriter, GenericRecord, GenericRecordBuilder}
      import org.apache.avro.io.EncoderFactory
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.avro.from_avro
      import org.apache.spark.sql.functions.col

      object TestApp extends App {
        // Payload container
        case class TestEvent(payload: Array[Byte])

        // Deserialized Payload
        case class TestPayload(message: String)

        // Avro schema for Payload
        val simpleSchema =
          """
            |{
            |  "type" : "record",
            |  "name" : "Payload",
            |  "fields" : [ { "name" : "message", "type" : [ "string", "null" ] } ]
            |}
          """.stripMargin

        // Convert a TestPayload into Avro binary
        def generateSimpleSchemaBinary(record: TestPayload, avsc: String): Array[Byte] = {
          val schema = new Schema.Parser().parse(avsc)
          val out = new ByteArrayOutputStream()
          val writer = new GenericDatumWriter[GenericRecord](schema)
          val encoder = EncoderFactory.get().binaryEncoder(out, null)
          val rootRecord = new GenericRecordBuilder(schema).set("message", record.message).build()
          writer.write(rootRecord, encoder)
          encoder.flush()
          out.toByteArray
        }

        val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
        import spark.implicits._

        Seq(
          TestPayload("one"),
          TestPayload("two"),
          TestPayload("three"),
          TestPayload("four")
        ).map(payload => TestEvent(generateSimpleSchemaBinary(payload, simpleSchema)))
          .toDS()
          .withColumn("deserializedPayload", from_avro(col("payload"), simpleSchema))
          .show(truncate = false)
      }

      And here is what this program outputs:

      +----------------------+-------------------+
      |payload               |deserializedPayload|
      +----------------------+-------------------+
      |[00 06 6F 6E 65]      |[four]             |
      |[00 06 74 77 6F]      |[four]             |
      |[00 0A 74 68 72 65 65]|[four]             |
      |[00 08 66 6F 75 72]   |[four]             |
      +----------------------+-------------------+

      Here, we can see that the Avro binary is generated correctly, but every deserialized value is a copy of the last row. I have not yet verified whether this is an issue in cluster mode as well.


      I dug a bit more into the code, and it seems like the reuse of `result` in AvroDataToCatalyst is overwriting the decoded values of previous rows. I set a breakpoint in LocalRelation, and the entries in the data sequence all point to the same address in memory - so a mutation through one reference mutates all of them.
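      The aliasing described above can be illustrated without Spark. The sketch below (plain Java to keep it dependency-free; `ReusingDecoder` and `MutableRow` are illustrative names, not Spark classes) shows how a decoder that reuses one mutable result object makes every stored row alias the same memory, so the last decode wins - and how copying the result before storing it restores per-row values:

      ```java
      import java.util.ArrayList;
      import java.util.List;

      public class ReuseDemo {
          // Stand-in for a decoded row; purely illustrative.
          static class MutableRow {
              String message;
          }

          // Mimics a converter that reuses a single `result` buffer for every input row.
          static class ReusingDecoder {
              private final MutableRow result = new MutableRow(); // shared across calls

              MutableRow decode(String payload) {
                  result.message = payload; // overwrites the previous row's value
                  return result;            // same object reference every call
              }
          }

          public static void main(String[] args) {
              String[] payloads = {"one", "two", "three", "four"};
              ReusingDecoder decoder = new ReusingDecoder();

              // Broken: the caller stores references, so all rows alias one object.
              List<MutableRow> rows = new ArrayList<>();
              for (String p : payloads) {
                  rows.add(decoder.decode(p));
              }
              for (MutableRow r : rows) {
                  System.out.println(r.message); // prints "four" four times
              }

              // Fixed: copy the result before storing it, one object per row.
              List<MutableRow> copied = new ArrayList<>();
              for (String p : payloads) {
                  MutableRow copy = new MutableRow();
                  copy.message = decoder.decode(p).message;
                  copied.add(copy);
              }
              for (MutableRow r : copied) {
                  System.out.println(r.message); // prints one, two, three, four
              }
          }
      }
      ```

      Reuse of a single result object is a common deserializer optimization to avoid per-row allocation; it is only safe if every consumer copies the value before holding onto it.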


              • Assignee:
                Liang-Chi Hsieh (viirya)
              • Reporter:
                Yosuke Mori (ykmori15)
              • Votes:
                1
              • Watchers:
                5

