Description
kudu-spark is meant to have one KuduContext per task. The driver serializes the KuduContext, together with an authentication token, and sends it to the executor. On the executor, the KuduContext either retrieves a Kudu client from a JVM-scoped cache or creates one and puts it in the cache. In both cases it then imports its authentication token into the client.
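The get-or-create-then-import flow described above can be sketched as follows. This is a minimal illustration, not the kudu-spark implementation: `ClientCacheSketch`, `StubClient`, `clientFor`, and the choice to key the cache by master addresses are all hypothetical, and the stub only records the last token it was given.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ClientCacheSketch {
    /** Hypothetical stand-in for a Kudu client; it only remembers the last imported token. */
    static final class StubClient {
        final String masterAddresses;
        volatile String token;

        StubClient(String masterAddresses) {
            this.masterAddresses = masterAddresses;
        }

        void importToken(String newToken) {
            this.token = newToken;
        }
    }

    // A static map is shared by every task that runs in the same executor JVM.
    // Keying by master addresses is an assumption made for this sketch.
    private static final Map<String, StubClient> CACHE = new ConcurrentHashMap<>();

    /** Returns the cached client, creating it on first use, then imports the caller's token. */
    static StubClient clientFor(String masterAddresses, String token) {
        StubClient client = CACHE.computeIfAbsent(masterAddresses, StubClient::new);
        client.importToken(token); // every context imports its token, even on a cache hit
        return client;
    }
}
```

The point to notice is that the import runs on every lookup, so a second task on the same executor imports into a client that may already hold credentials.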
Under the default configuration in an un-Kerberized cluster, the client uses the authentication token to connect to the cluster. However, if --rpc_encryption=disabled is set, the client does not use the authentication token. The master then issues a new authentication token to the client, and the new token replaces the old token in the client.
While there is one KuduContext per task, multiple tasks may run on the same executor. When that happens, each KuduContext tries to import its authentication token into the shared client. If the client has already received a token from the master because encryption is disabled, the KuduContext's token and the master-issued token may belong to different users: the KuduContext's token was issued on the driver to the driver's Unix user, while the master-issued token was issued to the executor's user. The import then fails because the client refuses to import credentials for a different user.
An example of the exception that occurred when running spark2-shell as root:
18/01/11 12:14:01 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, kudu-tserver-01, executor 1): java.lang.IllegalArgumentException: cannot import authentication data from a different user: old='yarn', new='root'
    at org.apache.kudu.client.SecurityContext.checkUserMatches(SecurityContext.java:128)
    at org.apache.kudu.client.SecurityContext.importAuthenticationCredentials(SecurityContext.java:138)
    at org.apache.kudu.client.AsyncKuduClient.importAuthenticationCredentials(AsyncKuduClient.java:677)
    at org.apache.kudu.spark.kudu.KuduContext.asyncClient$lzycompute(KuduContext.scala:103)
    at org.apache.kudu.spark.kudu.KuduContext.asyncClient(KuduContext.scala:100)
    at org.apache.kudu.spark.kudu.KuduContext.syncClient$lzycompute(KuduContext.scala:98)
    at org.apache.kudu.spark.kudu.KuduContext.syncClient(KuduContext.scala:98)
    at org.apache.kudu.spark.kudu.KuduRDD.compute(KuduRDD.scala:71)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
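
The sequence that leads to the exception above can be reproduced with a small stand-alone model. This is only a sketch under stated assumptions: `TokenImportSketch`, `StubSecurityContext`, and `setMasterIssuedToken` are hypothetical names, tokens are reduced to the user name they were issued to, and the real `SecurityContext` logic may differ. Only the exception message format is taken from the log.

```java
final class TokenImportSketch {
    /** Hypothetical model of the client-side credential state; a token is reduced to its user. */
    static final class StubSecurityContext {
        private String tokenUser; // null until some token has been stored

        /** Models the master issuing a fresh token, which replaces the old one unchecked. */
        synchronized void setMasterIssuedToken(String user) {
            tokenUser = user;
        }

        /** Models a KuduContext importing the driver-issued token into the shared client. */
        synchronized void importAuthenticationCredentials(String user) {
            if (tokenUser != null && !tokenUser.equals(user)) {
                throw new IllegalArgumentException(String.format(
                    "cannot import authentication data from a different user: old='%s', new='%s'",
                    tokenUser, user));
            }
            tokenUser = user;
        }

        synchronized String currentUser() {
            return tokenUser;
        }
    }
}
```

In this model, a first task imports the driver's token for `root`, the master then replaces it with a token for `yarn` because encryption is disabled, and a second task's import of the `root` token throws `IllegalArgumentException`, matching the message in the log.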