Description
I noticed in several thread dumps that Kryo appears to be serializing the class names associated with our keys and values.
Kryo supports pre-registering classes so that you don't have to serialize the class name, and Spark supports this via the spark.kryo.registrator property. We should register our key and value classes this way so we don't have to serialize class names.
Thread 12154: (state = BLOCKED) - java.lang.Object.hashCode() @bci=0 (Compiled frame; information may be imprecise) - com.esotericsoftware.kryo.util.ObjectMap.get(java.lang.Object) @bci=1, line=265 (Compiled frame) - com.esotericsoftware.kryo.util.DefaultClassResolver.getRegistration(java.lang.Class) @bci=18, line=61 (Compiled frame) - com.esotericsoftware.kryo.Kryo.getRegistration(java.lang.Class) @bci=20, line=429 (Compiled frame) - com.esotericsoftware.kryo.util.DefaultClassResolver.readName(com.esotericsoftware.kryo.io.Input) @bci=242, line=148 (Compiled frame) - com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(com.esotericsoftware.kryo.io.Input) @bci=65, line=115 (Compiled frame) - com.esotericsoftware.kryo.Kryo.readClass(com.esotericsoftware.kryo.io.Input) @bci=20, line=610 (Compiled frame) - com.esotericsoftware.kryo.Kryo.readClassAndObject(com.esotericsoftware.kryo.io.Input) @bci=21, line=721 (Compiled frame) - com.twitter.chill.Tuple2Serializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=6, line=41 (Compiled frame) - com.twitter.chill.Tuple2Serializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=4, line=33 (Compiled frame) - com.esotericsoftware.kryo.Kryo.readClassAndObject(com.esotericsoftware.kryo.io.Input) @bci=126, line=729 (Compiled frame) - org.apache.spark.serializer.KryoDeserializationStream.readObject(scala.reflect.ClassTag) @bci=8, line=142 (Compiled frame) - org.apache.spark.serializer.DeserializationStream$$anon$1.getNext() @bci=10, line=133 (Compiled frame) - org.apache.spark.util.NextIterator.hasNext() @bci=16, line=71 (Compiled frame) - org.apache.spark.util.CompletionIterator.hasNext() @bci=4, line=32 (Compiled frame) - scala.collection.Iterator$$anon$13.hasNext() @bci=4, line=371 (Compiled frame) - org.apache.spark.util.CompletionIterator.hasNext() @bci=4, line=32 (Compiled frame) - 
org.apache.spark.InterruptibleIterator.hasNext() @bci=22, line=39 (Compiled frame) - scala.collection.Iterator$$anon$11.hasNext() @bci=4, line=327 (Compiled frame) - org.apache.spark.util.collection.ExternalSorter.insertAll(scala.collection.Iterator) @bci=191, line=217 (Compiled frame) - org.apache.spark.shuffle.hash.HashShuffleReader.read() @bci=278, line=61 (Interpreted frame) - org.apache.spark.rdd.ShuffledRDD.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=46, line=92 (Interpreted frame) - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=26, line=263 (Interpreted frame) - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=33, line=230 (Interpreted frame) - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=24, line=35 (Interpreted frame) - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=26, line=263 (Interpreted frame) - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=33, line=230 (Interpreted frame) - org.apache.spark.rdd.MapPartitionsRDD.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=24, line=35 (Interpreted frame) - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=26, line=263 (Interpreted frame) - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=33, line=230 (Interpreted frame) - org.apache.spark.rdd.UnionRDD.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=22, line=87 (Interpreted frame) - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=26, line=263 (Interpreted frame) - org.apache.spark.rdd.RDD.iterator(org.apache.spark.Partition, 
org.apache.spark.TaskContext) @bci=33, line=230 (Interpreted frame) - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=166, line=68 (Interpreted frame) - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=2, line=41 (Interpreted frame) - org.apache.spark.scheduler.Task.run(long) @bci=77, line=56 (Interpreted frame) - org.apache.spark.executor.Executor$TaskRunner.run() @bci=310, line=196 (Interpreted frame) - java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker) @bci=95, line=1145 (Interpreted frame) - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=615 (Interpreted frame) - java.lang.Thread.run() @bci=11, line=745 (Interpreted frame)