Here is a first patch that shows the direction I'm heading with this. A few things about this patch are worth discussing (sorry it's a bit long-winded, but I want to be sure we're all on the same page about this solution):
1) I found a handy class named SocketProxy in the ActiveMQ project and "borrowed" it here to help simulate network partitions, such as between the leader and a replica, while keeping the ZooKeeper connection alive. I'm aware of the IpTables class (from SOLR-5482), but the advantage of SocketProxy is that it's plain Java, so it runs without sudo and on platforms that don't have iptables. This does require a little trickery when setting up your test, as you need to insert the proxy in front of the Jetty nodes; this is accomplished by setting a proxyPort on the JettySolrRunner, see HttpPartitionTest.createJetty. I'm guessing we can build this into a test base class if we like this approach and think it will be useful for other tests.
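To make the mechanism concrete, here's a minimal, self-contained sketch of the idea behind SocketProxy (a hypothetical simplification, not ActiveMQ's actual class): a relay that listens on its own port and pipes bytes to the real server, so closing the relay simulates a partition while the server itself, and its ZK session, stay up:

```java
import java.io.*;
import java.net.*;

// Minimal sketch of a SocketProxy-style TCP relay. It listens on its own port and
// pipes bytes to a target address; close() drops live connections to simulate a
// network partition without touching iptables or the target process.
public class MiniSocketProxy {
    private final ServerSocket server;
    private final InetSocketAddress target;
    private volatile Socket client, upstream;

    public MiniSocketProxy(InetSocketAddress target) throws IOException {
        this.target = target;
        this.server = new ServerSocket(0); // pick a free port (plays the proxyPort role)
        Thread acceptor = new Thread(this::acceptLoop);
        acceptor.setDaemon(true);
        acceptor.start();
    }

    public int getListenPort() { return server.getLocalPort(); }

    private void acceptLoop() {
        try {
            while (true) {
                Socket c = server.accept();
                Socket u = new Socket(target.getAddress(), target.getPort());
                client = c;
                upstream = u;
                pump(c.getInputStream(), u.getOutputStream()); // client -> server
                pump(u.getInputStream(), c.getOutputStream()); // server -> client
            }
        } catch (IOException ignored) { /* proxy closed */ }
    }

    private void pump(InputStream in, OutputStream out) {
        Thread t = new Thread(() -> {
            try {
                byte[] buf = new byte[1024];
                int n;
                while ((n = in.read(buf)) != -1) { out.write(buf, 0, n); out.flush(); }
            } catch (IOException ignored) { /* connection dropped */ }
        });
        t.setDaemon(true);
        t.start();
    }

    // Simulate the partition: stop accepting and drop the live connections.
    public void close() throws IOException {
        server.close();
        if (client != null) client.close();
        if (upstream != null) upstream.close();
    }
}
```

In a test, clients talk to getListenPort() instead of the real server's port, so a single close() call severs leader-to-replica traffic on demand.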
2) I ended up going with Mark's idea #1, except I don't see where or why we need to worry about the replica publishing its own state. In other words, what really matters is that the leader cannot send a request to the replica, so to me, the leader's view of the replica is what matters. In my patch, the leader will publish the state of the replica as "down" when it encounters a communication error while trying to send a request to that replica. See the ZkController.ensureReplicaInLeaderInitiatedRecovery() method, which is called from the DistributedUpdateProcessor.doFinish() method.
So I've thought about this in some detail and I think it will work itself out without us having to coordinate state changes. Let's say the leader sets the state to "down" and, for some weird reason (which I can't really see how it would happen), the replica resets its state to "active". This would make the replica a candidate for receiving requests again, which would just lead to another error, leading to the leader re-setting the state to "down". In a nutshell, if the leader can't talk to the replica over HTTP, its state gets set to "down".
One idea I did have for this is to have the leader pass the ClusterState.zkClusterStateVersion along in every request, allowing the replica to compare it against the version it is working with; if they differ, the replica would force a state update from ZK and act accordingly. It shouldn't be too bad to implement if we think it would be useful. The version would be passed along much like the distrib.update param is today.
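As a rough sketch of that version check (hypothetical names; in the real patch the version would travel as a request param alongside distrib.update, and the refresh would re-read cluster state from ZooKeeper):

```java
// Hedged sketch: the replica keeps the ClusterState version it last saw, and when an
// incoming request carries a newer/different version from the leader, it knows its
// local view is stale and should force a refresh from ZK.
public class ClusterStateVersionCheck {
    private long localVersion;

    public ClusterStateVersionCheck(long initialVersion) {
        this.localVersion = initialVersion;
    }

    // Returns true if the replica should force a state refresh from ZooKeeper.
    public boolean checkIncomingVersion(long leaderVersion) {
        if (leaderVersion != localVersion) {
            localVersion = leaderVersion; // stand-in for re-reading state from ZK
            return true;
        }
        return false;
    }
}
```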
3) Even if more coordination is needed for #2 ^, at some point the replica gets marked as being in the "down" state. This ensures the leader stops trying to send requests to that replica (via the DOWN filter in the call to getReplicaProps used to determine the set of Nodes to forward requests to). The leader also needs to determine whether it should send the CoreAdminAction.REQUESTRECOVERY command to the downed replica's core. This occurs over HTTP, which I think is correct because if the leader can't send the recovery command to the replica, then sending docs is futile as well. What I've done here is build upon the existing code in DistributedUpdateProcessor's doFinish method to attempt sending that command every 5 seconds for up to 10 minutes, so long as the node is still listed under /live_nodes in ZK. If that changes, I stop trying to hit that node from the leader, since a node that is no longer live will do full recovery when it comes back.
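The retry loop can be sketched like this (a simplification with injected dependencies so it's testable; the real code lives in DistributedUpdateProcessor.doFinish, and the supplier names here are illustrative, not the patch's API):

```java
import java.util.function.BooleanSupplier;

// Sketch of the leader's recovery-request loop: keep asking the downed core to
// recover every retryIntervalMs until it succeeds, the node drops out of
// /live_nodes, or the overall timeout expires.
public class RecoveryRequester {
    public static boolean requestRecoveryWithRetries(
            BooleanSupplier sendRequestRecovery, // true if REQUESTRECOVERY succeeded
            BooleanSupplier nodeStillLive,       // reflects /live_nodes in ZK
            long retryIntervalMs,                // 5,000 in the patch
            long maxWaitMs)                      // 600,000 (10 min) in the patch
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (System.currentTimeMillis() < deadline) {
            if (!nodeStillLive.getAsBoolean()) {
                // Node left /live_nodes; it will do full recovery when it returns.
                return false;
            }
            if (sendRequestRecovery.getAsBoolean()) {
                return true;
            }
            Thread.sleep(retryIntervalMs);
        }
        return false; // gave up after maxWaitMs
    }
}
```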
I like this leader-initiated recovery approach because the leader's view of the replica is what matters; a self-initiated recovery process, by which the replica realizes its state got changed by the leader, doesn't do much if the HTTP connection between the leader and replica is still down.
4) Of course, there's no guarantee that connectivity will be restored within 10 minutes, so the retry loop described in #3 ^ will time out and the leader will stop trying to tell the replica to recover. At this point, the replica should be marked down, so at least the leader is no longer trying to send requests to it; I think the shard is in a safe state wrt consistency, but after the 10 minutes there's nothing to tell the replica to recover from the down state. Do we want the leader to just try forever? Seems like not ... Maybe this is where an ops alert could be inserted to have someone go investigate why the partition has lasted longer than 10 minutes. I'd appreciate any advice on how to handle this better.
5) You'll notice that I'm using a HashSet containing replicaUrls in ZkController to keep track of replicas that are in the "leader-initiated" recovery process; see ZkController.replicasInLeaderInitiatedRecoveryHandling. This approach is needed because many DistributedUpdateProcessors may be receiving a flood of errors concurrently when connectivity to a replica is lost. I didn't want the leader trying to set the state to DOWN more than once when it sees a bunch of errors, or to have more than one thread per replica trying to send the recovery command. There might be a better location for this code than ZkController (maybe ZkStateReader).
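The dedup idea can be sketched as follows (method names here are illustrative, not the patch's exact API; a concurrent set's atomic add() gives the "only the first failing thread acts" behavior without explicit locking):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the per-replica dedup: many update-processor threads may hit comm
// errors for the same replica at once, but only the first should publish "down"
// and spawn the recovery-request thread; the rest must be no-ops.
public class LeaderInitiatedRecoveryTracker {
    private final Set<String> replicasInLeaderInitiatedRecovery =
            ConcurrentHashMap.newKeySet();

    // Returns true only for the first caller per replica URL (Set.add is atomic);
    // concurrent callers for the same URL get false and skip the recovery work.
    public boolean markAsRecovering(String replicaUrl) {
        return replicasInLeaderInitiatedRecovery.add(replicaUrl);
    }

    // Called once the replica becomes active again (or the retry loop gives up),
    // so a future partition can trigger a fresh recovery cycle.
    public void recoveryFinished(String replicaUrl) {
        replicasInLeaderInitiatedRecovery.remove(replicaUrl);
    }
}
```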
As for testing, I think the unit test (HttpPartitionTest) is pretty close to the scenario we're trying to capture in this ticket.
Specifically, it tests the following process:
a. set up proxies in front of all Jetty servers (3 in this test) by overriding the createJetty method.
b. create a collection with 1 shard and rf=2
c. send doc 1 to leader, which gets forwarded to replica successfully
d. partition occurs (using SocketProxy); however, the replica's ZK connection remains intact (which is the crux of this issue); the leader remains the same throughout this test
e. send doc 2 to leader
f. the leader's attempt to send doc 2 to the replica fails due to a comm error; an asynchronous call to doFinish starts the leader-initiated recovery process
g. the leader marks the replica as down, which means it will stop trying to send it requests until the situation improves, since ZkStateReader.getReplicaProps() filters out "downed" nodes. At this point, the leader is also trying to tell the replica to recover from a background thread.
h. partition is restored
i. send doc 3
j. replica recovery succeeds asynchronously, test waits until it sees the replica in the "active" state
k. test verifies both the leader and the replica have docs 1,2,3 using requests to the /get handler
Next, the test performs the same basic process but for 1000 docs while dropping and restoring the connectivity between the leader and replica every 100 docs.
I should mention that without the code in this patch, the replica will most certainly be out of sync and not know it, which of course is a no-no for a CP system (btw: I used real test-driven development methodology here by writing the test first and then implementing until the test passes).
The one minor concern I have with this test right now is the Thread.sleep(2000) before restoring connectivity with the proxy. I had to introduce this because the test was progressing too fast for the recovery process to kick in, leading to test failures. I think it's OK to wait a little bit, as that is more reflective of a running cluster, where things take a little time to propagate. Just wanted to draw attention to this so you're clear it was intentional to give things time to work.