.../coprocessor/BaseRegionServerObserver.java | 6 ++++ .../hbase/coprocessor/RegionServerObserver.java | 9 ++++++ .../regionserver/RegionServerCoprocessorHost.java | 11 ++++++++ .../regionserver/ReplicationSourceManager.java | 33 ++++++++++++++-------- .../hbase/security/access/AccessController.java | 10 +++++-- 5 files changed, 55 insertions(+), 14 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionServerObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionServerObserver.java index 4f51d5b..7a39a47 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionServerObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionServerObserver.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.replication.ReplicationEndpoint; /** * An abstract class that implements RegionServerObserver. @@ -68,4 +69,9 @@ public class BaseRegionServerObserver implements RegionServerObserver { public void postRollBackMerge(ObserverContext ctx, HRegion regionA, HRegion regionB) throws IOException { } + @Override + public void postCreateReplicationEndPoint( + ObserverContext ctx, ReplicationEndpoint endpoint) { + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java index df1018e..2d0f358 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.MetaMutationAnnotation; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.replication.ReplicationEndpoint; public interface RegionServerObserver extends Coprocessor { @@ -105,4 +106,12 @@ public interface RegionServerObserver extends Coprocessor { void postRollBackMerge(final ObserverContext ctx, final HRegion regionA, final HRegion regionB) throws IOException; + /** + * This will be called after the user replication endpoint is instantiated. + * @param ctx + * @param endpoint + */ + void postCreateReplicationEndPoint(ObserverContext ctx, + ReplicationEndpoint endpoint); + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java index 46d482c..6dbf0d9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionServerObserver; +import org.apache.hadoop.hbase.replication.ReplicationEndpoint; @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) @InterfaceStability.Evolving @@ -136,6 +137,16 @@ public class RegionServerCoprocessorHost extends }); } + public void postCreateReplicationEndPoint(final ReplicationEndpoint endpoint) throws IOException { + execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { + @Override + public void call(RegionServerObserver oserver, + ObserverContext ctx) throws IOException { + oserver.postCreateReplicationEndPoint(ctx, endpoint); + } + }); + } + private static abstract class CoprocessorOperation extends ObserverContext { public CoprocessorOperation() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index e196588..4257b24 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -43,7 +43,9 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationListener; @@ -84,7 +86,7 @@ public class ReplicationSourceManager implements ReplicationListener { // UUID for this cluster private final UUID clusterId; // All about stopping - private final Stoppable stopper; + private final Server server; // All logs we are currently tracking private final Map> hlogsById; // Logs for recovered sources we are currently tracking @@ -111,7 +113,7 @@ public class ReplicationSourceManager implements ReplicationListener { * @param replicationPeers * @param replicationTracker * @param conf the configuration to use - * @param stopper the stopper object for this region server + * @param server the server for this region server * @param fs the file system to use * @param logDir the directory that contains all hlog directories of live RSs * @param oldLogDir the directory where old logs are archived @@ -119,7 +121,7 @@ public class ReplicationSourceManager implements ReplicationListener { */ public ReplicationSourceManager(final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker, - final Configuration conf, final Stoppable stopper, final FileSystem fs, final Path logDir, + final Configuration conf, final Server server, final FileSystem fs, final Path logDir, final Path oldLogDir, final UUID clusterId) { //CopyOnWriteArrayList is thread-safe. //Generally, reading is more than modifying. @@ -127,7 +129,7 @@ public class ReplicationSourceManager implements ReplicationListener { this.replicationQueues = replicationQueues; this.replicationPeers = replicationPeers; this.replicationTracker = replicationTracker; - this.stopper = stopper; + this.server = server; this.hlogsById = new HashMap>(); this.hlogsByIdRecoveredQueues = new ConcurrentHashMap>(); this.oldsources = new CopyOnWriteArrayList(); @@ -243,7 +245,7 @@ public class ReplicationSourceManager implements ReplicationListener { ReplicationPeer peer = replicationPeers.getPeer(id); ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this, this.replicationQueues, - this.replicationPeers, stopper, id, this.clusterId, peerConfig, peer); + this.replicationPeers, server, id, this.clusterId, peerConfig, peer); synchronized (this.hlogsById) { this.sources.add(src); this.hlogsById.put(id, new TreeSet()); @@ -257,7 +259,7 @@ public class ReplicationSourceManager implements ReplicationListener { String message = "Cannot add log to queue when creating a new source, queueId=" + src.getPeerClusterZnode() + ", filename=" + name; - stopper.stop(message); + server.stop(message); throw e; } src.enqueueLog(this.latestPath); @@ -359,7 +361,7 @@ public class ReplicationSourceManager implements ReplicationListener { * @param conf the configuration to use * @param fs the file system to use * @param manager the manager to use - * @param stopper the stopper object for this region server + * @param server the server object for this region server * @param peerId the id of the peer cluster * @return the created source * @throws IOException @@ -367,9 +369,13 @@ public class ReplicationSourceManager implements ReplicationListener { protected ReplicationSourceInterface getReplicationSource(final Configuration conf, final FileSystem fs, final ReplicationSourceManager manager, final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, - final Stoppable stopper, final String peerId, final UUID clusterId, + final Server server, final String peerId, final UUID clusterId, final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer) throws IOException { + RegionServerCoprocessorHost rsServerHost = null; + if (server instanceof HRegionServer) { + rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost(); + } ReplicationSourceInterface src; try { @SuppressWarnings("rawtypes") @@ -392,6 +398,9 @@ public class ReplicationSourceManager implements ReplicationListener { @SuppressWarnings("rawtypes") Class c = Class.forName(replicationEndpointImpl); replicationEndpoint = (ReplicationEndpoint) c.newInstance(); + if(rsServerHost != null) { + rsServerHost.postCreateReplicationEndPoint(replicationEndpoint); + } } catch (Exception e) { LOG.warn("Passed replication endpoint implementation throws errors", e); throw new IOException(e); @@ -399,7 +408,7 @@ public class ReplicationSourceManager implements ReplicationListener { MetricsSource metrics = new MetricsSource(peerId); // init replication source - src.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerId, + src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId, clusterId, replicationEndpoint, metrics); // init replication endpoint @@ -542,7 +551,7 @@ public class ReplicationSourceManager implements ReplicationListener { Thread.currentThread().interrupt(); } // We try to lock that rs' queue directory - if (stopper.isStopped()) { + if (server.isStopped()) { LOG.info("Not transferring queue since we are shutting down"); return; } @@ -578,7 +587,7 @@ public class ReplicationSourceManager implements ReplicationListener { ReplicationSourceInterface src = getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp, - stopper, peerId, this.clusterId, peerConfig, peer); + server, peerId, this.clusterId, peerConfig, peer); if (!this.rp.getPeerIds().contains((src.getPeerClusterId()))) { src.terminate("Recovered queue doesn't belong to any current peer"); break; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java index fa87289..16fadc4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java @@ -45,13 +45,13 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.Tag; -import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; @@ -67,7 +67,6 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorException; import org.apache.hadoop.hbase.coprocessor.CoprocessorService; import org.apache.hadoop.hbase.coprocessor.EndpointObserver; import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; -import org.apache.hadoop.hbase.coprocessor.MasterObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment; @@ -91,6 +90,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; @@ -2209,4 +2209,10 @@ public class AccessController extends BaseMasterAndRegionObserver @Override public void postRollBackMerge(ObserverContext ctx, HRegion regionA, HRegion regionB) throws IOException { } + + @Override + public void postCreateReplicationEndPoint( + ObserverContext ctx, ReplicationEndpoint endpoint) { + + } }