Details
Type: Bug
Status: Resolved
Priority: Minor
Resolution: Not A Problem
Affects Version/s: 2.4.0
Fix Version/s: None
Environment:
Local Spark v2.4.0
Kotlin v1.3.21
Description
I'm trying to create an Aggregator that uses a custom container that should be serialized with Kryo:
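import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.KryoSerializable
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import java.util.Collections

class StringSet(other: Collection<String>) : HashSet<String>(other), KryoSerializable {
    companion object {
        @JvmStatic
        private val serialVersionUID = 1L
    }

    // No-arg constructor, used by Kryo to instantiate the object before calling read()
    constructor() : this(Collections.emptyList())

    override fun write(kryo: Kryo, output: Output) {
        // Element count first, then each element
        output.writeInt(this.size)
        for (string in this) {
            output.writeString(string)
        }
    }

    override fun read(kryo: Kryo, input: Input) {
        // Mirror of write(): read the count, then that many strings
        val size = input.readInt()
        repeat(size) {
            this.add(input.readString())
        }
    }
}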
However, if I look at the corresponding value in the Row after aggregation (for example by using collectAsList()), I see a byte[]. Interestingly, the first byte in that array seems to be some sort of noise, and I can deserialize by doing something like this:
val b = row.getAs<ByteArray>(2)
val input = Input(b.copyOfRange(1, b.size)) // extra byte?
val set = Kryo().readObject(input, StringSet::class.java)
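The leading byte is plausibly not noise. As far as we can tell, Spark's Kryo-based encoder stores the bytes produced by Kryo's writeClassAndObject, which prefixes the object with a variable-length class registration ID. The minimal sketch below uses only Kryo 4 (no Spark) and a registered java.util.HashSet as a stand-in for StringSet. It shows two ways to read such bytes. The helper names writeWithClassId, readWithClassId and readSkippingClassId are hypothetical, introduced only for illustration.

```kotlin
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output

// Hypothetical helper: writes the class ID first, then the object.
// This is assumed (not confirmed here) to match what Spark's encoder stores.
fun writeWithClassId(kryo: Kryo, obj: Any): ByteArray {
    val output = Output(4096, -1) // 4 KiB initial buffer, no maximum size
    kryo.writeClassAndObject(output, obj)
    return output.toBytes()
}

// Option 1: let Kryo consume the class ID itself.
// Only correct if this Kryo has the same registrations (same IDs) as the writer.
fun readWithClassId(kryo: Kryo, bytes: ByteArray): Any? =
    kryo.readClassAndObject(Input(bytes))

// Option 2: skip the varint class ID explicitly, then read a known type.
// Generalizes copyOfRange(1, b.size): a varint is one byte only for small IDs.
// Assumes the class was registered; unregistered classes are written by name instead.
fun <T> readSkippingClassId(kryo: Kryo, bytes: ByteArray, type: Class<T>): T {
    val input = Input(bytes)
    input.readVarInt(true) // discard the class registration ID
    return kryo.readObject(input, type)
}

fun main() {
    val kryo = Kryo()
    kryo.register(java.util.HashSet::class.java) // registered, so an ID (not a name) is written
    val original = java.util.HashSet(listOf("a", "b"))
    val bytes = writeWithClassId(kryo, original)
    println(readWithClassId(kryo, bytes) == original)
    println(readSkippingClassId(kryo, bytes, java.util.HashSet::class.java) == original)
}
```

Option 2 does not depend on the actual ID value, which is why it should keep working if the registration ID ever grows past one varint byte. Option 1 is the cleaner route inside Spark, but the Kryo instance must then be configured like Spark's own; one plausible source is KryoSerializer(conf).newKryo() with the same SparkConf.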
Configuration used:
SparkConf()
    .setAppName("Hello Spark with Kotlin")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrationRequired", "true")
    .registerKryoClasses(arrayOf(StringSet::class.java))
Sample repo with all the code.