  SPARK-33093

Why do my Spark 3 jobs fail to use external shuffle service on YARN?



      We are running a Spark-on-YARN setup in which each client uploads its own Spark JARs for its job, to be run in YARN executors. YARN exposes a shuffle service on port 7337 of every NodeManager, and clients enable its use.

      This worked for a while with clients using Spark 2 JARs, but we are seeing issues when clients attempt to use Spark 3 JARs. When shuffling is either disabled, or enabled but the shuffle service is not actually used, things seem to keep working in Spark 3.

      When a Spark 3 job attempts to use the external shuffle service, we get a stack-trace that looks like this:

      java.lang.IllegalArgumentException: Unknown message type: 10
      	at org.apache.spark.network.shuffle.protocol.BlockTransferMessage$Decoder.fromByteBuffer(BlockTransferMessage.java:67)
      	at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:71)
      	at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:154)
      	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:102)
      	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
      	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
      	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
      	at ...

      Message type 10 was introduced by SPARK-27651, released in Spark 3.0.0; this error suggests that an older version of BlockTransferMessage$Decoder.fromByteBuffer, one that predates type 10, is doing the decoding.
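      To make that reasoning concrete, here is a minimal Java sketch of a decoder that dispatches on a leading type byte and rejects any ID it does not know. It is a simplified, hypothetical model: the class LegacyDecoderSketch and its cutoff at type 9 are illustrative assumptions, not Spark's actual BlockTransferMessage code or its real list of message types.

```java
import java.nio.ByteBuffer;

// Hypothetical, simplified model of a type-byte RPC decoder; NOT Spark's actual
// BlockTransferMessage code. It only illustrates why a decoder built against an
// older protocol rejects a type ID that a newer client sends.
class LegacyDecoderSketch {

  // Assumption: this "old" decoder only knows type IDs 0..9 (arbitrary cutoff).
  static final int HIGHEST_KNOWN_TYPE = 9;

  static String decode(ByteBuffer msg) {
    byte type = msg.get(); // the first byte identifies the message type
    if (type < 0 || type > HIGHEST_KNOWN_TYPE) {
      // Same shape as the error seen in the stack trace above.
      throw new IllegalArgumentException("Unknown message type: " + type);
    }
    return "decoded type " + type;
  }

  public static void main(String[] args) {
    // A client speaking the newer protocol sends type 10.
    ByteBuffer newerMessage = ByteBuffer.wrap(new byte[] {10});
    try {
      decode(newerMessage);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

      In this model the failure is raised by whichever side runs the decoder, which is why the version of the receiving code matters more than the version of the sender.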

      ExternalShuffleBlockHandler was renamed to ExternalBlockHandler by SPARK-28593, also released in Spark 3.0.0; the old name appearing in this stack-trace suggests that an older JAR is being loaded.
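      One way to test the "older JAR" hypothesis from inside a given JVM is to ask which of the two class names resolves there. The sketch below is a hypothetical helper (HandlerNameProbe is not part of Spark) that uses the standard Class.forName without running static initializers; its answer only describes the class path of the JVM it runs in.

```java
// Hypothetical diagnostic, not part of Spark: reports which of the two handler
// class names (taken from the text above) resolves on the current class path.
class HandlerNameProbe {

  static final String OLD_NAME = "org.apache.spark.network.shuffle.ExternalShuffleBlockHandler";
  static final String NEW_NAME = "org.apache.spark.network.shuffle.ExternalBlockHandler";

  // Returns true if the class can be found, without initializing it.
  static boolean isPresent(String className) {
    try {
      Class.forName(className, false, HandlerNameProbe.class.getClassLoader());
      return true;
    } catch (ClassNotFoundException e) {
      return false;
    }
  }

  static String describe() {
    boolean hasOld = isPresent(OLD_NAME);
    boolean hasNew = isPresent(NEW_NAME);
    if (hasOld && hasNew) return "both old and new handler classes are visible";
    if (hasOld) return "only the old handler class is visible";
    if (hasNew) return "only the new handler class is visible";
    return "neither handler class is visible";
  }

  public static void main(String[] args) {
    System.out.println(describe());
  }
}
```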

      Our current Hadoop setup (Cloudera CDH parcels) is very likely polluting the class-path with older JARs. To figure out where the old JARs come from, I added -verbose:class to the executor's Java options to log every class load.

      This is where things get interesting: there is no mention of the old ExternalShuffleBlockHandler class anywhere, and BlockTransferMessage$Decoder is reported as loaded from the Spark 3 JARs:

      grep -E 'org.apache.spark.network.shuffle.protocol.BlockTransferMessage|org.apache.spark.network.shuffle.ExternalShuffleBlockHandler|org.apache.spark.network.server.TransportRequestHandler|org.apache.spark.network.server.TransportChannelHandler|org.apache.spark.network.shuffle.ExternalBlockHandler' example_shuffle_stdout.txt
      [Loaded org.apache.spark.network.server.TransportRequestHandler from file:/hadoop/2/yarn/nm/filecache/0/2170513/spark-network-common_2.12-3.0.0.jar]
      [Loaded org.apache.spark.network.server.TransportChannelHandler from file:/hadoop/2/yarn/nm/filecache/0/2170513/spark-network-common_2.12-3.0.0.jar]
      [Loaded org.apache.spark.network.shuffle.protocol.BlockTransferMessage from file:/hadoop/1/yarn/nm/filecache/0/2170571/spark-network-shuffle_2.12-3.0.0.jar]
      [Loaded org.apache.spark.network.shuffle.protocol.BlockTransferMessage$Type from file:/hadoop/1/yarn/nm/filecache/0/2170571/spark-network-shuffle_2.12-3.0.0.jar]
      [Loaded org.apache.spark.network.shuffle.protocol.BlockTransferMessage$Decoder from file:/hadoop/1/yarn/nm/filecache/0/2170571/spark-network-shuffle_2.12-3.0.0.jar]
      [Loaded org.apache.spark.network.server.TransportRequestHandler$1 from file:/hadoop/2/yarn/nm/filecache/0/2170513/spark-network-common_2.12-3.0.0.jar]
      [Loaded org.apache.spark.network.server.TransportRequestHandler$$Lambda$666/376989599 from org.apache.spark.network.server.TransportRequestHandler]
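      The information that -verbose:class prints can also be queried programmatically, for instance from a small diagnostic run against a suspect class-path. This is a minimal sketch assuming a hypothetical helper class, ClassOrigin; it relies only on the standard Class.getProtectionDomain() and CodeSource.getLocation() APIs.

```java
import java.net.URL;
import java.security.CodeSource;

// Hypothetical helper, not part of Spark: returns the JAR or directory a class
// was loaded from, similar in spirit to the "[Loaded ... from ...]" lines above.
class ClassOrigin {

  static String originOf(Class<?> cls) {
    CodeSource source = cls.getProtectionDomain().getCodeSource();
    if (source == null || source.getLocation() == null) {
      // Bootstrap classes (e.g. java.lang.String) have no code source, and some
      // class loaders define classes without a location.
      return "<bootstrap or unknown>";
    }
    URL location = source.getLocation();
    return location.toString();
  }

  // Usage: pass fully qualified class names as arguments.
  public static void main(String[] args) throws ClassNotFoundException {
    for (String name : args) {
      Class<?> cls = Class.forName(name, false, ClassOrigin.class.getClassLoader());
      System.out.println(name + " -> " + originOf(cls));
    }
  }
}
```

      For example, passing org.apache.spark.network.shuffle.protocol.BlockTransferMessage as an argument should print the JAR that class resolved from in that JVM, provided it is on the class-path.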

      I do not know how this is possible:

      • is the executor reporting a stack-trace that comes from another process rather than itself?
      • are old classes loaded without being reported by -verbose:class?

      I'm not sure how to investigate this further, as I could not find exactly where the RpcHandler instance is injected into the TransportRequestHandler for my executors.

      I did try setting spark.executor.userClassPathFirst to true, but that made no difference: I confirmed it was enabled in the Spark UI's Environment tab and still got the same error. I also tried setting spark.jars and spark.yarn.jars to point explicitly to the user's Spark JARs, but that did not work either: both keys still showed an empty value in the Spark UI's Environment tab.

      What am I missing here?
      What should I try next?




