  1. Spark
  2. SPARK-12222

deserialize RoaringBitmap using Kryo serializer throw Buffer underflow exception


    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.6.0
    • Fix Version/s: 1.6.0, 2.0.0
    • Component/s: Spark Core
      here are some problems when deserialize RoaringBitmap. see the examples below:
      run this piece of code
      import com.esotericsoftware.kryo.io.

      {Input => KryoInput, Output => KryoOutput}

      import java.io.DataInput

      class KryoInputDataInputBridge(input: KryoInput) extends DataInput

      { override def readLong(): Long = input.readLong() override def readChar(): Char = input.readChar() override def readFloat(): Float = input.readFloat() override def readByte(): Byte = input.readByte() override def readShort(): Short = input.readShort() override def readUTF(): String = input.readString() // readString in kryo does utf8 override def readInt(): Int = input.readInt() override def readUnsignedShort(): Int = input.readShortUnsigned() override def skipBytes(n: Int): Int = input.skip(n.toLong).toInt override def readFully(b: Array[Byte]): Unit = input.read(b) override def readFully(b: Array[Byte], off: Int, len: Int): Unit = input.read(b, off, len) override def readLine(): String = throw new UnsupportedOperationException("readLine") override def readBoolean(): Boolean = input.readBoolean() override def readUnsignedByte(): Int = input.readByteUnsigned() override def readDouble(): Double = input.readDouble() }

      class KryoOutputDataOutputBridge(output: KryoOutput) extends DataOutput

      { override def writeFloat(v: Float): Unit = output.writeFloat(v) // There is no "readChars" counterpart, except maybe "readLine", which is not supported override def writeChars(s: String): Unit = throw new UnsupportedOperationException("writeChars") override def writeDouble(v: Double): Unit = output.writeDouble(v) override def writeUTF(s: String): Unit = output.writeString(s) // writeString in kryo does UTF8 override def writeShort(v: Int): Unit = output.writeShort(v) override def writeInt(v: Int): Unit = output.writeInt(v) override def writeBoolean(v: Boolean): Unit = output.writeBoolean(v) override def write(b: Int): Unit = output.write(b) override def write(b: Array[Byte]): Unit = output.write(b) override def write(b: Array[Byte], off: Int, len: Int): Unit = output.write(b, off, len) override def writeBytes(s: String): Unit = output.writeString(s) override def writeChar(v: Int): Unit = output.writeChar(v.toChar) override def writeLong(v: Long): Unit = output.writeLong(v) override def writeByte(v: Int): Unit = output.writeByte(v) }

      val outStream = new FileOutputStream("D:
      val output = new KryoOutput(outStream)
      val bitmap = new RoaringBitmap
      bitmap.serialize(new KryoOutputDataOutputBridge(output))

      val inStream = new FileInputStream("D:
      val input = new KryoInput(inStream)
      val ret = new RoaringBitmap
      ret.deserialize(new KryoInputDataInputBridge(input))


      this will throw `Buffer underflow` error:
      com.esotericsoftware.kryo.KryoException: Buffer underflow.
      at com.esotericsoftware.kryo.io.Input.require(Input.java:156)
      at com.esotericsoftware.kryo.io.Input.skip(Input.java:131)
      at com.esotericsoftware.kryo.io.Input.skip(Input.java:264)
      at org.apache.spark.sql.SQLQuerySuite$$anonfun$6$KryoInputDataInputBridge$1.skipBytes

      after same investigation, i found this is caused by a bug of kryo's `Input.skip(long count)`(https://github.com/EsotericSoftware/kryo/issues/119) and we call this method in `KryoInputDataInputBridge`.

      So i think we can fix this issue in this two ways:
      1) upgrade the kryo version to 2.23.0 or 2.24.0, which has fix this bug in kryo (i am not sure the upgrade is safe in spark, can you check it? @davies )

      2) we can bypass the kryo's `Input.skip(long count)` by directly call another `skip` method in kryo's Input.java(https://github.com/EsotericSoftware/kryo/blob/kryo-2.21/src/com/esotericsoftware/kryo/io/Input.java#L124), i.e. write the bug-fixed version of `Input.skip(long count)` in KryoInputDataInputBridge's `skipBytes` method:
      class KryoInputDataInputBridge(input: KryoInput) extends DataInput {
      override def skipBytes(n: Int): Int = {
      var remaining: Long = n
      while (remaining > 0)

      { val skip = Math.min(Integer.MAX_VALUE, remaining).asInstanceOf[Int] input.skip(skip) remaining -= skip }





              scwf Fei Wang
              scwf Fei Wang
              Created: