From 4685b9bef6ebb921b0c8c4b9586390b9baff775b Mon Sep 17 00:00:00 2001 From: Umesh Agashe Date: Thu, 19 Oct 2017 11:05:01 -0700 Subject: [PATCH] HBASE-18962 Support atomic (all or none) BatchOperations through batchMutate() --- .../apache/hadoop/hbase/regionserver/HRegion.java | 54 +++++++++++---- .../hadoop/hbase/regionserver/RSRpcServices.java | 32 +++++---- .../hadoop/hbase/regionserver/TestHRegion.java | 79 +++++++++++++++++++++- 3 files changed, 138 insertions(+), 27 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 82d4bd214409ce18755c232d48d06574e8702c4a..b172cc251b864d7942f160c25cd61a8c86b185f5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -3034,6 +3034,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi protected final ObservedExceptionsInBatch observedExceptions; //Durability of the batch (highest durability of all operations) protected Durability durability; + protected boolean atomic = false; public BatchOperation(final HRegion region, T[] operations) { this.operations = operations; @@ -3149,6 +3150,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return getMutation(0).getClusterIds(); } + public boolean isAtomic() { + return atomic; + } + /** * Helper method that checks and prepares only one mutation. This can be used to implement * {@link #checkAndPrepare()} for entire Batch. @@ -3179,16 +3184,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (tmpDur.ordinal() > durability.ordinal()) { durability = tmpDur; } - } catch (NoSuchColumnFamilyException nscf) { + } catch (NoSuchColumnFamilyException nscfe) { final String msg = "No such column family in batch mutation. "; if (observedExceptions.hasSeenNoSuchFamily()) { - LOG.warn(msg + nscf.getMessage()); + LOG.warn(msg + nscfe.getMessage()); } else { - LOG.warn(msg, nscf); + LOG.warn(msg, nscfe); observedExceptions.sawNoSuchFamily(); } retCodeDetails[index] = new OperationStatus( - OperationStatusCode.BAD_FAMILY, nscf.getMessage()); + OperationStatusCode.BAD_FAMILY, nscfe.getMessage()); + if (isAtomic()) { + throw nscfe; + } } catch (FailedSanityCheckException fsce) { final String msg = "Batch Mutation did not pass sanity check. "; if (observedExceptions.hasSeenFailedSanityCheck()) { @@ -3199,6 +3207,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } retCodeDetails[index] = new OperationStatus( OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage()); + if (isAtomic()) { + throw fsce; + } } catch (WrongRegionException we) { final String msg = "Batch mutation had a row that does not belong to this region. "; if (observedExceptions.hasSeenWrongRegion()) { @@ -3209,6 +3220,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } retCodeDetails[index] = new OperationStatus( OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage()); + if (isAtomic()) { + throw we; + } } } @@ -3232,15 +3246,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // If we haven't got any rows in our batch, we should block to get the next one. RowLock rowLock = null; try { - rowLock = region.getRowLockInternal(mutation.getRow(), true); + rowLock = region.getRowLockInternal(mutation.getRow(), !isAtomic()); } catch (TimeoutIOException e) { // We will retry when other exceptions, but we should stop if we timeout . throw e; } catch (IOException ioe) { LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe); + if (isAtomic()) { + throw ioe; + } } if (rowLock == null) { // We failed to grab another lock + if (isAtomic()) { + throw new IOException("Can't apply all operations atomically!"); + } break; // Stop acquiring more rows for this batch } else { acquiredRowLocks.add(rowLock); @@ -3361,12 +3381,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * Batch of mutation operations. Base class is shared with {@link ReplayBatchOperation} as most * of the logic is same. */ - private static class MutationBatchOperation extends BatchOperation { + static class MutationBatchOperation extends BatchOperation { private long nonceGroup; private long nonce; - public MutationBatchOperation(final HRegion region, Mutation[] operations, long nonceGroup, - long nonce) { + public MutationBatchOperation(final HRegion region, Mutation[] operations, boolean atomic, + long nonceGroup, long nonce) { super(region, operations); + this.atomic = atomic; this.nonceGroup = nonceGroup; this.nonce = nonce; } @@ -3604,11 +3625,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi retCodeDetails[index] = OperationStatus.SUCCESS; } } else { + String msg = "Put/Delete mutations only supported in a batch"; // In case of passing Append mutations along with the Puts and Deletes in batchMutate // mark the operation return code as failure so that it will not be considered in // the doMiniBatchMutation - retCodeDetails[index] = new OperationStatus(OperationStatusCode.FAILURE, - "Put/Delete mutations only supported in batchMutate() now"); + retCodeDetails[index] = new OperationStatus(OperationStatusCode.FAILURE, msg); + + if (isAtomic()) { + throw new IOException(msg); + } } } @@ -3664,7 +3689,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * Batch of mutations for replay. Base class is shared with {@link MutationBatchOperation} as most * of the logic is same. */ - private static class ReplayBatchOperation extends BatchOperation { + static class ReplayBatchOperation extends BatchOperation { private long origLogSeqNum = 0; public ReplayBatchOperation(final HRegion region, MutationReplay[] operations, long origLogSeqNum) { @@ -3777,11 +3802,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce) throws IOException { + return batchMutate(mutations, false, nonceGroup, nonce); + } + + public OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic, long nonceGroup, + long nonce) throws IOException { // As it stands, this is used for 3 things // * batchMutate with single mutation - put/delete, separate or from checkAndMutate. // * coprocessor calls (see ex. BulkDeleteEndpoint). // So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd... - return batchMutate(new MutationBatchOperation(this, mutations, nonceGroup, nonce)); + return batchMutate(new MutationBatchOperation(this, mutations, atomic, nonceGroup, nonce)); } public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 517ac3f4a9d3eb7ac49715517c5f186c382a1ee7..9600dfd05bc125cdc999ef02a0354f7eb0526c3e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -1,4 +1,4 @@ -/** +/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -601,9 +601,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, /** * Mutate a list of rows atomically. - * * @param cellScanner if non-null, the mutation data -- the Cell content. - * @param comparator @throws IOException */ private boolean checkAndRowMutate(final HRegion region, final List actions, final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, @@ -754,10 +752,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, /** * Run through the regionMutation rm and per Mutation, do the work, and then when * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation. - * @param region - * @param actions - * @param cellScanner - * @param builder * @param cellsToReturn Could be null. May be allocated in this method. This is what this * method returns as a 'result'. * @param closeCallBack the callback to be used with multigets @@ -861,7 +855,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null && !mutations.isEmpty()) { // Flush out any Puts or Deletes already collected. - doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement); + doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement, false); mutations.clear(); } switch (type) { @@ -922,7 +916,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } // Finish up any outstanding mutations if (mutations != null && !mutations.isEmpty()) { - doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement); + try { + doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement, false); + } catch (IOException ioe) { + rpcServer.getMetrics().exception(ioe); + NameBytesPair pair = ResponseConverter.buildException(ioe); + resultOrExceptionBuilder.setException(pair); + context.incrementResponseExceptionSize(pair.getSerializedSize()); + builder.addResultOrException(resultOrExceptionBuilder.build()); + } } return cellsToReturn; } @@ -952,7 +954,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, */ private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region, final OperationQuota quota, final List mutations, - final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) { + final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic) + throws IOException { Mutation[] mArray = new Mutation[mutations.size()]; long before = EnvironmentEdgeManager.currentTime(); boolean batchContainsPuts = false, batchContainsDelete = false; @@ -964,7 +967,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * is the mutation belong to. We can't sort ClientProtos.Action array, since they * are bonded to cellscanners. */ - Map mutationActionMap = new HashMap(); + Map mutationActionMap = new HashMap<>(); int i = 0; for (ClientProtos.Action action: mutations) { MutationProto m = action.getMutation(); @@ -992,7 +995,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // sort to improve lock efficiency Arrays.sort(mArray); - OperationStatus[] codes = region.batchMutate(mArray, HConstants.NO_NONCE, + OperationStatus[] codes = region.batchMutate(mArray, atomic, HConstants.NO_NONCE, HConstants.NO_NONCE); for (i = 0; i < codes.length; i++) { Mutation currentMutation = mArray[i]; @@ -1022,6 +1025,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } } catch (IOException ie) { + if (atomic) throw ie; for (int i = 0; i < mutations.size(); i++) { builder.addResultOrException(getResultOrException(ie, mutations.get(i).getIndex())); } @@ -1127,7 +1131,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } // Exposed for testing - static interface LogDelegate { + interface LogDelegate { void logBatchWarning(String firstRegionName, int sum, int rowSizeWarnThreshold); } @@ -3243,7 +3247,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } catch (IOException e) { addScannerLeaseBack(lease); throw new ServiceException(e); - }; + } try { checkScanNextCallSeq(request, rsh); } catch (OutOfOrderScannerNextException e) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index fec715108e82bd322fa96128221bde010e9041ed..cc34889f98437f842caf71c565c2aa5706802839 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -1,4 +1,4 @@ -/** +/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -131,6 +131,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; +import org.apache.hadoop.hbase.regionserver.HRegion.MutationBatchOperation; import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; import org.apache.hadoop.hbase.regionserver.Region.RowLock; import org.apache.hadoop.hbase.regionserver.TestHStore.FaultyFileSystem; @@ -167,6 +168,7 @@ import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; import org.junit.rules.TestName; import org.junit.rules.TestRule; import org.mockito.ArgumentCaptor; @@ -203,6 +205,7 @@ public class TestHRegion { @ClassRule public static final TestRule timeout = CategoryBasedTimeout.forClass(TestHRegion.class); + @Rule public final ExpectedException thrown = ExpectedException.none(); private static final String COLUMN_FAMILY = "MyCF"; private static final byte [] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY); @@ -1653,6 +1656,80 @@ public class TestHRegion { } } + @Test + public void testAtomicBatchPut() throws IOException { + byte[] cf = Bytes.toBytes(COLUMN_FAMILY); + byte[] qual = Bytes.toBytes("qual"); + byte[] val = Bytes.toBytes("val"); + this.region = initHRegion(tableName, method, CONF, cf); + MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class); + try { + long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source); + metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source); + + LOG.info("First a batch put with all valid puts"); + final Put[] puts = new Put[10]; + for (int i = 0; i < 10; i++) { + puts[i] = new Put(Bytes.toBytes("row_" + i)); + puts[i].addColumn(cf, qual, val); + } + + // 1. Straight forward case, should succeed + MutationBatchOperation batchOp = new MutationBatchOperation(region, puts, true, + HConstants.NO_NONCE, HConstants.NO_NONCE); + OperationStatus[] codes = this.region.batchMutate(batchOp); + assertEquals(10, codes.length); + for (int i = 0; i < 10; i++) { + assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode()); + } + metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source); + + // 2. Failed to get lock + RowLock lock = region.getRowLock(Bytes.toBytes("row_" + 3)); + MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF); + final AtomicReference retFromThread = new AtomicReference<>(); + final CountDownLatch finishedPuts = new CountDownLatch(1); + final MutationBatchOperation finalBatchOp = new MutationBatchOperation(region, puts, true, + HConstants + .NO_NONCE, + HConstants.NO_NONCE); + TestThread putter = new TestThread(ctx) { + @Override + public void doWork() throws IOException { + try { + region.batchMutate(finalBatchOp); + } catch (IOException ioe) { + LOG.error("test failed!", ioe); + retFromThread.set(ioe); + } + finishedPuts.countDown(); + } + }; + LOG.info("...starting put thread while holding locks"); + ctx.addThread(putter); + ctx.startThreads(); + LOG.info("...waiting for batch puts while holding locks"); + try { + finishedPuts.await(); + } catch (InterruptedException e) { + LOG.error("Interrupted!", e); + } + assertNotNull(retFromThread.get()); + metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source); + + // 3. Exception thrown in validation + LOG.info("Next a batch put with one invalid family"); + puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, val); + batchOp = new MutationBatchOperation(region, puts, true, HConstants.NO_NONCE, + HConstants.NO_NONCE); + thrown.expect(NoSuchColumnFamilyException.class); + this.region.batchMutate(batchOp); + } finally { + HBaseTestingUtility.closeRegionAndWAL(this.region); + this.region = null; + } + } + private void waitForCounter(MetricsWALSource source, String metricName, long expectedCount) throws InterruptedException { long startWait = System.currentTimeMillis(); -- 2.10.1 (Apple Git-78)