KuduRpc implements a default `partitionKey` method.
Subclasses override this method to indicate the start key of the tablet they should be sent to, and the Java client uses it, in part, to select which tserver to send retries to. The default implementation returns null, a special value that is only valid as a partition key for the master table. The keep-alive RPC does not override this method, so it uses the default implementation.
When KuduScanner#keepAlive is called, the initial keep-alive RPC does not use partitionKey, so it succeeds. Retries, however, go through the standard retry logic, which calls delayedSendRpcToTablet, which in turn calls sendRpcToTablet after a delay and on a timer thread. In sendRpcToTablet we call getTableLocationEntry with a null partition key, because the RPC never set one. The null partition key makes the lookup act as if it were looking up the master table, so cache.get(partitionKey) runs the master-table invariant check Preconditions.checkState(entries.size() <= 1). That check (usually) fails and throws an exception, because there are multiple entries in the cache for the table.
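The failure can be reproduced in miniature with the sketch below. It is a simplified model under stated assumptions, not the real client code: RpcSketch, ScanRpcSketch, KeepAliveRpcSketch, and LocationCacheSketch are hypothetical names, keys are strings rather than byte arrays, and the size check stands in for the Preconditions.checkState call described above.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for KuduRpc: the default partition key is null.
class RpcSketch {
  String partitionKey() {
    return null;
  }
}

// A tablet-bound RPC overrides the method with a key inside its tablet.
class ScanRpcSketch extends RpcSketch {
  private final String key;

  ScanRpcSketch(String key) {
    this.key = key;
  }

  @Override
  String partitionKey() {
    return key;
  }
}

// The keep-alive RPC does not override partitionKey, so it inherits null.
class KeepAliveRpcSketch extends RpcSketch {
}

// Hypothetical, simplified location cache keyed by tablet start key.
class LocationCacheSketch {
  private final TreeMap<String, String> entries = new TreeMap<>();

  void put(String startKey, String tabletId) {
    entries.put(startKey, tabletId);
  }

  String get(String partitionKey) {
    if (partitionKey == null) {
      // Null is treated as a master-table lookup, which expects at most one entry.
      if (entries.size() > 1) {
        throw new IllegalStateException(
            "master table lookup found " + entries.size() + " entries");
      }
      return entries.isEmpty() ? null : entries.firstEntry().getValue();
    }
    // Otherwise pick the tablet with the greatest start key <= partitionKey.
    Map.Entry<String, String> entry = entries.floorEntry(partitionKey);
    return entry == null ? null : entry.getValue();
  }
}
```

With two tablets cached for a table, a lookup using the scan RPC's key resolves to a tablet, while a lookup using the keep-alive RPC's inherited null key throws IllegalStateException.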
As a workaround, users can set keepAlivePeriodMs on KuduReadOptions to something very large, such as Long.MAX_VALUE, or, if using the default source, pass the kudu.keepAlivePeriodMs Spark config with a very large value. Note that the bug is only triggered when something causes keep-alive requests to fail and retry, which is relatively rare (in my experience).
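For the default-source variant of the workaround, the options could be assembled roughly as follows. This is a sketch, not a verified recipe: the class and method names are hypothetical, the kudu.master and kudu.table keys are assumed to be the usual kudu-spark connection options, and only kudu.keepAlivePeriodMs comes from this report.

```java
import java.util.HashMap;
import java.util.Map;

class KeepAliveWorkaroundSketch {
  // Builds reader options that effectively disable periodic keep-alive requests.
  static Map<String, String> readOptions(String masterAddresses, String tableName) {
    Map<String, String> options = new HashMap<>();
    options.put("kudu.master", masterAddresses); // assumed option name
    options.put("kudu.table", tableName); // assumed option name
    // A very large period means no keep-alive is sent during a realistic scan.
    options.put("kudu.keepAlivePeriodMs", String.valueOf(Long.MAX_VALUE));
    return options;
  }
}
```

The resulting map would then be passed to the Spark reader, for example through DataFrameReader.options(Map) before loading the Kudu table.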
To fix this, we'll need to make sure that keep-alive RPCs act like scan RPCs: they should always be retried on the same server that currently has the scanner open (or be a no-op if there is no such server).
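One possible shape for that behavior is sketched below. It illustrates the intended semantics only and is not the actual patch: KeepAliveRetrySketch, its keepAlive method, and the Predicate-based send callback are all hypothetical, and real retries would also involve backoff and deadlines.

```java
import java.util.function.Predicate;

class KeepAliveRetrySketch {
  // Sends a keep-alive to currentServer and retries only on that same server.
  // Returns the number of attempts made; 0 means the call was a no-op.
  static int keepAlive(String currentServer, int maxAttempts, Predicate<String> send) {
    if (currentServer == null) {
      // No server currently has the scanner open, so there is nothing to keep alive.
      return 0;
    }
    int attempts = 0;
    while (attempts < maxAttempts) {
      attempts++;
      // The target never changes between attempts: no location-cache lookup is needed.
      if (send.test(currentServer)) {
        break;
      }
    }
    return attempts;
  }
}
```

Because the target server comes from the scanner's own state, the retry path never needs a partition key, which sidesteps the null-key lookup entirely.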
Also, it's not wise to keep the default implementation in KuduRpc. Subclasses ought to have to make an explicit choice about their partition key, which is a proxy for which tablet they will go to.
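A minimal sketch of what forcing that choice might look like, assuming the method is simply made abstract. The class names here are hypothetical and do not correspond to real Kudu classes.

```java
// Hypothetical base class: no default, so every subclass must decide.
abstract class ExplicitKeyRpcSketch {
  abstract byte[] partitionKey();
}

// A master RPC opts in to the null key explicitly instead of inheriting it.
class MasterRpcSketch extends ExplicitKeyRpcSketch {
  @Override
  byte[] partitionKey() {
    return null;
  }
}

// A tablet RPC must supply the key that identifies its tablet.
class TabletRpcSketch extends ExplicitKeyRpcSketch {
  private final byte[] startKey;

  TabletRpcSketch(byte[] startKey) {
    this.startKey = startKey;
  }

  @Override
  byte[] partitionKey() {
    return startKey;
  }
}
```

With this shape, a new RPC type that forgets to pick a partition key fails at compile time rather than silently behaving like a master-table RPC on retry.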