diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentListener.java new file mode 100644 index 0000000..8680e19 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentListener.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; + +/** + * Get notification of assignment events. The invocations are inline + * so make sure your implementation is fast else you'll slow hbase. + */ +@InterfaceAudience.Private +public interface AssignmentListener { + /** + * The region was opened on the specified server. + * @param regionInfo The opened region. + * @param serverName The remote server's name. + */ + void regionOpened(final HRegionInfo regionInfo, final ServerName serverName); + + /** + * The region was closed on the region server. Unlike regionOpened, no + * server name is passed to this callback. + * @param regionInfo The closed region. 
+ */ + void regionClosed(final HRegionInfo regionInfo); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index ba9e103..4dcd3e9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -32,6 +32,7 @@ import java.util.NavigableMap; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -222,6 +223,9 @@ public class AssignmentManager extends ZooKeeperListener { @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL") public static boolean TEST_SKIP_SPLIT_HANDLING = false; + /** Listeners that are called on assignment events. */ + private List listeners = new CopyOnWriteArrayList(); + /** * Constructs a new assignment manager. * @@ -284,6 +288,22 @@ public class AssignmentManager extends ZooKeeperListener { } /** + * Add the listener to the notification list. + * @param listener The AssignmentListener to register + */ + public void registerListener(final AssignmentListener listener) { + this.listeners.add(listener); + } + + /** + * Remove the listener from the notification list; returns true if it was registered. + * @param listener The AssignmentListener to unregister + */ + public boolean unregisterListener(final AssignmentListener listener) { + return this.listeners.remove(listener); + } + + /** * @return Instance of ZKTableStateManager. */ public TableStateManager getTableStateManager() { @@ -600,6 +620,7 @@ public class AssignmentManager extends ZooKeeperListener { // server. If that server is online, when we reload the meta, the // region is put back to online, we need to offline it. 
regionStates.regionOffline(regionInfo); + sendRegionClosedNotification(regionInfo); } // Put it back in transition so that SSH can re-assign it regionStates.updateRegionState(regionInfo, State.OFFLINE, sn); @@ -1246,6 +1267,9 @@ public class AssignmentManager extends ZooKeeperListener { // Remove plan if one. clearRegionPlan(regionInfo); balancer.regionOnline(regionInfo, sn); + + // Tell our listeners that a region was opened + sendRegionOpenedNotification(regionInfo, sn); } /** @@ -1628,12 +1652,12 @@ public class AssignmentManager extends ZooKeeperListener { regionOffline(region); } return; - } else if ((t instanceof FailedServerException) || (state != null && + } else if ((t instanceof FailedServerException) || (state != null && t instanceof RegionAlreadyInTransitionException)) { long sleepTime = 0; Configuration conf = this.server.getConfiguration(); if(t instanceof FailedServerException) { - sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, + sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, RpcClient.FAILED_SERVER_EXPIRY_DEFAULT); } else { // RS is already processing this region, only need to update the timestamp @@ -1981,9 +2005,9 @@ public class AssignmentManager extends ZooKeeperListener { } else if(plan.getDestination().equals(newPlan.getDestination()) && previousException instanceof FailedServerException) { try { - LOG.info("Trying to re-assign " + region.getRegionNameAsString() + + LOG.info("Trying to re-assign " + region.getRegionNameAsString() + " to the same failed server."); - Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, + Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, RpcClient.FAILED_SERVER_EXPIRY_DEFAULT)); } catch (InterruptedException ie) { LOG.warn("Failed to assign " @@ -3289,6 +3313,26 @@ public class AssignmentManager extends ZooKeeperListener { // remove the region plan as well just in case. 
clearRegionPlan(regionInfo); balancer.regionOffline(regionInfo); + + // Tell our listeners that a region was closed + sendRegionClosedNotification(regionInfo); + } + + private void sendRegionOpenedNotification(final HRegionInfo regionInfo, + final ServerName serverName) { + if (!this.listeners.isEmpty()) { + for (AssignmentListener listener : this.listeners) { + listener.regionOpened(regionInfo, serverName); + } + } + } + + private void sendRegionClosedNotification(final HRegionInfo regionInfo) { + if (!this.listeners.isEmpty()) { + for (AssignmentListener listener : this.listeners) { + listener.regionClosed(regionInfo); + } + } } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerListener.java new file mode 100644 index 0000000..bce3712 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerListener.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ServerName; + +/** + * Get notification of server events. The invocations are inline + * so make sure your implementation is fast else you'll slow hbase. + */ +@InterfaceAudience.Private +public interface ServerListener { + /** + * The server has joined the cluster. + * @param serverName The remote server's name. + */ + void serverAdded(final ServerName serverName); + + /** + * The server was removed from the cluster. + * @param serverName The remote server's name. + */ + void serverRemoved(final ServerName serverName); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index 63b48f5..e209162 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -32,6 +32,7 @@ import java.util.Set; import java.util.SortedMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CopyOnWriteArrayList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -178,6 +179,9 @@ public class ServerManager { */ private Map requeuedDeadServers = new HashMap(); + /** Listeners that are called on server events. */ + private List listeners = new CopyOnWriteArrayList(); + /** * Constructor. * @param master @@ -212,6 +216,22 @@ public class ServerManager { } /** + * Add the listener to the notification list. + * @param listener The ServerListener to register + */ + public void registerListener(final ServerListener listener) { + this.listeners.add(listener); + } + + /** + * Remove the listener from the notification list; returns true if it was registered. 
+ * @param listener The ServerListener to unregister + */ + public boolean unregisterListener(final ServerListener listener) { + return this.listeners.remove(listener); + } + + /** * Let the server manager know a new regionserver has come online * @param ia The remote address * @param port The remote port @@ -305,6 +325,14 @@ public class ServerManager { } recordNewServerWithLock(serverName, sl); } + + // Tell our listeners that a server was added + if (!this.listeners.isEmpty()) { + for (ServerListener listener : this.listeners) { + listener.serverAdded(serverName); + } + } + // Note that we assume that same ts means same server, and don't expire in that case. // TODO: ts can theoretically collide due to clock shifts, so this is a bit hacky. if (existingServer != null && (existingServer.getStartcode() < serverName.getStartcode())) { @@ -582,6 +610,13 @@ public class ServerManager { } LOG.debug("Added=" + serverName + " to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta); + + // Tell our listeners that a server was removed + if (!this.listeners.isEmpty()) { + for (ServerListener listener : this.listeners) { + listener.serverRemoved(serverName); + } + } } public synchronized void processDeadServer(final ServerName serverName) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentListener.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentListener.java new file mode 100644 index 0000000..e960945 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentListener.java @@ -0,0 +1,260 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.base.Joiner; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +public class TestAssignmentListener { + private static final Log LOG = LogFactory.getLog(TestAssignmentListener.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + static class DummyListener { + protected AtomicInteger modified = new AtomicInteger(0); + + public void awaitModifications(int 
count) throws InterruptedException { + while (!modified.compareAndSet(count, 0)) { + Thread.sleep(100); + } + } + } + + static class DummyAssignmentListener extends DummyListener implements AssignmentListener { + private AtomicInteger closeCount = new AtomicInteger(0); + private AtomicInteger openCount = new AtomicInteger(0); + + public DummyAssignmentListener() { + } + + public void regionOpened(final HRegionInfo regionInfo, final ServerName serverName) { + LOG.info("Assignment open region=" + regionInfo + " server=" + serverName); + openCount.incrementAndGet(); + modified.incrementAndGet(); + } + + public void regionClosed(final HRegionInfo regionInfo) { + LOG.info("Assignment close region=" + regionInfo); + closeCount.incrementAndGet(); + modified.incrementAndGet(); + } + + public void reset() { + openCount.set(0); + closeCount.set(0); + } + + public int getLoadCount() { + return openCount.get(); + } + + public int getCloseCount() { + return closeCount.get(); + } + } + + static class DummyServerListener extends DummyListener implements ServerListener { + private AtomicInteger removedCount = new AtomicInteger(0); + private AtomicInteger addedCount = new AtomicInteger(0); + + public DummyServerListener() { + } + + public void serverAdded(final ServerName serverName) { + LOG.info("Server added " + serverName); + addedCount.incrementAndGet(); + modified.incrementAndGet(); + } + + public void serverRemoved(final ServerName serverName) { + LOG.info("Server removed " + serverName); + removedCount.incrementAndGet(); + modified.incrementAndGet(); + } + + public void reset() { + addedCount.set(0); + removedCount.set(0); + } + + public int getAddedCount() { + return addedCount.get(); + } + + public int getRemovedCount() { + return removedCount.get(); + } + } + + @BeforeClass + public static void beforeAllTests() throws Exception { + TEST_UTIL.startMiniCluster(2); + } + + @AfterClass + public static void afterAllTests() throws Exception { + TEST_UTIL.shutdownMiniCluster(); 
+ } + + @Test(timeout=60000) + public void testServerListener() throws IOException, InterruptedException { + ServerManager serverManager = TEST_UTIL.getHBaseCluster().getMaster().getServerManager(); + + DummyServerListener listener = new DummyServerListener(); + serverManager.registerListener(listener); + try { + MiniHBaseCluster miniCluster = TEST_UTIL.getMiniHBaseCluster(); + + // Start a new Region Server + miniCluster.startRegionServer(); + listener.awaitModifications(1); + assertEquals(1, listener.getAddedCount()); + assertEquals(0, listener.getRemovedCount()); + + // Start another Region Server + listener.reset(); + miniCluster.startRegionServer(); + listener.awaitModifications(1); + assertEquals(1, listener.getAddedCount()); + assertEquals(0, listener.getRemovedCount()); + + int nrs = miniCluster.getRegionServerThreads().size(); + + // Stop a Region Server + listener.reset(); + miniCluster.stopRegionServer(nrs - 1); + listener.awaitModifications(1); + assertEquals(0, listener.getAddedCount()); + assertEquals(1, listener.getRemovedCount()); + + // Stop another Region Server + listener.reset(); + miniCluster.stopRegionServer(nrs - 2); + listener.awaitModifications(1); + assertEquals(0, listener.getAddedCount()); + assertEquals(1, listener.getRemovedCount()); + } finally { + serverManager.unregisterListener(listener); + } + } + + @Test(timeout=60000) + public void testAssignmentListener() throws IOException, InterruptedException { + AssignmentManager am = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager(); + HBaseAdmin admin = TEST_UTIL.getHBaseAdmin(); + + DummyAssignmentListener listener = new DummyAssignmentListener(); + am.registerListener(listener); + try { + final String TABLE_NAME_STR = "testtb"; + final TableName TABLE_NAME = TableName.valueOf(TABLE_NAME_STR); + final byte[] FAMILY = Bytes.toBytes("cf"); + + // Create a new table, with a single region + LOG.info("Create Table"); + TEST_UTIL.createTable(TABLE_NAME, FAMILY); 
+ listener.awaitModifications(1); + assertEquals(1, listener.getLoadCount()); + assertEquals(0, listener.getCloseCount()); + + // Add some data + HTable table = new HTable(TEST_UTIL.getConfiguration(), TABLE_NAME); + try { + for (int i = 0; i < 10; ++i) { + byte[] key = Bytes.toBytes("row-" + i); + Put put = new Put(key); + put.add(FAMILY, null, key); + table.put(put); + } + } finally { + table.close(); + } + + // Split the table in two + LOG.info("Split Table"); + listener.reset(); + admin.split(TABLE_NAME_STR, "row-3"); + listener.awaitModifications(3); + assertEquals(2, listener.getLoadCount()); // daughters added + assertEquals(1, listener.getCloseCount()); // parent removed + + // Wait for the Regions to be mergeable + MiniHBaseCluster miniCluster = TEST_UTIL.getMiniHBaseCluster(); + int mergeable = 0; + while (mergeable < 2) { + Thread.sleep(100); + admin.majorCompact(TABLE_NAME_STR); + mergeable = 0; + for (JVMClusterUtil.RegionServerThread regionThread: miniCluster.getRegionServerThreads()) { + for (HRegion region: regionThread.getRegionServer().getOnlineRegions(TABLE_NAME)) { + mergeable += region.isMergeable() ? 
1 : 0; + } + } + } + + // Merge the two regions + LOG.info("Merge Regions"); + listener.reset(); + List regions = admin.getTableRegions(TABLE_NAME); + assertEquals(2, regions.size()); + admin.mergeRegions(regions.get(0).getEncodedNameAsBytes(), + regions.get(1).getEncodedNameAsBytes(), true); + listener.awaitModifications(3); + assertEquals(1, admin.getTableRegions(TABLE_NAME).size()); + assertEquals(1, listener.getLoadCount()); // new merged region added + assertEquals(2, listener.getCloseCount()); // daughters removed + + // Delete the table + LOG.info("Drop Table"); + listener.reset(); + TEST_UTIL.deleteTable(TABLE_NAME); + listener.awaitModifications(1); + assertEquals(0, listener.getLoadCount()); + assertEquals(1, listener.getCloseCount()); + } finally { + am.unregisterListener(listener); + } + } +}