From d964039eafd1d22c208241cd84d9ec75acbb1159 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Fri, 9 Feb 2018 02:09:27 +0800 Subject: [PATCH] HBASE-19876 The exception happening in converting pb mutation to hbase.mutation messes up the CellScanner --- .../hadoop/hbase/regionserver/RSRpcServices.java | 121 +++++++++++---------- .../hbase/client/TestMalformedCellFromClient.java | 80 ++++++++++++++ 2 files changed, 145 insertions(+), 56 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 44934a6e82..9b0d132b47 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -556,23 +556,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } - /** - * Mutate a list of rows atomically. - * @param cellScanner if non-null, the mutation data -- the Cell content. - */ - private void mutateRows(final HRegion region, final OperationQuota quota, - final List actions, final CellScanner cellScanner, - RegionActionResult.Builder builder, final ActivePolicyEnforcement spaceQuotaEnforcement) - throws IOException { - for (ClientProtos.Action action: actions) { - if (action.hasGet()) { - throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" + - action.getGet()); - } - } - doBatchOp(builder, region, quota, actions, cellScanner, spaceQuotaEnforcement, true); - } - /** * Mutate a list of rows atomically. * @param cellScanner if non-null, the mutation data -- the Cell content. 
@@ -584,43 +567,54 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (!region.getRegionInfo().isMetaRegion()) { regionServer.cacheFlusher.reclaimMemStoreMemory(); } - RowMutations rm = null; - int i = 0; - ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder = + int countOfCompleteMutation = 0; + try { + RowMutations rm = null; + int i = 0; + ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder = ClientProtos.ResultOrException.newBuilder(); - for (ClientProtos.Action action: actions) { - if (action.hasGet()) { - throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" + + for (ClientProtos.Action action: actions) { + if (action.hasGet()) { + throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" + action.getGet()); + } + MutationType type = action.getMutation().getMutateType(); + if (rm == null) { + rm = new RowMutations(action.getMutation().getRow().toByteArray(), actions.size()); + } + switch (type) { + case PUT: + Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner); + ++countOfCompleteMutation; + checkCellSizeLimit(region, put); + spaceQuotaEnforcement.getPolicyEnforcement(region).check(put); + rm.add(put); + break; + case DELETE: + Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner); + ++countOfCompleteMutation; + spaceQuotaEnforcement.getPolicyEnforcement(region).check(del); + rm.add(del); + break; + default: + throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name()); + } + // To unify the response format with doNonAtomicRegionMutation and read through client's + // AsyncProcess we have to add an empty result instance per operation + resultOrExceptionOrBuilder.clear(); + resultOrExceptionOrBuilder.setIndex(i++); + builder.addResultOrException( + resultOrExceptionOrBuilder.build()); } - MutationType type = action.getMutation().getMutateType(); - if (rm == null) { - rm = new 
RowMutations(action.getMutation().getRow().toByteArray(), actions.size()); - } - switch (type) { - case PUT: - Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner); - checkCellSizeLimit(region, put); - spaceQuotaEnforcement.getPolicyEnforcement(region).check(put); - rm.add(put); - break; - case DELETE: - Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner); - spaceQuotaEnforcement.getPolicyEnforcement(region).check(del); - rm.add(del); - break; - default: - throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name()); + return region.checkAndRowMutate(row, family, qualifier, op, comparator, rm); + } catch (IOException e) { + // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner + // even if the malformed cells are not skipped. + for (int i = countOfCompleteMutation; i < actions.size(); ++i) { + skipCellsForMutation(actions.get(i), cellScanner); } - // To unify the response format with doNonAtomicRegionMutation and read through client's - // AsyncProcess we have to add an empty result instance per operation - resultOrExceptionOrBuilder.clear(); - resultOrExceptionOrBuilder.setIndex(i++); - builder.addResultOrException( - resultOrExceptionOrBuilder.build()); + throw e; } - return region.checkAndRowMutate(row, family, qualifier, op, - comparator, rm); } /** @@ -896,6 +890,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, try { doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement, false); } catch (IOException ioe) { + // TODO doBatchOp has handled the IOE for all non-atomic operations + // Catching IOE here may confuse readers in the future rpcServer.getMetrics().exception(ioe); NameBytesPair pair = ResponseConverter.buildException(ioe); resultOrExceptionBuilder.setException(pair); @@ -947,6 +943,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, Map mutationActionMap = new HashMap<>(); int i = 0; for (ClientProtos.Action 
action: mutations) { + if (action.hasGet()) { + throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" + + action.getGet()); + } MutationProto m = action.getMutation(); Mutation mutation; if (m.getMutateType() == MutationType.PUT) { @@ -969,8 +969,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } // HBASE-17924 - // Sort to improve lock efficiency for non-atomic batch of operations. If atomic (mostly - // called from mutateRows()), order is preserved as its expected from the client + // Sort to improve lock efficiency for non-atomic batch of operations. If atomic, + // order is preserved as it is expected by the client if (!atomic) { Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2)); } @@ -1005,12 +1005,21 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } } catch (IOException ie) { + int processedMutationIndex = 0; + for (Action mutation : mutations) { + // The non-null mArray[i] means the cell scanner has been read. + if (mArray[processedMutationIndex++] == null) { + skipCellsForMutation(mutation, cells); + } + // The atomic ops use the global exception although I feel it is ok to add the exception + // to each action. 
+ if (!atomic) { + builder.addResultOrException(getResultOrException(ie, mutation.getIndex())); + } + } if (atomic) { throw ie; } - for (Action mutation : mutations) { - builder.addResultOrException(getResultOrException(ie, mutation.getIndex())); - } } if (regionServer.metricsRegionServer != null) { long after = EnvironmentEdgeManager.currentTime(); @@ -2573,8 +2582,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, cellScanner, row, family, qualifier, op, comparator, regionActionResultBuilder, spaceQuotaEnforcement); } else { - mutateRows(region, quota, regionAction.getActionList(), cellScanner, - regionActionResultBuilder, spaceQuotaEnforcement); + doBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(), + cellScanner, spaceQuotaEnforcement, true); processed = Boolean.TRUE; } } catch (IOException e) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java index e44a2e91d9..32d5c830bb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java @@ -29,6 +29,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; @@ -170,4 +171,83 @@ public class TestMalformedCellFromClient { assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10])); } } + + /** + * This test depends on how the regionserver processes the batch ops. 
+ * 1) group the put/delete until meeting the increment + * 2) process the batch of put/delete + * 3) process the increment + * see RSRpcServices#doNonAtomicRegionMutation + */ + @Test + public void testNonAtomicOperations() throws InterruptedException, IOException { + Increment inc = new Increment(Bytes.toBytes("GOOD")).addColumn(FAMILY, null, 100); + List batches = new ArrayList<>(); + // the first and second puts will be grouped by the regionserver + batches.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE])); + batches.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE])); + // this Increment should succeed + batches.add(inc); + // this put should succeed + batches.add(new Put(Bytes.toBytes("GOOD")).addColumn(FAMILY, null, new byte[1])); + Object[] objs = new Object[batches.size()]; + try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) { + table.batch(batches, objs); + fail("Where is the exception? We put the malformed cells!!!"); + } catch (RetriesExhaustedWithDetailsException e) { + assertEquals(2, e.getNumExceptions()); + for (int i = 0; i != e.getNumExceptions(); ++i) { + assertNotNull(e.getCause(i)); + assertEquals(DoNotRetryIOException.class, e.getCause(i).getClass()); + assertEquals("fail", Bytes.toString(e.getRow(i).getRow())); + } + } finally { + assertObjects(objs, batches.size()); + assertTrue(objs[0] instanceof IOException); + assertTrue(objs[1] instanceof IOException); + assertEquals(Result.class, objs[2].getClass()); + assertEquals(Result.class, objs[3].getClass()); + } + } + + @Test + public void testRowMutations() throws InterruptedException, IOException { + Put put = new Put(Bytes.toBytes("GOOD")).addColumn(FAMILY, null, new byte[1]); + List batches = new ArrayList<>(); + RowMutations mutations = new RowMutations(Bytes.toBytes("fail")); + // the first and second puts will be grouped by the regionserver + mutations.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new 
byte[CELL_SIZE])); + mutations.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE])); + batches.add(mutations); + // this bm should succeed + mutations = new RowMutations(Bytes.toBytes("GOOD")); + mutations.add(put); + mutations.add(put); + batches.add(mutations); + Object[] objs = new Object[batches.size()]; + try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) { + table.batch(batches, objs); + fail("Where is the exception? We put the malformed cells!!!"); + } catch (RetriesExhaustedWithDetailsException e) { + assertEquals(1, e.getNumExceptions()); + for (int i = 0; i != e.getNumExceptions(); ++i) { + assertNotNull(e.getCause(i)); + assertTrue(e.getCause(i) instanceof IOException); + assertEquals("fail", Bytes.toString(e.getRow(i).getRow())); + } + } finally { + assertObjects(objs, batches.size()); + assertTrue(objs[0] instanceof IOException); + assertEquals(Result.class, objs[1].getClass()); + } + } + + private static void assertObjects(Object[] objs, int expectedSize) { + int count = 0; + for (Object obj : objs) { + assertNotNull(obj); + ++count; + } + assertEquals(expectedSize, count); + } } -- 2.16.1.windows.3