From 44fadb89c296ef8f788c9b1cb68dadfb48d7d760 Mon Sep 17 00:00:00 2001 From: Bahram Chehrazy Date: Mon, 1 Apr 2019 22:50:27 -0700 Subject: [PATCH] Reset cached rssStub on region servers as soon as the master changes --- .../hbase/regionserver/HRegionServer.java | 19 +++++-- .../hbase/master/TestSplitLogManager.java | 2 +- .../TestMasterAddressTracker.java | 57 ++++++++++++++++++- .../hbase/zookeeper/MasterAddressTracker.java | 16 +++++- .../hadoop/hbase/zookeeper/ZKNodeTracker.java | 9 +++ 5 files changed, 93 insertions(+), 10 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index bcb1a07b12..3ebc50d878 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -629,7 +629,14 @@ public class HRegionServer extends HasThread implements this.csm = new ZkCoordinatedStateManager(this); } - masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this); + Runnable masterZnodeChanged = new Runnable() { + @Override + public void run() { + // Reset the region server stub whenever the master changes. + rssStub = null; + } + }; + masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this, masterZnodeChanged); masterAddressTracker.start(); clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this); @@ -2431,12 +2438,13 @@ public class HRegionServer extends HasThread implements msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause); } // Report to the master but only if we have already registered with the master. 
- if (rssStub != null && this.serverName != null) { + RegionServerStatusService.BlockingInterface rss = rssStub; + if (rss != null && this.serverName != null) { ReportRSFatalErrorRequest.Builder builder = ReportRSFatalErrorRequest.newBuilder(); builder.setServer(ProtobufUtil.toServerName(this.serverName)); builder.setErrorMessage(msg); - rssStub.reportRSFatalError(null, builder.build()); + rss.reportRSFatalError(null, builder.build()); } } catch (Throwable t) { LOG.warn("Unable to report fatal error to master", t); @@ -2645,7 +2653,8 @@ public class HRegionServer extends HasThread implements private RegionServerStartupResponse reportForDuty() throws IOException { if (this.masterless) return RegionServerStartupResponse.getDefaultInstance(); ServerName masterServerName = createRegionServerStatusStub(true); - if (masterServerName == null) return null; + RegionServerStatusService.BlockingInterface rss = rssStub; + if (masterServerName == null || rss == null) return null; RegionServerStartupResponse result = null; try { rpcServices.requestCount.reset(); @@ -2664,7 +2673,7 @@ public class HRegionServer extends HasThread implements request.setPort(port); request.setServerStartCode(this.startcode); request.setServerCurrentTime(now); - result = this.rssStub.regionServerStartup(null, request.build()); + result = rss.regionServerStartup(null, request.build()); } catch (ServiceException se) { IOException ioe = ProtobufUtil.getRemoteException(se); if (ioe instanceof ClockOutOfSyncException) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java index 83fe2b9ec1..bde7903852 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java @@ -205,7 +205,7 @@ public class TestSplitLogManager { assertEquals(1L, 
tot_mgr_node_create_queued.sum()); LOG.debug("waiting for task node creation"); - listener.waitForCreation(); + listener.waitForChanges(); LOG.debug("task created"); return tasknode; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressTracker.java index 58b4e729cd..7408963388 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressTracker.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressTracker.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.*; import java.util.concurrent.Semaphore; import org.apache.hadoop.hbase.*; @@ -90,7 +91,7 @@ public class TestMasterAddressTracker { ZKUtil.createAndFailSilent(zk, zk.getZNodePaths().baseZNode); // Should not have a master yet - MasterAddressTracker addressTracker = new MasterAddressTracker(zk, null); + MasterAddressTracker addressTracker = new MasterAddressTracker(zk, null, null); addressTracker.start(); assertFalse(addressTracker.hasMaster()); zk.registerListener(addressTracker); @@ -107,7 +108,7 @@ public class TestMasterAddressTracker { // Wait for the node to be created LOG.info("Waiting for master address manager to be notified"); - listener.waitForCreation(); + listener.waitForChanges(); LOG.info("Master node created"); } return addressTracker; @@ -155,6 +156,41 @@ public class TestMasterAddressTracker { } } + @Test + public void testMasterNodeChangedCallback() throws Exception { + final ServerName sn1 = ServerName.valueOf("server1", 1001, System.currentTimeMillis()); + final ServerName sn2 = ServerName.valueOf("server2", 1001, System.currentTimeMillis()); + final ZKWatcher zk = new ZKWatcher(TEST_UTIL.getConfiguration(), 
name.getMethodName(), null); + + ZKUtil.createAndFailSilent(zk, zk.getZNodePaths().baseZNode); + Runnable masterChangedCallback = mock(Runnable.class); + MasterAddressTracker addressTracker = new MasterAddressTracker(zk, null, masterChangedCallback); + addressTracker.start(); + + // Use a listener to capture when the node is actually created + NodeCreationListener listener = new NodeCreationListener(zk, zk.getZNodePaths().masterAddressZNode); + zk.registerListener(listener); + // Callback should not have been called yet + verify(masterChangedCallback, times(0)).run(); + + LOG.info("Creating master node"); + MasterAddressTracker.setMasterAddress(zk, zk.getZNodePaths().masterAddressZNode, sn1, 1002); + LOG.info("Waiting for master address manager to be notified"); + listener.waitForChanges(); + LOG.info("Master node created"); + verify(masterChangedCallback, times(1)).run(); + + LOG.info("Deleting the master node"); + MasterAddressTracker.deleteIfEquals(zk, sn1.getServerName()); + listener.waitForChanges(); + verify(masterChangedCallback, times(2)).run(); + + LOG.info("Changing master node"); + MasterAddressTracker.setMasterAddress(zk, zk.getZNodePaths().masterAddressZNode, sn2, 1003); + listener.waitForChanges(); + verify(masterChangedCallback, times(3)).run(); + } + @Test public void testNoMaster() throws Exception { final MasterAddressTracker addressTracker = setupMasterTracker(null, 1772); @@ -183,7 +219,22 @@ public class TestMasterAddressTracker { } } - public void waitForCreation() throws InterruptedException { + @Override + public void nodeDataChanged(String path) { + if(path.equals(node)) { + LOG.debug("nodeDataChanged(" + path + ")"); + lock.release(); + } + } + @Override + public void nodeDeleted(String path) { + if(path.equals(node)) { + LOG.debug("nodeDeleted(" + path + ")"); + lock.release(); + } + } + + public void waitForChanges() throws InterruptedException { lock.acquire(); } } diff --git 
a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MasterAddressTracker.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MasterAddressTracker.java index 915244e964..b86f155aa4 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MasterAddressTracker.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MasterAddressTracker.java @@ -53,6 +53,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos; */ @InterfaceAudience.Private public class MasterAddressTracker extends ZKNodeTracker { + + // To be called when the master znode changes + Runnable masterChangedCallback; + /** * Construct a master address listener with the specified * zookeeper reference. @@ -63,9 +67,11 @@ public class MasterAddressTracker extends ZKNodeTracker { * * @param watcher zk reference and watcher * @param abortable abortable in case of fatal error + * @param masterChangedCallback callback function to be called when the master znode changes */ - public MasterAddressTracker(ZKWatcher watcher, Abortable abortable) { + public MasterAddressTracker(ZKWatcher watcher, Abortable abortable, Runnable masterChangedCallback) { super(watcher, watcher.getZNodePaths().masterAddressZNode, abortable); + this.masterChangedCallback = masterChangedCallback; } /** @@ -278,4 +284,12 @@ public class MasterAddressTracker extends ZKNodeTracker { return false; } + + @Override + protected synchronized void nodeChanged() { + super.nodeChanged(); + if (this.masterChangedCallback != null) { + this.masterChangedCallback.run(); + } + } } diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKNodeTracker.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKNodeTracker.java index 5806e5b442..1b9d98a26c 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKNodeTracker.java +++ 
b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKNodeTracker.java @@ -199,6 +199,7 @@ public abstract class ZKNodeTracker extends ZKListener { byte [] data = ZKUtil.getDataAndWatch(watcher, node); if (data != null) { this.data = data; + nodeChanged(); notifyAll(); } else { nodeDeleted(path); @@ -216,6 +217,7 @@ public abstract class ZKNodeTracker extends ZKListener { nodeCreated(path); } else { this.data = null; + nodeChanged(); } } catch(KeeperException e) { abortable.abort("Unexpected exception handling nodeDeleted event", e); @@ -230,6 +232,13 @@ public abstract class ZKNodeTracker extends ZKListener { } } + /** + * Called when the znode is created, deleted or changed. + */ + protected synchronized void nodeChanged() { + LOG.info("Znode changed: {}", node); + } + /** * Checks if the baseznode set as per the property 'zookeeper.znode.parent' * exists. -- 2.20.1.windows.1