From 3404d25c849a71842e2a625d574dd6829250d0be Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Thu, 1 Feb 2018 21:47:41 +0800 Subject: [PATCH] HBASE-19876 The exception happening in converting pb mutation to hbase.mutation messes up the CellScanner --- .../hadoop/hbase/regionserver/RSRpcServices.java | 118 +++++++------- .../hbase/client/TestPartialMalformedCells.java | 171 +++++++++++++++++++++ 2 files changed, 234 insertions(+), 55 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestPartialMalformedCells.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index e540464..c3b8ec4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -559,23 +559,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * Mutate a list of rows atomically. * @param cellScanner if non-null, the mutation data -- the Cell content. */ - private void mutateRows(final HRegion region, final OperationQuota quota, - final List actions, final CellScanner cellScanner, - RegionActionResult.Builder builder, final ActivePolicyEnforcement spaceQuotaEnforcement) - throws IOException { - for (ClientProtos.Action action: actions) { - if (action.hasGet()) { - throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" + - action.getGet()); - } - } - doBatchOp(builder, region, quota, actions, cellScanner, spaceQuotaEnforcement, true); - } - - /** - * Mutate a list of rows atomically. - * @param cellScanner if non-null, the mutation data -- the Cell content. 
- */ private boolean checkAndRowMutate(final HRegion region, final List actions, final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, RegionActionResult.Builder builder, @@ -584,42 +567,52 @@ public class RSRpcServices implements HBaseRPCErrorHandler, regionServer.cacheFlusher.reclaimMemStoreMemory(); } RowMutations rm = null; - int i = 0; - ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder = + int countOfCompleteMutation = 0; + try { + ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder = ClientProtos.ResultOrException.newBuilder(); - for (ClientProtos.Action action: actions) { - if (action.hasGet()) { - throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" + + int i = 0; + for (ClientProtos.Action action: actions) { + if (action.hasGet()) { + throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" + action.getGet()); + } + MutationType type = action.getMutation().getMutateType(); + if (rm == null) { + rm = new RowMutations(action.getMutation().getRow().toByteArray(), actions.size()); + } + switch (type) { + case PUT: + Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner); + ++countOfCompleteMutation; + checkCellSizeLimit(region, put); + spaceQuotaEnforcement.getPolicyEnforcement(region).check(put); + rm.add(put); + break; + case DELETE: + Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner); + ++countOfCompleteMutation; + spaceQuotaEnforcement.getPolicyEnforcement(region).check(del); + rm.add(del); + break; + default: + throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name()); + } + // To unify the response format with doNonAtomicRegionMutation and read through client's + // AsyncProcess we have to add an empty result instance per operation + resultOrExceptionOrBuilder.clear(); + resultOrExceptionOrBuilder.setIndex(i++); + 
builder.addResultOrException(resultOrExceptionOrBuilder.build()); } - MutationType type = action.getMutation().getMutateType(); - if (rm == null) { - rm = new RowMutations(action.getMutation().getRow().toByteArray(), actions.size()); - } - switch (type) { - case PUT: - Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner); - checkCellSizeLimit(region, put); - spaceQuotaEnforcement.getPolicyEnforcement(region).check(put); - rm.add(put); - break; - case DELETE: - Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner); - spaceQuotaEnforcement.getPolicyEnforcement(region).check(del); - rm.add(del); - break; - default: - throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name()); + } catch (IOException e) { + // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner + // even if the malformed cells are not skipped. + for (int i = countOfCompleteMutation; i < actions.size(); ++i) { + skipCellsForMutation(actions.get(i), cellScanner); } - // To unify the response format with doNonAtomicRegionMutation and read through client's - // AsyncProcess we have to add an empty result instance per operation - resultOrExceptionOrBuilder.clear(); - resultOrExceptionOrBuilder.setIndex(i++); - builder.addResultOrException( - resultOrExceptionOrBuilder.build()); + throw e; } - return region.checkAndRowMutate(row, family, qualifier, op, - comparator, rm, Boolean.TRUE); + return region.checkAndRowMutate(row, family, qualifier, op, comparator, rm, true); } /** @@ -895,6 +888,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, try { doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement, false); } catch (IOException ioe) { + // TODO doBatchOp has handled the IOE for all non-atomic operations + // Catching IOE here may confuse readers in the future rpcServer.getMetrics().exception(ioe); NameBytesPair pair = ResponseConverter.buildException(ioe); 
resultOrExceptionBuilder.setException(pair); @@ -946,6 +941,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, Map mutationActionMap = new HashMap<>(); int i = 0; for (ClientProtos.Action action: mutations) { + if (action.hasGet()) { + throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" + + action.getGet()); + } MutationProto m = action.getMutation(); Mutation mutation; if (m.getMutateType() == MutationType.PUT) { @@ -968,8 +967,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } // HBASE-17924 - // Sort to improve lock efficiency for non-atomic batch of operations. If atomic (mostly - // called from mutateRows()), order is preserved as its expected from the client + // Sort to improve lock efficiency for non-atomic batch of operations. If atomic, + // order is preserved as it is expected by the client if (!atomic) { Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2)); } @@ -1004,12 +1003,21 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } } catch (IOException ie) { + int processedMutationIndex = 0; + for (Action mutation : mutations) { + // The non-null mArray[i] means the cell scanner has been read. + if (mArray[processedMutationIndex++] == null) { + skipCellsForMutation(mutation, cells); + } + // The atomic ops use the global exception although I feel it is ok to add the exception + // to each action. 
+ if (!atomic) { + builder.addResultOrException(getResultOrException(ie, mutation.getIndex())); + } + } if (atomic) { throw ie; } - for (Action mutation : mutations) { - builder.addResultOrException(getResultOrException(ie, mutation.getIndex())); - } } if (regionServer.metricsRegionServer != null) { long after = EnvironmentEdgeManager.currentTime(); @@ -2568,8 +2576,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, cellScanner, row, family, qualifier, op, comparator, regionActionResultBuilder, spaceQuotaEnforcement); } else { - mutateRows(region, quota, regionAction.getActionList(), cellScanner, - regionActionResultBuilder, spaceQuotaEnforcement); + doBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(), + cellScanner, spaceQuotaEnforcement, true); processed = Boolean.TRUE; } } catch (IOException e) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestPartialMalformedCells.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestPartialMalformedCells.java new file mode 100644 index 0000000..d3d74e3 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestPartialMalformedCells.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.PrivateCellUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * TODO: add the test for batch of checkAndMutate after HBASE-8458 is resolved + */ +@Category({MediumTests.class, ClientTests.class}) +public class TestPartialMalformedCells { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestPartialMalformedCells.class); + + @Rule + public TestName name = new TestName(); + + private static final Logger LOG = LoggerFactory.getLogger(TestPartialMalformedCells.class); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final byte[] FAMILY = Bytes.toBytes("testFamily"); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // disable the retry + 
+ TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); + TEST_UTIL.startMiniCluster(1); + } + + @After + public void tearDown() throws Exception { + for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) { + TEST_UTIL.deleteTable(htd.getTableName()); + } + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * This test depends on how the regionserver processes the batch ops. + * 1) group the put/delete until meeting the increment + * 2) process the batch of put/delete + * 3) process the increment + * see RSRpcServices#doNonAtomicRegionMutation + */ + @Test + public void testNonAtomicOperations() throws InterruptedException, IOException { + Increment inc = new Increment(Bytes.toBytes("GOOD")).addColumn(FAMILY, null, 100); + final long cellSize = + PrivateCellUtil.estimatedSerializedSizeOf(inc.get(FAMILY, null).get(0)) + 10; + final TableName tableName = TableName.valueOf(name.getMethodName()); + TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName) + .addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) + .setValue(HRegion.HBASE_MAX_CELL_SIZE_KEY, String.valueOf(cellSize)).build(); + TEST_UTIL.getConnection().getAdmin().createTable(desc); + List batches = new ArrayList<>(); + // the first and second puts will be grouped by the regionserver + batches.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[(int) cellSize])); + batches.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[(int) cellSize])); + // this Increment should succeed + batches.add(inc); + // this put should succeed + batches.add(new Put(Bytes.toBytes("GOOD")).addColumn(FAMILY, null, new byte[1])); + Object[] objs = new Object[batches.size()]; + try (Table table = TEST_UTIL.getConnection().getTable(tableName)) { + table.batch(batches, objs); + fail("Where is the exception? 
We put the malformed cells!!!"); + } catch (RetriesExhaustedWithDetailsException e) { + assertEquals(2, e.getNumExceptions()); + for (int i = 0; i != e.getNumExceptions(); ++i) { + assertNotNull(e.getCause(i)); + assertEquals(DoNotRetryIOException.class, e.getCause(i).getClass()); + assertEquals("fail", Bytes.toString(e.getRow(i).getRow())); + } + } finally { + assertObjects(objs, batches.size()); + assertTrue(objs[0] instanceof IOException); + assertTrue(objs[1] instanceof IOException); + assertEquals(Result.class, objs[2].getClass()); + assertEquals(Result.class, objs[3].getClass()); + } + } + + @Test + public void testRowMutation() throws InterruptedException, IOException { + Put put = new Put(Bytes.toBytes("GOOD")).addColumn(FAMILY, null, new byte[1]); + final long cellSize = + PrivateCellUtil.estimatedSerializedSizeOf(put.get(FAMILY, null).get(0)) + 10; + final TableName tableName = TableName.valueOf(name.getMethodName()); + TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName) + .addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) + .setValue(HRegion.HBASE_MAX_CELL_SIZE_KEY, String.valueOf(cellSize)).build(); + TEST_UTIL.getConnection().getAdmin().createTable(desc); + List batches = new ArrayList<>(); + RowMutations mutations = new RowMutations(Bytes.toBytes("fail")); + // the first and second puts will be grouped by the regionserver + mutations.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[(int) cellSize])); + mutations.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[(int) cellSize])); + batches.add(mutations); + // this RowMutations should succeed + mutations = new RowMutations(Bytes.toBytes("GOOD")); + mutations.add(put); + mutations.add(put); + batches.add(mutations); + Object[] objs = new Object[batches.size()]; + try (Table table = TEST_UTIL.getConnection().getTable(tableName)) { + table.batch(batches, objs); + fail("Where is the exception? 
We put the malformed cells!!!"); + } catch (RetriesExhaustedWithDetailsException e) { + // TODO: check the content of RetriesExhaustedWithDetailsException if HBASE-19900 is resolved + } finally { + assertObjects(objs, batches.size()); + assertTrue(objs[0] instanceof IOException); + assertEquals(Result.class, objs[1].getClass()); + } + } + + private static void assertObjects(Object[] objs, int expectedSize) { + int count = 0; + for (Object obj : objs) { + assertNotNull(obj); + ++count; + } + assertEquals(expectedSize, count); + } +} -- 2.7.4