Description
When a tablet server goes down while a query is running on Spark, the connection becomes unusable because the cached tablet locations have gone stale.
Steps to reproduce
Start spark-shell with kudu-spark2 1.13.0
The problem is not reproducible with kudu-spark2 1.12.0 or earlier, because it was introduced by KUDU-1802.
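For example, spark-shell can be started with the 1.13.0 connector via --packages org.apache.kudu:kudu-spark2_2.11:1.13.0 (assuming Scala 2.11 artifacts). The snippets below also assume that kuduMasters holds the cluster's master addresses, e.g.:

// Hypothetical master addresses; substitute the cluster's real ones.
val kuduMasters = "master-1:7051,master-2:7051,master-3:7051"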
Run a scan query
import org.apache.kudu.spark.kudu._

val dummy = spark.read.options(
    Map("kudu.master" -> kuduMasters, "kudu.table" -> "dummy")).kudu
dummy.createOrReplaceTempView("dummy")
spark.sql("select sum(id), min(val2), max(val2), count(*) from dummy").show
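For reference, a table matching the query above can be created and populated with something like the following sketch (the column names are inferred from the query; the partitioning, replication, and row count are illustrative assumptions):

import scala.collection.JavaConverters._
import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu.KuduContext

val kuduContext = new KuduContext(kuduMasters, spark.sparkContext)

// Schema inferred from the query: a bigint key "id" and a bigint column "val2".
val df = spark.range(0, 1000000)
  .selectExpr("id", "cast(rand() * 100 as bigint) as val2")

// Hash partitioning into 8 buckets with 3 replicas is an illustrative choice;
// the repro only needs the table spread across several tablet servers.
kuduContext.createTable("dummy", df.schema, Seq("id"),
  new CreateTableOptions()
    .addHashPartitions(List("id").asJava, 8)
    .setNumReplicas(3))

kuduContext.insertRows(df, "dummy")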
Kill a tablet server
Kill one of the tablet servers that are serving data for the query. The query fails immediately:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 0.0 failed 1 times, most recent failure: Lost task 2.0 in stage 0.0 (TID 2, localhost, executor driver): java.lang.RuntimeException: org.apache.kudu.client.NonRecoverableException: Scanner *** not found (it may have expired)
Re-run the query
spark.sql("select sum(id), min(val2), max(val2), count(*) from dummy").show
This doesn't work; it fails with an NPE:
Caused by: java.lang.RuntimeException: java.lang.NullPointerException
    at org.apache.kudu.client.KuduScanToken$KuduScanTokenBuilder.build(KuduScanToken.java:697)
    at org.apache.kudu.spark.kudu.KuduRDD.getPartitions(KuduRDD.scala:95)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
    at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:94)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:323)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:91)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    ... 86 more
Caused by: java.lang.NullPointerException
    at org.apache.kudu.client.KuduScanToken$KuduScanTokenBuilder.build(KuduScanToken.java:674)
    ... 117 more
Re-creating the DataFrame doesn't help:
val dummy = spark.read.options(
    Map("kudu.master" -> kuduMasters, "kudu.table" -> "dummy")).kudu
dummy.createOrReplaceTempView("dummy")

// Still fails with an NPE
spark.sql("select sum(id), min(val2), max(val2), count(*) from dummy").show
Cause
KuduScanToken.java:666
// Build the list of replica metadata.
List<Client.TabletMetadataPB.ReplicaMetadataPB> replicas = new ArrayList<>();
for (LocatedTablet.Replica replica : remoteTablet.getReplicas()) {
  Integer serverIndex = serverIndexMap.get(
      new HostAndPort(replica.getRpcHost(), replica.getRpcPort()));
  Client.TabletMetadataPB.ReplicaMetadataPB.Builder tabletMetadataBuilder =
      Client.TabletMetadataPB.ReplicaMetadataPB.newBuilder()
          .setRole(replica.getRoleAsEnum())
          .setTsIdx(serverIndex);
  if (replica.getDimensionLabel() != null) {
    tabletMetadataBuilder.setDimensionLabel(replica.getDimensionLabel());
  }
  replicas.add(tabletMetadataBuilder.build());
}
serverIndex can be null here because the lookup goes through cached tablet locations (TableLocationsCache.Entry) that are now stale: the killed tablet server still appears among the replicas but has no entry in serverIndexMap. Passing the null Integer to setTsIdx(), which takes a primitive int, then throws the NullPointerException during auto-unboxing.
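To make the mechanism concrete, here is a minimal sketch in plain Scala (not Kudu code; all names and ports are illustrative): looking up a host that is missing from the live-server map returns null, and auto-unboxing that null Integer into an Int parameter throws the NPE.

import java.util.{HashMap => JHashMap}

// Stand-in for a builder method that takes a primitive int, like setTsIdx(int).
def setTsIdx(idx: Int): Unit = println(s"tsIdx = $idx")

// The index map only knows about servers that are currently up...
val serverIndexMap = new JHashMap[String, Integer]()
serverIndexMap.put("tserver-a:7050", 0)
serverIndexMap.put("tserver-b:7050", 1)

// ...but the stale location cache still lists the killed server as a replica.
val serverIndex: Integer = serverIndexMap.get("tserver-c:7050") // null

setTsIdx(serverIndex) // java.lang.NullPointerException while unboxing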
Workarounds
- Restart Spark shell
- Wait until the connection becomes idle and is cleaned up:
DEBUG Connection: [peer master-***] handling channelInactive
DEBUG Connection: [peer master-***] cleaning up while in state READY due to: connection closed
- Use kudu-spark2 1.12.0 or below (e.g. --packages org.apache.kudu:kudu-spark2_2.11:1.12.0)