From 68569a2b32f6560af8fe26d9c82d3da8bb70ac3c Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Wed, 26 Dec 2018 17:42:02 +0800 Subject: [PATCH] HBASE-21643 Introduce two new region coprocessor method and deprecated postMutationBeforeWAL --- .../hadoop/hbase/coprocessor/RegionObserver.java | 45 ++++++++++++++++++++++ .../apache/hadoop/hbase/regionserver/HRegion.java | 20 +++++----- .../hbase/regionserver/RegionCoprocessorHost.java | 28 +++++++++++--- .../hbase/security/access/AccessController.java | 27 +++++++++++-- .../security/visibility/VisibilityController.java | 26 +++++++++++-- 5 files changed, 123 insertions(+), 23 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index c14cbd1..709523e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.coprocessor; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -1029,13 +1030,57 @@ public interface RegionObserver { * @param oldCell old cell containing previous value * @param newCell the new cell containing the computed value * @return the new cell, possibly changed + * @deprecated Use {@link #postIncrementBeforeWAL} or {@link #postAppendBeforeWAL} instead. */ + @Deprecated default Cell postMutationBeforeWAL(ObserverContext ctx, MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException { return newCell; } /** + * Called after a list of new cells has been created during an increment operation, but before + * they are committed to the WAL or memstore. + * + * @param ctx the environment provided by the region server + * @param mutation the current mutation + * @param oldCells old cells containing previous values and the previous value may be null + * @param newCells the new cells containing the computed values + * @return the new cells will be applied to WAL or memstore, possibly changed + */ + default List postIncrementBeforeWAL(ObserverContext ctx, + Mutation mutation, List oldCells, List newCells) throws IOException { + assert oldCells.size() == newCells.size(); + List toApply = new ArrayList<>(newCells.size()); + for (int i = 0; i < newCells.size(); i++) { + toApply.add(postMutationBeforeWAL(ctx, MutationType.INCREMENT, mutation, oldCells.get(i), + newCells.get(i))); + } + return toApply; + } + + /** + * Called after a list of new cells has been created during an append operation, but before + * they are committed to the WAL or memstore. + * + * @param ctx the environment provided by the region server + * @param mutation the current mutation + * @param oldCells old cells containing previous values and the previous value may be null + * @param newCells the new cells containing the computed values + * @return the new cells will be applied to WAL or memstore, possibly changed + */ + default List postAppendBeforeWAL(ObserverContext ctx, + Mutation mutation, List oldCells, List newCells) throws IOException { + assert oldCells.size() == newCells.size(); + List toApply = new ArrayList<>(newCells.size()); + for (int i = 0; i < newCells.size(); i++) { + toApply.add(postMutationBeforeWAL(ctx, MutationType.APPEND, mutation, oldCells.get(i), + newCells.get(i))); + } + return toApply; + } + + /** * Called after the ScanQueryMatcher creates ScanDeleteTracker. Implementing * this hook would help in creating customised DeleteTracker and returning * the newly created DeleteTracker diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index dc0fa22..49f1f0a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -120,7 +120,6 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.conf.ConfigurationManager; import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; -import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType; import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare; import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; @@ -8015,6 +8014,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi throws IOException { byte[] columnFamily = store.getColumnFamilyDescriptor().getName(); List toApply = new ArrayList<>(deltas.size()); + List oldCells = new ArrayList<>(deltas.size()); // Get previous values for all columns in this family. TimeRange tr = null; switch (op) { @@ -8041,18 +8041,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi currentValuesIndex++; } } + // Switch on whether this an increment or an append building the new Cell to apply. Cell newCell = null; - MutationType mutationType = null; switch (op) { case INCREMENT: - mutationType = MutationType.INCREMENT; long deltaAmount = getLongValue(delta); final long newValue = currentValue == null ? deltaAmount : getLongValue(currentValue) + deltaAmount; newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, (oldCell) -> Bytes.toBytes(newValue)); break; case APPEND: - mutationType = MutationType.APPEND; newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, (oldCell) -> ByteBuffer.wrap(new byte[delta.getValueLength() + oldCell.getValueLength()]) .put(oldCell.getValueArray(), oldCell.getValueOffset(), oldCell.getValueLength()) @@ -8063,17 +8061,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi default: throw new UnsupportedOperationException(op.toString()); } - // Give coprocessors a chance to update the new cell - if (coprocessorHost != null) { - newCell = - coprocessorHost.postMutationBeforeWAL(mutationType, mutation, currentValue, newCell); - } + oldCells.add(currentValue); toApply.add(newCell); // Add to results to get returned to the Client. If null, cilent does not want results. if (results != null) { results.add(newCell); } } + + // Give coprocessors a chance to update the new cells before apply to WAL or memstore + if (coprocessorHost != null) { + // Here the operation must be increment or append. + toApply = op == Operation.INCREMENT ? + coprocessorHost.postIncrementBeforeWAL(mutation, oldCells, toApply) : + coprocessorHost.postAppendBeforeWAL(mutation, oldCells, toApply); + } return toApply; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index dea13ca..4d95c6e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -1691,16 +1691,32 @@ public class RegionCoprocessorHost }); } - public Cell postMutationBeforeWAL(final MutationType opType, final Mutation mutation, - final Cell oldCell, Cell newCell) throws IOException { + public List postIncrementBeforeWAL(final Mutation mutation, final List oldCells, + List newCells) throws IOException { if (this.coprocEnvironments.isEmpty()) { - return newCell; + return newCells; } return execOperationWithResult( - new ObserverOperationWithResult(regionObserverGetter, newCell) { + new ObserverOperationWithResult>(regionObserverGetter, + newCells) { @Override - public Cell call(RegionObserver observer) throws IOException { - return observer.postMutationBeforeWAL(this, opType, mutation, oldCell, getResult()); + public List call(RegionObserver observer) throws IOException { + return observer.postIncrementBeforeWAL(this, mutation, oldCells, getResult()); + } + }); + } + + public List postAppendBeforeWAL(final Mutation mutation, final List oldCells, + List newCells) throws IOException { + if (this.coprocEnvironments.isEmpty()) { + return newCells; + } + return execOperationWithResult( + new ObserverOperationWithResult>(regionObserverGetter, + newCells) { + @Override + public List call(RegionObserver observer) throws IOException { + return observer.postAppendBeforeWAL(this, mutation, oldCells, getResult()); } }); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java index 82ec12d..af22c61 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java @@ -1849,8 +1849,28 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor, } @Override - public Cell postMutationBeforeWAL(ObserverContext ctx, - MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException { + public List postIncrementBeforeWAL(ObserverContext ctx, + Mutation mutation, List oldCells, List newCells) throws IOException { + assert oldCells.size() == newCells.size(); + List toApply = new ArrayList<>(newCells.size()); + for (int i = 0; i < newCells.size(); i++) { + toApply.add(tryCreateNewCellWithTags(mutation, oldCells.get(i), newCells.get(i))); + } + return toApply; + } + + @Override + public List postAppendBeforeWAL(ObserverContext ctx, + Mutation mutation, List oldCells, List newCells) throws IOException { + assert oldCells.size() == newCells.size(); + List toApply = new ArrayList<>(newCells.size()); + for (int i = 0; i < newCells.size(); i++) { + toApply.add(tryCreateNewCellWithTags(mutation, oldCells.get(i), newCells.get(i))); + } + return toApply; + } + + private Cell tryCreateNewCellWithTags(Mutation mutation, Cell oldCell, Cell newCell) { // If the HFile version is insufficient to persist tags, we won't have any // work to do here if (!cellFeaturesEnabled) { @@ -1901,8 +1921,7 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor, return newCell; } - Cell rewriteCell = PrivateCellUtil.createCell(newCell, tags); - return rewriteCell; + return PrivateCellUtil.createCell(newCell, tags); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java index c4f3b95..383cf18 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java @@ -127,6 +127,7 @@ import org.slf4j.LoggerFactory; public class VisibilityController implements MasterCoprocessor, RegionCoprocessor, VisibilityLabelsService.Interface, MasterObserver, RegionObserver { + private static final Logger LOG = LoggerFactory.getLogger(VisibilityController.class); private static final Logger AUDITLOG = LoggerFactory.getLogger("SecurityLogger." + VisibilityController.class.getName()); @@ -688,8 +689,26 @@ public class VisibilityController implements MasterCoprocessor, RegionCoprocesso } @Override - public Cell postMutationBeforeWAL(ObserverContext ctx, - MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException { + public List postIncrementBeforeWAL(ObserverContext ctx, + Mutation mutation, List oldCells, List newCells) throws IOException { + List toApply = new ArrayList<>(newCells.size()); + for (int i = 0; i < newCells.size(); i++) { + toApply.add(tryCreateNewCellWithTags(mutation, newCells.get(i))); + } + return toApply; + } + + @Override + public List postAppendBeforeWAL(ObserverContext ctx, + Mutation mutation, List oldCells, List newCells) throws IOException { + List toApply = new ArrayList<>(newCells.size()); + for (int i = 0; i < newCells.size(); i++) { + toApply.add(tryCreateNewCellWithTags(mutation, newCells.get(i))); + } + return toApply; + } + + private Cell tryCreateNewCellWithTags(Mutation mutation, Cell newCell) throws IOException { List tags = Lists.newArrayList(); CellVisibility cellVisibility = null; try { @@ -715,8 +734,7 @@ public class VisibilityController implements MasterCoprocessor, RegionCoprocesso } } - Cell rewriteCell = PrivateCellUtil.createCell(newCell, tags); - return rewriteCell; + return PrivateCellUtil.createCell(newCell, tags); } @Override -- 2.7.4