From cc32d8e97fd1381f7efcf551de1ac64edcd14631 Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Wed, 26 Dec 2018 17:42:02 +0800 Subject: [PATCH] HBASE-21643 Introduce two new region coprocessor method and deprecated postMutationBeforeWAL --- .../hadoop/hbase/coprocessor/RegionObserver.java | 45 ++++++++++++++++++++++ .../apache/hadoop/hbase/regionserver/HRegion.java | 18 +++++---- .../hbase/regionserver/RegionCoprocessorHost.java | 30 +++++++++++++++ 3 files changed, 85 insertions(+), 8 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index c14cbd1..43fa886 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.coprocessor; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -1029,13 +1030,57 @@ public interface RegionObserver { * @param oldCell old cell containing previous value * @param newCell the new cell containing the computed value * @return the new cell, possibly changed + * @deprecated Use {@link #postIncrementBeforeWAL} or {@link #postAppendBeforeWAL} instead. */ + @Deprecated default Cell postMutationBeforeWAL(ObserverContext ctx, MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException { return newCell; } /** + * Called after a list of new cells has been created during an increment operation, but before + * they are committed to the WAL or memstore. + * + * @param ctx the environment provided by the region server + * @param mutation the current mutation + * @param oldCells old cells containing previous values + * @param newCells the new cells containing the computed values + * @return the new cells will be applied to WAL or memstore, possibly changed + */ + default List postIncrementBeforeWAL(ObserverContext ctx, + Mutation mutation, List oldCells, List newCells) throws IOException { + assert oldCells.size() == newCells.size(); + List toApply = new ArrayList<>(newCells.size()); + for (int i = 0; i < newCells.size(); i++) { + toApply.add(postMutationBeforeWAL(ctx, MutationType.INCREMENT, mutation, oldCells.get(i), + newCells.get(i))); + } + return toApply; + } + + /** + * Called after a list of new cells has been created during an append operation, but before + * they are committed to the WAL or memstore. + * + * @param ctx the environment provided by the region server + * @param mutation the current mutation + * @param oldCells old cells containing previous values + * @param newCells the new cells containing the computed values + * @return the new cells will be applied to WAL or memstore, possibly changed + */ + default List postAppendBeforeWAL(ObserverContext ctx, + Mutation mutation, List oldCells, List newCells) throws IOException { + assert oldCells.size() == newCells.size(); + List toApply = new ArrayList<>(newCells.size()); + for (int i = 0; i < newCells.size(); i++) { + toApply.add(postMutationBeforeWAL(ctx, MutationType.APPEND, mutation, oldCells.get(i), + newCells.get(i))); + } + return toApply; + } + + /** * Called after the ScanQueryMatcher creates ScanDeleteTracker. Implementing * this hook would help in creating customised DeleteTracker and returning * the newly created DeleteTracker diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 21458c4..b14338a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -8017,6 +8017,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi throws IOException { byte[] columnFamily = store.getColumnFamilyDescriptor().getName(); List toApply = new ArrayList<>(deltas.size()); + List oldCells = new ArrayList<>(deltas.size()); // Get previous values for all columns in this family. TimeRange tr = null; switch (op) { @@ -8048,11 +8049,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // Switch on whether this an increment or an append building the new Cell to apply. Cell newCell = null; - MutationType mutationType = null; boolean apply = true; switch (op) { case INCREMENT: - mutationType = MutationType.INCREMENT; // If delta amount to apply is 0, don't write WAL or MemStore. long deltaAmount = getLongValue(delta); // TODO: Does zero value mean reset Cell? For example, the ttl. @@ -8061,7 +8060,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, (oldCell) -> Bytes.toBytes(newValue)); break; case APPEND: - mutationType = MutationType.APPEND; // Always apply Append. TODO: Does empty delta value mean reset Cell? It seems to. newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, (oldCell) -> ByteBuffer.wrap(new byte[delta.getValueLength() + oldCell.getValueLength()]) @@ -8073,20 +8071,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi default: throw new UnsupportedOperationException(op.toString()); } - // Give coprocessors a chance to update the new cell - if (coprocessorHost != null) { - newCell = - coprocessorHost.postMutationBeforeWAL(mutationType, mutation, currentValue, newCell); - } // If apply, we need to update memstore/WAL with new value; add it toApply. if (apply || firstWrite) { toApply.add(newCell); + oldCells.add(currentValue); } // Add to results to get returned to the Client. If null, cilent does not want results. if (results != null) { results.add(newCell); } } + + // Give coprocessors a chance to update the new cells before apply to WAL or memstore + if (coprocessorHost != null) { + // Here the operation must be increment or append. + toApply = op == Operation.INCREMENT ? + coprocessorHost.postIncrementBeforeWAL(mutation, oldCells, toApply) : + coprocessorHost.postAppendBeforeWAL(mutation, oldCells, toApply); + } return toApply; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index dea13ca..f714686 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -1705,6 +1705,36 @@ public class RegionCoprocessorHost }); } + public List postIncrementBeforeWAL(final Mutation mutation, final List oldCells, + List newCells) throws IOException { + if (this.coprocEnvironments.isEmpty()) { + return newCells; + } + return execOperationWithResult( + new ObserverOperationWithResult>(regionObserverGetter, + newCells) { + @Override + public List call(RegionObserver observer) throws IOException { + return observer.postIncrementBeforeWAL(this, mutation, oldCells, getResult()); + } + }); + } + + public List postAppendBeforeWAL(final Mutation mutation, final List oldCells, + List newCells) throws IOException { + if (this.coprocEnvironments.isEmpty()) { + return newCells; + } + return execOperationWithResult( + new ObserverOperationWithResult>(regionObserverGetter, + newCells) { + @Override + public List call(RegionObserver observer) throws IOException { + return observer.postAppendBeforeWAL(this, mutation, oldCells, getResult()); + } + }); + } + public Message preEndpointInvocation(final Service service, final String methodName, Message request) throws IOException { if (coprocEnvironments.isEmpty()) { -- 2.7.4