From de74ccb42ab628177b29450e338f61d369d8575b Mon Sep 17 00:00:00 2001 From: Phil Yang Date: Wed, 10 May 2017 12:23:05 +0800 Subject: [PATCH] HBASE-17931 Assign system table to servers with highest version --- .../org/apache/hadoop/hbase/util/VersionInfo.java | 40 +++++++++++++++++++++ .../apache/hadoop/hbase/util/TestVersionInfo.java | 35 ++++++++++++++++++ .../hadoop/hbase/master/AssignmentManager.java | 40 +++++++++++++++++++-- .../org/apache/hadoop/hbase/master/HMaster.java | 20 ++++++++--- .../apache/hadoop/hbase/master/MasterServices.java | 2 ++ .../apache/hadoop/hbase/master/ServerManager.java | 41 ++++++++++++++++++++-- .../hbase/zookeeper/RegionServerTracker.java | 21 +++++++++-- 7 files changed, 187 insertions(+), 12 deletions(-) create mode 100644 hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestVersionInfo.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/VersionInfo.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/VersionInfo.java index 44bb89f3c9..43a43bd692 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/VersionInfo.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/VersionInfo.java @@ -108,6 +108,46 @@ public class VersionInfo { } } + /** + * Compares two version strings such as "1.2.3-hotfix" component by component, splitting on + * '.' and '-'. A non-numeric component ranks as 10000; two such components compare as + * strings. A version that only adds extra components to the other is the greater one. + * @return negative, zero or positive as v1 is lower than, equal to or higher than v2 + */ + public static int compareVersion(String v1, String v2) { + if (v1.equals(v2)) { // fast path for identical strings + return 0; + } + String s1[] = v1.split("\\.|-");//1.2.3-hotfix -> [1, 2, 3, hotfix] + String s2[] = v2.split("\\.|-"); + int index = 0; + while (index < s1.length && index < s2.length) { + int va = 10000, vb = 10000; + try { + va = Integer.parseInt(s1[index]); + } catch (NumberFormatException ignored) { // non-numeric component keeps rank 10000 + } + try { + vb = Integer.parseInt(s2[index]); + } catch (NumberFormatException ignored) { // non-numeric component keeps rank 10000 + } + if (va != vb) { + return va - vb; + } + if (va == 10000) { + int c = s1[index].compareTo(s2[index]); // compare as String + if (c != 0) { + return c; + } + } + index++; + } + if (index < s1.length) { + return 1; // s1 is longer + }
+ return index < s2.length ? -1 : 0; // s2 is longer, or all components compared equal + } + public static void main(String[] args) { writeTo(System.out); } diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestVersionInfo.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestVersionInfo.java new file mode 100644 index 0000000000..896c5241ec --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestVersionInfo.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hbase.util; + +import org.apache.hadoop.hbase.testclassification.SmallTests; +import static org.junit.Assert.assertTrue; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestVersionInfo { + + @Test + public void testCompareVersion() { + assertTrue(VersionInfo.compareVersion("1.0.0", "0.98.11") > 0); + assertTrue(VersionInfo.compareVersion("0.98.11", "1.0.1") < 0); + assertTrue(VersionInfo.compareVersion("2.0.0", "1.4.0") > 0); + assertTrue(VersionInfo.compareVersion("2.0.0", "2.0.0-SNAPSHOT") < 0); + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index 69ebd97e6e..6daa6cc669 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -56,12 +57,14 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.RegionStateListener; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.HTable; import
org.apache.hadoop.hbase.client.MasterSwitchType; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.Result; @@ -89,6 +92,7 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.PairOfSameType; import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.ipc.RemoteException; @@ -1250,18 +1254,44 @@ public class AssignmentManager { } /** + * Get the list of servers that a system table region must not be assigned to, i.e. every + * online server whose version is not equal to the highest version among the online servers. + * A RS reports to the master before it registers on zk, and only once it has registered on zk do + * we know its version, so in effect a system region is never assigned to a RS that has not registered on zk. + */ + public List getSystemTableExcludeServers() { + List> serverList = serverManager.getOnlineServersList().stream() + .map((s) -> (new Pair<>(s, server.getRegionServerVersion(s)))).collect(Collectors.toList()); + if (serverList.isEmpty()) { + return new ArrayList<>(); + } + String highestVersion = Collections.max(serverList, + (Pair o1, Pair o2) -> VersionInfo + .compareVersion(o1.getSecond(), o2.getSecond())).getSecond(); + return serverList.stream() + .filter((pair) -> (!pair.getSecond().equals(highestVersion))) + .map((pair) -> (pair.getFirst())) + .collect(Collectors.toList()); + } + + + /** * @param region the region to assign * @param forceNewPlan If true, then if an existing plan exists, a new plan * will be generated. * @return Plan for passed region (If none currently, it creates one or * if no servers to assign, it returns null).
*/ - private RegionPlan getRegionPlan(final HRegionInfo region, + public RegionPlan getRegionPlan(final HRegionInfo region, final boolean forceNewPlan) throws HBaseIOException { // Pickup existing plan or make a new one final String encodedName = region.getEncodedName(); + List exclude = null; + if (region.isSystemTable()) { + exclude = getSystemTableExcludeServers(); + } final List destServers = - serverManager.createDestinationServersList(); + serverManager.createDestinationServersList(exclude); if (destServers.isEmpty()){ LOG.warn("Can't move " + encodedName + @@ -2079,6 +2109,12 @@ public class AssignmentManager { return isCarryingRegion(serverName, metaHri); } + public List getCarryingSystemTables(ServerName serverName) throws IOException { + return this.getRegionStates().getServerRegions(serverName).stream() + .filter(HRegionInfo::isSystemTable) + .collect(Collectors.toList()); + } + /** * Check if the shutdown server carries the specific region. * @return whether the serverName currently hosts the region diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index e4ba285c78..ea00a1ccd9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -890,6 +890,9 @@ public class HMaster extends HRegionServer implements MasterServices { configurationManager.registerObserver(this.balancer); configurationManager.registerObserver(this.hfileCleaner); + serverManager.checkIfShouldMoveSystemRegion(); + regionServerTracker.masterInited(); + // Set master as 'initialized'. setInitialized(true); @@ -1548,7 +1551,7 @@ public class HMaster extends HRegionServer implements MasterServices { @VisibleForTesting // Public so can be accessed by tests.
public void move(final byte[] encodedRegionName, - final byte[] destServerName) throws HBaseIOException { + byte[] destServerName) throws HBaseIOException { RegionState regionState = assignmentManager.getRegionStates(). getRegionState(Bytes.toString(encodedRegionName)); @@ -1560,11 +1563,20 @@ public class HMaster extends HRegionServer implements MasterServices { } ServerName dest; + List exclude = hri.isSystemTable()? assignmentManager.getSystemTableExcludeServers() + : new ArrayList<>(1); + // A null/empty destination means "choose at random" below, so only check a real server name. + if (destServerName != null && destServerName.length > 0 + && exclude.contains(ServerName.valueOf(Bytes.toString(destServerName)))) { + LOG.info(Bytes.toString(encodedRegionName) + " can not move to " + + Bytes.toString(destServerName) + " because the server is in exclude list"); + destServerName = null; + } if (destServerName == null || destServerName.length == 0) { LOG.info("Passed destination servername is null/empty so " + "choosing a server at random"); - final List destServers = this.serverManager.createDestinationServersList( - regionState.getServerName()); + exclude.add(regionState.getServerName()); + final List destServers = this.serverManager.createDestinationServersList(exclude); dest = balancer.randomAssignment(hri, destServers); if (dest == null) { LOG.debug("Unable to determine a plan to assign " + hri); @@ -2444,7 +2456,7 @@ public class HMaster extends HRegionServer implements MasterServices { if (info != null && info.hasVersionInfo()) { return info.getVersionInfo().getVersion(); } - return "Unknown"; + return "0.0.0"; //Lowest version to prevent move system region to unknown version RS.
} /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 4924d72217..d7a2fed285 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -478,4 +478,6 @@ public interface MasterServices extends Server { * @return {@link LockManager} to lock namespaces/tables/regions. */ LockManager getLockManager(); + + public String getRegionServerVersion(final ServerName sn); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index db0a0e5b43..6d455a9ab8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -81,6 +81,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.RetryCounterFactory; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; @@ -578,6 +579,40 @@ public class ServerManager { return ZKUtil.listChildrenNoWatch(zkw, zkw.znodePaths.rsZNode); } + public synchronized void checkIfShouldMoveSystemRegion() throws IOException { + // A RS registers on ZK after it reports startup to the master + List regionsShouldMove = new ArrayList<>(); + for (ServerName server : master.getAssignmentManager().getSystemTableExcludeServers()) { + while (true) { + try { + regionsShouldMove.addAll(master.getAssignmentManager().getCarryingSystemTables(server)); + break; + } catch (IOException e) { + LOG.warn("get system table assignment failed, will retry " + e.getMessage(), e);
+ Threads.sleep(2000); + } + } + + } + if (!regionsShouldMove.isEmpty()) { + AssignmentManager am = master.getAssignmentManager(); + List plans = new ArrayList<>(); + for (HRegionInfo regionInfo : regionsShouldMove) { + RegionPlan plan = am.getRegionPlan(regionInfo, true); // null when no server can take it + if (plan != null && regionInfo.isMetaRegion()) { + // Must move meta region first. + am.balance(plan); + } else if (plan != null) { + plans.add(plan); + } + } + for (RegionPlan plan : plans) { + am.balance(plan); + } + } + + } + /* * Expire the passed server. Add it to list of dead servers and queue a * shutdown processing. @@ -1211,11 +1246,11 @@ public class ServerManager { * the draining or dying servers. * @param serverToExclude can be null if there is no server to exclude */ - public List createDestinationServersList(final ServerName serverToExclude){ + public List createDestinationServersList(final List serversToExclude){ final List destServers = getOnlineServersList(); - if (serverToExclude != null){ - destServers.remove(serverToExclude); + if (serversToExclude != null){ + destServers.removeAll(serversToExclude); } // Loop through the draining server list and remove them from the server list diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java index 69cd233176..183c369ce8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java @@ -51,6 +51,7 @@ public class RegionServerTracker extends ZooKeeperListener { private NavigableMap regionServers = new TreeMap<>(); private ServerManager serverManager; private Server server; + private volatile boolean masterInited = false; public RegionServerTracker(ZooKeeperWatcher watcher, Server server, ServerManager serverManager) { @@ -71,10 +72,14 @@ public class RegionServerTracker extends ZooKeeperListener {
watcher.registerListener(this); List servers = ZKUtil.listChildrenAndWatchThem(watcher, watcher.znodePaths.rsZNode); - add(servers); + refresh(servers); } - private void add(final List servers) throws IOException { + public void masterInited() { + masterInited = true; + } + + private void refresh(final List servers) throws IOException { synchronized(this.regionServers) { this.regionServers.clear(); for (String n: servers) { @@ -102,6 +107,16 @@ public class RegionServerTracker extends ZooKeeperListener { } } } + + if (masterInited) { + new Thread(() -> { // own thread: the check can sleep and retry, so don't block the caller + try { + serverManager.checkIfShouldMoveSystemRegion(); + } catch (IOException e) { + LOG.error("Failed to move system regions after region server change", e); + } + }).start(); + } } private void remove(final ServerName sn) { @@ -134,7 +149,7 @@ public class RegionServerTracker extends ZooKeeperListener { try { List servers = ZKUtil.listChildrenAndWatchThem(watcher, watcher.znodePaths.rsZNode); - add(servers); + refresh(servers); } catch (IOException e) { server.abort("Unexpected zk exception getting RS nodes", e); } catch (KeeperException e) { -- 2.11.0 (Apple Git-81)