tl;dr version: I think the MultiPut RPC needs to be completely rewritten. The current code makes it impossible for an HBase client to do proper error handling. When an edit fails, the client has no clue as to what the problem was (certain error cases can be retried, others cannot, e.g. when using a non-existent family), and the client doesn't even know which of the edits have been applied successfully. So the client often has to retry edits without knowing whether they've been applied or not, which leads to extra unwanted versions for the KeyValue that were successfully applied (for those who care about versions, this is essentially equivalent to data corruption). In addition, there's no way for a client to properly handle NotServingRegionException: the client has to unnecessarily invalidate the cached locations of some regions and retry all the edits.
Let's walk through, step by step, what happens when a single edit in a multi-put fails.
- An HBase user calls any of the put methods on an HTable instance.
- Eventually, HTable#flushCommits is invoked to actually send the edits to the RegionServer(s).
- This takes us to HConnectionManager#processBatchOfPuts, where all the edits are sorted into one or more MultiPut. Each MultiPut aggregates all the edits destined for a particular RegionServer.
- A thread pool is used to send all the MultiPut in parallel to their respective RegionServer. Let's follow what happens for a single MultiPut.
- The MultiPut travels through the IPC code on the client and then through the network and then through the IPC code on the RegionServer.
- We're now in HRegionServer#multiPut where a new MultiPutResponse is created.
- Still in HRegionServer#multiPut. Since a MultiPut is essentially a map from region name to a list of Put for that region, there's a for loop that executes each list of Put for each region sequentially. Let's follow what happens for a single list of Put for a particular region.
- We're now in HRegionServer#put(byte[], List<Put>). Each Put is associated with the row lock that was specified by the client (if any). Then the pairs of (Put, lock id) are handed to the right HRegion.
- Now we're in HRegion#put(Pair<Put, Integer>), which immediately takes us to HRegion#doMiniBatchPut.
- At this point, let's assume that we're doing just 2 edits. So the BatchOperationInProgress that doMiniBatchPut operates on contains just 2 Put.
- The while loop in doMiniBatchPut that executes each Put starts.
- The first Put fails because an exception is thrown when appending the edit to the WAL. Its entry in batchOp.retCodes is marked as OperationStatusCode.FAILURE.
- Because there was an exception, we're back in HRegion#put(Pair<Put, Integer>), where the while loop sees that batchOp.isDone is still false and does another iteration.
- doMiniBatchPut is called again and handles the remaining Put.
- The second Put succeeds normally, so its entry in batchOp.retCodes is marked as OperationStatusCode.SUCCESS.
- doMiniBatchPut is done and returns to HRegion#put(Pair<Put, Integer>), which returns to HRegionServer#put(byte[], List<Put>).
- At this point, HRegionServer#put(byte[], List<Put>) loops over the OperationStatusCodes and extracts the index of the first Put that failed. In our case, it'll return 0, since the first Put failed.
- This index of the first failed Put in the list (0 in this case) is returned to HRegionServer#multiPut, which records it in the MultiPutResponse. The client knows that the first Put failed, but has no idea about the other one (see the sketch after this list).
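To make the information loss concrete, here's a condensed model of that collapsing step, as I understand it from the walkthrough above. This is simplified, not the actual HBase source; OperationStatusCode is the enum referenced in the steps:

    import org.apache.hadoop.hbase.HConstants.OperationStatusCode;

    public class FirstFailureOnly {
      // Condensed model of the collapsing done in HRegionServer#put(byte[],
      // List<Put>): the per-Put statuses are reduced to a single integer, the
      // index of the first failure, and that is all the MultiPutResponse
      // carries back for the region.
      static int firstFailedIndex(OperationStatusCode[] retCodes) {
        for (int i = 0; i < retCodes.length; i++) {
          if (retCodes[i] != OperationStatusCode.SUCCESS) {
            return i;  // statuses of every Put after i are discarded here
          }
        }
        return -1;  // -1 meaning "all Put succeeded" is a modeling choice
      }
    }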
So the client has no reliable way of knowing which Put (if any) failed past the first failure. All it knows is that, for a particular region, the Put succeeded up to a particular index, at which point there was a failure, and the remaining ones may or may not have succeeded. Its best bet is to retry all the Put from the index of the first failure onward for this region. But this has an unintended consequence: the Put that were successful during the first run will be re-applied, which will unexpectedly create extra versions. Now I realize most people don't really care about versions, so they won't notice. But whoever relies on versions, for whatever reason, will rightfully consider this to be data corruption.
As it is now, MultiPut makes proper error handling impossible. Since this RPC cannot guarantee any atomicity beyond the individual Put level, it should return specific information to the client about which Put failed, so that the client can handle each failure appropriately.
This requires us to change the MultiPutResponse so that it can indicate which Put specifically failed. We could do this for instance by giving the index of the Put along with its OperationStatusCode. So in the scenario above, the MultiPutResponse would essentially return something like: "for that particular region, put #0 failed, put #1 succeeded". If we want to save a bit of space, we may want to omit the successes from the response and only mention the failures - so a response that doesn't mention any failure means that everything was successful. Not sure whether that's a good idea though.
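For illustration, a richer response could look something like the sketch below. The names and layout are hypothetical, not a committed design:

    import java.util.Map;
    import java.util.TreeMap;
    import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MultiPutResponseV2 {
      // For each region (keyed by region name): the status of each Put,
      // keyed by the Put's index in the request for that region. With the
      // space optimization suggested above, SUCCESS entries would simply be
      // omitted, and an empty inner map would mean "everything succeeded".
      public final Map<byte[], Map<Integer, OperationStatusCode>> statuses =
          new TreeMap<byte[], Map<Integer, OperationStatusCode>>(Bytes.BYTES_COMPARATOR);
    }

In the scenario above, the response would map the region to {0: FAILURE, 1: SUCCESS}.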
Since doing this requires an incompatible RPC change, I propose that we take the opportunity to rewrite the MultiPut RPC too. Right now it's very inefficient: it's just a hack on top of Put. When a MultiPut is written to the wire, a lot of unnecessary duplicate data is sent out. The timestamp, the row key and the family are sent out to the wire N+1 times, where N is the number of edits for a particular row, instead of just once.
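To see where the duplication comes from, consider how a Put with several edits is built (a minimal illustration assuming the 0.90-era client API, where Put#add(family, qualifier, value) creates a full KeyValue):

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class WireDuplication {
      public static void main(String[] args) {
        Put put = new Put(Bytes.toBytes("row1"));  // row key stored once in the Put itself...
        // ...and once more inside each KeyValue created by add(), along with
        // that KeyValue's own copy of the family and timestamp. With N edits
        // to the row, the row key therefore crosses the wire N+1 times.
        put.add(Bytes.toBytes("f"), Bytes.toBytes("q1"), Bytes.toBytes("v1"));
        put.add(Bytes.toBytes("f"), Bytes.toBytes("q2"), Bytes.toBytes("v2"));
      }
    }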
Alternatively, if we don't want to change the RPCs, we can fix this issue in a backward-compatible way by making the while loop in HRegion#put(Pair<Put, Integer>) stop as soon as a failure is encountered, as sketched below.
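Here is an illustrative fragment of that change, reusing the names from the walkthrough (this is not an actual patch, and hasFailure is a hypothetical helper that scans batchOp.retCodes):

    // Inside HRegion#put(Pair<Put, Integer>), illustrative only:
    while (!batchOp.isDone()) {
      doMiniBatchPut(batchOp);
      if (hasFailure(batchOp.retCodes)) {  // hypothetical helper
        // Stop instead of re-entering doMiniBatchPut: the remaining Put are
        // now guaranteed untouched, so the "first failed index" already in
        // MultiPutResponse becomes an exact retry point for the client.
        break;
      }
    }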
Since the code in HRegionServer#multiPut invokes HRegionServer#put(byte[], List<Put>) for each region for which there are edits, it's possible that the edits for a first region all get successfully applied, and then, when moving on to the second region, a NotServingRegionException is thrown. This fails the RPC entirely and leaves the client with absolutely no clue as to which edits were successfully applied and which region caused the NotServingRegionException. Currently, the code in HConnectionManager simply retries everything when that happens, and it invalidates the cached locations of all the regions involved in the multi-put (even though it could well be that just a single region has to be invalidated).
I'm not sure how to best solve this problem because the Hadoop RPC protocol doesn't give us an easy way of passing data around in an exception. The only two things I can think of are both really ugly:
1. The message of the exception could contain the name of the region that caused it, so that the client can parse the name out of the message and invalidate the right entry in its cache.
2. As a special case, instead of throwing a NotServingRegionException, we'd change MultiPutResponse to also contain a list of regions no longer served by this RegionServer, so the client could invalidate the right entries in its cache and retry just the edits that need to be retried.
I think option 2 is better, but it also requires an incompatible RPC change.
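For illustration, option 2 could build on the hypothetical MultiPutResponseV2 sketched earlier; again, the names are mine, not a committed design:

    import java.util.ArrayList;
    import java.util.List;

    public class MultiPutResponseV2WithStaleRegions extends MultiPutResponseV2 {
      // Names of the regions this RegionServer no longer serves. The client
      // invalidates only these cache entries and retries only the edits that
      // were destined for them; everything else is known-applied or
      // known-failed from the statuses map.
      public final List<byte[]> regionsNoLongerServed = new ArrayList<byte[]>();
    }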
Simple test case to highlight the bug (against HBase trunk). The HBase client retries hopelessly until it reaches the maximum number of attempts before bailing out.
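For reference, a minimal reproduction could look like the following sketch (hypothetical code, assuming the 0.90-era client API, a running cluster, and a table "t" whose only family is "f"):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MultiPutBugRepro {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "t");
        table.setAutoFlush(false);  // buffer edits so they leave in one MultiPut
        Put good = new Put(Bytes.toBytes("row1"));
        good.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
        Put bad = new Put(Bytes.toBytes("row2"));
        // This family doesn't exist, so this edit can never succeed.
        bad.add(Bytes.toBytes("nosuchfamily"), Bytes.toBytes("q"), Bytes.toBytes("v"));
        table.put(good);
        table.put(bad);
        // Both edits go out in a single MultiPut; the client then retries
        // hopelessly until hbase.client.retries.number attempts are exhausted.
        table.flushCommits();
      }
    }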
Excerpt of the log produced when running put: