From 267b98458b3faec4324bbd51ed095567827b1bd0 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Thu, 1 Feb 2018 21:47:41 +0800 Subject: [PATCH] HBASE-19876 The exception happening in converting pb mutation to hbase.mutation messes up the CellScanner --- .../hadoop/hbase/regionserver/RSRpcServices.java | 118 +++++++++++---------- 1 file changed, 63 insertions(+), 55 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 48eac79ceb..554ea9aaeb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -559,23 +559,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * Mutate a list of rows atomically. * @param cellScanner if non-null, the mutation data -- the Cell content. */ - private void mutateRows(final HRegion region, final OperationQuota quota, - final List actions, final CellScanner cellScanner, - RegionActionResult.Builder builder, final ActivePolicyEnforcement spaceQuotaEnforcement) - throws IOException { - for (ClientProtos.Action action: actions) { - if (action.hasGet()) { - throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" + - action.getGet()); - } - } - doBatchOp(builder, region, quota, actions, cellScanner, spaceQuotaEnforcement, true); - } - - /** - * Mutate a list of rows atomically. - * @param cellScanner if non-null, the mutation data -- the Cell content. - */ private boolean checkAndRowMutate(final HRegion region, final List actions, final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, RegionActionResult.Builder builder, @@ -584,42 +567,52 @@ public class RSRpcServices implements HBaseRPCErrorHandler, regionServer.cacheFlusher.reclaimMemStoreMemory(); } RowMutations rm = null; - int i = 0; - ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder = + int countOfCompleteMutation = 0; + try { + ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder = ClientProtos.ResultOrException.newBuilder(); - for (ClientProtos.Action action: actions) { - if (action.hasGet()) { - throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" + + int i = 0; + for (ClientProtos.Action action: actions) { + if (action.hasGet()) { + throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" + action.getGet()); + } + MutationType type = action.getMutation().getMutateType(); + if (rm == null) { + rm = new RowMutations(action.getMutation().getRow().toByteArray(), actions.size()); + } + switch (type) { + case PUT: + Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner); + ++countOfCompleteMutation; + checkCellSizeLimit(region, put); + spaceQuotaEnforcement.getPolicyEnforcement(region).check(put); + rm.add(put); + break; + case DELETE: + Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner); + ++countOfCompleteMutation; + spaceQuotaEnforcement.getPolicyEnforcement(region).check(del); + rm.add(del); + break; + default: + throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name()); + } + // To unify the response format with doNonAtomicRegionMutation and read through client's + // AsyncProcess we have to add an empty result instance per operation + resultOrExceptionOrBuilder.clear(); + resultOrExceptionOrBuilder.setIndex(i++); + builder.addResultOrException(resultOrExceptionOrBuilder.build()); } - MutationType type = action.getMutation().getMutateType(); - if (rm == null) { - rm = new RowMutations(action.getMutation().getRow().toByteArray(), actions.size()); - } - switch (type) { - case PUT: - Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner); - checkCellSizeLimit(region, put); - spaceQuotaEnforcement.getPolicyEnforcement(region).check(put); - rm.add(put); - break; - case DELETE: - Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner); - spaceQuotaEnforcement.getPolicyEnforcement(region).check(del); - rm.add(del); - break; - default: - throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name()); + } catch (IOException e) { + // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner + // even if the malformed cells are not skipped. + for (int i = countOfCompleteMutation; i < actions.size(); ++i) { + skipCellsForMutation(actions.get(i), cellScanner); } - // To unify the response format with doNonAtomicRegionMutation and read through client's - // AsyncProcess we have to add an empty result instance per operation - resultOrExceptionOrBuilder.clear(); - resultOrExceptionOrBuilder.setIndex(i++); - builder.addResultOrException( - resultOrExceptionOrBuilder.build()); + throw e; } - return region.checkAndRowMutate(row, family, qualifier, op, - comparator, rm); + return region.checkAndRowMutate(row, family, qualifier, op, comparator, rm); } /** @@ -895,6 +888,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, try { doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement, false); } catch (IOException ioe) { + // TODO doBatchOp has handled the IOE for all non-atomic operations + // Catching IOE here may confuse readers in the future rpcServer.getMetrics().exception(ioe); NameBytesPair pair = ResponseConverter.buildException(ioe); resultOrExceptionBuilder.setException(pair); @@ -946,6 +941,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, Map mutationActionMap = new HashMap<>(); int i = 0; for (ClientProtos.Action action: mutations) { + if (action.hasGet()) { + throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" + + action.getGet()); + } MutationProto m = action.getMutation(); Mutation mutation; if (m.getMutateType() == MutationType.PUT) { @@ -968,8 +967,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } // HBASE-17924 - // Sort to improve lock efficiency for non-atomic batch of operations. If atomic (mostly - // called from mutateRows()), order is preserved as its expected from the client + // Sort to improve lock efficiency for non-atomic batch of operations. If atomic + // order is preserved as its expected from the client if (!atomic) { Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2)); } @@ -1004,12 +1003,21 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } } catch (IOException ie) { + int processedMutationIndex = 0; + for (Action mutation : mutations) { + // The non-null mArray[i] means the cell scanner has been read. + if (mArray[processedMutationIndex++] == null) { + skipCellsForMutation(mutation, cells); + } + // The atomic ops use the global exception although I feel it is ok to add the exception + // to each action. + if (!atomic) { + builder.addResultOrException(getResultOrException(ie, mutation.getIndex())); + } + } if (atomic) { throw ie; } - for (Action mutation : mutations) { - builder.addResultOrException(getResultOrException(ie, mutation.getIndex())); - } } if (regionServer.metricsRegionServer != null) { long after = EnvironmentEdgeManager.currentTime(); @@ -2568,8 +2576,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, cellScanner, row, family, qualifier, op, comparator, regionActionResultBuilder, spaceQuotaEnforcement); } else { - mutateRows(region, quota, regionAction.getActionList(), cellScanner, - regionActionResultBuilder, spaceQuotaEnforcement); + doBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(), + cellScanner, spaceQuotaEnforcement, true); processed = Boolean.TRUE; } } catch (IOException e) { -- 2.11.0 (Apple Git-81)