When running jobs on Spark, TinkerPop currently recommends setting spark.serializer=GryoSerializer. This makes GryoSerializer responsible for serializing not just TinkerPop types but also Scala runtime types and Spark internals. GryoSerializer doesn't extend either of the two serializers provided by Spark, so it effectively takes on the job of reimplementing them.
This is problematic. Replicating the functionality of Spark's standard serializers is not trivial, and it is hard to empirically test all meaningful cases. For instance, there is a conditional within Spark that selects between two concrete MapStatus implementations depending on whether the current RDD partition count exceeds 2000 (https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala#L47-L53). The implementation used below this threshold serializes fine on GryoSerializer. The implementation used above the threshold does not. Above the partition threshold, I've started getting org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: org.apache.tinkerpop.shaded.kryo.KryoException: java.io.IOException: I failed to find one of the right cookies. Searching for that message leads to https://github.com/RoaringBitmap/RoaringBitmap/issues/64. However, just switching to Spark's KryoSerializer without changing anything else fixes the problem in my environment, which suggests that Spark has done something to address this problem that is not fully replicated in TinkerPop.
Still, "just switching to Spark's KryoSerializer" is not a great approach. For one thing, we lose the benefit of TinkerPop's space-efficient StarGraph serializer, and Spark traversals can produce a lot of little ego-StarGraphs. These still serialize, but KryoSerializer falls back to Kryo's default behavior (FieldSerializer), which is not as clever about StarGraphs as TinkerPop's StarGraphSerializer. Registering StarGraphSerializer with Spark is not an option either: TinkerPop's serializers are written against its own internal shaded Kryo, so they cannot be registered with Spark's unshaded Kryo.
More concerning, it's impossible to completely switch to KryoSerializer just by tweaking the configuration. Besides spark.serializer, there is also a setting spark.closure.serializer, for which the only supported value is JavaSerializer. Key TinkerPop classes that make it into the object reference graphs of Spark closures implement Serializable by resorting to TinkerPop's shaded Kryo via HadoopPools (I'm looking at ObjectWritable and VertexWritable). This leads to surprises with custom property data types. It doesn't matter if those types implement Serializable, and it doesn't matter if Spark's KryoSerializer is configured to accept those types. If those types are reachable from ObjectWritable or VertexWritable, then they must be registered with TinkerPop's internal shaded Kryo, or else it will choke on them (unless it was explicitly configured to allow unregistered classes).
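To make the surprise concrete, here is a minimal sketch of the failure mode in plain Java. It does not use TinkerPop or Kryo: StrictRegistry, Wrapper, and CustomProperty are hypothetical stand-ins (for a registration-required Kryo, for a writable that implements Serializable by delegating to that Kryo, and for a user's property type). For brevity the wrapper only performs the registration check and then falls back to Java serialization, which is an assumption of the sketch, not how the real classes write their bytes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a serializer that rejects unregistered classes.
final class StrictRegistry {
    private static final Set<Class<?>> REGISTERED =
            new HashSet<Class<?>>(Arrays.asList(String.class, Integer.class));

    static void check(Object value) {
        if (!REGISTERED.contains(value.getClass())) {
            throw new IllegalArgumentException(
                    "Class is not registered: " + value.getClass().getName());
        }
    }
}

// Hypothetical stand-in for a writable that is Serializable, but whose
// writeObject hook routes the payload through the strict internal serializer.
final class Wrapper implements Serializable {
    private static final long serialVersionUID = 1L;
    private transient Object payload;

    Wrapper(Object payload) { this.payload = payload; }

    private void writeObject(ObjectOutputStream out) throws IOException {
        StrictRegistry.check(payload); // the internal serializer sees the payload first
        out.writeObject(payload);      // simplification: the sketch then uses Java serialization
    }

    private void readObject(ObjectInputStream in)
            throws IOException, ClassNotFoundException {
        payload = in.readObject();
    }
}

// A user-defined property type: Serializable, but unknown to StrictRegistry.
final class CustomProperty implements Serializable {
    private static final long serialVersionUID = 1L;
    final String value;
    CustomProperty(String value) { this.value = value; }
}

class ClosureDemo {
    // Returns true if Java serialization of a Wrapper around the payload succeeds.
    static boolean canSerialize(Object payload) {
        try (ObjectOutputStream out =
                     new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(new Wrapper(payload));
            return true;
        } catch (RuntimeException | IOException e) {
            return false;
        }
    }
}
```

In this sketch a String payload goes through, while a CustomProperty payload fails even though it implements Serializable, because the wrapper's own hook consults the strict registry before anything else gets a say.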
I suggest the following change to give users more flexibility in their choice of spark.serializer, and to allow them to reuse TinkerPop's serializers if they choose not to use GryoSerializer: introduce lightweight interfaces that decouple TinkerPop's serialization logic from the exact Kryo package (shaded or unshaded) doing the work. TinkerPop's serialization logic would be written against interfaces that replicate a minimal subset of Kryo, and then TinkerPop's shaded Kryo or Spark's unshaded Kryo could be plugged in underneath without having to touch the source, recompile any TinkerPop code, or munge bytecode at runtime.
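As a rough illustration of the shape this could take, the sketch below writes one serializer against two small interfaces and plugs a backend in underneath. All names here (OutputShim, InputShim, SerializerShim, TinyEdge, and the stream adapters) are hypothetical and chosen for the example; DataOutputStream and DataInputStream stand in for a Kryo Output and Input so that the sketch runs without any Kryo dependency. A real version would have one thin adapter pair per Kryo package.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical minimal subset of an output/input API that serializers need.
interface OutputShim {
    void writeInt(int value);
    void writeString(String value);
}

interface InputShim {
    int readInt();
    String readString();
}

// Serialization logic is written once, against the shims only.
interface SerializerShim<T> {
    void write(OutputShim out, T object);
    T read(InputShim in);
}

// Stand-in domain type for the example.
final class TinyEdge {
    final int id;
    final String label;
    TinyEdge(int id, String label) { this.id = id; this.label = label; }
}

final class TinyEdgeSerializer implements SerializerShim<TinyEdge> {
    public void write(OutputShim out, TinyEdge edge) {
        out.writeInt(edge.id);
        out.writeString(edge.label);
    }
    public TinyEdge read(InputShim in) {
        int id = in.readInt();
        return new TinyEdge(id, in.readString());
    }
}

// One adapter pair per backend; this pair wraps java.io data streams.
final class StreamOutput implements OutputShim {
    private final DataOutputStream delegate;
    StreamOutput(DataOutputStream delegate) { this.delegate = delegate; }
    public void writeInt(int value) {
        try { delegate.writeInt(value); }
        catch (IOException e) { throw new UncheckedIOException(e); }
    }
    public void writeString(String value) {
        try { delegate.writeUTF(value); }
        catch (IOException e) { throw new UncheckedIOException(e); }
    }
}

final class StreamInput implements InputShim {
    private final DataInputStream delegate;
    StreamInput(DataInputStream delegate) { this.delegate = delegate; }
    public int readInt() {
        try { return delegate.readInt(); }
        catch (IOException e) { throw new UncheckedIOException(e); }
    }
    public String readString() {
        try { return delegate.readUTF(); }
        catch (IOException e) { throw new UncheckedIOException(e); }
    }
}

class ShimDemo {
    // Serializes and deserializes an edge through the shim interfaces.
    static TinyEdge roundTrip(TinyEdge edge) {
        SerializerShim<TinyEdge> serializer = new TinyEdgeSerializer();
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        serializer.write(new StreamOutput(new DataOutputStream(bytes)), edge);
        return serializer.read(new StreamInput(
                new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))));
    }
}
```

The point of the design is that TinyEdgeSerializer never imports a backend class, so swapping the adapter pair changes which library does the byte-level work without recompiling the serializer.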
This would not resolve all of the potential problems and complexity around TinkerPop serialization, but it would make it possible for users to apply the spark.serializer of their choice, switching off GryoSerializer if they so choose. Users could also continue to use GryoSerializer as they have until now.
I've already run this through a few iterations locally and have an abstraction that allows me to run with spark.serializer=KryoSerializer, register GryoMapper/GryoSerializer's standard set of types, reuse TinkerPop's StarGraph serializer, and bypass GryoSerializer in HadoopPools to boot. I just rebased it on current master. I'll submit a PR for discussion/review.
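For reference, the configuration this enables might look roughly like the following, shown as plain java.util.Properties so that it runs without Spark on the classpath. spark.serializer and spark.kryo.registrator are standard Spark settings; the registrator class name is a hypothetical placeholder for whatever class ends up registering TinkerPop's types and serializers with Spark's Kryo.

```java
import java.util.Properties;

class SerializerConfigSketch {
    // Builds the serializer-related settings for running with Spark's KryoSerializer.
    static Properties kryoSerializerConfig() {
        Properties props = new Properties();
        props.setProperty("spark.serializer",
                "org.apache.spark.serializer.KryoSerializer");
        // Hypothetical class name: a registrator that would register TinkerPop's
        // standard types and serializers with Spark's unshaded Kryo.
        props.setProperty("spark.kryo.registrator",
                "com.example.HypotheticalTinkerPopRegistrator");
        return props;
    }
}
```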