Description
We are running a Spark-on-YARN setup, where each client uploads their own Spark JARs for their job, to run in YARN executors. YARN exposes an external shuffle service on port 7337 of every NodeManager, and clients enable its use.
This has worked for a while with clients using Spark 2 JARs, but we are seeing issues when clients attempt to use Spark 3 JARs. When shuffling is either disabled, or enabled but the shuffle service is not actually used, things seem to keep working in Spark 3.
When a Spark 3 job attempts to use the external service, we get a stack-trace that looks like this:
java.lang.IllegalArgumentException: Unknown message type: 10
    at org.apache.spark.network.shuffle.protocol.BlockTransferMessage$Decoder.fromByteBuffer(BlockTransferMessage.java:67)
    at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:71)
    at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:154)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:102)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at ...
Message type 10 was introduced as of SPARK-27651, released in Spark 3.0.0; this error hints at an older version of BlockTransferMessage$Decoder.fromByteBuffer being used.
ExternalShuffleBlockHandler was renamed to ExternalBlockHandler as of SPARK-28593, also released in Spark 3.0.0; this stack-trace hints at an older JAR being loaded.
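To illustrate why an unknown type ID points at an older decoder, here is a simplified sketch of my understanding. It is not the Spark source: DecoderSketch and its maxKnownType parameter are hypothetical, and the cut-offs 6 and 11 are only illustrative stand-ins for "an older release" and "a newer release". The only parts taken from the real behaviour are that the first byte of the message carries the type ID and that an unrecognised ID produces the "Unknown message type" error.

```java
import java.nio.ByteBuffer;

public class DecoderSketch {

    /**
     * Hypothetical stand-in for a decoder that only recognises
     * type IDs in the range 0..maxKnownType.
     */
    public static String decode(ByteBuffer msg, int maxKnownType) {
        byte type = msg.get(); // the first byte carries the message type ID
        if (type < 0 || type > maxKnownType) {
            // Same message format as in the stack-trace above
            throw new IllegalArgumentException("Unknown message type: " + type);
        }
        return "decoded message of type " + type;
    }

    public static void main(String[] args) {
        byte[] frame = {10}; // a message whose type ID is 10

        // A "newer" decoder (illustrative cut-off) accepts the message.
        System.out.println(decode(ByteBuffer.wrap(frame), 11));

        // An "older" decoder (illustrative cut-off) rejects the same bytes.
        try {
            decode(ByteBuffer.wrap(frame), 6);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If this model is right, the error depends only on which version of the decoder handles the bytes, not on which version produced them.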
Our current Hadoop setup (Cloudera CDH parcels) is very likely to be polluting the class-path with older JARs. Trying to figure out where the old JARs come from, I added -verbose:class to the executor options, to log all class loading.
This is where things get interesting: there is no mention of the old ExternalShuffleBlockHandler class anywhere, and BlockTransferMessage$Decoder is reported as loaded from the Spark 3 JARs:
grep -E 'org.apache.spark.network.shuffle.protocol.BlockTransferMessage|org.apache.spark.network.shuffle.ExternalShuffleBlockHandler|org.apache.spark.network.server.TransportRequestHandler|org.apache.spark.network.server.TransportChannelHandler|org.apache.spark.network.shuffle.ExternalBlockHandler' example_shuffle_stdout.txt
[Loaded org.apache.spark.network.server.TransportRequestHandler from file:/hadoop/2/yarn/nm/filecache/0/2170513/spark-network-common_2.12-3.0.0.jar]
[Loaded org.apache.spark.network.server.TransportChannelHandler from file:/hadoop/2/yarn/nm/filecache/0/2170513/spark-network-common_2.12-3.0.0.jar]
[Loaded org.apache.spark.network.shuffle.protocol.BlockTransferMessage from file:/hadoop/1/yarn/nm/filecache/0/2170571/spark-network-shuffle_2.12-3.0.0.jar]
[Loaded org.apache.spark.network.shuffle.protocol.BlockTransferMessage$Type from file:/hadoop/1/yarn/nm/filecache/0/2170571/spark-network-shuffle_2.12-3.0.0.jar]
[Loaded org.apache.spark.network.shuffle.protocol.BlockTransferMessage$Decoder from file:/hadoop/1/yarn/nm/filecache/0/2170571/spark-network-shuffle_2.12-3.0.0.jar]
[Loaded org.apache.spark.network.server.TransportRequestHandler$1 from file:/hadoop/2/yarn/nm/filecache/0/2170513/spark-network-common_2.12-3.0.0.jar]
[Loaded org.apache.spark.network.server.TransportRequestHandler$$Lambda$666/376989599 from org.apache.spark.network.server.TransportRequestHandler]
I do not know how this is possible:
- is the executor reporting a stack-trace that comes from another process rather than itself?
- are old classes loaded without being reported by -verbose:class?
I'm not sure how to investigate this further, as I failed to locate precisely how the instance of RpcHandler is injected into the TransportRequestHandler for my executors.
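One check that could complement -verbose:class is to ask the JVM directly where a given class was loaded from. The following is a minimal sketch, assuming a hypothetical helper class ClassOrigin (not part of Spark) that would have to be run inside the process under suspicion. It relies only on the standard ProtectionDomain and CodeSource APIs, and the default class names are simply the ones from the stack-trace.

```java
import java.security.CodeSource;

public class ClassOrigin {

    /** Returns the location (JAR or directory) a class was loaded from. */
    public static String originOf(Class<?> cls) {
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        // Classes from the bootstrap loader typically have no code source.
        return src == null ? "<bootstrap or unknown>" : src.getLocation().toString();
    }

    public static void main(String[] args) {
        String[] names = args.length > 0 ? args : new String[] {
            "org.apache.spark.network.shuffle.protocol.BlockTransferMessage",
            "org.apache.spark.network.shuffle.ExternalShuffleBlockHandler",
            "org.apache.spark.network.shuffle.ExternalBlockHandler"
        };
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        for (String name : names) {
            try {
                // 'false' loads the class without running its static initialisers.
                Class<?> cls = Class.forName(name, false, loader);
                System.out.println(name + " <- " + originOf(cls));
            } catch (ClassNotFoundException | LinkageError e) {
                System.out.println(name + " is not loadable here: " + e);
            }
        }
    }
}
```

Run without Spark on the class-path, this only reports that the classes are not loadable. The output is meaningful only from within the JVM whose class-path is in question.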
I did try setting spark.executor.userClassPathFirst to true but that made no difference: I could confirm this was enabled in the Spark UI's Environment tab, and still got the same error. I also tried setting spark.jars and spark.yarn.jars to explicitly point to the user's Spark JARs, but that did not work: the value for those two keys was still empty in the Spark UI's Environment tab.
What am I missing here?
What should I try next?