.../security/visibility/VisibilityConstants.java | 6 + .../src/main/java/org/apache/hadoop/hbase/Tag.java | 22 +- .../main/java/org/apache/hadoop/hbase/TagType.java | 2 + .../coprocessor/BaseRegionServerObserver.java | 7 + .../hadoop/hbase/coprocessor/CoprocessorHost.java | 9 +- .../hbase/coprocessor/RegionServerObserver.java | 12 + .../DefaultVisibilityExpressionResolver.java | 7 + .../regionserver/RegionServerCoprocessorHost.java | 63 ++- .../regionserver/ReplicationSourceManager.java | 39 +- .../hbase/security/access/AccessController.java | 7 + .../DefaultVisibilityLabelServiceImpl.java | 68 ++- .../security/visibility/VisibilityController.java | 151 ++++++- .../visibility/VisibilityLabelOrdinalProvider.java | 7 + .../visibility/VisibilityLabelService.java | 25 +- .../security/visibility/VisibilityLabelsCache.java | 3 +- .../visibility/VisibilityReplicationEndpoint.java | 161 +++++++ .../hbase/security/visibility/VisibilityUtils.java | 33 ++ .../ExpAsStringVisibilityLabelServiceImpl.java | 89 +++- ...tVisibilityLabelReplicationWithExpAsString.java | 211 +++++++++ .../TestVisibilityLabelsReplication.java | 487 +++++++++++++++++++++ 20 files changed, 1359 insertions(+), 50 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityConstants.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityConstants.java index ce5bca0..89a94a9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityConstants.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityConstants.java @@ -52,4 +52,10 @@ public final class VisibilityConstants { public static final String CHECK_AUTHS_FOR_MUTATION = "hbase.security.visibility.mutations.checkauths"; + public static final String NOT_OPERATOR = "!"; + public static final String AND_OPERATOR = "&"; + public static final String OR_OPERATOR = "|"; + public static final String OPEN_PARAN = "("; + public static final String CLOSED_PARAN = ")"; + } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java index f214edb..d225855 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java @@ -35,7 +35,7 @@ public class Tag { public final static int TYPE_LENGTH_SIZE = Bytes.SIZEOF_BYTE; public final static int TAG_LENGTH_SIZE = Bytes.SIZEOF_SHORT; public final static int INFRASTRUCTURE_SIZE = TYPE_LENGTH_SIZE + TAG_LENGTH_SIZE; - private static final int MAX_TAG_LENGTH = (2 * Short.MAX_VALUE) + 1 - TAG_LENGTH_SIZE; + public static final int MAX_TAG_LENGTH = (2 * Short.MAX_VALUE) + 1 - TAG_LENGTH_SIZE; private final byte type; private final byte[] bytes; @@ -190,6 +190,26 @@ public class Tag { } return tags; } + + /** + * Write a list of tags into a byte array + * + * @param tags + * @return the serialized tag data as bytes + */ + public static byte[] fromList(List tags) { + int length = 0; + for (Tag tag : tags) { + length += tag.length; + } + byte[] b = new byte[length]; + int pos = 0; + for (Tag tag : tags) { + System.arraycopy(tag.bytes, tag.offset, b, pos, tag.length); + pos += tag.length; + } + return b; + } /** * Retrieve the first tag from the tags byte array matching the passed in tag type diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java index 45c8476..b113516 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java @@ -28,4 +28,6 @@ public final class TagType { public static final byte VISIBILITY_TAG_TYPE = (byte) 2; public static final byte LOG_REPLAY_TAG_TYPE = (byte) 3; public static final byte VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE = (byte)4; + // String based tag type used in replication + public static final byte STRING_VIS_TAG_TYPE = (byte) 7; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionServerObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionServerObserver.java index 5bc23d3..c21cdf8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionServerObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionServerObserver.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.replication.ReplicationEndpoint; /** * An abstract class that implements RegionServerObserver. @@ -76,4 +77,10 @@ public class BaseRegionServerObserver implements RegionServerObserver { public void postRollWALWriterRequest(ObserverContext ctx) throws IOException { } + @Override + public ReplicationEndpoint postCreateReplicationEndPoint( + ObserverContext ctx, ReplicationEndpoint endpoint) { + return endpoint; + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java index dc6d67e..08e74e5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java @@ -33,12 +33,8 @@ import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; -import com.google.protobuf.Descriptors; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Abortable; @@ -48,6 +44,8 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.CoprocessorHConnection; import org.apache.hadoop.hbase.client.Delete; @@ -73,6 +71,7 @@ import org.apache.hadoop.hbase.util.SortedCopyOnWriteSet; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.io.MultipleIOException; +import com.google.protobuf.Descriptors; import com.google.protobuf.Message; import com.google.protobuf.Service; import com.google.protobuf.ServiceException; @@ -99,7 +98,7 @@ public abstract class CoprocessorHost { public static final String ABORT_ON_ERROR_KEY = "hbase.coprocessor.abortonerror"; public static final boolean DEFAULT_ABORT_ON_ERROR = true; - private static final Log LOG = LogFactory.getLog(CoprocessorHost.class); + protected static final Log LOG = LogFactory.getLog(CoprocessorHost.class); protected Abortable abortable; /** Ordered set of loaded coprocessors with lock */ protected SortedSet coprocessors = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java index 8a76d46..dfb993b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.MetaMutationAnnotation; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.replication.ReplicationEndpoint; public interface RegionServerObserver extends Coprocessor { @@ -121,4 +122,15 @@ public interface RegionServerObserver extends Coprocessor { void postRollWALWriterRequest(final ObserverContext ctx) throws IOException; + /** + * This will be called after the replication endpoint is instantiated. + * + * @param ctx + * @param endpoint + * - the base endpoint for replication + * @return the endpoint to use during replication. + */ + ReplicationEndpoint postCreateReplicationEndPoint( + ObserverContext ctx, ReplicationEndpoint endpoint); + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java index c7d4052..56eb1c1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java @@ -113,6 +113,13 @@ public class DefaultVisibilityExpressionResolver implements VisibilityExpression public int getLabelOrdinal(String label) { return labels.get(label); } + + @Override + public String getLabel(int ordinal) { + // Unused + throw new UnsupportedOperationException( + "getLabel should not be used in VisibilityExpressionResolver"); + } }; return VisibilityUtils.createVisibilityExpTags(visExpression, true, false, null, provider); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java index f91642b..9e122cd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java @@ -22,20 +22,20 @@ import java.io.IOException; import java.util.Comparator; import java.util.List; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.MetaMutationAnnotation; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; -import org.apache.hadoop.hbase.coprocessor.CoprocessorService; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionServerObserver; import org.apache.hadoop.hbase.coprocessor.SingletonCoprocessorService; +import org.apache.hadoop.hbase.replication.ReplicationEndpoint; @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) @InterfaceStability.Evolving @@ -158,8 +158,49 @@ public class RegionServerCoprocessorHost extends }); } - private static abstract class CoprocessorOperation - extends ObserverContext { + public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint) + throws IOException { + return execOperationWithResult(endpoint, coprocessors.isEmpty() ? null + : new CoprocessOperationWithResult() { + @Override + public void call(RegionServerObserver oserver, + ObserverContext ctx) throws IOException { + try { + oserver.getClass().getDeclaredMethod("postCreateReplicationEndPoint", + ObserverContext.class, ReplicationEndpoint.class); + } catch (NoSuchMethodException e) { + LOG.warn("The RegionServer Observer class " + + oserver.getClass().getName() + + " does not have the" + + "method postCreateReplicationEndPoint(). Consider upgrading inorder to replicate visibility" + + " labels as strings"); + setResult(endpoint); + return; + } catch (SecurityException e) { + LOG.warn("The RegionServer Observer class " + + oserver.getClass().getName() + + " does not have the" + + "method postCreateReplicationEndPoint(). Consider upgrading inorder to replicate visibility" + + " labels as strings"); + setResult(endpoint); + return; + } + setResult(oserver.postCreateReplicationEndPoint(ctx, getResult())); + } + }); + } + + private T execOperationWithResult(final T defaultValue, + final CoprocessOperationWithResult ctx) throws IOException { + if (ctx == null) + return defaultValue; + ctx.setResult(defaultValue); + execOperation(ctx); + return ctx.getResult(); + } + + private static abstract class CoprocessorOperation extends + ObserverContext { public CoprocessorOperation() { } @@ -170,6 +211,18 @@ public class RegionServerCoprocessorHost extends } } + private static abstract class CoprocessOperationWithResult extends CoprocessorOperation { + private T result = null; + + public void setResult(final T result) { + this.result = result; + } + + public T getResult() { + return this.result; + } + } + private boolean execOperation(final CoprocessorOperation ctx) throws IOException { if (ctx == null) return false; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index f247bb1..42b0cfc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -39,11 +39,13 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationListener; @@ -53,7 +55,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationTracker; -import org.apache.zookeeper.KeeperException; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -85,7 +86,7 @@ public class ReplicationSourceManager implements ReplicationListener { // UUID for this cluster private final UUID clusterId; // All about stopping - private final Stoppable stopper; + private final Server server; // All logs we are currently tracking private final Map> hlogsById; // Logs for recovered sources we are currently tracking @@ -120,7 +121,7 @@ public class ReplicationSourceManager implements ReplicationListener { */ public ReplicationSourceManager(final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker, - final Configuration conf, final Stoppable stopper, final FileSystem fs, final Path logDir, + final Configuration conf, final Server server, final FileSystem fs, final Path logDir, final Path oldLogDir, final UUID clusterId) { //CopyOnWriteArrayList is thread-safe. //Generally, reading is more than modifying. @@ -128,7 +129,7 @@ public class ReplicationSourceManager implements ReplicationListener { this.replicationQueues = replicationQueues; this.replicationPeers = replicationPeers; this.replicationTracker = replicationTracker; - this.stopper = stopper; + this.server = server; this.hlogsById = new HashMap>(); this.hlogsByIdRecoveredQueues = new ConcurrentHashMap>(); this.oldsources = new CopyOnWriteArrayList(); @@ -245,7 +246,7 @@ public class ReplicationSourceManager implements ReplicationListener { ReplicationPeer peer = replicationPeers.getPeer(id); ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this, this.replicationQueues, - this.replicationPeers, stopper, id, this.clusterId, peerConfig, peer); + this.replicationPeers, server, id, this.clusterId, peerConfig, peer); synchronized (this.hlogsById) { this.sources.add(src); this.hlogsById.put(id, new TreeSet()); @@ -259,7 +260,7 @@ public class ReplicationSourceManager implements ReplicationListener { String message = "Cannot add log to queue when creating a new source, queueId=" + src.getPeerClusterZnode() + ", filename=" + name; - stopper.stop(message); + server.stop(message); throw e; } src.enqueueLog(this.latestPath); @@ -369,9 +370,13 @@ public class ReplicationSourceManager implements ReplicationListener { protected ReplicationSourceInterface getReplicationSource(final Configuration conf, final FileSystem fs, final ReplicationSourceManager manager, final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, - final Stoppable stopper, final String peerId, final UUID clusterId, + final Server server, final String peerId, final UUID clusterId, final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer) throws IOException { + RegionServerCoprocessorHost rsServerHost = null; + if (server instanceof HRegionServer) { + rsServerHost = ((HRegionServer) server).getCoprocessorHost(); + } ReplicationSourceInterface src; try { @SuppressWarnings("rawtypes") @@ -394,6 +399,16 @@ public class ReplicationSourceManager implements ReplicationListener { @SuppressWarnings("rawtypes") Class c = Class.forName(replicationEndpointImpl); replicationEndpoint = (ReplicationEndpoint) c.newInstance(); + if(rsServerHost != null) { + // We may have to use reflections here to see if the method is really there. + // If not do not go with the visibility replication, go with the normal one + ReplicationEndpoint newReplicationEndPoint = rsServerHost + .postCreateReplicationEndPoint(replicationEndpoint); + if(newReplicationEndPoint != null) { + // Override the newly created endpoint from the hook with configured end point + replicationEndpoint = newReplicationEndPoint; + } + } } catch (Exception e) { LOG.warn("Passed replication endpoint implementation throws errors", e); throw new IOException(e); @@ -401,7 +416,7 @@ public class ReplicationSourceManager implements ReplicationListener { MetricsSource metrics = new MetricsSource(peerId); // init replication source - src.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerId, + src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId, clusterId, replicationEndpoint, metrics); // init replication endpoint @@ -544,7 +559,7 @@ public class ReplicationSourceManager implements ReplicationListener { Thread.currentThread().interrupt(); } // We try to lock that rs' queue directory - if (stopper.isStopped()) { + if (server.isStopped()) { LOG.info("Not transferring queue since we are shutting down"); return; } @@ -580,7 +595,7 @@ public class ReplicationSourceManager implements ReplicationListener { ReplicationSourceInterface src = getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp, - stopper, peerId, this.clusterId, peerConfig, peer); + server, peerId, this.clusterId, peerConfig, peer); if (!this.rp.getPeerIds().contains((src.getPeerClusterId()))) { src.terminate("Recovered queue doesn't belong to any current peer"); break; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java index ceb6eec..34251b6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java @@ -93,6 +93,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; @@ -2314,4 +2315,10 @@ public class AccessController extends BaseMasterAndRegionObserver @Override public void postRollWALWriterRequest(ObserverContext ctx) throws IOException { } + + @Override + public ReplicationEndpoint postCreateReplicationEndPoint( + ObserverContext ctx, ReplicationEndpoint endpoint) { + return endpoint; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/DefaultVisibilityLabelServiceImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/DefaultVisibilityLabelServiceImpl.java index f3dbcda..b2f4d84 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/DefaultVisibilityLabelServiceImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/DefaultVisibilityLabelServiceImpl.java @@ -38,12 +38,13 @@ import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants.OperationStatusCode; import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.TagType; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; @@ -593,4 +594,69 @@ public class DefaultVisibilityLabelServiceImpl implements VisibilityLabelService } return matchFound; } + + @Override + public byte[] encodeVisibilityForReplication(final List tags, final Byte serializationFormat) + throws IOException { + if (tags.size() > 0 + && (serializationFormat == null || + serializationFormat == SORTED_ORDINAL_SERIALIZATION_FORMAT)) { + return createModifiedVisExpression(tags); + } + return null; + } + + /** + * @param tags + * - all the visibility tags associated with the current Cell + * @return - the modified visibility expression as byte[] + */ + private byte[] createModifiedVisExpression(final List tags) + throws IOException { + StringBuilder visibilityString = new StringBuilder(); + for (Tag tag : tags) { + if (tag.getType() == TagType.VISIBILITY_TAG_TYPE) { + if (visibilityString.length() != 0) { + visibilityString.append(VisibilityConstants.CLOSED_PARAN).append( + VisibilityConstants.OR_OPERATOR); + } + int offset = tag.getTagOffset(); + int endOffset = offset + tag.getTagLength(); + boolean expressionStart = true; + while (offset < endOffset) { + Pair result = StreamUtils.readRawVarint32(tag.getBuffer(), offset); + int currLabelOrdinal = result.getFirst(); + if (currLabelOrdinal < 0) { + int temp = -currLabelOrdinal; + String label = this.labelsCache.getLabel(temp); + if (expressionStart) { + // Quote every label in case of unicode characters if present + visibilityString.append(VisibilityConstants.OPEN_PARAN) + .append(VisibilityConstants.NOT_OPERATOR).append(CellVisibility.quote(label)); + } else { + visibilityString.append(VisibilityConstants.AND_OPERATOR) + .append(VisibilityConstants.NOT_OPERATOR).append(CellVisibility.quote(label)); + } + } else { + String label = this.labelsCache.getLabel(currLabelOrdinal); + if (expressionStart) { + visibilityString.append(VisibilityConstants.OPEN_PARAN).append( + CellVisibility.quote(label)); + } else { + visibilityString.append(VisibilityConstants.AND_OPERATOR).append( + CellVisibility.quote(label)); + } + } + expressionStart = false; + offset += result.getSecond(); + } + } + } + if (visibilityString.length() != 0) { + visibilityString.append(VisibilityConstants.CLOSED_PARAN); + // Return the string formed as byte[] + return Bytes.toBytes(visibilityString.toString()); + } + return null; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java index 98cdbba..85c75e8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java @@ -32,14 +32,13 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; @@ -60,6 +59,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.constraint.ConstraintException; import org.apache.hadoop.hbase.coprocessor.BaseMasterAndRegionObserver; +import org.apache.hadoop.hbase.coprocessor.BaseRegionServerObserver; import org.apache.hadoop.hbase.coprocessor.CoprocessorException; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.CoprocessorService; @@ -93,12 +93,15 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.regionserver.OperationStatus; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.access.AccessControlLists; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.security.access.AccessController; import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import com.google.common.collect.Lists; import com.google.common.collect.MapMaker; @@ -135,6 +138,7 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements static { RESERVED_VIS_TAG_TYPES.add(TagType.VISIBILITY_TAG_TYPE); RESERVED_VIS_TAG_TYPES.add(TagType.VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE); + RESERVED_VIS_TAG_TYPES.add(TagType.STRING_VIS_TAG_TYPE); } @Override @@ -145,14 +149,13 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements + " is required to persist visibility labels. Consider setting " + HFile.FORMAT_VERSION_KEY + " accordingly."); } + if (env instanceof RegionServerCoprocessorEnvironment) { - throw new RuntimeException( - "Visibility controller should not be configured as " + - "'hbase.coprocessor.regionserver.classes'."); + throw new RuntimeException("Visibility controller should not be configured as " + + "'hbase.coprocessor.regionserver.classes'."); } - - if (env instanceof RegionCoprocessorEnvironment) { - // VisibilityLabelService to be instantiated only with Region Observer. + // Do not create for master CPs + if (!(env instanceof MasterCoprocessorEnvironment)) { visibilityLabelService = VisibilityLabelServiceManager.getInstance() .getVisibilityLabelService(this.conf); } @@ -283,12 +286,24 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements continue; } boolean sanityFailure = false; + boolean modifiedTagFound = false; + Pair pair = new Pair(false, null); for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) { - if (!checkForReservedVisibilityTagPresence(cellScanner.current())) { + pair = checkForReservedVisibilityTagPresence(cellScanner.current(), pair); + if (!pair.getFirst()) { miniBatchOp.setOperationStatus(i, new OperationStatus(SANITY_CHECK_FAILURE, "Mutation contains cell with reserved type tag")); sanityFailure = true; break; + } else { + // Indicates that the cell has a the tag which was modified in the src replication cluster + Tag tag = pair.getSecond(); + if (cellVisibility == null && tag != null) { + // May need to store only the first one + cellVisibility = new CellVisibility(Bytes.toString(tag.getBuffer(), tag.getTagOffset(), + tag.getTagLength())); + modifiedTagFound = true; + } } } if (!sanityFailure) { @@ -315,6 +330,10 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements Cell cell = cellScanner.current(); List tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLengthUnsigned()); + if (modifiedTagFound) { + // Rewrite the tags by removing the modified tags. + removeReplicationVisibilityTag(tags); + } tags.addAll(visibilityTags); Cell updatedCell = new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(), @@ -389,13 +408,81 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements ctx.bypass(); } - // Checks whether cell contains any tag with type as VISIBILITY_TAG_TYPE. - // This tag type is reserved and should not be explicitly set by user. - private boolean checkForReservedVisibilityTagPresence(Cell cell) throws IOException { + /** + * Checks whether cell contains any tag with type as VISIBILITY_TAG_TYPE. This + * tag type is reserved and should not be explicitly set by user. + * + * @param cell + * - the cell under consideration + * @param pair - an optional pair of type which would be reused + * if already set and new one will be created if null is passed + * @return a pair - if the boolean is false then it indicates + * that the cell has a RESERVERD_VIS_TAG and with boolean as true, not + * null tag indicates that a string modified tag was found. + */ + private Pair checkForReservedVisibilityTagPresence(Cell cell, + Pair pair) throws IOException { + if (pair == null) { + pair = new Pair(false, null); + } else { + pair.setFirst(false); + pair.setSecond(null); + } // Bypass this check when the operation is done by a system/super user. // This is done because, while Replication, the Cells coming to the peer cluster with reserved // typed tags and this is fine and should get added to the peer cluster table if (isSystemOrSuperUser()) { + // Does the cell contain special tag which indicates that the replicated + // cell visiblilty tags + // have been modified + Tag modifiedTag = null; + if (cell.getTagsLength() > 0) { + Iterator tagsIterator = CellUtil.tagsIterator(cell.getTagsArray(), + cell.getTagsOffset(), cell.getTagsLength()); + while (tagsIterator.hasNext()) { + Tag tag = tagsIterator.next(); + if (tag.getType() == TagType.STRING_VIS_TAG_TYPE) { + modifiedTag = tag; + break; + } + } + } + pair.setFirst(true); + pair.setSecond(modifiedTag); + return pair; + } + if (cell.getTagsLength() > 0) { + Iterator tagsItr = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(), + cell.getTagsLength()); + while (tagsItr.hasNext()) { + if (RESERVED_VIS_TAG_TYPES.contains(tagsItr.next().getType())) { + return pair; + } + } + } + pair.setFirst(true); + return pair; + } + + /** + * Checks whether cell contains any tag with type as VISIBILITY_TAG_TYPE. This + * tag type is reserved and should not be explicitly set by user. There are + * two versions of this method one that accepts pair and other without pair. + * In case of preAppend and preIncrement the additional operations are not + * needed like checking for STRING_VIS_TAG_TYPE and hence the API without pair + * could be used. + * + * @param cell + * @return + * @throws IOException + */ + private boolean checkForReservedVisibilityTagPresence(Cell cell) throws IOException { + // Bypass this check when the operation is done by a system/super user. + // This is done because, while Replication, the Cells coming to the peer + // cluster with reserved + // typed tags and this is fine and should get added to the peer cluster + // table + if (isSystemOrSuperUser()) { return true; } if (cell.getTagsLengthUnsigned() > 0) { @@ -410,6 +497,17 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements return true; } + private void removeReplicationVisibilityTag(List tags) throws IOException { + Iterator iterator = tags.iterator(); + while (iterator.hasNext()) { + Tag tag = iterator.next(); + if (tag.getType() == TagType.STRING_VIS_TAG_TYPE) { + iterator.remove(); + break; + } + } + } + @Override public RegionScanner preScannerOpen(ObserverContext e, Scan scan, RegionScanner s) throws IOException { @@ -855,4 +953,33 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements public void postTruncateTableHandler(ObserverContext ctx, TableName tableName) throws IOException { } + + /** + * A RegionServerObserver impl that provides the custom + * VisibilityReplicationEndpoint. This class should be configured as the + * 'hbase.coprocessor.regionserver.classes' for the visibility tags to be + * replicated as string. The value for the configuration should be + * 'org.apache.hadoop.hbase.security.visibility.VisibilityController$VisibilityReplication'. + */ + public static class VisibilityReplication extends BaseRegionServerObserver { + private Configuration conf; + private VisibilityLabelService visibilityLabelService; + + @Override + public void start(CoprocessorEnvironment env) throws IOException { + this.conf = env.getConfiguration(); + visibilityLabelService = VisibilityLabelServiceManager.getInstance() + .getVisibilityLabelService(this.conf); + } + + @Override + public void stop(CoprocessorEnvironment env) throws IOException { + } + + @Override + public ReplicationEndpoint postCreateReplicationEndPoint( + ObserverContext ctx, ReplicationEndpoint endpoint) { + return new VisibilityReplicationEndpoint(endpoint, visibilityLabelService); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelOrdinalProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelOrdinalProvider.java index 81be70b..e82589f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelOrdinalProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelOrdinalProvider.java @@ -28,4 +28,11 @@ public interface VisibilityLabelOrdinalProvider { * existing label. */ public int getLabelOrdinal(String label); + + /** + * Returns the string associated with the ordinal. Not be used in MR. + * @param ordinal representing the visibility label's ordinal + * @return label associated with the string, null if not found + */ + public String getLabel(int ordinal); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelService.java index 7f4c2ca..cc317d1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelService.java @@ -20,10 +20,11 @@ package org.apache.hadoop.hbase.security.visibility; import java.io.IOException; import java.util.List; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.TagType; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.OperationStatus; @@ -139,4 +140,24 @@ public interface VisibilityLabelService extends Configurable { */ boolean matchVisibility(List putVisTags, Byte putVisTagFormat, List deleteVisTags, Byte deleteVisTagFormat) throws IOException; + + /** + * Provides a way to modify the visibility tags of type {@link TagType} + * .VISIBILITY_TAG_TYPE, that are part of the cell created from the WALEdits + * that are prepared for replication while calling + * {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} + * .replicate(). + * {@link org.apache.hadoop.hbase.security.visibility.VisibilityReplicationEndpoint} + * calls this API to provide an opportunity to modify the visibility tags + * before replicating. + * + * @param visTags + * the visibility tags associated with the cell + * @param serializationFormat + * the serialization format associated with the tag + * @return the modified visibility expression in the form of byte[] + * @throws IOException + */ + byte[] encodeVisibilityForReplication(final List visTags, + final Byte serializationFormat) throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelsCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelsCache.java index 693134b..a5c2155 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelsCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelsCache.java @@ -29,8 +29,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.MultiUserAuthorizations; import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.UserAuthorizations; @@ -174,6 +174,7 @@ public class VisibilityLabelsCache implements VisibilityLabelOrdinalProvider { * @return The label having the given ordinal. Returns null when no label exist in * the system with given ordinal */ + @Override public String getLabel(int ordinal) { this.lock.readLock().lock(); try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java new file mode 100644 index 0000000..75da263 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.security.visibility; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.TagType; +import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.replication.ReplicationEndpoint; +import org.apache.hadoop.hbase.replication.WALEntryFilter; +import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry; + +import com.google.common.util.concurrent.ListenableFuture; + +@InterfaceAudience.Private +public class VisibilityReplicationEndpoint implements ReplicationEndpoint { + + private static final Log LOG = LogFactory.getLog(VisibilityReplicationEndpoint.class); + private ReplicationEndpoint delegator; + private VisibilityLabelService visibilityLabelsService; + + public VisibilityReplicationEndpoint(ReplicationEndpoint endpoint, + VisibilityLabelService visibilityLabelsService) { + this.delegator = endpoint; + this.visibilityLabelsService = visibilityLabelsService; + } + + @Override + public void init(Context context) throws IOException { + delegator.init(context); + } + + @Override + public boolean replicate(ReplicateContext replicateContext) { + if (!delegator.canReplicateToSameCluster()) { + // Only when the replication is inter cluster replication we need to covert the visibility tags to + // string based tags. But for intra cluster replication like region replicas it is not needed. + List entries = replicateContext.getEntries(); + List visTags = new ArrayList(); + List nonVisTags = new ArrayList(); + List newEntries = new ArrayList(entries.size()); + for (Entry entry : entries) { + WALEdit newEdit = new WALEdit(); + ArrayList cells = entry.getEdit().getCells(); + for (Cell cell : cells) { + if (cell.getTagsLengthUnsigned() > 0) { + visTags.clear(); + nonVisTags.clear(); + Byte serializationFormat = VisibilityUtils.extractAndPartitionTags(cell, visTags, + nonVisTags); + if (!visTags.isEmpty()) { + try { + byte[] modifiedVisExpression = visibilityLabelsService + .encodeVisibilityForReplication(visTags, serializationFormat); + if (modifiedVisExpression != null) { + nonVisTags.add(new Tag(TagType.STRING_VIS_TAG_TYPE, modifiedVisExpression)); + } + } catch (Exception ioe) { + LOG.error( + "Exception while reading the visibility labels from the cell. The replication " + + "would happen as per the existing format and not as string type for the cell " + + cell + ".", ioe); + // just return the old entries as it is without applying the string type change + newEdit.add(cell); + continue; + } + // Recreate the cell with the new tags and the existing tags + Cell newCell = new KeyValue(cell.getRowArray(), cell.getRowOffset(), + cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(), + cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength(), cell.getTimestamp(), Type.codeToType(cell + .getTypeByte()), cell.getValueArray(), cell.getValueOffset(), + cell.getValueLength(), nonVisTags); + newEdit.add(newCell); + } else { + newEdit.add(cell); + } + } else { + newEdit.add(cell); + } + } + newEntries.add(new Entry(entry.getKey(), newEdit)); + } + replicateContext.setEntries(newEntries); + return delegator.replicate(replicateContext); + } else { + return delegator.replicate(replicateContext); + } + } + + @Override + public synchronized UUID getPeerUUID() { + return delegator.getPeerUUID(); + } + + @Override + public boolean canReplicateToSameCluster() { + return delegator.canReplicateToSameCluster(); + } + + @Override + public WALEntryFilter getWALEntryfilter() { + return delegator.getWALEntryfilter(); + } + + @Override + public boolean isRunning() { + return delegator.isRunning(); + } + + @Override + public ListenableFuture start() { + return delegator.start(); + } + + @Override + public State startAndWait() { + return delegator.startAndWait(); + } + + @Override + public State state() { + return delegator.state(); + } + + @Override + public ListenableFuture stop() { + return delegator.stop(); + } + + @Override + public State stopAndWait() { + return delegator.stopAndWait(); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityUtils.java index d1c5bb0..f2bdb13 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityUtils.java @@ -226,6 +226,39 @@ public class VisibilityUtils { return serializationFormat; } + /** + * Extracts and partitions the visibility tags and nonVisibility Tags + * + * @param cell - the cell for which we would extract and partition the + * visibility and non visibility tags + * @param visTags + * - all the visibilty tags of type TagType.VISIBILITY_TAG_TYPE would + * be added to this list + * @param nonVisTags - all the non visibility tags would be added to this list + * @return - the serailization format of the tag. Can be null if no tags are found or + * if there is no visibility tag found + */ + public static Byte extractAndPartitionTags(Cell cell, List visTags, + List nonVisTags) { + Byte serializationFormat = null; + if (cell.getTagsLength() > 0) { + Iterator tagsIterator = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(), + cell.getTagsLength()); + while (tagsIterator.hasNext()) { + Tag tag = tagsIterator.next(); + if (tag.getType() == TagType.VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE) { + serializationFormat = tag.getBuffer()[tag.getTagOffset()]; + } else if (tag.getType() == VISIBILITY_TAG_TYPE) { + visTags.add(tag); + } else { + // ignore string encoded visibility expressions, will be added in replication handling + nonVisTags.add(tag); + } + } + } + return serializationFormat; + } + public static boolean isVisibilityTagsPresent(Cell cell) { if (cell.getTagsLengthUnsigned() == 0) { return false; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/ExpAsStringVisibilityLabelServiceImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/ExpAsStringVisibilityLabelServiceImpl.java index 5d30b74..c133e3a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/ExpAsStringVisibilityLabelServiceImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/ExpAsStringVisibilityLabelServiceImpl.java @@ -32,13 +32,13 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.HConstants.OperationStatusCode; import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.TagType; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; @@ -58,9 +58,10 @@ import org.apache.hadoop.hbase.util.Bytes; import com.google.common.collect.Lists; /** - * This is a VisibilityLabelService where labels in Mutation's visibility expression will be - * persisted as Strings itself rather than ordinals in 'labels' table. Also there is no need to add - * labels to the system, prior to using them in Mutations/Authorizations. + * This is a VisibilityLabelService where labels in Mutation's visibility + * expression will be persisted as Strings itself rather than ordinals in + * 'labels' table. Also there is no need to add labels to the system, prior to + * using them in Mutations/Authorizations. */ @InterfaceAudience.Private public class ExpAsStringVisibilityLabelServiceImpl implements VisibilityLabelService { @@ -71,7 +72,7 @@ public class ExpAsStringVisibilityLabelServiceImpl implements VisibilityLabelSer private static final byte STRING_SERIALIZATION_FORMAT = 2; private static final Tag STRING_SERIALIZATION_FORMAT_TAG = new Tag( TagType.VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE, - new byte[]{STRING_SERIALIZATION_FORMAT}); + new byte[] { STRING_SERIALIZATION_FORMAT }); private final ExpressionParser expressionParser = new ExpressionParser(); private final ExpressionExpander expressionExpander = new ExpressionExpander(); private Configuration conf; @@ -80,7 +81,8 @@ public class ExpAsStringVisibilityLabelServiceImpl implements VisibilityLabelSer @Override public OperationStatus[] addLabels(List labels) throws IOException { - // Not doing specific label add. We will just add labels in Mutation visibility expression as it + // Not doing specific label add. We will just add labels in Mutation + // visibility expression as it // is along with every cell. OperationStatus[] status = new OperationStatus[labels.size()]; for (int i = 0; i < labels.size(); i++) { @@ -251,7 +253,8 @@ public class ExpAsStringVisibilityLabelServiceImpl implements VisibilityLabelSer offset += len; } if (includeKV) { - // We got one visibility expression getting evaluated to true. Good to include this + // We got one visibility expression getting evaluated to true. + // Good to include this // KV in the result then. return true; } @@ -277,7 +280,8 @@ public class ExpAsStringVisibilityLabelServiceImpl implements VisibilityLabelSer Collections.sort(labels); Collections.sort(notLabels); // We will write the NOT labels 1st followed by normal labels - // Each of the label we will write with label length (as short 1st) followed by the label bytes. + // Each of the label we will write with label length (as short 1st) followed + // by the label bytes. // For a NOT node we will write the label length as -ve. for (String label : notLabels) { byte[] bLabel = Bytes.toBytes(label); @@ -376,7 +380,8 @@ public class ExpAsStringVisibilityLabelServiceImpl implements VisibilityLabelSer private static boolean checkForMatchingVisibilityTagsWithSortedOrder(List putVisTags, List deleteVisTags) { boolean matchFound = false; - // If the size does not match. Definitely we are not comparing the equal tags. + // If the size does not match. Definitely we are not comparing the equal + // tags. if ((deleteVisTags.size()) == putVisTags.size()) { for (Tag tag : deleteVisTags) { matchFound = false; @@ -387,9 +392,71 @@ public class ExpAsStringVisibilityLabelServiceImpl implements VisibilityLabelSer break; } } - if (!matchFound) break; + if (!matchFound) + break; } } return matchFound; } + + @Override + public byte[] encodeVisibilityForReplication(final List tags, final Byte serializationFormat) + throws IOException { + if (tags.size() > 0 && (serializationFormat == null + || serializationFormat == STRING_SERIALIZATION_FORMAT)) { + return createModifiedVisExpression(tags); + } + return null; + } + + /** + * @param tags - all the tags associated with the current Cell + * @return - the modified visibility expression as byte[] + */ + private byte[] createModifiedVisExpression(final List tags) + throws IOException { + StringBuilder visibilityString = new StringBuilder(); + for (Tag tag : tags) { + if (tag.getType() == TagType.VISIBILITY_TAG_TYPE) { + if (visibilityString.length() != 0) { + visibilityString.append(VisibilityConstants.CLOSED_PARAN + + VisibilityConstants.OR_OPERATOR); + } + int offset = tag.getTagOffset(); + int endOffset = offset + tag.getTagLength(); + boolean expressionStart = true; + while (offset < endOffset) { + short len = Bytes.toShort(tag.getBuffer(), offset); + offset += 2; + if (len < 0) { + len = (short) (-1 * len); + String label = Bytes.toString(tag.getBuffer(), offset, len); + if (expressionStart) { + visibilityString.append(VisibilityConstants.OPEN_PARAN + + VisibilityConstants.NOT_OPERATOR + CellVisibility.quote(label)); + } else { + visibilityString.append(VisibilityConstants.AND_OPERATOR + + VisibilityConstants.NOT_OPERATOR + CellVisibility.quote(label)); + } + } else { + String label = Bytes.toString(tag.getBuffer(), offset, len); + if (expressionStart) { + visibilityString.append(VisibilityConstants.OPEN_PARAN + CellVisibility.quote(label)); + } else { + visibilityString.append(VisibilityConstants.AND_OPERATOR + + CellVisibility.quote(label)); + } + } + expressionStart = false; + offset += len; + } + } + } + if (visibilityString.length() != 0) { + visibilityString.append(VisibilityConstants.CLOSED_PARAN); + // Return the string formed as byte[] + return Bytes.toBytes(visibilityString.toString()); + } + return null; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java new file mode 100644 index 0000000..788a824 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.security.visibility; + +import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_NAME; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; +import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.visibility.VisibilityController.VisibilityReplication; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.junit.Before; +import org.junit.experimental.categories.Category; + +@Category(org.apache.hadoop.hbase.MediumTests.class) +public class TestVisibilityLabelReplicationWithExpAsString extends TestVisibilityLabelsReplication { + private static final Log LOG = LogFactory + .getLog(TestVisibilityLabelReplicationWithExpAsString.class); + + @Override + @Before + public void setup() throws Exception { + expected[0] = 4; + expected[1] = 6; + expected[2] = 4; + expected[3] = 0; + expected[3] = 3; + expectedVisString[0] = "(\"public\"&\"secret\"&\"topsecret\")|(\"confidential\"&\"topsecret\")"; + expectedVisString[1] = "(\"private\"&\"public\")|(\"private\"&\"topsecret\")|" + + "(\"confidential\"&\"public\")|(\"confidential\"&\"topsecret\")"; + expectedVisString[2] = "(!\"topsecret\"&\"secret\")|(!\"topsecret\"&\"confidential\")"; + expectedVisString[3] = "(\"secret\"&\"" + COPYRIGHT + "\\\"" + ACCENT + "\\\\" + SECRET + + "\\\"" + "\u0027&\\\\" + "\")"; + // setup configuration + conf = HBaseConfiguration.create(); + conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false); + conf.setBoolean("hbase.online.schema.update.enable", true); + conf.setInt("hfile.format.version", 3); + conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); + conf.setInt("replication.source.size.capacity", 10240); + conf.setLong("replication.source.sleepforretries", 100); + conf.setInt("hbase.regionserver.maxlogs", 10); + conf.setLong("hbase.master.logcleaner.ttl", 10); + conf.setInt("zookeeper.recovery.retry", 1); + conf.setInt("zookeeper.recovery.retry.intervalmill", 10); + conf.setBoolean("dfs.support.append", true); + conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); + conf.setInt("replication.stats.thread.period.seconds", 5); + conf.setBoolean("hbase.tests.use.shortcircuit.reads", false); + setVisibilityLabelServiceImpl(conf, ExpAsStringVisibilityLabelServiceImpl.class); + conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT); + conf.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName()); + VisibilityTestUtil.enableVisiblityLabels(conf); + conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, + VisibilityReplication.class.getName()); + conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, + SimpleCP.class.getName()); + // Have to reset conf1 in case zk cluster location different + // than default + conf.setClass(VisibilityUtils.VISIBILITY_LABEL_GENERATOR_CLASS, SimpleScanLabelGenerator.class, + ScanLabelGenerator.class); + conf.set("hbase.superuser", "admin"); + conf.set("hbase.superuser", User.getCurrent().getShortName()); + SUPERUSER = User.createUserForTesting(conf, User.getCurrent().getShortName(), + new String[] { "supergroup" }); + User.createUserForTesting(conf, + User.getCurrent().getShortName(), new String[] { "supergroup" }); + USER1 = User.createUserForTesting(conf, "user1", new String[] {}); + TEST_UTIL = new HBaseTestingUtility(conf); + TEST_UTIL.startMiniZKCluster(); + MiniZooKeeperCluster miniZK = TEST_UTIL.getZkCluster(); + zkw1 = new ZooKeeperWatcher(conf, "cluster1", null, true); + replicationAdmin = new ReplicationAdmin(conf); + + // Base conf2 on conf1 so it gets the right zk cluster. + conf1 = HBaseConfiguration.create(conf); + conf1.setInt("hfile.format.version", 3); + conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); + conf1.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6); + conf1.setBoolean("dfs.support.append", true); + conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false); + conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT); + conf1.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName()); + conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, + TestCoprocessorForTagsAtSink.class.getName()); + setVisibilityLabelServiceImpl(conf1, ExpAsStringVisibilityLabelServiceImpl.class); + TEST_UTIL1 = new HBaseTestingUtility(conf1); + TEST_UTIL1.setZkCluster(miniZK); + zkw2 = new ZooKeeperWatcher(conf1, "cluster2", null, true); + replicationAdmin.addPeer("2", TEST_UTIL1.getClusterKey()); + + TEST_UTIL.startMiniCluster(1); + // Wait for the labels table to become available + TEST_UTIL.waitTableEnabled(LABELS_TABLE_NAME.getName(), 50000); + TEST_UTIL1.startMiniCluster(1); + HBaseAdmin hBaseAdmin = TEST_UTIL.getHBaseAdmin(); + HTableDescriptor table = new HTableDescriptor(TableName.valueOf(TABLE_NAME)); + HColumnDescriptor desc = new HColumnDescriptor(fam); + desc.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); + table.addFamily(desc); + try { + hBaseAdmin.createTable(table); + } finally { + if (hBaseAdmin != null) { + hBaseAdmin.close(); + } + } + HBaseAdmin hBaseAdmin1 = TEST_UTIL1.getHBaseAdmin(); + try { + hBaseAdmin1.createTable(table); + } finally { + if (hBaseAdmin1 != null) { + hBaseAdmin1.close(); + } + } + addLabels(); + setAuths(conf); + setAuths(conf1); + } + + protected static void setVisibilityLabelServiceImpl(Configuration conf, Class clazz) { + conf.setClass(VisibilityLabelServiceManager.VISIBILITY_LABEL_SERVICE_CLASS, + clazz, VisibilityLabelService.class); + } + + @Override + protected void verifyGet(final byte[] row, final String visString, final int expected, + final boolean nullExpected, final String... auths) throws IOException, + InterruptedException { + PrivilegedExceptionAction scanAction = new PrivilegedExceptionAction() { + HTable table2 = null; + + public Void run() throws Exception { + try { + table2 = new HTable(conf1, TABLE_NAME_BYTES); + CellScanner cellScanner; + Cell current; + Get get = new Get(row); + get.setAuthorizations(new Authorizations(auths)); + Result result = table2.get(get); + cellScanner = result.cellScanner(); + boolean advance = cellScanner.advance(); + if (nullExpected) { + assertTrue(!advance); + return null; + } + current = cellScanner.current(); + assertArrayEquals(CellUtil.cloneRow(current), row); + assertEquals(expected, TestCoprocessorForTagsAtSink.tags.size()); + boolean foundNonVisTag = false; + for(Tag t : TestCoprocessorForTagsAtSink.tags) { + if(t.getType() == NON_VIS_TAG_TYPE) { + assertEquals(TEMP, Bytes.toString(t.getValue())); + foundNonVisTag = true; + break; + } + } + doAssert(row, visString); + assertTrue(foundNonVisTag); + return null; + } finally { + if (table2 != null) { + table2.close(); + } + } + } + }; + USER1.runAs(scanAction); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java new file mode 100644 index 0000000..73555bf --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java @@ -0,0 +1,487 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.security.visibility; + +import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_NAME; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.TagType; +import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; +import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsResponse; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.replication.ReplicationEndpoint; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.visibility.VisibilityController.VisibilityReplication; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry; +import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category(org.apache.hadoop.hbase.MediumTests.class) +public class TestVisibilityLabelsReplication { + private static final Log LOG = LogFactory.getLog(TestVisibilityLabelsReplication.class); + protected static final int NON_VIS_TAG_TYPE = 100; + protected static final String TEMP = "temp"; + protected static Configuration conf; + protected static Configuration conf1; + protected static String TABLE_NAME = "TABLE_NAME"; + protected static byte[] TABLE_NAME_BYTES = Bytes.toBytes(TABLE_NAME); + protected static ReplicationAdmin replicationAdmin; + public static final String TOPSECRET = "topsecret"; + public static final String PUBLIC = "public"; + public static final String PRIVATE = "private"; + public static final String CONFIDENTIAL = "confidential"; + public static final String COPYRIGHT = "\u00A9ABC"; + public static final String ACCENT = "\u0941"; + public static final String SECRET = "secret"; + public static final String UNICODE_VIS_TAG = COPYRIGHT + "\"" + ACCENT + "\\" + SECRET + "\"" + + "\u0027&\\"; + public static HBaseTestingUtility TEST_UTIL; + public static HBaseTestingUtility TEST_UTIL1; + public static final byte[] row1 = Bytes.toBytes("row1"); + public static final byte[] row2 = Bytes.toBytes("row2"); + public static final byte[] row3 = Bytes.toBytes("row3"); + public static final byte[] row4 = Bytes.toBytes("row4"); + public final static byte[] fam = Bytes.toBytes("info"); + public final static byte[] qual = Bytes.toBytes("qual"); + public final static byte[] value = Bytes.toBytes("value"); + protected static ZooKeeperWatcher zkw1; + protected static ZooKeeperWatcher zkw2; + protected static int expected[] = { 4, 6, 4, 0, 3 }; + private static final String NON_VISIBILITY = "non-visibility"; + protected static String[] expectedVisString = { + "(\"secret\"&\"topsecret\"&\"public\")|(\"topsecret\"&\"confidential\")", + "(\"public\"&\"private\")|(\"topsecret\"&\"private\")|" + + "(\"confidential\"&\"public\")|(\"topsecret\"&\"confidential\")", + "(!\"topsecret\"&\"secret\")|(!\"topsecret\"&\"confidential\")", + "(\"secret\"&\"" + COPYRIGHT + "\\\"" + ACCENT + "\\\\" + SECRET + "\\\"" + "\u0027&\\\\" + + "\")" }; + + @Rule + public final TestName TEST_NAME = new TestName(); + public static User SUPERUSER, USER1; + + @Before + public void setup() throws Exception { + // setup configuration + conf = HBaseConfiguration.create(); + conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false); + conf.setBoolean("hbase.online.schema.update.enable", true); + conf.setInt("hfile.format.version", 3); + conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); + conf.setInt("replication.source.size.capacity", 10240); + conf.setLong("replication.source.sleepforretries", 100); + conf.setInt("hbase.regionserver.maxlogs", 10); + conf.setLong("hbase.master.logcleaner.ttl", 10); + conf.setInt("zookeeper.recovery.retry", 1); + conf.setInt("zookeeper.recovery.retry.intervalmill", 10); + conf.setBoolean("dfs.support.append", true); + conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); + conf.setInt("replication.stats.thread.period.seconds", 5); + conf.setBoolean("hbase.tests.use.shortcircuit.reads", false); + setVisibilityLabelServiceImpl(conf); + conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT); + conf.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName()); + VisibilityTestUtil.enableVisiblityLabels(conf); + conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, + VisibilityReplication.class.getName()); + conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, + SimpleCP.class.getName()); + // Have to reset conf1 in case zk cluster location different + // than default + conf.setClass(VisibilityUtils.VISIBILITY_LABEL_GENERATOR_CLASS, SimpleScanLabelGenerator.class, + ScanLabelGenerator.class); + conf.set("hbase.superuser", User.getCurrent().getShortName()); + SUPERUSER = User.createUserForTesting(conf, User.getCurrent().getShortName(), + new String[] { "supergroup" }); + // User.createUserForTesting(conf, User.getCurrent().getShortName(), new + // String[] { "supergroup" }); + USER1 = User.createUserForTesting(conf, "user1", new String[] {}); + TEST_UTIL = new HBaseTestingUtility(conf); + TEST_UTIL.startMiniZKCluster(); + MiniZooKeeperCluster miniZK = TEST_UTIL.getZkCluster(); + zkw1 = new ZooKeeperWatcher(conf, "cluster1", null, true); + replicationAdmin = new ReplicationAdmin(conf); + + // Base conf2 on conf1 so it gets the right zk cluster. + conf1 = HBaseConfiguration.create(conf); + conf1.setInt("hfile.format.version", 3); + conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); + conf1.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6); + conf1.setBoolean("dfs.support.append", true); + conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false); + conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT); + conf1.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName()); + conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, + TestCoprocessorForTagsAtSink.class.getName()); + // setVisibilityLabelServiceImpl(conf1); + USER1 = User.createUserForTesting(conf1, "user1", new String[] {}); + TEST_UTIL1 = new HBaseTestingUtility(conf1); + TEST_UTIL1.setZkCluster(miniZK); + zkw2 = new ZooKeeperWatcher(conf1, "cluster2", null, true); + replicationAdmin.addPeer("2", TEST_UTIL1.getClusterKey()); + + TEST_UTIL.startMiniCluster(1); + // Wait for the labels table to become available + TEST_UTIL.waitTableEnabled(LABELS_TABLE_NAME.getName(), 50000); + TEST_UTIL1.startMiniCluster(1); + HBaseAdmin hBaseAdmin = TEST_UTIL.getHBaseAdmin(); + HTableDescriptor table = new HTableDescriptor(TableName.valueOf(TABLE_NAME)); + HColumnDescriptor desc = new HColumnDescriptor(fam); + desc.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); + table.addFamily(desc); + try { + hBaseAdmin.createTable(table); + } finally { + if (hBaseAdmin != null) { + hBaseAdmin.close(); + } + } + HBaseAdmin hBaseAdmin1 = TEST_UTIL1.getHBaseAdmin(); + try { + hBaseAdmin1.createTable(table); + } finally { + if (hBaseAdmin1 != null) { + hBaseAdmin1.close(); + } + } + addLabels(); + setAuths(conf); + setAuths(conf1); + } + + protected static void setVisibilityLabelServiceImpl(Configuration conf) { + conf.setClass(VisibilityLabelServiceManager.VISIBILITY_LABEL_SERVICE_CLASS, + DefaultVisibilityLabelServiceImpl.class, VisibilityLabelService.class); + } + + @Test + public void testVisibilityReplication() throws Exception { + TableName tableName = TableName.valueOf(TABLE_NAME); + HTable table = writeData(tableName, "(" + SECRET + "&" + PUBLIC + ")" + "|(" + CONFIDENTIAL + + ")&(" + TOPSECRET + ")", "(" + PRIVATE + "|" + CONFIDENTIAL + ")&(" + PUBLIC + "|" + + TOPSECRET + ")", "(" + SECRET + "|" + CONFIDENTIAL + ")" + "&" + "!" + TOPSECRET, + CellVisibility.quote(UNICODE_VIS_TAG) + "&" + SECRET); + int retry = 0; + try { + Scan s = new Scan(); + s.setAuthorizations(new Authorizations(SECRET, CONFIDENTIAL, PRIVATE, TOPSECRET, + UNICODE_VIS_TAG)); + ResultScanner scanner = table.getScanner(s); + Result[] next = scanner.next(4); + + assertTrue(next.length == 4); + CellScanner cellScanner = next[0].cellScanner(); + cellScanner.advance(); + Cell current = cellScanner.current(); + assertTrue(Bytes.equals(current.getRowArray(), current.getRowOffset(), + current.getRowLength(), row1, 0, row1.length)); + cellScanner = next[1].cellScanner(); + cellScanner.advance(); + current = cellScanner.current(); + assertTrue(Bytes.equals(current.getRowArray(), current.getRowOffset(), + current.getRowLength(), row2, 0, row2.length)); + cellScanner = next[2].cellScanner(); + cellScanner.advance(); + current = cellScanner.current(); + assertTrue(Bytes.equals(current.getRowArray(), current.getRowOffset(), + current.getRowLength(), row3, 0, row3.length)); + cellScanner = next[3].cellScanner(); + cellScanner.advance(); + current = cellScanner.current(); + assertTrue(Bytes.equals(current.getRowArray(), current.getRowOffset(), + current.getRowLength(), row4, 0, row4.length)); + HTable table2 = null; + try { + table2 = new HTable(TEST_UTIL1.getConfiguration(), TABLE_NAME_BYTES); + s = new Scan(); + // Ensure both rows are replicated + scanner = table2.getScanner(s); + next = scanner.next(4); + while (next.length == 0 && retry <= 10) { + scanner = table2.getScanner(s); + next = scanner.next(4); + Thread.sleep(2000); + retry++; + } + assertTrue(next.length == 4); + verifyGet(row1, expectedVisString[0], expected[0], false, TOPSECRET, CONFIDENTIAL); + TestCoprocessorForTagsAtSink.tags.clear(); + verifyGet(row2, expectedVisString[1], expected[1], false, CONFIDENTIAL, PUBLIC); + TestCoprocessorForTagsAtSink.tags.clear(); + verifyGet(row3, expectedVisString[2], expected[2], false, PRIVATE, SECRET); + verifyGet(row3, "", expected[3], true, TOPSECRET, SECRET); + verifyGet(row4, expectedVisString[3], expected[4], false, UNICODE_VIS_TAG, SECRET); + } finally { + if (table2 != null) { + table2.close(); + } + } + } finally { + if (table != null) { + table.close(); + } + } + } + + protected static void doAssert(byte[] row, String visTag) throws Exception { + if (VisibilityReplicationEndPointForTest.lastEntries == null) { + return; // first call + } + Assert.assertEquals(1, VisibilityReplicationEndPointForTest.lastEntries.size()); + List cells = VisibilityReplicationEndPointForTest.lastEntries.get(0).getEdit().getCells(); + Assert.assertEquals(4, cells.size()); + boolean tagFound = false; + for (Cell cell : cells) { + if ((Bytes.equals(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), row, 0, + row.length))) { + List tags = Tag + .asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()); + for (Tag tag : tags) { + if (tag.getType() == TagType.STRING_VIS_TAG_TYPE) { + assertEquals(visTag, Bytes.toString(tag.getValue())); + tagFound = true; + break; + } + } + } + } + assertTrue(tagFound); + } + + protected void verifyGet(final byte[] row, final String visString, final int expected, + final boolean nullExpected, final String... auths) throws IOException, + InterruptedException { + PrivilegedExceptionAction scanAction = new PrivilegedExceptionAction() { + HTable table2 = null; + + public Void run() throws Exception { + try { + table2 = new HTable(conf1, TABLE_NAME_BYTES); + CellScanner cellScanner; + Cell current; + Get get = new Get(row); + get.setAuthorizations(new Authorizations(auths)); + Result result = table2.get(get); + cellScanner = result.cellScanner(); + boolean advance = cellScanner.advance(); + if (nullExpected) { + assertTrue(!advance); + return null; + } + current = cellScanner.current(); + assertArrayEquals(CellUtil.cloneRow(current), row); + for (Tag tag : TestCoprocessorForTagsAtSink.tags) { + LOG.info("The tag type is " + tag.getType()); + } + assertEquals(expected, TestCoprocessorForTagsAtSink.tags.size()); + Tag tag = TestCoprocessorForTagsAtSink.tags.get(1); + if (tag.getType() != NON_VIS_TAG_TYPE) { + assertEquals(TagType.VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE, tag.getType()); + } + tag = TestCoprocessorForTagsAtSink.tags.get(0); + boolean foundNonVisTag = false; + for (Tag t : TestCoprocessorForTagsAtSink.tags) { + if (t.getType() == NON_VIS_TAG_TYPE) { + assertEquals(TEMP, Bytes.toString(t.getValue())); + foundNonVisTag = true; + break; + } + } + doAssert(row, visString); + assertTrue(foundNonVisTag); + return null; + } finally { + if (table2 != null) { + table2.close(); + } + } + } + }; + USER1.runAs(scanAction); + } + + public static void addLabels() throws Exception { + PrivilegedExceptionAction action = + new PrivilegedExceptionAction() { + public VisibilityLabelsResponse run() throws Exception { + String[] labels = { SECRET, TOPSECRET, CONFIDENTIAL, PUBLIC, PRIVATE, UNICODE_VIS_TAG }; + try { + VisibilityClient.addLabels(conf, labels); + } catch (Throwable t) { + throw new IOException(t); + } + return null; + } + }; + SUPERUSER.runAs(action); + } + + public static void setAuths(final Configuration conf) throws Exception { + PrivilegedExceptionAction action = + new PrivilegedExceptionAction() { + public VisibilityLabelsResponse run() throws Exception { + try { + return VisibilityClient.setAuths(conf, new String[] { SECRET, CONFIDENTIAL, PRIVATE, + TOPSECRET, UNICODE_VIS_TAG }, "user1"); + } catch (Throwable e) { + throw new Exception(e); + } + } + }; + VisibilityLabelsResponse response = SUPERUSER.runAs(action); + } + + static HTable writeData(TableName tableName, String... labelExps) throws Exception { + HTable table = null; + try { + table = new HTable(conf, TABLE_NAME_BYTES); + int i = 1; + List puts = new ArrayList(); + for (String labelExp : labelExps) { + Put put = new Put(Bytes.toBytes("row" + i)); + put.add(fam, qual, HConstants.LATEST_TIMESTAMP, value); + put.setCellVisibility(new CellVisibility(labelExp)); + put.setAttribute(NON_VISIBILITY, Bytes.toBytes(TEMP)); + puts.add(put); + i++; + } + table.put(puts); + } finally { + if (table != null) { + table.flushCommits(); + } + } + return table; + } + // A simple BaseRegionbserver impl that allows to add a non-visibility tag from the + // attributes of the Put mutation. The existing cells in the put mutation is overwritten + // with a new cell that has the visibility tags and the non visibility tag + public static class SimpleCP extends BaseRegionObserver { + @Override + public void prePut(ObserverContext e, Put m, WALEdit edit, + Durability durability) throws IOException { + byte[] attribute = m.getAttribute(NON_VISIBILITY); + byte[] cf = null; + List updatedCells = new ArrayList(); + if (attribute != null) { + for (List edits : m.getFamilyCellMap().values()) { + for (Cell cell : edits) { + KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + if (cf == null) { + cf = kv.getFamily(); + } + Tag tag = new Tag((byte) NON_VIS_TAG_TYPE, attribute); + List tagList = new ArrayList(); + tagList.add(tag); + tagList.addAll(kv.getTags()); + byte[] fromList = Tag.fromList(tagList); + KeyValue newKV = new KeyValue(kv.getRow(), 0, kv.getRowLength(), kv.getFamily(), 0, + kv.getFamilyLength(), kv.getQualifier(), 0, kv.getQualifierLength(), + kv.getTimestamp(), KeyValue.Type.codeToType(kv.getType()), kv.getValue(), 0, + kv.getValueLength(), tagList); + ((List) updatedCells).add(newKV); + } + } + m.getFamilyCellMap().remove(cf); + // Update the family map + m.getFamilyCellMap().put(cf, updatedCells); + } + } + } + + public static class TestCoprocessorForTagsAtSink extends BaseRegionObserver { + public static List tags = null; + + @Override + public void postGetOp(ObserverContext e, Get get, + List results) throws IOException { + if (results.size() > 0) { + // Check tag presence in the 1st cell in 1st Result + if (!results.isEmpty()) { + Cell cell = results.get(0); + tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()); + } + } + } + } + + /** + * An extn of VisibilityReplicationEndpoint to verify the tags that are replicated + */ + public static class VisibilityReplicationEndPointForTest extends VisibilityReplicationEndpoint { + static AtomicInteger replicateCount = new AtomicInteger(); + static volatile List lastEntries = null; + + public VisibilityReplicationEndPointForTest(ReplicationEndpoint endpoint, + VisibilityLabelService visibilityLabelsService) { + super(endpoint, visibilityLabelsService); + } + + @Override + public boolean replicate(ReplicateContext replicateContext) { + boolean ret = super.replicate(replicateContext); + lastEntries = replicateContext.getEntries(); + replicateCount.incrementAndGet(); + return ret; + } + } +}