The original problem: Serializing GenericRecords in Spark Core incurs very high overhead, because the schema is serialized with every record. (In the actual input data on HDFS, by contrast, the schema is stored only once per file.)
The extended problem: Spark 1.5 introduced the ability to register Avro schemas ahead of time using SparkConf. This solution is partial as some applications may not know exactly which schemas they're going to read ahead of time.
The proposed solution: Add a schema repository to the serializer. Assuming the generic records carry a schemaId, it can be extracted dynamically from the records being read, so that only the schemaId is serialized.
Upon deserialization, the schema repo is queried again to resolve the schemaId back to its schema.
The local caching mechanism will remain intact, so each Task will query the schema repo only once per schemaId.
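The "once per schemaId" behaviour on the deserialization side can be illustrated with a small sketch. This is not the serializer's actual code: `CachingSchemaResolver` and the `repoLookup` function are hypothetical names, and schemas are represented as plain JSON strings to keep the example free of Avro and Spark dependencies.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongFunction;

/** Hypothetical per-task cache placed in front of a schema repo lookup. */
final class CachingSchemaResolver {
    // Assumed stand-in for the (possibly remote) repo call: schemaId -> schema JSON.
    private final LongFunction<String> repoLookup;
    // Local cache: each schemaId is resolved against the repo at most once.
    private final Map<Long, String> cache = new ConcurrentHashMap<>();
    // Counts how often the repo was actually queried (for illustration only).
    private final AtomicInteger repoQueries = new AtomicInteger();

    CachingSchemaResolver(LongFunction<String> repoLookup) {
        this.repoLookup = repoLookup;
    }

    /** Returns the schema for the id, querying the repo only on a cache miss. */
    String resolve(long schemaId) {
        return cache.computeIfAbsent(schemaId, id -> {
            repoQueries.incrementAndGet();
            return repoLookup.apply(id);
        });
    }

    int repoQueries() {
        return repoQueries.get();
    }
}
```

Resolving the same schemaId repeatedly within one resolver instance hits the repo a single time; later calls are served from the local map.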
The previous static registering of schemas will remain in place, as it is more efficient when the schemas are known ahead of time.
New flow for serializing a generic record:
1) Check the pre-registered schema list. If the schema is found, serialize only its fingerprint.
2) If it is not found and a schema repo has been set, attempt to extract the schemaId from the record and check whether the repo contains that id. If so, serialize only the schemaId.
3) If no schema repo is set, or the schemaId is not found in the repo, compress and send the entire schema.
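The three steps above can be sketched as a single decision function. This is a minimal illustration under stated assumptions, not the serializer's real implementation: `SchemaRepo`, `InMemorySchemaRepo`, `SchemaHeader` and `SchemaHeaderChooser` are hypothetical names, records are modelled as plain maps with a `schemaId` entry, schemas are JSON strings, and Deflate is used as one possible compression choice.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.zip.DeflaterOutputStream;

/** Hypothetical schema repository interface; not a Spark or Avro API. */
interface SchemaRepo {
    Optional<Long> extractSchemaId(Map<String, Object> record);
    boolean contains(long schemaId);
}

/** Hypothetical in-memory repo, used only to exercise the sketch. */
final class InMemorySchemaRepo implements SchemaRepo {
    private final Set<Long> knownIds;
    InMemorySchemaRepo(Set<Long> knownIds) { this.knownIds = knownIds; }

    public Optional<Long> extractSchemaId(Map<String, Object> record) {
        Object id = record.get("schemaId"); // assumed field name
        return id instanceof Long ? Optional.of((Long) id) : Optional.empty();
    }
    public boolean contains(long schemaId) { return knownIds.contains(schemaId); }
}

/** What gets written in front of the record data. */
final class SchemaHeader {
    enum Kind { FINGERPRINT, SCHEMA_ID, FULL_SCHEMA }
    final Kind kind;
    final Object payload; // Long for FINGERPRINT / SCHEMA_ID, byte[] for FULL_SCHEMA
    SchemaHeader(Kind kind, Object payload) { this.kind = kind; this.payload = payload; }
}

final class SchemaHeaderChooser {
    private final Map<String, Long> registered; // schema JSON -> fingerprint
    private final SchemaRepo repo;              // null when no repo has been set

    SchemaHeaderChooser(Map<String, Long> registered, SchemaRepo repo) {
        this.registered = registered;
        this.repo = repo;
    }

    SchemaHeader choose(String schemaJson, Map<String, Object> record) {
        // 1) Pre-registered schema: send only its fingerprint.
        Long fingerprint = registered.get(schemaJson);
        if (fingerprint != null) {
            return new SchemaHeader(SchemaHeader.Kind.FINGERPRINT, fingerprint);
        }
        // 2) Repo is set and knows the record's schemaId: send only the id.
        if (repo != null) {
            Optional<Long> id = repo.extractSchemaId(record);
            if (id.isPresent() && repo.contains(id.get())) {
                return new SchemaHeader(SchemaHeader.Kind.SCHEMA_ID, id.get());
            }
        }
        // 3) Fallback: compress and send the entire schema.
        return new SchemaHeader(SchemaHeader.Kind.FULL_SCHEMA, compress(schemaJson));
    }

    static byte[] compress(String schemaJson) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DeflaterOutputStream out = new DeflaterOutputStream(bytes)) {
            out.write(schemaJson.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

Checking the pre-registered list first keeps the cheapest path for schemas known ahead of time, and the full-schema fallback means a missing or incomplete repo never prevents serialization.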